Organize & sort the KCP system

This commit is contained in:
KingRainbow44 2023-09-09 15:13:43 -04:00
parent 8db1f597ce
commit 5b5ec9b6b4
No known key found for this signature in database
GPG Key ID: FC2CB64B00D257BE
7 changed files with 199 additions and 147 deletions

View File

@ -0,0 +1,22 @@
package emu.grasscutter.net;
public interface KcpChannel {
/**
* Event fired when the client connects.
*
* @param tunnel The tunnel.
*/
void onConnected(KcpTunnel tunnel);
/**
* Event fired when the client disconnects.
*/
void onDisconnected();
/**
* Event fired when data is received from the client.
*
* @param bytes The data received.
*/
void onMessage(byte[] bytes);
}

View File

@ -0,0 +1,22 @@
package emu.grasscutter.net;
import java.net.InetSocketAddress;
public interface KcpTunnel {
/**
* @return The address of the client.
*/
InetSocketAddress getAddress();
/**
* Sends bytes to the client.
*
* @param bytes The bytes to send.
*/
void writeData(byte[] bytes);
/**
* Closes the connection.
*/
void close();
}

View File

@ -1,55 +1,49 @@
package emu.grasscutter.server.game; package emu.grasscutter.server.game;
import static emu.grasscutter.config.Configuration.*;
import static emu.grasscutter.utils.lang.Language.translate;
import emu.grasscutter.*; import emu.grasscutter.*;
import emu.grasscutter.Grasscutter.ServerRunMode; import emu.grasscutter.Grasscutter.ServerRunMode;
import emu.grasscutter.database.DatabaseHelper; import emu.grasscutter.database.DatabaseHelper;
import emu.grasscutter.game.Account; import emu.grasscutter.game.Account;
import emu.grasscutter.game.battlepass.BattlePassSystem; import emu.grasscutter.game.battlepass.BattlePassSystem;
import emu.grasscutter.game.chat.ChatSystem; import emu.grasscutter.game.chat.*;
import emu.grasscutter.game.chat.ChatSystemHandler;
import emu.grasscutter.game.combine.CombineManger; import emu.grasscutter.game.combine.CombineManger;
import emu.grasscutter.game.drop.DropSystem; import emu.grasscutter.game.drop.*;
import emu.grasscutter.game.drop.DropSystemLegacy;
import emu.grasscutter.game.dungeons.DungeonSystem; import emu.grasscutter.game.dungeons.DungeonSystem;
import emu.grasscutter.game.expedition.ExpeditionSystem; import emu.grasscutter.game.expedition.ExpeditionSystem;
import emu.grasscutter.game.gacha.GachaSystem; import emu.grasscutter.game.gacha.GachaSystem;
import emu.grasscutter.game.home.HomeWorld; import emu.grasscutter.game.home.*;
import emu.grasscutter.game.home.HomeWorldMPSystem; import emu.grasscutter.game.managers.cooking.*;
import emu.grasscutter.game.managers.cooking.CookingCompoundManager;
import emu.grasscutter.game.managers.cooking.CookingManager;
import emu.grasscutter.game.managers.energy.EnergyManager; import emu.grasscutter.game.managers.energy.EnergyManager;
import emu.grasscutter.game.managers.stamina.StaminaManager; import emu.grasscutter.game.managers.stamina.StaminaManager;
import emu.grasscutter.game.player.Player; import emu.grasscutter.game.player.Player;
import emu.grasscutter.game.quest.QuestSystem; import emu.grasscutter.game.quest.QuestSystem;
import emu.grasscutter.game.shop.ShopSystem; import emu.grasscutter.game.shop.ShopSystem;
import emu.grasscutter.game.systems.AnnouncementSystem; import emu.grasscutter.game.systems.*;
import emu.grasscutter.game.systems.InventorySystem;
import emu.grasscutter.game.systems.MultiplayerSystem;
import emu.grasscutter.game.talk.TalkSystem; import emu.grasscutter.game.talk.TalkSystem;
import emu.grasscutter.game.tower.TowerSystem; import emu.grasscutter.game.tower.TowerSystem;
import emu.grasscutter.game.world.World; import emu.grasscutter.game.world.*;
import emu.grasscutter.game.world.WorldDataSystem;
import emu.grasscutter.net.packet.PacketHandler; import emu.grasscutter.net.packet.PacketHandler;
import emu.grasscutter.net.proto.SocialDetailOuterClass.SocialDetail; import emu.grasscutter.net.proto.SocialDetailOuterClass.SocialDetail;
import emu.grasscutter.server.dispatch.DispatchClient; import emu.grasscutter.server.dispatch.DispatchClient;
import emu.grasscutter.server.event.game.ServerTickEvent; import emu.grasscutter.server.event.game.ServerTickEvent;
import emu.grasscutter.server.event.internal.ServerStartEvent; import emu.grasscutter.server.event.internal.*;
import emu.grasscutter.server.event.internal.ServerStopEvent;
import emu.grasscutter.server.event.types.ServerEvent; import emu.grasscutter.server.event.types.ServerEvent;
import emu.grasscutter.server.scheduler.ServerTaskScheduler; import emu.grasscutter.server.scheduler.ServerTaskScheduler;
import emu.grasscutter.task.TaskMap; import emu.grasscutter.task.TaskMap;
import emu.grasscutter.utils.Utils; import emu.grasscutter.utils.Utils;
import it.unimi.dsi.fastutil.ints.*; import it.unimi.dsi.fastutil.ints.*;
import kcp.highway.*;
import lombok.*;
import org.jetbrains.annotations.*;
import emu.grasscutter.server.game.session.GameSessionManager;
import java.net.*; import java.net.*;
import java.time.*; import java.time.*;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import kcp.highway.*;
import lombok.*; import static emu.grasscutter.config.Configuration.*;
import org.jetbrains.annotations.*; import static emu.grasscutter.utils.lang.Language.translate;
@Getter @Getter
public final class GameServer extends KcpServer implements Iterable<Player> { public final class GameServer extends KcpServer implements Iterable<Player> {
@ -60,6 +54,7 @@ public final class GameServer extends KcpServer implements Iterable<Player> {
private final Set<World> worlds; private final Set<World> worlds;
private final Int2ObjectMap<HomeWorld> homeWorlds; private final Int2ObjectMap<HomeWorld> homeWorlds;
@Getter private boolean started = false;
@Setter private DispatchClient dispatchClient; @Setter private DispatchClient dispatchClient;
// Server systems // Server systems
@ -140,7 +135,7 @@ public final class GameServer extends KcpServer implements Iterable<Player> {
channelConfig.setUseConvChannel(true); channelConfig.setUseConvChannel(true);
channelConfig.setAckNoDelay(false); channelConfig.setAckNoDelay(false);
this.init(GameSessionManager.getListener(), channelConfig, address); this.init(GameSessionManager.getInstance(), channelConfig, address);
EnergyManager.initialize(); EnergyManager.initialize();
StaminaManager.initialize(); StaminaManager.initialize();
@ -350,6 +345,8 @@ public final class GameServer extends KcpServer implements Iterable<Player> {
.info(translate("messages.game.address_bind", GAME_INFO.accessAddress, address.getPort())); .info(translate("messages.game.address_bind", GAME_INFO.accessAddress, address.getPort()));
ServerStartEvent event = new ServerStartEvent(ServerEvent.Type.GAME, OffsetDateTime.now()); ServerStartEvent event = new ServerStartEvent(ServerEvent.Type.GAME, OffsetDateTime.now());
event.call(); event.call();
this.started = true;
} }
public void onServerShutdown() { public void onServerShutdown() {
@ -364,10 +361,10 @@ public final class GameServer extends KcpServer implements Iterable<Player> {
this.stop(); // Stop the server. this.stop(); // Stop the server.
try { try {
var threadPool = GameSessionManager.getLogicThread(); var threadPool = GameSessionManager.getExecutor();
// Shutdown network thread. // Shutdown network thread.
threadPool.shutdownGracefully(); threadPool.shutdown();
// Wait for the network thread to finish. // Wait for the network thread to finish.
if (!threadPool.awaitTermination(5, TimeUnit.SECONDS)) { if (!threadPool.awaitTermination(5, TimeUnit.SECONDS)) {
Grasscutter.getLogger().error("Logic thread did not terminate!"); Grasscutter.getLogger().error("Logic thread did not terminate!");

View File

@ -1,24 +1,26 @@
package emu.grasscutter.server.game; package emu.grasscutter.server.game;
import static emu.grasscutter.config.Configuration.*;
import static emu.grasscutter.utils.lang.Language.translate;
import emu.grasscutter.Grasscutter; import emu.grasscutter.Grasscutter;
import emu.grasscutter.Grasscutter.ServerDebugMode; import emu.grasscutter.Grasscutter.ServerDebugMode;
import emu.grasscutter.game.Account; import emu.grasscutter.game.Account;
import emu.grasscutter.game.player.Player; import emu.grasscutter.game.player.Player;
import emu.grasscutter.net.*;
import emu.grasscutter.net.packet.*; import emu.grasscutter.net.packet.*;
import emu.grasscutter.server.event.game.SendPacketEvent; import emu.grasscutter.server.event.game.SendPacketEvent;
import emu.grasscutter.utils.*; import emu.grasscutter.utils.*;
import io.netty.buffer.*; import io.netty.buffer.*;
import lombok.*;
import java.io.File; import java.io.File;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.file.Path; import java.nio.file.Path;
import lombok.*;
public class GameSession implements GameSessionManager.KcpChannel { import static emu.grasscutter.config.Configuration.*;
import static emu.grasscutter.utils.lang.Language.translate;
public class GameSession implements KcpChannel {
private final GameServer server; private final GameServer server;
private GameSessionManager.KcpTunnel tunnel; private KcpTunnel tunnel;
@Getter @Setter private Account account; @Getter @Setter private Account account;
@Getter private Player player; @Getter private Player player;
@ -141,13 +143,13 @@ public class GameSession implements GameSessionManager.KcpChannel {
} }
@Override @Override
public void onConnected(GameSessionManager.KcpTunnel tunnel) { public void onConnected(KcpTunnel tunnel) {
this.tunnel = tunnel; this.tunnel = tunnel;
Grasscutter.getLogger().info(translate("messages.game.connect", this.getAddress().toString())); Grasscutter.getLogger().info(translate("messages.game.connect", this.getAddress().toString()));
} }
@Override @Override
public void handleReceive(byte[] bytes) { public void onMessage(byte[] bytes) {
// Decrypt and turn back into a packet // Decrypt and turn back into a packet
Crypto.xor(bytes, useSecretKey() ? Crypto.ENCRYPT_KEY : Crypto.DISPATCH_KEY); Crypto.xor(bytes, useSecretKey() ? Crypto.ENCRYPT_KEY : Crypto.DISPATCH_KEY);
ByteBuf packet = Unpooled.wrappedBuffer(bytes); ByteBuf packet = Unpooled.wrappedBuffer(bytes);
@ -222,8 +224,9 @@ public class GameSession implements GameSessionManager.KcpChannel {
} }
@Override @Override
public void handleClose() { public void onDisconnected() {
setState(SessionState.INACTIVE); setState(SessionState.INACTIVE);
// send disconnection pack in case of reconnection // send disconnection pack in case of reconnection
Grasscutter.getLogger() Grasscutter.getLogger()
.info(translate("messages.game.disconnect", this.getAddress().toString())); .info(translate("messages.game.disconnect", this.getAddress().toString()));

View File

@ -1,114 +0,0 @@
package emu.grasscutter.server.game;
import emu.grasscutter.Grasscutter;
import emu.grasscutter.utils.Utils;
import io.netty.buffer.*;
import io.netty.channel.DefaultEventLoop;
import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import kcp.highway.*;
import lombok.Getter;
public class GameSessionManager {
@Getter private static final DefaultEventLoop logicThread = new DefaultEventLoop();
private static final ConcurrentHashMap<Ukcp, GameSession> sessions = new ConcurrentHashMap<>();
private static final KcpListener listener =
new KcpListener() {
@Override
public void onConnected(Ukcp ukcp) {
int times = 0;
GameServer server = Grasscutter.getGameServer();
while (server == null) { // Waiting server to establish
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
ukcp.close();
return;
}
if (times++ > 5) {
Grasscutter.getLogger().error("Service is not available!");
ukcp.close();
return;
}
server = Grasscutter.getGameServer();
}
GameSession conversation = new GameSession(server);
conversation.onConnected(
new KcpTunnel() {
@Override
public InetSocketAddress getAddress() {
return ukcp.user().getRemoteAddress();
}
@Override
public void writeData(byte[] bytes) {
ByteBuf buf = Unpooled.wrappedBuffer(bytes);
ukcp.write(buf);
buf.release();
}
@Override
public void close() {
ukcp.close();
}
@Override
public int getSrtt() {
return ukcp.srtt();
}
});
sessions.put(ukcp, conversation);
}
@Override
public void handleReceive(ByteBuf buf, Ukcp kcp) {
var byteData = Utils.byteBufToArray(buf);
logicThread.execute(
() -> {
try {
var conversation = sessions.get(kcp);
if (conversation != null) {
conversation.handleReceive(byteData);
}
} catch (Exception e) {
e.printStackTrace();
}
});
}
@Override
public void handleException(Throwable ex, Ukcp ukcp) {}
@Override
public void handleClose(Ukcp ukcp) {
GameSession conversation = sessions.get(ukcp);
if (conversation != null) {
conversation.handleClose();
sessions.remove(ukcp);
}
}
};
public static KcpListener getListener() {
return listener;
}
public interface KcpTunnel {
InetSocketAddress getAddress();
void writeData(byte[] bytes);
void close();
int getSrtt();
}
interface KcpChannel {
void onConnected(KcpTunnel tunnel);
void handleClose();
void handleReceive(byte[] bytes);
}
}

View File

@ -0,0 +1,31 @@
package emu.grasscutter.server.game.session;
import emu.grasscutter.net.KcpTunnel;
import io.netty.buffer.Unpooled;
import kcp.highway.Ukcp;
import lombok.*;
import java.net.InetSocketAddress;
@RequiredArgsConstructor
public final class GameSessionHandler implements KcpTunnel {
@Getter private final Ukcp handle;
@Override
public InetSocketAddress getAddress() {
return this.getHandle().user().getRemoteAddress();
}
@Override
public void writeData(byte[] bytes) {
var buffer = Unpooled.wrappedBuffer(bytes);
this.getHandle().write(buffer);
buffer.release();
}
@Override
public void close() {
this.getHandle().close();
}
}

View File

@ -0,0 +1,91 @@
package emu.grasscutter.server.game.session;
import emu.grasscutter.Grasscutter;
import emu.grasscutter.server.game.*;
import emu.grasscutter.utils.Utils;
import io.netty.buffer.ByteBuf;
import kcp.highway.*;
import lombok.Getter;
import java.util.Map;
import java.util.concurrent.*;
public final class GameSessionManager implements KcpListener {
@Getter private static final GameSessionManager instance
= new GameSessionManager();
@Getter private static final ExecutorService executor
= Executors.newFixedThreadPool(4);
@Getter private static final Map<Ukcp, GameSession> sessions
= new ConcurrentHashMap<>();
/**
* Waits for the game server to be ready.
*
* @return The game server.
*/
private GameServer waitForServer() {
var server = Grasscutter.getGameServer();
var times = 0; while (server == null || !server.isStarted()) {
Utils.sleep(1000); // Wait 1s for the server to start.
if (times++ > 5) {
Grasscutter.getLogger().error("Game server has not started in a reasonable time.");
return null;
}
server = Grasscutter.getGameServer();
}
return server;
}
@Override
public void onConnected(Ukcp ukcp) {
// Fetch the game server.
var server = this.waitForServer();
if (server == null) {
ukcp.close();
return;
}
// Create a new session.
var session = sessions.compute(ukcp, (k, existing) -> {
// Close an existing session.
if (existing != null) {
existing.close();
}
return new GameSession(server);
});
// Connect the session.
session.onConnected(new GameSessionHandler(ukcp));
}
@Override
public void handleReceive(ByteBuf byteBuf, Ukcp ukcp) {
// Get the session.
var session = sessions.get(ukcp);
if (session == null) {
ukcp.close(); return;
}
// Handle the message in a separate thread.
executor.submit(() -> {
var bytes = Utils.byteBufToArray(byteBuf);
session.onMessage(bytes);
});
}
@Override
public void handleException(Throwable throwable, Ukcp ukcp) {
Grasscutter.getLogger().error("Exception in game session.", throwable);
}
@Override
public void handleClose(Ukcp ukcp) {
var session = sessions.remove(ukcp);
if (session != null) {
session.close();
}
}
}