Overview

RabbitMQ is a robust message broker with advanced routing capabilities, which makes it well suited to complex messaging scenarios.

Installation

Enable RabbitMQ support in your Cargo.toml:
[dependencies]
broccoli_queue = { version = "0.4", default-features = false, features = ["rabbitmq"] }
Or alongside Redis, keeping the default features enabled:
[dependencies]
broccoli_queue = { version = "0.4", features = ["rabbitmq"] }

Connection

use broccoli_queue::queue::BroccoliQueue;

let queue = BroccoliQueue::builder("amqp://localhost:5672")
    .pool_connections(10)
    .build()
    .await?;

Connection URL formats

amqp://localhost:5672                  # Basic
amqp://user:password@localhost:5672    # With credentials
amqp://user:password@localhost:5672/vhost  # With virtual host
amqps://localhost:5671                 # TLS connection
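
The formats above share one shape: a scheme, optional credentials, a host and port, and an optional virtual host. As a minimal sketch, the URL can be assembled from those parts. The AmqpTarget struct and its to_url method are hypothetical helpers for illustration and are not part of broccoli_queue; the sketch also assumes none of the parts contain characters that need percent-encoding.

```rust
/// Hypothetical helper (not part of broccoli_queue): the parts of an AMQP URL.
struct AmqpTarget<'a> {
    /// Use the `amqps` scheme (TLS) instead of `amqp`.
    tls: bool,
    /// Optional `(user, password)` pair.
    credentials: Option<(&'a str, &'a str)>,
    host: &'a str,
    port: u16,
    /// Optional virtual host, without the leading slash.
    vhost: Option<&'a str>,
}

impl AmqpTarget<'_> {
    /// Assemble `amqp[s]://[user:password@]host:port[/vhost]`.
    /// Assumes the parts need no percent-encoding.
    fn to_url(&self) -> String {
        let scheme = if self.tls { "amqps" } else { "amqp" };
        let mut url = format!("{}://", scheme);
        if let Some((user, password)) = self.credentials {
            url.push_str(&format!("{}:{}@", user, password));
        }
        url.push_str(&format!("{}:{}", self.host, self.port));
        if let Some(vhost) = self.vhost {
            url.push_str(&format!("/{}", vhost));
        }
        url
    }
}
```

The resulting string can then be passed to BroccoliQueue::builder in place of a hard-coded URL.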

Starting RabbitMQ

Docker

docker run -d --name rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  rabbitmq:management
Access the management UI at http://localhost:15672 (default credentials: guest/guest).

With custom credentials

docker run -d --name rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  -e RABBITMQ_DEFAULT_USER=myuser \
  -e RABBITMQ_DEFAULT_PASS=mypassword \
  rabbitmq:management

Features

Connection pooling

The RabbitMQ backend uses deadpool-lapin for connection pooling:
let queue = BroccoliQueue::builder("amqp://localhost:5672")
    .pool_connections(10)
    .build()
    .await?;

Message scheduling

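Message scheduling with RabbitMQ requires the delayed message exchange plugin (rabbitmq_delayed_message_exchange). The plugin is not bundled with RabbitMQ, so it must be present in the broker's plugins directory before it can be enabled.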

Install the plugin

# In Docker
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_delayed_message_exchange

# Or on host
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
See the RabbitMQ scheduling guide for details.

Use scheduling

use broccoli_queue::queue::{BroccoliQueue, PublishOptions};
use time::Duration;

let queue = BroccoliQueue::builder("amqp://localhost:5672")
    .enable_scheduling(true)  // Must be enabled
    .build()
    .await?;

let options = PublishOptions::builder()
    .delay(Duration::minutes(5))
    .build();

queue.publish("jobs", None, &job, Some(options)).await?;
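
When debugging scheduled messages, it can help to know how the delayed message exchange plugin expresses a delay: each message carries an x-delay header holding the delay in milliseconds. The sketch below shows only that unit conversion. The x_delay_millis helper is hypothetical, it uses std::time::Duration rather than time::Duration to stay dependency-free, and whether Broccoli builds the header in exactly this way is an assumption.

```rust
use std::convert::TryFrom;
use std::time::Duration;

/// Hypothetical helper: the value to place in the `x-delay` header,
/// expressed in whole milliseconds as the plugin expects.
/// Saturates at `i64::MAX` instead of overflowing for very long delays.
fn x_delay_millis(delay: Duration) -> i64 {
    i64::try_from(delay.as_millis()).unwrap_or(i64::MAX)
}
```

For the five-minute delay used above, the header value would be 300000.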

RabbitMQ concepts

Broccoli abstracts RabbitMQ concepts, but understanding them helps with debugging:
Broccoli         RabbitMQ
Queue name       Queue name
publish()        Publish to default exchange
consume()        Basic consume with prefetch
acknowledge()    Basic ack
reject()         Basic nack (with requeue based on retry config)

Management API

With the management Cargo feature enabled, you can inspect queue status:
#[cfg(feature = "management")]
{
    let status = queue.queue_status("jobs".into(), None).await?;
    println!("Queue status: {:?}", status);
}

Configuration example

use broccoli_queue::queue::{BroccoliQueue, RetryStrategy};

let queue = BroccoliQueue::builder("amqp://user:pass@localhost:5672/myapp")
    .pool_connections(15)
    .failed_message_retry_strategy(
        RetryStrategy::new()
            .with_attempts(5)
            .retry_failed(true)
    )
    .enable_scheduling(true)
    .build()
    .await?;

Best practices

Separate environments using virtual hosts:
// Development
BroccoliQueue::builder("amqp://localhost:5672/dev")

// Production
BroccoliQueue::builder("amqp://localhost:5672/prod")
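
One way to keep the two URLs above from drifting apart is to derive the virtual host from the deployment environment. This is a minimal sketch: the broker_url helper and the APP_ENV variable name are assumptions made for illustration, and Broccoli itself does not read any such variable.

```rust
/// Hypothetical helper: choose the virtual host for a deployment environment.
/// `app_env` would typically come from an environment variable such as
/// `APP_ENV` (an assumed name, not something Broccoli reads).
fn broker_url(app_env: Option<&str>) -> String {
    // Fall back to the development vhost for unset or unknown values, so a
    // misconfigured process does not touch production queues.
    let vhost = match app_env {
        Some("production") => "prod",
        _ => "dev",
    };
    format!("amqp://localhost:5672/{}", vhost)
}
```

A caller could read the variable with std::env::var("APP_ENV").ok() and pass the result through as_deref() before handing the URL to the builder.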
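
RabbitMQ provides publisher confirms, which acknowledge that the broker has accepted a message. Broccoli uses these internally for reliability.

Use the RabbitMQ management UI (http://localhost:15672) to:
  • View queue depths
  • Monitor connection counts
  • Check message rates
  • Debug delivery issues

Set RabbitMQ memory limits to prevent OOM. The environment variable below is specific to the official Docker image and is deprecated in images for RabbitMQ 3.9 and later, which expect vm_memory_high_watermark to be set in rabbitmq.conf instead:
docker run -d --name rabbitmq \
  -e RABBITMQ_VM_MEMORY_HIGH_WATERMARK=0.6 \
  rabbitmq:management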

Troubleshooting

Connection refused

Connection refused (os error 111)
Verify RabbitMQ is running:
docker ps | grep rabbitmq
# or
rabbitmqctl status
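
The same check can be made from Rust before building the queue. The sketch below uses only the standard library; broker_reachable is a hypothetical helper. It shows only that something is listening on the port and does not perform an AMQP handshake.

```rust
use std::net::{TcpStream, ToSocketAddrs};
use std::time::Duration;

/// Returns true if a TCP connection to `addr` (for example "localhost:5672")
/// can be opened within `timeout`.
fn broker_reachable(addr: &str, timeout: Duration) -> bool {
    match addr.to_socket_addrs() {
        // Try every resolved address (IPv4 and IPv6) until one accepts.
        Ok(mut addrs) => addrs.any(|a| TcpStream::connect_timeout(&a, timeout).is_ok()),
        // The address could not be parsed or resolved at all.
        Err(_) => false,
    }
}
```

A false result for localhost:5672 while the container is running usually points at a missing -p 5672:5672 port mapping rather than at Broccoli.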

ACCESS_REFUSED

ACCESS_REFUSED - Login was refused
Check credentials in your connection URL:
BroccoliQueue::builder("amqp://user:password@localhost:5672")
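
One possible cause of a refused login is a user name or password containing characters such as @, : or /, which change how the URL is parsed unless they are percent-encoded. The sketch below is a dependency-free illustration; encode_userinfo is a hypothetical helper, and in real code a crate such as percent-encoding would be the more usual choice.

```rust
/// Hypothetical helper: percent-encode a user name or password for the
/// `user:password@` part of an AMQP URL. Unreserved characters (letters,
/// digits, `-`, `.`, `_`, `~`) pass through; every other byte becomes `%XX`.
fn encode_userinfo(raw: &str) -> String {
    let mut out = String::with_capacity(raw.len());
    for byte in raw.bytes() {
        match byte {
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~' => {
                out.push(byte as char)
            }
            // Encode each remaining byte (including UTF-8 continuation bytes).
            _ => out.push_str(&format!("%{:02X}", byte)),
        }
    }
    out
}
```

For example, the password p@ss:w/rd would appear in the URL as p%40ss%3Aw%2Frd.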

NOT_FOUND for scheduled messages

NOT_FOUND - no exchange 'delayed'
Enable the delayed message exchange plugin:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

High memory usage

RabbitMQ can accumulate messages in memory when consumers fall behind. Monitor memory use and set a high watermark:
# Check status
rabbitmqctl status

# Set watermark (fraction of available RAM)
rabbitmqctl set_vm_memory_high_watermark 0.5
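
To see what a given fraction means on a particular machine, multiply it by the RAM available to the broker. Once memory use crosses that threshold, RabbitMQ raises a memory alarm and blocks publishing connections. The watermark_bytes helper below is hypothetical and only illustrates the arithmetic.

```rust
/// Memory threshold, in bytes, for a relative high watermark.
/// `fraction` is the value passed to `set_vm_memory_high_watermark`.
fn watermark_bytes(total_ram_bytes: u64, fraction: f64) -> u64 {
    (total_ram_bytes as f64 * fraction) as u64
}
```

With 8 GiB of RAM and a watermark of 0.5, the threshold is 4 GiB.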