diff --git a/CHANGELOG.md b/CHANGELOG.md index 78d2530e3..133593b3f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ # Changelog ## Unreleased +- Feat: Worker::perform_later() now returns job IDs for status tracking. Returns `Result>` with the job ID when using background queue mode. [https://github.com/loco-rs/loco/issues/1623](https://github.com/loco-rs/loco/issues/1623) ## v0.16.4 - Feat: decouple JWT authentication from database dependency. [https://github.com/loco-rs/loco/pull/1546](https://github.com/loco-rs/loco/pull/1546) diff --git a/docs-site/content/docs/processing/workers.md b/docs-site/content/docs/processing/workers.md index 21c771dac..0156716ec 100644 --- a/docs-site/content/docs/processing/workers.md +++ b/docs-site/content/docs/processing/workers.md @@ -139,13 +139,19 @@ To use a worker, we mainly think about adding a job to the queue, so you `use` t ```rust // .. in your controller .. - DownloadWorker::perform_later( + let job_id = DownloadWorker::perform_later( &ctx, DownloadWorkerArgs { user_guid: "foo".to_string(), }, ) - .await + .await?; + + // The job ID can be used for tracking job status + if let Some(id) = job_id { + println!("Job queued with ID: {}", id); + // You can store this ID to check job status later + } ``` Unlike Rails and Ruby, with Rust you can enjoy _strongly typed_ job arguments which gets serialized and pushed into the queue. @@ -231,7 +237,7 @@ The `BackgroundWorker` trait is the core interface for defining background worke - `queue() -> Option`: Optional method to specify a custom queue for the worker (returns `None` by default). - `tags() -> Vec`: Optional method to specify tags for this worker (returns an empty vector by default). - `class_name() -> String`: Returns the worker's class name (automatically derived from the struct name). -- `perform_later(ctx: &AppContext, args: A) -> Result<()>`: Static method to enqueue a job to be performed later. +- `perform_later(ctx: &AppContext, args: A) -> Result>`: Static method to enqueue a job to be performed later. Returns `Some(job_id)` when using background queue mode with a provider, `None` otherwise. ### Generate a Worker diff --git a/src/bgworker/mod.rs b/src/bgworker/mod.rs index 98d2e2422..26b72097b 100644 --- a/src/bgworker/mod.rs +++ b/src/bgworker/mod.rs @@ -91,9 +91,13 @@ pub enum Queue { impl Queue { /// Add a job to the queue /// + /// Returns the job ID if the queue provider supports it: + /// - `Some(String)` - Job ID for Redis, `PostgreSQL`, and `SQLite` providers + /// - `None` - When using `Queue::None` or if the provider doesn't support job IDs + /// /// # Errors /// - /// This function will return an error if fails + /// This function will return an error if the enqueue operation fails #[allow(unused_variables)] pub async fn enqueue( &self, @@ -101,15 +105,15 @@ impl Queue { queue: Option, args: A, tags: Option>, - ) -> Result<()> { + ) -> Result> { tracing::debug!(worker = class, queue = ?queue, tags = ?tags, "Enqueuing background job"); - match self { + let job_id = match self { #[cfg(feature = "bg_redis")] Self::Redis(pool, _, _, _) => { - redis::enqueue(pool, class, queue, args, tags).await?; + Some(redis::enqueue(pool, class, queue, args, tags).await?) } #[cfg(feature = "bg_pg")] - Self::Postgres(pool, _, _, _) => { + Self::Postgres(pool, _, _, _) => Some( pg::enqueue( pool, &class, @@ -119,10 +123,10 @@ impl Queue { tags, ) .await - .map_err(Box::from)?; - } + .map_err(Box::from)?, + ), #[cfg(feature = "bg_sqlt")] - Self::Sqlite(pool, _, _, _) => { + Self::Sqlite(pool, _, _, _) => Some( sqlt::enqueue( pool, &class, @@ -132,11 +136,11 @@ impl Queue { tags, ) .await - .map_err(Box::from)?; - } - _ => {} - } - Ok(()) + .map_err(Box::from)?, + ), + _ => None, + }; + Ok(job_id) } /// Register a worker @@ -209,7 +213,8 @@ impl Queue { } _ => { tracing::error!( - "No queue provider is configured: compile with at least one queue provider feature" + "No queue provider is configured: compile with at least one queue provider \ + feature" ); } } @@ -381,20 +386,24 @@ impl Queue { } Self::None => { tracing::error!( - "No queue provider is configured: compile with at least one queue provider feature" + "No queue provider is configured: compile with at least one queue provider \ + feature" ); Err(Error::string("provider not configured")) } } } - /// Cancels jobs based on the given job name for the configured queue provider. + /// Cancels jobs based on the given job name for the configured queue + /// provider. /// /// # Errors - /// - If no queue provider is configured, it will return an error indicating the lack of configuration. - /// - If the Redis provider is selected, it will return an error stating that cancellation is not supported. - /// - Any error in the underlying provider's cancellation logic will propagate from the respective function. - /// + /// - If no queue provider is configured, it will return an error indicating + /// the lack of configuration. + /// - If the Redis provider is selected, it will return an error stating + /// that cancellation is not supported. + /// - Any error in the underlying provider's cancellation logic will + /// propagate from the respective function. pub async fn cancel_jobs(&self, job_name: &str) -> Result<()> { tracing::info!(job_name = job_name, "Cancelling jobs by name"); @@ -407,20 +416,24 @@ impl Queue { Self::Redis(pool, _, _, _) => redis::cancel_jobs_by_name(pool, job_name).await, Self::None => { tracing::error!( - "No queue provider is configured: compile with at least one queue provider feature" + "No queue provider is configured: compile with at least one queue provider \ + feature" ); Err(Error::string("provider not configured")) } } } - /// Clears jobs older than a specified number of days for the configured queue provider. + /// Clears jobs older than a specified number of days for the configured + /// queue provider. /// /// # Errors - /// - If no queue provider is configured, it will return an error indicating the lack of configuration. - /// - If the Redis provider is selected, it will return an error stating that clearing jobs is not supported. - /// - Any error in the underlying provider's job clearing logic will propagate from the respective function. - /// + /// - If no queue provider is configured, it will return an error indicating + /// the lack of configuration. + /// - If the Redis provider is selected, it will return an error stating + /// that clearing jobs is not supported. + /// - Any error in the underlying provider's job clearing logic will + /// propagate from the respective function. pub async fn clear_jobs_older_than( &self, age_days: i64, @@ -443,7 +456,8 @@ impl Queue { } Self::None => { tracing::error!( - "No queue provider is configured: compile with at least one queue provider feature" + "No queue provider is configured: compile with at least one queue provider \ + feature" ); Err(Error::string("provider not configured")) } @@ -453,9 +467,12 @@ impl Queue { /// Clears jobs based on their status for the configured queue provider. /// /// # Errors - /// - If no queue provider is configured, it will return an error indicating the lack of configuration. - /// - If the Redis provider is selected, it will return an error stating that clearing jobs is not supported. - /// - Any error in the underlying provider's job clearing logic will propagate from the respective function. + /// - If no queue provider is configured, it will return an error indicating + /// the lack of configuration. + /// - If the Redis provider is selected, it will return an error stating + /// that clearing jobs is not supported. + /// - Any error in the underlying provider's job clearing logic will + /// propagate from the respective function. pub async fn clear_by_status(&self, status: Vec) -> Result<()> { tracing::info!(status = ?status, "Clearing jobs by status"); match self { @@ -467,7 +484,8 @@ impl Queue { Self::Redis(pool, _, _, _) => redis::clear_by_status(pool, status).await, Self::None => { tracing::error!( - "No queue provider is configured: compile with at least one queue provider feature" + "No queue provider is configured: compile with at least one queue provider \ + feature" ); Err(Error::string("provider not configured")) } @@ -477,9 +495,12 @@ impl Queue { /// Requeued job with the given minutes ages. /// /// # Errors - /// - If no queue provider is configured, it will return an error indicating the lack of configuration. - /// - If the Redis provider is selected, it will return an error stating that clearing jobs is not supported. - /// - Any error in the underlying provider's job clearing logic will propagate from the respective function. + /// - If no queue provider is configured, it will return an error indicating + /// the lack of configuration. + /// - If the Redis provider is selected, it will return an error stating + /// that clearing jobs is not supported. + /// - Any error in the underlying provider's job clearing logic will + /// propagate from the respective function. pub async fn requeue(&self, age_minutes: &i64) -> Result<()> { tracing::info!(age_minutes = age_minutes, "Requeuing stale jobs"); match self { @@ -491,7 +512,8 @@ impl Queue { Self::Redis(pool, _, _, _) => redis::requeue(pool, age_minutes).await, Self::None => { tracing::error!( - "No queue provider is configured: compile with at least one queue provider feature" + "No queue provider is configured: compile with at least one queue provider \ + feature" ); Err(Error::string("provider not configured")) } @@ -500,12 +522,13 @@ impl Queue { /// Dumps the list of jobs to a YAML file at the specified path. /// - /// This function retrieves jobs from the queue, optionally filtered by their status, and - /// writes the job data to a YAML file. + /// This function retrieves jobs from the queue, optionally filtered by + /// their status, and writes the job data to a YAML file. /// /// # Errors /// - If the specified path cannot be created, an error will be returned. - /// - If the job retrieval or YAML serialization fails, an error will be returned. + /// - If the job retrieval or YAML serialization fails, an error will be + /// returned. /// - If there is an issue creating the dump file, an error will be returned pub async fn dump( &self, @@ -537,14 +560,16 @@ impl Queue { /// Imports jobs from a YAML file into the configured queue provider. /// - /// This function reads job data from a YAML file located at the specified `path` and imports - /// the jobs into the queue. + /// This function reads job data from a YAML file located at the specified + /// `path` and imports the jobs into the queue. /// /// # Errors - /// - If there is an issue opening or reading the YAML file, an error will be returned. - /// - If the queue provider is Redis or none, an error will be returned indicating the lack of support. - /// - If any issues occur while enqueuing the jobs, the function will return an error. - /// + /// - If there is an issue opening or reading the YAML file, an error will + /// be returned. + /// - If the queue provider is Redis or none, an error will be returned + /// indicating the lack of support. + /// - If any issues occur while enqueuing the jobs, the function will return + /// an error. pub async fn import(&self, path: &Path) -> Result<()> { tracing::info!(path = %path.display(), "Importing jobs from file"); @@ -579,7 +604,8 @@ impl Queue { } Self::None => { tracing::error!( - "No queue provider is configured: compile with at least one queue provider feature" + "No queue provider is configured: compile with at least one queue provider \ + feature" ); Err(Error::string("provider not configured")) } @@ -597,8 +623,8 @@ pub trait BackgroundWorker: Send + None } - /// Specifies tags associated with this worker. Workers might only process jobs - /// matching specific tags during startup. + /// Specifies tags associated with this worker. Workers might only process + /// jobs matching specific tags during startup. #[must_use] fn tags() -> Vec { Vec::new() @@ -615,26 +641,29 @@ pub trait BackgroundWorker: Send + let name = type_name.split("::").last().unwrap_or(type_name); name.to_upper_camel_case() } - async fn perform_later(ctx: &AppContext, args: A) -> crate::Result<()> + async fn perform_later(ctx: &AppContext, args: A) -> crate::Result> where Self: Sized, { - match &ctx.config.workers.mode { + let job_id = match &ctx.config.workers.mode { WorkerMode::BackgroundQueue => { if let Some(p) = &ctx.queue_provider { let tags = Self::tags(); let tags_option = if tags.is_empty() { None } else { Some(tags) }; + p.enqueue(Self::class_name(), Self::queue(), args, tags_option) - .await?; + .await? } else { tracing::error!( "perform_later: background queue is selected, but queue was not populated \ in context" ); + None } } WorkerMode::ForegroundBlocking => { Self::build(ctx).perform(args).await?; + None } WorkerMode::BackgroundAsync => { let dx = ctx.clone(); @@ -643,9 +672,10 @@ pub trait BackgroundWorker: Send + tracing::error!(err = err.to_string(), "worker failed to perform job"); } }); + None } - } - Ok(()) + }; + Ok(job_id) } async fn perform(&self, args: A) -> crate::Result<()>; @@ -729,11 +759,13 @@ pub async fn create_queue_provider(config: &Config) -> Result> )), } } else { - // tracing::warn!("Worker mode is BackgroundQueue but no queue configuration is present"); + // tracing::warn!("Worker mode is BackgroundQueue but no queue configuration is + // present"); Ok(None) } } else { - // tracing::debug!("Worker mode is not BackgroundQueue, skipping queue provider creation"); + // tracing::debug!("Worker mode is not BackgroundQueue, skipping queue provider + // creation"); Ok(None) } } @@ -765,6 +797,54 @@ mod tests { } } + #[tokio::test] + async fn queue_enqueue_returns_job_id() { + let tree_fs = tree_fs::TreeBuilder::default() + .drop(true) + .create() + .expect("create temp folder"); + + // Test with SQLite provider - should return Some(job_id) + let qcfg = sqlite_config(tree_fs.root.as_path()); + let queue = sqlt::create_provider(&qcfg) + .await + .expect("create sqlite queue"); + + queue.setup().await.expect("setup sqlite db"); + + let job_id = queue + .enqueue( + "TestJob".to_string(), + None, + serde_json::json!({"test": "data"}), + None, + ) + .await + .expect("enqueue job"); + + assert!(job_id.is_some(), "SQLite provider should return job ID"); + let job_id = job_id.unwrap(); + assert!(!job_id.is_empty(), "Job ID should not be empty"); + assert!( + ulid::Ulid::from_string(&job_id).is_ok(), + "Job ID should be valid ULID" + ); + + // Test with None provider - should return None + let none_queue = Queue::None; + let job_id = none_queue + .enqueue( + "TestJob".to_string(), + None, + serde_json::json!({"test": "data"}), + None, + ) + .await + .expect("enqueue to None provider"); + + assert!(job_id.is_none(), "None provider should return None"); + } + #[tokio::test] async fn can_dump_jobs() { let tree_fs = tree_fs::TreeBuilder::default() diff --git a/src/bgworker/pg.rs b/src/bgworker/pg.rs index f1eb83145..515cff762 100644 --- a/src/bgworker/pg.rs +++ b/src/bgworker/pg.rs @@ -744,16 +744,20 @@ mod tests { ); let job_data: JobData = serde_json::json!({"user_id": 1}); - assert!(enqueue( + let job_id = enqueue( &pool, "PasswordChangeNotification", job_data, run_at, None, - None + None, ) - .await - .is_ok()); + .await; + assert!(job_id.is_ok()); + let job_id = job_id.unwrap(); + assert!(!job_id.is_empty()); + // Verify it's a valid ULID + assert!(ulid::Ulid::from_string(&job_id).is_ok()); let jobs = get_all_jobs(&pool).await; @@ -776,16 +780,19 @@ mod tests { ); let job_data: JobData = serde_json::json!({"user_id": 1}); - assert!(enqueue( + let job_id = enqueue( &pool, "PasswordChangeNotification", job_data, run_at, None, - None + None, ) - .await - .is_ok()); + .await; + assert!(job_id.is_ok()); + let job_id = job_id.unwrap(); + assert!(!job_id.is_empty()); + assert!(ulid::Ulid::from_string(&job_id).is_ok()); let job_before_dequeue = get_all_jobs(&pool) .await diff --git a/src/bgworker/redis.rs b/src/bgworker/redis.rs index 9aa8096a1..a6254f372 100644 --- a/src/bgworker/redis.rs +++ b/src/bgworker/redis.rs @@ -4,8 +4,6 @@ use std::{ time::Duration, }; -use super::{BackgroundWorker, JobStatus, Queue}; -use crate::{config::RedisQueueConfig, Error, Result}; use chrono::{DateTime, Utc}; use futures_util::FutureExt; use redis::{aio::MultiplexedConnection as Connection, AsyncCommands, Client, Script}; @@ -16,6 +14,9 @@ use tokio_util::sync::CancellationToken; use tracing::{debug, error, trace}; use ulid::Ulid; +use super::{BackgroundWorker, JobStatus, Queue}; +use crate::{config::RedisQueueConfig, Error, Result}; + pub type RedisPool = Client; type JobId = String; type JobData = JsonValue; @@ -264,7 +265,7 @@ pub async fn enqueue( queue: Option, args: impl serde::Serialize + Send, tags: Option>, -) -> Result<()> { +) -> Result { let mut conn = get_connection(client).await?; let queue_name = queue.unwrap_or_else(|| "default".to_string()); let queue_key = format!("{QUEUE_KEY_PREFIX}{queue_name}"); @@ -287,7 +288,7 @@ pub async fn enqueue( let _: () = conn.set(&job_key, &job_json).await?; let _: () = conn.rpush(&queue_key, &job.id).await?; - Ok(()) + Ok(job.id) } const DEQUEUE_SCRIPT: &str = r#" @@ -534,9 +535,9 @@ fn should_include_job(job: &Job, status: Option<&Vec>, age_days: Opti /// Clears jobs based on their status from the Redis queue. /// -/// This function removes all jobs with a status matching any of the statuses provided -/// in the `status` argument. It searches through all queue keys and processing sets -/// and removes matching jobs. +/// This function removes all jobs with a status matching any of the statuses +/// provided in the `status` argument. It searches through all queue keys and +/// processing sets and removes matching jobs. /// /// # Errors /// @@ -620,9 +621,10 @@ pub async fn clear_by_status(client: &RedisPool, status: Vec) -> Resu /// Clears jobs older than the specified number of days from the Redis queue. /// -/// This function removes all jobs that were created more than `age_days` days ago -/// and have a status matching any of the statuses provided in the `status` argument. -/// It searches through all queue keys and processing sets and removes matching jobs. +/// This function removes all jobs that were created more than `age_days` days +/// ago and have a status matching any of the statuses provided in the `status` +/// argument. It searches through all queue keys and processing sets and removes +/// matching jobs. /// /// # Errors /// @@ -718,11 +720,12 @@ pub async fn clear_jobs_older_than( Ok(()) } -/// Requeues failed or stalled jobs that are older than a specified number of minutes. +/// Requeues failed or stalled jobs that are older than a specified number of +/// minutes. /// -/// This function finds jobs in processing sets that have been there for longer than -/// `age_minutes` and moves them back to their respective queues. This is useful for -/// recovering from job failures or worker crashes. +/// This function finds jobs in processing sets that have been there for longer +/// than `age_minutes` and moves them back to their respective queues. This is +/// useful for recovering from job failures or worker crashes. /// /// # Errors /// @@ -823,8 +826,9 @@ pub async fn requeue(client: &RedisPool, age_minutes: &i64) -> Result<()> { /// Cancels jobs with the specified name in the Redis queue. /// /// This function updates the status of jobs that match the provided `job_name` -/// from [`JobStatus::Queued`] to [`JobStatus::Cancelled`]. Jobs are searched for in all queue keys, -/// and only those that are currently in the [`JobStatus::Queued`] state will be affected. +/// from [`JobStatus::Queued`] to [`JobStatus::Cancelled`]. Jobs are searched +/// for in all queue keys, and only those that are currently in the +/// [`JobStatus::Queued`] state will be affected. /// /// # Errors /// @@ -921,11 +925,12 @@ pub async fn create_provider(qcfg: &RedisQueueConfig) -> Result { #[cfg(test)] mod tests { - use super::*; - use crate::tests_cfg::redis::setup_redis_container; use chrono::Utc; use testcontainers::{ContainerAsync, GenericImage}; + use super::*; + use crate::tests_cfg::redis::setup_redis_container; + async fn setup_redis() -> (RedisPool, ContainerAsync) { let (redis_url, container) = setup_redis_container().await; let client = connect(&redis_url).expect("connect to redis"); @@ -1038,11 +1043,14 @@ mod tests { // Test enqueue let args = serde_json::json!({"user_id": 42}); - assert!( - enqueue(&client, "PasswordReset".to_string(), None, args, None) - .await - .is_ok() - ); + + let job_id = enqueue(&client, "PasswordReset".to_string(), None, args, None).await; + + assert!(job_id.is_ok()); + let job_id = job_id.unwrap(); + // Verify we got a valid ULID as job ID + assert!(!job_id.is_empty()); + assert!(ulid::Ulid::from_string(&job_id).is_ok()); // Verify job was created let jobs = get_all_jobs(&client).await; @@ -1060,15 +1068,17 @@ mod tests { // Test enqueue with custom queue let args = serde_json::json!({"email": "user@example.com"}); - assert!(enqueue( + let job_id = enqueue( &client, "EmailNotification".to_string(), Some("mailer".to_string()), args, - None + None, ) - .await - .is_ok()); + .await; + assert!(job_id.is_ok()); + let job_id = job_id.unwrap(); + assert!(!job_id.is_empty()); // Verify job was created in correct queue first let mut conn = get_test_connection(&client).await; diff --git a/src/bgworker/sqlt.rs b/src/bgworker/sqlt.rs index 83f0590e6..15d0b1f6a 100644 --- a/src/bgworker/sqlt.rs +++ b/src/bgworker/sqlt.rs @@ -834,16 +834,20 @@ mod tests { let job_data = serde_json::json!({"user_id": 1}); let tags = Some(vec!["email".to_string(), "notification".to_string()]); - assert!(enqueue( + let job_id = enqueue( &pool, "PasswordChangeNotification", job_data, run_at, None, - tags + tags, ) - .await - .is_ok()); + .await; + assert!(job_id.is_ok()); + let job_id = job_id.unwrap(); + assert!(!job_id.is_empty()); + // Verify it's a valid ULID + assert!(ulid::Ulid::from_string(&job_id).is_ok()); let jobs = get_all_jobs(&pool).await; @@ -885,16 +889,19 @@ mod tests { ); let job_data = serde_json::json!({"user_id": 1}); - assert!(enqueue( + let job_id = enqueue( &pool, "PasswordChangeNotification", job_data, run_at, None, - None + None, ) - .await - .is_ok()); + .await; + assert!(job_id.is_ok()); + let job_id = job_id.unwrap(); + assert!(!job_id.is_empty()); + assert!(ulid::Ulid::from_string(&job_id).is_ok()); let job_before_dequeue = get_all_jobs(&pool) .await