Skip to main content

Prerequisites

Before you begin, make sure you have:
  • Rust installed (1.70+ recommended)
  • A running message broker (Redis, RabbitMQ, or SurrealDB)

Install Broccoli

Add Broccoli to your Cargo.toml:
[dependencies]
broccoli_queue = "0.4"
tokio = { version = "1", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
By default, Broccoli uses Redis. To use a different broker, specify features:
[dependencies]
broccoli_queue = "0.4"

Define your message type

Create a struct that represents your job payload. It must implement Serialize and Deserialize:
use serde::{Serialize, Deserialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
struct JobPayload {
    id: String,
    task_name: String,
}

Create a producer

Publish messages to a queue:
use broccoli_queue::queue::BroccoliQueue;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connect to Redis
    let queue = BroccoliQueue::builder("redis://localhost:6379")
        .pool_connections(5)
        .build()
        .await?;

    // Create a job
    let job = JobPayload {
        id: "job-1".to_string(),
        task_name: "process_data".to_string(),
    };

    // Publish to the "jobs" queue
    queue.publish("jobs", None, &job, None).await?;
    println!("Job published!");

    Ok(())
}

Create a consumer

Process messages from the queue:
use broccoli_queue::{queue::BroccoliQueue, brokers::broker::BrokerMessage};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connect to Redis
    let queue = BroccoliQueue::builder("redis://localhost:6379")
        .pool_connections(5)
        .failed_message_retry_strategy(Default::default())
        .build()
        .await?;

    // Process messages with 2 concurrent workers
    queue.process_messages(
        "jobs",              // Queue name
        Some(2),             // Number of workers
        None,                // Consume options
        |message: BrokerMessage<JobPayload>| async move {
            println!("Processing job: {:?}", message.payload);
            // Your processing logic here
            Ok(())
        },
    ).await?;

    Ok(())
}

Run your application

  1. Start your message broker (e.g., Redis):
    docker run -d -p 6379:6379 redis
    
  2. Run the consumer:
    cargo run --bin consumer
    
  3. Run the producer:
    cargo run --bin producer
    

Next steps