Skip to content

Operator (v0.4.0)

J-Loudet edited this page Jan 2, 2023 · 2 revisions

In Zenoh-Flow, an Operator processes data, received from Sources or other Operators, and forwards the results of this processing to Sinks or other Operators. An Operator is typically where one would write the core of their business logic. For instance, in an object detection pipeline, a Source would fetch the images while the actual detection would be done in an Operator.

There are — for now — two ways to create a Operator in Zenoh-Flow, depending on the programming language you favor:

  • a shared library (Rust)
  • a script (Python)

For Zenoh-Flow to be able to load our Operator, it must be accompanied by a descriptor.

Descriptor

The content of the descriptor is relatively straight-forward, it must specify:

  1. an id (for display purposes),
  2. [optional] some configuration,
  3. [optional] some vars,
  4. its input(s) --- i.e. the data it requires,
  5. its output(s) --- i.e. the data it will produce,
  6. an uri --- i.e. where to find its actual implementation.

Below is a descriptor fit for the code we are going to write next:

id: my-operator

# This configuration is not used and serves as an example.
configuration:
  value: not-used

# This vars section is not used and serves as an example.
vars:
  FOO: not-used
  
inputs:
  - id: input
    type: String

outputs:
  - id: output
    type: String

# Linux:
uri: file:///absolute/path/to/the/implementation/libmy_operator.so
# MacOS:
# uri: file:///absolute/path/to/the/implementation/libmy_operator.dylib
# Windows:
# uri: file:///absolute/path/to/the/implementation/my_operator.dll

Shared library

Assuming you want to create an Operator called my-operator, enter the following in a terminal:

cargo new --lib my-operator

Modify the Cargo.toml to add these dependencies and tell rustc that you want a library that can be dynamically loaded:

[dependencies]
async-trait = "0.1.50"  # Zenoh-Flow’s nodes traits are asynchronous
zenoh-flow = { git = "https://github.com/ZettaScaleLabs/zenoh-flow.git", branch = "dev/v0.4.0" }

[lib]
crate-type=["cdylib"]

⚠️ Once we release 0.4.0 on crates.io, the Zenoh-Flow dependency will simply be zenoh-flow = "0.4".

Now modify lib.rs to (i) implement the Zenoh-Flow traits and (ii) include your logic.

Below you can find commented boilerplate code to do (i).

use async_trait::async_trait;
use zenoh_flow::prelude::*;

// MyOperator is where you implement your business' logic.
//
// `Input` and `Output` are structures provided by Zenoh-Flow through which you, respectively,
// receive data from upstream nodes and send data to donwstream nodes.
//
// The way to pass an `Output` and an `Input` is through its Constructor --- see below.
//
// That structure is also where a state can be saved. For concurrency reasons, the state must
// implement `Send` and `Sync` (`Arc` and `Mutex` structures can be helpful, in particular their
// `async_std` variant).
//
// The `export_operator` macro is required to properly expose the symbol and information about the
// version of the Rust compiler and Zenoh-Flow, to Zenoh-Flow.
//
// It allows Zenoh-Flow to detect, at runtime, a version mismatch between the Zenoh-Flow daemon and
// the shared library (be it on the version of the Rust compiler or of Zenoh-Flow itself).
#[export_operator]
struct MyOperator {
    input: Input<String>,
    output: Output<String>,
}

#[async_trait]
impl Node for MyOperator {
    async fn iteration(&self) -> Result<()> {
        // Add your business logic here.
        let data_to_process = self.input.recv().await?;
        let processed_data = format!("{} From Zenoh-Flow!", data_to_process);
        self.output.send(processed_data, None).await
    }
}

#[async_trait]
impl Operator for MyOperator {
    async fn new(
        // The `context` provides information about the Zenoh-Flow daemon on which the generated
        // node MyOperator will be executed.
        context: Context,
        // The `configuration`(1) is a re-export of `serde_json::Value`(2). It behaves as a
        // dictionary and allows accessing configuration variables defined in the descriptor.
        configuration: Option<Configuration>,
        // The `Inputs` are encapsulated `flume::Receivers` created by Zenoh-Flow. It is a HashMap
        // whose keys match what was defined in the descriptor file.
        mut inputs: Inputs,
        // The `Outputs` are encapsulated `flume::Senders` that were created by Zenoh-Flow. It is
        // a HashMap whose keys match what was defined in the descriptor file.
        mut outputs: Outputs,
    ) -> Result<Self> {
        let input = inputs.take("input").expect("No input named 'input' found");
        let output = outputs.take("output").expect("No output named 'output' found");
        Ok(MyOperator { input, output })
    }
}

(1): Configuration (2): serde_json::Value

Python script

TODO: Add a reference to the auto-generated Python docs.

Clone this wiki locally