pub struct AsyncFd<T: AsRawFd> { /* private fields */ }
Expand description
Associates an IO object backed by a Unix file descriptor with the tokio
reactor, allowing for readiness to be polled. The file descriptor must be of
a type that can be used with the OS polling facilities (ie, poll
, epoll
,
kqueue
, etc), such as a network socket or pipe, and the file descriptor
must have the nonblocking mode set to true.
Creating an AsyncFd registers the file descriptor with the current tokio Reactor, allowing you to directly await the file descriptor being readable or writable. Once registered, the file descriptor remains registered until the AsyncFd is dropped.
The AsyncFd takes ownership of an arbitrary object to represent the IO
object. It is intended that this object will handle closing the file
descriptor when it is dropped, avoiding resource leaks and ensuring that the
AsyncFd can clean up the registration before closing the file descriptor.
The AsyncFd::into_inner
function can be used to extract the inner object
to retake control from the tokio IO reactor.
The inner object is required to implement AsRawFd
. This file descriptor
must not change while AsyncFd
owns the inner object, i.e. the
AsRawFd::as_raw_fd
method on the inner type must always return the same
file descriptor when called multiple times. Failure to uphold this results
in unspecified behavior in the IO driver, which may include breaking
notifications for other sockets/etc.
Polling for readiness is done by calling the async functions readable
and writable
. These functions complete when the associated readiness
condition is observed. Any number of tasks can query the same AsyncFd
in
parallel, on the same or different conditions.
On some platforms, the readiness detecting mechanism relies on
edge-triggered notifications. This means that the OS will only notify Tokio
when the file descriptor transitions from not-ready to ready. For this to
work you should first try to read or write and only poll for readiness
if that fails with an error of std::io::ErrorKind::WouldBlock
.
Tokio internally tracks when it has received a ready notification, and when
readiness checking functions like readable
and writable
are called,
if the readiness flag is set, these async functions will complete
immediately. This however does mean that it is critical to ensure that this
ready flag is cleared when (and only when) the file descriptor ceases to be
ready. The AsyncFdReadyGuard
returned from readiness checking functions
serves this function; after calling a readiness-checking async function,
you must use this AsyncFdReadyGuard
to signal to tokio whether the file
descriptor is no longer in a ready state.
§Use with to a poll-based API
In some cases it may be desirable to use AsyncFd
from APIs similar to
TcpStream::poll_read_ready
. The AsyncFd::poll_read_ready
and
AsyncFd::poll_write_ready
functions are provided for this purpose.
Because these functions don’t create a future to hold their state, they have
the limitation that only one task can wait on each direction (read or write)
at a time.
§Examples
This example shows how to turn std::net::TcpStream
asynchronous using
AsyncFd
. It implements the read/write operations both as an async fn
and using the IO traits AsyncRead
and AsyncWrite
.
use futures::ready;
use std::io::{self, Read, Write};
use std::net::TcpStream;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::io::unix::AsyncFd;
pub struct AsyncTcpStream {
inner: AsyncFd<TcpStream>,
}
impl AsyncTcpStream {
pub fn new(tcp: TcpStream) -> io::Result<Self> {
tcp.set_nonblocking(true)?;
Ok(Self {
inner: AsyncFd::new(tcp)?,
})
}
pub async fn read(&self, out: &mut [u8]) -> io::Result<usize> {
loop {
let mut guard = self.inner.readable().await?;
match guard.try_io(|inner| inner.get_ref().read(out)) {
Ok(result) => return result,
Err(_would_block) => continue,
}
}
}
pub async fn write(&self, buf: &[u8]) -> io::Result<usize> {
loop {
let mut guard = self.inner.writable().await?;
match guard.try_io(|inner| inner.get_ref().write(buf)) {
Ok(result) => return result,
Err(_would_block) => continue,
}
}
}
}
impl AsyncRead for AsyncTcpStream {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>
) -> Poll<io::Result<()>> {
loop {
let mut guard = ready!(self.inner.poll_read_ready(cx))?;
let unfilled = buf.initialize_unfilled();
match guard.try_io(|inner| inner.get_ref().read(unfilled)) {
Ok(Ok(len)) => {
buf.advance(len);
return Poll::Ready(Ok(()));
},
Ok(Err(err)) => return Poll::Ready(Err(err)),
Err(_would_block) => continue,
}
}
}
}
impl AsyncWrite for AsyncTcpStream {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8]
) -> Poll<io::Result<usize>> {
loop {
let mut guard = ready!(self.inner.poll_write_ready(cx))?;
match guard.try_io(|inner| inner.get_ref().write(buf)) {
Ok(result) => return Poll::Ready(result),
Err(_would_block) => continue,
}
}
}
fn poll_flush(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<()>> {
// tcp flush is a no-op
Poll::Ready(Ok(()))
}
fn poll_shutdown(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<()>> {
self.inner.get_ref().shutdown(std::net::Shutdown::Write)?;
Poll::Ready(Ok(()))
}
}
Implementations§
source§impl<T: AsRawFd> AsyncFd<T>
impl<T: AsRawFd> AsyncFd<T>
sourcepub fn new(inner: T) -> Result<Self>where
T: AsRawFd,
pub fn new(inner: T) -> Result<Self>where
T: AsRawFd,
Creates an AsyncFd backed by (and taking ownership of) an object
implementing AsRawFd
. The backing file descriptor is cached at the
time of creation.
Only configures the Interest::READABLE
and Interest::WRITABLE
interests. For more
control, use AsyncFd::with_interest
.
This method must be called in the context of a tokio runtime.
§Panics
This function panics if there is no current reactor set, or if the rt
feature flag is not enabled.
sourcepub fn with_interest(inner: T, interest: Interest) -> Result<Self>where
T: AsRawFd,
pub fn with_interest(inner: T, interest: Interest) -> Result<Self>where
T: AsRawFd,
sourcepub fn get_mut(&mut self) -> &mut T
pub fn get_mut(&mut self) -> &mut T
Returns a mutable reference to the backing object of this AsyncFd
.
sourcepub fn into_inner(self) -> T
pub fn into_inner(self) -> T
Deregisters this file descriptor and returns ownership of the backing object.
sourcepub fn poll_read_ready<'a>(
&'a self,
cx: &mut Context<'_>,
) -> Poll<Result<AsyncFdReadyGuard<'a, T>>>
pub fn poll_read_ready<'a>( &'a self, cx: &mut Context<'_>, ) -> Poll<Result<AsyncFdReadyGuard<'a, T>>>
Polls for read readiness.
If the file descriptor is not currently ready for reading, this method
will store a clone of the Waker
from the provided Context
. When the
file descriptor becomes ready for reading, Waker::wake
will be called.
Note that on multiple calls to poll_read_ready
or
poll_read_ready_mut
, only the Waker
from the Context
passed to the
most recent call is scheduled to receive a wakeup. (However,
poll_write_ready
retains a second, independent waker).
This method is intended for cases where creating and pinning a future
via readable
is not feasible. Where possible, using readable
is
preferred, as this supports polling from multiple tasks at once.
This method takes &self
, so it is possible to call this method
concurrently with other methods on this struct. This method only
provides shared access to the inner IO resource when handling the
AsyncFdReadyGuard
.
sourcepub fn poll_read_ready_mut<'a>(
&'a mut self,
cx: &mut Context<'_>,
) -> Poll<Result<AsyncFdReadyMutGuard<'a, T>>>
pub fn poll_read_ready_mut<'a>( &'a mut self, cx: &mut Context<'_>, ) -> Poll<Result<AsyncFdReadyMutGuard<'a, T>>>
Polls for read readiness.
If the file descriptor is not currently ready for reading, this method
will store a clone of the Waker
from the provided Context
. When the
file descriptor becomes ready for reading, Waker::wake
will be called.
Note that on multiple calls to poll_read_ready
or
poll_read_ready_mut
, only the Waker
from the Context
passed to the
most recent call is scheduled to receive a wakeup. (However,
poll_write_ready
retains a second, independent waker).
This method is intended for cases where creating and pinning a future
via readable
is not feasible. Where possible, using readable
is
preferred, as this supports polling from multiple tasks at once.
This method takes &mut self
, so it is possible to access the inner IO
resource mutably when handling the AsyncFdReadyMutGuard
.
sourcepub fn poll_write_ready<'a>(
&'a self,
cx: &mut Context<'_>,
) -> Poll<Result<AsyncFdReadyGuard<'a, T>>>
pub fn poll_write_ready<'a>( &'a self, cx: &mut Context<'_>, ) -> Poll<Result<AsyncFdReadyGuard<'a, T>>>
Polls for write readiness.
If the file descriptor is not currently ready for writing, this method
will store a clone of the Waker
from the provided Context
. When the
file descriptor becomes ready for writing, Waker::wake
will be called.
Note that on multiple calls to poll_write_ready
or
poll_write_ready_mut
, only the Waker
from the Context
passed to the
most recent call is scheduled to receive a wakeup. (However,
poll_read_ready
retains a second, independent waker).
This method is intended for cases where creating and pinning a future
via writable
is not feasible. Where possible, using writable
is
preferred, as this supports polling from multiple tasks at once.
This method takes &self
, so it is possible to call this method
concurrently with other methods on this struct. This method only
provides shared access to the inner IO resource when handling the
AsyncFdReadyGuard
.
sourcepub fn poll_write_ready_mut<'a>(
&'a mut self,
cx: &mut Context<'_>,
) -> Poll<Result<AsyncFdReadyMutGuard<'a, T>>>
pub fn poll_write_ready_mut<'a>( &'a mut self, cx: &mut Context<'_>, ) -> Poll<Result<AsyncFdReadyMutGuard<'a, T>>>
Polls for write readiness.
If the file descriptor is not currently ready for writing, this method
will store a clone of the Waker
from the provided Context
. When the
file descriptor becomes ready for writing, Waker::wake
will be called.
Note that on multiple calls to poll_write_ready
or
poll_write_ready_mut
, only the Waker
from the Context
passed to the
most recent call is scheduled to receive a wakeup. (However,
poll_read_ready
retains a second, independent waker).
This method is intended for cases where creating and pinning a future
via writable
is not feasible. Where possible, using writable
is
preferred, as this supports polling from multiple tasks at once.
This method takes &mut self
, so it is possible to access the inner IO
resource mutably when handling the AsyncFdReadyMutGuard
.
sourcepub async fn ready(
&self,
interest: Interest,
) -> Result<AsyncFdReadyGuard<'_, T>>
pub async fn ready( &self, interest: Interest, ) -> Result<AsyncFdReadyGuard<'_, T>>
Waits for any of the requested ready states, returning a
AsyncFdReadyGuard
that must be dropped to resume
polling for the requested ready states.
The function may complete without the file descriptor being ready. This is a
false-positive and attempting an operation will return with
io::ErrorKind::WouldBlock
. The function can also return with an empty
Ready
set, so you should always check the returned value and possibly
wait again if the requested states are not set.
When an IO operation does return io::ErrorKind::WouldBlock
, the readiness must be cleared.
When a combined interest is used, it is important to clear only the readiness
that is actually observed to block. For instance when the combined
interest Interest::READABLE | Interest::WRITABLE
is used, and a read blocks, only
read readiness should be cleared using the AsyncFdReadyGuard::clear_ready_matching
method:
guard.clear_ready_matching(Ready::READABLE)
.
Also clearing the write readiness in this case would be incorrect. The AsyncFdReadyGuard::clear_ready
method clears all readiness flags.
This method takes &self
, so it is possible to call this method
concurrently with other methods on this struct. This method only
provides shared access to the inner IO resource when handling the
AsyncFdReadyGuard
.
§Examples
Concurrently read and write to a std::net::TcpStream
on the same task without
splitting.
use std::error::Error;
use std::io;
use std::io::{Read, Write};
use std::net::TcpStream;
use tokio::io::unix::AsyncFd;
use tokio::io::{Interest, Ready};
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let stream = TcpStream::connect("127.0.0.1:8080")?;
stream.set_nonblocking(true)?;
let stream = AsyncFd::new(stream)?;
loop {
let mut guard = stream
.ready(Interest::READABLE | Interest::WRITABLE)
.await?;
if guard.ready().is_readable() {
let mut data = vec![0; 1024];
// Try to read data, this may still fail with `WouldBlock`
// if the readiness event is a false positive.
match stream.get_ref().read(&mut data) {
Ok(n) => {
println!("read {} bytes", n);
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
// a read has blocked, but a write might still succeed.
// clear only the read readiness.
guard.clear_ready_matching(Ready::READABLE);
continue;
}
Err(e) => {
return Err(e.into());
}
}
}
if guard.ready().is_writable() {
// Try to write data, this may still fail with `WouldBlock`
// if the readiness event is a false positive.
match stream.get_ref().write(b"hello world") {
Ok(n) => {
println!("write {} bytes", n);
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
// a write has blocked, but a read might still succeed.
// clear only the write readiness.
guard.clear_ready_matching(Ready::WRITABLE);
continue;
}
Err(e) => {
return Err(e.into());
}
}
}
}
}
sourcepub async fn ready_mut(
&mut self,
interest: Interest,
) -> Result<AsyncFdReadyMutGuard<'_, T>>
pub async fn ready_mut( &mut self, interest: Interest, ) -> Result<AsyncFdReadyMutGuard<'_, T>>
Waits for any of the requested ready states, returning a
AsyncFdReadyMutGuard
that must be dropped to resume
polling for the requested ready states.
The function may complete without the file descriptor being ready. This is a
false-positive and attempting an operation will return with
io::ErrorKind::WouldBlock
. The function can also return with an empty
Ready
set, so you should always check the returned value and possibly
wait again if the requested states are not set.
When an IO operation does return io::ErrorKind::WouldBlock
, the readiness must be cleared.
When a combined interest is used, it is important to clear only the readiness
that is actually observed to block. For instance when the combined
interest Interest::READABLE | Interest::WRITABLE
is used, and a read blocks, only
read readiness should be cleared using the AsyncFdReadyMutGuard::clear_ready_matching
method:
guard.clear_ready_matching(Ready::READABLE)
.
Also clearing the write readiness in this case would be incorrect.
The AsyncFdReadyMutGuard::clear_ready
method clears all readiness flags.
This method takes &mut self
, so it is possible to access the inner IO
resource mutably when handling the AsyncFdReadyMutGuard
.
§Examples
Concurrently read and write to a std::net::TcpStream
on the same task without
splitting.
use std::error::Error;
use std::io;
use std::io::{Read, Write};
use std::net::TcpStream;
use tokio::io::unix::AsyncFd;
use tokio::io::{Interest, Ready};
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let stream = TcpStream::connect("127.0.0.1:8080")?;
stream.set_nonblocking(true)?;
let mut stream = AsyncFd::new(stream)?;
loop {
let mut guard = stream
.ready_mut(Interest::READABLE | Interest::WRITABLE)
.await?;
if guard.ready().is_readable() {
let mut data = vec![0; 1024];
// Try to read data, this may still fail with `WouldBlock`
// if the readiness event is a false positive.
match guard.get_inner_mut().read(&mut data) {
Ok(n) => {
println!("read {} bytes", n);
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
// a read has blocked, but a write might still succeed.
// clear only the read readiness.
guard.clear_ready_matching(Ready::READABLE);
continue;
}
Err(e) => {
return Err(e.into());
}
}
}
if guard.ready().is_writable() {
// Try to write data, this may still fail with `WouldBlock`
// if the readiness event is a false positive.
match guard.get_inner_mut().write(b"hello world") {
Ok(n) => {
println!("write {} bytes", n);
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
// a write has blocked, but a read might still succeed.
// clear only the write readiness.
guard.clear_ready_matching(Ready::WRITABLE);
continue;
}
Err(e) => {
return Err(e.into());
}
}
}
}
}
sourcepub async fn readable<'a>(&'a self) -> Result<AsyncFdReadyGuard<'a, T>>
pub async fn readable<'a>(&'a self) -> Result<AsyncFdReadyGuard<'a, T>>
Waits for the file descriptor to become readable, returning a
AsyncFdReadyGuard
that must be dropped to resume read-readiness
polling.
This method takes &self
, so it is possible to call this method
concurrently with other methods on this struct. This method only
provides shared access to the inner IO resource when handling the
AsyncFdReadyGuard
.
sourcepub async fn readable_mut<'a>(
&'a mut self,
) -> Result<AsyncFdReadyMutGuard<'a, T>>
pub async fn readable_mut<'a>( &'a mut self, ) -> Result<AsyncFdReadyMutGuard<'a, T>>
Waits for the file descriptor to become readable, returning a
AsyncFdReadyMutGuard
that must be dropped to resume read-readiness
polling.
This method takes &mut self
, so it is possible to access the inner IO
resource mutably when handling the AsyncFdReadyMutGuard
.
sourcepub async fn writable<'a>(&'a self) -> Result<AsyncFdReadyGuard<'a, T>>
pub async fn writable<'a>(&'a self) -> Result<AsyncFdReadyGuard<'a, T>>
Waits for the file descriptor to become writable, returning a
AsyncFdReadyGuard
that must be dropped to resume write-readiness
polling.
This method takes &self
, so it is possible to call this method
concurrently with other methods on this struct. This method only
provides shared access to the inner IO resource when handling the
AsyncFdReadyGuard
.
sourcepub async fn writable_mut<'a>(
&'a mut self,
) -> Result<AsyncFdReadyMutGuard<'a, T>>
pub async fn writable_mut<'a>( &'a mut self, ) -> Result<AsyncFdReadyMutGuard<'a, T>>
Waits for the file descriptor to become writable, returning a
AsyncFdReadyMutGuard
that must be dropped to resume write-readiness
polling.
This method takes &mut self
, so it is possible to access the inner IO
resource mutably when handling the AsyncFdReadyMutGuard
.
sourcepub async fn async_io<R>(
&self,
interest: Interest,
f: impl FnMut(&T) -> Result<R>,
) -> Result<R>
pub async fn async_io<R>( &self, interest: Interest, f: impl FnMut(&T) -> Result<R>, ) -> Result<R>
Reads or writes from the file descriptor using a user-provided IO operation.
The async_io
method is a convenience utility that waits for the file
descriptor to become ready, and then executes the provided IO operation.
Since file descriptors may be marked ready spuriously, the closure will
be called repeatedly until it returns something other than a
WouldBlock
error. This is done using the following loop:
async fn async_io<R>(&self, mut f: impl FnMut(&T) -> io::Result<R>) -> io::Result<R> {
loop {
// or `readable` if called with the read interest.
let guard = self.writable().await?;
match guard.try_io(&mut f) {
Ok(result) => return result,
Err(_would_block) => continue,
}
}
}
The closure should only return a WouldBlock
error if it has performed
an IO operation on the file descriptor that failed due to the file descriptor not being
ready. Returning a WouldBlock
error in any other situation will
incorrectly clear the readiness flag, which can cause the file descriptor to
behave incorrectly.
The closure should not perform the IO operation using any of the methods
defined on the Tokio AsyncFd
type, as this will mess with the
readiness flag and can cause the file descriptor to behave incorrectly.
This method is not intended to be used with combined interests. The closure should perform only one type of IO operation, so it should not require more than one ready state. This method may panic or sleep forever if it is called with a combined interest.
§Examples
This example sends some bytes on the inner std::net::UdpSocket
. The async_io
method waits for readiness, and retries if the send operation does block. This example
is equivalent to the one given for try_io
.
use tokio::io::{Interest, unix::AsyncFd};
use std::io;
use std::net::UdpSocket;
#[tokio::main]
async fn main() -> io::Result<()> {
let socket = UdpSocket::bind("0.0.0.0:8080")?;
socket.set_nonblocking(true)?;
let async_fd = AsyncFd::new(socket)?;
let written = async_fd
.async_io(Interest::WRITABLE, |inner| inner.send(&[1, 2]))
.await?;
println!("wrote {written} bytes");
Ok(())
}