From f0f4660928c4d92e15c71f71d32b8d28c09a9353 Mon Sep 17 00:00:00 2001 From: Andreas Fackler Date: Mon, 1 Dec 2025 17:21:02 +0100 Subject: [PATCH 1/6] Node service operator plugins. --- CLI.md | 2 + linera-service/src/cli/command.rs | 13 +- linera-service/src/cli/main.rs | 29 ++- linera-service/src/lib.rs | 1 + linera-service/src/task_processor.rs | 323 +++++++++++++++++++++++++++ 5 files changed, 364 insertions(+), 4 deletions(-) create mode 100644 linera-service/src/task_processor.rs diff --git a/CLI.md b/CLI.md index caad0786bb48..c874d6c4086a 100644 --- a/CLI.md +++ b/CLI.md @@ -736,6 +736,8 @@ Run a GraphQL service to explore and extend the chains of the wallet Default value: `0` * `--port ` — The port on which to run the server +* `--operator-application-ids ` — Application IDs of operator applications to watch. When specified, a task processor is started alongside the node service +* `--operators ` — Supported operators and their binary paths. Format: `name=path` or just `name` (uses name as path). Example: `--operators my-operator=/path/to/binary` diff --git a/linera-service/src/cli/command.rs b/linera-service/src/cli/command.rs index c608b0f79525..54245408f7d5 100644 --- a/linera-service/src/cli/command.rs +++ b/linera-service/src/cli/command.rs @@ -20,7 +20,7 @@ use linera_client::{ }; use linera_rpc::config::CrossChainConfig; -use crate::cli::validator; +use crate::{cli::validator, task_processor::parse_operator}; const DEFAULT_TOKENS_PER_CHAIN: Amount = Amount::from_millis(100); const DEFAULT_TRANSACTIONS_PER_BLOCK: usize = 1; @@ -723,6 +723,17 @@ pub enum ClientCommand { #[cfg(with_metrics)] #[arg(long)] metrics_port: NonZeroU16, + + /// Application IDs of operator applications to watch. + /// When specified, a task processor is started alongside the node service. + #[arg(long = "operator-application-ids")] + operator_application_ids: Vec, + + /// Supported operators and their binary paths. + /// Format: `name=path` or just `name` (uses name as path). + /// Example: `--operators my-operator=/path/to/binary` + #[arg(long = "operators", value_parser = parse_operator)] + operators: Vec<(String, PathBuf)>, }, /// Run a GraphQL service that exposes a faucet where users can claim tokens. diff --git a/linera-service/src/cli/main.rs b/linera-service/src/cli/main.rs index 92919de9359d..4079d5119fd8 100644 --- a/linera-service/src/cli/main.rs +++ b/linera-service/src/cli/main.rs @@ -71,6 +71,7 @@ use linera_service::{ node_service::NodeService, project::{self, Project}, storage::{CommonStorageOptions, Runnable, RunnableWithStore, StorageConfig}, + task_processor::TaskProcessor, util, wallet, }; use linera_storage::{DbStorage, Storage}; @@ -1112,20 +1113,42 @@ impl Runnable for Job { port, #[cfg(with_metrics)] metrics_port, + operator_application_ids, + operators, } => { let context = options.create_client_context(storage, wallet, signer.into_value()); let default_chain = context.wallet().default_chain(); + let chain_id = + default_chain.expect("Service requires a default chain in the wallet"); + + let cancellation_token = CancellationToken::new(); + tokio::spawn(listen_for_shutdown_signals(cancellation_token.clone())); + + // Start the task processor if operator applications are specified. + if !operator_application_ids.is_empty() { + let operators = Arc::new(operators.into_iter().collect()); + info!("Supported operators: {:?}", operators); + + let chain_client = context.make_chain_client(chain_id); + let processor = TaskProcessor::new( + chain_id, + operator_application_ids, + chain_client, + cancellation_token.clone(), + operators, + ); + tokio::spawn(processor.run()); + } + let service = NodeService::new( config, port, #[cfg(with_metrics)] metrics_port, - default_chain, + Some(chain_id), context, ); - let cancellation_token = CancellationToken::new(); - tokio::spawn(listen_for_shutdown_signals(cancellation_token.clone())); service.run(cancellation_token).await?; } diff --git a/linera-service/src/lib.rs b/linera-service/src/lib.rs index 6b8481eb5057..c459eb95bd75 100644 --- a/linera-service/src/lib.rs +++ b/linera-service/src/lib.rs @@ -10,6 +10,7 @@ pub mod config; pub mod node_service; pub mod project; pub mod storage; +pub mod task_processor; pub mod tracing; pub mod util; pub mod wallet; diff --git a/linera-service/src/task_processor.rs b/linera-service/src/task_processor.rs new file mode 100644 index 000000000000..7b55b3098f7b --- /dev/null +++ b/linera-service/src/task_processor.rs @@ -0,0 +1,323 @@ +// Copyright (c) Zefchain Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +//! Task processor for executing off-chain operators on behalf of on-chain applications. +//! +//! The task processor watches specified applications for requests to execute off-chain tasks, +//! runs external operator binaries, and submits the results back to the chain. + +use std::{ + cmp::Reverse, + collections::{BTreeMap, BinaryHeap}, + path::PathBuf, + sync::Arc, +}; + +use async_graphql::{scalar, InputType as _}; +use futures::{stream::StreamExt, FutureExt}; +use linera_base::{ + data_types::{TimeDelta, Timestamp}, + identifiers::{ApplicationId, ChainId}, +}; +use linera_core::{client::ChainClient, node::NotificationStream, worker::Reason}; +use serde::{Deserialize, Serialize}; +use serde_json::json; +use tokio::{io::AsyncWriteExt, process::Command, select, sync::mpsc}; +use tokio_util::sync::CancellationToken; +use tracing::{debug, error, info}; + +/// A map from operator names to their binary paths. +pub type OperatorMap = Arc>; + +/// The off-chain actions requested by the service of an on-chain application. +/// +/// On-chain applications should be ready to respond to GraphQL queries of the form: +/// ```ignore +/// query { +/// nextActions(lastRequestedCallback: Timestamp, now: Timestamp!): ProcessorActions! +/// } +/// +/// query { +/// processTaskOutcome(outcome: TaskOutcome!) +/// } +/// ``` +#[derive(Default, Debug, Serialize, Deserialize)] +pub struct ProcessorActions { + /// The application is requesting to be called back no later than the given timestamp. + pub request_callback: Option, + /// The application is requesting the execution of the given tasks. + pub execute_tasks: Vec, +} + +scalar!(ProcessorActions); + +/// An off-chain task requested by an on-chain application. +#[derive(Debug, Serialize, Deserialize)] +pub struct Task { + /// The operator handling the task. + pub operator: String, + /// The input argument in JSON. + pub input: String, +} + +/// The result of executing an off-chain operator. +#[derive(Debug, Serialize, Deserialize)] +pub struct TaskOutcome { + /// The operator handling the task. + pub operator: String, + /// The JSON output. + pub output: String, +} + +scalar!(TaskOutcome); + +/// Parse an operator mapping in the format `name=path` or just `name`. +/// If only `name` is provided, the path defaults to the name itself. +pub fn parse_operator(s: &str) -> Result<(String, PathBuf), String> { + if let Some((name, path)) = s.split_once('=') { + Ok((name.to_string(), PathBuf::from(path))) + } else { + Ok((s.to_string(), PathBuf::from(s))) + } +} + +type Deadline = Reverse<(Timestamp, Option)>; + +/// A task processor that watches applications and executes off-chain operators. +pub struct TaskProcessor { + chain_id: ChainId, + application_ids: Vec, + last_requested_callbacks: BTreeMap, + chain_client: ChainClient, + cancellation_token: CancellationToken, + notifications: NotificationStream, + outcome_sender: mpsc::UnboundedSender<(ApplicationId, TaskOutcome)>, + outcome_receiver: mpsc::UnboundedReceiver<(ApplicationId, TaskOutcome)>, + deadlines: BinaryHeap, + operators: OperatorMap, +} + +impl TaskProcessor { + /// Creates a new task processor. + pub fn new( + chain_id: ChainId, + application_ids: Vec, + chain_client: ChainClient, + cancellation_token: CancellationToken, + operators: OperatorMap, + ) -> Self { + let notifications = chain_client.subscribe().expect("client subscription"); + let (outcome_sender, outcome_receiver) = mpsc::unbounded_channel(); + Self { + chain_id, + application_ids, + last_requested_callbacks: BTreeMap::new(), + chain_client, + cancellation_token, + outcome_sender, + outcome_receiver, + notifications, + deadlines: BinaryHeap::new(), + operators, + } + } + + /// Runs the task processor until the cancellation token is triggered. + pub async fn run(mut self) { + info!("Watching for notifications for chain {}", self.chain_id); + self.process_actions(self.application_ids.clone()).await; + loop { + select! { + Some(notification) = self.notifications.next() => { + if let Reason::NewBlock { .. } = notification.reason { + debug!("Processing notification"); + self.process_actions(self.application_ids.clone()).await; + } + } + _ = tokio::time::sleep(Self::duration_until_next_deadline(&self.deadlines)) => { + debug!("Processing event"); + let application_ids = self.process_events(); + self.process_actions(application_ids).await; + } + Some((application_id, outcome)) = self.outcome_receiver.recv() => { + if let Err(e) = self.submit_task_outcome(application_id, &outcome).await { + error!("Error while processing task outcome {outcome:?}: {e}"); + } + } + _ = self.cancellation_token.cancelled().fuse() => { + break; + } + } + } + debug!("Notification stream ended."); + } + + fn duration_until_next_deadline(deadlines: &BinaryHeap) -> tokio::time::Duration { + deadlines + .peek() + .map_or(tokio::time::Duration::MAX, |Reverse((x, _))| { + x.delta_since(Timestamp::now()).as_duration() + }) + } + + fn process_events(&mut self) -> Vec { + let now = Timestamp::now(); + let mut application_ids = Vec::new(); + while let Some(deadline) = self.deadlines.pop() { + if let Reverse((_, Some(id))) = deadline { + application_ids.push(id); + } + let Some(Reverse((ts, _))) = self.deadlines.peek() else { + break; + }; + if *ts > now { + break; + } + } + application_ids + } + + async fn process_actions(&mut self, application_ids: Vec) { + for application_id in application_ids { + debug!("Processing actions for {application_id}"); + let now = Timestamp::now(); + let last_requested_callback = + self.last_requested_callbacks.get(&application_id).cloned(); + let actions = match self + .query_actions(application_id, last_requested_callback, now) + .await + { + Ok(actions) => actions, + Err(error) => { + error!("Error reading application actions: {error}"); + // Retry in at most 1 minute. + self.deadlines.push(Reverse(( + now.saturating_add(TimeDelta::from_secs(60)), + None, + ))); + continue; + } + }; + if let Some(timestamp) = actions.request_callback { + self.last_requested_callbacks.insert(application_id, now); + self.deadlines + .push(Reverse((timestamp, Some(application_id)))); + } + for task in actions.execute_tasks { + let sender = self.outcome_sender.clone(); + let operators = self.operators.clone(); + tokio::spawn(async move { + if let Err(e) = Self::execute_task( + application_id, + task.operator, + task.input, + sender, + operators, + ) + .await + { + error!("Error executing task for {application_id}: {e}"); + } + }); + } + } + } + + async fn execute_task( + application_id: ApplicationId, + operator: String, + input: String, + sender: mpsc::UnboundedSender<(ApplicationId, TaskOutcome)>, + operators: OperatorMap, + ) -> Result<(), anyhow::Error> { + let binary_path = operators + .get(&operator) + .ok_or_else(|| anyhow::anyhow!("unsupported operator: {}", operator))?; + debug!("Executing task {operator} ({binary_path:?}) for {application_id}"); + let mut child = Command::new(binary_path) + .stdin(std::process::Stdio::piped()) + .stdout(std::process::Stdio::piped()) + .spawn()?; + + let mut stdin = child.stdin.take().expect("stdin should be configured"); + stdin.write_all(input.as_bytes()).await?; + drop(stdin); + + let output = child.wait_with_output().await?; + anyhow::ensure!( + output.status.success(), + "operator {} exited with status: {}", + operator, + output.status + ); + let outcome = TaskOutcome { + operator, + output: String::from_utf8_lossy(&output.stdout).into(), + }; + debug!("Done executing task for {application_id}"); + sender.send((application_id, outcome))?; + Ok(()) + } + + async fn query_actions( + &mut self, + application_id: ApplicationId, + last_requested_callback: Option, + now: Timestamp, + ) -> Result { + let query = format!( + "query {{ nextActions(lastRequestedCallback: {}, now: {}) }}", + last_requested_callback.to_value(), + now.to_value(), + ); + let bytes = serde_json::to_vec(&json!({"query": query}))?; + let query = linera_execution::Query::User { + application_id, + bytes, + }; + let linera_execution::QueryOutcome { + response, + operations: _, + } = self.chain_client.query_application(query, None).await?; + let linera_execution::QueryResponse::User(response) = response else { + anyhow::bail!("cannot get a system response for a user query"); + }; + let mut response: serde_json::Value = serde_json::from_slice(&response)?; + let actions: ProcessorActions = + serde_json::from_value(response["data"]["nextActions"].take())?; + Ok(actions) + } + + async fn submit_task_outcome( + &mut self, + application_id: ApplicationId, + task_outcome: &TaskOutcome, + ) -> Result<(), anyhow::Error> { + info!("Submitting task outcome for {application_id}: {task_outcome:?}"); + let query = format!( + "query {{ processTaskOutcome(outcome: {{ operator: {}, output: {} }}) }}", + task_outcome.operator.to_value(), + task_outcome.output.to_value(), + ); + let bytes = serde_json::to_vec(&json!({"query": query}))?; + let query = linera_execution::Query::User { + application_id, + bytes, + }; + let linera_execution::QueryOutcome { + response: _, + operations, + } = self.chain_client.query_application(query, None).await?; + if !operations.is_empty() { + if let Err(e) = self + .chain_client + .execute_operations(operations, vec![]) + .await + { + // TODO: handle leader timeouts. + error!("Failed to execute on-chain operations for {application_id}: {e}"); + } + } + Ok(()) + } +} From 0079ce2ebe8d59d97379e58c123f57aaddf24695 Mon Sep 17 00:00:00 2001 From: Andreas Fackler Date: Mon, 1 Dec 2025 18:15:59 +0100 Subject: [PATCH 2/6] Add a test. --- linera-service/src/cli_wrappers/wallet.rs | 18 ++++++++ linera-service/tests/local_net_tests.rs | 55 +++++++++++++++++++++++ 2 files changed, 73 insertions(+) diff --git a/linera-service/src/cli_wrappers/wallet.rs b/linera-service/src/cli_wrappers/wallet.rs index c2a6624fbec2..8e01449129f7 100644 --- a/linera-service/src/cli_wrappers/wallet.rs +++ b/linera-service/src/cli_wrappers/wallet.rs @@ -471,6 +471,18 @@ impl ClientWrapper { &self, port: impl Into>, process_inbox: ProcessInbox, + ) -> Result { + self.run_node_service_with_options(port, process_inbox, &[], &[]) + .await + } + + /// Runs `linera service` with optional task processor configuration. + pub async fn run_node_service_with_options( + &self, + port: impl Into>, + process_inbox: ProcessInbox, + operator_application_ids: &[ApplicationId], + operators: &[(String, PathBuf)], ) -> Result { let port = port.into().unwrap_or(8080); let mut command = self.command().await?; @@ -481,6 +493,12 @@ impl ClientWrapper { if let Ok(var) = env::var(CLIENT_SERVICE_ENV) { command.args(var.split_whitespace()); } + for app_id in operator_application_ids { + command.args(["--operator-application-ids", &app_id.to_string()]); + } + for (name, path) in operators { + command.args(["--operators", &format!("{}={}", name, path.display())]); + } let child = command .args(["--port".to_string(), port.to_string()]) .spawn_into()?; diff --git a/linera-service/tests/local_net_tests.rs b/linera-service/tests/local_net_tests.rs index fe8358322388..ae9f38c098bc 100644 --- a/linera-service/tests/local_net_tests.rs +++ b/linera-service/tests/local_net_tests.rs @@ -1232,3 +1232,58 @@ impl EthereumTrackerApp { self.0.mutate(mutation).await.unwrap(); } } + +/// Test that the node service can start with task processor options. +/// This is a basic smoke test that verifies the CLI arguments are accepted +/// and the service starts correctly with task processor configuration. +#[cfg(feature = "storage-service")] +#[test_log::test(tokio::test)] +async fn test_node_service_with_task_processor_options() -> Result<()> { + use std::{io::Write, os::unix::fs::PermissionsExt}; + + use linera_base::identifiers::ApplicationId; + + let _guard = INTEGRATION_TEST_GUARD.lock().await; + tracing::info!("Starting test {}", test_name!()); + + let config = LocalNetConfig::new_test(Database::Service, Network::Grpc); + let (mut net, client) = config.instantiate().await?; + + // Publish and create the counter application. + let example_dir = ClientWrapper::example_path("counter")?; + let app_id_str = client + .project_publish(example_dir, vec![], None, &0) + .await?; + let app_id: ApplicationId = app_id_str.trim().parse()?; + + // Create a simple echo operator script. + let tmp_dir = tempfile::tempdir()?; + let operator_path = tmp_dir.path().join("echo-operator"); + { + let mut file = std::fs::File::create(&operator_path)?; + // Script that reads stdin and writes it to stdout. + writeln!(file, "#!/bin/sh")?; + writeln!(file, "cat")?; + } + std::fs::set_permissions(&operator_path, std::fs::Permissions::from_mode(0o755))?; + + // Start the node service with task processor options. + let port = get_node_port().await; + let operators = vec![("echo".to_string(), operator_path)]; + let mut node_service = client + .run_node_service_with_options(port, ProcessInbox::Skip, &[app_id], &operators) + .await?; + + node_service.ensure_is_running()?; + + // The counter app doesn't implement the task processor interface (nextActions, etc.), + // so the task processor will fail to query it. But we've verified that: + // 1. The CLI arguments are accepted. + // 2. The node service starts successfully. + // 3. The task processor is initialized (even if it can't query the app). + + net.ensure_is_running().await?; + net.terminate().await?; + + Ok(()) +} From 6fa868db10dd68417ddba5d3d37bccccabac569d Mon Sep 17 00:00:00 2001 From: Andreas Fackler Date: Tue, 2 Dec 2025 11:48:25 +0100 Subject: [PATCH 3/6] Add an example app to test the task processor. --- examples/Cargo.lock | 10 ++ examples/Cargo.toml | 1 + examples/task-processor/Cargo.toml | 22 ++++ examples/task-processor/src/contract.rs | 69 +++++++++++ examples/task-processor/src/lib.rs | 33 ++++++ examples/task-processor/src/service.rs | 146 ++++++++++++++++++++++++ examples/task-processor/src/state.rs | 26 +++++ linera-service/tests/local_net_tests.rs | 63 ++++++++-- 8 files changed, 358 insertions(+), 12 deletions(-) create mode 100644 examples/task-processor/Cargo.toml create mode 100644 examples/task-processor/src/contract.rs create mode 100644 examples/task-processor/src/lib.rs create mode 100644 examples/task-processor/src/service.rs create mode 100644 examples/task-processor/src/state.rs diff --git a/examples/Cargo.lock b/examples/Cargo.lock index 9975f367310c..ca83e3f544ce 100644 --- a/examples/Cargo.lock +++ b/examples/Cargo.lock @@ -6145,6 +6145,16 @@ version = "0.12.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61c41af27dd6d1e27b1b16b489db798443478cef1f06a660c96db617ba5de3b1" +[[package]] +name = "task-processor" +version = "0.1.0" +dependencies = [ + "async-graphql", + "linera-sdk", + "serde", + "serde_json", +] + [[package]] name = "tempfile" version = "3.20.0" diff --git a/examples/Cargo.toml b/examples/Cargo.toml index e5e4fc1b4ef3..cb9d0a84e131 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -17,6 +17,7 @@ members = [ "non-fungible", "rfq", "social", + "task-processor", ] [workspace.dependencies] diff --git a/examples/task-processor/Cargo.toml b/examples/task-processor/Cargo.toml new file mode 100644 index 000000000000..24be32e71c39 --- /dev/null +++ b/examples/task-processor/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "task-processor" +version = "0.1.0" +authors = ["Linera "] +edition = "2021" + +[dependencies] +async-graphql.workspace = true +linera-sdk.workspace = true +serde.workspace = true +serde_json.workspace = true + +[dev-dependencies] +linera-sdk = { workspace = true, features = ["test"] } + +[[bin]] +name = "task_processor_contract" +path = "src/contract.rs" + +[[bin]] +name = "task_processor_service" +path = "src/service.rs" diff --git a/examples/task-processor/src/contract.rs b/examples/task-processor/src/contract.rs new file mode 100644 index 000000000000..3d874e3e001b --- /dev/null +++ b/examples/task-processor/src/contract.rs @@ -0,0 +1,69 @@ +// Copyright (c) Zefchain Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +#![cfg_attr(target_arch = "wasm32", no_main)] + +mod state; + +use linera_sdk::{ + linera_base_types::WithContractAbi, + views::{RootView, View}, + Contract, ContractRuntime, +}; +use task_processor::{TaskProcessorAbi, TaskProcessorOperation}; + +use self::state::{PendingTask, TaskProcessorState}; + +pub struct TaskProcessorContract { + state: TaskProcessorState, + runtime: ContractRuntime, +} + +linera_sdk::contract!(TaskProcessorContract); + +impl WithContractAbi for TaskProcessorContract { + type Abi = TaskProcessorAbi; +} + +impl Contract for TaskProcessorContract { + type Message = (); + type InstantiationArgument = (); + type Parameters = (); + type EventValue = (); + + async fn load(runtime: ContractRuntime) -> Self { + let state = TaskProcessorState::load(runtime.root_view_storage_context()) + .await + .expect("Failed to load state"); + TaskProcessorContract { state, runtime } + } + + async fn instantiate(&mut self, _argument: ()) { + self.runtime.application_parameters(); + } + + async fn execute_operation(&mut self, operation: TaskProcessorOperation) { + match operation { + TaskProcessorOperation::RequestTask { operator, input } => { + self.state + .pending_tasks + .push_back(PendingTask { operator, input }); + } + TaskProcessorOperation::StoreResult { result } => { + // Remove the first pending task (the one that was just processed). + self.state.pending_tasks.delete_front(); + self.state.results.push_back(result); + let count = self.state.task_count.get() + 1; + self.state.task_count.set(count); + } + } + } + + async fn execute_message(&mut self, _message: ()) { + panic!("Task processor application doesn't support any cross-chain messages"); + } + + async fn store(mut self) { + self.state.save().await.expect("Failed to save state"); + } +} diff --git a/examples/task-processor/src/lib.rs b/examples/task-processor/src/lib.rs new file mode 100644 index 000000000000..b9991a3fb79b --- /dev/null +++ b/examples/task-processor/src/lib.rs @@ -0,0 +1,33 @@ +// Copyright (c) Zefchain Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +//! ABI of the Task Processor Example Application. +//! +//! This application demonstrates the off-chain task processor functionality. +//! It requests tasks to be executed by an external "echo" operator and stores +//! the results in its state. + +use async_graphql::{Request, Response}; +use linera_sdk::linera_base_types::{ContractAbi, ServiceAbi}; +use serde::{Deserialize, Serialize}; + +pub struct TaskProcessorAbi; + +/// Operations that can be executed on the contract. +#[derive(Debug, Deserialize, Serialize)] +pub enum TaskProcessorOperation { + /// Request a task to be processed by the given operator with the given input. + RequestTask { operator: String, input: String }, + /// Store the result of a completed task. + StoreResult { result: String }, +} + +impl ContractAbi for TaskProcessorAbi { + type Operation = TaskProcessorOperation; + type Response = (); +} + +impl ServiceAbi for TaskProcessorAbi { + type Query = Request; + type QueryResponse = Response; +} diff --git a/examples/task-processor/src/service.rs b/examples/task-processor/src/service.rs new file mode 100644 index 000000000000..97f98a2a956c --- /dev/null +++ b/examples/task-processor/src/service.rs @@ -0,0 +1,146 @@ +// Copyright (c) Zefchain Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +#![cfg_attr(target_arch = "wasm32", no_main)] + +mod state; + +use std::sync::Arc; + +use async_graphql::{EmptySubscription, InputObject, Object, Request, Response, Schema}; +use linera_sdk::{ + linera_base_types::{Timestamp, WithServiceAbi}, + views::View, + Service, ServiceRuntime, +}; +use task_processor::{TaskProcessorAbi, TaskProcessorOperation}; + +use self::state::TaskProcessorState; + +pub struct TaskProcessorService { + state: Arc, + runtime: Arc>, +} + +linera_sdk::service!(TaskProcessorService); + +impl WithServiceAbi for TaskProcessorService { + type Abi = TaskProcessorAbi; +} + +impl Service for TaskProcessorService { + type Parameters = (); + + async fn new(runtime: ServiceRuntime) -> Self { + let state = TaskProcessorState::load(runtime.root_view_storage_context()) + .await + .expect("Failed to load state"); + TaskProcessorService { + state: Arc::new(state), + runtime: Arc::new(runtime), + } + } + + async fn handle_query(&self, request: Request) -> Response { + let schema = Schema::build( + QueryRoot { + state: self.state.clone(), + runtime: self.runtime.clone(), + }, + MutationRoot { + runtime: self.runtime.clone(), + }, + EmptySubscription, + ) + .finish(); + schema.execute(request).await + } +} + +struct QueryRoot { + state: Arc, + runtime: Arc>, +} + +/// The actions requested by this application for off-chain processing. +#[derive(Default, Debug, serde::Serialize, serde::Deserialize)] +struct ProcessorActions { + /// Request a callback at the given timestamp. + request_callback: Option, + /// Tasks to execute off-chain. + execute_tasks: Vec, +} + +async_graphql::scalar!(ProcessorActions); + +/// A task to be executed by an off-chain operator. +#[derive(Debug, serde::Serialize, serde::Deserialize)] +struct Task { + /// The name of the operator to execute. + operator: String, + /// The input to pass to the operator (JSON string). + input: String, +} + +/// The outcome of executing an off-chain task. +#[derive(Debug, InputObject, serde::Serialize, serde::Deserialize)] +struct TaskOutcome { + /// The name of the operator that executed the task. + operator: String, + /// The output from the operator (JSON string). + output: String, +} + +#[Object] +impl QueryRoot { + /// Returns the current task count. + async fn task_count(&self) -> u64 { + *self.state.task_count.get() + } + + /// Returns the pending tasks and callback requests for the task processor. + async fn next_actions( + &self, + _last_requested_callback: Option, + _now: Timestamp, + ) -> ProcessorActions { + let mut actions = ProcessorActions::default(); + + // Get all pending tasks from the queue. + let count = self.state.pending_tasks.count(); + if let Ok(pending_tasks) = self.state.pending_tasks.read_front(count).await { + for pending in pending_tasks { + actions.execute_tasks.push(Task { + operator: pending.operator, + input: pending.input, + }); + } + } + + actions + } + + /// Processes the outcome of a completed task and schedules operations. + async fn process_task_outcome(&self, outcome: TaskOutcome) -> bool { + // Schedule an operation to store the result and remove the pending task. + let operation = TaskProcessorOperation::StoreResult { + result: outcome.output, + }; + self.runtime.schedule_operation(&operation); + true + } +} + +struct MutationRoot { + runtime: Arc>, +} + +#[Object] +impl MutationRoot { + /// Requests a task to be processed by an off-chain operator. + async fn request_task(&self, operator: String, input: String) -> [u8; 0] { + let operation = TaskProcessorOperation::RequestTask { operator, input }; + self.runtime.schedule_operation(&operation); + [] + } +} diff --git a/examples/task-processor/src/state.rs b/examples/task-processor/src/state.rs new file mode 100644 index 000000000000..183a8a8b6f8d --- /dev/null +++ b/examples/task-processor/src/state.rs @@ -0,0 +1,26 @@ +// Copyright (c) Zefchain Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use linera_sdk::views::{linera_views, QueueView, RegisterView, RootView, ViewStorageContext}; +use serde::{Deserialize, Serialize}; + +/// A pending task stored in the application state. +#[derive(Clone, Debug, Serialize, Deserialize, async_graphql::SimpleObject)] +pub struct PendingTask { + /// The operator to execute the task. + pub operator: String, + /// The input to pass to the operator. + pub input: String, +} + +/// The application state. +#[derive(RootView, async_graphql::SimpleObject)] +#[view(context = ViewStorageContext)] +pub struct TaskProcessorState { + /// Pending tasks to be executed. + pub pending_tasks: QueueView, + /// Results from completed tasks. + pub results: QueueView, + /// Counter for tracking how many tasks have been processed. + pub task_count: RegisterView, +} diff --git a/linera-service/tests/local_net_tests.rs b/linera-service/tests/local_net_tests.rs index ae9f38c098bc..be47ac3c9a52 100644 --- a/linera-service/tests/local_net_tests.rs +++ b/linera-service/tests/local_net_tests.rs @@ -1238,30 +1238,39 @@ impl EthereumTrackerApp { /// and the service starts correctly with task processor configuration. #[cfg(feature = "storage-service")] #[test_log::test(tokio::test)] -async fn test_node_service_with_task_processor_options() -> Result<()> { +async fn test_node_service_with_task_processor() -> Result<()> { use std::{io::Write, os::unix::fs::PermissionsExt}; - use linera_base::identifiers::ApplicationId; + use linera_base::{abi::ContractAbi, identifiers::ApplicationId}; + + // Dummy ABI type for the task-processor application. + // The actual ABI doesn't matter for the GraphQL interface. + struct TaskProcessorAbi; + + impl ContractAbi for TaskProcessorAbi { + type Operation = (); + type Response = (); + } let _guard = INTEGRATION_TEST_GUARD.lock().await; tracing::info!("Starting test {}", test_name!()); let config = LocalNetConfig::new_test(Database::Service, Network::Grpc); let (mut net, client) = config.instantiate().await?; + let chain = client.load_wallet()?.default_chain().unwrap(); - // Publish and create the counter application. - let example_dir = ClientWrapper::example_path("counter")?; + // Publish and create the task-processor example application. + let example_dir = ClientWrapper::example_path("task-processor")?; let app_id_str = client - .project_publish(example_dir, vec![], None, &0) + .project_publish(example_dir, vec![], None, &()) .await?; let app_id: ApplicationId = app_id_str.trim().parse()?; - // Create a simple echo operator script. + // Create an echo operator script that reads stdin and writes it to stdout. let tmp_dir = tempfile::tempdir()?; let operator_path = tmp_dir.path().join("echo-operator"); { let mut file = std::fs::File::create(&operator_path)?; - // Script that reads stdin and writes it to stdout. writeln!(file, "#!/bin/sh")?; writeln!(file, "cat")?; } @@ -1276,11 +1285,41 @@ async fn test_node_service_with_task_processor_options() -> Result<()> { node_service.ensure_is_running()?; - // The counter app doesn't implement the task processor interface (nextActions, etc.), - // so the task processor will fail to query it. But we've verified that: - // 1. The CLI arguments are accepted. - // 2. The node service starts successfully. - // 3. The task processor is initialized (even if it can't query the app). + // Query the initial task count (should be 0). + let app = + node_service.make_application::(&chain, &app_id.with_abi::())?; + let task_count: u64 = app.query_json("taskCount").await?; + assert_eq!(task_count, 0); + + // Submit a mutation to request a task. + app.mutate(r#"requestTask(operator: "echo", input: "hello world")"#) + .await?; + + // The task should now be pending. Query nextActions to verify. + let response = app.query("nextActions(now: 0)").await?; + let actions = &response["nextActions"]; + assert!( + actions["execute_tasks"] + .as_array() + .is_some_and(|arr| !arr.is_empty()), + "Expected at least one task in execute_tasks, got: {actions}" + ); + + // Give the task processor time to execute the task and submit the outcome. + // The processor polls on new blocks and timeouts, so we need to wait. + tokio::time::sleep(std::time::Duration::from_secs(3)).await; + + // Check that the task was processed (task count should increase). + // Note: This may take some time as it depends on block processing. + let mut task_count = 0u64; + for _ in 0..10 { + task_count = app.query_json("taskCount").await?; + if task_count > 0 { + break; + } + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + } + assert_eq!(task_count, 1, "Expected task count to be 1 after processing"); net.ensure_is_running().await?; net.terminate().await?; From 62c4f15eaf85a1fa117040ff0959a8b95cefa590 Mon Sep 17 00:00:00 2001 From: Andreas Fackler Date: Tue, 2 Dec 2025 11:52:53 +0100 Subject: [PATCH 4/6] Avoid using sleep. --- examples/Cargo.lock | 1 - examples/task-processor/Cargo.toml | 1 - linera-service/tests/local_net_tests.rs | 25 ++++++++++++------------- 3 files changed, 12 insertions(+), 15 deletions(-) diff --git a/examples/Cargo.lock b/examples/Cargo.lock index ca83e3f544ce..47db096b4102 100644 --- a/examples/Cargo.lock +++ b/examples/Cargo.lock @@ -6152,7 +6152,6 @@ dependencies = [ "async-graphql", "linera-sdk", "serde", - "serde_json", ] [[package]] diff --git a/examples/task-processor/Cargo.toml b/examples/task-processor/Cargo.toml index 24be32e71c39..a9214c858a9b 100644 --- a/examples/task-processor/Cargo.toml +++ b/examples/task-processor/Cargo.toml @@ -8,7 +8,6 @@ edition = "2021" async-graphql.workspace = true linera-sdk.workspace = true serde.workspace = true -serde_json.workspace = true [dev-dependencies] linera-sdk = { workspace = true, features = ["test"] } diff --git a/linera-service/tests/local_net_tests.rs b/linera-service/tests/local_net_tests.rs index be47ac3c9a52..a63f72533d00 100644 --- a/linera-service/tests/local_net_tests.rs +++ b/linera-service/tests/local_net_tests.rs @@ -1285,6 +1285,9 @@ async fn test_node_service_with_task_processor() -> Result<()> { node_service.ensure_is_running()?; + // Subscribe to notifications for the chain. + let mut notifications = Box::pin(node_service.notifications(chain).await?); + // Query the initial task count (should be 0). let app = node_service.make_application::(&chain, &app_id.with_abi::())?; @@ -1292,9 +1295,13 @@ async fn test_node_service_with_task_processor() -> Result<()> { assert_eq!(task_count, 0); // Submit a mutation to request a task. + // This creates a block with the RequestTask operation. app.mutate(r#"requestTask(operator: "echo", input: "hello world")"#) .await?; + // Wait for the block containing the RequestTask operation. + notifications.wait_for_block(None).await?; + // The task should now be pending. Query nextActions to verify. let response = app.query("nextActions(now: 0)").await?; let actions = &response["nextActions"]; @@ -1305,20 +1312,12 @@ async fn test_node_service_with_task_processor() -> Result<()> { "Expected at least one task in execute_tasks, got: {actions}" ); - // Give the task processor time to execute the task and submit the outcome. - // The processor polls on new blocks and timeouts, so we need to wait. - tokio::time::sleep(std::time::Duration::from_secs(3)).await; + // The task processor will execute the task and submit a StoreResult operation. + // Wait for the block containing the StoreResult operation. + notifications.wait_for_block(None).await?; - // Check that the task was processed (task count should increase). - // Note: This may take some time as it depends on block processing. - let mut task_count = 0u64; - for _ in 0..10 { - task_count = app.query_json("taskCount").await?; - if task_count > 0 { - break; - } - tokio::time::sleep(std::time::Duration::from_millis(500)).await; - } + // Check that the task was processed (task count should be 1). + let task_count: u64 = app.query_json("taskCount").await?; assert_eq!(task_count, 1, "Expected task count to be 1 after processing"); net.ensure_is_running().await?; From 6fcbd8d1b7ee0ccdaef95d25603de16381354615 Mon Sep 17 00:00:00 2001 From: Andreas Fackler Date: Tue, 2 Dec 2025 12:15:08 +0100 Subject: [PATCH 5/6] Make info log more user friendly. --- linera-service/src/cli/main.rs | 15 ++++++++++++--- linera-service/tests/local_net_tests.rs | 9 ++++++--- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/linera-service/src/cli/main.rs b/linera-service/src/cli/main.rs index 4079d5119fd8..fa53480102d9 100644 --- a/linera-service/src/cli/main.rs +++ b/linera-service/src/cli/main.rs @@ -24,7 +24,13 @@ pub static malloc_conf: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:19\0 #[export_name = "_rjem_malloc_conf"] pub static malloc_conf: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:19\0"; -use std::{collections::BTreeSet, env, path::PathBuf, process, sync::Arc}; +use std::{ + collections::{BTreeMap, BTreeSet}, + env, + path::PathBuf, + process, + sync::Arc, +}; use anyhow::{anyhow, bail, ensure, Context, Error}; use async_trait::async_trait; @@ -1127,8 +1133,11 @@ impl Runnable for Job { // Start the task processor if operator applications are specified. if !operator_application_ids.is_empty() { - let operators = Arc::new(operators.into_iter().collect()); - info!("Supported operators: {:?}", operators); + let operators: BTreeMap = operators.into_iter().collect(); + for (name, path) in &operators { + info!("Operator '{}' -> {}", name, path.display()); + } + let operators = Arc::new(operators); let chain_client = context.make_chain_client(chain_id); let processor = TaskProcessor::new( diff --git a/linera-service/tests/local_net_tests.rs b/linera-service/tests/local_net_tests.rs index a63f72533d00..aa4a5aabeeb4 100644 --- a/linera-service/tests/local_net_tests.rs +++ b/linera-service/tests/local_net_tests.rs @@ -1289,8 +1289,8 @@ async fn test_node_service_with_task_processor() -> Result<()> { let mut notifications = Box::pin(node_service.notifications(chain).await?); // Query the initial task count (should be 0). - let app = - node_service.make_application::(&chain, &app_id.with_abi::())?; + let app = node_service + .make_application::(&chain, &app_id.with_abi::())?; let task_count: u64 = app.query_json("taskCount").await?; assert_eq!(task_count, 0); @@ -1318,7 +1318,10 @@ async fn test_node_service_with_task_processor() -> Result<()> { // Check that the task was processed (task count should be 1). let task_count: u64 = app.query_json("taskCount").await?; - assert_eq!(task_count, 1, "Expected task count to be 1 after processing"); + assert_eq!( + task_count, 1, + "Expected task count to be 1 after processing" + ); net.ensure_is_running().await?; net.terminate().await?; From 6d4d2676f25beeeb18fd9f0877725160d121d260 Mon Sep 17 00:00:00 2001 From: Andreas Fackler Date: Tue, 2 Dec 2025 12:21:53 +0100 Subject: [PATCH 6/6] Test cleanup. --- linera-service/tests/local_net_tests.rs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/linera-service/tests/local_net_tests.rs b/linera-service/tests/local_net_tests.rs index aa4a5aabeeb4..16186d0fbf2d 100644 --- a/linera-service/tests/local_net_tests.rs +++ b/linera-service/tests/local_net_tests.rs @@ -1289,8 +1289,7 @@ async fn test_node_service_with_task_processor() -> Result<()> { let mut notifications = Box::pin(node_service.notifications(chain).await?); // Query the initial task count (should be 0). - let app = node_service - .make_application::(&chain, &app_id.with_abi::())?; + let app = node_service.make_application(&chain, &app_id.with_abi::())?; let task_count: u64 = app.query_json("taskCount").await?; assert_eq!(task_count, 0); @@ -1318,10 +1317,7 @@ async fn test_node_service_with_task_processor() -> Result<()> { // Check that the task was processed (task count should be 1). let task_count: u64 = app.query_json("taskCount").await?; - assert_eq!( - task_count, 1, - "Expected task count to be 1 after processing" - ); + assert_eq!(task_count, 1); net.ensure_is_running().await?; net.terminate().await?;