From 5b5ec9b6b4617fa5a5d955eca523f12c22769f18 Mon Sep 17 00:00:00 2001 From: KingRainbow44 Date: Sat, 9 Sep 2023 15:13:43 -0400 Subject: [PATCH] Organize & sort the KCP system --- .../java/emu/grasscutter/net/KcpChannel.java | 22 ++++ .../java/emu/grasscutter/net/KcpTunnel.java | 22 ++++ .../grasscutter/server/game/GameServer.java | 45 ++++--- .../grasscutter/server/game/GameSession.java | 21 ++-- .../server/game/GameSessionManager.java | 114 ------------------ .../game/session/GameSessionHandler.java | 31 +++++ .../game/session/GameSessionManager.java | 91 ++++++++++++++ 7 files changed, 199 insertions(+), 147 deletions(-) create mode 100644 src/main/java/emu/grasscutter/net/KcpChannel.java create mode 100644 src/main/java/emu/grasscutter/net/KcpTunnel.java delete mode 100644 src/main/java/emu/grasscutter/server/game/GameSessionManager.java create mode 100644 src/main/java/emu/grasscutter/server/game/session/GameSessionHandler.java create mode 100644 src/main/java/emu/grasscutter/server/game/session/GameSessionManager.java diff --git a/src/main/java/emu/grasscutter/net/KcpChannel.java b/src/main/java/emu/grasscutter/net/KcpChannel.java new file mode 100644 index 000000000..b85fc0dce --- /dev/null +++ b/src/main/java/emu/grasscutter/net/KcpChannel.java @@ -0,0 +1,22 @@ +package emu.grasscutter.net; + +public interface KcpChannel { + /** + * Event fired when the client connects. + * + * @param tunnel The tunnel. + */ + void onConnected(KcpTunnel tunnel); + + /** + * Event fired when the client disconnects. + */ + void onDisconnected(); + + /** + * Event fired when data is received from the client. + * + * @param bytes The data received. + */ + void onMessage(byte[] bytes); +} diff --git a/src/main/java/emu/grasscutter/net/KcpTunnel.java b/src/main/java/emu/grasscutter/net/KcpTunnel.java new file mode 100644 index 000000000..1f1c730e3 --- /dev/null +++ b/src/main/java/emu/grasscutter/net/KcpTunnel.java @@ -0,0 +1,22 @@ +package emu.grasscutter.net; + +import java.net.InetSocketAddress; + +public interface KcpTunnel { + /** + * @return The address of the client. + */ + InetSocketAddress getAddress(); + + /** + * Sends bytes to the client. + * + * @param bytes The bytes to send. + */ + void writeData(byte[] bytes); + + /** + * Closes the connection. + */ + void close(); +} diff --git a/src/main/java/emu/grasscutter/server/game/GameServer.java b/src/main/java/emu/grasscutter/server/game/GameServer.java index 4baf00b51..2be2b2300 100644 --- a/src/main/java/emu/grasscutter/server/game/GameServer.java +++ b/src/main/java/emu/grasscutter/server/game/GameServer.java @@ -1,55 +1,49 @@ package emu.grasscutter.server.game; -import static emu.grasscutter.config.Configuration.*; -import static emu.grasscutter.utils.lang.Language.translate; - import emu.grasscutter.*; import emu.grasscutter.Grasscutter.ServerRunMode; import emu.grasscutter.database.DatabaseHelper; import emu.grasscutter.game.Account; import emu.grasscutter.game.battlepass.BattlePassSystem; -import emu.grasscutter.game.chat.ChatSystem; -import emu.grasscutter.game.chat.ChatSystemHandler; +import emu.grasscutter.game.chat.*; import emu.grasscutter.game.combine.CombineManger; -import emu.grasscutter.game.drop.DropSystem; -import emu.grasscutter.game.drop.DropSystemLegacy; +import emu.grasscutter.game.drop.*; import emu.grasscutter.game.dungeons.DungeonSystem; import emu.grasscutter.game.expedition.ExpeditionSystem; import emu.grasscutter.game.gacha.GachaSystem; -import emu.grasscutter.game.home.HomeWorld; -import emu.grasscutter.game.home.HomeWorldMPSystem; -import emu.grasscutter.game.managers.cooking.CookingCompoundManager; -import emu.grasscutter.game.managers.cooking.CookingManager; +import emu.grasscutter.game.home.*; +import emu.grasscutter.game.managers.cooking.*; import emu.grasscutter.game.managers.energy.EnergyManager; import emu.grasscutter.game.managers.stamina.StaminaManager; import emu.grasscutter.game.player.Player; import emu.grasscutter.game.quest.QuestSystem; import emu.grasscutter.game.shop.ShopSystem; -import emu.grasscutter.game.systems.AnnouncementSystem; -import emu.grasscutter.game.systems.InventorySystem; -import emu.grasscutter.game.systems.MultiplayerSystem; +import emu.grasscutter.game.systems.*; import emu.grasscutter.game.talk.TalkSystem; import emu.grasscutter.game.tower.TowerSystem; -import emu.grasscutter.game.world.World; -import emu.grasscutter.game.world.WorldDataSystem; +import emu.grasscutter.game.world.*; import emu.grasscutter.net.packet.PacketHandler; import emu.grasscutter.net.proto.SocialDetailOuterClass.SocialDetail; import emu.grasscutter.server.dispatch.DispatchClient; import emu.grasscutter.server.event.game.ServerTickEvent; -import emu.grasscutter.server.event.internal.ServerStartEvent; -import emu.grasscutter.server.event.internal.ServerStopEvent; +import emu.grasscutter.server.event.internal.*; import emu.grasscutter.server.event.types.ServerEvent; import emu.grasscutter.server.scheduler.ServerTaskScheduler; import emu.grasscutter.task.TaskMap; import emu.grasscutter.utils.Utils; import it.unimi.dsi.fastutil.ints.*; +import kcp.highway.*; +import lombok.*; +import org.jetbrains.annotations.*; +import emu.grasscutter.server.game.session.GameSessionManager; + import java.net.*; import java.time.*; import java.util.*; import java.util.concurrent.*; -import kcp.highway.*; -import lombok.*; -import org.jetbrains.annotations.*; + +import static emu.grasscutter.config.Configuration.*; +import static emu.grasscutter.utils.lang.Language.translate; @Getter public final class GameServer extends KcpServer implements Iterable { @@ -60,6 +54,7 @@ public final class GameServer extends KcpServer implements Iterable { private final Set worlds; private final Int2ObjectMap homeWorlds; + @Getter private boolean started = false; @Setter private DispatchClient dispatchClient; // Server systems @@ -140,7 +135,7 @@ public final class GameServer extends KcpServer implements Iterable { channelConfig.setUseConvChannel(true); channelConfig.setAckNoDelay(false); - this.init(GameSessionManager.getListener(), channelConfig, address); + this.init(GameSessionManager.getInstance(), channelConfig, address); EnergyManager.initialize(); StaminaManager.initialize(); @@ -350,6 +345,8 @@ public final class GameServer extends KcpServer implements Iterable { .info(translate("messages.game.address_bind", GAME_INFO.accessAddress, address.getPort())); ServerStartEvent event = new ServerStartEvent(ServerEvent.Type.GAME, OffsetDateTime.now()); event.call(); + + this.started = true; } public void onServerShutdown() { @@ -364,10 +361,10 @@ public final class GameServer extends KcpServer implements Iterable { this.stop(); // Stop the server. try { - var threadPool = GameSessionManager.getLogicThread(); + var threadPool = GameSessionManager.getExecutor(); // Shutdown network thread. - threadPool.shutdownGracefully(); + threadPool.shutdown(); // Wait for the network thread to finish. if (!threadPool.awaitTermination(5, TimeUnit.SECONDS)) { Grasscutter.getLogger().error("Logic thread did not terminate!"); diff --git a/src/main/java/emu/grasscutter/server/game/GameSession.java b/src/main/java/emu/grasscutter/server/game/GameSession.java index 0cff31cd1..a7219584e 100644 --- a/src/main/java/emu/grasscutter/server/game/GameSession.java +++ b/src/main/java/emu/grasscutter/server/game/GameSession.java @@ -1,24 +1,26 @@ package emu.grasscutter.server.game; -import static emu.grasscutter.config.Configuration.*; -import static emu.grasscutter.utils.lang.Language.translate; - import emu.grasscutter.Grasscutter; import emu.grasscutter.Grasscutter.ServerDebugMode; import emu.grasscutter.game.Account; import emu.grasscutter.game.player.Player; +import emu.grasscutter.net.*; import emu.grasscutter.net.packet.*; import emu.grasscutter.server.event.game.SendPacketEvent; import emu.grasscutter.utils.*; import io.netty.buffer.*; +import lombok.*; + import java.io.File; import java.net.InetSocketAddress; import java.nio.file.Path; -import lombok.*; -public class GameSession implements GameSessionManager.KcpChannel { +import static emu.grasscutter.config.Configuration.*; +import static emu.grasscutter.utils.lang.Language.translate; + +public class GameSession implements KcpChannel { private final GameServer server; - private GameSessionManager.KcpTunnel tunnel; + private KcpTunnel tunnel; @Getter @Setter private Account account; @Getter private Player player; @@ -141,13 +143,13 @@ public class GameSession implements GameSessionManager.KcpChannel { } @Override - public void onConnected(GameSessionManager.KcpTunnel tunnel) { + public void onConnected(KcpTunnel tunnel) { this.tunnel = tunnel; Grasscutter.getLogger().info(translate("messages.game.connect", this.getAddress().toString())); } @Override - public void handleReceive(byte[] bytes) { + public void onMessage(byte[] bytes) { // Decrypt and turn back into a packet Crypto.xor(bytes, useSecretKey() ? Crypto.ENCRYPT_KEY : Crypto.DISPATCH_KEY); ByteBuf packet = Unpooled.wrappedBuffer(bytes); @@ -222,8 +224,9 @@ public class GameSession implements GameSessionManager.KcpChannel { } @Override - public void handleClose() { + public void onDisconnected() { setState(SessionState.INACTIVE); + // send disconnection pack in case of reconnection Grasscutter.getLogger() .info(translate("messages.game.disconnect", this.getAddress().toString())); diff --git a/src/main/java/emu/grasscutter/server/game/GameSessionManager.java b/src/main/java/emu/grasscutter/server/game/GameSessionManager.java deleted file mode 100644 index 5c6ef7770..000000000 --- a/src/main/java/emu/grasscutter/server/game/GameSessionManager.java +++ /dev/null @@ -1,114 +0,0 @@ -package emu.grasscutter.server.game; - -import emu.grasscutter.Grasscutter; -import emu.grasscutter.utils.Utils; -import io.netty.buffer.*; -import io.netty.channel.DefaultEventLoop; -import java.net.InetSocketAddress; -import java.util.concurrent.ConcurrentHashMap; -import kcp.highway.*; -import lombok.Getter; - -public class GameSessionManager { - @Getter private static final DefaultEventLoop logicThread = new DefaultEventLoop(); - private static final ConcurrentHashMap sessions = new ConcurrentHashMap<>(); - private static final KcpListener listener = - new KcpListener() { - @Override - public void onConnected(Ukcp ukcp) { - int times = 0; - GameServer server = Grasscutter.getGameServer(); - while (server == null) { // Waiting server to establish - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - e.printStackTrace(); - ukcp.close(); - return; - } - if (times++ > 5) { - Grasscutter.getLogger().error("Service is not available!"); - ukcp.close(); - return; - } - server = Grasscutter.getGameServer(); - } - GameSession conversation = new GameSession(server); - conversation.onConnected( - new KcpTunnel() { - @Override - public InetSocketAddress getAddress() { - return ukcp.user().getRemoteAddress(); - } - - @Override - public void writeData(byte[] bytes) { - ByteBuf buf = Unpooled.wrappedBuffer(bytes); - ukcp.write(buf); - buf.release(); - } - - @Override - public void close() { - ukcp.close(); - } - - @Override - public int getSrtt() { - return ukcp.srtt(); - } - }); - sessions.put(ukcp, conversation); - } - - @Override - public void handleReceive(ByteBuf buf, Ukcp kcp) { - var byteData = Utils.byteBufToArray(buf); - logicThread.execute( - () -> { - try { - var conversation = sessions.get(kcp); - if (conversation != null) { - conversation.handleReceive(byteData); - } - } catch (Exception e) { - e.printStackTrace(); - } - }); - } - - @Override - public void handleException(Throwable ex, Ukcp ukcp) {} - - @Override - public void handleClose(Ukcp ukcp) { - GameSession conversation = sessions.get(ukcp); - if (conversation != null) { - conversation.handleClose(); - sessions.remove(ukcp); - } - } - }; - - public static KcpListener getListener() { - return listener; - } - - public interface KcpTunnel { - InetSocketAddress getAddress(); - - void writeData(byte[] bytes); - - void close(); - - int getSrtt(); - } - - interface KcpChannel { - void onConnected(KcpTunnel tunnel); - - void handleClose(); - - void handleReceive(byte[] bytes); - } -} diff --git a/src/main/java/emu/grasscutter/server/game/session/GameSessionHandler.java b/src/main/java/emu/grasscutter/server/game/session/GameSessionHandler.java new file mode 100644 index 000000000..fc85e253f --- /dev/null +++ b/src/main/java/emu/grasscutter/server/game/session/GameSessionHandler.java @@ -0,0 +1,31 @@ +package emu.grasscutter.server.game.session; + +import emu.grasscutter.net.KcpTunnel; +import io.netty.buffer.Unpooled; +import kcp.highway.Ukcp; +import lombok.*; + +import java.net.InetSocketAddress; + +@RequiredArgsConstructor +public final class GameSessionHandler implements KcpTunnel { + @Getter private final Ukcp handle; + + @Override + public InetSocketAddress getAddress() { + return this.getHandle().user().getRemoteAddress(); + } + + @Override + public void writeData(byte[] bytes) { + var buffer = Unpooled.wrappedBuffer(bytes); + this.getHandle().write(buffer); + + buffer.release(); + } + + @Override + public void close() { + this.getHandle().close(); + } +} diff --git a/src/main/java/emu/grasscutter/server/game/session/GameSessionManager.java b/src/main/java/emu/grasscutter/server/game/session/GameSessionManager.java new file mode 100644 index 000000000..29931e12a --- /dev/null +++ b/src/main/java/emu/grasscutter/server/game/session/GameSessionManager.java @@ -0,0 +1,91 @@ +package emu.grasscutter.server.game.session; + +import emu.grasscutter.Grasscutter; +import emu.grasscutter.server.game.*; +import emu.grasscutter.utils.Utils; +import io.netty.buffer.ByteBuf; +import kcp.highway.*; +import lombok.Getter; + +import java.util.Map; +import java.util.concurrent.*; + +public final class GameSessionManager implements KcpListener { + @Getter private static final GameSessionManager instance + = new GameSessionManager(); + @Getter private static final ExecutorService executor + = Executors.newFixedThreadPool(4); + @Getter private static final Map sessions + = new ConcurrentHashMap<>(); + + /** + * Waits for the game server to be ready. + * + * @return The game server. + */ + private GameServer waitForServer() { + var server = Grasscutter.getGameServer(); + var times = 0; while (server == null || !server.isStarted()) { + Utils.sleep(1000); // Wait 1s for the server to start. + if (times++ > 5) { + Grasscutter.getLogger().error("Game server has not started in a reasonable time."); + return null; + } + + server = Grasscutter.getGameServer(); + } + + return server; + } + + @Override + public void onConnected(Ukcp ukcp) { + // Fetch the game server. + var server = this.waitForServer(); + if (server == null) { + ukcp.close(); + return; + } + + // Create a new session. + var session = sessions.compute(ukcp, (k, existing) -> { + // Close an existing session. + if (existing != null) { + existing.close(); + } + + return new GameSession(server); + }); + + // Connect the session. + session.onConnected(new GameSessionHandler(ukcp)); + } + + @Override + public void handleReceive(ByteBuf byteBuf, Ukcp ukcp) { + // Get the session. + var session = sessions.get(ukcp); + if (session == null) { + ukcp.close(); return; + } + + // Handle the message in a separate thread. + executor.submit(() -> { + var bytes = Utils.byteBufToArray(byteBuf); + session.onMessage(bytes); + }); + } + + @Override + public void handleException(Throwable throwable, Ukcp ukcp) { + Grasscutter.getLogger().error("Exception in game session.", throwable); + } + + @Override + public void handleClose(Ukcp ukcp) { + var session = sessions.remove(ukcp); + if (session != null) { + session.close(); + } + } +}