BroccoliError
The error type for all Broccoli operations.Copy
use broccoli_queue::error::BroccoliError;
Variants
Copy
#[derive(Debug, thiserror::Error)]
pub enum BroccoliError {
/// Broker connection or operation errors
#[error("Broker error: {0}")]
Broker(String),
/// Non-idempotent concurrent operation (SurrealDB)
#[error("Broker error: non-idempotent operation {0}")]
BrokerNonIdempotentOp(String),
/// Retriable non-idempotent operation (SurrealDB)
#[error("Broker error: non-idempotent retriable operation {0}")]
BrokerNonIdempotentRetriableOp(String),
/// Failed to publish message
#[error("Failed to publish message: {0}")]
Publish(String),
/// Failed to consume message
#[error("Failed to consume message: {0}")]
Consume(String),
/// Failed to acknowledge message
#[error("Failed to acknowledge message: {0}")]
Acknowledge(String),
/// Failed to reject message
#[error("Failed to reject message: {0}")]
Reject(String),
/// Failed to cancel message
#[error("Failed to cancel message: {0}")]
Cancel(String),
/// Failed to get message position
#[error("Failed to get message position: {0}")]
GetMessagePosition(String),
/// JSON serialization/deserialization error
#[error("Serialization error: {0}")]
Serialization(#[from] serde_json::Error),
/// Redis-specific error (with redis feature)
#[cfg(feature = "redis")]
#[error("Redis error: {0}")]
Redis(#[from] redis::RedisError),
/// SurrealDB-specific error (with surrealdb feature)
#[cfg(feature = "surrealdb")]
#[error("SurrealDB error: {0}")]
SurrealDB(#[from] surrealdb::Error),
/// Job processing error
#[error("Job error: {0}")]
Job(String),
/// Queue status retrieval error
#[error("Queue status error: {0}")]
QueueStatus(String),
/// Connection timeout
#[error("Connection timeout after {0} retries")]
ConnectionTimeout(u32),
/// Feature not implemented for broker
#[error("Feature not implemented")]
NotImplemented,
}
Error handling
Basic error handling
Copy
use broccoli_queue::error::BroccoliError;
async fn publish_job(queue: &BroccoliQueue, job: &Job) -> Result<(), BroccoliError> {
queue.publish("jobs", None, job, None).await?;
Ok(())
}
// Usage
match publish_job(&queue, &job).await {
Ok(_) => println!("Published successfully"),
Err(BroccoliError::Publish(msg)) => eprintln!("Publish failed: {}", msg),
Err(BroccoliError::Broker(msg)) => eprintln!("Broker error: {}", msg),
Err(e) => eprintln!("Other error: {}", e),
}
In message handlers
Copy
queue.process_messages("jobs", Some(4), None, |msg: BrokerMessage<Job>| async move {
// Return BroccoliError::Job for business logic failures
if !is_valid(&msg.payload) {
return Err(BroccoliError::Job("Invalid job data".into()));
}
// Process the job
process(&msg.payload).await.map_err(|e| {
BroccoliError::Job(format!("Processing failed: {}", e))
})?;
Ok(())
}).await?;
Converting from other errors
Copy
async fn process_job(job: &Job) -> Result<(), BroccoliError> {
// From serde_json::Error
let data: Value = serde_json::from_str(&job.data)?;
// From custom errors
external_service(&data)
.await
.map_err(|e| BroccoliError::Job(e.to_string()))?;
Ok(())
}
Common error scenarios
Connection errors
Copy
let result = BroccoliQueue::builder("redis://invalid:6379")
.build()
.await;
match result {
Err(BroccoliError::Broker(msg)) => {
eprintln!("Failed to connect: {}", msg);
// Handle reconnection or fallback
}
_ => {}
}
Serialization errors
Copy
// This would fail if Job doesn't implement Serialize
let result = queue.publish("jobs", None, &invalid_job, None).await;
match result {
Err(BroccoliError::Serialization(e)) => {
eprintln!("Failed to serialize message: {}", e);
}
_ => {}
}
Timeout errors
Copy
match result {
Err(BroccoliError::ConnectionTimeout(retries)) => {
eprintln!("Connection timed out after {} retries", retries);
}
_ => {}
}
Feature not implemented
Copy
// Some operations aren't available on all brokers
match queue.some_operation().await {
Err(BroccoliError::NotImplemented) => {
eprintln!("This feature isn't supported by your broker");
}
_ => {}
}
Error recovery patterns
Retry with backoff
Copy
async fn publish_with_retry(
queue: &BroccoliQueue,
job: &Job,
max_retries: u32,
) -> Result<(), BroccoliError> {
let mut attempts = 0;
loop {
match queue.publish("jobs", None, job, None).await {
Ok(_) => return Ok(()),
Err(BroccoliError::Broker(_)) if attempts < max_retries => {
attempts += 1;
let delay = std::time::Duration::from_millis(100 * 2_u64.pow(attempts));
tokio::time::sleep(delay).await;
}
Err(e) => return Err(e),
}
}
}
Graceful degradation
Copy
async fn process_with_fallback(job: &Job) -> Result<(), BroccoliError> {
match primary_processing(job).await {
Ok(_) => Ok(()),
Err(BroccoliError::Job(_)) => {
// Try fallback processing
fallback_processing(job).await
}
Err(e) => Err(e), // Propagate other errors
}
}
Logging errors
Copy
use log::{error, warn};
queue.process_messages("jobs", Some(4), None, |msg: BrokerMessage<Job>| async move {
match process_job(&msg.payload).await {
Ok(_) => Ok(()),
Err(e) => {
error!(
"Job {} failed (attempt {}): {:?}",
msg.task_id, msg.attempts, e
);
Err(e)
}
}
}).await?;
Custom error types
If you need richer error information in your handlers:Copy
#[derive(Debug)]
enum JobError {
ValidationFailed(String),
ExternalServiceError(String),
ResourceNotFound(String),
}
impl From<JobError> for BroccoliError {
fn from(e: JobError) -> Self {
BroccoliError::Job(format!("{:?}", e))
}
}
async fn process_job(job: &Job) -> Result<(), JobError> {
if !is_valid(job) {
return Err(JobError::ValidationFailed("Invalid fields".into()));
}
Ok(())
}
// In handler
queue.process_messages("jobs", Some(4), None, |msg| async move {
process_job(&msg.payload).await.map_err(Into::into)
}).await?;