Overview

Fairness queues prevent one category of messages from monopolizing processing resources. By using disambiguators, you can ensure messages are processed fairly across different tenants, users, or categories.
Fairness queues are currently only supported with the Redis broker.

The problem

Without fairness, a burst of messages from one source can delay all others. For example, if Tenant A publishes a large batch first, Tenants B and C must wait until all of Tenant A’s messages have been processed.

With fairness

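Fairness distributes processing across categories. Messages from Tenants A, B, and C are interleaved, so no tenant waits behind another tenant’s backlog.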

Using disambiguators

Publishing with disambiguators

Include a disambiguator when publishing:
// Each tenant gets their own sub-queue
queue.publish("jobs", Some("tenant-a".into()), &job_a, None).await?;
queue.publish("jobs", Some("tenant-b".into()), &job_b, None).await?;
queue.publish("jobs", Some("tenant-c".into()), &job_c, None).await?;

Consuming with fairness

Enable fairness in consume options:
use broccoli_queue::queue::ConsumeOptions;

let options = ConsumeOptions::builder()
    .fairness(true)
    .build();

queue.process_messages("jobs", Some(4), Some(options), |msg| async move {
    println!("Processing for {:?}", msg.disambiguator);
    Ok(())
}).await?;

How it works

With fairness enabled:
  1. Messages are stored in sub-queues based on disambiguator
  2. The consumer rotates through sub-queues in round-robin fashion
  3. Each consume operation pulls from the next sub-queue
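The three steps above can be illustrated with a small in-memory model. This is only a sketch of the round-robin idea, not Broccoli's Redis implementation: `FairQueue` and its methods are hypothetical names, and for simplicity the model drops empty sub-queues from the rotation instead of checking them.

```rust
use std::collections::{BTreeMap, VecDeque};

/// In-memory stand-in for a fairness queue: one FIFO sub-queue per disambiguator.
/// (Hypothetical type for illustration; not part of broccoli_queue.)
#[derive(Default)]
struct FairQueue {
    sub_queues: BTreeMap<String, VecDeque<String>>,
    // Disambiguators that currently have pending messages, in rotation order.
    rotation: VecDeque<String>,
}

impl FairQueue {
    /// Step 1: store the message in the sub-queue for its disambiguator.
    fn publish(&mut self, disambiguator: &str, message: &str) {
        let sub_queue = self.sub_queues.entry(disambiguator.to_string()).or_default();
        if sub_queue.is_empty() {
            // A sub-queue joins the rotation when it becomes non-empty.
            self.rotation.push_back(disambiguator.to_string());
        }
        sub_queue.push_back(message.to_string());
    }

    /// Steps 2 and 3: pull one message from the next sub-queue in the rotation.
    fn consume(&mut self) -> Option<(String, String)> {
        let disambiguator = self.rotation.pop_front()?;
        let sub_queue = self.sub_queues.get_mut(&disambiguator)?;
        let message = sub_queue.pop_front()?;
        if !sub_queue.is_empty() {
            // Still has work: move to the back of the rotation.
            self.rotation.push_back(disambiguator.clone());
        }
        Some((disambiguator, message))
    }
}
```

If `tenant-a` publishes three messages before `tenant-b` and `tenant-c` publish one each, this model yields a1, b1, c1, a2, a3 rather than draining all of `tenant-a` first.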

Use cases

Multi-tenant applications

Prevent one tenant from affecting others:
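use broccoli_queue::{error::BroccoliError, queue::BroccoliQueue};

// Publish each job into the sub-queue for its tenant
async fn handle_tenant_job(
    queue: &BroccoliQueue,
    tenant_id: &str,
    job: TenantJob,
) -> Result<(), BroccoliError> {
    queue.publish("tenant-jobs", Some(tenant_id.to_string()), &job, None).await?;
    Ok(())
}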

// Consumer processes fairly across tenants
let options = ConsumeOptions::builder().fairness(true).build();
queue.process_messages("tenant-jobs", Some(8), Some(options), handler).await?;

Priority levels

Implement soft priority with one sub-queue per priority level:
// High priority gets its own sub-queue
queue.publish("jobs", Some("priority-high".into()), &urgent_job, None).await?;

// Normal priority
queue.publish("jobs", Some("priority-normal".into()), &normal_job, None).await?;

// With fairness, both sub-queues get equal attention, so urgent jobs
// are not stuck behind a backlog of normal-priority jobs.
// For strict priority, use separate queues or priority options

User-based fairness

Prevent a single user from overwhelming the system:
let user_id = request.user_id;
queue.publish("user-jobs", Some(user_id), &job, None).await?;

Queue size with fairness

When using fairness, queue.size() returns sizes for each sub-queue:
let sizes = queue.size("jobs").await?;

for (queue_name, size) in sizes {
    println!("{}: {} messages", queue_name, size);
}
// Output:
// jobs:fairness:tenant-a: 45 messages
// jobs:fairness:tenant-b: 12 messages
// jobs:fairness:tenant-c: 8 messages
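The per-sub-queue sizes can also be used to spot a category that is building up a backlog. The helper below is a minimal sketch: `oversized_sub_queues` is a hypothetical function, and it assumes the sizes are available as a `HashMap<String, u64>` keyed by sub-queue name, which may differ from the exact return type of `queue.size()`.

```rust
use std::collections::HashMap;

/// Returns the sub-queues that hold more than `max_share` (0.0 to 1.0) of all
/// pending messages, together with their share, sorted by sub-queue name.
/// Assumption: `sizes` maps sub-queue name to pending message count.
fn oversized_sub_queues(sizes: &HashMap<String, u64>, max_share: f64) -> Vec<(String, f64)> {
    let total: u64 = sizes.values().sum();
    if total == 0 {
        // Nothing pending, so nothing can be oversized.
        return Vec::new();
    }
    let mut flagged: Vec<(String, f64)> = sizes
        .iter()
        .map(|(name, &size)| (name.clone(), size as f64 / total as f64))
        .filter(|(_, share)| *share > max_share)
        .collect();
    flagged.sort_by(|a, b| a.0.cmp(&b.0));
    flagged
}
```

With the sizes from the output above (45, 12, and 8 messages) and a `max_share` of 0.5, only `jobs:fairness:tenant-a` is flagged, since it holds 45 of the 65 pending messages.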

Configuration options

use std::time::Duration;
use broccoli_queue::queue::ConsumeOptions;

let options = ConsumeOptions::builder()
    .fairness(true)
    .auto_ack(false)      // Manual acknowledgment
    .consume_wait(Duration::from_millis(10))  // Wait between iterations
    .build();

Best practices

Use identifiers that represent your fairness boundaries:
  • Tenant ID for multi-tenant apps
  • User ID for user fairness
  • Region for geographic distribution
  • Priority level for soft prioritization

Track the size of each sub-queue to detect imbalances:
let sizes = queue.size("jobs").await?;
for (name, size) in sizes {
    metrics::gauge!("queue.size", size as f64, "queue" => name);
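}

Each unique disambiguator creates its own sub-queue, so too many of them can impact performance. Consider bucketing:
// Instead of per-user, bucket by a hash of the user ID
use std::hash::{Hash, Hasher};
let mut hasher = std::collections::hash_map::DefaultHasher::new();
user_id.hash(&mut hasher);
let bucket = format!("bucket-{}", hasher.finish() % 100);
queue.publish("jobs", Some(bucket), &job, None).await?;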
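If bucket assignments need to stay the same across builds and services, one option is to compute the bucket with a fixed, explicitly specified hash. The sketch below uses 64-bit FNV-1a; `bucket_for` is a hypothetical helper, not part of broccoli_queue, and the choice of hash function is an assumption made for illustration.

```rust
/// Maps an identifier to one of `buckets` disambiguators using 64-bit FNV-1a.
/// The same `id` always maps to the same bucket name.
fn bucket_for(id: &str, buckets: u64) -> String {
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

    let mut hash = FNV_OFFSET_BASIS;
    for byte in id.bytes() {
        // FNV-1a: XOR the byte in first, then multiply by the prime.
        hash ^= u64::from(byte);
        hash = hash.wrapping_mul(FNV_PRIME);
    }
    // Guard against a zero bucket count to avoid a division by zero.
    format!("bucket-{}", hash % buckets.max(1))
}
```

The result can be passed as the disambiguator, for example `Some(bucket_for(&user_id, 100))`, which caps the number of sub-queues at 100 regardless of how many users exist. Users that share a bucket also share its slot in the rotation, so the bucket count trades fairness granularity against overhead.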

Limitations

  • Only available with Redis broker
  • Round-robin is fixed (no weighted fairness)
  • Empty sub-queues are still checked during rotation (slight overhead)

Alternatives

If fairness queues don’t fit your needs:
  1. Separate queues: Create distinct queues per category
  2. Priority option: Use PublishOptions::priority() for priority-based ordering
  3. Custom routing: Implement your own routing logic with multiple queues