Skip to main content

BroccoliError

The error type for all Broccoli operations.
use broccoli_queue::error::BroccoliError;

Variants

/// The error type for all Broccoli operations.
///
/// `Display` text for every variant comes from its `#[error(...)]` attribute.
/// Variants marked `#[from]` can be produced from their source error with `?`.
#[derive(Debug, thiserror::Error)]
pub enum BroccoliError {
    /// Broker connection or operation errors.
    ///
    /// Carries a free-form message describing the failure.
    #[error("Broker error: {0}")]
    Broker(String),
    
    /// Non-idempotent concurrent operation (SurrealDB).
    #[error("Broker error: non-idempotent operation {0}")]
    BrokerNonIdempotentOp(String),
    
    /// Retriable non-idempotent operation (SurrealDB).
    #[error("Broker error: non-idempotent retriable operation {0}")]
    BrokerNonIdempotentRetriableOp(String),
    
    /// Failed to publish message; the payload is the failure description.
    #[error("Failed to publish message: {0}")]
    Publish(String),
    
    /// Failed to consume message; the payload is the failure description.
    #[error("Failed to consume message: {0}")]
    Consume(String),
    
    /// Failed to acknowledge message; the payload is the failure description.
    #[error("Failed to acknowledge message: {0}")]
    Acknowledge(String),
    
    /// Failed to reject message; the payload is the failure description.
    #[error("Failed to reject message: {0}")]
    Reject(String),
    
    /// Failed to cancel message; the payload is the failure description.
    #[error("Failed to cancel message: {0}")]
    Cancel(String),
    
    /// Failed to get message position; the payload is the failure description.
    #[error("Failed to get message position: {0}")]
    GetMessagePosition(String),
    
    /// JSON serialization/deserialization error.
    ///
    /// Wraps the underlying `serde_json::Error`; `#[from]` provides the
    /// conversion so `?` can be applied to `serde_json` results directly.
    #[error("Serialization error: {0}")]
    Serialization(#[from] serde_json::Error),
    
    /// Redis-specific error.
    ///
    /// Only present when the crate is built with the `redis` feature.
    #[cfg(feature = "redis")]
    #[error("Redis error: {0}")]
    Redis(#[from] redis::RedisError),
    
    /// SurrealDB-specific error.
    ///
    /// Only present when the crate is built with the `surrealdb` feature.
    #[cfg(feature = "surrealdb")]
    #[error("SurrealDB error: {0}")]
    SurrealDB(#[from] surrealdb::Error),
    
    /// Job processing error; the payload is a free-form message.
    #[error("Job error: {0}")]
    Job(String),
    
    /// Queue status retrieval error; the payload is the failure description.
    #[error("Queue status error: {0}")]
    QueueStatus(String),
    
    /// Connection timeout; the payload is the retry count reported in the message.
    #[error("Connection timeout after {0} retries")]
    ConnectionTimeout(u32),
    
    /// Feature not implemented for broker.
    #[error("Feature not implemented")]
    NotImplemented,
}

Error handling

Basic error handling

use broccoli_queue::error::BroccoliError;

/// Publishes `job` on the `"jobs"` queue and discards the value returned on success.
///
/// Any error reported by `publish` is returned to the caller.
async fn publish_job(queue: &BroccoliQueue, job: &Job) -> Result<(), BroccoliError> {
    queue
        .publish("jobs", None, job, None)
        .await
        .map(|_published| ())
}

// Usage: report the outcome, singling out publish and broker failures.
match publish_job(&queue, &job).await {
    Ok(()) => println!("Published successfully"),
    Err(failure) => match failure {
        BroccoliError::Publish(reason) => eprintln!("Publish failed: {}", reason),
        BroccoliError::Broker(reason) => eprintln!("Broker error: {}", reason),
        other => eprintln!("Other error: {}", other),
    },
}

In message handlers

queue
    .process_messages("jobs", Some(4), None, |msg: BrokerMessage<Job>| async move {
        let job = &msg.payload;

        // Business logic failures are reported as BroccoliError::Job.
        if !is_valid(job) {
            return Err(BroccoliError::Job("Invalid job data".into()));
        }

        // Run the job; a processing failure is wrapped in BroccoliError::Job as well.
        match process(job).await {
            Ok(_) => Ok(()),
            Err(e) => Err(BroccoliError::Job(format!("Processing failed: {}", e))),
        }
    })
    .await?;

Converting from other errors

/// Parses the job's JSON `data` and passes the parsed value to `external_service`.
///
/// A `serde_json::Error` is converted into `BroccoliError`; an error from
/// `external_service` is stringified and wrapped in `BroccoliError::Job`.
async fn process_job(job: &Job) -> Result<(), BroccoliError> {
    // serde_json::Error converts into BroccoliError through its From impl.
    let parsed: Value = serde_json::from_str(&job.data).map_err(BroccoliError::from)?;

    // Errors without a dedicated variant are carried as a Job message.
    if let Err(cause) = external_service(&parsed).await {
        return Err(BroccoliError::Job(cause.to_string()));
    }

    Ok(())
}

Common error scenarios

Connection errors

let result = BroccoliQueue::builder("redis://invalid:6379").build().await;

// Only a broker-level failure is handled here; every other outcome is ignored.
if let Err(BroccoliError::Broker(msg)) = result {
    eprintln!("Failed to connect: {}", msg);
    // Handle reconnection or fallback
}

Serialization errors

// A payload that cannot be encoded is reported as BroccoliError::Serialization.
// NOTE(review): a type with no `Serialize` impl at all would presumably be rejected
// at compile time rather than reach this branch; confirm against `publish`'s bounds.
let result = queue.publish("jobs", None, &invalid_job, None).await;

if let Err(BroccoliError::Serialization(e)) = result {
    eprintln!("Failed to serialize message: {}", e);
}

Timeout errors

match result {
    Err(BroccoliError::ConnectionTimeout(retries)) => {
        eprintln!("Connection timed out after {} retries", retries);
    }
    _ => {}
}

Feature not implemented

// Some operations aren't available on all brokers
match queue.some_operation().await {
    Err(BroccoliError::NotImplemented) => {
        eprintln!("This feature isn't supported by your broker");
    }
    _ => {}
}

Error recovery patterns

Retry with backoff

/// Publishes `job` to the `"jobs"` queue, retrying broker failures with exponential backoff.
///
/// Only `BroccoliError::Broker` errors are retried, at most `max_retries` times.
/// Any other error, or a broker error once the retry budget is spent, is returned
/// to the caller as-is.
///
/// The pause before retry `n` (1-based) is `100ms * 2^n`, capped at 30 seconds.
/// The computation uses saturating arithmetic, so a large `max_retries` can neither
/// overflow (which would panic in debug builds and wrap in release builds) nor
/// produce hours-long sleeps.
async fn publish_with_retry(
    queue: &BroccoliQueue,
    job: &Job,
    max_retries: u32,
) -> Result<(), BroccoliError> {
    // Base unit of the backoff schedule, in milliseconds.
    const BASE_DELAY_MS: u64 = 100;
    // Upper bound for a single pause so the backoff cannot grow without limit.
    const MAX_DELAY_MS: u64 = 30_000;

    let mut attempts: u32 = 0;

    loop {
        match queue.publish("jobs", None, job, None).await {
            Ok(_) => return Ok(()),
            Err(BroccoliError::Broker(_)) if attempts < max_retries => {
                // Cannot overflow: the guard guarantees attempts < max_retries <= u32::MAX.
                attempts += 1;
                let delay_ms = 2_u64
                    .saturating_pow(attempts)
                    .saturating_mul(BASE_DELAY_MS)
                    .min(MAX_DELAY_MS);
                tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
            }
            Err(e) => return Err(e),
        }
    }
}

Graceful degradation

/// Runs `primary_processing` and, if it fails with `BroccoliError::Job`, runs
/// `fallback_processing` instead.
///
/// Success from the primary path yields `Ok(())`; any non-`Job` error from the
/// primary path is returned unchanged.
async fn process_with_fallback(job: &Job) -> Result<(), BroccoliError> {
    let primary = primary_processing(job).await;

    // Only job-level failures are eligible for the fallback path.
    if let Err(BroccoliError::Job(_)) = primary {
        return fallback_processing(job).await;
    }

    // Success is flattened to `()`; every other error propagates as-is.
    primary.map(|_| ())
}

Logging errors

use log::{error, warn};

queue
    .process_messages("jobs", Some(4), None, |msg: BrokerMessage<Job>| async move {
        // Log a failure with the task id and attempt count, then return the same error.
        process_job(&msg.payload)
            .await
            .map(|_| ())
            .map_err(|e| {
                error!(
                    "Job {} failed (attempt {}): {:?}",
                    msg.task_id, msg.attempts, e
                );
                e
            })
    })
    .await?;

Custom error types

If you need richer error information in your handlers, define your own error type and implement `From` so it converts into `BroccoliError`:
/// Handler-level error type for this example.
///
/// Each variant carries a free-form `String` message. `Debug` is derived
/// because the `From<JobError> for BroccoliError` conversion formats the
/// value with `{:?}`.
#[derive(Debug)]
enum JobError {
    /// Returned by `process_job` when `is_valid` rejects the job.
    ValidationFailed(String),
    /// Not constructed in this example; presumably for failures of an external
    /// dependency (TODO confirm intended use).
    ExternalServiceError(String),
    /// Not constructed in this example; presumably for a missing resource
    /// (TODO confirm intended use).
    ResourceNotFound(String),
}

impl From<JobError> for BroccoliError {
    fn from(e: JobError) -> Self {
        BroccoliError::Job(format!("{:?}", e))
    }
}

/// Validates `job` with `is_valid`, returning `JobError::ValidationFailed`
/// when the check fails and `Ok(())` otherwise.
async fn process_job(job: &Job) -> Result<(), JobError> {
    if is_valid(job) {
        Ok(())
    } else {
        Err(JobError::ValidationFailed("Invalid fields".into()))
    }
}

// In handler: a JobError is turned into a BroccoliError via its From impl.
queue
    .process_messages("jobs", Some(4), None, |msg| async move {
        process_job(&msg.payload).await.map_err(BroccoliError::from)
    })
    .await?;