Overview
SurrealDB can be used as a message broker, which is useful when you already use SurrealDB in your application or need an embedded queue for testing.
Installation
Enable SurrealDB support in your Cargo.toml:
[ dependencies ]
broccoli_queue = { version = "0.4" , default-features = false , features = [ "surrealdb" ] }
Connection
WebSocket connection
use broccoli_queue :: queue :: BroccoliQueue ;
let queue = BroccoliQueue :: builder ( "ws://localhost:8000" )
. pool_connections ( 5 )
. build ()
. await ? ;
In-memory (testing)
let queue = BroccoliQueue :: builder ( "mem://" )
. build ()
. await ? ;
Reuse existing connection
If you already have a SurrealDB connection, you can reuse it:
use surrealdb :: Surreal ;
use surrealdb :: engine :: any :: Any ;
// Your existing SurrealDB connection
let db : Surreal < Any > = // ...
let queue = BroccoliQueue :: builder_with ( db )
. failed_message_retry_strategy ( Default :: default ())
. build ()
. await ? ;
Starting SurrealDB
Docker
docker run -d --name surrealdb \
-p 8000:8000 \
surrealdb/surrealdb:latest \
start --user root --pass root
With file persistence
docker run -d --name surrealdb \
-p 8000:8000 \
-v surrealdb-data:/data \
surrealdb/surrealdb:latest \
start --user root --pass root file:/data/mydb.db
Features
In-memory mode
Perfect for testing and development:
#[tokio :: test]
async fn test_message_processing () {
let queue = BroccoliQueue :: builder ( "mem://" )
. build ()
. await
. unwrap ();
// Run your tests with an ephemeral queue
queue . publish ( "test" , None , & "test message" , None ) . await . unwrap ();
}
Message scheduling
SurrealDB supports delayed message delivery:
use broccoli_queue :: queue :: PublishOptions ;
use time :: Duration ;
let queue = BroccoliQueue :: builder ( "ws://localhost:8000" )
. enable_scheduling ( true )
. build ()
. await ? ;
let options = PublishOptions :: builder ()
. delay ( Duration :: seconds ( 60 ))
. build ();
queue . publish ( "jobs" , None , & job , Some ( options )) . await ? ;
SurrealDB data model
Broccoli stores messages in SurrealDB tables:
Table Purpose broccoli_{queue}Main queue messages broccoli_{queue}_processingMessages being processed broccoli_{queue}_failedFailed messages broccoli_{queue}_scheduledScheduled messages
Configuration example
use broccoli_queue :: queue :: { BroccoliQueue , RetryStrategy };
let queue = BroccoliQueue :: builder ( "ws://localhost:8000" )
. pool_connections ( 5 )
. failed_message_retry_strategy (
RetryStrategy :: new ()
. with_attempts ( 3 )
)
. enable_scheduling ( true )
. build ()
. await ? ;
Best practices
The mem:// scheme is ideal for unit and integration tests: #[cfg(test)]
mod tests {
use super ::* ;
async fn setup_test_queue () -> BroccoliQueue {
BroccoliQueue :: builder ( "mem://" )
. build ()
. await
. unwrap ()
}
}
If your application already uses SurrealDB, reuse the connection: let queue = BroccoliQueue :: builder_with ( existing_db )
. build ()
. await ? ;
Consider concurrency limits
SurrealDB may have different concurrency characteristics than dedicated message brokers. Test with your expected load.
Limitations
SurrealDB as a message broker has some limitations compared to Redis or RabbitMQ:
No fairness queues - Disambiguator-based fairness is not supported
No management API - Queue status monitoring is not available
Performance - May not match dedicated brokers for high throughput
When to use SurrealDB
Good use cases:
You already use SurrealDB and want to minimize dependencies
Testing and development with in-memory queues
Embedded applications with moderate queue requirements
Consider Redis or RabbitMQ for:
High-throughput production workloads
Complex routing requirements
When you need fairness queues or management APIs
Troubleshooting
Connection failed
Failed to connect to broker
Verify SurrealDB is running:
curl http://localhost:8000/health
Concurrency errors
Broker error: non-idempotent operation
This can occur with parallel consumers. Consider reducing concurrency:
queue . process_messages ( "jobs" , Some ( 2 ), None , handler ) . await ? ;