Overview
The BroccoliQueue is the main entry point for interacting with Broccoli. It provides methods for publishing messages, consuming messages, and processing messages with handlers.
Creating a queue
Use the builder pattern to create a queue instance:
use broccoli_queue::queue::BroccoliQueue;
let queue = BroccoliQueue::builder("redis://localhost:6379")
.pool_connections(5)
.failed_message_retry_strategy(Default::default())
.build()
.await?;
Builder options
| Option | Description | Default |
|---|
pool_connections(n) | Number of connections in the pool | 10 |
failed_message_retry_strategy(strategy) | Configure retry behavior | 3 retries, enabled |
enable_scheduling(bool) | Enable message scheduling | false |
Publishing messages
Single message
use broccoli_queue::queue::BroccoliQueue;
let job = JobPayload { id: "1".into(), task: "process".into() };
// Basic publish
queue.publish("jobs", None, &job, None).await?;
// With disambiguator for fairness
queue.publish("jobs", Some("tenant-123".into()), &job, None).await?;
Batch publishing
let jobs = vec![
JobPayload { id: "1".into(), task: "task-a".into() },
JobPayload { id: "2".into(), task: "task-b".into() },
];
queue.publish_batch("jobs", None, jobs, None).await?;
Consuming messages
Blocking consume
Waits until a message is available:
let message: BrokerMessage<JobPayload> = queue.consume("jobs", None).await?;
println!("Received: {:?}", message.payload);
// Process the message...
// Acknowledge success
queue.acknowledge("jobs", message).await?;
Non-blocking consume
Returns immediately with None if no message is available:
if let Some(message) = queue.try_consume::<JobPayload>("jobs", None).await? {
println!("Got message: {:?}", message.payload);
queue.acknowledge("jobs", message).await?;
} else {
println!("No messages available");
}
Batch consume
Consume multiple messages with a timeout:
use time::Duration;
let messages = queue.consume_batch::<JobPayload>(
"jobs",
10, // batch size
Duration::seconds(5), // timeout
None,
).await?;
for message in messages {
// Process and acknowledge each message
queue.acknowledge("jobs", message).await?;
}
Processing messages
The process_messages method provides a convenient way to consume and process messages in a loop:
queue.process_messages(
"jobs",
Some(4), // 4 concurrent workers
None, // default consume options
|message: BrokerMessage<JobPayload>| async move {
println!("Processing: {:?}", message.payload);
// Return Ok(()) on success, Err on failure
Ok(())
},
).await?;
With custom handlers
For more control over success and error handling:
queue.process_messages_with_handlers(
"jobs",
Some(4),
None,
// Message handler
|msg| async move {
process_job(msg.payload).await
},
// Success handler
|msg, result| async move {
println!("Job {} completed", msg.task_id);
Ok(())
},
// Error handler
|msg, error| async move {
eprintln!("Job {} failed: {:?}", msg.task_id, error);
Ok(())
},
).await?;
Message lifecycle
Queue size
Get the current size of a queue:
let sizes = queue.size("jobs").await?;
for (queue_name, size) in sizes {
println!("{}: {} messages", queue_name, size);
}
For fairness queues, this returns sizes for each disambiguator sub-queue.