hyperlocal/server.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
use hyper::{
body::{Body, Incoming},
service::service_fn,
Request, Response,
};
use hyper_util::rt::TokioIo;
use std::future::Future;
use tokio::net::UnixListener;
/// Extension trait for provisioning a hyper HTTP server over a Unix domain
/// socket.
///
/// # Example
///
/// ```rust
/// use hyper::Response;
/// use hyperlocal::UnixListenerExt;
/// use tokio::net::UnixListener;
///
/// let future = async move {
/// let listener = UnixListener::bind("/tmp/hyperlocal.sock").expect("parsed unix path");
///
/// listener
/// .serve(|| {
/// |_request| async {
/// Ok::<_, hyper::Error>(Response::new("Hello, world.".to_string()))
/// }
/// })
/// .await
/// .expect("failed to serve a connection")
/// };
/// ```
pub trait UnixListenerExt {
/// Indefinitely accept and respond to connections.
///
/// Pass a function which will generate the function which responds to
/// all requests for an individual connection.
fn serve<MakeResponseFn, ResponseFn, ResponseFuture, B, E>(
self,
f: MakeResponseFn,
) -> impl Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>>
where
MakeResponseFn: Fn() -> ResponseFn,
ResponseFn: Fn(Request<Incoming>) -> ResponseFuture,
ResponseFuture: Future<Output = Result<Response<B>, E>>,
B: Body + 'static,
<B as Body>::Error: std::error::Error + Send + Sync,
E: std::error::Error + Send + Sync + 'static;
}
impl UnixListenerExt for UnixListener {
fn serve<MakeServiceFn, ResponseFn, ResponseFuture, B, E>(
self,
f: MakeServiceFn,
) -> impl Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>>
where
MakeServiceFn: Fn() -> ResponseFn,
ResponseFn: Fn(Request<Incoming>) -> ResponseFuture,
ResponseFuture: Future<Output = Result<Response<B>, E>>,
B: Body + 'static,
<B as Body>::Error: std::error::Error + Send + Sync,
E: std::error::Error + Send + Sync + 'static,
{
async move {
loop {
let (stream, _) = self.accept().await?;
let io = TokioIo::new(stream);
let svc_fn = service_fn(f());
hyper::server::conn::http1::Builder::new()
// On OSX, disabling keep alive prevents serve_connection from
// blocking and later returning an Err derived from E_NOTCONN.
.keep_alive(false)
.serve_connection(io, svc_fn)
.await?;
}
}
}
}