summaryrefslogtreecommitdiff
path: root/src/connect.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/connect.rs')
-rw-r--r--src/connect.rs79
1 files changed, 67 insertions, 12 deletions
diff --git a/src/connect.rs b/src/connect.rs
index 1feb03e..cfebb29 100644
--- a/src/connect.rs
+++ b/src/connect.rs
@@ -1,9 +1,14 @@
+use std::future::Future;
+
use anyhow::Result;
-use craftio_rs::{CraftAsyncReader, CraftAsyncWriter, CraftTokioConnection};
-use mcproto_rs::protocol::HasPacketKind;
+use craftio_rs::{CraftAsyncReader, CraftAsyncWriter, CraftIo, CraftReader, CraftTokioConnection, CraftWrapper, CraftWriter};
+use mcproto_rs::protocol::{HasPacketKind, PacketDirection, State};
use mcproto_rs::v1_19_3::{Packet761, Packet761Kind, RawPacket761};
-use tokio::io::BufReader;
+use tokio::io::{AsyncRead, BufReader};
+use tokio::net::tcp::OwnedReadHalf;
use tokio::net::TcpStream;
+use tokio::sync::mpsc;
+use tokio::task::JoinHandle;
#[macro_export]
macro_rules! await_packet {
@@ -34,32 +39,69 @@ macro_rules! assert_packet {
)
};
}
+#[derive(Debug)]
+enum WriterAction {
+ Send(Packet761),
+ SetState(State),
+}
pub struct MinecraftClient {
- pub connection: CraftTokioConnection,
+ send_handle: JoinHandle<()>,
+ send_tx: mpsc::UnboundedSender<WriterAction>,
+ craft_reader: CraftReader<BufReader<OwnedReadHalf>>,
}
impl MinecraftClient {
pub fn from_stream(stream: TcpStream) -> Self {
let (read, write) = stream.into_split();
let bufread = BufReader::new(read);
+ let mut craft_writer = CraftWriter::wrap(write, PacketDirection::ClientBound);
+ let mut craft_reader = CraftReader::wrap(bufread, PacketDirection::ServerBound);
+ let (send_tx, mut send_recv) = mpsc::unbounded_channel::<WriterAction>();
+ let send_handle = tokio::spawn(async move {
+ loop {
+ match send_recv.recv().await {
+ None => break,
+ Some(WriterAction::Send(packet)) => {
+ if let Err(err) = craft_writer.write_packet_async(packet).await {
+ println!("Failed to send packet {:?}", err);
+ }
+ }
+ Some(WriterAction::SetState(state)) => {
+ craft_writer.set_state(state);
+ }
+ }
+ }
+ ()
+ });
MinecraftClient {
- connection: CraftTokioConnection::from_async((bufread, write), mcproto_rs::protocol::PacketDirection::ServerBound),
+ craft_reader,
+ send_tx,
+ send_handle,
}
}
+
+ pub async fn close_and_join(self) -> Result<()> {
+ drop(self.send_tx);
+ self.send_handle.await?;
+ // The reader is closed when the writer half is dropped by the send task.
+ Ok(())
+ }
+
+ pub fn set_state(&mut self, state: State) -> Result<()> {
+ self.send_tx.send(WriterAction::SetState(state))?;
+ self.craft_reader.set_state(state);
+ Ok(())
+ }
+
pub async fn read_next_packet(&mut self) -> Result<Option<Packet761>> {
- if let Some(raw) = self.connection.read_packet_async::<RawPacket761>().await? {
- println!("Client -> Server: {:?}", raw);
+ if let Some(raw) = self.craft_reader.read_packet_async::<RawPacket761>().await? {
+ // println!("Client -> Server: {:?}", raw);
Ok(Some(raw))
} else {
Ok(None)
}
}
- pub async fn send_packet(&mut self, packet: Packet761) -> Result<()> {
- println!("Server -> Client: {:?}", packet);
- self.connection.write_packet_async(packet).await?;
- Ok(())
- }
pub async fn wait_for_packet(&mut self, packet_kind: Packet761Kind) -> Result<Packet761> {
loop {
if let Some(packet) = self.read_next_packet().await? {
@@ -70,4 +112,17 @@ impl MinecraftClient {
}
}
}
+
+ pub fn send_packet(&self, packet: Packet761) -> Result<()> {
+ // println!("Server -> Client: {:?}", packet);
+ self.send_tx.send(WriterAction::Send(packet))?;
+ Ok(())
+ }
+
+ pub fn send_all_packets(&self, packets: Vec<Packet761>) -> Result<()> {
+ for x in packets {
+ self.send_packet(x)?;
+ }
+ Ok(())
+ }
}