Overview

The management feature provides APIs to inspect queue status and monitor your message processing system.

Enabling management

Add the management feature to your Cargo.toml:
[dependencies]
broccoli_queue = { version = "0.4", features = ["management"] }
Management is currently supported for Redis and RabbitMQ brokers only.
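
The examples below assume a BroccoliQueue already connected to one of these brokers. A minimal connection sketch (the builder options follow the crate's README pattern; adjust to your version):
use broccoli_queue::queue::BroccoliQueue;

// Redis shown here; a RabbitMQ URL ("amqp://...") works the same way.
let queue = BroccoliQueue::builder("redis://localhost:6379")
    .pool_connections(5)
    .build()
    .await?;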

Queue status

Get the status of a queue:
#[cfg(feature = "management")]
async fn check_queue_health(queue: &BroccoliQueue) -> Result<(), BroccoliError> {
    let status = queue.queue_status("jobs".into(), None).await?;
    
    println!("Queue: jobs");
    println!("  Pending: {}", status.pending_count);
    println!("  Processing: {}", status.processing_count);
    println!("  Failed: {}", status.failed_count);
    
    Ok(())
}

With a disambiguator

For fairness queues, specify the disambiguator:
let status = queue.queue_status(
    "jobs".into(),
    Some("tenant-123".into())
).await?;
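
The broker does not report which disambiguators exist, so if you track your tenant IDs yourself you can survey a fairness queue by looping over them (the IDs and queue name below are illustrative):
#[cfg(feature = "management")]
async fn tenant_backlog(queue: &BroccoliQueue, tenants: &[&str]) -> Result<(), BroccoliError> {
    for tenant in tenants {
        // Query the same logical queue once per known disambiguator.
        let status = queue
            .queue_status("jobs".into(), Some(tenant.to_string()))
            .await?;
        println!("{}: {} pending", tenant, status.pending_count);
    }
    Ok(())
}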

Queue size

Get queue sizes (available without the management feature):
let sizes = queue.size("jobs").await?;

for (queue_name, size) in sizes {
    println!("{}: {} messages", queue_name, size);
}
For fairness queues, this returns sizes for each sub-queue.
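
To get one number for the whole queue, sum the entries; the element type here is an assumption, so match it to what your version of size returns:
let sizes = queue.size("jobs").await?;

// Sum the per-sub-queue counts into a single backlog figure.
let total: u64 = sizes.iter().map(|(_name, count)| *count as u64).sum();
println!("jobs total: {} messages", total);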

Building a monitoring dashboard

An example that collects metrics for a set of queues:
use serde::Serialize;

#[derive(Serialize)]
struct QueueMetrics {
    name: String,
    pending: u64,
    processing: u64,
    failed: u64,
}

#[cfg(feature = "management")]
async fn get_metrics(queue: &BroccoliQueue, queue_names: &[&str]) -> Vec<QueueMetrics> {
    let mut metrics = Vec::new();
    
    for name in queue_names {
        if let Ok(status) = queue.queue_status(name.to_string(), None).await {
            metrics.push(QueueMetrics {
                name: name.to_string(),
                pending: status.pending_count,
                processing: status.processing_count,
                failed: status.failed_count,
            });
        }
    }
    
    metrics
}
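
These metrics can then be served over HTTP. A sketch using axum (not part of broccoli_queue; the route path and queue names are illustrative, and the management feature is assumed to be enabled):
use axum::{extract::State, routing::get, Json, Router};
use std::sync::Arc;

// Serialize the collected metrics as JSON for a dashboard to consume.
async fn metrics_handler(State(queue): State<Arc<BroccoliQueue>>) -> Json<Vec<QueueMetrics>> {
    Json(get_metrics(&queue, &["jobs", "emails"]).await)
}

fn dashboard_router(queue: Arc<BroccoliQueue>) -> Router {
    Router::new()
        .route("/metrics/queues", get(metrics_handler))
        .with_state(queue)
}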

Health checks

Implement health checks for your queue system:
async fn health_check(queue: &BroccoliQueue) -> bool {
    // Try to get queue size as a connectivity check
    match queue.size("health-check").await {
        Ok(_) => true,
        Err(e) => {
            log::error!("Queue health check failed: {:?}", e);
            false
        }
    }
}
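
Wired into a readiness endpoint, this lets an orchestrator restart the service when the broker is unreachable. A sketch with axum; mapping the result to HTTP status codes is a common convention, not part of broccoli_queue:
use axum::{extract::State, http::StatusCode};
use std::sync::Arc;

async fn healthz(State(queue): State<Arc<BroccoliQueue>>) -> StatusCode {
    if health_check(&queue).await {
        StatusCode::OK
    } else {
        StatusCode::SERVICE_UNAVAILABLE
    }
}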

Alerting on failed messages

Monitor the failed queue for alerting:
#[cfg(feature = "management")]
async fn check_failed_messages(queue: &BroccoliQueue) -> Result<(), BroccoliError> {
    let status = queue.queue_status("jobs".into(), None).await?;
    
    if status.failed_count > 100 {
        // Send alert
        alert::send("High failed message count", &format!(
            "Queue 'jobs' has {} failed messages",
            status.failed_count
        )).await;
    }
    
    Ok(())
}
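
alert::send above is a stand-in for whatever alerting you use. A minimal sketch that posts to a webhook using reqwest with its json feature (the module, URL, and payload shape are all hypothetical):
mod alert {
    // Hypothetical notifier: POST the alert to a webhook endpoint.
    pub async fn send(title: &str, body: &str) {
        let payload = serde_json::json!({ "title": title, "body": body });
        let result = reqwest::Client::new()
            .post("https://alerts.example.com/webhook") // hypothetical URL
            .json(&payload)
            .send()
            .await;
        if let Err(e) = result {
            log::error!("Failed to deliver alert: {:?}", e);
        }
    }
}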

Metrics integration

Prometheus

use lazy_static::lazy_static;
use prometheus::{register_int_gauge, IntGauge};

lazy_static! {
    static ref QUEUE_PENDING: IntGauge = register_int_gauge!(
        "broccoli_queue_pending",
        "Number of pending messages"
    ).unwrap();
    
    static ref QUEUE_PROCESSING: IntGauge = register_int_gauge!(
        "broccoli_queue_processing",
        "Number of messages being processed"
    ).unwrap();
    
    static ref QUEUE_FAILED: IntGauge = register_int_gauge!(
        "broccoli_queue_failed",
        "Number of failed messages"
    ).unwrap();
}

#[cfg(feature = "management")]
async fn update_metrics(queue: &BroccoliQueue) {
    if let Ok(status) = queue.queue_status("jobs".into(), None).await {
        QUEUE_PENDING.set(status.pending_count as i64);
        QUEUE_PROCESSING.set(status.processing_count as i64);
        QUEUE_FAILED.set(status.failed_count as i64);
    }
}
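
To serve these gauges in Prometheus's text exposition format, encode the default registry; how you attach this to an HTTP route is up to you:
use prometheus::{Encoder, TextEncoder};

fn render_metrics() -> String {
    // Gather every metric registered in the default registry and encode
    // it in the text format Prometheus scrapes.
    let mut buffer = Vec::new();
    TextEncoder::new()
        .encode(&prometheus::gather(), &mut buffer)
        .expect("failed to encode metrics");
    String::from_utf8(buffer).expect("metrics are valid UTF-8")
}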

OpenTelemetry

use opentelemetry::metrics::Meter;

#[cfg(feature = "management")]
async fn record_metrics(meter: &Meter, queue: &BroccoliQueue) {
    let pending_gauge = meter.i64_gauge("broccoli.queue.pending").init();
    let failed_gauge = meter.i64_gauge("broccoli.queue.failed").init();
    
    if let Ok(status) = queue.queue_status("jobs".into(), None).await {
        pending_gauge.record(status.pending_count as i64, &[]);
        failed_gauge.record(status.failed_count as i64, &[]);
    }
}
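
The Meter usually comes from the globally installed provider. A sketch of a polling loop, assuming your OpenTelemetry metrics pipeline is already initialized elsewhere:
use opentelemetry::global;
use std::time::Duration;

#[cfg(feature = "management")]
async fn run_metrics_loop(queue: &BroccoliQueue) {
    // Requires a meter provider to have been installed globally at startup.
    let meter = global::meter("broccoli");
    loop {
        record_metrics(&meter, queue).await;
        tokio::time::sleep(Duration::from_secs(30)).await;
    }
}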

Redis CLI inspection

For Redis, you can also inspect queues directly:
# Main queue size
redis-cli LLEN jobs

# Processing queue
redis-cli ZCARD jobs:processing

# Failed queue
redis-cli LLEN jobs:failed

# Scheduled messages
redis-cli ZCARD jobs:scheduled

# List failed messages
redis-cli LRANGE jobs:failed 0 10

Best practices

Don’t query status on every request. Use a background task:
use std::time::Duration;

// `queue` is moved into the task; wrap it in an Arc if it's needed elsewhere.
tokio::spawn(async move {
    let mut interval = tokio::time::interval(Duration::from_secs(30));
    loop {
        interval.tick().await;
        update_metrics(&queue).await;
    }
});
Monitor the failed queue and alert when it exceeds a threshold.
Measure how long messages spend in the processing state to detect stuck jobs.
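
A cheap way to approximate that last point is timing your own handler and logging outliers; handle_job, JobMessage, and the threshold below are illustrative:
use std::time::Instant;

async fn timed_handler(msg: JobMessage) -> Result<(), BroccoliError> {
    let started = Instant::now();
    let result = handle_job(msg).await; // your real handler
    let elapsed = started.elapsed();
    if elapsed.as_secs() > 60 {
        // Long-running jobs are the usual precursor to "stuck in processing".
        log::warn!("Slow job: handler took {:?}", elapsed);
    }
    result
}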