zenoh-flow aims to provide a zenoh-based dataflow programming framework for computations that span from the cloud to the device.

Overview

Eclipse Zenoh-Flow

Join the chat at https://gitter.im/atolab/zenoh-flow

Zenoh-Flow provides a zenoh-based dataflow programming framework for computations that span from the cloud to the device.

⚠️ This software is still in alpha status and should not be used in production. Breaking changes are likely to happen and the API is not stable.


Description

Zenoh-Flow allows users to declare a dataflow graph, via a YAML file, and to use tags to express location affinity and requirements for the operators that make up the graph. When deploying the dataflow graph, Zenoh-Flow automatically deals with distribution by linking remote operators through zenoh.

A dataflow is composed of a set of nodes: sources (producing data), operators (computing over the data), and sinks (consuming the resulting data). These nodes are dynamically loaded at runtime.

Remote sources, operators, and sinks leverage zenoh to communicate in a transparent manner. In other words, the dataflow graph retains location transparency and can be deployed in different ways depending on specific needs.

Zenoh-Flow provides several working examples that illustrate how to define operators, sources and sinks, as well as how to declaratively define the dataflow graph by means of a YAML file.
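
To give an idea of the shape of such a file, the sketch below shows an illustrative flow descriptor connecting a source, an operator, and a sink. The node ids, library paths, and port names are placeholders, and the exact schema may differ between Zenoh-Flow versions; the descriptors in the examples repository are the reference.

flow: MyPipeline
sources:
  - id: MySource
    uri: file://./target/release/libmy_source.so
    output:
      id: Data
      type: i32
operators:
  - id: MyOperator
    uri: file://./target/release/libmy_operator.so
    inputs:
      - id: Input
        type: i32
    outputs:
      - id: Output
        type: i32
sinks:
  - id: MySink
    uri: file://./target/release/libmy_sink.so
    input:
      id: Data
      type: i32
links:
  - from:
      node: MySource
      output: Data
    to:
      node: MyOperator
      input: Input
  - from:
      node: MyOperator
      output: Output
    to:
      node: MySink
      input: Data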


How to build it

Install Cargo and Rust. Zenoh-Flow can be successfully compiled with Rust stable (>= 1.51.0), so no special configuration is required, except for certain examples.

To build Zenoh-Flow, run the following command after completing the previous steps:

$ cargo build --release

How to run

Assuming that the previous steps completed successfully, you'll find the Zenoh-Flow runtime under target/release/runtime. This executable expects the following arguments:

  • the path of the dataflow graph to execute: --graph-file zenoh-flow-examples/graphs/fizz_buzz_pipeline.yaml,
  • a name for the runtime: --runtime foo.

The graph file describes the different nodes composing the dataflow. The runtime name, although mandatory, is used to "deploy" the graph on different "runtime instances" (see the related examples).
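
Putting the two arguments together, and assuming the repository root as the working directory, a typical invocation looks like:

$ ./target/release/runtime --graph-file zenoh-flow-examples/graphs/fizz_buzz_pipeline.yaml --runtime foo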


Creating your nodes

Assuming that the build steps completed successfully, you'll be able to use the cargo zenoh-flow subcommand to create boilerplate for your nodes. First, make sure the cargo-zenoh-flow binary is in Cargo's binary path:

$ ln -s $(pwd)/target/release/cargo-zenoh-flow ~/.cargo/bin/

Then you can create your own node with:

$ cd ~
$ cargo zenoh-flow new myoperator

By default, cargo zenoh-flow generates the template for an operator. To create a source or a sink instead, add either --kind source or --kind sink.
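
For example, to scaffold a source (the node name here is a placeholder):

$ cargo zenoh-flow new mysource --kind source
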
The Cargo.toml will contain metadata (e.g. the inputs/outputs) used during the build process to generate the descriptor.

More information about cargo zenoh-flow can be obtained with cargo zenoh-flow --help.
You can now modify the src/lib.rs file with your business logic and update the Cargo.toml according to the inputs/outputs that you need.
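
As a purely illustrative sketch of that metadata (the table and key names below are assumptions; the Cargo.toml generated by cargo zenoh-flow new is the reference), the relevant section could resemble:

# Hypothetical layout: verify against the Cargo.toml generated by `cargo zenoh-flow new`.
[package.metadata.zenohflow]
id = "myoperator"
kind = "operator"
inputs = [{ id = "Input", type = "i32" }]
outputs = [{ id = "Output", type = "i32" }]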

Once you are done you can build it:

$ cargo zenoh-flow build

The build prints the path of the descriptor for the new node, which can then be referenced from a flow descriptor.


Examples

Examples can be found in our example repository.

Comments
  • [Bug] Not work on OpenCV+Qt


    Describe the bug

    I encountered the following bug when executing this simple video pipeline.

    QObject::startTimer: Timers cannot be started from another thread
    

    In short, we may have a problem if we try to use multi-threaded OpenCV with the Qt backend. Since Qt is also a common GUI backend besides GTK, I would suggest offering some alternatives to the mandatory async Source/Sink trait in zenoh-flow for this case.

    To reproduce

    Please visit here.

    System info

    I found this issue because I use Arch daily. Here I summarize my observations.

    • Arch Linux (latest)
      • Default package: OpenCV 4.6.0 + Qt (failed)
    • Ubuntu 20.04
      • Default package: OpenCV 4.2.0 + GTK (passed)
      • Manual build: OpenCV 4.6.0 + Qt (failed)
    wontfix 
    opened by YuanYuYuan 4
  • Failed to compile from sources


    After resolving some missing dependencies (clang-sys), I hit the following compilation error (zenoh-dependent):

       Compiling zenoh-plugin-trait v0.1.0-dev (https://github.com/eclipse-zenoh/zenoh.git?branch=master#70182ad7)
    error[E0433]: failed to resolve: use of undeclared type `SharedMemoryAuthenticator`
       --> /home/enea/.cargo/git/checkouts/zenoh-cc237f2570fab813/70182ad/zenoh/src/net/transport/unicast/manager.rs:194:25
        |
    194 |                 .insert(SharedMemoryAuthenticator::new().into());
        |                         ^^^^^^^^^^^^^^^^^^^^^^^^^ use of undeclared type `SharedMemoryAuthenticator`
    

    cargo version 1.54, rustc version 1.54

    opened by haianos 4
  • Zenoh-Flow v0.4.0


    This PR prepares the release of Zenoh-Flow version v0.4.0.

    The remaining tasks are:

    • [x] https://github.com/ZettaScaleLabs/zenoh-rpc/pull/4
    • [x] https://github.com/ZettaScaleLabs/zenoh-flow/pull/3
    • [x] https://github.com/ZettaScaleLabs/zenoh-flow/pull/4
    • [x] https://github.com/ZettaScaleLabs/zenoh-flow/pull/5
    • [x] https://github.com/ZettaScaleLabs/zenoh-flow/pull/6
    • [x] https://github.com/ZettaScaleLabs/zenoh-flow/pull/10
    • [x] https://github.com/ZettaScaleLabs/zenoh-flow/pull/7
    • [x] https://github.com/ZettaScaleLabs/zenoh-flow/pull/13
    • [x] https://github.com/ZettaScaleLabs/zenoh-flow/pull/12
    • [x] https://github.com/ZettaScaleLabs/zenoh-flow/pull/11
    • [x] https://github.com/ZettaScaleLabs/zenoh-flow/pull/14
    • [x] https://github.com/ZettaScaleLabs/zenoh-flow/pull/19
    • [ ] ~Update the README~
    • [ ] https://github.com/ZettaScaleLabs/zenoh-flow-python/pull/14
    • [ ] https://github.com/ZettaScaleLabs/zenoh-flow-examples/pull/2
    • [ ] Add a release pipeline https://github.com/ZettaScaleLabs/zenoh-flow/pull/23 (will be done later)
    opened by J-Loudet 3
  • Feedback from AutoCore team


    Hello zenoh-flow team,

    We use zenoh-flow in our project; this framework is great and meets our expectations!

    In the process of developing the project, we have met some scenarios for which we can't find a suitable solution.

    Feedback is here and we look forward to discussing suitable solutions with you.

    1. In graph node configuration, the current state is that a key can only have one value:
    configuration:
      key1: value1
      key2: value2
      key3: value3
    

    But we need more fields to describe a key:

    configuration:
      - id: key1
        foo: value
        bar: value
        baz: value
      - id: key2
        foo: value
        bar: value
        baz: value
    

    or:

    configuration:
      key1: 
        foo: value
        bar: value
        baz: value
      key2:
        foo: value
        bar: value
        baz: value
    
    2. The development path of a dynamic library differs from its deployment path; we would like to override the default global graph args when the runtime starts:
    args:
      release_path: /opt/release/bin
      default_input_id: Input
      default_output_id: Output
      default_config_foo: foo
    operators:
      - id : MyOperator
        uri: file://{{release_path}}/libmy_operator.so
        inputs:
          - id: {{default_input_id}}
            type: i32
        outputs:
          - id: {{default_output_id}}
            type: i32
        configuration:
          config_foo: {{default_config_foo}}
    

    and launch with development path:

    $ runtime --graph-file demo.yaml --runtime foo --release_path './build/bin'
    

    More feedback will be updated after the demo test, looking forward to your reply:)

    opened by moelang 3
  • Runtime binary running as daemon and CLI tool


    New functionalities

    This PR implements the Zenoh-Flow runtime daemon.

    • [x] Define resource tree for runtime data stored in zenoh
    • [x] Implement a helper for CRUD operations on those resources
    • [x] Define interfaces for the runtime and the CLI tool
    • [x] Daemonize the runtime
    • [x] Implement the CLI tool

    Resource Tree

    The resource tree in zenoh is represented by the zenoh-flow-tree diagram (image not reproduced here).

    Information is stored in the leaves; the root of the tree depends on whether we run as a zenoh plugin or as a standalone process.

    All the IDs are UUIDv4

    The Registry will contain metadata for accessing the components.

    Interface

    The interface for the Runtime is defined as follows:

    /// The interface the Runtime expose to a client
    /// (eg. another runtime, the cli, the mgmt API)
    #[znservice(
        timeout_s = 60,
        prefix = "/zf/runtime",
        service_uuid = "00000000-0000-0000-0000-000000000001"
    )]
    pub trait ZFRuntime {
        /// Sends an initiation request for the given [`FlowId`]
        /// Note the request is asynchronous, the runtime that receives the request
        /// flattens the descriptor, maps it to the infrastructure,
        /// and, creates the associated [`DataFlowRecord`].
        /// The record contains an [`Uuid`] that identifies the record.
        /// The actual instantiation process runs asynchronously in the runtime.
        async fn instantiate(&self, flow_id: String) -> ZFResult<DataFlowRecord>;
    
        /// Sends a teardown request for the given record identified by the [`Uuid`]
        /// Note the request is asynchronous, the runtime that receives the request will
        /// return immediately, but the teardown process will run asynchronously in the runtime.
        async fn teardown(&self, record_id: Uuid) -> ZFResult<DataFlowRecord>;
    
        /// Prepares the runtime to run the given record identified by the [`Uuid`].
        /// Preparing a runtime means, fetch the operators/source/sinks libraries,
        /// create the needed structures in memory, the links.
        /// Once everything is prepared the runtime should return the [`DataFlowRecord`]
        async fn prepare(&self, record_id: Uuid) -> ZFResult<DataFlowRecord>;
    
        /// Cleans the runtime from the remains of the given record.
        /// Cleans means unload the libraries, drop data structures and destroy links.
        async fn clean(&self, record_id: Uuid) -> ZFResult<DataFlowRecord>;
    
        /// Starts the sinks, connectors, and operators for the given record.
        async fn start(&self, record_id: Uuid) -> ZFResult<()>;
    
        /// Starts the sources for the given record.
        /// Note that this should be called only after the `start(record)` has returned
        /// successfully otherwise data may be lost.
        async fn start_sources(&self, record_id: Uuid) -> ZFResult<()>;
    
        /// Stops the sinks, connectors, and operators for the given record.
        /// Note that this should be called after the `stop_sources(record)` has returned
        /// successfully otherwise data may be lost.
        async fn stop(&self, record_id: Uuid) -> ZFResult<()>;
    
        /// Stops the sources for the given record.
        async fn stop_sources(&self, record_id: Uuid) -> ZFResult<()>;
    
        /// Starts the given graph node for the given instance.
        /// A graph node can be a source, a sink, a connector, or an operator.
        async fn start_node(&self, record_id: Uuid, node: String) -> ZFResult<()>;
    
        /// Stops the given graph node from the given instance.
        /// A graph node can be a source, a sink, a connector, or an operator.
        async fn stop_node(&self, record_id: Uuid, node: String) -> ZFResult<()>;
    
        /// Gets the state of the given graph node for the given instance.
        /// A graph node can be a source, a sink, a connector, or an operator.
        /// The node state represents the current state of the node:
        /// `enum ComponentState { Running, Stopped, Error(err) }`
        async fn get_node_state(&self, record_id: Uuid, node: String) -> ZFResult<ComponentState>;
    
        /// Sends the `message` to `node` for the given record.
        /// This is useful for sending out-of-band notification to a node.
        /// eg. in the case of deadline miss notification.
        async fn notify_node(
            &self,
            record_id: Uuid,
            node: String,
            message: ZFControlMessage,
        ) -> ZFResult<()>;
    
        /// Checks the compatibility for the given `operator`
        /// Compatibility is based on tags and some machine characteristics (eg. CPU architecture, OS)
        async fn check_operator_compatibility(&self, operator: ZFOperatorDescriptor) -> ZFResult<bool>;
    
        /// Checks the compatibility for the given `source`
        /// Compatibility is based on tags and some machine characteristics (eg. CPU architecture, OS)
        async fn check_source_compatibility(&self, source: ZFSourceDescriptor) -> ZFResult<bool>;
    
        /// Checks the compatibility for the given `sink`
        /// Compatibility is based on tags and some machine characteristics (eg. CPU architecture, OS)
        async fn check_sink_compatibility(&self, sink: ZFSinkDescriptor) -> ZFResult<bool>;
    }
    

    This interface will be exposed over Zenoh using zenoh-rpc

    opened by gabrik 3
  • Refactor of extension loading mechanism


    This PR changes the way extensions (e.g. the support for Python nodes) are defined and loaded by Zenoh Flow. Currently, each extension needs to be configured in the runtime configuration file.

    ---
        pid_file : /var/zenoh-flow/runtime.pid
        path : /etc/zenoh-flow
        loader:
          ### this is what we mean by configured in the runtime configuration file
          extensions:
                  - name: python
                    file_extension: py
                    source_lib: /home/ato/Workspace/zenoh-flow-python/target/release/libpy_source.so
                    sink_lib: /home/ato/Workspace/zenoh-flow-python/target/release/libpy_sink.so
                    operator_lib: /home/ato/Workspace/zenoh-flow-python/target/release/libpy_op.so
                    config_lib_key: python-script
        zenoh :
          kind: peer
          listen: ["tcp/0.0.0.0:7997"]
          locators: []
    

    They can instead be placed in a directory like extensions.d, where each file contains the definition of one extension.

    This facilitates adding extensions by just dropping a file in that directory.

    .../extensions.d/
    - 00-py.zfext
    - 01-ws.zfext
    - ....
    

    Single extension description:

    name: python
    file_extension: py
    source_lib: /etc/zenoh-flow/extension/python/libpy_source.so
    sink_lib: /etc/zenoh-flow/extension/python/libpy_sink.so
    operator_lib: /etc/zenoh-flow/extension/python/libpy_op.so
    config_lib_key: python-script
    

    In particular this PR implements:

    • [x] Update the daemon configuration (and the default one)
    • [x] Implement TryFrom<DaemonConfig> for the daemon
    • [x] Refactor the LoaderConfig to hide the internal details.
    • [x] Update .deb and .rpm generators
    • [x] Load extensions at start-up

    Signed-off-by: gabrik [email protected]

    enhancement 
    opened by gabrik 2
  • cargo zenoh-flow subcommand


    ~This PR contains the code for the Zenoh-Flow registry and the cargo zenoh-flow subcommand.~

    ~- [ ] Registry daemon~
      ~- Stores the metadata and the libraries for each component.~

    • [x] cargo zenoh-flow
      • [x] New: creates a basic boilerplate for a Zenoh-Flow component
      • [x] Build: builds the component, generates the metadata needed for the Registry, and registers it to the local registry
        ~- [ ] Push: pushes the component to a remote registry~
        ~- [ ] List: shows the components stored on the local registry~
        ~- [ ] Pull: pulls a component from a remote registry to the local one~

    Now this PR contains only the code for the cargo-zenoh-flow subcommand.

    opened by gabrik 2
  • [Bug][dev/v0.4.0] Transformation of Receiver<Sample> into ZFJobStream is never polled


    Describe the bug

    The job is indeed published on Zenoh on the correct key expression but no worker takes care of it.

    The cause seems to be in the transformation of what we receive from Zenoh into a ZFJobStream: the resulting Future is never polled. The underlying issue remains to be determined (bad handling of Futures?).

    To reproduce

    Try to launch a DataFlow via zfctl.

    System info

    Zenoh-Flow dev/v0.4.0

    bug 
    opened by J-Loudet 1
  • Dissociate the `setup` and `run` phases when starting a node


    Describe the feature

    Dissociating both phases is a first step towards being able to "pause" a running data flow.

    For now, when nodes are stopped, the only way to restart them is to call setup again. Calling setup regenerates (and clears) their internal state.

    By dissociating the setup from the run, we would be able to choose to either resume (calling run & keeping the internal state) or reset (calling setup + run) the execution of a node.

    feature 
    opened by J-Loudet 1
  • [Bug] Inconsistent naming, e.g. `runtime_uuid` & `instance_id`


    Describe the bug

    Consistent naming is preferable. Given that all unique ids are, in fact, UUIDs, having *_uuid seems the better option.

    To reproduce

    N/A

    System info

    N/A

    bug 
    opened by J-Loudet 1
  • [Bug] Inconsistency in `Output` and `Input` types function names.


    Describe the bug

    There is an inconsistency in the Output and Input types in the name of the function used to retrieve the ID.

    In Input we have:

    impl Input {
        pub fn id(&self) -> &PortId {
            &self.id
        }
        ...
    }
    

    While in Output we have:

    impl Output {
        /// Returns the port id associated with this Output.
        ///
        /// Port ids are unique per type (i.e. Input / Output) and per node.
        pub fn port_id(&self) -> &PortId {
            &self.port_id
        }
        ...
     }   
    

    They should be the same, so either id() or port_id()

    To reproduce

    N/A

    System info

    N/A

    bug 
    opened by gabrik 1
  • It should be possible to configure the Congestion Control and reliability of underlying Zenoh channels.


    Describe the feature

    It would be helpful to provide users the possibility to configure the Congestion Control and Reliability for the channels created by Zenoh Flow.

    Currently, the behavior is to use CongestionControl::Block in ZenohSender. While this works in most cases, with some slow subscribers it would be better to use CongestionControl::Drop, thus avoiding filling the queues with "old data".

    opened by gabrik 0
  • Observability


    Summary

    To measure and provide insights about Zenoh Flow’s performance, some metrics should be provided.

    Intended outcome

    "Metrics" (to be refined) are made available.

    Steps

    TBD.

    opened by J-Loudet 0
  • Kubernetes integration


    Summary

    To further ease the deployment of a data flow, an integration with Kubernetes is envisioned.

    Intended outcome

    A user can deploy a flow through Kubernetes’ command line tool (for instance: kubectl add flow data-flow.yaml).

    Prerequisites

    • Bindings in Go for Zenoh?

    Steps

    TBD.

    feature 
    opened by J-Loudet 0
  • Orchestration interface


    Summary

    Currently, the mapping of nodes is specified manually in the YAML file. Zenoh Flow should expose an interface that would let external "logic" perform this mapping.

    Intended outcome

    1. Zenoh Flow exposes an interface to perform this mapping.
    2. We provide a simple example that assigns nodes to daemons randomly.

    Steps

    TBD.

    feature 
    opened by J-Loudet 0
  • Extend URI support


    Summary

    Currently, the only URI that is supported for shared libraries (or Python script) is file://. Other schemas should be supported (for instance: http://, https://).

    Intended outcome

    Shared libraries or Python scripts should be fetched following other URI schemas.

    Steps

    • [ ] Determine a first list of schemas to support.
    • [ ] Provide a working example for each schema.
    • [ ] TBD.
    enhancement 
    opened by J-Loudet 0
Releases: 0.4.0-alpha.1