frostyplanet/crossfire-rs: A lockless mpmc/mpsc channel to support async, based on crossbeam


License | Cargo | Documentation | Rust 1.36+

High-performance lockless spsc/mpsc/mpmc channels.

Supports async contexts, and communication between async and blocking contexts.

The low level is based on crossbeam-queue.

For the concept, see the wiki.

  • v1.0: Released in 2022.12 and used in production.

  • v2.0: Released in 2025.6. Refactored the codebase and API by removing generic types from the ChannelShared type, making coding easier.

  • v2.1: Released in 2025.9. Removed the dependency on crossbeam-channel and reimplemented on a modified version of crossbeam-queue, which improves performance in both async and blocking contexts.

As a lockless channel, crossfire outperforms other channels with async capability. Thanks to a lighter notification mechanism, in a blocking context it even outperforms the original crossbeam-channel in some cases.

[Benchmark charts: mpsc bounded size 100, blocking context; mpmc bounded size 100, blocking context; mpsc bounded size 100, async context; mpmc bounded size 100, async context]

More reference data is posted on the wiki.

Additionally, being a lockless channel, the algorithm relies on spinning and yielding. Spinning works well on multi-core systems, but is unfriendly to single-core systems (such as virtual machines). So we provide a function, detect_backoff_cfg(), to detect the running platform. Calling it in the initialization section of your code can give you a 2x performance boost on a VPS.

The benchmarks are written with the criterion framework. You can run them using:
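To make the spin-versus-yield trade-off concrete, here is a minimal sketch that uses only the standard library. It is not crossfire's implementation of detect_backoff_cfg(): the names spin_limit, detected_cores, and retry_with_backoff, and the spin limit of 6, are assumptions made for illustration. The idea shown is to spin briefly when other cores can make progress, and to yield straight away when only one core is available.

```rust
use std::hint;
use std::num::NonZeroUsize;
use std::thread;

/// Hypothetical helper: number of spin rounds to try before yielding.
/// On a single core the other party cannot run while we spin, so use 0.
fn spin_limit(cores: usize) -> u32 {
    if cores <= 1 {
        0
    } else {
        6 // illustrative value, not taken from crossfire
    }
}

/// Number of cores the standard library reports, falling back to 1.
fn detected_cores() -> usize {
    thread::available_parallelism()
        .map(NonZeroUsize::get)
        .unwrap_or(1)
}

/// Retry `try_op` until it returns `Some`, spinning with exponentially
/// growing pauses for up to `spin_limit` rounds and yielding afterwards.
fn retry_with_backoff<T>(spin_limit: u32, mut try_op: impl FnMut() -> Option<T>) -> T {
    let mut round = 0u32;
    loop {
        if let Some(value) = try_op() {
            return value;
        }
        if round < spin_limit {
            // Busy-wait for 2^round iterations, hinting the CPU that we spin.
            for _ in 0..(1u32 << round) {
                hint::spin_loop();
            }
            round += 1;
        } else {
            // Give the scheduler a chance to run the other side.
            thread::yield_now();
        }
    }
}
```

A caller would compute the limit once at start-up with spin_limit(detected_cores()) and pass a closure that attempts the operation once, returning None when it should be retried.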

cargo bench --bench crossfire

NOTE: Because v2.1 has pushed speed to a level that no one has reached before, it can put heavy pressure on async runtimes. Some hidden bugs may surface (especially around atomic operations on platforms with weaker memory ordering):

v2.0.26 (legacy):

Arch     Runtime                             Workflow       Status
x86_64   threaded, tokio 1.47.1, async-std   cron_2.0_x86   PASSED
arm      threaded, tokio 1.47.1, async-std   cron_2.0_arm   PASSED

Debug locally:

Use --features trace_log to run the bench or test until it hangs, then press ctrl+c or send SIGINT. The latest log will be dumped to /tmp/crossfire_ring.log (see tests/common.rs _setup_log()).

Debug with github workflow: #37

There are 3 modules: spsc, mpsc, and mpmc, providing functions to allocate different types of channels.

The SP or SC interface is only for non-concurrent operation. It is more memory-efficient than the MP or MC implementations, and sometimes slightly faster.

The return types in these 3 modules are different:

  • mpmc::bounded_blocking() : (tx blocking, rx blocking)

  • mpmc::bounded_async() : (tx async, rx async)

  • mpmc::bounded_tx_async_rx_blocking() : (tx async, rx blocking)

  • mpmc::bounded_tx_blocking_rx_async() : (tx blocking, rx async)

  • mpmc::unbounded_blocking() : (tx non-blocking, rx blocking)

  • mpmc::unbounded_async() : (tx non-blocking, rx async)

NOTE: For a bounded channel, size 0 is not supported yet. (It is temporarily rewritten as size 1.)

Context    Sender (Producer)            Receiver (Consumer)
           Single      Multiple         Single      Multiple
Blocking   BlockingTxTrait              BlockingRxTrait
           Tx          MTx              Rx          MRx
Async      AsyncTxTrait                 AsyncRxTrait
           AsyncTx     MAsyncTx         AsyncRx     MAsyncRx

For the SP/SC versions, AsyncTx, AsyncRx, Tx, and Rx are not Clone and not Sync. Although you can move them to other threads, you are not allowed to use send/recv while they are inside an Arc. (See the compile_fail examples in the type docs.)

The benefit of using the SP/SC API is completely lockless waker registration, which gives a performance boost.

The sender/receiver can use the From trait to convert between blocking and asynchronous context counterparts.

The error types are the same as those of crossbeam-channel: TrySendError, SendError, TryRecvError, RecvError
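As an illustration of how errors of this shape are usually handled, here is a sketch that uses the standard library channel as a stand-in, not crossfire. std::sync::mpsc::TrySendError and TryRecvError have the same variants (Full/Disconnected and Empty/Disconnected) as their crossbeam-channel namesakes, so the match arms carry over. The helper names describe_send, describe_recv, and demo are made up for this example.

```rust
use std::sync::mpsc::{sync_channel, TryRecvError, TrySendError};

/// Describe a try_send result; a failed send hands the message back.
fn describe_send(result: Result<(), TrySendError<i32>>) -> String {
    match result {
        Ok(()) => "sent".to_string(),
        Err(TrySendError::Full(msg)) => format!("full, {} returned", msg),
        Err(TrySendError::Disconnected(msg)) => format!("disconnected, {} returned", msg),
    }
}

/// Describe a try_recv result.
fn describe_recv(result: Result<i32, TryRecvError>) -> String {
    match result {
        Ok(msg) => format!("received {}", msg),
        Err(TryRecvError::Empty) => "empty".to_string(),
        Err(TryRecvError::Disconnected) => "disconnected".to_string(),
    }
}

/// Walk a bounded channel of capacity 1 through each error case.
fn demo() -> Vec<String> {
    let (tx, rx) = sync_channel::<i32>(1);
    let mut log = Vec::new();
    log.push(describe_send(tx.try_send(1))); // fits in the buffer
    log.push(describe_send(tx.try_send(2))); // buffer is full
    log.push(describe_recv(rx.try_recv())); // takes the buffered message
    log.push(describe_recv(rx.try_recv())); // nothing left, sender still alive
    drop(rx);
    log.push(describe_send(tx.try_send(3))); // receiver is gone
    log
}
```

The point to notice is that both TrySendError variants return ownership of the unsent message, so the caller can retry or reroute it.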

Feature flags:

  • tokio: Enables the send_timeout and recv_timeout API for async contexts, based on tokio. It will also detect the right backoff strategy for the runtime type (multi-threaded / current-thread).

  • async_std: Enables the send_timeout and recv_timeout API for async contexts, based on async-std.

Tested on tokio-1.x and async-std-1.x; crossfire is runtime-agnostic.

The following scenarios are considered:

  • The AsyncTx::send() and AsyncRx::recv() operations are cancellation-safe in an async context. You can safely use the select! macro and the timeout() function in tokio/futures in combination with recv(). On cancellation, [SendFuture] and [RecvFuture] will trigger drop(), which cleans up the waker state, making sure there are no memory leaks or crashes. But you can’t know the true result of SendFuture, since it is dropped when you cancel it. Thus, we suggest using AsyncTx::send_timeout() instead.

  • When the "tokio" or "async_std" feature is enabled, we also provide two additional functions:

      • AsyncTx::send_timeout(), which will return the message that could not be sent in [SendTimeoutError]. We guarantee that the result is atomic. Alternatively, you can use AsyncTx::send_with_timer().

      • AsyncRx::recv_timeout(). We guarantee that the result is atomic. Alternatively, you can use AsyncRx::recv_with_timer().

  • Communication between a blocking context and an async context, and between different async runtime instances.

  • The footprint of the async waker.

In a multi-producer, multi-consumer scenario, there is a small memory overhead for passing a Weak reference to the waker. Because our goal is to be lockless, when send/recv futures are cancelled (for example by tokio::time::timeout()), an immediate cleanup may be triggered if the try-lock succeeds; otherwise, it relies on lazy cleanup. (This won’t be a problem because weak wakers will be consumed by actual message sends and receives.) In an idle select scenario, such as waiting for a close notification, the waker will be reused as much as possible if poll() returns Pending.
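The cleanup behaviour described here can be illustrated with a small standard-library sketch. This is not crossfire's code: the real implementation is lockless, whereas this sketch uses a Mutex for brevity, and WakerRegistry, CountingWaker, and their methods are hypothetical names. It shows the pending future holding the only strong reference to its waker while the channel side keeps a Weak, so a cancelled future's entry is either purged right away when try_lock succeeds, or skipped later when a wake-up is attempted.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, Weak};
use std::task::{Wake, Waker};

/// Test waker that counts how often it was woken (no runtime needed).
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Hypothetical channel-side registry that stores only Weak references.
struct WakerRegistry {
    slots: Mutex<VecDeque<Weak<Waker>>>,
}

impl WakerRegistry {
    fn new() -> Self {
        Self { slots: Mutex::new(VecDeque::new()) }
    }

    /// Register a waker. The returned Arc is the only strong reference
    /// and is meant to be owned by the pending future.
    fn register(&self, waker: Waker) -> Arc<Waker> {
        let strong = Arc::new(waker);
        self.slots.lock().unwrap().push_back(Arc::downgrade(&strong));
        strong
    }

    /// Cancel a pending future: drop its strong reference, then purge dead
    /// entries only if the lock is free. Otherwise leave them for later.
    fn cancel(&self, strong: Arc<Waker>) {
        drop(strong);
        if let Ok(mut slots) = self.slots.try_lock() {
            slots.retain(|weak| weak.strong_count() > 0);
        }
    }

    /// Wake the first live waiter, discarding dead entries on the way
    /// (the deferred cleanup path). Returns false if nobody was woken.
    fn wake_one(&self) -> bool {
        let mut slots = self.slots.lock().unwrap();
        while let Some(weak) = slots.pop_front() {
            if let Some(waker) = weak.upgrade() {
                waker.wake_by_ref();
                return true;
            }
        }
        false
    }

    fn len(&self) -> usize {
        self.slots.lock().unwrap().len()
    }
}
```

Dropping the strong reference without calling cancel() models a cancellation where the immediate cleanup did not happen: the stale entry stays queued until the next wake_one() consumes it.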

Cargo.toml:

[dependencies]
crossfire = "2.1"
# The example below also needs tokio (macros, runtime, time).
tokio = { version = "1", features = ["full"] }

use crossfire::*;
use tokio::time::{interval, sleep, Duration};

#[tokio::main]
async fn main() {
    // Bounded async mpsc channel with a capacity of 100 messages.
    let (tx, rx) = mpsc::bounded_async::<i32>(100);
    // Spawn 10 producers, each owning its own clone of the sender.
    for _ in 0..10 {
        let tx = tx.clone();
        tokio::spawn(async move {
            for i in 0i32..10 {
                let _ = tx.send(i).await;
                sleep(Duration::from_millis(100)).await;
                println!("sent {}", i);
            }
        });
    }
    // Drop the original sender so the channel closes once all producers finish.
    drop(tx);
    let mut inv = interval(Duration::from_millis(500));
    loop {
        tokio::select! {
            _ = inv.tick() => {
                println!("tick");
            }
            r = rx.recv() => {
                if let Ok(i) = r {
                    println!("recv {}", i);
                } else {
                    // recv() fails once every sender has been dropped.
                    println!("rx closed");
                    break;
                }
            }
        }
    }
}
