Single and multi-threaded custom ingestion crate for Stellar Futurenet, written in Rust.

Overview

rs-ingest

Ingestion library written in Rust for Futurenet

This package provides primitives for building custom ingestion engines on Futurenet. It is inspired by Stellar's go/ingest package.

Often, developers either need ingestion features outside of Horizon's scope, or need higher availability for the data. For example, a protocol's frontend might need to ingest events into its own database with very specific filtering or at high volume, or it might need to replay history to check whether it missed any events.

This crate is being designed with that need in mind, and works on Futurenet!

Note: You can also use this crate for pubnet; see the example.

Note: This crate is still a work in progress. The current capabilities of the crate are limited.

Note: Currently only POSIX systems are supported.

Features

Currently, you can both replay history and run online. Running online does not yet support first replaying history from a given ledger.

Note that the current implementation is experimental and does not cover all the functionality an ingestion crate should, including but not limited to failsafe mechanisms, archiver interaction, custom TOML configuration, readers, and an overall more optimized codebase.

Running offline

Running offline means being able to replay history through a catchup. Replaying history will enable you to process everything that has happened on the network within the specified bounded range.

rs-ingest allows you to run offline in two modes:

  • single-thread
  • multi-thread

Single-thread mode

Running in single-thread mode is the most straightforward way of using rs-ingest. This mode waits for the core subprocess to finish catching up and then lets you retrieve the ledgers' metadata.

Running single-thread mode will store every ledger meta.

Multi-thread mode

Running in multi-thread mode is also pretty simple, but instead returns a Receiver<MetaResult> object that receives new ledger meta (already decoded) as soon as it is emitted.

When you run multi-thread mode, you are in charge of deciding how to store the metadata or objects derived from it.

When running multi-thread mode you also need to call the closing mechanism manually.
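The hand-off in multi-thread mode can be sketched with std's mpsc channels. This is a simplified stand-in, not the crate's actual API: MetaResult here is a bare struct and spawn_runner is a hypothetical helper.

```rust
use std::sync::mpsc;
use std::thread;

// Simplified stand-in for rs-ingest's MetaResult wrapper.
struct MetaResult {
    ledger_seq: u32,
}

// Hypothetical runner: sends decoded ledger meta over the channel as it arrives.
fn spawn_runner() -> mpsc::Receiver<MetaResult> {
    let (tx, rx) = mpsc::channel(); // unbounded, like rs-ingest's default
    thread::spawn(move || {
        for seq in 0..3 {
            tx.send(MetaResult { ledger_seq: seq }).unwrap();
        }
        // tx is dropped here, which closes the channel and ends the iteration below.
    });
    rx
}

fn main() {
    let receiver = spawn_runner();
    // The implementor decides how to store or process each meta.
    for result in receiver.iter() {
        println!("got ledger {}", result.ledger_seq);
    }
}
```

The same `for result in receiver.iter()` shape is what you write against the real Receiver<MetaResult>.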

Running online

Running online means syncing with Futurenet and closing ledgers, thus receiving ledger close meta in real time. This mode is better suited for building with real-time data (for example, event streaming).

Multi-threaded mode

Running online can only be done in multi-thread mode. You'll receive a Receiver<MetaResult> object which receives ledger close meta as stellar-core closes ledgers.

When running multi-thread mode you also need to call the closing mechanism manually.

Closing mechanism

rs-ingest has a closing mechanism that deletes the temporary buckets created during execution and clears the related objects. Closing is important before re-initiating an action on rs-ingest. When running single-thread mode, the closing mechanism is triggered within rs-ingest's modules; when running multi-thread mode, the implementor must decide when to trigger it.
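As a rough illustration of the idea, here is a minimal sketch of an explicit close handle that deletes temporary state. Runner, its context path, and close() are hypothetical stand-ins, not the crate's actual types.

```rust
use std::fs;
use std::path::PathBuf;

// Hypothetical stand-in: rs-ingest exposes its own closing method, not this type.
struct Runner {
    context_path: PathBuf,
}

impl Runner {
    fn new() -> std::io::Result<Self> {
        // Temporary buckets would live under the context path.
        let context_path = std::env::temp_dir().join("rs_ingest_close_sketch");
        fs::create_dir_all(&context_path)?;
        Ok(Runner { context_path })
    }

    // The explicit close deletes temporary state so a new action can be initiated.
    fn close(&self) -> std::io::Result<()> {
        fs::remove_dir_all(&self.context_path)
    }
}

fn main() -> std::io::Result<()> {
    let runner = Runner::new()?;
    // ... catchup or online ingestion would happen here ...
    runner.close()?; // in multi-thread mode the implementor calls this manually
    Ok(())
}
```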

Try it out

The crate is a WIP, but you can already start playing around with the features it currently offers. For example, check out the examples.

The crate is available on crates.io:

ingest = "0.0.1"

stellar-core setup

Before using the crate, you need the stellar-core executable. To install the currently Futurenet-compatible build of core:

git clone https://github.com/stellar/stellar-core
cd stellar-core
git checkout b7d3a8f8
git submodule init
git submodule update
./autogen.sh
CXX=clang++-12 ./configure --enable-next-protocol-version-unsafe-for-production
make
make install # this one might need root access on some machines

Learn

The ingest crate is pretty easy to use, especially at the moment, since the feature set is limited and there is a predefined TOML configuration.

Under the hood, ingest depends on the stellar-core executable, which can stream ledger metadata to a provided file descriptor. ingest simply wraps all the commands and stream-reading for you so you can focus on actually writing your own ingestion mechanism.
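The wrap-and-stream idea can be sketched with std::process. This is a self-contained toy, not the crate's internals: `echo` stands in for the core subprocess, and stream_lines is a hypothetical helper (the real stream carries framed XDR, not text lines).

```rust
use std::io::{self, BufRead, BufReader};
use std::process::{Command, Stdio};

// Spawn a subprocess and stream its stdout line by line: the same shape as
// wrapping a core process and reading its metadata stream.
fn stream_lines(cmd: &str, args: &[&str]) -> io::Result<Vec<String>> {
    let mut child = Command::new(cmd).args(args).stdout(Stdio::piped()).spawn()?;
    let stdout = child.stdout.take().expect("stdout was piped");
    let lines = BufReader::new(stdout).lines().collect::<io::Result<Vec<_>>>();
    child.wait()?; // reap the subprocess
    lines
}

fn main() -> io::Result<()> {
    // `echo` stands in for the core subprocess here.
    for line in stream_lines("echo", &["framed-ledger-meta"])? {
        println!("got: {line}");
    }
    Ok(())
}
```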

Hello ledger!

This first example is very simple: it captures a specific ledger and makes sure that ingest worked properly and captured the correct one.

use ingest::{IngestionConfig, CaptiveCore, Range, BoundedRange, SupportedNetwork};
use stellar_xdr::next::LedgerCloseMeta;

pub fn main() {
    let config = IngestionConfig {
        executable_path: "/usr/local/bin/stellar-core".to_string(),
        context_path: Default::default(),
        network: SupportedNetwork::Futurenet,
        bounded_buffer_size: None,
        staggered: None
    };

    let mut captive_core = CaptiveCore::new(config);

    let range = Range::Bounded(BoundedRange(292395, 292396));
    captive_core.prepare_ledgers(&range).unwrap();

    let ledger = captive_core.get_ledger(292395);
    let ledger_seq = match ledger.unwrap() {
        LedgerCloseMeta::V1(v1) => v1.ledger_header.header.ledger_seq,
        _ => unreachable!()
    };

    println!("Hello ledger {}", ledger_seq);
}

As you can see, this is all pretty straightforward. We first set up the captive core instance by providing the path to our stellar-core executable, choosing the default context path (/tmp/rs_ingestion_temp/, which holds buckets and potentially the DB), and specifying that we don't want a bounded buffer size when running multi-thread mode (note that this example only uses single-thread mode; you can find multi-thread examples in the examples). Then we choose a range of ledgers we want ingest to prepare with CaptiveCore::prepare_ledgers(&mut CaptiveCore, &Range) so that we can retrieve them later.

In our case, we just chose to load two ledgers and then capture the first one with CaptiveCore::get_ledger(&CaptiveCore, ledger_sequence).

At this point, it's all about using the stellar_xdr::next::LedgerCloseMeta object we received from get_ledger(sequence): we assume it's a V1 ledger close meta, then get the ledger's sequence from the header and print it out.

Running hello ledger

You can now run this with either cargo run [OPTIONS] or by compiling to an executable and running it. This should be pretty quick and the result should be:

Warning: soroban-env-host-curr is running a pre-release version 0.0.17
Hello ledger 292395

Note: Don't worry about the warning. stellar-core is just letting us know we are working on Futurenet, which has not yet had a stable audited release.

If you're seeing a different result and are experiencing errors, first make sure you have installed the correct version of stellar-core (following this set of commands), and if you still experience errors please open an issue.

Invoke host function operations statistics

Note: this doesn't take into account FeeBump transactions.

Now let's get to a more interesting use of ingest. We are going to analyze a set of 10,000 ledgers and see how many of the operations that happened within those ledgers are invoke host function operations. This statistic is interesting since host functions are invoked for almost everything Soroban-related: uploading contracts and invoking contracts.

Again, we start by preparing the ledgers we're interested in:

use ingest::{IngestionConfig, CaptiveCore, Range, BoundedRange, SupportedNetwork};
use stellar_xdr::next::{LedgerCloseMeta, TransactionPhase, TxSetComponent, TransactionEnvelope, Operation, OperationBody};

pub fn main() {
    let config = IngestionConfig {
        executable_path: "/usr/local/bin/stellar-core".to_string(),
        context_path: Default::default(),
        network: SupportedNetwork::Futurenet,
        bounded_buffer_size: None,
        staggered: None
    };

    let mut captive_core = CaptiveCore::new(config);

    // preparing just 10000 ledgers for simplicity.
    let range = Range::Bounded(BoundedRange(292_000, 302_000)); 
    
    println!("[+] Preparing ledgers [{} to {}]", range.bounded().0, range.bounded().1);
    captive_core.prepare_ledgers(&range).unwrap();

    // ...    

}

As you can see, we're preparing 10,000 ledgers. Note that preparing these is not instant and will take a bit longer than preparing just the two ledgers of the previous example.

Now it's time to process the prepared ledgers and count how many of the total operations were invoke host function operations:

pub fn main() {
    // ...
    let mut all_other_ops = 0;
    let mut invoke_host_ops = 0;

    for n in std::ops::Range::from(range) {
        let ledger = captive_core.get_ledger(n);
        if let LedgerCloseMeta::V1(v1) = ledger.unwrap() {
            let set = match &v1.tx_set {
                stellar_xdr::next::GeneralizedTransactionSet::V1(set) => set
            };
            for tx_phase in set.phases.iter() {
                let set = match tx_phase {
                    TransactionPhase::V0(set) => set
                };
                for set in set.iter() {
                    let ops: Vec<&Operation> = match set {
                        TxSetComponent::TxsetCompTxsMaybeDiscountedFee(set) => {
                            let mut ops = Vec::new();
                            for tx_envelope in set.txs.iter() {
                                match tx_envelope {
                                    TransactionEnvelope::Tx(tx) => {
                                        for op in tx.tx.operations.iter() {
                                            ops.push(op);
                                        }
                                    },
                                    TransactionEnvelope::TxV0(tx) => {
                                        for op in tx.tx.operations.iter() {
                                            ops.push(op);
                                        }
                                    },
                                    _ => () // FeeBump transactions are not taken into account (see the note above)
                                };
                            }

                            ops
                        }
                    };
                    for op in ops {
                        match op.body {
                            OperationBody::InvokeHostFunction(_) => invoke_host_ops += 1,
                            _ => all_other_ops += 1,
                        }
                    }
                }
            }
        }
    }

    println!("Total operations recorded: {}", all_other_ops+invoke_host_ops);
    println!("Non invoke host function operations: {all_other_ops}");
    println!("Invoke host function operations: {invoke_host_ops}");

}

Hopefully the above is straightforward: we're just processing each of the ledgers we prepared in the previous step. The processing is done against stellar_xdr::next::LedgerCloseMeta, but as is, it's a bit too verbose to get to things like operations, so we plan to provide some utility functions that parse LedgerCloseMeta to extract information. For example, we would like to provide a helper that converts the meta into a Vec of events.

Running

As mentioned previously, this one will take a bit longer than the hello example (~10 minutes).

The result should be the following:

[+] Preparing ledgers [292000 to 302000]
Warning: soroban-env-host-curr is running a pre-release version 0.0.17
Total operations recorded: 2760
Non invoke host function operations: 2273
Invoke host function operations: 487

This means that roughly one in every five and a half operations in the captured ledgers is an invoke host function operation.
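The "one every ~5 and a half" figure follows directly from the counts above:

```rust
// Counts from the run above: 2273 other ops + 487 invoke host function ops.
fn ops_per_invoke(other_ops: u64, invoke_host_ops: u64) -> f64 {
    (other_ops + invoke_host_ops) as f64 / invoke_host_ops as f64
}

fn main() {
    // 2760 / 487 ≈ 5.67: roughly one invoke host function operation
    // for every five and a half operations in this range.
    println!("{:.2}", ops_per_invoke(2273, 487));
}
```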

Event streaming with online mode

Running online mode is suitable for tasks that need to ingest real-time data. For example, think of an indexer for contract events: it stores all events following a certain schema in a database and makes them available through a queryable API. Such a service needs to provide real-time data to its customers, and thus needs to run in online mode.

In this example, we explore how we can print out contract events, if any, as ledgers close and the network progresses.

As always, we set up our captive core with the usual configs:

use ingest::{IngestionConfig, CaptiveCore, SupportedNetwork};
use stellar_xdr::next::{LedgerCloseMeta, TransactionMeta};

const TARGET_SEQ: u32 = 387468;

pub fn main() {
    let config = IngestionConfig {
        executable_path: "/usr/local/bin/stellar-core".to_string(),
        context_path: Default::default(),
        network: SupportedNetwork::Futurenet,
        bounded_buffer_size: None,
        staggered: None
    };

    let mut captive_core = CaptiveCore::new(config);

    // ...
}

Notice that we added a TARGET_SEQ constant. That's because in this example we want to showcase how you could call the closing mechanism when running online, and to do so we need a ledger sequence at which our program exits and calls the closing mechanism.

Note: when you run this example, choose an appropriate target sequence, for example 100 sequences ahead of the current one.

Now, we need to start our core instance in online mode:

pub fn main() {
    // ...

    let receiver = captive_core.start_online_no_range().unwrap();

    // ...
}

As you can see, we're obtaining a receiver object from starting online. The Receiver<MetaResult> object is needed for our runner instance's thread to communicate with the implementor, i.e., you. In fact, when you call captive_core.start_online_no_range() you are also spawning the stellar-core run process and the buffered ledger meta process, which will send meta to the receiver as new framed ledger close meta is decoded.

Now you can choose how to deal with the receiver; in our case, we just loop over it and print out contract events if we find any:

pub fn main() {
    println!("Capturing all events. When a contract event is emitted it will be printed to stdout");
    for result in receiver.iter() {
        let ledger = result.ledger_close_meta.unwrap().ledger_close_meta;
        match &ledger {
            LedgerCloseMeta::V1(v1) => {

                let ledger_seq = v1.ledger_header.header.ledger_seq;
                if ledger_seq == TARGET_SEQ {
                    println!("Reached target ledger, closing");
                    captive_core.close_runner_process().unwrap();

                    std::process::exit(0)
                }

                for tx_processing in v1.tx_processing.iter() {
                    match &tx_processing.tx_apply_processing {
                        TransactionMeta::V3(meta) => {
                            if let Some(soroban) = &meta.soroban_meta {
                                if !soroban.events.is_empty() {
                                    println!("Events for ledger {}: \n{}\n",  ledger_seq, serde_json::to_string_pretty(&soroban.events).unwrap())
                                }
                            }
                        },
                        _ => () // non-V3 transaction meta carries no Soroban events
                    }
                }
            },
            _ => ()
        }
    }
}

As you can see, we're just destructuring every MetaResult we receive and printing out the events, if any, on the processed ledger.

Notice this code block:

if ledger_seq == TARGET_SEQ {
    println!("Reached target ledger, closing");
    captive_core.close_runner_process().unwrap();

    std::process::exit(0)
}

Here we trigger the closing mechanism and exit successfully.

This might be a good starting point for building an indexer for your protocol's contract events.

