diff options
author | nea <nea@nea.moe> | 2023-03-27 21:54:37 +0200 |
---|---|---|
committer | nea <nea@nea.moe> | 2023-03-27 21:54:37 +0200 |
commit | b9da3e24eb07abcb33932480b8dbbcf69024c614 (patch) | |
tree | 8347175e88cb5333107460a6b9114146386252fb /src | |
parent | b1c4131231227a7780e1c1940b399983cdd7f552 (diff) | |
download | mgasi-b9da3e24eb07abcb33932480b8dbbcf69024c614.tar.gz mgasi-b9da3e24eb07abcb33932480b8dbbcf69024c614.tar.bz2 mgasi-b9da3e24eb07abcb33932480b8dbbcf69024c614.zip |
Proper async server and ctrl-c handling
Diffstat (limited to 'src')
-rw-r--r-- | src/connect.rs | 29 | ||||
-rw-r--r-- | src/main.rs | 79 |
2 files changed, 76 insertions, 32 deletions
diff --git a/src/connect.rs b/src/connect.rs index 82590fb..1feb03e 100644 --- a/src/connect.rs +++ b/src/connect.rs @@ -1,9 +1,9 @@ -use std::net::TcpStream; - use anyhow::Result; -use craftio_rs::{CraftSyncReader, CraftSyncWriter, CraftTcpConnection}; +use craftio_rs::{CraftAsyncReader, CraftAsyncWriter, CraftTokioConnection}; use mcproto_rs::protocol::HasPacketKind; use mcproto_rs::v1_19_3::{Packet761, Packet761Kind, RawPacket761}; +use tokio::io::BufReader; +use tokio::net::TcpStream; #[macro_export] macro_rules! await_packet { @@ -36,24 +36,19 @@ macro_rules! assert_packet { } pub struct MinecraftClient { - pub connection: CraftTcpConnection, + pub connection: CraftTokioConnection, } impl MinecraftClient { - pub fn new(connection: CraftTcpConnection) -> Self { - Self { connection } - } - - pub fn from_stream(stream: TcpStream) -> Result<Self> { - Ok(Self { - connection: CraftTcpConnection::from_std( - stream, - mcproto_rs::protocol::PacketDirection::ServerBound, - )?, - }) + pub fn from_stream(stream: TcpStream) -> Self { + let (read, write) = stream.into_split(); + let bufread = BufReader::new(read); + MinecraftClient { + connection: CraftTokioConnection::from_async((bufread, write), mcproto_rs::protocol::PacketDirection::ServerBound), + } } pub async fn read_next_packet(&mut self) -> Result<Option<Packet761>> { - if let Some(raw) = self.connection.read_packet::<RawPacket761>()? { + if let Some(raw) = self.connection.read_packet_async::<RawPacket761>().await? { println!("Client -> Server: {:?}", raw); Ok(Some(raw)) } else { @@ -62,7 +57,7 @@ impl MinecraftClient { } pub async fn send_packet(&mut self, packet: Packet761) -> Result<()> { println!("Server -> Client: {:?}", packet); - self.connection.write_packet(packet)?; + self.connection.write_packet_async(packet).await?; Ok(()) } pub async fn wait_for_packet(&mut self, packet_kind: Packet761Kind) -> Result<Packet761> { diff --git a/src/main.rs b/src/main.rs index 088f27f..a113a30 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,40 +1,89 @@ -use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, TcpStream}; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::time::Duration; use anyhow::Result; -use craftio_rs::{CraftIo, CraftSyncReader, CraftSyncWriter, CraftTcpConnection}; +use craftio_rs::CraftIo; use mcproto_rs::nbt::Tag; -use mcproto_rs::protocol::{HasPacketKind, State}; +use mcproto_rs::protocol::State; use mcproto_rs::Serializer; use mcproto_rs::status::{StatusPlayersSpec, StatusSpec, StatusVersionSpec}; use mcproto_rs::types::{BytesSerializer, Chat, IntPosition, ItemStack, NamedNbtTag, RemainingBytes, Vec3}; use mcproto_rs::uuid::UUID4; use mcproto_rs::v1_19_3::{BitSet, BlobArray, BookSettings, ChunkDataAndUpdateLightSpec, ChunkDataSpec, ChunkSection, CommandNode, CommandNodeSpec, CommandsSpec, EntityEventSpec, InitializeWorldBorderSpec, KeepAliveClientBoundSpec, LightDataSpec, PalettedContainer, PluginMessageSpec, RecipeBookAction, RecipeBookInitSpec, SetCenterChunkSpec, SetContainerContentSpec, SetDefaultSpawnPositionSpec, SynchronizePlayerPositionSpec, TagType, TypedTagList, UpdateRecipeBookSpec}; -use mcproto_rs::v1_19_3::{GameMode, HandshakeNextState, LoginPlaySpec, LoginSuccessSpec, Packet761, Packet761Kind, PingRequestSpec, PingResponseSpec, PreviousGameMode, RawPacket761, SetHeldItemClientSpec, StatusResponseSpec, UpdateRecipesSpec, UpdateTagsSpec}; +use mcproto_rs::v1_19_3::{GameMode, HandshakeNextState, LoginPlaySpec, LoginSuccessSpec, Packet761, Packet761Kind, PingRequestSpec, PingResponseSpec, PreviousGameMode, SetHeldItemClientSpec, StatusResponseSpec, UpdateRecipesSpec, UpdateTagsSpec}; use tokio; -use crate::connect::MinecraftClient; +use tokio::net::TcpStream; +use tokio::signal::unix::{signal, SignalKind}; +use tokio::sync::{broadcast}; +use tokio::sync::broadcast::Receiver; +use tokio::task::JoinHandle; +use crate::connect::MinecraftClient; pub mod connect; pub mod nbtdsl; + #[tokio::main] async fn main() -> Result<()> { + let (shutdown_send, _recv) = broadcast::channel::<()>(1); + + let bind = tokio::net::TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 25565)).await?; + let server_rx = shutdown_send.subscribe(); + let server_handle = tokio::spawn(async { + listener(bind, server_rx).await; + () + }); + + println!("Awaiting shutdown request"); + signal(SignalKind::interrupt())?.recv().await; + println!("Shutdown request detected"); + shutdown_send.send(())?; + server_handle.await?; + + Ok(()) +} + +async fn listener(listener: tokio::net::TcpListener, mut receiver: Receiver<()>) { println!("Starting server"); - let bind = TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 25565))?; + let mut handles: Vec<JoinHandle<()>> = Vec::new(); loop { - if let Ok((socket, address)) = bind.accept() { - println!("Connection accepted from {}", address); - let client = MinecraftClient::from_stream(socket)?; - tokio::spawn(async { - if let Err(x) = handle_conn(client).await { - println!("Error: {:?}", x); - } - }); - } + let x: Option<(TcpStream, SocketAddr)> = tokio::select! { + _ = receiver.recv() => { + break; + } + conn = listener.accept() => { conn.ok() } + }; + let (stream, from) = match x { + None => { continue } + Some(y) => { y } + }; + println!("Connection accepted from {}", from); + let client = MinecraftClient::from_stream(stream); + let handle = tokio::spawn(async { + if let Err(x) = handle_conn(client).await { + println!("Error: {:?}", x); + } + }); + handles.push(handle); } + println!("Shutdown started"); + for x in &handles { + x.abort(); + } + /* + TODO: proper kicks so that we can just join + for x in handles { + match x.await { + Ok(_) => {} + Err(_) => { + println!("Failed to join server during exit!") + } + }; + }*/ } + async fn handle_conn(mut client: MinecraftClient) -> Result<()> { let hs = assert_packet!(Handshake, client, "Missing packet"); if hs.next_state == HandshakeNextState::Login { |