
Rust SDK

The Rust SDK ships as two independent crates:

Crate            Purpose
orrery-client    HTTP client: start instances, send messages, query state
orrery-worker    Worker abstraction: subscribe to topics, poll, auto-complete

Installation

    [dependencies]
    orrery-client = { git = "https://github.com/orrery-io/orrery" }
    orrery-worker = { git = "https://github.com/orrery-io/orrery" }

orrery-client

Creating a client

    use orrery_client::OrreryClient;

    // Do not include /v1; the client appends it automatically.
    let client = OrreryClient::new("http://localhost:3000");
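To see why the base URL should not already contain /v1, here is a minimal sketch of how a client might build request URLs. The helper name versioned_url and the "instances" path are hypothetical and used only for illustration; the real client's internals may differ.

```rust
/// Hypothetical helper (not part of orrery-client): joins a base URL,
/// the /v1 version prefix, and a resource path.
fn versioned_url(base_url: &str, path: &str) -> String {
    // Tolerate a trailing slash on the base and a leading slash on the path.
    let base = base_url.trim_end_matches('/');
    let path = path.trim_start_matches('/');
    format!("{base}/v1/{path}")
}
```

Under this assumption, passing "http://localhost:3000/v1" as the base would produce a doubled prefix (/v1/v1/instances), which is the mistake the note above warns against.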

Starting a process instance

    use orrery_client::{OrreryClient, StartInstanceRequest};
    use std::collections::HashMap;
    use serde_json::json;

    let client = OrreryClient::new("http://localhost:3000");

    let instance = client.start_instance(StartInstanceRequest {
        process_definition_id: "order-process:1".into(),
        business_key: Some("order-42".into()),
        variables: HashMap::from([
            ("orderId".into(), json!("order-42")),
            ("amount".into(), json!(99.0)),
        ]),
    }).await?;

    println!("{} — {}", instance.id, instance.state);

Getting and cancelling instances

    // Get by id
    let instance = client.get_instance(&instance.id).await?;

    // List: all filters are optional
    let page = client.list_instances(
        Some("order-process:1"), // definition_id
        Some("RUNNING"),         // state
        None,                    // version
        Some(1),                 // page
        Some(20),                // page_size
    ).await?;

    // Cancel
    client.cancel_instance(&instance.id).await?;

    // Retry a failed instance
    client.retry_instance(&instance.id).await?;

Updating variables

    client.update_instance_variables(
        &instance.id,
        json!({ "approved": true, "reviewedBy": "alice" }),
    ).await?;

Sending a message

    use orrery_client::SendMessageRequest;

    client.send_message(SendMessageRequest {
        message_ref: "payment.received".into(),
        correlation_data: HashMap::from([
            ("orderId".into(), json!("order-42")),
        ]),
    }).await?;

Broadcasting a signal

    use orrery_client::BroadcastSignalRequest;

    let result = client.broadcast_signal(
        "system.shutdown",
        BroadcastSignalRequest {
            variables: HashMap::from([
                ("reason".into(), json!("maintenance")),
            ]),
        },
    ).await?;

    println!("Woken {} instances", result.woken_count);

Deploying a process definition

    let xml = std::fs::read_to_string("order-process.bpmn")?;
    let def = client.deploy_definition(xml).await?;
    println!("Deployed {} version {}", def.id, def.version);

Error type

    use orrery_client::ClientError;

    match client.get_instance("missing").await {
        Err(ClientError::Http { status, body }) => println!("{status}: {body}"),
        Err(ClientError::Network(e)) => println!("network: {e}"),
        Err(ClientError::Json(e)) => println!("decode: {e}"),
        Ok(instance) => { /* ... */ }
    }

orrery-worker

orrery-worker wraps the fetch-and-lock polling loop. Handlers are async closures that return output variables as serde_json::Value. The worker automatically extends locks (heartbeat at 50% of lock_duration), completes tasks on success, and fails them on error.
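As a rough illustration of the heartbeat timing described above, the sketch below computes the interval at which a lock would be extended. The function heartbeat_interval is hypothetical and not part of orrery-worker; it only restates the 50% rule in code.

```rust
use std::time::Duration;

/// Hypothetical helper (not part of orrery-worker): the lock is extended
/// once half of the lock duration has elapsed.
fn heartbeat_interval(lock_duration: Duration) -> Duration {
    lock_duration / 2
}
```

With the default 30 s lock duration this gives a heartbeat every 15 s, which leaves the remaining half of the lock window as slack if an extension request is slow.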

Minimal example

    use orrery_worker::Worker;

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        Worker::new("http://localhost:3000")
            .subscribe(
                "validatePayment",
                vec![], // process definition IDs to filter on; empty = any definition
                |task| async move {
                    let amount = task.variables["amount"].as_f64().unwrap_or(0.0);
                    Ok(serde_json::json!({ "valid": amount > 0.0 }))
                },
            )
            .run()
            .await
    }

run() blocks until SIGTERM or SIGINT.

Builder options

    use std::time::Duration;

    Worker::new("http://localhost:3000")
        .worker_id("payment-worker-1")          // default: auto-generated UUID
        .lock_duration(Duration::from_secs(60)) // default: 30 s
        .concurrency(8)                         // default: 4
        .subscribe(/* ... */)
        .run()
        .await?;

If base_url is empty, the worker reads ORRERY_URL from the environment and falls back to http://localhost:3000.
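The fallback order can be sketched as follows. Both function names are hypothetical, and treating an empty ORRERY_URL as unset is an assumption of this sketch rather than documented behavior.

```rust
use std::env;

/// Hypothetical helper: picks the explicit base URL, then the environment
/// value, then the documented default.
fn resolve_base_url(base_url: &str, env_value: Option<String>) -> String {
    if !base_url.is_empty() {
        return base_url.to_string();
    }
    env_value
        // Assumption: an empty ORRERY_URL is treated the same as an unset one.
        .filter(|value| !value.is_empty())
        .unwrap_or_else(|| "http://localhost:3000".to_string())
}

/// Hypothetical wrapper that reads ORRERY_URL from the process environment.
fn resolve_from_env(base_url: &str) -> String {
    resolve_base_url(base_url, env::var("ORRERY_URL").ok())
}
```

Keeping the environment lookup in a thin wrapper makes the resolution order easy to test without mutating process-wide state.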

Multiple topics

The second argument to .subscribe() is a list of process definition IDs to filter on. Pass an empty vec![] to receive tasks from any process definition.

    Worker::new("http://localhost:3000")
        .subscribe("validatePayment", vec![], |task| async move {
            // any definition
            Ok(serde_json::json!({ "valid": true }))
        })
        .subscribe("chargeCard", vec!["order-process"], |task| async move {
            // only order-process
            Ok(serde_json::json!({ "chargeId": "ch_abc123" }))
        })
        .subscribe("sendReceipt", vec![], |task| async move {
            Ok(serde_json::json!({}))
        })
        .run()
        .await?;

Error handling

Return Err(anyhow::Error) to fail the task. The error message is stored and the task is retried according to its retry configuration.

    Worker::new("http://localhost:3000")
        .subscribe("chargeCard", vec![], |task| async move {
            let amount = task.variables["amount"]
                .as_f64()
                .ok_or_else(|| anyhow::anyhow!("missing amount variable"))?;

            if amount > 10_000.0 {
                return Err(anyhow::anyhow!("amount exceeds limit"));
            }

            Ok(serde_json::json!({ "charged": true, "amount": amount }))
        })
        .run()
        .await?;

WorkerFactory is the preferred way to wire up a worker service. Each register call binds a handler to a (topic, process_definition_id) pair, which gives built-in filtering by process definition and keeps handlers for different topics cleanly separated:

    use orrery_worker::WorkerFactory;

    WorkerFactory::new("http://localhost:3000")
        .register("validatePayment", "order-process", handlers::validate_payment::handle)
        .register("chargeCard", "order-process", handlers::charge_card::handle)
        .register("sendReceipt", "order-process", handlers::send_receipt::handle)
        .build()
        .run()
        .await?;

The factory groups registrations by topic, builds a single Worker with one subscribe call per topic, and uses a routing closure to dispatch to the correct handler based on task.process_definition_id.
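The grouping and routing idea can be sketched with plain standard-library types. Everything here (Router, Handler, the string payloads, the two sample handlers) is hypothetical and synchronous for brevity; it is not the actual WorkerFactory implementation, which works with async handlers and task objects.

```rust
use std::collections::HashMap;

/// Hypothetical handler type: takes a payload, returns a result string.
type Handler = fn(&str) -> String;

/// Hypothetical router: topic -> (process definition id -> handler).
struct Router {
    routes: HashMap<String, HashMap<String, Handler>>,
}

impl Router {
    fn new() -> Self {
        Router { routes: HashMap::new() }
    }

    /// Groups registrations by topic, then by process definition id.
    fn register(mut self, topic: &str, definition_id: &str, handler: Handler) -> Self {
        self.routes
            .entry(topic.to_string())
            .or_default()
            .insert(definition_id.to_string(), handler);
        self
    }

    /// One subscription per topic would be created from this list.
    fn topics(&self) -> Vec<&str> {
        let mut topics: Vec<&str> = self.routes.keys().map(String::as_str).collect();
        topics.sort();
        topics
    }

    /// Routes a task to the handler registered for its topic and definition id.
    fn dispatch(&self, topic: &str, definition_id: &str, payload: &str) -> Option<String> {
        let handler = self.routes.get(topic)?.get(definition_id)?;
        Some(handler(payload))
    }
}

fn validate_payment(payload: &str) -> String {
    format!("validated {payload}")
}

fn charge_card(payload: &str) -> String {
    format!("charged {payload}")
}
```

In this sketch, a task whose definition id has no registered handler simply yields None; how the real factory treats that case is not stated in this page.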

Task context

The handler receives an ExternalTaskResponse with the full task context:

    Worker::new("http://localhost:3000")
        .subscribe("processOrder", vec![], |task| async move {
            println!("task:       {}", task.id);
            println!("topic:      {}", task.topic);
            println!("instance:   {}", task.process_instance_id);
            println!("definition: {}", task.process_definition_id);
            println!("retry:      {}/{}", task.retry_count, task.max_retries);

            // Variables are serde_json::Value
            let order_id = &task.variables["orderId"];
            let amount = task.variables["amount"].as_f64().unwrap_or(0.0);

            Ok(serde_json::json!({ "total": amount }))
        })
        .run()
        .await?;

Source: crates/orrery-client/ and crates/orrery-worker/ in the main repository.
