
Overview

SurrealDB can be used as a message broker, which is useful when you already use SurrealDB in your application or need an embedded queue for testing.

Installation

Enable SurrealDB support in your Cargo.toml:
[dependencies]
broccoli_queue = { version = "0.4", default-features = false, features = ["surrealdb"] }

Connection

WebSocket connection

use broccoli_queue::queue::BroccoliQueue;

let queue = BroccoliQueue::builder("ws://localhost:8000")
    .pool_connections(5)
    .build()
    .await?;

In-memory (testing)

let queue = BroccoliQueue::builder("mem://")
    .build()
    .await?;
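
Because the builder takes the connection URL as a string, one way to share code between tests and deployments is to resolve the URL in a single place and fall back to mem:// when nothing is configured. The helper below is a minimal sketch of that idea; the function name `broker_url` and the `BROKER_URL` variable mentioned afterwards are hypothetical and not part of Broccoli.

```rust
/// Picks the URL to hand to `BroccoliQueue::builder`.
///
/// `configured` is whatever the deployment provides (for example the value
/// of an environment variable); tests can simply pass `None`.
fn broker_url(configured: Option<&str>) -> String {
    match configured.map(str::trim) {
        // A non-empty configured value wins, e.g. "ws://localhost:8000".
        Some(url) if !url.is_empty() => url.to_string(),
        // Otherwise fall back to the ephemeral in-memory engine.
        _ => "mem://".to_string(),
    }
}
```

A caller might resolve the value with `broker_url(std::env::var("BROKER_URL").ok().as_deref())` and pass the result to the builder.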

Reuse existing connection

If you already have a SurrealDB connection, you can reuse it:
use surrealdb::Surreal;
use surrealdb::engine::any::Any;

// Your existing SurrealDB connection
let db: Surreal<Any> = // ...

let queue = BroccoliQueue::builder_with(db)
    .failed_message_retry_strategy(Default::default())
    .build()
    .await?;

Starting SurrealDB

Docker

docker run -d --name surrealdb \
  -p 8000:8000 \
  surrealdb/surrealdb:latest \
  start --user root --pass root

With file persistence

docker run -d --name surrealdb \
  -p 8000:8000 \
  -v surrealdb-data:/data \
  surrealdb/surrealdb:latest \
  start --user root --pass root file:/data/mydb.db

Features

In-memory mode

In-memory mode is well suited to testing and development, because each queue is ephemeral and needs no running server:
#[tokio::test]
async fn test_message_processing() {
    let queue = BroccoliQueue::builder("mem://")
        .build()
        .await
        .unwrap();
    
    // Run your tests with an ephemeral queue
    queue.publish("test", None, &"test message", None).await.unwrap();
}

Message scheduling

The SurrealDB broker supports delayed message delivery. Enable scheduling on the builder, then pass a delay in the publish options:
use broccoli_queue::queue::{BroccoliQueue, PublishOptions};
use time::Duration;

let queue = BroccoliQueue::builder("ws://localhost:8000")
    .enable_scheduling(true)
    .build()
    .await?;

let options = PublishOptions::builder()
    .delay(Duration::seconds(60))
    .build();

queue.publish("jobs", None, &job, Some(options)).await?;
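
To make the effect of a delay concrete, the following is a toy, in-process model of delayed delivery: a message is parked with a due time and only becomes consumable once that time has passed. It is a conceptual sketch only, not Broccoli's implementation; the types `Scheduled` and `DelayedQueue` and their methods are hypothetical, and time is passed in as plain seconds so the behavior is deterministic.

```rust
/// A message waiting for its due time.
#[derive(Debug, Clone, PartialEq)]
struct Scheduled {
    payload: String,
    due_at_secs: u64,
}

/// Toy model: `scheduled` stands in for the scheduled-messages store and
/// `ready` for the main queue that consumers read from.
#[derive(Debug, Default)]
struct DelayedQueue {
    scheduled: Vec<Scheduled>,
    ready: Vec<String>,
}

impl DelayedQueue {
    /// Publish with a delay: the message is parked until `now + delay`.
    fn publish_delayed(&mut self, payload: &str, now_secs: u64, delay_secs: u64) {
        self.scheduled.push(Scheduled {
            payload: payload.to_string(),
            due_at_secs: now_secs + delay_secs,
        });
    }

    /// Move every message whose due time has passed into the ready queue,
    /// earliest due time first. Returns how many messages were promoted.
    fn promote_due(&mut self, now_secs: u64) -> usize {
        let (mut due, pending): (Vec<Scheduled>, Vec<Scheduled>) = self
            .scheduled
            .drain(..)
            .partition(|m| m.due_at_secs <= now_secs);
        due.sort_by_key(|m| m.due_at_secs);
        let promoted = due.len();
        self.ready.extend(due.into_iter().map(|m| m.payload));
        self.scheduled = pending;
        promoted
    }
}
```

In this model, a message published at time 0 with a 60-second delay stays out of `ready` until `promote_due` is called with a time of 60 or later.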

SurrealDB data model

Broccoli stores messages in SurrealDB tables:
| Table | Purpose |
| --- | --- |
| broccoli_{queue} | Main queue messages |
| broccoli_{queue}_processing | Messages being processed |
| broccoli_{queue}_failed | Failed messages |
| broccoli_{queue}_scheduled | Scheduled messages |
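
When inspecting the database by hand, it can help to derive these table names from the queue name instead of typing them out. The sketch below only applies the naming pattern listed above; the `QueueTables` struct and `queue_tables` function are hypothetical helpers, and how Broccoli treats queue names containing special characters is not covered here.

```rust
/// Table names for one queue, following the `broccoli_{queue}` pattern.
#[derive(Debug, PartialEq)]
struct QueueTables {
    main: String,
    processing: String,
    failed: String,
    scheduled: String,
}

/// Builds the four table names for `queue`.
fn queue_tables(queue: &str) -> QueueTables {
    let main = format!("broccoli_{queue}");
    QueueTables {
        processing: format!("{main}_processing"),
        failed: format!("{main}_failed"),
        scheduled: format!("{main}_scheduled"),
        // `main` is moved last, after the derived names have borrowed it.
        main,
    }
}
```

For a queue named jobs, `queue_tables("jobs").failed` yields the table to look at when investigating failed messages.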

Configuration example

use broccoli_queue::queue::{BroccoliQueue, RetryStrategy};

let queue = BroccoliQueue::builder("ws://localhost:8000")
    .pool_connections(5)
    .failed_message_retry_strategy(
        RetryStrategy::new()
            .with_attempts(3)
    )
    .enable_scheduling(true)
    .build()
    .await?;
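
The retry setting above limits how often a failing message is tried. The following sketch models that idea in isolation: a handler is run up to a fixed number of times and the message is treated as failed once every attempt has errored. It is an illustration under assumptions, not Broccoli's code. In particular, it assumes the attempt count means total tries rather than retries after the first, and the names `Outcome` and `process_with_retries` are hypothetical.

```rust
/// Where a message ends up after the handler has been tried.
#[derive(Debug, PartialEq)]
enum Outcome {
    /// The handler succeeded on the given attempt (1-based).
    Processed { attempt: u32 },
    /// Every allowed attempt failed; the message would be parked as failed.
    Failed { attempts: u32 },
}

/// Runs `handler` up to `max_attempts` times, stopping at the first success.
/// The handler receives the current attempt number, starting at 1.
fn process_with_retries<F>(max_attempts: u32, mut handler: F) -> Outcome
where
    F: FnMut(u32) -> Result<(), String>,
{
    for attempt in 1..=max_attempts {
        if handler(attempt).is_ok() {
            return Outcome::Processed { attempt };
        }
    }
    Outcome::Failed { attempts: max_attempts }
}
```

With a limit of 3, a handler that fails once and then succeeds ends as `Processed { attempt: 2 }`, while a handler that always fails ends as `Failed { attempts: 3 }`.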

Best practices

The mem:// scheme is ideal for unit and integration tests:
#[cfg(test)]
mod tests {
    use super::*;
    
    async fn setup_test_queue() -> BroccoliQueue {
        BroccoliQueue::builder("mem://")
            .build()
            .await
            .unwrap()
    }
}

If your application already uses SurrealDB, reuse the connection:
let queue = BroccoliQueue::builder_with(existing_db)
    .build()
    .await?;

SurrealDB may have different concurrency characteristics than dedicated message brokers. Test with your expected load.

Limitations

SurrealDB as a message broker has some limitations compared to Redis or RabbitMQ:
  • No fairness queues - Disambiguator-based fairness is not supported
  • No management API - Queue status monitoring is not available
  • Performance - May not match dedicated brokers for high throughput

When to use SurrealDB

Good use cases:
  • You already use SurrealDB and want to minimize dependencies
  • Testing and development with in-memory queues
  • Embedded applications with moderate queue requirements

Consider Redis or RabbitMQ for:
  • High-throughput production workloads
  • Complex routing requirements
  • Workloads that need fairness queues or management APIs

Troubleshooting

Connection failed

Failed to connect to broker
If you see this error, verify that SurrealDB is running and reachable at the configured address:
curl http://localhost:8000/health
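
The same check can be made from Rust before building the queue, which is sometimes convenient in startup code or integration tests. The sketch below only tests whether a TCP connection can be opened; unlike the curl command, it does not call the health endpoint. The function name `is_reachable` is hypothetical.

```rust
use std::net::{SocketAddr, TcpStream};
use std::time::Duration;

/// Returns true if a TCP connection to `addr` succeeds within `timeout`.
/// This confirms that something is listening, not that it is healthy.
fn is_reachable(addr: SocketAddr, timeout: Duration) -> bool {
    TcpStream::connect_timeout(&addr, timeout).is_ok()
}
```

For the Docker setup shown earlier, the address to probe would be `"127.0.0.1:8000".parse::<SocketAddr>()`, with a timeout of a second or two.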

Concurrency errors

Broker error: non-idempotent operation
This error can occur when several consumers process the same queue in parallel. Consider lowering the concurrency value passed to process_messages (2 in this example):
queue.process_messages("jobs", Some(2), None, handler).await?;