Skip to main content

Overview

Redis is the default broker for Broccoli. It provides fast, in-memory message storage with persistence options.

Installation

Redis support is enabled by default:
[dependencies]
broccoli_queue = "0.4"
Or explicitly:
[dependencies]
broccoli_queue = { version = "0.4", features = ["redis"] }

Connection

use broccoli_queue::queue::BroccoliQueue;

let queue = BroccoliQueue::builder("redis://localhost:6379")
    .pool_connections(10)
    .build()
    .await?;

Connection URL formats

redis://localhost:6379              # Basic
redis://user:password@localhost:6379  # With auth
redis://localhost:6379/2            # Database number
rediss://localhost:6379             # TLS connection

Starting Redis

Docker

docker run -d --name redis -p 6379:6379 redis:latest

With persistence

docker run -d --name redis -p 6379:6379 \
  -v redis-data:/data \
  redis:latest redis-server --appendonly yes

With password

docker run -d --name redis -p 6379:6379 \
  redis:latest redis-server --requirepass yourpassword

Features

Connection pooling

Redis uses bb8-redis for connection pooling:
let queue = BroccoliQueue::builder("redis://localhost:6379")
    .pool_connections(20)  // 20 connections in pool
    .build()
    .await?;

Fairness queues

Redis supports fairness queues through disambiguators, ensuring fair processing across different message categories:
// Publish with disambiguator
queue.publish("jobs", Some("tenant-a".into()), &job_a, None).await?;
queue.publish("jobs", Some("tenant-b".into()), &job_b, None).await?;

// Consume with fairness enabled
let options = ConsumeOptions::builder()
    .fairness(true)
    .build();

queue.process_messages("jobs", Some(4), Some(options), |msg| async move {
    // Messages are distributed fairly across tenant-a and tenant-b
    Ok(())
}).await?;

Message scheduling

Redis supports delayed message delivery:
use broccoli_queue::queue::PublishOptions;
use time::Duration;

let queue = BroccoliQueue::builder("redis://localhost:6379")
    .enable_scheduling(true)
    .build()
    .await?;

// Delay message by 30 seconds
let options = PublishOptions::builder()
    .delay(Duration::seconds(30))
    .build();

queue.publish("jobs", None, &job, Some(options)).await?;

Redis data structures

Broccoli uses the following Redis keys:
Key patternTypePurpose
{queue}ListMain queue
{queue}:processingSorted SetMessages being processed
{queue}:failedListFailed messages
{queue}:scheduledSorted SetScheduled messages
{queue}:fairness:{disambiguator}ListFairness sub-queues

Management API

With the management feature enabled:
#[cfg(feature = "management")]
{
    let status = queue.queue_status("jobs".into(), None).await?;
    println!("Pending: {}", status.pending_count);
    println!("Processing: {}", status.processing_count);
    println!("Failed: {}", status.failed_count);
}

Best practices

Set pool connections based on your workload:
  • Low traffic: 5-10 connections
  • Medium traffic: 10-20 connections
  • High traffic: 20-50 connections
.pool_connections(20)
Use Redis persistence (RDB or AOF) in production to prevent message loss:
redis-server --appendonly yes
Monitor Redis memory, especially with large message payloads:
redis-cli INFO memory
For high-volume applications, consider Redis Cluster for horizontal scaling.

Troubleshooting

Connection refused

Connection refused (os error 111)
Ensure Redis is running and accessible:
redis-cli ping

Authentication failed

NOAUTH Authentication required
Include credentials in your connection URL:
BroccoliQueue::builder("redis://user:password@localhost:6379")

Pool exhausted

If you see timeout errors, increase pool size:
.pool_connections(30)