Skip to main content

Overview

Broccoli supports scheduling messages for delayed delivery. You can either delay a message by a duration or schedule it for a specific time.

Enabling scheduling

Enable scheduling when building your queue:
let queue = BroccoliQueue::builder("redis://localhost:6379")
    .enable_scheduling(true)
    .build()
    .await?;
For RabbitMQ, you must install the delayed-exchange plugin.

Delayed delivery

Delay a message by a specific duration:
use broccoli_queue::queue::PublishOptions;
use time::Duration;

let options = PublishOptions::builder()
    .delay(Duration::seconds(30))
    .build();

queue.publish("jobs", None, &job, Some(options)).await?;
The message will be delivered approximately 30 seconds after publishing.

Scheduled delivery

Schedule a message for a specific time:
use broccoli_queue::queue::PublishOptions;
use time::OffsetDateTime;

// Schedule for tomorrow at midnight
let scheduled_time = OffsetDateTime::now_utc()
    .replace_time(time::Time::MIDNIGHT)
    + time::Duration::days(1);

let options = PublishOptions::builder()
    .schedule_at(scheduled_time)
    .build();

queue.publish("jobs", None, &job, Some(options)).await?;

Batch scheduling

Schedule multiple messages:
use time::Duration;

let jobs = vec![
    JobPayload { id: "1".into(), task: "task-a".into() },
    JobPayload { id: "2".into(), task: "task-b".into() },
];

let options = PublishOptions::builder()
    .delay(Duration::minutes(5))
    .build();

queue.publish_batch("jobs", None, jobs, Some(options)).await?;

PublishOptions

The PublishOptions struct supports several scheduling-related options:
pub struct PublishOptions {
    /// Time-to-live for the message
    pub ttl: Option<Duration>,
    
    /// Message priority (1-5, where 1 is highest)
    pub priority: Option<u8>,
    
    /// Delay before delivery
    pub delay: Option<Duration>,
    
    /// Specific delivery time
    pub scheduled_at: Option<OffsetDateTime>,
}

Builder pattern

let options = PublishOptions::builder()
    .delay(Duration::seconds(60))
    .priority(1)  // High priority
    .ttl(Duration::hours(24))
    .build();

Use cases

Scheduled reports

#[derive(Clone, Serialize, Deserialize)]
struct ReportJob {
    report_type: String,
    recipients: Vec<String>,
}

// Schedule daily report for 6 AM
let tomorrow_6am = OffsetDateTime::now_utc()
    .replace_time(time::Time::from_hms(6, 0, 0)?)
    + time::Duration::days(1);

let report = ReportJob {
    report_type: "daily_summary".into(),
    recipients: vec!["admin@example.com".into()],
};

queue.publish(
    "reports",
    None,
    &report,
    Some(PublishOptions::builder().schedule_at(tomorrow_6am).build())
).await?;

Retry with backoff

async fn handle_with_retry(
    queue: &BroccoliQueue,
    job: JobPayload,
    attempt: u8,
) -> Result<(), BroccoliError> {
    match process_job(&job).await {
        Ok(_) => Ok(()),
        Err(e) if attempt < 5 => {
            // Exponential backoff: 1s, 2s, 4s, 8s, 16s
            let delay = Duration::seconds(2_i64.pow(attempt as u32));
            
            queue.publish(
                "jobs",
                None,
                &job,
                Some(PublishOptions::builder().delay(delay).build())
            ).await?;
            
            Ok(())
        }
        Err(e) => Err(e),
    }
}

Rate limiting

// Spread 100 emails over 10 minutes to avoid rate limits
for (i, email) in emails.iter().enumerate() {
    let delay = Duration::seconds((i as i64) * 6); // One every 6 seconds
    
    queue.publish(
        "emails",
        None,
        email,
        Some(PublishOptions::builder().delay(delay).build())
    ).await?;
}

How scheduling works

Broker-specific behavior

FeatureRedisRabbitMQSurrealDB
delay✅ (requires plugin)
scheduled_at✅ (requires plugin)
Precision~1 second~1 second~1 second

Best practices

For delays under a few hours, use delay:
PublishOptions::builder().delay(Duration::minutes(30)).build()
For calendar-based scheduling, use scheduled_at:
PublishOptions::builder().schedule_at(next_monday_9am).build()
scheduled_at uses OffsetDateTime. Be explicit about time zones:
use time::{OffsetDateTime, UtcOffset};

// Schedule for 9 AM in UTC-5 (EST)
let est = UtcOffset::from_hms(-5, 0, 0)?;
let scheduled = OffsetDateTime::now_utc()
    .to_offset(est)
    .replace_time(time::Time::from_hms(9, 0, 0)?);
With Redis, check the scheduled queue:
redis-cli ZCARD jobs:scheduled