use std::future::Future; use anyhow::Result; use craftio_rs::{CraftAsyncReader, CraftAsyncWriter, CraftIo, CraftReader, CraftTokioConnection, CraftWrapper, CraftWriter}; use mcproto_rs::protocol::{HasPacketKind, PacketDirection, State}; use mcproto_rs::v1_19_3::{Packet761, Packet761Kind, RawPacket761}; use tokio::io::{AsyncRead, BufReader}; use tokio::net::tcp::OwnedReadHalf; use tokio::net::TcpStream; use tokio::sync::mpsc; use tokio::task::JoinHandle; #[macro_export] macro_rules! await_packet { ($packet_type:ident, $conn:expr) => { assert_packet!( $packet_type, $conn.wait_for_packet(Packet761Kind::$packet_type).await? ) }; } #[macro_export] macro_rules! assert_packet { ($packet_type:ident, $obj:expr) => { if let Packet761::$packet_type(packet_data) = $obj { packet_data } else { panic!("Expected packet of type {}", stringify!($packet_type)) } }; ($packet_type:ident, $conn:expr, $errmgs:literal) => { assert_packet!( $packet_type, $conn .read_next_packet() .await? .ok_or(anyhow::anyhow!("Missing packet"))? ) }; } #[derive(Debug)] enum WriterAction { Send(Packet761), SetState(State), } pub struct MinecraftClient { send_handle: JoinHandle<()>, send_tx: mpsc::UnboundedSender, craft_reader: CraftReader>, } impl MinecraftClient { pub fn from_stream(stream: TcpStream) -> Self { let (read, write) = stream.into_split(); let bufread = BufReader::new(read); let mut craft_writer = CraftWriter::wrap(write, PacketDirection::ClientBound); let mut craft_reader = CraftReader::wrap(bufread, PacketDirection::ServerBound); let (send_tx, mut send_recv) = mpsc::unbounded_channel::(); let send_handle = tokio::spawn(async move { loop { match send_recv.recv().await { None => break, Some(WriterAction::Send(packet)) => { if let Err(err) = craft_writer.write_packet_async(packet).await { println!("Failed to send packet {:?}", err); } } Some(WriterAction::SetState(state)) => { craft_writer.set_state(state); } } } () }); MinecraftClient { craft_reader, send_tx, send_handle, } } pub async fn close_and_join(self) -> Result<()> { drop(self.send_tx); self.send_handle.await?; // The reader is closed when the writer half is dropped by the send task. Ok(()) } pub fn set_state(&mut self, state: State) -> Result<()> { self.send_tx.send(WriterAction::SetState(state))?; self.craft_reader.set_state(state); Ok(()) } pub async fn read_next_packet(&mut self) -> Result> { if let Some(raw) = self.craft_reader.read_packet_async::().await? { // println!("Client -> Server: {:?}", raw); Ok(Some(raw)) } else { Ok(None) } } pub async fn wait_for_packet(&mut self, packet_kind: Packet761Kind) -> Result { loop { if let Some(packet) = self.read_next_packet().await? { if packet.kind() == packet_kind { return Ok(packet); } println!("Skipping packet {:?}", packet); } } } pub fn send_packet(&self, packet: Packet761) -> Result<()> { // println!("Server -> Client: {:?}", packet); self.send_tx.send(WriterAction::Send(packet))?; Ok(()) } pub fn send_all_packets(&self, packets: Vec) -> Result<()> { for x in packets { self.send_packet(x)?; } Ok(()) } }