diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/connection.rs | 31 | ||||
-rw-r--r-- | src/reader.rs | 71 | ||||
-rw-r--r-- | src/tcp.rs | 33 | ||||
-rw-r--r-- | src/writer.rs | 46 |
4 files changed, 138 insertions, 43 deletions
diff --git a/src/connection.rs b/src/connection.rs index af6f4a9..4da97bb 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -1,11 +1,14 @@ use crate::cfb8::CipherError; -use crate::reader::{CraftAsyncReader, CraftReader, CraftSyncReader, ReadResult}; +use crate::reader::{CraftReader, CraftSyncReader, ReadResult}; use crate::wrapper::{CraftIo, CraftWrapper}; -use crate::writer::{CraftAsyncWriter, CraftSyncWriter, CraftWriter, WriteResult}; +use crate::writer::{CraftSyncWriter, CraftWriter, WriteResult}; use mcproto_rs::protocol::{Packet, RawPacket, State}; -#[cfg(feature = "async")] -use async_trait::async_trait; +#[cfg(any(feature = "futures-io", feature = "tokio-io"))] +use { + crate::{reader::CraftAsyncReader, writer::CraftAsyncWriter}, + async_trait::async_trait +}; pub struct CraftConnection<R, W> { pub(crate) reader: CraftReader<R>, @@ -76,7 +79,7 @@ where } } -#[cfg(feature = "async")] +#[cfg(any(feature = "futures-io", feature = "tokio-io"))] #[async_trait] impl<R, W> CraftAsyncReader for CraftConnection<R, W> where @@ -85,22 +88,22 @@ where CraftWriter<W>: CraftAsyncWriter, W: Send + Sync, { - async fn read_packet<'a, P>(&'a mut self) -> ReadResult<<P as RawPacket<'a>>::Packet> + async fn read_packet_async<'a, P>(&'a mut self) -> ReadResult<<P as RawPacket<'a>>::Packet> where P: RawPacket<'a>, { - self.reader.read_packet::<P>().await + self.reader.read_packet_async::<P>().await } - async fn read_raw_packet<'a, P>(&'a mut self) -> ReadResult<P> + async fn read_raw_packet_async<'a, P>(&'a mut self) -> ReadResult<P> where P: RawPacket<'a>, { - self.reader.read_raw_packet::<P>().await + self.reader.read_raw_packet_async::<P>().await } } -#[cfg(feature = "async")] +#[cfg(any(feature = "futures-io", feature = "tokio-io"))] #[async_trait] impl<R, W> CraftAsyncWriter for CraftConnection<R, W> where @@ -109,17 +112,17 @@ where CraftWriter<W>: CraftAsyncWriter, W: Send + Sync, { - async fn write_packet<P>(&mut self, packet: P) -> WriteResult<()> + async fn write_packet_async<P>(&mut self, packet: P) -> WriteResult<()> where P: Packet + Send + Sync, { - self.writer.write_packet(packet).await + self.writer.write_packet_async(packet).await } - async fn write_raw_packet<'a, P>(&mut self, packet: P) -> WriteResult<()> + async fn write_raw_packet_async<'a, P>(&mut self, packet: P) -> WriteResult<()> where P: RawPacket<'a> + Send + Sync, { - self.writer.write_raw_packet(packet).await + self.writer.write_raw_packet_async(packet).await } } diff --git a/src/reader.rs b/src/reader.rs index dd27440..5b302aa 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -9,8 +9,8 @@ use std::backtrace::Backtrace; use std::io; use thiserror::Error; -#[cfg(feature = "async")] -use {async_trait::async_trait, futures::AsyncReadExt}; +#[cfg(any(feature = "futures-io", feature = "tokio-io"))] +use async_trait::async_trait; #[derive(Debug, Error)] pub enum ReadError { @@ -50,17 +50,17 @@ pub enum DecompressErr { pub type ReadResult<P> = Result<Option<P>, ReadError>; -#[cfg(feature = "async")] +#[cfg(any(feature = "futures-io", feature = "tokio-io"))] #[async_trait] pub trait CraftAsyncReader { - async fn read_packet<'a, P>(&'a mut self) -> ReadResult<<P as RawPacket<'a>>::Packet> + async fn read_packet_async<'a, P>(&'a mut self) -> ReadResult<<P as RawPacket<'a>>::Packet> where P: RawPacket<'a>, { - deserialize_raw_packet(self.read_raw_packet::<P>().await) + deserialize_raw_packet(self.read_raw_packet_async::<P>().await) } - async fn read_raw_packet<'a, P>(&'a mut self) -> ReadResult<P> + async fn read_raw_packet_async<'a, P>(&'a mut self) -> ReadResult<P> where P: RawPacket<'a>; } @@ -147,13 +147,13 @@ where } } -#[cfg(feature = "async")] +#[cfg(any(feature = "futures-io", feature = "tokio-io"))] #[async_trait] impl<R> CraftAsyncReader for CraftReader<R> where - R: futures::AsyncRead + Unpin + Sync + Send, + R: AsyncReadExact, { - async fn read_raw_packet<'a, P>(&'a mut self) -> ReadResult<P> + async fn read_raw_packet_async<'a, P>(&'a mut self) -> ReadResult<P> where P: RawPacket<'a>, { @@ -190,10 +190,10 @@ where } } -#[cfg(feature = "async")] +#[cfg(any(feature = "futures-io", feature = "tokio-io"))] impl<R> CraftReader<R> where - R: futures::io::AsyncRead + Unpin + Sync + Send, + R: AsyncReadExact, { async fn read_packet_len_async(&mut self) -> ReadResult<VarInt> { self.move_ready_data_to_front(); @@ -218,6 +218,55 @@ where } } +#[cfg(any(feature = "futures-io", feature = "tokio-io"))] +pub trait IntoBufferedAsyncRead { + + type Target: AsyncReadExact; + + fn into_buffered(self, capacity: usize) -> Self::Target; +} + +#[cfg(all(feature = "futures-io", not(feature = "tokio-io")))] +impl<R> IntoBufferedAsyncRead for R where R: futures::io::AsyncRead + Send + Sync + Unpin { + type Target = futures::io::BufReader<R>; + + fn into_buffered(self, capacity: usize) -> Self::Target { + futures::io::BufReader::with_capacity(capacity, self) + } +} + +#[cfg(feature = "tokio-io")] +impl<R> IntoBufferedAsyncRead for R where R: tokio::io::AsyncRead + Send + Sync + Unpin { + type Target = tokio::io::BufReader<R>; + + fn into_buffered(self, capacity: usize) -> Self::Target { + tokio::io::BufReader::with_capacity(capacity, self) + } +} + +#[cfg(any(feature = "futures-io", feature = "tokio-io"))] +#[async_trait] +pub trait AsyncReadExact: Unpin + Sync + Send { + async fn read_exact(&mut self, to: &mut [u8]) -> Result<(), io::Error>; +} + +#[cfg(all(feature = "futures-io", not(feature = "tokio-io")))] +#[async_trait] +impl<R> AsyncReadExact for R where R: futures::AsyncReadExt + Unpin + Sync + Send { + async fn read_exact(&mut self, to: &mut [u8]) -> Result<(), io::Error> { + futures::AsyncReadExt::read_exact(self, to).await + } +} + +#[cfg(feature = "tokio-io")] +#[async_trait] +impl<R> AsyncReadExact for R where R: tokio::io::AsyncRead + Unpin + Sync + Send { + async fn read_exact(&mut self, to: &mut [u8]) -> Result<(), io::Error> { + tokio::io::AsyncReadExt::read_exact(self, to).await?; + Ok(()) + } +} + macro_rules! dsz_unwrap { ($bnam: expr, $k: ty) => { match <$k>::mc_deserialize($bnam) { @@ -5,8 +5,8 @@ use mcproto_rs::protocol::{PacketDirection, State}; use std::io::BufReader as StdBufReader; use std::net::TcpStream; -#[cfg(feature = "async")] -use futures::io::{AsyncRead, AsyncWrite, BufReader as AsyncBufReader}; +#[cfg(any(feature = "futures-io", feature = "tokio-io"))] +use crate::{CraftAsyncWriter, CraftAsyncReader, IntoBufferedAsyncRead}; pub const BUF_SIZE: usize = 8192; @@ -43,12 +43,32 @@ impl CraftConnection<StdBufReader<TcpStream>, TcpStream> { } } -#[cfg(feature = "async")] -impl<R, W> CraftConnection<AsyncBufReader<R>, W> +#[cfg(any(feature = "futures-io", feature = "tokio-io"))] +impl<R, W> CraftConnection<R, W> where - R: AsyncRead + Send + Sync + Unpin, - W: AsyncWrite + Send + Sync + Unpin, + CraftReader<R>: CraftAsyncReader, + CraftWriter<W>: CraftAsyncWriter, { + pub fn from_unbuffered_async<U>(tuple: (U, W), read_direction: PacketDirection) -> Self + where + U: IntoBufferedAsyncRead<Target=R>, + { + Self::from_unbuffered_async_with_state(tuple, read_direction, State::Handshaking) + } + + pub fn from_unbuffered_async_with_state<U>( + tuple: (U, W), + read_direction: PacketDirection, + state: State, + ) -> Self + where + U: IntoBufferedAsyncRead<Target=R>, + { + let (ru, writer) = tuple; + let reader = ru.into_buffered(BUF_SIZE); + Self::from_async_with_state((reader, writer), read_direction, state) + } + pub fn from_async(tuple: (R, W), read_direction: PacketDirection) -> Self { Self::from_async_with_state(tuple, read_direction, State::Handshaking) } @@ -59,7 +79,6 @@ where state: State, ) -> Self { let (reader, writer) = tuple; - let reader = AsyncBufReader::with_capacity(BUF_SIZE, reader); Self { reader: CraftReader::wrap_with_state(reader, read_direction, state), writer: CraftWriter::wrap_with_state(writer, read_direction.opposite(), state), diff --git a/src/writer.rs b/src/writer.rs index d4e6027..e1f71c3 100644 --- a/src/writer.rs +++ b/src/writer.rs @@ -9,8 +9,8 @@ use std::backtrace::Backtrace; use std::ops::{Deref, DerefMut}; use thiserror::Error; -#[cfg(feature = "async")] -use {async_trait::async_trait, futures::AsyncWriteExt}; +#[cfg(any(feature = "futures-io", feature = "tokio-io"))] +use async_trait::async_trait; #[derive(Debug, Error)] pub enum WriteError { @@ -90,14 +90,14 @@ impl Into<SerializeErr> for PacketSerializeFail { pub type WriteResult<P> = Result<P, WriteError>; -#[cfg(feature = "async")] +#[cfg(any(feature = "futures-io", feature = "tokio-io"))] #[async_trait] pub trait CraftAsyncWriter { - async fn write_packet<P>(&mut self, packet: P) -> WriteResult<()> + async fn write_packet_async<P>(&mut self, packet: P) -> WriteResult<()> where P: Packet + Send + Sync; - async fn write_raw_packet<'a, P>(&mut self, packet: P) -> WriteResult<()> + async fn write_raw_packet_async<'a, P>(&mut self, packet: P) -> WriteResult<()> where P: RawPacket<'a> + Send + Sync; } @@ -174,13 +174,37 @@ where target.write_all(data) } -#[cfg(feature = "async")] +#[cfg(any(feature = "tokio-io", feature = "futures-io"))] +#[async_trait] +pub trait AsyncWriteAll: Unpin + Send + Sync { + async fn write_all(&mut self, data: &[u8]) -> Result<(), std::io::Error>; +} + +#[cfg(all(feature = "futures-io", not(feature = "tokio-io")))] +#[async_trait] +impl<W> AsyncWriteAll for W where W: futures::AsyncWrite + Unpin + Send + Sync { + async fn write_all(&mut self, data: &[u8]) -> Result<(), std::io::Error> { + futures::AsyncWriteExt::write_all(self, data).await?; + Ok(()) + } +} + +#[cfg(feature = "tokio-io")] +#[async_trait] +impl<W> AsyncWriteAll for W where W: tokio::io::AsyncWrite + Unpin + Send + Sync { + async fn write_all(&mut self, data: &[u8]) -> Result<(), std::io::Error> { + tokio::io::AsyncWriteExt::write_all(self, data).await?; + Ok(()) + } +} + +#[cfg(any(feature = "futures-io", feature = "tokio-io"))] #[async_trait] impl<W> CraftAsyncWriter for CraftWriter<W> where - W: futures::AsyncWrite + Unpin + Send + Sync, + W: AsyncWriteAll, { - async fn write_packet<P>(&mut self, packet: P) -> WriteResult<()> + async fn write_packet_async<P>(&mut self, packet: P) -> WriteResult<()> where P: Packet + Send + Sync, { @@ -189,7 +213,7 @@ where Ok(()) } - async fn write_raw_packet<'a, P>(&mut self, packet: P) -> WriteResult<()> + async fn write_raw_packet_async<'a, P>(&mut self, packet: P) -> WriteResult<()> where P: RawPacket<'a> + Send + Sync, { @@ -199,12 +223,12 @@ where } } -#[cfg(feature = "async")] +#[cfg(any(feature = "futures-io", feature = "tokio-io"))] async fn write_data_to_target_async<'a, W>( tuple: (&'a [u8], &'a mut W), ) -> Result<(), std::io::Error> where - W: futures::AsyncWrite + Unpin + Send + Sync, + W: AsyncWriteAll, { let (data, target) = tuple; target.write_all(data).await |