Overview
Messages in Broccoli are wrapped in a BrokerMessage<T> struct that includes metadata alongside your custom payload.
BrokerMessage structure
pub struct BrokerMessage<T: Clone> {
    /// Unique identifier for the message
    pub task_id: uuid::Uuid,
    /// Your custom message content
    pub payload: T,
    /// Number of processing attempts made
    pub attempts: u8,
    /// Optional disambiguator for fairness queues
    pub disambiguator: Option<String>,
}
Creating message payloads
Your payload type must implement Clone, Serialize, and Deserialize:
use serde::{Serialize, Deserialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
struct JobPayload {
    id: String,
    task_name: String,
    parameters: serde_json::Value,
    created_at: chrono::DateTime<chrono::Utc>,
}
Publishing messages
When you publish a message, Broccoli automatically:
- Generates a unique task_id
- Wraps your payload in a BrokerMessage
- Serializes the message and sends it to the broker
let job = JobPayload {
    id: "job-123".to_string(),
    task_name: "process_data".to_string(),
    parameters: serde_json::json!({"input": "data.csv"}),
    created_at: chrono::Utc::now(),
};

// publish() returns the wrapped message with its task_id
let message = queue.publish("jobs", None, &job, None).await?;
println!("Published with ID: {}", message.task_id);
Consuming messages
When consuming, you receive the full BrokerMessage:
let message: BrokerMessage<JobPayload> = queue.consume("jobs", None).await?;
// Access metadata
println!("Task ID: {}", message.task_id);
println!("Attempts: {}", message.attempts);
// Access your payload
println!("Job ID: {}", message.payload.id);
println!("Task: {}", message.payload.task_name);
Message acknowledgment
After processing a message, you must acknowledge or reject it:
Acknowledge (success)
Removes the message from the processing queue:
queue.acknowledge("jobs", message).await?;
Reject (failure)
Moves the message to the retry queue or, once retries are exhausted, to the failed queue:
queue.reject("jobs", message).await?;
The message will be:
- Re-queued if attempts < max_retries
- Moved to the failed queue if attempts >= max_retries
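The retry rule above can be written out as a small decision function. This is only an illustrative sketch of the stated comparison, not Broccoli's internal code: the names RejectOutcome and route_rejected are hypothetical, and max_retries is assumed to be a u8 like attempts.

```rust
/// Where a rejected message goes next (hypothetical type, for illustration only).
#[derive(Debug, PartialEq, Eq)]
enum RejectOutcome {
    /// The message is put back on the queue for another attempt.
    Retry,
    /// The message is moved to the failed queue.
    Failed,
}

/// Applies the documented rule: retry while attempts < max_retries,
/// otherwise move the message to the failed queue.
/// `max_retries` is assumed to share the `u8` type of `attempts`.
fn route_rejected(attempts: u8, max_retries: u8) -> RejectOutcome {
    if attempts < max_retries {
        RejectOutcome::Retry
    } else {
        RejectOutcome::Failed
    }
}
```

With max_retries set to 3, a message that has been attempted twice is retried, while one that has been attempted three times goes to the failed queue.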
Cancel
Removes a message from the queue by its ID without processing it:
queue.cancel("jobs", message_id.to_string()).await?;
Message ordering
Broccoli does not guarantee strict message ordering. Messages may be processed out of order, especially with multiple workers.
If you need ordered processing:
- Use a single worker (concurrency: Some(1))
- Or implement ordering logic in your application
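One way to implement ordering logic in the application is a reorder buffer. The sketch below assumes each payload carries a monotonically increasing sequence number assigned by the producer; that field, and the ReorderBuffer type itself, are hypothetical and not part of Broccoli. Items that arrive early are held back until every earlier sequence number has been seen.

```rust
use std::collections::BTreeMap;

/// Buffers out-of-order items and releases them in sequence order.
/// Hypothetical helper; assumes the producer numbers messages 0, 1, 2, ...
struct ReorderBuffer<T> {
    /// The sequence number that must be released next.
    next_seq: u64,
    /// Items that arrived before their turn, keyed by sequence number.
    pending: BTreeMap<u64, T>,
}

impl<T> ReorderBuffer<T> {
    fn new(first_seq: u64) -> Self {
        Self { next_seq: first_seq, pending: BTreeMap::new() }
    }

    /// Accepts one item and returns every item that is now ready, in order.
    /// Items with an already-released sequence number are dropped as duplicates.
    fn push(&mut self, seq: u64, item: T) -> Vec<T> {
        if seq < self.next_seq {
            return Vec::new();
        }
        // Keep the first copy if the same sequence number arrives twice.
        self.pending.entry(seq).or_insert(item);

        // Drain the contiguous run starting at next_seq.
        let mut ready = Vec::new();
        while let Some(item) = self.pending.remove(&self.next_seq) {
            ready.push(item);
            self.next_seq += 1;
        }
        ready
    }
}
```

A worker would call push with the sequence number and payload of each consumed message and process whatever it returns. A gap in the sequence (for example, a message that ends up in the failed queue) holds back everything after it, so a real implementation would also need a timeout or skip policy.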
Complex payload example
use chrono::{DateTime, Utc};
use serde::{Serialize, Deserialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
struct EmailJob {
    recipient: String,
    subject: String,
    body: String,
    attachments: Vec<Attachment>,
    priority: Priority,
    scheduled_for: Option<DateTime<Utc>>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
struct Attachment {
    filename: String,
    content_type: String,
    data: Vec<u8>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
enum Priority {
    Low,
    Normal,
    High,
    Urgent,
}