Skip to main content

BrokerMessage

A wrapper for messages that includes metadata for processing.
use broccoli_queue::brokers::broker::BrokerMessage;

Structure

/// A typed message bundled with the metadata used while processing it.
///
/// `T` is the payload type. The struct definition itself only requires
/// `T: Clone`.
pub struct BrokerMessage<T: Clone> {
    /// Unique identifier for the message
    pub task_id: uuid::Uuid,
    
    /// The actual message content
    pub payload: T,
    
    /// Number of processing attempts made
    pub attempts: u8,
    
    /// Disambiguator for message fairness; `None` when the message carries none
    pub disambiguator: Option<String>,
}

Fields

| Field | Type | Description |
|-------|------|-------------|
| task_id | Uuid | Unique message identifier, auto-generated on publish |
| payload | T | Your custom message data |
| attempts | u8 | Number of times this message has been processed |
| disambiguator | Option<String> | Optional identifier for fairness queue routing |

Creating messages

Messages are automatically created when publishing:
// Build the payload to enqueue.
// NOTE(review): the `JobPayload` shown under "Example payload" on this page
// declares `task_name`, `parameters` and `created_at` rather than `task`; this
// snippet appears to assume a simpler two-field variant — confirm which is intended.
let job = JobPayload { id: "1".into(), task: "process".into() };

// publish() creates and returns the BrokerMessage
// NOTE(review): the two `None` arguments presumably are the disambiguator and
// the publish options, mirroring `Broker::publish` — confirm against the queue API.
let message = queue.publish("jobs", None, &job, None).await?;
// The returned message exposes the `task_id` assigned to it.
println!("Task ID: {}", message.task_id);

Accessing message data

// Calls `process_messages` for the "jobs" queue with an async closure that
// takes a `BrokerMessage<JobPayload>`.
// NOTE(review): the meaning of `Some(1)` and `None` is not shown on this page
// (presumably a concurrency setting and consume options) — confirm against the queue API.
queue.process_messages("jobs", Some(1), None, |msg: BrokerMessage<JobPayload>| async move {
    // Access metadata
    println!("ID: {}", msg.task_id);
    println!("Attempts: {}", msg.attempts);
    println!("Disambiguator: {:?}", msg.disambiguator);
    
    // Access payload
    // NOTE(review): reads `job.task`, whereas the `JobPayload` shown under
    // "Example payload" names that field `task_name` — confirm which is intended.
    let job = &msg.payload;
    println!("Job: {} - {}", job.id, job.task);
    
    // The closure returns `Ok(())` on the success path.
    Ok(())
}).await?;

Payload requirements

Your message payload type must implement:
  • Clone
  • serde::Serialize
  • serde::Deserialize

Example payload

use serde::{Serialize, Deserialize};

/// Example payload type.
///
/// The derives supply `Clone`, `Serialize` and `Deserialize` (the payload
/// requirements listed on this page) plus `Debug`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobPayload {
    /// Identifier of the job, stored as a string.
    pub id: String,
    /// Name of the task, stored as a string.
    pub task_name: String,
    /// Free-form JSON value holding the job's parameters.
    pub parameters: serde_json::Value,
    /// Timestamp in UTC; presumably when the job was created.
    pub created_at: chrono::DateTime<chrono::Utc>,
}

Complex payload

/// Example of a payload that nests another serializable type (`Attachment`)
/// inside a `Vec`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EmailJob {
    /// Recipient of the email, stored as a string.
    pub recipient: String,
    /// Subject line.
    pub subject: String,
    /// Body text.
    pub body: String,
    /// Zero or more attachments carried with the job.
    pub attachments: Vec<Attachment>,
}

/// A single file attachment: name, content type and raw bytes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Attachment {
    /// File name of the attachment.
    pub filename: String,
    /// Content type, stored as a string (presumably a MIME type — confirm).
    pub content_type: String,
    /// Raw bytes, (de)serialized through the path named in `serde(with = ...)`.
    // NOTE(review): `with` must name a module or type exposing `serialize` and
    // `deserialize` functions — confirm `base64_serde` resolves to one in your project.
    #[serde(with = "base64_serde")]
    pub data: Vec<u8>,
}

InternalBrokerMessage

Internal message representation used by broker implementations. You typically don’t interact with this directly.
/// Untyped message form: the same four fields as `BrokerMessage`, but with
/// the identifier and payload held as strings.
pub struct InternalBrokerMessage {
    /// Message identifier, held as a `String` here rather than a `Uuid`.
    pub task_id: String,
    /// Message content as a JSON-serialized string.
    pub payload: String,  // JSON serialized
    /// Attempt counter for the message.
    pub attempts: u8,
    /// Optional disambiguator carried with the message.
    pub disambiguator: Option<String>,
}

BrokerConfig

Configuration options for broker behavior.
/// Settings that tune broker behavior.
///
/// Every field is an `Option`; the defaults quoted on each field are the
/// values documented for this config.
pub struct BrokerConfig {
    /// Maximum retry attempts (default: 3)
    pub retry_attempts: Option<u8>,
    
    /// Whether to retry failed messages (default: true)
    pub retry_failed: Option<bool>,
    
    /// Connection pool size (default: 10)
    pub pool_connections: Option<u8>,
    
    /// Enable message scheduling (default: false)
    pub enable_scheduling: Option<bool>,
}

Default values

impl Default for BrokerConfig {
    /// Builds a configuration with every option explicitly populated:
    /// three retry attempts, retrying of failed messages switched on,
    /// ten pooled connections, and scheduling switched off.
    fn default() -> Self {
        let retry_attempts = Some(3);
        let retry_failed = Some(true);
        let pool_connections = Some(10);
        let enable_scheduling = Some(false);

        Self {
            retry_attempts,
            retry_failed,
            pool_connections,
            enable_scheduling,
        }
    }
}

BrokerType

Enum representing supported broker types.
/// The broker backends that can be selected.
///
/// Each variant is gated behind a Cargo feature, so a variant only exists
/// when the matching feature is enabled at compile time.
pub enum BrokerType {
    /// Available with the `redis` feature.
    #[cfg(feature = "redis")]
    Redis,
    
    /// Available with the `rabbitmq` feature.
    #[cfg(feature = "rabbitmq")]
    RabbitMQ,
    
    /// Available with the `surrealdb` feature.
    #[cfg(feature = "surrealdb")]
    SurrealDB,
}

Broker trait

The Broker trait defines the interface that all broker implementations must satisfy. This is internal to Broccoli but useful for understanding the abstraction.
/// Interface implemented by every broker backend.
///
/// All methods are async, work on `InternalBrokerMessage` values, and report
/// failure as `BroccoliError`. Implementors must be `Send + Sync`.
#[async_trait]
pub trait Broker: Send + Sync {
    /// Connects to the broker at `broker_url`.
    ///
    /// This is the only method taking `&mut self`.
    async fn connect(&mut self, broker_url: &str) -> Result<(), BroccoliError>;
    
    /// Publishes a slice of messages to `queue_name`.
    ///
    /// `disambiguator` and `options` are both optional. Returns a vector of
    /// messages (presumably the messages as published — confirm).
    async fn publish(
        &self,
        queue_name: &str,
        disambiguator: Option<String>,
        message: &[InternalBrokerMessage],
        options: Option<PublishOptions>,
    ) -> Result<Vec<InternalBrokerMessage>, BroccoliError>;
    
    /// Takes one message from `queue_name`.
    ///
    /// The success value is always a message (presumably this waits until one
    /// is available — confirm).
    async fn consume(
        &self,
        queue_name: &str,
        options: Option<ConsumeOptions>,
    ) -> Result<InternalBrokerMessage, BroccoliError>;
    
    /// Variant of `consume` whose success value is an `Option`.
    ///
    /// Presumably yields `None` when no message is available — confirm.
    async fn try_consume(
        &self,
        queue_name: &str,
        options: Option<ConsumeOptions>,
    ) -> Result<Option<InternalBrokerMessage>, BroccoliError>;
    
    /// Acknowledges `message` on `queue_name`; the message is passed by value.
    async fn acknowledge(
        &self,
        queue_name: &str,
        message: InternalBrokerMessage,
    ) -> Result<(), BroccoliError>;
    
    /// Rejects `message` on `queue_name`; the message is passed by value.
    async fn reject(
        &self,
        queue_name: &str,
        message: InternalBrokerMessage,
    ) -> Result<(), BroccoliError>;
    
    /// Cancels the message identified by `message_id` on `queue_name`.
    async fn cancel(
        &self,
        queue_name: &str,
        message_id: String,
    ) -> Result<(), BroccoliError>;
    
    /// Reports size information for `queue_name` as a map of string keys to
    /// counts.
    ///
    /// The meaning of the keys is not shown here — confirm against the
    /// implementation.
    async fn size(
        &self,
        queue_name: &str,
    ) -> Result<HashMap<String, u64>, BroccoliError>;
}