From d0e37207482a6a065103a535fdfcb57f45ad1b5b Mon Sep 17 00:00:00 2001 From: KingRainbow44 Date: Sat, 6 Jul 2024 21:58:02 -0400 Subject: [PATCH] feat(networking): Abstract game session networking includes: - abstracted form of session handling - existing implementation using new abstracted system - general clean-up of GameSession.java --- .../grasscutter/config/ConfigContainer.java | 6 +- .../java/emu/grasscutter/net/IKcpSession.java | 32 +++++ .../grasscutter/net/INetworkTransport.java | 39 ++++++ .../grasscutter/net/impl/KcpSessionImpl.java | 49 ++++++++ .../net/impl/NetworkTransportImpl.java | 112 +++++++++++++++++ .../grasscutter/server/game/GameServer.java | 53 ++++---- .../grasscutter/server/game/GameSession.java | 94 +++++---------- .../server/game/GameSessionManager.java | 114 ------------------ .../grasscutter/server/game/IGameSession.java | 22 ++++ 9 files changed, 316 insertions(+), 205 deletions(-) create mode 100644 src/main/java/emu/grasscutter/net/IKcpSession.java create mode 100644 src/main/java/emu/grasscutter/net/INetworkTransport.java create mode 100644 src/main/java/emu/grasscutter/net/impl/KcpSessionImpl.java create mode 100644 src/main/java/emu/grasscutter/net/impl/NetworkTransportImpl.java delete mode 100644 src/main/java/emu/grasscutter/server/game/GameSessionManager.java create mode 100644 src/main/java/emu/grasscutter/server/game/IGameSession.java diff --git a/src/main/java/emu/grasscutter/config/ConfigContainer.java b/src/main/java/emu/grasscutter/config/ConfigContainer.java index 2b6d20978..f23429931 100644 --- a/src/main/java/emu/grasscutter/config/ConfigContainer.java +++ b/src/main/java/emu/grasscutter/config/ConfigContainer.java @@ -35,9 +35,10 @@ public class ConfigContainer { * HTTP server should start immediately. * Version 13 - 'game.useUniquePacketKey' was added to control whether the * encryption key used for packets is a constant or randomly generated. + * Version 14 - 'game.timeout' was added to control the UDP client timeout. */ private static int version() { - return 13; + return 14; } /** @@ -183,6 +184,9 @@ public class ConfigContainer { /* Kcp internal work interval (milliseconds) */ public int kcpInterval = 20; + /* Time to wait (in seconds) before terminating a connection. */ + public long timeout = 30; + /* Controls whether packets should be logged in console or not */ public ServerDebugMode logPackets = ServerDebugMode.NONE; /* Show packet payload in console or no (in any case the payload is shown in encrypted view) */ diff --git a/src/main/java/emu/grasscutter/net/IKcpSession.java b/src/main/java/emu/grasscutter/net/IKcpSession.java new file mode 100644 index 000000000..f30c0fbbc --- /dev/null +++ b/src/main/java/emu/grasscutter/net/IKcpSession.java @@ -0,0 +1,32 @@ +package emu.grasscutter.net; + +import org.slf4j.Logger; + +import java.net.InetSocketAddress; + +/** + * This is most closely related to the previous `KcpTunnel` interface. + */ +public interface IKcpSession { + /** + * @return The session's unique logger. + */ + Logger getLogger(); + + /** + * @return The connecting client's address. + */ + InetSocketAddress getAddress(); + + /** + * Closes the server's connection to the client. + */ + void close(); + + /** + * Sends raw data to the client. + * + * @param data The data to send. This should not be KCP-encoded. + */ + void send(byte[] data); +} diff --git a/src/main/java/emu/grasscutter/net/INetworkTransport.java b/src/main/java/emu/grasscutter/net/INetworkTransport.java new file mode 100644 index 000000000..12495b325 --- /dev/null +++ b/src/main/java/emu/grasscutter/net/INetworkTransport.java @@ -0,0 +1,39 @@ +package emu.grasscutter.net; + +import emu.grasscutter.Grasscutter; +import emu.grasscutter.server.game.GameServer; + +import java.net.InetSocketAddress; + +public interface INetworkTransport { + /** + * Waits for the server to be active. + * This should be used to ensure that the server is ready to accept connections. + */ + default GameServer waitForServer() throws InterruptedException { + int depth = 0; + + GameServer server; + while ((server = Grasscutter.getGameServer()) == null) { + Thread.sleep(1000); + if (depth++ > 5) { + throw new IllegalStateException("Game server is not available!"); + } + } + + return server; + } + + /** + * This is invoked when the transport should start listening for incoming connections. + * + * @param listening The address/port to listen on. + */ + void start(InetSocketAddress listening); + + /** + * This is invoked when the transport should stop listening for incoming connections. + * This should also close all active connections. + */ + void shutdown(); +} diff --git a/src/main/java/emu/grasscutter/net/impl/KcpSessionImpl.java b/src/main/java/emu/grasscutter/net/impl/KcpSessionImpl.java new file mode 100644 index 000000000..e980a5d97 --- /dev/null +++ b/src/main/java/emu/grasscutter/net/impl/KcpSessionImpl.java @@ -0,0 +1,49 @@ +package emu.grasscutter.net.impl; + +import emu.grasscutter.net.IKcpSession; +import emu.grasscutter.net.INetworkTransport; +import io.netty.buffer.Unpooled; +import kcp.highway.Ukcp; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetSocketAddress; + +/** + * This is the default implementation of a KCP session. + * It uses {@link Ukcp} as the underlying wrapper. + */ +@Getter +public class KcpSessionImpl implements IKcpSession { + private final Ukcp handle; + private final Logger logger; + + public KcpSessionImpl(Ukcp handle) { + this.handle = handle; + this.logger = LoggerFactory.getLogger("KcpSession " + handle.getConv()); + } + + @Override + public InetSocketAddress getAddress() { + return this.getHandle().user().getRemoteAddress(); + } + + @Override + public void close() { + this.getHandle().close(true); + } + + @Override + public void send(byte[] data) { + var buffer = Unpooled.wrappedBuffer(data); + try { + this.getHandle().write(buffer); + } catch (Exception ex) { + this.getLogger().warn("Unable to send packet.", ex); + } finally { + buffer.release(); + } + } +} diff --git a/src/main/java/emu/grasscutter/net/impl/NetworkTransportImpl.java b/src/main/java/emu/grasscutter/net/impl/NetworkTransportImpl.java new file mode 100644 index 000000000..7d2837147 --- /dev/null +++ b/src/main/java/emu/grasscutter/net/impl/NetworkTransportImpl.java @@ -0,0 +1,112 @@ +package emu.grasscutter.net.impl; + +import emu.grasscutter.net.INetworkTransport; +import emu.grasscutter.server.game.GameSession; +import io.netty.buffer.ByteBuf; +import io.netty.channel.DefaultEventLoop; +import io.netty.channel.EventLoop; +import kcp.highway.ChannelConfig; +import kcp.highway.KcpListener; +import kcp.highway.KcpServer; +import kcp.highway.Ukcp; +import lombok.extern.slf4j.Slf4j; + +import java.net.InetSocketAddress; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +import static emu.grasscutter.config.Configuration.GAME_INFO; + +/** + * The default implementation of a {@link INetworkTransport}. + * Uses {@link KcpServer} as the underlying transport. + */ +@Slf4j +public class NetworkTransportImpl extends KcpServer implements INetworkTransport { + private final EventLoop networkLoop = new DefaultEventLoop(); + private final ConcurrentHashMap sessions = new ConcurrentHashMap<>(); + + @Override + public void start(InetSocketAddress listening) { + var settings = new ChannelConfig(); + settings.setTimeoutMillis(GAME_INFO.timeout * 1000); + settings.nodelay(true, GAME_INFO.kcpInterval, 2, true); + settings.setMtu(1400); + settings.setSndwnd(256); + settings.setRcvwnd(256); + settings.setUseConvChannel(true); + settings.setAckNoDelay(false); + + this.init(new Listener(), settings, listening); + } + + @Override + public void shutdown() { + this.stop(); + + try { + this.networkLoop.shutdownGracefully(); + if (!this.networkLoop.awaitTermination(5, TimeUnit.SECONDS)) { + log.warn("Network loop did not terminate in time."); + } + } catch (Exception ex) { + log.warn("Failed to shutdown network loop.", ex); + } + } + + class Listener implements KcpListener { + @Override + public void onConnected(Ukcp ukcp) { + var transport = NetworkTransportImpl.this; + + try { + var server = transport.waitForServer(); + var session = new KcpSessionImpl(ukcp); + var gameSession = new GameSession(server, session); + + transport.sessions.put(ukcp, gameSession); + gameSession.onConnected(); + } catch (InterruptedException | IllegalStateException ex) { + NetworkTransportImpl.log.warn("Unable to establish connection.", ex); + ukcp.close(); + } + } + + @Override + public void handleReceive(ByteBuf byteBuf, Ukcp ukcp) { + var transport = NetworkTransportImpl.this; + + try { + var session = transport.sessions.get(ukcp); + if (session == null) { + NetworkTransportImpl.log.debug("Received data from unknown session."); + return; + } + + transport.networkLoop.submit(() -> session.onReceived(byteBuf.array())); + } catch (Exception ex) { + NetworkTransportImpl.log.warn("Unable to handle received data.", ex); + } finally { + byteBuf.release(); + } + } + + @Override + public void handleException(Throwable throwable, Ukcp ukcp) { + NetworkTransportImpl.log.debug("Exception occurred in session.", throwable); + } + + @Override + public void handleClose(Ukcp ukcp) { + var sessions = NetworkTransportImpl.this.sessions; + var session = sessions.get(ukcp); + if (session == null) { + NetworkTransportImpl.log.debug("Received close from unknown session."); + return; + } + + session.onDisconnected(); + sessions.remove(ukcp); + } + } +} diff --git a/src/main/java/emu/grasscutter/server/game/GameServer.java b/src/main/java/emu/grasscutter/server/game/GameServer.java index ecd6b1425..ccc5ef625 100644 --- a/src/main/java/emu/grasscutter/server/game/GameServer.java +++ b/src/main/java/emu/grasscutter/server/game/GameServer.java @@ -32,6 +32,8 @@ import emu.grasscutter.game.talk.TalkSystem; import emu.grasscutter.game.tower.TowerSystem; import emu.grasscutter.game.world.World; import emu.grasscutter.game.world.WorldDataSystem; +import emu.grasscutter.net.INetworkTransport; +import emu.grasscutter.net.impl.NetworkTransportImpl; import emu.grasscutter.net.packet.PacketHandler; import emu.grasscutter.net.proto.SocialDetailOuterClass.SocialDetail; import emu.grasscutter.server.dispatch.DispatchClient; @@ -47,14 +49,22 @@ import java.net.*; import java.time.*; import java.util.*; import java.util.concurrent.*; -import kcp.highway.*; import lombok.*; +import lombok.extern.slf4j.Slf4j; import org.jetbrains.annotations.*; @Getter -public final class GameServer extends KcpServer implements Iterable { +@Slf4j +public final class GameServer implements Iterable { + /** + * This can be set by plugins to change the network transport implementation. + */ + @Setter private static Class transport = NetworkTransportImpl.class; + // Game server base private final InetSocketAddress address; + private final INetworkTransport netTransport; + private final GameServerPacketHandler packetHandler; private final Map players; private final Set worlds; @@ -106,6 +116,7 @@ public final class GameServer extends KcpServer implements Iterable { this.taskMap = null; this.address = null; + this.netTransport = null; this.packetHandler = null; this.dispatchClient = null; this.players = null; @@ -131,16 +142,20 @@ public final class GameServer extends KcpServer implements Iterable { return; } - var channelConfig = new ChannelConfig(); - channelConfig.nodelay(true, GAME_INFO.kcpInterval, 2, true); - channelConfig.setMtu(1400); - channelConfig.setSndwnd(256); - channelConfig.setRcvwnd(256); - channelConfig.setTimeoutMillis(30 * 1000); // 30s - channelConfig.setUseConvChannel(true); - channelConfig.setAckNoDelay(false); + // Create the network transport. + INetworkTransport transport; + try { + transport = GameServer.transport + .getDeclaredConstructor() + .newInstance(); + } catch (Exception ex) { + log.error("Failed to create network transport.", ex); + transport = new NetworkTransportImpl(); + } - this.init(GameSessionManager.getListener(), channelConfig, address); + // Initialize the transport. + this.netTransport = transport; + this.netTransport.start(this.address = address); EnergyManager.initialize(); StaminaManager.initialize(); @@ -149,7 +164,6 @@ public final class GameServer extends KcpServer implements Iterable { CombineManger.initialize(); // Game Server base - this.address = address; this.packetHandler = new GameServerPacketHandler(PacketHandler.class); this.dispatchClient = new DispatchClient(GameServer.getDispatchUrl()); this.players = new ConcurrentHashMap<>(); @@ -184,7 +198,7 @@ public final class GameServer extends KcpServer implements Iterable { private static InetSocketAddress getAdapterInetSocketAddress() { InetSocketAddress inetSocketAddress; - if (GAME_INFO.bindAddress.equals("")) { + if (GAME_INFO.bindAddress.isEmpty()) { inetSocketAddress = new InetSocketAddress(GAME_INFO.bindPort); } else { inetSocketAddress = new InetSocketAddress(GAME_INFO.bindAddress, GAME_INFO.bindPort); @@ -353,19 +367,6 @@ public final class GameServer extends KcpServer implements Iterable { this.getWorlds().forEach(World::save); Utils.sleep(1000L); // Wait 1 second for operations to finish. - this.stop(); // Stop the server. - - try { - var threadPool = GameSessionManager.getLogicThread(); - - // Shutdown network thread. - threadPool.shutdownGracefully(); - // Wait for the network thread to finish. - if (!threadPool.awaitTermination(5, TimeUnit.SECONDS)) { - Grasscutter.getLogger().error("Logic thread did not terminate!"); - } - } catch (InterruptedException ignored) { - } } @NotNull @Override diff --git a/src/main/java/emu/grasscutter/server/game/GameSession.java b/src/main/java/emu/grasscutter/server/game/GameSession.java index fa0d1a357..7115317a5 100644 --- a/src/main/java/emu/grasscutter/server/game/GameSession.java +++ b/src/main/java/emu/grasscutter/server/game/GameSession.java @@ -7,18 +7,17 @@ import emu.grasscutter.Grasscutter; import emu.grasscutter.Grasscutter.ServerDebugMode; import emu.grasscutter.game.Account; import emu.grasscutter.game.player.Player; +import emu.grasscutter.net.IKcpSession; import emu.grasscutter.net.packet.*; import emu.grasscutter.server.event.game.SendPacketEvent; import emu.grasscutter.utils.*; import io.netty.buffer.*; -import java.io.File; import java.net.InetSocketAddress; -import java.nio.file.Path; import lombok.*; -public class GameSession implements GameSessionManager.KcpChannel { - private final GameServer server; - private GameSessionManager.KcpTunnel tunnel; +public class GameSession implements IGameSession { + @Getter private final GameServer server; + private IKcpSession session; @Getter @Setter private Account account; @Getter private Player player; @@ -33,8 +32,10 @@ public class GameSession implements GameSessionManager.KcpChannel { @Getter private long lastPingTime; private int lastClientSeq = 10; - public GameSession(GameServer server) { + public GameSession(GameServer server, IKcpSession session) { this.server = server; + this.session = session; + this.state = SessionState.WAITING_FOR_TOKEN; this.lastPingTime = System.currentTimeMillis(); @@ -44,24 +45,8 @@ public class GameSession implements GameSessionManager.KcpChannel { } } - public GameServer getServer() { - return server; - } - public InetSocketAddress getAddress() { - try { - return tunnel.getAddress(); - } catch (Throwable ignore) { - return null; - } - } - - public boolean useSecretKey() { - return useSecretKey; - } - - public String getAccountId() { - return this.getAccount().getId(); + return this.session.getAddress(); } public synchronized void setPlayer(Player player) { @@ -83,30 +68,16 @@ public class GameSession implements GameSessionManager.KcpChannel { return ++lastClientSeq; } - public void replayPacket(int opcode, String name) { - Path filePath = FileUtils.getPluginPath(name); - File p = filePath.toFile(); - - if (!p.exists()) return; - - byte[] packet = FileUtils.read(p); - - BasePacket basePacket = new BasePacket(opcode); - basePacket.setData(packet); - - send(basePacket); - } - public void logPacket(String sendOrRecv, int opcode, byte[] payload) { - Grasscutter.getLogger() - .info(sendOrRecv + ": " + PacketOpcodesUtils.getOpcodeName(opcode) + " (" + opcode + ")"); + this.session.getLogger().info("{}: {} ({})", + sendOrRecv, PacketOpcodesUtils.getOpcodeName(opcode), opcode); if (GAME_INFO.isShowPacketPayload) System.out.println(Utils.bytesToHex(payload)); } public void send(BasePacket packet) { // Test if (packet.getOpcode() <= 0) { - Grasscutter.getLogger().warn("Tried to send packet with missing cmd id!"); + this.session.getLogger().warn("Attempted to send packet with unknown ID!"); return; } @@ -146,28 +117,24 @@ public class GameSession implements GameSessionManager.KcpChannel { if (packet.shouldEncrypt) { Crypto.xor(bytes, packet.useDispatchKey() ? Crypto.DISPATCH_KEY : this.encryptKey); } - tunnel.writeData(bytes); - } catch (Exception ignored) { - Grasscutter.getLogger().debug("Unable to send packet to client."); + this.session.send(bytes); + } catch (Exception ex) { + this.session.getLogger().debug("Unable to send packet to client.", ex); } } } @Override - public void onConnected(GameSessionManager.KcpTunnel tunnel) { - this.tunnel = tunnel; + public void onConnected() { Grasscutter.getLogger().info(translate("messages.game.connect", this.getAddress().toString())); } @Override - public void handleReceive(byte[] bytes) { + public void onReceived(byte[] bytes) { // Decrypt and turn back into a packet - Crypto.xor(bytes, useSecretKey() ? this.encryptKey : Crypto.DISPATCH_KEY); + Crypto.xor(bytes, this.useSecretKey ? this.encryptKey : Crypto.DISPATCH_KEY); ByteBuf packet = Unpooled.wrappedBuffer(bytes); - // Log - // logPacket(packet); - // Handle try { boolean allDebug = GAME_INFO.logPackets == ServerDebugMode.ALL; while (packet.readableBytes() > 0) { @@ -179,11 +146,11 @@ public class GameSession implements GameSessionManager.KcpChannel { int const1 = packet.readShort(); if (const1 != 17767) { if (allDebug) { - Grasscutter.getLogger() - .error("Bad Data Package Received: got {} ,expect 17767", const1); + this.session.getLogger().error("Invalid packet header received: got {}, expected 17767", const1); } return; // Bad packet } + // Data int opcode = packet.readShort(); int headerLength = packet.readShort(); @@ -197,8 +164,7 @@ public class GameSession implements GameSessionManager.KcpChannel { int const2 = packet.readShort(); if (const2 != -30293) { if (allDebug) { - Grasscutter.getLogger() - .error("Bad Data Package Received: got {} ,expect -30293", const2); + this.session.getLogger().error("Invalid packet footer received: got {}, expected -30293", const2); } return; // Bad packet } @@ -226,16 +192,15 @@ public class GameSession implements GameSessionManager.KcpChannel { // Handle getServer().getPacketHandler().handle(this, opcode, header, payload); } - } catch (Exception e) { - e.printStackTrace(); + } catch (Exception ex) { + this.session.getLogger().warn("Unable to process packet.", ex); } finally { - // byteBuf.release(); //Needn't packet.release(); } } @Override - public void handleClose() { + public void onDisconnected() { setState(SessionState.INACTIVE); // send disconnection pack in case of reconnection Grasscutter.getLogger() @@ -247,19 +212,20 @@ public class GameSession implements GameSessionManager.KcpChannel { player.onLogout(); } try { - send(new BasePacket(PacketOpcodes.ServerDisconnectClientNotify)); - } catch (Throwable ignore) { - Grasscutter.getLogger().warn("closing {} error", getAddress().getAddress().getHostAddress()); + this.send(new BasePacket(PacketOpcodes.ServerDisconnectClientNotify)); + } catch (Throwable ex) { + this.session.getLogger().warn("Failed to disconnect client.", ex); } - tunnel = null; + + this.session = null; } public void close() { - tunnel.close(); + this.session.close(); } public boolean isActive() { - return getState() == SessionState.ACTIVE; + return this.getState() == SessionState.ACTIVE; } public enum SessionState { diff --git a/src/main/java/emu/grasscutter/server/game/GameSessionManager.java b/src/main/java/emu/grasscutter/server/game/GameSessionManager.java deleted file mode 100644 index 5c6ef7770..000000000 --- a/src/main/java/emu/grasscutter/server/game/GameSessionManager.java +++ /dev/null @@ -1,114 +0,0 @@ -package emu.grasscutter.server.game; - -import emu.grasscutter.Grasscutter; -import emu.grasscutter.utils.Utils; -import io.netty.buffer.*; -import io.netty.channel.DefaultEventLoop; -import java.net.InetSocketAddress; -import java.util.concurrent.ConcurrentHashMap; -import kcp.highway.*; -import lombok.Getter; - -public class GameSessionManager { - @Getter private static final DefaultEventLoop logicThread = new DefaultEventLoop(); - private static final ConcurrentHashMap sessions = new ConcurrentHashMap<>(); - private static final KcpListener listener = - new KcpListener() { - @Override - public void onConnected(Ukcp ukcp) { - int times = 0; - GameServer server = Grasscutter.getGameServer(); - while (server == null) { // Waiting server to establish - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - e.printStackTrace(); - ukcp.close(); - return; - } - if (times++ > 5) { - Grasscutter.getLogger().error("Service is not available!"); - ukcp.close(); - return; - } - server = Grasscutter.getGameServer(); - } - GameSession conversation = new GameSession(server); - conversation.onConnected( - new KcpTunnel() { - @Override - public InetSocketAddress getAddress() { - return ukcp.user().getRemoteAddress(); - } - - @Override - public void writeData(byte[] bytes) { - ByteBuf buf = Unpooled.wrappedBuffer(bytes); - ukcp.write(buf); - buf.release(); - } - - @Override - public void close() { - ukcp.close(); - } - - @Override - public int getSrtt() { - return ukcp.srtt(); - } - }); - sessions.put(ukcp, conversation); - } - - @Override - public void handleReceive(ByteBuf buf, Ukcp kcp) { - var byteData = Utils.byteBufToArray(buf); - logicThread.execute( - () -> { - try { - var conversation = sessions.get(kcp); - if (conversation != null) { - conversation.handleReceive(byteData); - } - } catch (Exception e) { - e.printStackTrace(); - } - }); - } - - @Override - public void handleException(Throwable ex, Ukcp ukcp) {} - - @Override - public void handleClose(Ukcp ukcp) { - GameSession conversation = sessions.get(ukcp); - if (conversation != null) { - conversation.handleClose(); - sessions.remove(ukcp); - } - } - }; - - public static KcpListener getListener() { - return listener; - } - - public interface KcpTunnel { - InetSocketAddress getAddress(); - - void writeData(byte[] bytes); - - void close(); - - int getSrtt(); - } - - interface KcpChannel { - void onConnected(KcpTunnel tunnel); - - void handleClose(); - - void handleReceive(byte[] bytes); - } -} diff --git a/src/main/java/emu/grasscutter/server/game/IGameSession.java b/src/main/java/emu/grasscutter/server/game/IGameSession.java new file mode 100644 index 000000000..dca3efe54 --- /dev/null +++ b/src/main/java/emu/grasscutter/server/game/IGameSession.java @@ -0,0 +1,22 @@ +package emu.grasscutter.server.game; + +public interface IGameSession { + /** + * Invoked when the server establishes a connection to the client. + *

+ * This is invoked after the KCP handshake is completed. + */ + void onConnected(); + + /** + * Invoked when the server loses connection to the client. + */ + void onDisconnected(); + + /** + * Invoked when the server receives data from the client. + * + * @param data The raw data (not KCP-encoded) received from the client. + */ + void onReceived(byte[] data); +}