diff options
| author | nea <nea@nea.moe> | 2023-03-27 21:54:37 +0200 | 
|---|---|---|
| committer | nea <nea@nea.moe> | 2023-03-27 21:54:37 +0200 | 
| commit | b9da3e24eb07abcb33932480b8dbbcf69024c614 (patch) | |
| tree | 8347175e88cb5333107460a6b9114146386252fb /src | |
| parent | b1c4131231227a7780e1c1940b399983cdd7f552 (diff) | |
| download | mgasi-b9da3e24eb07abcb33932480b8dbbcf69024c614.tar.gz mgasi-b9da3e24eb07abcb33932480b8dbbcf69024c614.tar.bz2 mgasi-b9da3e24eb07abcb33932480b8dbbcf69024c614.zip | |
Proper async server and ctrl-c handling
Diffstat (limited to 'src')
| -rw-r--r-- | src/connect.rs | 29 | ||||
| -rw-r--r-- | src/main.rs | 79 | 
2 files changed, 76 insertions, 32 deletions
| diff --git a/src/connect.rs b/src/connect.rs index 82590fb..1feb03e 100644 --- a/src/connect.rs +++ b/src/connect.rs @@ -1,9 +1,9 @@ -use std::net::TcpStream; -  use anyhow::Result; -use craftio_rs::{CraftSyncReader, CraftSyncWriter, CraftTcpConnection}; +use craftio_rs::{CraftAsyncReader, CraftAsyncWriter, CraftTokioConnection};  use mcproto_rs::protocol::HasPacketKind;  use mcproto_rs::v1_19_3::{Packet761, Packet761Kind, RawPacket761}; +use tokio::io::BufReader; +use tokio::net::TcpStream;  #[macro_export]  macro_rules! await_packet { @@ -36,24 +36,19 @@ macro_rules! assert_packet {  }  pub struct MinecraftClient { -    pub connection: CraftTcpConnection, +    pub connection: CraftTokioConnection,  }  impl MinecraftClient { -    pub fn new(connection: CraftTcpConnection) -> Self { -        Self { connection } -    } - -    pub fn from_stream(stream: TcpStream) -> Result<Self> { -        Ok(Self { -            connection: CraftTcpConnection::from_std( -                stream, -                mcproto_rs::protocol::PacketDirection::ServerBound, -            )?, -        }) +    pub fn from_stream(stream: TcpStream) -> Self { +        let (read, write) = stream.into_split(); +        let bufread = BufReader::new(read); +        MinecraftClient { +            connection: CraftTokioConnection::from_async((bufread, write), mcproto_rs::protocol::PacketDirection::ServerBound), +        }      }      pub async fn read_next_packet(&mut self) -> Result<Option<Packet761>> { -        if let Some(raw) = self.connection.read_packet::<RawPacket761>()? { +        if let Some(raw) = self.connection.read_packet_async::<RawPacket761>().await? {              println!("Client -> Server: {:?}", raw);              Ok(Some(raw))          } else { @@ -62,7 +57,7 @@ impl MinecraftClient {      }      pub async fn send_packet(&mut self, packet: Packet761) -> Result<()> {          println!("Server -> Client: {:?}", packet); -        self.connection.write_packet(packet)?; +        self.connection.write_packet_async(packet).await?;          Ok(())      }      pub async fn wait_for_packet(&mut self, packet_kind: Packet761Kind) -> Result<Packet761> { diff --git a/src/main.rs b/src/main.rs index 088f27f..a113a30 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,40 +1,89 @@ -use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, TcpStream}; +use std::net::{IpAddr, Ipv4Addr, SocketAddr};  use std::time::Duration;  use anyhow::Result; -use craftio_rs::{CraftIo, CraftSyncReader, CraftSyncWriter, CraftTcpConnection}; +use craftio_rs::CraftIo;  use mcproto_rs::nbt::Tag; -use mcproto_rs::protocol::{HasPacketKind, State}; +use mcproto_rs::protocol::State;  use mcproto_rs::Serializer;  use mcproto_rs::status::{StatusPlayersSpec, StatusSpec, StatusVersionSpec};  use mcproto_rs::types::{BytesSerializer, Chat, IntPosition, ItemStack, NamedNbtTag, RemainingBytes, Vec3};  use mcproto_rs::uuid::UUID4;  use mcproto_rs::v1_19_3::{BitSet, BlobArray, BookSettings, ChunkDataAndUpdateLightSpec, ChunkDataSpec, ChunkSection, CommandNode, CommandNodeSpec, CommandsSpec, EntityEventSpec, InitializeWorldBorderSpec, KeepAliveClientBoundSpec, LightDataSpec, PalettedContainer, PluginMessageSpec, RecipeBookAction, RecipeBookInitSpec, SetCenterChunkSpec, SetContainerContentSpec, SetDefaultSpawnPositionSpec, SynchronizePlayerPositionSpec, TagType, TypedTagList, UpdateRecipeBookSpec}; -use mcproto_rs::v1_19_3::{GameMode, HandshakeNextState, LoginPlaySpec, LoginSuccessSpec, Packet761, Packet761Kind, PingRequestSpec, PingResponseSpec, PreviousGameMode, RawPacket761, SetHeldItemClientSpec, StatusResponseSpec, UpdateRecipesSpec, UpdateTagsSpec}; +use mcproto_rs::v1_19_3::{GameMode, HandshakeNextState, LoginPlaySpec, LoginSuccessSpec, Packet761, Packet761Kind, PingRequestSpec, PingResponseSpec, PreviousGameMode, SetHeldItemClientSpec, StatusResponseSpec, UpdateRecipesSpec, UpdateTagsSpec};  use tokio; -use crate::connect::MinecraftClient; +use tokio::net::TcpStream; +use tokio::signal::unix::{signal, SignalKind}; +use tokio::sync::{broadcast}; +use tokio::sync::broadcast::Receiver; +use tokio::task::JoinHandle; +use crate::connect::MinecraftClient;  pub mod connect;  pub mod nbtdsl; +  #[tokio::main]  async fn main() -> Result<()> { +    let (shutdown_send, _recv) = broadcast::channel::<()>(1); + +    let bind = tokio::net::TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 25565)).await?; +    let server_rx = shutdown_send.subscribe(); +    let server_handle = tokio::spawn(async { +        listener(bind, server_rx).await; +        () +    }); + +    println!("Awaiting shutdown request"); +    signal(SignalKind::interrupt())?.recv().await; +    println!("Shutdown request detected"); +    shutdown_send.send(())?; +    server_handle.await?; + +    Ok(()) +} + +async fn listener(listener: tokio::net::TcpListener, mut receiver: Receiver<()>) {      println!("Starting server"); -    let bind = TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 25565))?; +    let mut handles: Vec<JoinHandle<()>> = Vec::new();      loop { -        if let Ok((socket, address)) = bind.accept() { -            println!("Connection accepted from {}", address); -            let client = MinecraftClient::from_stream(socket)?; -            tokio::spawn(async { -                if let Err(x) = handle_conn(client).await { -                    println!("Error: {:?}", x); -                } -            }); -        } +        let x: Option<(TcpStream, SocketAddr)> = tokio::select! { +            _ = receiver.recv() => { +                break; +            } +            conn = listener.accept() => { conn.ok() } +        }; +        let (stream, from) = match x { +            None => { continue } +            Some(y) => { y } +        }; +        println!("Connection accepted from {}", from); +        let client = MinecraftClient::from_stream(stream); +        let handle = tokio::spawn(async { +            if let Err(x) = handle_conn(client).await { +                println!("Error: {:?}", x); +            } +        }); +        handles.push(handle);      } +    println!("Shutdown started"); +    for x in &handles { +        x.abort(); +    } +    /* +    TODO: proper kicks so that we can just join +    for x in handles { +        match x.await { +            Ok(_) => {} +            Err(_) => { +                println!("Failed to join server during exit!") +            } +        }; +    }*/  } +  async fn handle_conn(mut client: MinecraftClient) -> Result<()> {      let hs = assert_packet!(Handshake, client, "Missing packet");      if hs.next_state == HandshakeNextState::Login { | 
