Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CLI.md
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,8 @@ Run a GraphQL service to explore and extend the chains of the wallet

Default value: `0`
* `--port <PORT>` — The port on which to run the server
* `--operator-application-ids <OPERATOR_APPLICATION_IDS>` — Application IDs of operator applications to watch. When specified, a task processor is started alongside the node service
* `--operators <OPERATORS>` — Supported operators and their binary paths. Format: `name=path` or just `name` (uses name as path). Example: `--operators my-operator=/path/to/binary`



Expand Down
9 changes: 9 additions & 0 deletions examples/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ members = [
"non-fungible",
"rfq",
"social",
"task-processor",
]

[workspace.dependencies]
Expand Down
21 changes: 21 additions & 0 deletions examples/task-processor/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
[package]
name = "task-processor"
version = "0.1.0"
authors = ["Linera <[email protected]>"]
edition = "2021"

[dependencies]
async-graphql.workspace = true
linera-sdk.workspace = true
serde.workspace = true

[dev-dependencies]
linera-sdk = { workspace = true, features = ["test"] }

[[bin]]
name = "task_processor_contract"
path = "src/contract.rs"

[[bin]]
name = "task_processor_service"
path = "src/service.rs"
69 changes: 69 additions & 0 deletions examples/task-processor/src/contract.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright (c) Zefchain Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

#![cfg_attr(target_arch = "wasm32", no_main)]

mod state;

use linera_sdk::{
linera_base_types::WithContractAbi,
views::{RootView, View},
Contract, ContractRuntime,
};
use task_processor::{TaskProcessorAbi, TaskProcessorOperation};

use self::state::{PendingTask, TaskProcessorState};

pub struct TaskProcessorContract {
state: TaskProcessorState,
runtime: ContractRuntime<Self>,
}

linera_sdk::contract!(TaskProcessorContract);

impl WithContractAbi for TaskProcessorContract {
type Abi = TaskProcessorAbi;
}

impl Contract for TaskProcessorContract {
type Message = ();
type InstantiationArgument = ();
type Parameters = ();
type EventValue = ();

async fn load(runtime: ContractRuntime<Self>) -> Self {
let state = TaskProcessorState::load(runtime.root_view_storage_context())
.await
.expect("Failed to load state");
TaskProcessorContract { state, runtime }
}

async fn instantiate(&mut self, _argument: ()) {
self.runtime.application_parameters();
}

async fn execute_operation(&mut self, operation: TaskProcessorOperation) {
match operation {
TaskProcessorOperation::RequestTask { operator, input } => {
self.state
.pending_tasks
.push_back(PendingTask { operator, input });
}
TaskProcessorOperation::StoreResult { result } => {
// Remove the first pending task (the one that was just processed).
self.state.pending_tasks.delete_front();
self.state.results.push_back(result);
let count = self.state.task_count.get() + 1;
self.state.task_count.set(count);
}
}
}

async fn execute_message(&mut self, _message: ()) {
panic!("Task processor application doesn't support any cross-chain messages");
}

async fn store(mut self) {
self.state.save().await.expect("Failed to save state");
}
}
33 changes: 33 additions & 0 deletions examples/task-processor/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright (c) Zefchain Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

//! ABI of the Task Processor Example Application.
//!
//! This application demonstrates the off-chain task processor functionality.
//! It requests tasks to be executed by an external "echo" operator and stores
//! the results in its state.

use async_graphql::{Request, Response};
use linera_sdk::linera_base_types::{ContractAbi, ServiceAbi};
use serde::{Deserialize, Serialize};

pub struct TaskProcessorAbi;

/// Operations that can be executed on the contract.
#[derive(Debug, Deserialize, Serialize)]
pub enum TaskProcessorOperation {
/// Request a task to be processed by the given operator with the given input.
RequestTask { operator: String, input: String },
/// Store the result of a completed task.
StoreResult { result: String },
}

impl ContractAbi for TaskProcessorAbi {
type Operation = TaskProcessorOperation;
type Response = ();
}

impl ServiceAbi for TaskProcessorAbi {
type Query = Request;
type QueryResponse = Response;
}
146 changes: 146 additions & 0 deletions examples/task-processor/src/service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
// Copyright (c) Zefchain Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

#![cfg_attr(target_arch = "wasm32", no_main)]

mod state;

use std::sync::Arc;

use async_graphql::{EmptySubscription, InputObject, Object, Request, Response, Schema};
use linera_sdk::{
linera_base_types::{Timestamp, WithServiceAbi},
views::View,
Service, ServiceRuntime,
};
use task_processor::{TaskProcessorAbi, TaskProcessorOperation};

use self::state::TaskProcessorState;

pub struct TaskProcessorService {
state: Arc<TaskProcessorState>,
runtime: Arc<ServiceRuntime<Self>>,
}

linera_sdk::service!(TaskProcessorService);

impl WithServiceAbi for TaskProcessorService {
type Abi = TaskProcessorAbi;
}

impl Service for TaskProcessorService {
type Parameters = ();

async fn new(runtime: ServiceRuntime<Self>) -> Self {
let state = TaskProcessorState::load(runtime.root_view_storage_context())
.await
.expect("Failed to load state");
TaskProcessorService {
state: Arc::new(state),
runtime: Arc::new(runtime),
}
}

async fn handle_query(&self, request: Request) -> Response {
let schema = Schema::build(
QueryRoot {
state: self.state.clone(),
runtime: self.runtime.clone(),
},
MutationRoot {
runtime: self.runtime.clone(),
},
EmptySubscription,
)
.finish();
schema.execute(request).await
}
}

struct QueryRoot {
state: Arc<TaskProcessorState>,
runtime: Arc<ServiceRuntime<TaskProcessorService>>,
}

/// The actions requested by this application for off-chain processing.
#[derive(Default, Debug, serde::Serialize, serde::Deserialize)]
struct ProcessorActions {
/// Request a callback at the given timestamp.
request_callback: Option<Timestamp>,
/// Tasks to execute off-chain.
execute_tasks: Vec<Task>,
}

async_graphql::scalar!(ProcessorActions);

/// A task to be executed by an off-chain operator.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
struct Task {
/// The name of the operator to execute.
operator: String,
/// The input to pass to the operator (JSON string).
input: String,
}

/// The outcome of executing an off-chain task.
#[derive(Debug, InputObject, serde::Serialize, serde::Deserialize)]
struct TaskOutcome {
/// The name of the operator that executed the task.
operator: String,
/// The output from the operator (JSON string).
output: String,
}

#[Object]
impl QueryRoot {
/// Returns the current task count.
async fn task_count(&self) -> u64 {
*self.state.task_count.get()
}

/// Returns the pending tasks and callback requests for the task processor.
async fn next_actions(
&self,
_last_requested_callback: Option<Timestamp>,
_now: Timestamp,
) -> ProcessorActions {
let mut actions = ProcessorActions::default();

// Get all pending tasks from the queue.
let count = self.state.pending_tasks.count();
if let Ok(pending_tasks) = self.state.pending_tasks.read_front(count).await {
for pending in pending_tasks {
actions.execute_tasks.push(Task {
operator: pending.operator,
input: pending.input,
});
}
}

actions
}

/// Processes the outcome of a completed task and schedules operations.
async fn process_task_outcome(&self, outcome: TaskOutcome) -> bool {
// Schedule an operation to store the result and remove the pending task.
let operation = TaskProcessorOperation::StoreResult {
result: outcome.output,
};
self.runtime.schedule_operation(&operation);
true
}
}

struct MutationRoot {
runtime: Arc<ServiceRuntime<TaskProcessorService>>,
}

#[Object]
impl MutationRoot {
/// Requests a task to be processed by an off-chain operator.
async fn request_task(&self, operator: String, input: String) -> [u8; 0] {
let operation = TaskProcessorOperation::RequestTask { operator, input };
self.runtime.schedule_operation(&operation);
[]
}
}
26 changes: 26 additions & 0 deletions examples/task-processor/src/state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright (c) Zefchain Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use linera_sdk::views::{linera_views, QueueView, RegisterView, RootView, ViewStorageContext};
use serde::{Deserialize, Serialize};

/// A pending task stored in the application state.
#[derive(Clone, Debug, Serialize, Deserialize, async_graphql::SimpleObject)]
pub struct PendingTask {
/// The operator to execute the task.
pub operator: String,
/// The input to pass to the operator.
pub input: String,
}

/// The application state.
#[derive(RootView, async_graphql::SimpleObject)]
#[view(context = ViewStorageContext)]
pub struct TaskProcessorState {
/// Pending tasks to be executed.
pub pending_tasks: QueueView<PendingTask>,
/// Results from completed tasks.
pub results: QueueView<String>,
/// Counter for tracking how many tasks have been processed.
pub task_count: RegisterView<u64>,
}
13 changes: 12 additions & 1 deletion linera-service/src/cli/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use linera_client::{
};
use linera_rpc::config::CrossChainConfig;

use crate::cli::validator;
use crate::{cli::validator, task_processor::parse_operator};

const DEFAULT_TOKENS_PER_CHAIN: Amount = Amount::from_millis(100);
const DEFAULT_TRANSACTIONS_PER_BLOCK: usize = 1;
Expand Down Expand Up @@ -723,6 +723,17 @@ pub enum ClientCommand {
#[cfg(with_metrics)]
#[arg(long)]
metrics_port: NonZeroU16,

/// Application IDs of operator applications to watch.
/// When specified, a task processor is started alongside the node service.
#[arg(long = "operator-application-ids")]
operator_application_ids: Vec<ApplicationId>,

/// Supported operators and their binary paths.
/// Format: `name=path` or just `name` (uses name as path).
/// Example: `--operators my-operator=/path/to/binary`
#[arg(long = "operators", value_parser = parse_operator)]
operators: Vec<(String, PathBuf)>,
},

/// Run a GraphQL service that exposes a faucet where users can claim tokens.
Expand Down
Loading
Loading