High-performance, low-level framework for composing flexible web integrations

Related tags

Command-line barter
Overview

Barter-Integration

High-performance, low-level framework for composing flexible web integrations.

Utilised by other Barter trading ecosystem crates to build robust financial exchange integrations, primarily for public data collection & trade execution. It is:

  • Low-Level: Translates raw data streams communicated over the web into any desired data model using arbitrary data transformations.
  • Flexible: Compatible with any protocol (WebSocket, FIX, Http, etc.), any input/output model, and any user defined transformations.

Core abstractions include:

  • RestClient providing configurable signed Http communication between client & server.
  • ExchangeStream providing configurable communication over any asynchronous stream protocols (WebSocket, FIX, etc.).

Both core abstractions provide the robust glue you need to conveniently translate between server & client data models.

See: Barter, Barter-Data & Barter-Execution

Crates.io MIT licensed Build Status Discord chat

API Documentation | Chat

Overview

Barter-Integration is a high-performance, low-level, configurable framework for composing flexible web integrations.

RestClient

(sync private & public Http communication)

At a high level, a RestClient is has a few major components that allow it to execute RestRequests:

  • RequestSigner with configurable signing logic on the target API.
  • HttpParser that translates API specific responses into the desired output types.

ExchangeStream

(async communication using streaming protocols such as WebSocket and FIX)

At a high level, an ExchangeStream is made up of a few major components:

  • Inner Stream/Sink socket (eg/ WebSocket, FIX, etc).
  • StreamParser that is capable of parsing input protocol messages (eg/ WebSocket, FIX, etc.) as exchange specific messages.
  • Transformer that transforms from exchange specific message into an iterator of the desired outputs type.

Examples

Fetch Ftx Account Balances Using Signed GET request:

( config: Self::Config<'a>, builder: RequestBuilder, signature: String, ) -> Result { // Add Ftx required Headers & build reqwest::Request builder .header("FTX-KEY", config.api_key) .header("FTX-TS", &config.time.timestamp_millis().to_string()) .header("FTX-SIGN", &signature) .build() .map_err(SocketError::from) } } struct FtxParser; impl HttpParser for FtxParser { type ApiError = serde_json::Value; type OutputError = ExecutionError; fn parse_api_error(&self, status: StatusCode, api_error: Self::ApiError) -> Self::OutputError { // For simplicity, use serde_json::Value as Error and extract raw String for parsing let error = api_error.to_string(); // Parse Ftx error message to determine custom ExecutionError variant match error.as_str() { message if message.contains("Invalid login credentials") => { ExecutionError::Unauthorised(error) } _ => ExecutionError::Socket(SocketError::HttpResponse(status, error)), } } } #[derive(Debug, Error)] enum ExecutionError { #[error("request authorisation invalid: {0}")] Unauthorised(String), #[error("SocketError: {0}")] Socket(#[from] SocketError), } struct FetchBalancesRequest; impl RestRequest for FetchBalancesRequest { type Response = FetchBalancesResponse; // Define Response type type QueryParams = (); // FetchBalances does not require any QueryParams type Body = (); // FetchBalances does not require any Body fn path() -> &'static str { "/api/wallet/balances" } fn method() -> reqwest::Method { reqwest::Method::GET } fn metric_tag() -> Tag { Tag::new("method", "fetch_balances") } } #[derive(Deserialize)] #[allow(dead_code)] struct FetchBalancesResponse { success: bool, result: Vec, } #[derive(Deserialize)] #[allow(dead_code)] struct FtxBalance { #[serde(rename = "coin")] symbol: Symbol, total: f64, } /// See Barter-Execution for a comprehensive real-life example, as well as code you can use out of the /// box to execute trades on many exchanges. #[tokio::main] async fn main() { // Construct Metric channel to send Http execution metrics over let (http_metric_tx, _http_metric_rx) = mpsc::unbounded_channel(); // HMAC-SHA256 encoded account API secret used for signing private http requests let mac: Hmac = Hmac::new_from_slice("api_secret".as_bytes()).unwrap(); // Build Ftx configured RequestSigner for signing http requests with hex encoding let request_signer = RequestSigner::new( FtxSigner { api_key: "api_key".to_string(), }, mac, HexEncoder, ); // Build RestClient with Ftx configuration let rest_client = RestClient::new("https://ftx.com", http_metric_tx, request_signer, FtxParser); // Fetch Result let _response = rest_client.execute(FetchBalancesRequest).await; }">
use barter_integration::{
    error::SocketError,
    metric::Tag,
    model::Symbol,
    protocol::http::{
        private::{encoder::HexEncoder, RequestSigner, Signer},
        rest::{client::RestClient, RestRequest},
        HttpParser,
    },
};
use bytes::Bytes;
use chrono::{DateTime, Utc};
use hmac::{digest::KeyInit, Hmac};
use reqwest::{RequestBuilder, StatusCode};
use serde::Deserialize;
use thiserror::Error;
use tokio::sync::mpsc;

struct FtxSigner {
    api_key: String,
}

// Configuration required to sign every Ftx `RestRequest`
struct FtxSignConfig<'a> {
    api_key: &'a str,
    time: DateTime<Utc>,
    method: reqwest::Method,
    path: &'static str,
}

impl Signer for FtxSigner {
    type Config<'a> = FtxSignConfig<'a> where Self: 'a;

    fn config<'a, Request>(
        &'a self,
        _: Request,
        _: &RequestBuilder,
    ) -> Result<Self::Config<'a>, SocketError>
    where
        Request: RestRequest,
    {
        Ok(FtxSignConfig {
            api_key: self.api_key.as_str(),
            time: Utc::now(),
            method: Request::method(),
            path: Request::path(),
        })
    }

    fn bytes_to_sign<'a>(config: &Self::Config<'a>) -> Bytes {
        Bytes::copy_from_slice(
            format!("{}{}{}", config.time, config.method, config.path).as_bytes(),
        )
    }

    fn build_signed_request<'a>(
        config: Self::Config<'a>,
        builder: RequestBuilder,
        signature: String,
    ) -> Result<reqwest::Request, SocketError> {
        // Add Ftx required Headers & build reqwest::Request
        builder
            .header("FTX-KEY", config.api_key)
            .header("FTX-TS", &config.time.timestamp_millis().to_string())
            .header("FTX-SIGN", &signature)
            .build()
            .map_err(SocketError::from)
    }
}

struct FtxParser;

impl HttpParser for FtxParser {
    type ApiError = serde_json::Value;
    type OutputError = ExecutionError;

    fn parse_api_error(&self, status: StatusCode, api_error: Self::ApiError) -> Self::OutputError {
        // For simplicity, use serde_json::Value as Error and extract raw String for parsing
        let error = api_error.to_string();

        // Parse Ftx error message to determine custom ExecutionError variant
        match error.as_str() {
            message if message.contains("Invalid login credentials") => {
                ExecutionError::Unauthorised(error)
            }
            _ => ExecutionError::Socket(SocketError::HttpResponse(status, error)),
        }
    }
}

#[derive(Debug, Error)]
enum ExecutionError {
    #[error("request authorisation invalid: {0}")]
    Unauthorised(String),

    #[error("SocketError: {0}")]
    Socket(#[from] SocketError),
}

struct FetchBalancesRequest;

impl RestRequest for FetchBalancesRequest {
    type Response = FetchBalancesResponse; // Define Response type
    type QueryParams = (); // FetchBalances does not require any QueryParams
    type Body = (); // FetchBalances does not require any Body

    fn path() -> &'static str {
        "/api/wallet/balances"
    }

    fn method() -> reqwest::Method {
        reqwest::Method::GET
    }

    fn metric_tag() -> Tag {
        Tag::new("method", "fetch_balances")
    }
}

#[derive(Deserialize)]
#[allow(dead_code)]
struct FetchBalancesResponse {
    success: bool,
    result: Vec<FtxBalance>,
}

#[derive(Deserialize)]
#[allow(dead_code)]
struct FtxBalance {
    #[serde(rename = "coin")]
    symbol: Symbol,
    total: f64,
}

/// See Barter-Execution for a comprehensive real-life example, as well as code you can use out of the
/// box to execute trades on many exchanges.
#[tokio::main]
async fn main() {
    // Construct Metric channel to send Http execution metrics over
    let (http_metric_tx, _http_metric_rx) = mpsc::unbounded_channel();

    // HMAC-SHA256 encoded account API secret used for signing private http requests
    let mac: Hmac<sha2::Sha256> = Hmac::new_from_slice("api_secret".as_bytes()).unwrap();

    // Build Ftx configured RequestSigner for signing http requests with hex encoding
    let request_signer = RequestSigner::new(
        FtxSigner {
            api_key: "api_key".to_string(),
        },
        mac,
        HexEncoder,
    );

    // Build RestClient with Ftx configuration
    let rest_client = RestClient::new("https://ftx.com", http_metric_tx, request_signer, FtxParser);

    // Fetch Result
    let _response = rest_client.execute(FetchBalancesRequest).await;
}

Consume Binance Futures tick-by-tick Trades and calculate a rolling sum of volume:

>, id: u32, }, Trade { #[serde(rename = "q", deserialize_with = "de_str")] quantity: f64, }, } struct StatefulTransformer { sum_of_volume: VolumeSum, } impl Transformer for StatefulTransformer { type Input = BinanceMessage; type OutputIter = Vec>; fn transform(&mut self, input: Self::Input) -> Self::OutputIter { // Add new input Trade quantity to sum match input { BinanceMessage::SubResponse { result, id } => { debug!("Received SubResponse for {}: {:?}", id, result); // Don't care about this for the example } BinanceMessage::Trade { quantity, .. } => { // Add new Trade volume to internal state VolumeSum self.sum_of_volume += quantity; } }; // Return IntoIterator of length 1 containing the running sum of volume vec![Ok(self.sum_of_volume)] } } /// See Barter-Data for a comprehensive real-life example, as well as code you can use out of the /// box to collect real-time public market data from many exchanges. #[tokio::main] async fn main() { // Establish Sink/Stream communication with desired WebSocket server let mut binance_conn = connect_async("wss://fstream.binance.com/ws/") .await .map(|(ws_conn, _)| ws_conn) .expect("failed to connect"); // Send something over the socket (eg/ Binance trades subscription) binance_conn .send(WsMessage::Text( json!({"method": "SUBSCRIBE","params": ["btcusdt@aggTrade"],"id": 1}).to_string(), )) .await .expect("failed to send WsMessage over socket"); // Instantiate some arbitrary Transformer to apply to data parsed from the WebSocket protocol let transformer = StatefulTransformer { sum_of_volume: 0.0 }; // ExchangeWsStream includes pre-defined WebSocket Sink/Stream & WebSocket StreamParser let mut ws_stream = ExchangeWsStream::new(binance_conn, transformer); // Receive a stream of your desired Output data model from the ExchangeStream while let Some(volume_result) = ws_stream.next().await { match volume_result { Ok(cumulative_volume) => { // Do something with your data println!("{cumulative_volume:?}"); } Err(error) => { // React to any errors produced by the internal transformation eprintln!("{error}") } } } } /// Deserialize a `String` as the desired type. fn de_str<'de, D, T>(deserializer: D) -> Result where D: de::Deserializer<'de>, T: FromStr, T::Err: std::fmt::Display, { let data: String = Deserialize::deserialize(deserializer)?; data.parse::().map_err(de::Error::custom) }">
use barter_integration::{
    error::SocketError,
    protocol::websocket::{WebSocket, WebSocketParser, WsMessage},
    ExchangeStream, Transformer,
};
use futures::{SinkExt, StreamExt};
use serde::{de, Deserialize};
use serde_json::json;
use std::str::FromStr;
use tokio_tungstenite::connect_async;
use tracing::debug;

// Convenient type alias for an `ExchangeStream` utilising a tungstenite `WebSocket`
type ExchangeWsStream<Exchange> = ExchangeStream<WebSocketParser, WebSocket, Exchange, VolumeSum>;

// Communicative type alias for what the VolumeSum the Transformer is generating
type VolumeSum = f64;

#[derive(Deserialize)]
#[serde(untagged, rename_all = "camelCase")]
enum BinanceMessage {
    SubResponse {
        result: Option<Vec<String>>,
        id: u32,
    },
    Trade {
        #[serde(rename = "q", deserialize_with = "de_str")]
        quantity: f64,
    },
}

struct StatefulTransformer {
    sum_of_volume: VolumeSum,
}

impl Transformer<VolumeSum> for StatefulTransformer {
    type Input = BinanceMessage;
    type OutputIter = Vec<Result<VolumeSum, SocketError>>;

    fn transform(&mut self, input: Self::Input) -> Self::OutputIter {
        // Add new input Trade quantity to sum
        match input {
            BinanceMessage::SubResponse { result, id } => {
                debug!("Received SubResponse for {}: {:?}", id, result);
                // Don't care about this for the example
            }
            BinanceMessage::Trade { quantity, .. } => {
                // Add new Trade volume to internal state VolumeSum
                self.sum_of_volume += quantity;
            }
        };

        // Return IntoIterator of length 1 containing the running sum of volume
        vec![Ok(self.sum_of_volume)]
    }
}

/// See Barter-Data for a comprehensive real-life example, as well as code you can use out of the
/// box to collect real-time public market data from many exchanges.
#[tokio::main]
async fn main() {
    // Establish Sink/Stream communication with desired WebSocket server
    let mut binance_conn = connect_async("wss://fstream.binance.com/ws/")
        .await
        .map(|(ws_conn, _)| ws_conn)
        .expect("failed to connect");

    // Send something over the socket (eg/ Binance trades subscription)
    binance_conn
        .send(WsMessage::Text(
            json!({"method": "SUBSCRIBE","params": ["btcusdt@aggTrade"],"id": 1}).to_string(),
        ))
        .await
        .expect("failed to send WsMessage over socket");

    // Instantiate some arbitrary Transformer to apply to data parsed from the WebSocket protocol
    let transformer = StatefulTransformer { sum_of_volume: 0.0 };

    // ExchangeWsStream includes pre-defined WebSocket Sink/Stream & WebSocket StreamParser
    let mut ws_stream = ExchangeWsStream::new(binance_conn, transformer);

    // Receive a stream of your desired Output data model from the ExchangeStream
    while let Some(volume_result) = ws_stream.next().await {
        match volume_result {
            Ok(cumulative_volume) => {
                // Do something with your data
                println!("{cumulative_volume:?}");
            }
            Err(error) => {
                // React to any errors produced by the internal transformation
                eprintln!("{error}")
            }
        }
    }
}

/// Deserialize a `String` as the desired type.
fn de_str<'de, D, T>(deserializer: D) -> Result<T, D::Error>
where
    D: de::Deserializer<'de>,
    T: FromStr,
    T::Err: std::fmt::Display,
{
    let data: String = Deserialize::deserialize(deserializer)?;
    data.parse::<T>().map_err(de::Error::custom)
}

For a larger, "real world" example, see the Barter-Data repository.

Getting Help

Firstly, see if the answer to your question can be found in the API Documentation. If the answer is not there, I'd be happy to help to Chat and try answer your question via Discord.

Contributing

Thanks for your help in improving the Barter ecosystem! Please do get in touch on the discord to discuss development, new features, and the future roadmap.

Related Projects

In addition to the Barter-Integration crate, the Barter project also maintains:

  • Barter: High-performance, extensible & modular trading components with batteries-included. Contains a pre-built trading Engine that can serve as a live-trading or backtesting system.
  • Barter-Data: A high-performance WebSocket integration library for streaming public data from leading cryptocurrency exchanges.
  • Barter-Execution: Financial exchange integrations for trade execution - yet to be released!

Roadmap

  • Add new default StreamParser implementations to enable integration with other popular systems such as Kafka.

Licence

This project is licensed under the MIT license.

Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in Barter-Integration by you, shall be licensed as MIT, without any additional terms or conditions.

You might also like...
Horus is an open source tool for running forensic and administrative tasks at the kernel level using eBPF, a low-overhead in-kernel virtual machine, and the Rust programming language.
Horus is an open source tool for running forensic and administrative tasks at the kernel level using eBPF, a low-overhead in-kernel virtual machine, and the Rust programming language.

Horus Horus is an open-source tool for running forensic and administrative tasks at the kernel level using eBPF, a low-overhead in-kernel virtual mach

Low level access to processors using the AArch64 execution state.

aarch64-cpu Low level access to processors using the AArch64 execution state. Usage Please note that for using this crate's register definitions (as p

Rust low-level minimalist APNG writer and PNG reader with just a few dependencies with all possible formats coverage (including HDR).

project Wiki https://github.com/js29a/micro_png/wiki at glance use micro_png::*; fn main() { // load an image let image = read_png("tmp/test.

A new pure-Rust library for cross-platform low-level access to USB devices.

nusb A new pure-Rust library for cross-platform low-level access to USB devices. Documentation Compared to rusb and libusb Pure Rust, no dependency on

Rust Imaging Library's Python binding: A performant and high-level image processing library for Python written in Rust

ril-py Rust Imaging Library for Python: Python bindings for ril, a performant and high-level image processing library written in Rust. What's this? Th

high-level API for reaper-rs

rea-rs Easy to use ReaScript API. While reaper-rs is full-implemented at low-level, and, partially implemented at medium-level, on top of it (mostly,

Crates.io library that provides high-level APIs for obtaining information on various entertainment media such as books, movies, comic books, anime, manga, and so on.
Crates.io library that provides high-level APIs for obtaining information on various entertainment media such as books, movies, comic books, anime, manga, and so on.

Crates.io library that provides high-level APIs for obtaining information on various entertainment media such as books, movies, comic books, anime, manga, and so on.

A high level DSL for Simplicity. This is a work in progress and is not yet ready for production use

A high level DSL for Simplicity. This is a work in progress and is not yet ready for production use. The language is designed to be simple and easy to use. It is inspired by rust syntax and is statically typed. The syntax will be extended in the future to support more features.

A high-level, ergonomic crate for interacting with the UploadThing API

utapi-rs A high-level, ergonomic Rust crate for interacting with the Uploadthing API. Why? If you're using Rust and want to use Uploadthing for file u

Comments
  • Feature: RestClient

    Feature: RestClient

    Feature:

    • RestClient struct that executes abstract RestRequests.
    • Configurable HTTP request signing utilising Signer, Encoder traits.
    • Parse API responses with HttpParser trait.
    opened by just-a-stream 0
  • Draft: Refactor: ExchangeStream & Transformer

    Draft: Refactor: ExchangeStream & Transformer

    Refactor

    • Updated dependency versions.
    • fn de_str() optimisation to use borrowed &str rather than allocating.
    • trait Transformer<Output> now has Output as an associated type.
    opened by just-a-stream 0
  • Ping/Pong

    Ping/Pong

    Hi, I noticed that ping/pongs not working in this integration

    2022-11-12 15:33:11 - TRACE: Parsed headers [137, 0]
    2022-11-12 15:33:11 - TRACE: First: 10001001
    2022-11-12 15:33:11 - TRACE: Second: 0
    2022-11-12 15:33:11 - TRACE: Opcode: Control(Ping)
    2022-11-12 15:33:11 - TRACE: Masked: false
    2022-11-12 15:33:11 - TRACE: received frame 
    <FRAME>
    final: true
    reserved: false false false
    opcode: PING
    length: 2
    payload length: 0
    payload: 0x
                
    2022-11-12 15:33:11 - TRACE: Received message 
    2022-11-12 15:33:11 - TRACE: [] received Ping WebSocket message
    

    Example tokio-tungstenite normal working:

    2022-11-12 18:33:23 - TRACE: Opcode: Control(Ping)
    2022-11-12 18:33:23 - TRACE: Masked: false
    2022-11-12 18:33:23 - TRACE: received frame 
    <FRAME>
    final: true
    reserved: false false false
    opcode: PING
    length: 2
    payload length: 0
    payload: 0x
                
    2022-11-12 18:33:23 - TRACE: Received message
    2022-11-12 18:33:23 - TRACE: Sending pong reply
    2022-11-12 18:33:23 - TRACE: writing frame 
    <FRAME>
    final: true
    reserved: false false false
    opcode: PONG
    length: 6
    payload length: 0
    payload: 0x
    
    opened by Exzentttt 0
Owner
Barter
Open-source Rust algorithmic trading framework.
Barter
Simple low-level web server to serve file uploads with some shell scripting-friendly features

http_file_uploader Simple low-level web server to serve file uploads with some shell scripting-friendly features. A bridge between Web's multipart/for

Vitaly Shukela 2 Oct 27, 2022
A lightweight and high-performance order-book designed to process level 2 and trades data. Available in Rust and Python

ninjabook A lightweight and high-performance order-book implemented in Rust, designed to process level 2 and trades data. Available in Python and Rust

Ninja Quant 134 Jul 22, 2024
Rust in Anger: high-performance web applications

Rust in Anger: Book demo This is the code repository that accompanies the Rust in Anger blog post. The following folders each come with their own buil

EqualTo 26 Apr 9, 2023
A high-performance web-based geospatial visualization tool with an emphasis LEO satellites and lunar missions.

A high-performance web-based geospatial visualization tool with an emphasis LEO satellites and lunar missions. Written in Rust to target WebGPU, with WebGL2 backwards compatibility.

Shane B. 5 Dec 20, 2023
High-performance asynchronous computation framework for system simulation

Asynchronix A high-performance asynchronous computation framework for system simulation. What is this? Warning: this page is at the moment mostly addr

Asynchronics 7 Dec 7, 2022
Low-level Rust library for implementing terminal command line interface, like in embedded systems.

Terminal CLI Need to build an interactive command prompt, with commands, properties and with full autocomplete? This is for you. Example, output only

HashMismatch 47 Nov 25, 2022
A low-level ncurses wrapper for Rust

ncurses-rs This is a very thin wrapper around the ncurses TUI lib. NOTE: The ncurses lib is terribly unsafe and ncurses-rs is only the lightest wrappe

Jeaye Wilkerson 628 Jan 7, 2023
Verified Rust for low-level systems code

See Goals for a brief description of the project's goals. Building the project The main project source is in source. tools contains scripts for settin

Secure Foundations Lab 95 Dec 27, 2022
A low-level MVCC file format for storing blobs.

Sediment This repository isn't ready for public consumption. It just reached a stage where I wanted to start sharing ideas with others as well as usin

Khonsu Labs 24 Jan 8, 2023
Unopinionated low level API bindings focused on soundness, safety, and stronger types over raw FFI.

?? firehazard ?? Create a fire hazard by locking down your (Microsoft) Windows so nobody can escape (your security sandbox.) Unopinionated low level A

null 5 Nov 17, 2022