Overview

Broccoli includes built-in retry handling for failed messages. When a message handler returns an error, the message can be automatically re-queued for another attempt.

Default behavior

By default, Broccoli:
  • Enables retries for failed messages
  • Allows up to 3 retry attempts
  • Moves messages to a failed queue after exhausting retries
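The default lifecycle can be sketched as a pure decision function. This is a simplified model of the behavior described above, not Broccoli's internal implementation; the `Disposition` type and `on_failure` function are illustrative names:

```rust
// Simplified model of the retry lifecycle described above: after a handler
// error, a message is either re-queued or moved to the failed queue.
#[derive(Debug, PartialEq)]
enum Disposition {
    Requeue,
    MoveToFailedQueue,
}

/// Decide what happens to a message whose handler just returned an error,
/// given how many retries have already happened and the configured maximum.
fn on_failure(attempts: u8, max_attempts: u8, retry_enabled: bool) -> Disposition {
    if retry_enabled && attempts < max_attempts {
        Disposition::Requeue
    } else {
        Disposition::MoveToFailedQueue
    }
}

fn main() {
    // With the defaults (retries enabled, 3 attempts), the first failures
    // re-queue and the final one moves the message to the failed queue.
    for attempts in 0..4 {
        println!("{} -> {:?}", attempts, on_failure(attempts, 3, true));
    }
}
```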

Configuring retry strategy

Use RetryStrategy when building your queue:
use broccoli_queue::queue::{BroccoliQueue, RetryStrategy};

let queue = BroccoliQueue::builder("redis://localhost:6379")
    .failed_message_retry_strategy(
        RetryStrategy::new()
            .with_attempts(5)      // Allow up to 5 retries
            .retry_failed(true)    // Enable retries
    )
    .build()
    .await?;

RetryStrategy options

with_attempts(n)

Set the maximum number of retry attempts:
RetryStrategy::new().with_attempts(10)  // Up to 10 retries

retry_failed(bool)

Enable or disable retries entirely:
// Disable retries - failed messages go directly to failed queue
RetryStrategy::new().retry_failed(false)

How retries work

Tracking attempts

Each BrokerMessage includes an attempts field:
queue.process_messages("jobs", Some(1), None, |message: BrokerMessage<JobPayload>| async move {
    println!("Attempt #{} for job {}", message.attempts + 1, message.task_id);
    
    if message.attempts >= 2 {
        // Different handling for repeated failures
        eprintln!("Job has failed multiple times, attempting recovery...");
    }
    
    process_job(message.payload).await
}).await?;
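The `+ 1` in the example above suggests `attempts` is 0 on the first delivery and increments on each redelivery. A broker-free sketch of that counter's progression, with `run_with_retries` as a stand-in for the broker's delivery loop (not Broccoli's actual code):

```rust
/// Simplified in-process model of retried delivery: run a fallible handler
/// until it succeeds or `max_attempts` retries are exhausted. The handler
/// receives the attempts counter as the examples above read it (0 on the
/// first delivery). Returns the attempts value at success or at give-up.
fn run_with_retries<F>(mut handler: F, max_attempts: u8) -> Result<u8, u8>
where
    F: FnMut(u8) -> Result<(), ()>,
{
    let mut attempts: u8 = 0;
    loop {
        match handler(attempts) {
            Ok(()) => return Ok(attempts),
            Err(()) if attempts < max_attempts => attempts += 1, // re-queue
            Err(()) => return Err(attempts), // exhausted: failed queue
        }
    }
}

fn main() {
    // A handler that fails twice, then succeeds on its third delivery.
    let result = run_with_retries(|attempts| if attempts < 2 { Err(()) } else { Ok(()) }, 3);
    println!("succeeded with attempts = {:?}", result);
}
```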

Custom error handling

For fine-grained control, use process_messages_with_handlers:
queue.process_messages_with_handlers(
    "jobs",
    Some(4),
    None,
    // Main handler
    |msg| async move {
        risky_operation(msg.payload).await
    },
    // Success handler
    |msg, result| async move {
        log::info!("Job {} succeeded after {} attempts", msg.task_id, msg.attempts);
        Ok(())
    },
    // Error handler - called before retry/rejection
    |msg, error| async move {
        log::error!("Job {} failed (attempt {}): {:?}", msg.task_id, msg.attempts, error);
        
        // You could send alerts, update metrics, etc.
        if msg.attempts >= 2 {
            alert_on_repeated_failure(&msg, &error).await;
        }
        
        Ok(())
    },
).await?;

Best practices

Design your message handlers to be idempotent. Since messages may be retried, the same message could be processed multiple times.
async fn process_job(job: JobPayload) -> Result<(), BroccoliError> {
    // Check if already processed
    if is_already_processed(&job.id).await? {
        return Ok(());  // Skip duplicate processing
    }
    
    // Process the job
    do_work(&job).await?;
    
    // Mark as processed
    mark_processed(&job.id).await?;
    
    Ok(())
}
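The `is_already_processed` / `mark_processed` pair above can be backed by any shared store with an atomic "insert if absent" operation. A minimal synchronous sketch using an in-memory `HashSet` (hypothetical `ProcessedStore` type; in production this would be a database table or Redis set shared across workers, accessed asynchronously):

```rust
use std::collections::HashSet;

/// Hypothetical in-memory registry of processed job ids. Illustrates the
/// check-then-mark idempotency pattern; a real deployment needs a store
/// shared across worker processes.
struct ProcessedStore {
    seen: HashSet<String>,
}

impl ProcessedStore {
    fn new() -> Self {
        Self { seen: HashSet::new() }
    }

    /// Returns true the first time an id is seen, false on duplicates.
    /// HashSet::insert already combines the check and the mark atomically
    /// within this process.
    fn mark_if_new(&mut self, id: &str) -> bool {
        self.seen.insert(id.to_string())
    }
}

fn main() {
    let mut store = ProcessedStore::new();
    // First delivery does the work; a retried duplicate is skipped.
    println!("first delivery processed: {}", store.mark_if_new("job-42"));
    println!("retry processed: {}", store.mark_if_new("job-42"));
}
```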
Consider whether failures are transient (network issues, temporary unavailability) or permanent (invalid data, business rule violations).
async fn process_job(job: JobPayload) -> Result<(), BroccoliError> {
    match validate_job(&job) {
        Err(ValidationError::InvalidData) => {
            // Permanent failure - retrying won't help. Note that a returned
            // error still goes through the retry path until attempts are
            // exhausted; return Ok(()) instead if you'd rather acknowledge
            // and drop the message immediately.
            log::error!("Invalid job data: {:?}", job);
            return Err(BroccoliError::Job("Invalid data".into()));
        }
        Err(ValidationError::ServiceUnavailable) => {
            // Transient failure - let it retry
            return Err(BroccoliError::Job("Service unavailable".into()));
        }
        Ok(_) => {}
    }
    
    do_work(&job).await
}
Broccoli doesn’t include built-in backoff, but you can implement it in your handler:
async fn process_with_backoff(msg: BrokerMessage<JobPayload>) -> Result<(), BroccoliError> {
    // Add delay based on attempt count
    let delay_ms = 100 * 2_u64.pow(msg.attempts as u32);
    tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
    
    process_job(msg.payload).await
}
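The delay formula above grows unboundedly with the attempt count, so in practice you'd usually cap it. A small pure-function variant (the 5-second cap is an illustrative choice, not a Broccoli default):

```rust
/// Exponential backoff delay in milliseconds: 100ms base, doubling per
/// attempt, capped at 5 seconds so late retries don't wait unboundedly.
/// The shift amount is also clamped to avoid u64 overflow.
fn backoff_delay_ms(attempts: u8) -> u64 {
    (100u64 << u32::from(attempts.min(16))).min(5_000)
}

fn main() {
    for attempts in 0..7 {
        println!("attempt {} -> {}ms", attempts, backoff_delay_ms(attempts));
    }
}
```

The handler would then call `tokio::time::sleep(Duration::from_millis(backoff_delay_ms(msg.attempts)))` before processing, as in the example above.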

Failed message queue

Messages that exceed the retry limit are moved to a failed queue ({queue_name}:failed). You can:
  • Monitor this queue for alerting
  • Manually inspect and retry failed messages
  • Use the management API to retrieve queue status
// With management feature enabled
#[cfg(feature = "management")]
{
    let status = queue.queue_status("jobs".into(), None).await?;
    println!("Failed messages: {}", status.failed_count);
}
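If you inspect failed messages directly in your broker (for example with redis-cli), the key follows the `{queue_name}:failed` convention described above. A trivial helper for building that key (`failed_queue_name` is an illustrative name, not part of Broccoli's API):

```rust
/// Build the failed-queue key for a queue, following the
/// `{queue_name}:failed` naming convention.
fn failed_queue_name(queue: &str) -> String {
    format!("{queue}:failed")
}

fn main() {
    println!("{}", failed_queue_name("jobs"));
}
```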