High performance non-blocking spsc/mpsc/mpmc channels.
Supports asynchronous contexts and communication between asynchronous and blocking contexts.
The low-level implementation is based on crossbeam-queue.
For the concept, see the wiki.
- v1.0: Released in 2022.12 and used in production.
- v2.0: Released in 2025.6. Refactored the codebase and API by removing generic types from the ChannelShared type, making coding easier.
- v2.1: Released in 2025.9. Removed the dependency on crossbeam-channel and reimplemented the channel on a modified version of crossbeam-queue, which improves performance in both asynchronous and blocking contexts.
As a non-blocking channel, crossfire outperforms other channels with asynchronous capability. And thanks to a lighter notification mechanism, in a blocking context some cases even outperform the original crossbeam-channel.
More reference data is posted on the wiki.
Additionally, being a non-blocking channel, the algorithm relies on spinning and yielding. Spinning is good on multi-core systems, but unfriendly to single-core systems (such as virtual machines). So we provide a function, detect_backoff_cfg(), to detect the running platform. Calling it in the initialization section of your code can give you about a 2x performance boost on a VPS.
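To make the spin-versus-yield trade-off concrete, here is a minimal std-only sketch. It is not crossfire's implementation: `BackoffCfg`, `backoff_for_cores`, `detect_cfg`, `wait_for`, and the spin budget of 64 are hypothetical names and values chosen for illustration. It only shows the general idea of picking a spin budget from the core count and falling back to yielding.

```rust
use std::hint::spin_loop;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical backoff settings (not crossfire's real type).
#[derive(Debug, Clone, Copy, PartialEq)]
struct BackoffCfg {
    /// How many rounds to spin before yielding to the scheduler.
    spin_limit: u32,
}

/// Pick a spin budget from the number of available cores.
fn backoff_for_cores(cores: usize) -> BackoffCfg {
    if cores <= 1 {
        // On a single core, spinning only delays the thread we are waiting for.
        BackoffCfg { spin_limit: 0 }
    } else {
        // Assumed value, purely illustrative.
        BackoffCfg { spin_limit: 64 }
    }
}

/// Detect the platform once, e.g. at program start.
fn detect_cfg() -> BackoffCfg {
    let cores = thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
    backoff_for_cores(cores)
}

/// Wait until `flag` is set: spin first, then yield. Returns the rounds waited.
fn wait_for(flag: &AtomicBool, cfg: BackoffCfg) -> u32 {
    let mut rounds = 0u32;
    while !flag.load(Ordering::Acquire) {
        if rounds < cfg.spin_limit {
            spin_loop();
        } else {
            thread::yield_now();
        }
        rounds = rounds.saturating_add(1);
    }
    rounds
}

fn main() {
    let cfg = detect_cfg();
    let flag = Arc::new(AtomicBool::new(false));
    let setter = {
        let flag = Arc::clone(&flag);
        thread::spawn(move || flag.store(true, Ordering::Release))
    };
    let rounds = wait_for(&flag, cfg);
    setter.join().unwrap();
    println!("cfg = {:?}, waited {} rounds", cfg, rounds);
}
```

With a spin limit of 0 the waiter yields immediately, which is the behaviour you would want on a single-core VPS in this sketch.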
The benchmark is written with the criterion framework. You can run the benchmark using:
```shell
cargo bench --bench crossfire
```
NOTE: Because v2.1 has pushed the speed to a level not reached before, it can put heavy pressure on the async runtime. Some hidden bugs (especially in atomic operations on platforms with weaker memory ordering) may surface:
v2.0.26 (legacy):
| arch | runtime | workflow | status |
|---|---|---|---|
| x86_64 | threaded | cron_2.0_x86 | PASSED |
| | tokio 1.47.1 | | |
| | async-std | | |
| arm | threaded | cron_2.0_arm | PASSED |
| | tokio 1.47.1 | | |
| | async-std | | |
Debug locally:
Use --features trace_log to run the bench or test until it hangs, then press Ctrl+C or send SIGINT. The latest log will be dumped to /tmp/crossfire_ring.log (see _setup_log() in tests/common.rs).
Debug with github workflow: #37
There are 3 modules: spsc, mpsc, and mpmc, providing functions to allocate different types of channels.
The SP or SC interface is for non-concurrent operation only. It is more memory-efficient than the MP or MC implementations, and sometimes slightly faster.
The return types in these 3 modules are different:
- mpmc::bounded_blocking() : (tx blocking, rx blocking)
- mpmc::bounded_async() : (tx async, rx async)
- mpmc::bounded_tx_async_rx_blocking() : (tx async, rx blocking)
- mpmc::bounded_tx_blocking_rx_async() : (tx blocking, rx async)
- mpmc::unbounded_blocking() : (tx non-blocking, rx blocking)
- mpmc::unbounded_async() : (tx non-blocking, rx async)
NOTE: For a bounded channel, size 0 is not supported yet. (It is temporarily rewritten as size 1.)
| Context | Sender (Producer), single | Sender (Producer), multiple | Receiver (Consumer), single | Receiver (Consumer), multiple |
|---|---|---|---|---|
| Blocking (trait) | BlockingTxTrait | BlockingTxTrait | BlockingRxTrait | BlockingRxTrait |
| Blocking (type) | Tx | MTx | Rx | MRx |
| Async (trait) | AsyncTxTrait | AsyncTxTrait | AsyncRxTrait | AsyncRxTrait |
| Async (type) | AsyncTx | MAsyncTx | AsyncRx | MAsyncRx |
For the SP/SC versions, AsyncTx, AsyncRx, Tx, and Rx are neither Clone nor Sync. Although you can move them to other threads, you are not allowed to call send/recv while they are wrapped in an Arc. (See the compile_fail examples in the type docs.)
The benefit of accepting these restrictions in the SP/SC API is completely lock-free waker registration, which yields a performance boost.
The sender/receiver can use the From trait to convert between blocking and asynchronous context counterparts.
The error types are the same as those of crossbeam-channel: TrySendError, SendError, TryRecvError, RecvError.
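As an illustration of handling these error types, the sketch below uses `std::sync::mpsc` as a stand-in, because the standard library's error types share their names and variants (`Full`, `Disconnected`, `Empty`) with crossbeam-channel's. It assumes, based on the statement above, that crossfire's errors follow the same shape. The helper `describe_try_send` is hypothetical.

```rust
use std::sync::mpsc::{sync_channel, RecvError, SendError, TryRecvError, TrySendError};

/// Describe the outcome of a non-blocking send (hypothetical helper).
/// Both error variants hand the unsent message back to the caller.
fn describe_try_send(res: Result<(), TrySendError<i32>>) -> String {
    match res {
        Ok(()) => "sent".to_string(),
        Err(TrySendError::Full(v)) => format!("full, got {} back", v),
        Err(TrySendError::Disconnected(v)) => format!("closed, got {} back", v),
    }
}

fn main() {
    // A bounded std channel with capacity 1, standing in for a bounded channel.
    let (tx, rx) = sync_channel::<i32>(1);
    println!("{}", describe_try_send(tx.try_send(1)));
    // The buffer is full now, so the message comes back inside the error.
    println!("{}", describe_try_send(tx.try_send(2)));

    assert_eq!(rx.try_recv(), Ok(1));
    assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));

    // Once the receiver is gone, both try_send and send report disconnection.
    drop(rx);
    println!("{}", describe_try_send(tx.try_send(3)));
    assert_eq!(tx.send(4), Err(SendError(4)));

    // Once every sender is gone, the receiver reports disconnection.
    let (tx2, rx2) = sync_channel::<i32>(1);
    drop(tx2);
    assert_eq!(rx2.try_recv(), Err(TryRecvError::Disconnected));
    assert_eq!(rx2.recv(), Err(RecvError));
}
```

Matching on the variant rather than discarding the error lets the caller retry or re-queue the returned message.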
- tokio: Enables the send_timeout and recv_timeout APIs for async contexts, based on tokio. It also detects the right backoff strategy for the runtime type (multi-threaded / current-thread).
- async_std: Enables the send_timeout and recv_timeout APIs for async contexts, based on async-std.

Crossfire is runtime-agnostic and has been tested on tokio-1.x and async-std-1.x.
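Assuming the feature names listed above are ordinary Cargo features, enabling the tokio-based timeout APIs might look like the following fragment. The tokio version and its `full` feature set are illustrative choices, not requirements stated by this document.

```toml
[dependencies]
crossfire = { version = "2.1", features = ["tokio"] }
tokio = { version = "1", features = ["full"] }
```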
The following scenarios are considered:
- The AsyncTx::send() and AsyncRx::recv() operations are cancellation-safe in an async context. You can safely use the select! macro and the timeout() function in tokio/futures in combination with recv(). On cancellation, [SendFuture] and [RecvFuture] trigger drop(), which cleans up the waker state, making sure there are no memory leaks or crashes. But you cannot know the true result of a SendFuture, since it is dropped when you cancel it. Thus, we suggest using AsyncTx::send_timeout() instead.
- When the "tokio" or "async_std" feature is enabled, we also provide two additional functions:
  - AsyncTx::send_timeout(), which returns the message that could not be sent in [SendTimeoutError]. We guarantee that the result is atomic. Alternatively, you can use AsyncTx::send_with_timer().
  - AsyncRx::recv_timeout(), for which we guarantee that the result is atomic. Alternatively, you can use AsyncRx::recv_with_timer().
- Communication between a blocking context and an async context, and between different async runtime instances.
- The footprint of async wakers. In a multi-producer, multi-consumer scenario, there is a small memory overhead for passing a Weak reference to the waker. Because our goal is to be lock-free, when send/receive futures are canceled (such as by tokio::time::timeout()), an immediate cleanup may be triggered if the try-lock succeeds; otherwise, it relies on lazy cleanup. (This won't be a problem, because weak wakers are consumed by actual message sends and receives.) In an idle select scenario, such as waiting for a close notification, the waker is reused as much as possible if poll() returns pending.
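The weak-reference cleanup described in the last scenario can be sketched with the standard library alone. This is not crossfire's code: `FakeWaker` and `WakerQueue` are hypothetical, and a `Mutex` is used for brevity where the real implementation aims to be lock-free. The sketch only shows how a canceled registration can either be removed at once (when a try-lock succeeds) or left behind and discarded later by a real send or receive.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex, Weak};

/// Stand-in for a task waker owned by a pending future (hypothetical).
struct FakeWaker {
    id: u32,
}

/// Queue of weak waker references (hypothetical; Mutex used for brevity).
#[derive(Default)]
struct WakerQueue {
    slots: Mutex<VecDeque<Weak<FakeWaker>>>,
}

impl WakerQueue {
    /// A pending future registers a weak reference to its waker.
    fn register(&self, waker: &Arc<FakeWaker>) {
        self.slots.lock().unwrap().push_back(Arc::downgrade(waker));
    }

    /// Cancel path: drop the strong reference, then attempt immediate cleanup.
    /// Returns false when the lock is busy, leaving the dead entry for later.
    fn cancel(&self, waker: Arc<FakeWaker>) -> bool {
        drop(waker);
        match self.slots.try_lock() {
            Ok(mut slots) => {
                slots.retain(|w| w.strong_count() > 0);
                true
            }
            Err(_) => false,
        }
    }

    /// Message path: consume entries until a live waker is found.
    /// Dead entries left by canceled futures are discarded along the way.
    fn wake_one(&self) -> Option<u32> {
        let mut slots = self.slots.lock().unwrap();
        while let Some(weak) = slots.pop_front() {
            if let Some(waker) = weak.upgrade() {
                return Some(waker.id);
            }
        }
        None
    }

    fn pending(&self) -> usize {
        self.slots.lock().unwrap().len()
    }
}

fn main() {
    let queue = WakerQueue::default();
    let a = Arc::new(FakeWaker { id: 1 });
    let b = Arc::new(FakeWaker { id: 2 });
    queue.register(&a);
    queue.register(&b);

    // Simulate a cancellation that skipped immediate cleanup.
    drop(a);
    assert_eq!(queue.pending(), 2);

    // The next wake-up skips the dead entry and reaches the live waker.
    assert_eq!(queue.wake_one(), Some(2));
    assert_eq!(queue.pending(), 0);

    // Immediate cleanup succeeds when the lock is free.
    let c = Arc::new(FakeWaker { id: 3 });
    queue.register(&c);
    assert!(queue.cancel(c));
    assert_eq!(queue.pending(), 0);
}
```

The overhead in this sketch is one `Weak` per registration, and stale entries never outlive the next message that passes through the queue.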
Cargo.toml:

```toml
[dependencies]
crossfire = "2.1"
```
```rust
use crossfire::*;
use tokio::time::{interval, sleep, Duration};

#[tokio::main]
async fn main() {
    // Bounded mpsc channel: multiple async senders, a single async receiver.
    let (tx, rx) = mpsc::bounded_async::<i32>(100);
    for _ in 0..10 {
        let tx = tx.clone();
        tokio::spawn(async move {
            for i in 0i32..10 {
                let _ = tx.send(i).await;
                sleep(Duration::from_millis(100)).await;
                println!("sent {}", i);
            }
        });
    }
    // Drop the original sender so the channel closes once all clones are done.
    drop(tx);
    let mut inv = interval(Duration::from_millis(500));
    loop {
        tokio::select! {
            _ = inv.tick() => {
                println!("tick");
            }
            r = rx.recv() => {
                if let Ok(i) = r {
                    println!("recv {}", i);
                } else {
                    println!("rx closed");
                    break;
                }
            }
        }
    }
}
```



