Overview

BroccoliQueue is the main entry point for interacting with Broccoli. It provides methods for publishing and consuming messages, and for processing them with handlers.

Creating a queue

Use the builder pattern to create a queue instance:
use broccoli_queue::queue::BroccoliQueue;

let queue = BroccoliQueue::builder("redis://localhost:6379")
    .pool_connections(5)
    .failed_message_retry_strategy(Default::default())
    .build()
    .await?;

Builder options

| Option | Description | Default |
| --- | --- | --- |
| pool_connections(n) | Number of connections in the pool | 10 |
| failed_message_retry_strategy(strategy) | Configure retry behavior | 3 retries, enabled |
| enable_scheduling(bool) | Enable message scheduling | false |
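To make the default of "3 retries, enabled" concrete, here is a minimal stand-alone sketch of how a retry policy with those values could behave. It does not use Broccoli's own types: `RetryPolicy`, `Outcome`, and `run_with_retries` are hypothetical names, and the reading that "3 retries" means one initial attempt plus three retries is an assumption, not something the table confirms.

```rust
/// Hypothetical retry settings mirroring the documented defaults.
/// This is an illustrative model, not Broccoli's retry strategy type.
#[derive(Debug, Clone, Copy)]
pub struct RetryPolicy {
    pub enabled: bool,
    pub max_retries: u32,
}

impl Default for RetryPolicy {
    fn default() -> Self {
        // Documented defaults: 3 retries, enabled.
        Self { enabled: true, max_retries: 3 }
    }
}

/// Result of running a job under a retry policy.
#[derive(Debug, PartialEq)]
pub enum Outcome {
    Succeeded { attempts: u32 },
    Failed { attempts: u32 },
}

/// Runs `job` until it succeeds or the policy is exhausted.
/// The closure receives the 1-based attempt number.
pub fn run_with_retries<E>(
    policy: RetryPolicy,
    mut job: impl FnMut(u32) -> Result<(), E>,
) -> Outcome {
    // Assumption: one initial attempt, plus `max_retries` retries when enabled.
    let allowed = if policy.enabled { 1 + policy.max_retries } else { 1 };
    for attempt in 1..=allowed {
        if job(attempt).is_ok() {
            return Outcome::Succeeded { attempts: attempt };
        }
    }
    Outcome::Failed { attempts: allowed }
}
```

Under this model a job that always fails is attempted four times with the default policy and once when retries are disabled.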

Publishing messages

Single message

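use broccoli_queue::queue::BroccoliQueue;
use serde::{Deserialize, Serialize};

// Example payload type, assumed here for illustration.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct JobPayload {
    id: String,
    task: String,
}

let job = JobPayload { id: "1".into(), task: "process".into() };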

// Basic publish
queue.publish("jobs", None, &job, None).await?;

// With disambiguator for fairness
queue.publish("jobs", Some("tenant-123".into()), &job, None).await?;
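The idea behind a disambiguator can be illustrated with a small in-memory model: each disambiguator gets its own sub-queue, and consumers rotate between the non-empty sub-queues so that one busy tenant cannot starve the others. The sketch below is only that, a model. `FairQueue` is a hypothetical type, and round-robin rotation is an assumed scheduling policy; Broccoli's actual fairness algorithm may differ.

```rust
use std::collections::{BTreeMap, VecDeque};

/// Hypothetical in-memory model of a fairness queue (not Broccoli's implementation).
pub struct FairQueue<T> {
    /// One FIFO sub-queue per disambiguator.
    sub_queues: BTreeMap<String, VecDeque<T>>,
    /// Disambiguators with pending messages, in rotation order.
    rotation: VecDeque<String>,
}

impl<T> FairQueue<T> {
    pub fn new() -> Self {
        Self { sub_queues: BTreeMap::new(), rotation: VecDeque::new() }
    }

    /// Appends a message to the sub-queue for `disambiguator`.
    pub fn publish(&mut self, disambiguator: &str, item: T) {
        let queue = self.sub_queues.entry(disambiguator.to_string()).or_default();
        if queue.is_empty() {
            // The sub-queue becomes active, so it joins the back of the rotation.
            self.rotation.push_back(disambiguator.to_string());
        }
        queue.push_back(item);
    }

    /// Takes one message from the next sub-queue in the rotation.
    pub fn consume(&mut self) -> Option<T> {
        let key = self.rotation.pop_front()?;
        let queue = self.sub_queues.get_mut(&key)?;
        let item = queue.pop_front();
        if !queue.is_empty() {
            // Still has work: go to the back so other tenants get a turn first.
            self.rotation.push_back(key);
        }
        item
    }
}
```

With three messages published for one tenant and then one for another, this model hands out the second tenant's message after the first tenant's first message rather than after all three.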

Batch publishing

let jobs = vec![
    JobPayload { id: "1".into(), task: "task-a".into() },
    JobPayload { id: "2".into(), task: "task-b".into() },
];

queue.publish_batch("jobs", None, jobs, None).await?;

Consuming messages

Blocking consume

Waits until a message is available:
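// Import path as commonly used with broccoli_queue; verify it against your crate version.
use broccoli_queue::brokers::broker::BrokerMessage;

let message: BrokerMessage<JobPayload> = queue.consume("jobs", None).await?;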
println!("Received: {:?}", message.payload);

// Process the message...

// Acknowledge success
queue.acknowledge("jobs", message).await?;

Non-blocking consume

Returns immediately with None if no message is available:
if let Some(message) = queue.try_consume::<JobPayload>("jobs", None).await? {
    println!("Got message: {:?}", message.payload);
    queue.acknowledge("jobs", message).await?;
} else {
    println!("No messages available");
}
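A non-blocking consume is usually called from a polling loop. The following sketch shows one way to structure such a loop with exponential backoff. It is deliberately independent of Broccoli: `poll_with_backoff` is a hypothetical helper, the `try_consume` closure stands in for whatever non-blocking call you make, and the doubling delay is an illustrative choice.

```rust
use std::thread;
use std::time::Duration;

/// Calls `try_consume` until it yields a message or `max_attempts` is reached,
/// doubling the wait between attempts. Hypothetical helper for illustration.
pub fn poll_with_backoff<T>(
    mut try_consume: impl FnMut() -> Option<T>,
    max_attempts: u32,
    initial_delay: Duration,
) -> Option<T> {
    let mut delay = initial_delay;
    for attempt in 1..=max_attempts {
        if let Some(message) = try_consume() {
            return Some(message);
        }
        // No point sleeping after the final attempt.
        if attempt < max_attempts {
            thread::sleep(delay);
            delay = delay.saturating_mul(2);
        }
    }
    None
}
```

This version blocks the current thread while waiting. In async code the same structure applies, with the sleep replaced by the runtime's asynchronous timer.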

Batch consume

Consume multiple messages with a timeout:
use time::Duration;

let messages = queue.consume_batch::<JobPayload>(
    "jobs",
    10,                          // batch size
    Duration::seconds(5),        // timeout
    None,
).await?;

for message in messages {
    // Process and acknowledge each message
    queue.acknowledge("jobs", message).await?;
}
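The combination of a batch size and a timeout can be read as "return as soon as the batch is full, or when the deadline passes, whichever comes first". The sketch below models that reading with a standard-library channel. `collect_batch` is a hypothetical function, and whether Broccoli returns a partial batch on timeout in exactly this way is an assumption.

```rust
use std::sync::mpsc::Receiver;
use std::time::{Duration, Instant};

/// Collects up to `batch_size` items from `rx`, giving up once `timeout` has elapsed.
/// Illustrative model only; not Broccoli's consume_batch implementation.
pub fn collect_batch<T>(rx: &Receiver<T>, batch_size: usize, timeout: Duration) -> Vec<T> {
    let deadline = Instant::now() + timeout;
    let mut batch = Vec::with_capacity(batch_size);
    while batch.len() < batch_size {
        // Wait only for the time left until the shared deadline.
        let remaining = deadline.saturating_duration_since(Instant::now());
        match rx.recv_timeout(remaining) {
            Ok(item) => batch.push(item),
            // Deadline reached or all senders dropped: return what we have.
            Err(_) => break,
        }
    }
    batch
}
```

The deadline is computed once up front, so slow arrivals cannot extend the total wait beyond the requested timeout.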

Processing messages

The process_messages method provides a convenient way to consume and process messages in a loop:
queue.process_messages(
    "jobs",
    Some(4),     // 4 concurrent workers
    None,        // default consume options
    |message: BrokerMessage<JobPayload>| async move {
        println!("Processing: {:?}", message.payload);
        // Return Ok(()) on success, Err on failure
        Ok(())
    },
).await?;
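To show what "4 concurrent workers" means in practice, here is a simplified worker-pool sketch built on threads and a shared channel. It is a conceptual model under stated assumptions: `run_workers` is a hypothetical function, the handler is synchronous, and OS threads are used for simplicity, whereas process_messages is async and presumably schedules tasks on the runtime.

```rust
use std::sync::mpsc::Receiver;
use std::sync::{Arc, Mutex};
use std::thread;

/// Spawns `concurrency` workers that pull messages from `rx` and run `handler`.
/// Returns how many messages were handled successfully. Illustrative model only.
pub fn run_workers<T, F>(rx: Receiver<T>, concurrency: usize, handler: F) -> usize
where
    T: Send + 'static,
    F: Fn(T) -> Result<(), String> + Send + Sync + 'static,
{
    let rx = Arc::new(Mutex::new(rx));
    let handler = Arc::new(handler);
    let mut workers = Vec::with_capacity(concurrency);
    for _ in 0..concurrency {
        let rx = Arc::clone(&rx);
        let handler = Arc::clone(&handler);
        workers.push(thread::spawn(move || {
            let mut succeeded = 0usize;
            loop {
                // The lock is held only while receiving, not while handling.
                let next = rx.lock().unwrap().recv();
                match next {
                    Ok(message) => {
                        if handler(message).is_ok() {
                            succeeded += 1;
                        }
                    }
                    // Channel closed and drained: this worker is done.
                    Err(_) => break,
                }
            }
            succeeded
        }));
    }
    workers.into_iter().map(|w| w.join().unwrap()).sum()
}
```

Each message is delivered to exactly one worker, so raising the worker count increases throughput without processing any message twice.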

With custom handlers

For more control over success and error handling:
queue.process_messages_with_handlers(
    "jobs",
    Some(4),
    None,
    // Message handler
    |msg| async move {
        process_job(msg.payload).await
    },
    // Success handler
    |msg, _result| async move {
        println!("Job {} completed", msg.task_id);
        Ok(())
    },
    // Error handler
    |msg, error| async move {
        eprintln!("Job {} failed: {:?}", msg.task_id, error);
        Ok(())
    },
).await?;
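The three-handler arrangement boils down to a dispatch on the message handler's result: success goes to one callback, failure to the other. The synchronous sketch below isolates that control flow. `dispatch` is a hypothetical function and the closures are plain (non-async) for brevity; it is not how Broccoli implements process_messages_with_handlers.

```rust
/// Runs `handler` on a message, then routes the result to `on_success` or `on_error`.
/// Returns true when the message was handled successfully. Illustrative model only.
pub fn dispatch<T, R, E>(
    message: T,
    handler: impl Fn(&T) -> Result<R, E>,
    on_success: impl Fn(&T, R),
    on_error: impl Fn(&T, E),
) -> bool {
    match handler(&message) {
        Ok(result) => {
            // The success handler sees both the message and the handler's output.
            on_success(&message, result);
            true
        }
        Err(error) => {
            // The error handler sees the message and the failure reason.
            on_error(&message, error);
            false
        }
    }
}
```

Keeping the success and error paths in separate callbacks lets the message handler stay focused on the work itself, while logging, metrics, or cleanup live in the other two.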

Message lifecycle

Queue size

Get the current size of a queue:
let sizes = queue.size("jobs").await?;
for (queue_name, size) in sizes {
    println!("{}: {} messages", queue_name, size);
}
For fairness queues, this returns sizes for each disambiguator sub-queue.
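Because the result is a collection of (name, size) pairs, simple aggregates are easy to derive from it. The helpers below are a sketch that assumes the sizes arrive as a `HashMap<String, u64>`; that concrete type, the helper names `total_size` and `busiest`, and the sub-queue names used in the usage note are all assumptions made for illustration.

```rust
use std::collections::HashMap;

/// Sums the sizes of all sub-queues. Assumes sizes are keyed by sub-queue name.
pub fn total_size(sizes: &HashMap<String, u64>) -> u64 {
    sizes.values().sum()
}

/// Returns the name and size of the largest sub-queue, if any.
pub fn busiest(sizes: &HashMap<String, u64>) -> Option<(&str, u64)> {
    sizes
        .iter()
        .max_by_key(|(_, size)| **size)
        .map(|(name, size)| (name.as_str(), *size))
}
```

For a map holding 7 messages under a hypothetical key "jobs:tenant-a" and 2 under "jobs:tenant-b", `total_size` yields 9 and `busiest` yields the "jobs:tenant-a" entry.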