1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
|
use std::future::Future;
use anyhow::Result;
use craftio_rs::{CraftAsyncReader, CraftAsyncWriter, CraftIo, CraftReader, CraftTokioConnection, CraftWrapper, CraftWriter};
use mcproto_rs::protocol::{HasPacketKind, PacketDirection, State};
use mcproto_rs::v1_19_3::{Packet761, Packet761Kind, RawPacket761};
use tokio::io::{AsyncRead, BufReader};
use tokio::net::tcp::OwnedReadHalf;
use tokio::net::TcpStream;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
#[macro_export]
macro_rules! await_packet {
($packet_type:ident, $conn:expr) => {
assert_packet!(
$packet_type,
$conn.wait_for_packet(Packet761Kind::$packet_type).await?
)
};
}
#[macro_export]
macro_rules! assert_packet {
($packet_type:ident, $obj:expr) => {
if let Packet761::$packet_type(packet_data) = $obj {
packet_data
} else {
panic!("Expected packet of type {}", stringify!($packet_type))
}
};
($packet_type:ident, $conn:expr, $errmgs:literal) => {
assert_packet!(
$packet_type,
$conn
.read_next_packet()
.await?
.ok_or(anyhow::anyhow!("Missing packet"))?
)
};
}
#[derive(Debug)]
enum WriterAction {
Send(Packet761),
SetState(State),
}
pub struct MinecraftClient {
send_handle: JoinHandle<()>,
send_tx: mpsc::UnboundedSender<WriterAction>,
craft_reader: CraftReader<BufReader<OwnedReadHalf>>,
}
impl MinecraftClient {
pub fn from_stream(stream: TcpStream) -> Self {
let (read, write) = stream.into_split();
let bufread = BufReader::new(read);
let mut craft_writer = CraftWriter::wrap(write, PacketDirection::ClientBound);
let mut craft_reader = CraftReader::wrap(bufread, PacketDirection::ServerBound);
let (send_tx, mut send_recv) = mpsc::unbounded_channel::<WriterAction>();
let send_handle = tokio::spawn(async move {
loop {
match send_recv.recv().await {
None => break,
Some(WriterAction::Send(packet)) => {
if let Err(err) = craft_writer.write_packet_async(packet).await {
println!("Failed to send packet {:?}", err);
}
}
Some(WriterAction::SetState(state)) => {
craft_writer.set_state(state);
}
}
}
()
});
MinecraftClient {
craft_reader,
send_tx,
send_handle,
}
}
pub async fn close_and_join(self) -> Result<()> {
drop(self.send_tx);
self.send_handle.await?;
// The reader is closed when the writer half is dropped by the send task.
Ok(())
}
pub fn set_state(&mut self, state: State) -> Result<()> {
self.send_tx.send(WriterAction::SetState(state))?;
self.craft_reader.set_state(state);
Ok(())
}
pub async fn read_next_packet(&mut self) -> Result<Option<Packet761>> {
if let Some(raw) = self.craft_reader.read_packet_async::<RawPacket761>().await? {
// println!("Client -> Server: {:?}", raw);
Ok(Some(raw))
} else {
Ok(None)
}
}
pub async fn wait_for_packet(&mut self, packet_kind: Packet761Kind) -> Result<Packet761> {
loop {
if let Some(packet) = self.read_next_packet().await? {
if packet.kind() == packet_kind {
return Ok(packet);
}
println!("Skipping packet {:?}", packet);
}
}
}
pub fn send_packet(&self, packet: Packet761) -> Result<()> {
// println!("Server -> Client: {:?}", packet);
self.send_tx.send(WriterAction::Send(packet))?;
Ok(())
}
pub fn send_all_packets(&self, packets: Vec<Packet761>) -> Result<()> {
for x in packets {
self.send_packet(x)?;
}
Ok(())
}
}
|