use crate::base::Message;
use crate::net::client::dgram;
use crate::net::client::multi_stream;
use crate::net::client::protocol::{
AsyncConnect, AsyncDgramRecv, AsyncDgramSend,
};
use crate::net::client::request::{
ComposeRequest, Error, GetResponse, SendRequest,
};
use bytes::Bytes;
use std::boxed::Box;
use std::fmt::Debug;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
#[derive(Clone, Debug, Default)]
pub struct Config {
dgram: dgram::Config,
multi_stream: multi_stream::Config,
}
impl Config {
pub fn new() -> Self {
Default::default()
}
pub fn from_parts(
dgram: dgram::Config,
multi_stream: multi_stream::Config,
) -> Self {
Self {
dgram,
multi_stream,
}
}
pub fn dgram(&self) -> &dgram::Config {
&self.dgram
}
pub fn dgram_mut(&mut self) -> &mut dgram::Config {
&mut self.dgram
}
pub fn set_dgram(&mut self, dgram: dgram::Config) {
self.dgram = dgram
}
pub fn stream(&self) -> &multi_stream::Config {
&self.multi_stream
}
pub fn stream_mut(&mut self) -> &mut multi_stream::Config {
&mut self.multi_stream
}
pub fn set_stream(&mut self, stream: multi_stream::Config) {
self.multi_stream = stream
}
}
#[derive(Clone, Debug)]
pub struct Connection<DgramS, Req> {
udp_conn: Arc<dgram::Connection<DgramS>>,
tcp_conn: multi_stream::Connection<Req>,
}
impl<DgramS, Req> Connection<DgramS, Req>
where
DgramS: AsyncConnect + Clone + Send + Sync + 'static,
DgramS::Connection:
AsyncDgramRecv + AsyncDgramSend + Send + Sync + Unpin + 'static,
{
pub fn new<StreamS>(
dgram_remote: DgramS,
stream_remote: StreamS,
) -> (Self, multi_stream::Transport<StreamS, Req>) {
Self::with_config(dgram_remote, stream_remote, Default::default())
}
pub fn with_config<StreamS>(
dgram_remote: DgramS,
stream_remote: StreamS,
config: Config,
) -> (Self, multi_stream::Transport<StreamS, Req>) {
let udp_conn =
dgram::Connection::with_config(dgram_remote, config.dgram).into();
let (tcp_conn, transport) = multi_stream::Connection::with_config(
stream_remote,
config.multi_stream,
);
(Self { udp_conn, tcp_conn }, transport)
}
}
impl<DgramS, Req> SendRequest<Req> for Connection<DgramS, Req>
where
DgramS: AsyncConnect + Clone + Debug + Send + Sync + 'static,
DgramS::Connection: AsyncDgramRecv + AsyncDgramSend + Send + Sync + Unpin,
Req: ComposeRequest + Clone + 'static,
{
fn send_request(
&self,
request_msg: Req,
) -> Box<dyn GetResponse + Send + Sync> {
Box::new(Request::new(
request_msg,
self.udp_conn.clone(),
self.tcp_conn.clone(),
))
}
}
#[derive(Debug)]
pub struct Request<S, Req> {
request_msg: Req,
udp_conn: Arc<dgram::Connection<S>>,
tcp_conn: multi_stream::Connection<Req>,
state: QueryState,
}
#[derive(Debug)]
enum QueryState {
StartUdpRequest,
GetUdpResponse(Box<dyn GetResponse + Send + Sync>),
StartTcpRequest,
GetTcpResponse(Box<dyn GetResponse + Send + Sync>),
}
impl<S, Req> Request<S, Req>
where
S: AsyncConnect + Clone + Send + Sync + 'static,
Req: ComposeRequest + Clone + 'static,
{
fn new(
request_msg: Req,
udp_conn: Arc<dgram::Connection<S>>,
tcp_conn: multi_stream::Connection<Req>,
) -> Request<S, Req> {
Self {
request_msg,
udp_conn,
tcp_conn,
state: QueryState::StartUdpRequest,
}
}
async fn get_response_impl(&mut self) -> Result<Message<Bytes>, Error>
where
S::Connection: AsyncDgramRecv + AsyncDgramSend + Send + Sync + Unpin,
{
loop {
match &mut self.state {
QueryState::StartUdpRequest => {
let msg = self.request_msg.clone();
let request = self.udp_conn.send_request(msg);
self.state = QueryState::GetUdpResponse(request);
continue;
}
QueryState::GetUdpResponse(ref mut request) => {
let response = request.get_response().await?;
if response.header().tc() {
self.state = QueryState::StartTcpRequest;
continue;
}
return Ok(response);
}
QueryState::StartTcpRequest => {
let msg = self.request_msg.clone();
let request = self.tcp_conn.send_request(msg);
self.state = QueryState::GetTcpResponse(request);
continue;
}
QueryState::GetTcpResponse(ref mut query) => {
let response = query.get_response().await?;
return Ok(response);
}
}
}
}
}
impl<S, Req> GetResponse for Request<S, Req>
where
S: AsyncConnect + Clone + Debug + Send + Sync + 'static,
S::Connection: AsyncDgramRecv + AsyncDgramSend + Send + Sync + Unpin,
Req: ComposeRequest + Clone + 'static,
{
fn get_response(
&mut self,
) -> Pin<
Box<
dyn Future<Output = Result<Message<Bytes>, Error>>
+ Send
+ Sync
+ '_,
>,
> {
Box::pin(self.get_response_impl())
}
}