macro_rules! select { (@ { // The index of the future to poll first (in bias mode), or the RNG // expression to use to pick a future to poll first. start=$start:expr; // One `_` for each branch in the `select!` macro. Passing this to // `count!` converts $skip to an integer. ( $($count:tt)* ) // Normalized select branches. `( $skip )` is a set of `_` characters. // There is one `_` for each select branch **before** this one. Given // that all input futures are stored in a tuple, $skip is useful for // generating a pattern to reference the future for the current branch. // $skip is also used as an argument to `count!`, returning the index of // the current select branch. $( ( $($skip:tt)* ) $bind:pat = $fut:expr, if $c:expr => $handle:expr, )+ // Fallback expression used when all select branches have been disabled. ; $else:expr }) => { ... }; (@ { start=$start:expr; $($t:tt)* } ) => { ... }; (@ { start=$start:expr; $($t:tt)* } else => $else:expr $(,)?) => { ... }; (@ { start=$start:expr; ( $($s:tt)* ) $($t:tt)* } $p:pat = $f:expr, if $c:expr => $h:block, $($r:tt)* ) => { ... }; (@ { start=$start:expr; ( $($s:tt)* ) $($t:tt)* } $p:pat = $f:expr => $h:block, $($r:tt)* ) => { ... }; (@ { start=$start:expr; ( $($s:tt)* ) $($t:tt)* } $p:pat = $f:expr, if $c:expr => $h:block $($r:tt)* ) => { ... }; (@ { start=$start:expr; ( $($s:tt)* ) $($t:tt)* } $p:pat = $f:expr => $h:block $($r:tt)* ) => { ... }; (@ { start=$start:expr; ( $($s:tt)* ) $($t:tt)* } $p:pat = $f:expr, if $c:expr => $h:expr ) => { ... }; (@ { start=$start:expr; ( $($s:tt)* ) $($t:tt)* } $p:pat = $f:expr => $h:expr ) => { ... }; (@ { start=$start:expr; ( $($s:tt)* ) $($t:tt)* } $p:pat = $f:expr, if $c:expr => $h:expr, $($r:tt)* ) => { ... }; (@ { start=$start:expr; ( $($s:tt)* ) $($t:tt)* } $p:pat = $f:expr => $h:expr, $($r:tt)* ) => { ... }; (biased; $p:pat = $($t:tt)* ) => { ... }; ( $p:pat = $($t:tt)* ) => { ... }; () => { ... }; }
Expand description
Waits on multiple concurrent branches, returning when the first branch completes, cancelling the remaining branches.
The select!
macro must be used inside of async functions, closures, and
blocks.
The select!
macro accepts one or more branches with the following pattern:
<pattern> = <async expression> (, if <precondition>)? => <handler>,
Additionally, the select!
macro may include a single, optional else
branch, which evaluates if none of the other branches match their patterns:
else => <expression>
The macro aggregates all <async expression>
expressions and runs them
concurrently on the current task. Once the first expression
completes with a value that matches its <pattern>
, the select!
macro
returns the result of evaluating the completed branch’s <handler>
expression.
Additionally, each branch may include an optional if
precondition. If the
precondition returns false
, then the branch is disabled. The provided
<async expression>
is still evaluated but the resulting future is never
polled. This capability is useful when using select!
within a loop.
The complete lifecycle of a select!
expression is as follows:
- Evaluate all provided
<precondition>
expressions. If the precondition returnsfalse
, disable the branch for the remainder of the current call toselect!
. Re-enteringselect!
due to a loop clears the “disabled” state. - Aggregate the
<async expression>
s from each branch, including the disabled ones. If the branch is disabled,<async expression>
is still evaluated, but the resulting future is not polled. - Concurrently await on the results for all remaining
<async expression>
s. - Once an
<async expression>
returns a value, attempt to apply the value to the provided<pattern>
, if the pattern matches, evaluate<handler>
and return. If the pattern does not match, disable the current branch and for the remainder of the current call toselect!
. Continue from step 3. - If all branches are disabled, evaluate the
else
expression. If no else branch is provided, panic.
§Runtime characteristics
By running all async expressions on the current task, the expressions are
able to run concurrently but not in parallel. This means all
expressions are run on the same thread and if one branch blocks the thread,
all other expressions will be unable to continue. If parallelism is
required, spawn each async expression using tokio::spawn
and pass the
join handle to select!
.
§Fairness
By default, select!
randomly picks a branch to check first. This provides
some level of fairness when calling select!
in a loop with branches that
are always ready.
This behavior can be overridden by adding biased;
to the beginning of the
macro usage. See the examples for details. This will cause select
to poll
the futures in the order they appear from top to bottom. There are a few
reasons you may want this:
- The random number generation of
tokio::select!
has a non-zero CPU cost - Your futures may interact in a way where known polling order is significant
But there is an important caveat to this mode. It becomes your responsibility
to ensure that the polling order of your futures is fair. If for example you
are selecting between a stream and a shutdown future, and the stream has a
huge volume of messages and zero or nearly zero time between them, you should
place the shutdown future earlier in the select!
list to ensure that it is
always polled, and will not be ignored due to the stream being constantly
ready.
§Panics
The select!
macro panics if all branches are disabled and there is no
provided else
branch. A branch is disabled when the provided if
precondition returns false
or when the pattern does not match the
result of <async expression>
.
§Cancellation safety
When using select!
in a loop to receive messages from multiple sources,
you should make sure that the receive call is cancellation safe to avoid
losing messages. This section goes through various common methods and
describes whether they are cancel safe. The lists in this section are not
exhaustive.
The following methods are cancellation safe:
tokio::sync::mpsc::Receiver::recv
tokio::sync::mpsc::UnboundedReceiver::recv
tokio::sync::broadcast::Receiver::recv
tokio::sync::watch::Receiver::changed
tokio::net::TcpListener::accept
tokio::net::UnixListener::accept
tokio::signal::unix::Signal::recv
tokio::io::AsyncReadExt::read
on anyAsyncRead
tokio::io::AsyncReadExt::read_buf
on anyAsyncRead
tokio::io::AsyncWriteExt::write
on anyAsyncWrite
tokio::io::AsyncWriteExt::write_buf
on anyAsyncWrite
tokio_stream::StreamExt::next
on anyStream
futures::stream::StreamExt::next
on anyStream
The following methods are not cancellation safe and can lead to loss of data:
tokio::io::AsyncReadExt::read_exact
tokio::io::AsyncReadExt::read_to_end
tokio::io::AsyncReadExt::read_to_string
tokio::io::AsyncWriteExt::write_all
The following methods are not cancellation safe because they use a queue for fairness and cancellation makes you lose your place in the queue:
tokio::sync::Mutex::lock
tokio::sync::RwLock::read
tokio::sync::RwLock::write
tokio::sync::Semaphore::acquire
tokio::sync::Notify::notified
To determine whether your own methods are cancellation safe, look for the
location of uses of .await
. This is because when an asynchronous method is
cancelled, that always happens at an .await
. If your function behaves
correctly even if it is restarted while waiting at an .await
, then it is
cancellation safe.
Cancellation safety can be defined in the following way: If you have a
future that has not yet completed, then it must be a no-op to drop that
future and recreate it. This definition is motivated by the situation where
a select!
is used in a loop. Without this guarantee, you would lose your
progress when another branch completes and you restart the select!
by
going around the loop.
Be aware that cancelling something that is not cancellation safe is not necessarily wrong. For example, if you are cancelling a task because the application is shutting down, then you probably don’t care that partially read data is lost.
§Examples
Basic select with two branches.
async fn do_stuff_async() {
// async work
}
async fn more_async_work() {
// more here
}
#[tokio::main]
async fn main() {
tokio::select! {
_ = do_stuff_async() => {
println!("do_stuff_async() completed first")
}
_ = more_async_work() => {
println!("more_async_work() completed first")
}
};
}
Basic stream selecting.
use tokio_stream::{self as stream, StreamExt};
#[tokio::main]
async fn main() {
let mut stream1 = stream::iter(vec![1, 2, 3]);
let mut stream2 = stream::iter(vec![4, 5, 6]);
let next = tokio::select! {
v = stream1.next() => v.unwrap(),
v = stream2.next() => v.unwrap(),
};
assert!(next == 1 || next == 4);
}
Collect the contents of two streams. In this example, we rely on pattern
matching and the fact that stream::iter
is “fused”, i.e. once the stream
is complete, all calls to next()
return None
.
use tokio_stream::{self as stream, StreamExt};
#[tokio::main]
async fn main() {
let mut stream1 = stream::iter(vec![1, 2, 3]);
let mut stream2 = stream::iter(vec![4, 5, 6]);
let mut values = vec![];
loop {
tokio::select! {
Some(v) = stream1.next() => values.push(v),
Some(v) = stream2.next() => values.push(v),
else => break,
}
}
values.sort();
assert_eq!(&[1, 2, 3, 4, 5, 6], &values[..]);
}
Using the same future in multiple select!
expressions can be done by passing
a reference to the future. Doing so requires the future to be Unpin
. A
future can be made Unpin
by either using Box::pin
or stack pinning.
Here, a stream is consumed for at most 1 second.
use tokio_stream::{self as stream, StreamExt};
use tokio::time::{self, Duration};
#[tokio::main]
async fn main() {
let mut stream = stream::iter(vec![1, 2, 3]);
let sleep = time::sleep(Duration::from_secs(1));
tokio::pin!(sleep);
loop {
tokio::select! {
maybe_v = stream.next() => {
if let Some(v) = maybe_v {
println!("got = {}", v);
} else {
break;
}
}
_ = &mut sleep => {
println!("timeout");
break;
}
}
}
}
Joining two values using select!
.
use tokio::sync::oneshot;
#[tokio::main]
async fn main() {
let (tx1, mut rx1) = oneshot::channel();
let (tx2, mut rx2) = oneshot::channel();
tokio::spawn(async move {
tx1.send("first").unwrap();
});
tokio::spawn(async move {
tx2.send("second").unwrap();
});
let mut a = None;
let mut b = None;
while a.is_none() || b.is_none() {
tokio::select! {
v1 = (&mut rx1), if a.is_none() => a = Some(v1.unwrap()),
v2 = (&mut rx2), if b.is_none() => b = Some(v2.unwrap()),
}
}
let res = (a.unwrap(), b.unwrap());
assert_eq!(res.0, "first");
assert_eq!(res.1, "second");
}
Using the biased;
mode to control polling order.
#[tokio::main]
async fn main() {
let mut count = 0u8;
loop {
tokio::select! {
// If you run this example without `biased;`, the polling order is
// pseudo-random, and the assertions on the value of count will
// (probably) fail.
biased;
_ = async {}, if count < 1 => {
count += 1;
assert_eq!(count, 1);
}
_ = async {}, if count < 2 => {
count += 1;
assert_eq!(count, 2);
}
_ = async {}, if count < 3 => {
count += 1;
assert_eq!(count, 3);
}
_ = async {}, if count < 4 => {
count += 1;
assert_eq!(count, 4);
}
else => {
break;
}
};
}
}
§Avoid racy if
preconditions
Given that if
preconditions are used to disable select!
branches, some
caution must be used to avoid missing values.
For example, here is incorrect usage of sleep
with if
. The objective
is to repeatedly run an asynchronous task for up to 50 milliseconds.
However, there is a potential for the sleep
completion to be missed.
use tokio::time::{self, Duration};
async fn some_async_work() {
// do work
}
#[tokio::main]
async fn main() {
let sleep = time::sleep(Duration::from_millis(50));
tokio::pin!(sleep);
while !sleep.is_elapsed() {
tokio::select! {
_ = &mut sleep, if !sleep.is_elapsed() => {
println!("operation timed out");
}
_ = some_async_work() => {
println!("operation completed");
}
}
}
panic!("This example shows how not to do it!");
}
In the above example, sleep.is_elapsed()
may return true
even if
sleep.poll()
never returned Ready
. This opens up a potential race
condition where sleep
expires between the while !sleep.is_elapsed()
check and the call to select!
resulting in the some_async_work()
call to
run uninterrupted despite the sleep having elapsed.
One way to write the above example without the race would be:
use tokio::time::{self, Duration};
async fn some_async_work() {
// do work
}
#[tokio::main]
async fn main() {
let sleep = time::sleep(Duration::from_millis(50));
tokio::pin!(sleep);
loop {
tokio::select! {
_ = &mut sleep => {
println!("operation timed out");
break;
}
_ = some_async_work() => {
println!("operation completed");
}
}
}
}