Overview
The management feature provides APIs to inspect queue status and monitor your message processing system.
Enabling management
Add the management feature to your Cargo.toml:
# Pull in broccoli_queue with the optional `management` feature turned on.
[dependencies.broccoli_queue]
version = "0.4"
features = ["management"]
Management is currently supported for Redis and RabbitMQ brokers only.
Queue status
Get the status of a queue:
/// Prints the pending, processing, and failed counts of the `jobs` queue.
///
/// # Errors
///
/// Returns the error from the status lookup if it fails.
#[cfg(feature = "management")]
async fn check_queue_health(queue: &BroccoliQueue) -> Result<(), BroccoliError> {
    let report = queue.queue_status("jobs".into(), None).await?;
    let (pending, processing, failed) = (
        report.pending_count,
        report.processing_count,
        report.failed_count,
    );

    println!("Queue: jobs");
    println!(" Pending: {}", pending);
    println!(" Processing: {}", processing);
    println!(" Failed: {}", failed);

    Ok(())
}
With disambiguator
For fairness queues, specify the disambiguator:
// The second argument carries the disambiguator to query.
let disambiguator = Some("tenant-123".into());
let status = queue.queue_status("jobs".into(), disambiguator).await?;
Queue size
Get queue sizes (available without the management feature):
let sizes = queue . size ( "jobs" ) . await ? ;
for ( queue_name , size ) in sizes {
println! ( "{}: {} messages" , queue_name , size );
}
For fairness queues, this returns sizes for each sub-queue.
Building a monitoring dashboard
Example of exposing queue metrics:
use std :: collections :: HashMap ;
/// Serializable snapshot of one queue's counters.
#[derive(Serialize)]
struct QueueMetrics {
    /// Name of the queue the counters belong to.
    name: String,
    /// Number of pending messages.
    pending: u64,
    /// Number of messages being processed.
    processing: u64,
    /// Number of failed messages.
    failed: u64,
}
/// Builds a [`QueueMetrics`] entry for each name in `queue_names`.
///
/// A queue whose status lookup fails is left out of the result. The lookup is
/// compiled only when the `management` feature is enabled; without it the
/// returned vector is empty.
async fn get_metrics(queue: &BroccoliQueue, queue_names: &[&str]) -> Vec<QueueMetrics> {
    let mut collected = Vec::new();

    for queue_name in queue_names {
        #[cfg(feature = "management")]
        if let Ok(snapshot) = queue.queue_status(queue_name.to_string(), None).await {
            let entry = QueueMetrics {
                name: queue_name.to_string(),
                pending: snapshot.pending_count,
                processing: snapshot.processing_count,
                failed: snapshot.failed_count,
            };
            collected.push(entry);
        }
    }

    collected
}
Health checks
Implement health checks for your queue system:
async fn health_check ( queue : & BroccoliQueue ) -> bool {
// Try to get queue size as a connectivity check
match queue . size ( "health-check" ) . await {
Ok ( _ ) => true ,
Err ( e ) => {
log :: error! ( "Queue health check failed: {:?}" , e );
false
}
}
}
Alerting on failed messages
Monitor the failed queue for alerting:
/// Sends an alert when the `jobs` queue has more than 100 failed messages.
///
/// # Errors
///
/// Returns the error from the status lookup if it fails.
#[cfg(feature = "management")]
async fn check_failed_messages(queue: &BroccoliQueue) -> Result<(), BroccoliError> {
    let jobs_status = queue.queue_status("jobs".into(), None).await?;

    // At or below the threshold there is nothing to report.
    if jobs_status.failed_count <= 100 {
        return Ok(());
    }

    let details = format!(
        "Queue 'jobs' has {} failed messages",
        jobs_status.failed_count
    );
    alert::send("High failed message count", &details).await;

    Ok(())
}
Metrics integration
Prometheus
use prometheus :: { IntGauge , register_int_gauge};
lazy_static! {
    /// Gauge registered as `broccoli_queue_pending`.
    static ref QUEUE_PENDING: IntGauge =
        register_int_gauge!("broccoli_queue_pending", "Number of pending messages").unwrap();

    /// Gauge registered as `broccoli_queue_processing`.
    static ref QUEUE_PROCESSING: IntGauge = register_int_gauge!(
        "broccoli_queue_processing",
        "Number of messages being processed"
    )
    .unwrap();

    /// Gauge registered as `broccoli_queue_failed`.
    static ref QUEUE_FAILED: IntGauge =
        register_int_gauge!("broccoli_queue_failed", "Number of failed messages").unwrap();
}
/// Refreshes the Prometheus gauges from the current status of the `jobs` queue.
///
/// If the status lookup fails, the gauges are left untouched.
#[cfg(feature = "management")]
async fn update_metrics(queue: &BroccoliQueue) {
    if let Ok(status) = queue.queue_status("jobs".into(), None).await {
        // The gauges take `i64`. Saturate instead of casting with `as`, which
        // would wrap an out-of-range count to a negative value.
        QUEUE_PENDING.set(i64::try_from(status.pending_count).unwrap_or(i64::MAX));
        QUEUE_PROCESSING.set(i64::try_from(status.processing_count).unwrap_or(i64::MAX));
        QUEUE_FAILED.set(i64::try_from(status.failed_count).unwrap_or(i64::MAX));
    }
}
OpenTelemetry
use opentelemetry :: metrics :: Meter ;
/// Records the pending and failed counts of the `jobs` queue on two `i64`
/// gauges created from `meter`.
///
/// The status lookup is compiled only when the `management` feature is
/// enabled; if the lookup fails, nothing is recorded.
async fn record_metrics(meter: &Meter, queue: &BroccoliQueue) {
    let pending_gauge = meter.i64_gauge("broccoli.queue.pending").init();
    let failed_gauge = meter.i64_gauge("broccoli.queue.failed").init();

    #[cfg(feature = "management")]
    if let Ok(status) = queue.queue_status("jobs".into(), None).await {
        // Saturate instead of casting with `as`, which would wrap an
        // out-of-range count to a negative value.
        pending_gauge.record(i64::try_from(status.pending_count).unwrap_or(i64::MAX), &[]);
        failed_gauge.record(i64::try_from(status.failed_count).unwrap_or(i64::MAX), &[]);
    }
}
Redis CLI inspection
For Redis, you can also inspect queues directly:
# Main queue size (LLEN returns the length of the list stored at `jobs`)
redis-cli LLEN jobs
# Processing queue (ZCARD returns the number of members in the sorted set)
redis-cli ZCARD jobs:processing
# Failed queue (list length)
redis-cli LLEN jobs:failed
# Scheduled messages (sorted-set member count)
redis-cli ZCARD jobs:scheduled
# List failed messages: indices 0 through 10 inclusive, i.e. at most 11 entries
redis-cli LRANGE jobs:failed 0 10
Best practices
Poll metrics periodically
Don’t query status on every request. Use a background task:
tokio :: spawn ( async move {
let mut interval = tokio :: time :: interval ( Duration :: from_secs ( 30 ));
loop {
interval . tick () . await ;
update_metrics ( & queue ) . await ;
}
});
Set up alerts for failed messages
Monitor the failed queue and alert when it exceeds thresholds.
Measure how long messages spend in the processing state to detect stuck jobs.