diff options
Diffstat (limited to 'src/connect.rs')
-rw-r--r-- | src/connect.rs | 79 |
1 files changed, 67 insertions, 12 deletions
diff --git a/src/connect.rs b/src/connect.rs index 1feb03e..cfebb29 100644 --- a/src/connect.rs +++ b/src/connect.rs @@ -1,9 +1,14 @@ +use std::future::Future; + use anyhow::Result; -use craftio_rs::{CraftAsyncReader, CraftAsyncWriter, CraftTokioConnection}; -use mcproto_rs::protocol::HasPacketKind; +use craftio_rs::{CraftAsyncReader, CraftAsyncWriter, CraftIo, CraftReader, CraftTokioConnection, CraftWrapper, CraftWriter}; +use mcproto_rs::protocol::{HasPacketKind, PacketDirection, State}; use mcproto_rs::v1_19_3::{Packet761, Packet761Kind, RawPacket761}; -use tokio::io::BufReader; +use tokio::io::{AsyncRead, BufReader}; +use tokio::net::tcp::OwnedReadHalf; use tokio::net::TcpStream; +use tokio::sync::mpsc; +use tokio::task::JoinHandle; #[macro_export] macro_rules! await_packet { @@ -34,32 +39,69 @@ macro_rules! assert_packet { ) }; } +#[derive(Debug)] +enum WriterAction { + Send(Packet761), + SetState(State), +} pub struct MinecraftClient { - pub connection: CraftTokioConnection, + send_handle: JoinHandle<()>, + send_tx: mpsc::UnboundedSender<WriterAction>, + craft_reader: CraftReader<BufReader<OwnedReadHalf>>, } impl MinecraftClient { pub fn from_stream(stream: TcpStream) -> Self { let (read, write) = stream.into_split(); let bufread = BufReader::new(read); + let mut craft_writer = CraftWriter::wrap(write, PacketDirection::ClientBound); + let mut craft_reader = CraftReader::wrap(bufread, PacketDirection::ServerBound); + let (send_tx, mut send_recv) = mpsc::unbounded_channel::<WriterAction>(); + let send_handle = tokio::spawn(async move { + loop { + match send_recv.recv().await { + None => break, + Some(WriterAction::Send(packet)) => { + if let Err(err) = craft_writer.write_packet_async(packet).await { + println!("Failed to send packet {:?}", err); + } + } + Some(WriterAction::SetState(state)) => { + craft_writer.set_state(state); + } + } + } + () + }); MinecraftClient { - connection: CraftTokioConnection::from_async((bufread, write), mcproto_rs::protocol::PacketDirection::ServerBound), + craft_reader, + send_tx, + send_handle, } } + + pub async fn close_and_join(self) -> Result<()> { + drop(self.send_tx); + self.send_handle.await?; + // The reader is closed when the writer half is dropped by the send task. + Ok(()) + } + + pub fn set_state(&mut self, state: State) -> Result<()> { + self.send_tx.send(WriterAction::SetState(state))?; + self.craft_reader.set_state(state); + Ok(()) + } + pub async fn read_next_packet(&mut self) -> Result<Option<Packet761>> { - if let Some(raw) = self.connection.read_packet_async::<RawPacket761>().await? { - println!("Client -> Server: {:?}", raw); + if let Some(raw) = self.craft_reader.read_packet_async::<RawPacket761>().await? { + // println!("Client -> Server: {:?}", raw); Ok(Some(raw)) } else { Ok(None) } } - pub async fn send_packet(&mut self, packet: Packet761) -> Result<()> { - println!("Server -> Client: {:?}", packet); - self.connection.write_packet_async(packet).await?; - Ok(()) - } pub async fn wait_for_packet(&mut self, packet_kind: Packet761Kind) -> Result<Packet761> { loop { if let Some(packet) = self.read_next_packet().await? { @@ -70,4 +112,17 @@ impl MinecraftClient { } } } + + pub fn send_packet(&self, packet: Packet761) -> Result<()> { + // println!("Server -> Client: {:?}", packet); + self.send_tx.send(WriterAction::Send(packet))?; + Ok(()) + } + + pub fn send_all_packets(&self, packets: Vec<Packet761>) -> Result<()> { + for x in packets { + self.send_packet(x)?; + } + Ok(()) + } } |