Skip to main content

BroccoliQueue

The main entry point for interacting with Broccoli message queues.
use broccoli_queue::queue::BroccoliQueue;

Creating a queue

builder

Creates a new BroccoliQueueBuilder for configuring the queue.
pub fn builder(broker_url: impl Into<String>) -> BroccoliQueueBuilder
Parameters:
  • broker_url - Connection URL for the message broker
Example:
let queue = BroccoliQueue::builder("redis://localhost:6379")
    .pool_connections(5)
    .build()
    .await?;

builder_with (SurrealDB only)

Creates a builder with an existing SurrealDB connection.
#[cfg(feature = "surrealdb")]
pub fn builder_with(db: Surreal<Any>) -> BroccoliQueueBuilder

Publishing

publish

Publishes a single message to a queue.
pub async fn publish<T>(
    &self,
    topic: &str,
    disambiguator: Option<String>,
    message: &T,
    options: Option<PublishOptions>,
) -> Result<BrokerMessage<T>, BroccoliError>
where
    T: Clone + Serialize + DeserializeOwned
Parameters:
  • topic - Queue name
  • disambiguator - Optional identifier for fairness queues
  • message - The message payload
  • options - Optional publish configuration
Returns: The wrapped BrokerMessage with a generated task_id.
Example:
let message = queue.publish("jobs", None, &job, None).await?;
println!("Published: {}", message.task_id);

publish_batch

Publishes multiple messages to a queue.
pub async fn publish_batch<T>(
    &self,
    topic: &str,
    disambiguator: Option<String>,
    messages: impl IntoIterator<Item = T>,
    options: Option<PublishOptions>,
) -> Result<Vec<BrokerMessage<T>>, BroccoliError>
where
    T: Clone + Serialize + DeserializeOwned
Example:
let jobs = vec![job1, job2, job3];
let messages = queue.publish_batch("jobs", None, jobs, None).await?;

Consuming

consume

Consumes a message, blocking until one is available.
pub async fn consume<T>(
    &self,
    topic: &str,
    options: Option<ConsumeOptions>,
) -> Result<BrokerMessage<T>, BroccoliError>
where
    T: Clone + Serialize + DeserializeOwned
Example:
let message: BrokerMessage<JobPayload> = queue.consume("jobs", None).await?;

try_consume

Attempts to consume a message without blocking.
pub async fn try_consume<T>(
    &self,
    topic: &str,
    options: Option<ConsumeOptions>,
) -> Result<Option<BrokerMessage<T>>, BroccoliError>
where
    T: Clone + Serialize + DeserializeOwned
Returns: Some(message) if a message is available, None otherwise.
Example:
if let Some(message) = queue.try_consume::<JobPayload>("jobs", None).await? {
    // Process message
}

consume_batch

Consumes multiple messages with a timeout.
pub async fn consume_batch<T>(
    &self,
    topic: &str,
    batch_size: usize,
    timeout: Duration,
    options: Option<ConsumeOptions>,
) -> Result<Vec<BrokerMessage<T>>, BroccoliError>
where
    T: Clone + Serialize + DeserializeOwned
Example:
let messages = queue.consume_batch::<JobPayload>(
    "jobs",
    10,
    Duration::seconds(5),
    None
).await?;

try_consume_batch

Attempts to consume multiple messages without blocking.
pub async fn try_consume_batch<T>(
    &self,
    topic: &str,
    batch_size: usize,
    options: Option<ConsumeOptions>,
) -> Result<Vec<BrokerMessage<T>>, BroccoliError>

Message handling

acknowledge

Acknowledges successful processing of a message.
pub async fn acknowledge<T>(
    &self,
    topic: &str,
    message: BrokerMessage<T>,
) -> Result<(), BroccoliError>
where
    T: Clone + Serialize + DeserializeOwned
Example:
queue.acknowledge("jobs", message).await?;

reject

Rejects a message, triggering retry or failure handling.
pub async fn reject<T>(
    &self,
    topic: &str,
    message: BrokerMessage<T>,
) -> Result<(), BroccoliError>
where
    T: Clone + Serialize + DeserializeOwned

cancel

Cancels a message by ID.
pub async fn cancel(
    &self,
    topic: &str,
    message_id: String,
) -> Result<(), BroccoliError>

Processing

process_messages

Processes messages in a loop with a handler function.
pub async fn process_messages<T, F, Fut>(
    &self,
    topic: &str,
    concurrency: Option<usize>,
    consume_options: Option<ConsumeOptions>,
    handler: F,
) -> Result<(), BroccoliError>
where
    T: DeserializeOwned + Send + Clone + Serialize + 'static,
    F: Fn(BrokerMessage<T>) -> Fut + Send + Sync + Clone + 'static,
    Fut: Future<Output = Result<(), BroccoliError>> + Send + 'static
Parameters:
  • topic - Queue name
  • concurrency - Number of concurrent workers (None for single-threaded)
  • consume_options - Optional consume configuration
  • handler - Async function to process each message
Example:
queue.process_messages("jobs", Some(4), None, |msg: BrokerMessage<Job>| async move {
    println!("Processing: {:?}", msg.payload);
    Ok(())
}).await?;

process_messages_with_handlers

Processes messages with separate success and error handlers.
pub async fn process_messages_with_handlers<T, F, MessageFut, SuccessFut, ErrorFut, S, E, R>(
    &self,
    topic: &str,
    concurrency: Option<usize>,
    consume_options: Option<ConsumeOptions>,
    message_handler: F,
    on_success: S,
    on_error: E,
) -> Result<(), BroccoliError>
Example:
queue.process_messages_with_handlers(
    "jobs",
    Some(4),
    None,
    |msg| async move { process(msg.payload).await },
    |msg, result| async move { log_success(&msg).await; Ok(()) },
    |msg, error| async move { log_error(&msg, &error).await; Ok(()) },
).await?;

Queue information

size

Returns the size of queue(s).
pub async fn size(
    &self,
    queue_name: &str,
) -> Result<HashMap<String, u64>, BroccoliError>
Returns: Map of queue names to message counts

queue_status (management feature)

Returns detailed queue status.
#[cfg(feature = "management")]
pub async fn queue_status(
    &self,
    queue_name: String,
    disambiguator: Option<String>,
) -> Result<QueueStatus, BroccoliError>

BroccoliQueueBuilder

Builder for configuring BroccoliQueue.

pool_connections

Sets the connection pool size.
pub const fn pool_connections(mut self, connections: u8) -> Self
Default: 10

failed_message_retry_strategy

Configures retry behavior for failed messages.
pub const fn failed_message_retry_strategy(mut self, strategy: RetryStrategy) -> Self

enable_scheduling

Enables message scheduling.
pub const fn enable_scheduling(mut self, enable: bool) -> Self
Default: false

build

Builds the queue and connects to the broker.
pub async fn build(self) -> Result<BroccoliQueue, BroccoliError>