Skip to main content

Prerequisites

Before you begin, make sure you have:
  • Rust installed (1.70+ recommended)
  • A running message broker (Redis, RabbitMQ, or SurrealDB)

Install Broccoli

Add Broccoli to your Cargo.toml:
[dependencies]
broccoli_queue = "0.4"
tokio = { version = "1", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
By default, Broccoli uses Redis. To use a different broker, specify features:
[dependencies]
broccoli_queue = "0.4"

Define your message type

Create a struct that represents your job payload. It must implement Serialize and Deserialize:
use serde::{Serialize, Deserialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
struct JobPayload {
    id: String,
    task_name: String,
}

Create a producer

Publish messages to a queue:
use broccoli_queue::queue::BroccoliQueue;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connect to Redis
    let queue = BroccoliQueue::builder("redis://localhost:6379")
        .pool_connections(5)
        .build()
        .await?;

    // Create a job
    let job = JobPayload {
        id: "job-1".to_string(),
        task_name: "process_data".to_string(),
    };

    // Publish to the "jobs" queue
    queue.publish("jobs", None, &job, None).await?;
    println!("Job published!");

    Ok(())
}

Create a consumer

Process messages from the queue:
use broccoli_queue::{queue::BroccoliQueue, brokers::broker::BrokerMessage};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connect to Redis
    let queue = BroccoliQueue::builder("redis://localhost:6379")
        .pool_connections(5)
        .failed_message_retry_strategy(Default::default())
        .build()
        .await?;

    // Process messages with 2 concurrent workers
    queue.process_messages(
        "jobs",              // Queue name
        Some(2),             // Number of workers
        None,                // Consume options
        |message: BrokerMessage<JobPayload>| async move {
            println!("Processing job: {:?}", message.payload);
            // Your processing logic here
            Ok(())
        },
    ).await?;

    Ok(())
}

Run your application

  1. Start your message broker (e.g., Redis):
    docker run -d -p 6379:6379 redis
    
  2. Run the consumer:
    cargo run --bin consumer
    
  3. Run the producer:
    cargo run --bin producer
    

Next steps

Core Concepts

Learn about queues, messages, and the processing model

Retry Strategies

Configure how failed messages are retried

Brokers

Choose and configure your message broker

Scheduling

Schedule messages for delayed delivery