summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authornea <nea@nea.moe>2023-03-27 21:54:37 +0200
committernea <nea@nea.moe>2023-03-27 21:54:37 +0200
commitb9da3e24eb07abcb33932480b8dbbcf69024c614 (patch)
tree8347175e88cb5333107460a6b9114146386252fb /src
parentb1c4131231227a7780e1c1940b399983cdd7f552 (diff)
downloadmgasi-b9da3e24eb07abcb33932480b8dbbcf69024c614.tar.gz
mgasi-b9da3e24eb07abcb33932480b8dbbcf69024c614.tar.bz2
mgasi-b9da3e24eb07abcb33932480b8dbbcf69024c614.zip
Proper async server and ctrl-c handling
Diffstat (limited to 'src')
-rw-r--r--src/connect.rs29
-rw-r--r--src/main.rs79
2 files changed, 76 insertions, 32 deletions
diff --git a/src/connect.rs b/src/connect.rs
index 82590fb..1feb03e 100644
--- a/src/connect.rs
+++ b/src/connect.rs
@@ -1,9 +1,9 @@
-use std::net::TcpStream;
-
use anyhow::Result;
-use craftio_rs::{CraftSyncReader, CraftSyncWriter, CraftTcpConnection};
+use craftio_rs::{CraftAsyncReader, CraftAsyncWriter, CraftTokioConnection};
use mcproto_rs::protocol::HasPacketKind;
use mcproto_rs::v1_19_3::{Packet761, Packet761Kind, RawPacket761};
+use tokio::io::BufReader;
+use tokio::net::TcpStream;
#[macro_export]
macro_rules! await_packet {
@@ -36,24 +36,19 @@ macro_rules! assert_packet {
}
pub struct MinecraftClient {
- pub connection: CraftTcpConnection,
+ pub connection: CraftTokioConnection,
}
impl MinecraftClient {
- pub fn new(connection: CraftTcpConnection) -> Self {
- Self { connection }
- }
-
- pub fn from_stream(stream: TcpStream) -> Result<Self> {
- Ok(Self {
- connection: CraftTcpConnection::from_std(
- stream,
- mcproto_rs::protocol::PacketDirection::ServerBound,
- )?,
- })
+ pub fn from_stream(stream: TcpStream) -> Self {
+ let (read, write) = stream.into_split();
+ let bufread = BufReader::new(read);
+ MinecraftClient {
+ connection: CraftTokioConnection::from_async((bufread, write), mcproto_rs::protocol::PacketDirection::ServerBound),
+ }
}
pub async fn read_next_packet(&mut self) -> Result<Option<Packet761>> {
- if let Some(raw) = self.connection.read_packet::<RawPacket761>()? {
+ if let Some(raw) = self.connection.read_packet_async::<RawPacket761>().await? {
println!("Client -> Server: {:?}", raw);
Ok(Some(raw))
} else {
@@ -62,7 +57,7 @@ impl MinecraftClient {
}
pub async fn send_packet(&mut self, packet: Packet761) -> Result<()> {
println!("Server -> Client: {:?}", packet);
- self.connection.write_packet(packet)?;
+ self.connection.write_packet_async(packet).await?;
Ok(())
}
pub async fn wait_for_packet(&mut self, packet_kind: Packet761Kind) -> Result<Packet761> {
diff --git a/src/main.rs b/src/main.rs
index 088f27f..a113a30 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,40 +1,89 @@
-use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, TcpStream};
+use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::time::Duration;
use anyhow::Result;
-use craftio_rs::{CraftIo, CraftSyncReader, CraftSyncWriter, CraftTcpConnection};
+use craftio_rs::CraftIo;
use mcproto_rs::nbt::Tag;
-use mcproto_rs::protocol::{HasPacketKind, State};
+use mcproto_rs::protocol::State;
use mcproto_rs::Serializer;
use mcproto_rs::status::{StatusPlayersSpec, StatusSpec, StatusVersionSpec};
use mcproto_rs::types::{BytesSerializer, Chat, IntPosition, ItemStack, NamedNbtTag, RemainingBytes, Vec3};
use mcproto_rs::uuid::UUID4;
use mcproto_rs::v1_19_3::{BitSet, BlobArray, BookSettings, ChunkDataAndUpdateLightSpec, ChunkDataSpec, ChunkSection, CommandNode, CommandNodeSpec, CommandsSpec, EntityEventSpec, InitializeWorldBorderSpec, KeepAliveClientBoundSpec, LightDataSpec, PalettedContainer, PluginMessageSpec, RecipeBookAction, RecipeBookInitSpec, SetCenterChunkSpec, SetContainerContentSpec, SetDefaultSpawnPositionSpec, SynchronizePlayerPositionSpec, TagType, TypedTagList, UpdateRecipeBookSpec};
-use mcproto_rs::v1_19_3::{GameMode, HandshakeNextState, LoginPlaySpec, LoginSuccessSpec, Packet761, Packet761Kind, PingRequestSpec, PingResponseSpec, PreviousGameMode, RawPacket761, SetHeldItemClientSpec, StatusResponseSpec, UpdateRecipesSpec, UpdateTagsSpec};
+use mcproto_rs::v1_19_3::{GameMode, HandshakeNextState, LoginPlaySpec, LoginSuccessSpec, Packet761, Packet761Kind, PingRequestSpec, PingResponseSpec, PreviousGameMode, SetHeldItemClientSpec, StatusResponseSpec, UpdateRecipesSpec, UpdateTagsSpec};
use tokio;
-use crate::connect::MinecraftClient;
+use tokio::net::TcpStream;
+use tokio::signal::unix::{signal, SignalKind};
+use tokio::sync::{broadcast};
+use tokio::sync::broadcast::Receiver;
+use tokio::task::JoinHandle;
+use crate::connect::MinecraftClient;
pub mod connect;
pub mod nbtdsl;
+
#[tokio::main]
async fn main() -> Result<()> {
+ let (shutdown_send, _recv) = broadcast::channel::<()>(1);
+
+ let bind = tokio::net::TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 25565)).await?;
+ let server_rx = shutdown_send.subscribe();
+ let server_handle = tokio::spawn(async {
+ listener(bind, server_rx).await;
+ ()
+ });
+
+ println!("Awaiting shutdown request");
+ signal(SignalKind::interrupt())?.recv().await;
+ println!("Shutdown request detected");
+ shutdown_send.send(())?;
+ server_handle.await?;
+
+ Ok(())
+}
+
+async fn listener(listener: tokio::net::TcpListener, mut receiver: Receiver<()>) {
println!("Starting server");
- let bind = TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 25565))?;
+ let mut handles: Vec<JoinHandle<()>> = Vec::new();
loop {
- if let Ok((socket, address)) = bind.accept() {
- println!("Connection accepted from {}", address);
- let client = MinecraftClient::from_stream(socket)?;
- tokio::spawn(async {
- if let Err(x) = handle_conn(client).await {
- println!("Error: {:?}", x);
- }
- });
- }
+ let x: Option<(TcpStream, SocketAddr)> = tokio::select! {
+ _ = receiver.recv() => {
+ break;
+ }
+ conn = listener.accept() => { conn.ok() }
+ };
+ let (stream, from) = match x {
+ None => { continue }
+ Some(y) => { y }
+ };
+ println!("Connection accepted from {}", from);
+ let client = MinecraftClient::from_stream(stream);
+ let handle = tokio::spawn(async {
+ if let Err(x) = handle_conn(client).await {
+ println!("Error: {:?}", x);
+ }
+ });
+ handles.push(handle);
}
+ println!("Shutdown started");
+ for x in &handles {
+ x.abort();
+ }
+ /*
+ TODO: proper kicks so that we can just join
+ for x in handles {
+ match x.await {
+ Ok(_) => {}
+ Err(_) => {
+ println!("Failed to join server during exit!")
+ }
+ };
+ }*/
}
+
async fn handle_conn(mut client: MinecraftClient) -> Result<()> {
let hs = assert_packet!(Handshake, client, "Missing packet");
if hs.next_state == HandshakeNextState::Login {