Overview
RabbitMQ is a robust message broker with advanced routing capabilities, making it ideal for complex messaging scenarios.
Installation
Enable RabbitMQ support in your Cargo.toml:
[ dependencies ]
broccoli_queue = { version = "0.4" , default-features = false , features = [ "rabbitmq" ] }
Or alongside Redis:
[ dependencies ]
broccoli_queue = { version = "0.4" , features = [ "rabbitmq" ] }
Connection
use broccoli_queue :: queue :: BroccoliQueue ;
let queue = BroccoliQueue :: builder ( "amqp://localhost:5672" )
. pool_connections ( 10 )
. build ()
. await ? ;
amqp://localhost:5672 # Basic
amqp://user:password@localhost:5672 # With credentials
amqp://user:password@localhost:5672/vhost # With virtual host
amqps://localhost:5671 # TLS connection
Starting RabbitMQ
Docker
docker run -d --name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
rabbitmq:management
Access the management UI at http://localhost:15672 (guest/guest).
With custom credentials
docker run -d --name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
-e RABBITMQ_DEFAULT_USER=myuser \
-e RABBITMQ_DEFAULT_PASS=mypassword \
rabbitmq:management
Features
Connection pooling
RabbitMQ uses deadpool-lapin for connection pooling:
let queue = BroccoliQueue :: builder ( "amqp://localhost:5672" )
. pool_connections ( 10 )
. build ()
. await ? ;
Message scheduling
Message scheduling with RabbitMQ requires the delayed-exchange plugin .
Install the plugin
# In Docker
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# Or on host
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
See the RabbitMQ scheduling guide for details.
Use scheduling
use broccoli_queue :: queue :: PublishOptions ;
use time :: Duration ;
let queue = BroccoliQueue :: builder ( "amqp://localhost:5672" )
. enable_scheduling ( true ) // Must be enabled
. build ()
. await ? ;
let options = PublishOptions :: builder ()
. delay ( Duration :: minutes ( 5 ))
. build ();
queue . publish ( "jobs" , None , & job , Some ( options )) . await ? ;
RabbitMQ concepts
Broccoli abstracts RabbitMQ concepts, but understanding them helps with debugging:
Broccoli RabbitMQ Queue name Queue name publish()Publish to default exchange consume()Basic consume with prefetch acknowledge()Basic ack reject()Basic nack (with requeue based on retry config)
Management API
With the management feature:
#[cfg(feature = "management" )]
{
let status = queue . queue_status ( "jobs" . into (), None ) . await ? ;
println! ( "Queue status: {:?}" , status );
}
Configuration example
use broccoli_queue :: queue :: { BroccoliQueue , RetryStrategy };
let queue = BroccoliQueue :: builder ( "amqp://user:pass@localhost:5672/myapp" )
. pool_connections ( 15 )
. failed_message_retry_strategy (
RetryStrategy :: new ()
. with_attempts ( 5 )
. retry_failed ( true )
)
. enable_scheduling ( true )
. build ()
. await ? ;
Best practices
Separate environments using virtual hosts: // Development
BroccoliQueue :: builder ( "amqp://localhost:5672/dev" )
// Production
BroccoliQueue :: builder ( "amqp://localhost:5672/prod" )
Enable publisher confirms
RabbitMQ provides publisher confirms for guaranteed delivery. Broccoli uses these internally for reliability.
Monitor via management UI
Use the RabbitMQ management UI (localhost:15672) to:
View queue depths
Monitor connection counts
Check message rates
Debug delivery issues
Troubleshooting
Connection refused
Connection refused (os error 111)
Verify RabbitMQ is running:
docker ps | grep rabbitmq
# or
rabbitmqctl status
ACCESS_REFUSED
ACCESS_REFUSED - Login was refused
Check credentials in your connection URL:
BroccoliQueue :: builder ( "amqp://user:password@localhost:5672" )
NOT_FOUND for scheduled messages
NOT_FOUND - no exchange 'delayed'
Enable the delayed message exchange plugin:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
High memory usage
RabbitMQ can accumulate messages in memory. Monitor and set limits:
# Check status
rabbitmqctl status
# Set watermark (fraction of available RAM)
rabbitmqctl set_vm_memory_high_watermark 0.5