
Overview

Messages in Broccoli are wrapped in a BrokerMessage<T> struct that carries metadata (a task ID, an attempt counter, and an optional disambiguator) alongside your custom payload.

BrokerMessage structure

pub struct BrokerMessage<T: Clone> {
    /// Unique identifier for the message
    pub task_id: uuid::Uuid,
    
    /// Your custom message content
    pub payload: T,
    
    /// Number of processing attempts made
    pub attempts: u8,
    
    /// Optional disambiguator for fairness queues
    pub disambiguator: Option<String>,
}

Creating message payloads

Your payload type must implement Clone, Serialize, and Deserialize:
use serde::{Serialize, Deserialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
struct JobPayload {
    id: String,
    task_name: String,
    parameters: serde_json::Value,
    created_at: chrono::DateTime<chrono::Utc>,
}

Publishing messages

When you publish a message, Broccoli automatically:
  1. Generates a unique task_id
  2. Wraps your payload in a BrokerMessage
  3. Serializes the message and sends it to the broker
let job = JobPayload {
    id: "job-123".to_string(),
    task_name: "process_data".to_string(),
    parameters: serde_json::json!({"input": "data.csv"}),
    created_at: chrono::Utc::now(),
};

// publish() returns the wrapped message with its task_id
let message = queue.publish("jobs", None, &job, None).await?;
println!("Published with ID: {}", message.task_id);
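
The first two publishing steps can be modeled with the standard library alone. The sketch below is not Broccoli's implementation: SketchMessage and wrap are hypothetical names, a u64 counter stands in for uuid::Uuid, and the initial attempts value of 0 is an assumption. Serialization (step 3) is left out.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Simplified stand-in for BrokerMessage<T> (hypothetical, std-only).
/// The real struct uses uuid::Uuid for task_id.
#[derive(Debug, Clone, PartialEq)]
struct SketchMessage<T: Clone> {
    task_id: u64,
    payload: T,
    attempts: u8,
    disambiguator: Option<String>,
}

/// Process-wide counter used here in place of UUID generation.
static NEXT_ID: AtomicU64 = AtomicU64::new(1);

/// Steps 1 and 2: assign a fresh ID, then wrap a copy of the payload.
fn wrap<T: Clone>(payload: &T, disambiguator: Option<String>) -> SketchMessage<T> {
    SketchMessage {
        task_id: NEXT_ID.fetch_add(1, Ordering::Relaxed),
        payload: payload.clone(),
        attempts: 0, // assumption: a new message starts with zero attempts
        disambiguator,
    }
}
```

Publishing the same payload twice yields two messages with different task_id values, which is why the ID returned by publish() is the handle to keep for later lookups.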

Consuming messages

When consuming, you receive the full BrokerMessage:
let message: BrokerMessage<JobPayload> = queue.consume("jobs", None).await?;

// Access metadata
println!("Task ID: {}", message.task_id);
println!("Attempts: {}", message.attempts);

// Access your payload
println!("Job ID: {}", message.payload.id);
println!("Task: {}", message.payload.task_name);

Message acknowledgment

After processing a message, you must acknowledge or reject it:

Acknowledge (success)

Removes the message from the processing queue:
queue.acknowledge("jobs", message).await?;

Reject (failure)

Moves the message to the retry queue or failed queue:
queue.reject("jobs", message).await?;
The message will be:
  • Re-queued if attempts < max_retries
  • Moved to failed queue if attempts >= max_retries
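
The routing rule above can be written as a small pure function. This is only an illustration of the stated comparison, not Broccoli's internal code: Destination and route_rejected are hypothetical names, and the sketch does not say when the library increments attempts relative to the check.

```rust
/// Where a rejected message goes next (hypothetical type for illustration).
#[derive(Debug, PartialEq)]
enum Destination {
    RetryQueue,
    FailedQueue,
}

/// Retry while attempts < max_retries; otherwise move to the failed queue.
fn route_rejected(attempts: u8, max_retries: u8) -> Destination {
    if attempts < max_retries {
        Destination::RetryQueue
    } else {
        Destination::FailedQueue
    }
}
```

Under this rule, max_retries of 0 sends every rejected message straight to the failed queue.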

Cancel

Remove a message by ID without processing:
// message_id identifies the message to cancel (for example, the task_id returned by publish())
queue.cancel("jobs", message_id.to_string()).await?;

Message ordering

Broccoli does not guarantee strict message ordering. Messages may be processed out of order, especially with multiple workers.
If you need ordered processing:
  • Use a single worker (concurrency: Some(1))
  • Or implement ordering logic in your application
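
One way to implement ordering in the application is to have the publisher attach a sequence number to each payload and have the consumer buffer early arrivals. The sketch below assumes such a seq field exists (it is not part of BrokerMessage) and uses a hypothetical Reorderer type. It works within a single consumer process; multiple workers would need to share this state.

```rust
use std::collections::BTreeMap;

/// Buffers out-of-order items and releases them in sequence order.
struct Reorderer<T> {
    next_seq: u64,
    pending: BTreeMap<u64, T>,
}

impl<T> Reorderer<T> {
    fn new(first_seq: u64) -> Self {
        Reorderer { next_seq: first_seq, pending: BTreeMap::new() }
    }

    /// Accepts one item and returns every item that is now ready, in order.
    fn accept(&mut self, seq: u64, item: T) -> Vec<T> {
        // Drop items whose sequence number was already released;
        // keep the first copy of any item that is still pending.
        if seq >= self.next_seq {
            self.pending.entry(seq).or_insert(item);
        }
        // Release the contiguous run starting at next_seq.
        let mut ready = Vec::new();
        while let Some(item) = self.pending.remove(&self.next_seq) {
            ready.push(item);
            self.next_seq += 1;
        }
        ready
    }
}
```

A consumer would call accept with each payload's sequence number and process only the returned items, so a message that arrives early waits in the buffer until its predecessors show up.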

Complex payload example

use chrono::{DateTime, Utc};
use serde::{Serialize, Deserialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
struct EmailJob {
    recipient: String,
    subject: String,
    body: String,
    attachments: Vec<Attachment>,
    priority: Priority,
    scheduled_for: Option<DateTime<Utc>>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
struct Attachment {
    filename: String,
    content_type: String,
    data: Vec<u8>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
enum Priority {
    Low,
    Normal,
    High,
    Urgent,
}