feat(networking): Abstract game session networking

includes:
- abstracted form of session handling
- existing implementation using new abstracted system
- general clean-up of GameSession.java
This commit is contained in:
KingRainbow44 2024-07-06 21:58:02 -04:00
parent db4542653a
commit d0e3720748
No known key found for this signature in database
GPG Key ID: FC2CB64B00D257BE
9 changed files with 316 additions and 205 deletions

View File

@ -35,9 +35,10 @@ public class ConfigContainer {
* HTTP server should start immediately.
* Version 13 - 'game.useUniquePacketKey' was added to control whether the
* encryption key used for packets is a constant or randomly generated.
* Version 14 - 'game.timeout' was added to control the UDP client timeout.
*/
private static int version() {
return 13;
return 14;
}
/**
@ -183,6 +184,9 @@ public class ConfigContainer {
/* Kcp internal work interval (milliseconds) */
public int kcpInterval = 20;
/* Time to wait (in seconds) before terminating a connection. */
public long timeout = 30;
/* Controls whether packets should be logged in console or not */
public ServerDebugMode logPackets = ServerDebugMode.NONE;
/* Show packet payload in console or no (in any case the payload is shown in encrypted view) */

View File

@ -0,0 +1,32 @@
package emu.grasscutter.net;
import org.slf4j.Logger;
import java.net.InetSocketAddress;
/**
* This is most closely related to the previous `KcpTunnel` interface.
*/
public interface IKcpSession {
/**
* @return The session's unique logger.
*/
Logger getLogger();
/**
* @return The connecting client's address.
*/
InetSocketAddress getAddress();
/**
* Closes the server's connection to the client.
*/
void close();
/**
* Sends raw data to the client.
*
* @param data The data to send. This should not be KCP-encoded.
*/
void send(byte[] data);
}

View File

@ -0,0 +1,39 @@
package emu.grasscutter.net;
import emu.grasscutter.Grasscutter;
import emu.grasscutter.server.game.GameServer;
import java.net.InetSocketAddress;
public interface INetworkTransport {
/**
* Waits for the server to be active.
* This should be used to ensure that the server is ready to accept connections.
*/
default GameServer waitForServer() throws InterruptedException {
int depth = 0;
GameServer server;
while ((server = Grasscutter.getGameServer()) == null) {
Thread.sleep(1000);
if (depth++ > 5) {
throw new IllegalStateException("Game server is not available!");
}
}
return server;
}
/**
* This is invoked when the transport should start listening for incoming connections.
*
* @param listening The address/port to listen on.
*/
void start(InetSocketAddress listening);
/**
* This is invoked when the transport should stop listening for incoming connections.
* This should also close all active connections.
*/
void shutdown();
}

View File

@ -0,0 +1,49 @@
package emu.grasscutter.net.impl;
import emu.grasscutter.net.IKcpSession;
import emu.grasscutter.net.INetworkTransport;
import io.netty.buffer.Unpooled;
import kcp.highway.Ukcp;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
/**
* This is the default implementation of a KCP session.
* It uses {@link Ukcp} as the underlying wrapper.
*/
@Getter
public class KcpSessionImpl implements IKcpSession {
private final Ukcp handle;
private final Logger logger;
public KcpSessionImpl(Ukcp handle) {
this.handle = handle;
this.logger = LoggerFactory.getLogger("KcpSession " + handle.getConv());
}
@Override
public InetSocketAddress getAddress() {
return this.getHandle().user().getRemoteAddress();
}
@Override
public void close() {
this.getHandle().close(true);
}
@Override
public void send(byte[] data) {
var buffer = Unpooled.wrappedBuffer(data);
try {
this.getHandle().write(buffer);
} catch (Exception ex) {
this.getLogger().warn("Unable to send packet.", ex);
} finally {
buffer.release();
}
}
}

View File

@ -0,0 +1,112 @@
package emu.grasscutter.net.impl;
import emu.grasscutter.net.INetworkTransport;
import emu.grasscutter.server.game.GameSession;
import io.netty.buffer.ByteBuf;
import io.netty.channel.DefaultEventLoop;
import io.netty.channel.EventLoop;
import kcp.highway.ChannelConfig;
import kcp.highway.KcpListener;
import kcp.highway.KcpServer;
import kcp.highway.Ukcp;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import static emu.grasscutter.config.Configuration.GAME_INFO;
/**
* The default implementation of a {@link INetworkTransport}.
* Uses {@link KcpServer} as the underlying transport.
*/
@Slf4j
public class NetworkTransportImpl extends KcpServer implements INetworkTransport {
private final EventLoop networkLoop = new DefaultEventLoop();
private final ConcurrentHashMap<Ukcp, GameSession> sessions = new ConcurrentHashMap<>();
@Override
public void start(InetSocketAddress listening) {
var settings = new ChannelConfig();
settings.setTimeoutMillis(GAME_INFO.timeout * 1000);
settings.nodelay(true, GAME_INFO.kcpInterval, 2, true);
settings.setMtu(1400);
settings.setSndwnd(256);
settings.setRcvwnd(256);
settings.setUseConvChannel(true);
settings.setAckNoDelay(false);
this.init(new Listener(), settings, listening);
}
@Override
public void shutdown() {
this.stop();
try {
this.networkLoop.shutdownGracefully();
if (!this.networkLoop.awaitTermination(5, TimeUnit.SECONDS)) {
log.warn("Network loop did not terminate in time.");
}
} catch (Exception ex) {
log.warn("Failed to shutdown network loop.", ex);
}
}
class Listener implements KcpListener {
@Override
public void onConnected(Ukcp ukcp) {
var transport = NetworkTransportImpl.this;
try {
var server = transport.waitForServer();
var session = new KcpSessionImpl(ukcp);
var gameSession = new GameSession(server, session);
transport.sessions.put(ukcp, gameSession);
gameSession.onConnected();
} catch (InterruptedException | IllegalStateException ex) {
NetworkTransportImpl.log.warn("Unable to establish connection.", ex);
ukcp.close();
}
}
@Override
public void handleReceive(ByteBuf byteBuf, Ukcp ukcp) {
var transport = NetworkTransportImpl.this;
try {
var session = transport.sessions.get(ukcp);
if (session == null) {
NetworkTransportImpl.log.debug("Received data from unknown session.");
return;
}
transport.networkLoop.submit(() -> session.onReceived(byteBuf.array()));
} catch (Exception ex) {
NetworkTransportImpl.log.warn("Unable to handle received data.", ex);
} finally {
byteBuf.release();
}
}
@Override
public void handleException(Throwable throwable, Ukcp ukcp) {
NetworkTransportImpl.log.debug("Exception occurred in session.", throwable);
}
@Override
public void handleClose(Ukcp ukcp) {
var sessions = NetworkTransportImpl.this.sessions;
var session = sessions.get(ukcp);
if (session == null) {
NetworkTransportImpl.log.debug("Received close from unknown session.");
return;
}
session.onDisconnected();
sessions.remove(ukcp);
}
}
}

View File

@ -32,6 +32,8 @@ import emu.grasscutter.game.talk.TalkSystem;
import emu.grasscutter.game.tower.TowerSystem;
import emu.grasscutter.game.world.World;
import emu.grasscutter.game.world.WorldDataSystem;
import emu.grasscutter.net.INetworkTransport;
import emu.grasscutter.net.impl.NetworkTransportImpl;
import emu.grasscutter.net.packet.PacketHandler;
import emu.grasscutter.net.proto.SocialDetailOuterClass.SocialDetail;
import emu.grasscutter.server.dispatch.DispatchClient;
@ -47,14 +49,22 @@ import java.net.*;
import java.time.*;
import java.util.*;
import java.util.concurrent.*;
import kcp.highway.*;
import lombok.*;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.*;
@Getter
public final class GameServer extends KcpServer implements Iterable<Player> {
@Slf4j
public final class GameServer implements Iterable<Player> {
/**
* This can be set by plugins to change the network transport implementation.
*/
@Setter private static Class<? extends INetworkTransport> transport = NetworkTransportImpl.class;
// Game server base
private final InetSocketAddress address;
private final INetworkTransport netTransport;
private final GameServerPacketHandler packetHandler;
private final Map<Integer, Player> players;
private final Set<World> worlds;
@ -106,6 +116,7 @@ public final class GameServer extends KcpServer implements Iterable<Player> {
this.taskMap = null;
this.address = null;
this.netTransport = null;
this.packetHandler = null;
this.dispatchClient = null;
this.players = null;
@ -131,16 +142,20 @@ public final class GameServer extends KcpServer implements Iterable<Player> {
return;
}
var channelConfig = new ChannelConfig();
channelConfig.nodelay(true, GAME_INFO.kcpInterval, 2, true);
channelConfig.setMtu(1400);
channelConfig.setSndwnd(256);
channelConfig.setRcvwnd(256);
channelConfig.setTimeoutMillis(30 * 1000); // 30s
channelConfig.setUseConvChannel(true);
channelConfig.setAckNoDelay(false);
// Create the network transport.
INetworkTransport transport;
try {
transport = GameServer.transport
.getDeclaredConstructor()
.newInstance();
} catch (Exception ex) {
log.error("Failed to create network transport.", ex);
transport = new NetworkTransportImpl();
}
this.init(GameSessionManager.getListener(), channelConfig, address);
// Initialize the transport.
this.netTransport = transport;
this.netTransport.start(this.address = address);
EnergyManager.initialize();
StaminaManager.initialize();
@ -149,7 +164,6 @@ public final class GameServer extends KcpServer implements Iterable<Player> {
CombineManger.initialize();
// Game Server base
this.address = address;
this.packetHandler = new GameServerPacketHandler(PacketHandler.class);
this.dispatchClient = new DispatchClient(GameServer.getDispatchUrl());
this.players = new ConcurrentHashMap<>();
@ -184,7 +198,7 @@ public final class GameServer extends KcpServer implements Iterable<Player> {
private static InetSocketAddress getAdapterInetSocketAddress() {
InetSocketAddress inetSocketAddress;
if (GAME_INFO.bindAddress.equals("")) {
if (GAME_INFO.bindAddress.isEmpty()) {
inetSocketAddress = new InetSocketAddress(GAME_INFO.bindPort);
} else {
inetSocketAddress = new InetSocketAddress(GAME_INFO.bindAddress, GAME_INFO.bindPort);
@ -353,19 +367,6 @@ public final class GameServer extends KcpServer implements Iterable<Player> {
this.getWorlds().forEach(World::save);
Utils.sleep(1000L); // Wait 1 second for operations to finish.
this.stop(); // Stop the server.
try {
var threadPool = GameSessionManager.getLogicThread();
// Shutdown network thread.
threadPool.shutdownGracefully();
// Wait for the network thread to finish.
if (!threadPool.awaitTermination(5, TimeUnit.SECONDS)) {
Grasscutter.getLogger().error("Logic thread did not terminate!");
}
} catch (InterruptedException ignored) {
}
}
@NotNull @Override

View File

@ -7,18 +7,17 @@ import emu.grasscutter.Grasscutter;
import emu.grasscutter.Grasscutter.ServerDebugMode;
import emu.grasscutter.game.Account;
import emu.grasscutter.game.player.Player;
import emu.grasscutter.net.IKcpSession;
import emu.grasscutter.net.packet.*;
import emu.grasscutter.server.event.game.SendPacketEvent;
import emu.grasscutter.utils.*;
import io.netty.buffer.*;
import java.io.File;
import java.net.InetSocketAddress;
import java.nio.file.Path;
import lombok.*;
public class GameSession implements GameSessionManager.KcpChannel {
private final GameServer server;
private GameSessionManager.KcpTunnel tunnel;
public class GameSession implements IGameSession {
@Getter private final GameServer server;
private IKcpSession session;
@Getter @Setter private Account account;
@Getter private Player player;
@ -33,8 +32,10 @@ public class GameSession implements GameSessionManager.KcpChannel {
@Getter private long lastPingTime;
private int lastClientSeq = 10;
public GameSession(GameServer server) {
public GameSession(GameServer server, IKcpSession session) {
this.server = server;
this.session = session;
this.state = SessionState.WAITING_FOR_TOKEN;
this.lastPingTime = System.currentTimeMillis();
@ -44,24 +45,8 @@ public class GameSession implements GameSessionManager.KcpChannel {
}
}
public GameServer getServer() {
return server;
}
public InetSocketAddress getAddress() {
try {
return tunnel.getAddress();
} catch (Throwable ignore) {
return null;
}
}
public boolean useSecretKey() {
return useSecretKey;
}
public String getAccountId() {
return this.getAccount().getId();
return this.session.getAddress();
}
public synchronized void setPlayer(Player player) {
@ -83,30 +68,16 @@ public class GameSession implements GameSessionManager.KcpChannel {
return ++lastClientSeq;
}
public void replayPacket(int opcode, String name) {
Path filePath = FileUtils.getPluginPath(name);
File p = filePath.toFile();
if (!p.exists()) return;
byte[] packet = FileUtils.read(p);
BasePacket basePacket = new BasePacket(opcode);
basePacket.setData(packet);
send(basePacket);
}
public void logPacket(String sendOrRecv, int opcode, byte[] payload) {
Grasscutter.getLogger()
.info(sendOrRecv + ": " + PacketOpcodesUtils.getOpcodeName(opcode) + " (" + opcode + ")");
this.session.getLogger().info("{}: {} ({})",
sendOrRecv, PacketOpcodesUtils.getOpcodeName(opcode), opcode);
if (GAME_INFO.isShowPacketPayload) System.out.println(Utils.bytesToHex(payload));
}
public void send(BasePacket packet) {
// Test
if (packet.getOpcode() <= 0) {
Grasscutter.getLogger().warn("Tried to send packet with missing cmd id!");
this.session.getLogger().warn("Attempted to send packet with unknown ID!");
return;
}
@ -146,28 +117,24 @@ public class GameSession implements GameSessionManager.KcpChannel {
if (packet.shouldEncrypt) {
Crypto.xor(bytes, packet.useDispatchKey() ? Crypto.DISPATCH_KEY : this.encryptKey);
}
tunnel.writeData(bytes);
} catch (Exception ignored) {
Grasscutter.getLogger().debug("Unable to send packet to client.");
this.session.send(bytes);
} catch (Exception ex) {
this.session.getLogger().debug("Unable to send packet to client.", ex);
}
}
}
@Override
public void onConnected(GameSessionManager.KcpTunnel tunnel) {
this.tunnel = tunnel;
public void onConnected() {
Grasscutter.getLogger().info(translate("messages.game.connect", this.getAddress().toString()));
}
@Override
public void handleReceive(byte[] bytes) {
public void onReceived(byte[] bytes) {
// Decrypt and turn back into a packet
Crypto.xor(bytes, useSecretKey() ? this.encryptKey : Crypto.DISPATCH_KEY);
Crypto.xor(bytes, this.useSecretKey ? this.encryptKey : Crypto.DISPATCH_KEY);
ByteBuf packet = Unpooled.wrappedBuffer(bytes);
// Log
// logPacket(packet);
// Handle
try {
boolean allDebug = GAME_INFO.logPackets == ServerDebugMode.ALL;
while (packet.readableBytes() > 0) {
@ -179,11 +146,11 @@ public class GameSession implements GameSessionManager.KcpChannel {
int const1 = packet.readShort();
if (const1 != 17767) {
if (allDebug) {
Grasscutter.getLogger()
.error("Bad Data Package Received: got {} ,expect 17767", const1);
this.session.getLogger().error("Invalid packet header received: got {}, expected 17767", const1);
}
return; // Bad packet
}
// Data
int opcode = packet.readShort();
int headerLength = packet.readShort();
@ -197,8 +164,7 @@ public class GameSession implements GameSessionManager.KcpChannel {
int const2 = packet.readShort();
if (const2 != -30293) {
if (allDebug) {
Grasscutter.getLogger()
.error("Bad Data Package Received: got {} ,expect -30293", const2);
this.session.getLogger().error("Invalid packet footer received: got {}, expected -30293", const2);
}
return; // Bad packet
}
@ -226,16 +192,15 @@ public class GameSession implements GameSessionManager.KcpChannel {
// Handle
getServer().getPacketHandler().handle(this, opcode, header, payload);
}
} catch (Exception e) {
e.printStackTrace();
} catch (Exception ex) {
this.session.getLogger().warn("Unable to process packet.", ex);
} finally {
// byteBuf.release(); //Needn't
packet.release();
}
}
@Override
public void handleClose() {
public void onDisconnected() {
setState(SessionState.INACTIVE);
// send disconnection pack in case of reconnection
Grasscutter.getLogger()
@ -247,19 +212,20 @@ public class GameSession implements GameSessionManager.KcpChannel {
player.onLogout();
}
try {
send(new BasePacket(PacketOpcodes.ServerDisconnectClientNotify));
} catch (Throwable ignore) {
Grasscutter.getLogger().warn("closing {} error", getAddress().getAddress().getHostAddress());
this.send(new BasePacket(PacketOpcodes.ServerDisconnectClientNotify));
} catch (Throwable ex) {
this.session.getLogger().warn("Failed to disconnect client.", ex);
}
tunnel = null;
this.session = null;
}
public void close() {
tunnel.close();
this.session.close();
}
public boolean isActive() {
return getState() == SessionState.ACTIVE;
return this.getState() == SessionState.ACTIVE;
}
public enum SessionState {

View File

@ -1,114 +0,0 @@
package emu.grasscutter.server.game;
import emu.grasscutter.Grasscutter;
import emu.grasscutter.utils.Utils;
import io.netty.buffer.*;
import io.netty.channel.DefaultEventLoop;
import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import kcp.highway.*;
import lombok.Getter;
public class GameSessionManager {
@Getter private static final DefaultEventLoop logicThread = new DefaultEventLoop();
private static final ConcurrentHashMap<Ukcp, GameSession> sessions = new ConcurrentHashMap<>();
private static final KcpListener listener =
new KcpListener() {
@Override
public void onConnected(Ukcp ukcp) {
int times = 0;
GameServer server = Grasscutter.getGameServer();
while (server == null) { // Waiting server to establish
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
ukcp.close();
return;
}
if (times++ > 5) {
Grasscutter.getLogger().error("Service is not available!");
ukcp.close();
return;
}
server = Grasscutter.getGameServer();
}
GameSession conversation = new GameSession(server);
conversation.onConnected(
new KcpTunnel() {
@Override
public InetSocketAddress getAddress() {
return ukcp.user().getRemoteAddress();
}
@Override
public void writeData(byte[] bytes) {
ByteBuf buf = Unpooled.wrappedBuffer(bytes);
ukcp.write(buf);
buf.release();
}
@Override
public void close() {
ukcp.close();
}
@Override
public int getSrtt() {
return ukcp.srtt();
}
});
sessions.put(ukcp, conversation);
}
@Override
public void handleReceive(ByteBuf buf, Ukcp kcp) {
var byteData = Utils.byteBufToArray(buf);
logicThread.execute(
() -> {
try {
var conversation = sessions.get(kcp);
if (conversation != null) {
conversation.handleReceive(byteData);
}
} catch (Exception e) {
e.printStackTrace();
}
});
}
@Override
public void handleException(Throwable ex, Ukcp ukcp) {}
@Override
public void handleClose(Ukcp ukcp) {
GameSession conversation = sessions.get(ukcp);
if (conversation != null) {
conversation.handleClose();
sessions.remove(ukcp);
}
}
};
public static KcpListener getListener() {
return listener;
}
public interface KcpTunnel {
InetSocketAddress getAddress();
void writeData(byte[] bytes);
void close();
int getSrtt();
}
interface KcpChannel {
void onConnected(KcpTunnel tunnel);
void handleClose();
void handleReceive(byte[] bytes);
}
}

View File

@ -0,0 +1,22 @@
package emu.grasscutter.server.game;
public interface IGameSession {
/**
* Invoked when the server establishes a connection to the client.
* <p>
* This is invoked after the KCP handshake is completed.
*/
void onConnected();
/**
* Invoked when the server loses connection to the client.
*/
void onDisconnected();
/**
* Invoked when the server receives data from the client.
*
* @param data The raw data (not KCP-encoded) received from the client.
*/
void onReceived(byte[] data);
}