diff --git a/Cargo.toml b/Cargo.toml index 6b9409b..1d83c8a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,7 @@ proto = { path = "proto" } lua_serde = { path = "lua_serde" } packet-processor-macro = { path = "packet-processor-macro" } packet-processor = { path = "packet-processor" } +rs-ipc = { path = "rs-ipc" } prost = "0.8" bytes = "1.1.0" diff --git a/packet-processor/src/lib.rs b/packet-processor/src/lib.rs index e7b234f..900ebcf 100644 --- a/packet-processor/src/lib.rs +++ b/packet-processor/src/lib.rs @@ -16,7 +16,7 @@ macro_rules! register_callback { slef.$handler(user_id, &metadata, &req, &mut rsp); - let message = IpcMessage::new_from_proto(user_id, proto::PacketId::$rsp, metadata, &rsp); + let message = IpcMessage::new_from_proto(proto::PacketId::$rsp, user_id, metadata, &rsp); slef.packets_to_send_tx.send(message).unwrap(); }); }; @@ -36,8 +36,8 @@ macro_rules! build_and_send { ($self:ident, $user_id: ident, $metadata:ident, $id:ident { $($i:ident : $e:expr,)* }) => {{ $self.packets_to_send_tx.send( IpcMessage::new_from_proto( - $user_id, proto::PacketId::$id, + $user_id, $metadata, &proto::$id { $($i: $e,)* ..proto::$id::default() } ) diff --git a/rs-ipc/.gitignore b/rs-ipc/.gitignore new file mode 100644 index 0000000..77ec2cd --- /dev/null +++ b/rs-ipc/.gitignore @@ -0,0 +1,13 @@ +# Generated by Cargo +# will have compiled files and executables +/target/ + +# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries +# More information here http://doc.crates.io/guide.html#cargotoml-vs-cargolock +Cargo.lock + +# These are backup files generated by rustfmt +**/*.rs.bk + +# IDE files +.idea diff --git a/rs-ipc/Cargo.toml b/rs-ipc/Cargo.toml new file mode 100644 index 0000000..ed99215 --- /dev/null +++ b/rs-ipc/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "rs-ipc" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +proto = { path = "../proto" } + +num-traits = "0.2" +prost = "0.8" +futures = "0.3" +zeromq = { version = "0.3", default-features = false, features = ["async-std-runtime", "all-transport"] } diff --git a/rs-ipc/src/ipc/message.rs b/rs-ipc/src/ipc/message.rs new file mode 100644 index 0000000..38033ec --- /dev/null +++ b/rs-ipc/src/ipc/message.rs @@ -0,0 +1,86 @@ +use std::convert::{From, TryInto}; + +use prost::Message; +use zeromq::ZmqMessage; +use num_traits::FromPrimitive; + +pub struct IpcMessage(pub proto::PacketId, pub u32, pub Vec, pub Vec); + +impl IpcMessage { + pub fn new_from_proto(packet_id: proto::PacketId, user_id: u32, metadata: &proto::PacketHead, data: &M) -> IpcMessage { + println!("Replying with {:?}", packet_id); + println!("Data: {:?}", data); + + let mut buf: Vec = vec!(); + + data.encode(&mut buf).unwrap(); + + let mut metabuf: Vec = vec!(); + + metadata.encode(&mut metabuf).unwrap(); + + return IpcMessage( + packet_id, + user_id, + metabuf, + buf + ); + } + + pub fn format_topic(topic: proto::PacketId) -> String { + format!("{:04x}", topic as u16) + } +} + +impl From> for IpcMessage { + fn from (input: Vec) -> IpcMessage { + let packet_id = String::from_utf8_lossy(&input[0..4]); + let packet_id = u16::from_str_radix(&packet_id, 16).unwrap(); + //let packet_id = (((input[0] - 48) as u16) << 12) | (((input[1] - 48) as u16) << 8) | (((input[2] - 48) as u16) << 4) | (((input[3] - 48) as u16) << 0); + let packet_id = FromPrimitive::from_u16(packet_id).unwrap(); // Should be 100% correct + + let user_id: u32 = u32::from_le_bytes(input[4..8].try_into().unwrap()); + + let metadata_len: u32 = u32::from_le_bytes(input[8..12].try_into().unwrap()); + let data_len: u32 = u32::from_le_bytes(input[12..16].try_into().unwrap()); + + let metadata = input[16..(metadata_len+16) as usize].to_owned(); + let data = input[(metadata_len+16) as usize..(data_len+metadata_len+16) as usize].to_owned(); + + IpcMessage(packet_id, user_id, metadata, data) + } +} + +impl From for IpcMessage { + fn from (input: ZmqMessage) -> IpcMessage { + // ZmqMessage::into_vec returns a vector of Bytes object + // We flat_map them into Vec + let input: Vec = input.into_vec().iter().flat_map(|b| b.to_vec()).collect(); + + input.into() + } +} + +impl From for Vec { + fn from (input: IpcMessage) -> Vec { + let mut data: Vec = vec![]; + + data.extend_from_slice(IpcMessage::format_topic(input.0).as_bytes()); + data.extend_from_slice(&input.1.to_le_bytes()); + data.extend_from_slice(&(input.2.len() as u32).to_le_bytes()); + data.extend_from_slice(&(input.3.len() as u32).to_le_bytes()); + + data.extend_from_slice(&input.2); + + data.extend_from_slice(&input.3); + + data + } +} + +impl From for ZmqMessage { + fn from (input: IpcMessage) -> ZmqMessage { + let input: Vec = input.into(); + input.into() + } +} \ No newline at end of file diff --git a/rs-ipc/src/ipc/mod.rs b/rs-ipc/src/ipc/mod.rs new file mode 100644 index 0000000..25128ad --- /dev/null +++ b/rs-ipc/src/ipc/mod.rs @@ -0,0 +1,5 @@ +mod message; +mod socket; + +pub use message::IpcMessage; +pub use socket::{SubSocket, PubSocket, PushSocket, PullSocket, Result}; \ No newline at end of file diff --git a/rs-ipc/src/ipc/socket.rs b/rs-ipc/src/ipc/socket.rs new file mode 100644 index 0000000..b8bc117 --- /dev/null +++ b/rs-ipc/src/ipc/socket.rs @@ -0,0 +1,169 @@ +use std::fmt::{Debug, Error, Formatter}; +use std::result::Result as StdResult; + +use zeromq::{Socket, SocketRecv, SocketSend, ZmqResult}; + +use proto::PacketId; + +use crate::IpcMessage; + +pub type Result = ZmqResult; + +/* + This is used to convert async operations into sync ones + */ +trait Block { + fn wait(self) -> ::Output + where Self: Sized, Self: futures::Future + { + futures::executor::block_on(self) + } +} + +impl Block for F + where F: futures::Future +{} + +// ------------- + +// Socket for client subscription; can only receive data +pub struct SubSocket { + socket: zeromq::SubSocket, +} + +// Socket for server to publish the data; can only transmit data +pub struct PubSocket { + socket: zeromq::PubSocket, +} + +// Socket for pushing the data towards the receiver +pub struct PushSocket { + socket: zeromq::PushSocket, +} + +// Socket for pulling the data +pub struct PullSocket { + socket: zeromq::PullSocket, +} + +impl Debug for PushSocket { + fn fmt(&self, _: &mut Formatter<'_>) -> StdResult<(), Error> { + return Ok(()) + } +} + +impl SubSocket { + pub fn connect_tcp(address: &str, port: u16) -> Result { + let mut socket = zeromq::SubSocket::new(); + + socket.connect(&format!("tcp://{}:{}", address, port)).wait()?; + + Ok(SubSocket { + socket: socket, + }) + } + + pub fn connect_unix(address: &str) -> Result { + let mut socket = zeromq::SubSocket::new(); + + socket.connect(&format!("ipc://{}", address)).wait()?; + + Ok(SubSocket { + socket: socket, + }) + } + + pub fn subscribe(&mut self, topics: Vec) -> Result<()> { + for topic in topics { + self.socket.subscribe(&IpcMessage::format_topic(topic)).wait()?; + } + + Ok(()) + } + + pub fn subscribe_all(&mut self) -> Result<()> { + Ok(self.socket.subscribe("").wait()?) + } + + pub fn recv(&mut self) -> Result { + Ok(self.socket.recv().wait()?.into()) + } +} + +impl PubSocket { + pub fn bind_tcp(address: &str, port: u16) -> Result { + let mut socket = zeromq::PubSocket::new(); + + socket.bind(&format!("tcp://{}:{}", address, port)).wait()?; + + Ok(PubSocket { + socket: socket, + }) + } + + pub fn bind_unix(address: &str) -> Result { + let mut socket = zeromq::PubSocket::new(); + + socket.bind(&format!("ipc://{}", address)).wait()?; + + Ok(PubSocket { + socket: socket, + }) + } + + pub fn send(&mut self, message: IpcMessage) -> Result<()> { + Ok(self.socket.send( message.into() ).wait()?) + } +} + +impl PushSocket { + pub fn connect_tcp(address: &str, port: u16) -> Result { + let mut socket = zeromq::PushSocket::new(); + + socket.connect(&format!("tcp://{}:{}", address, port)).wait()?; + + Ok(PushSocket { + socket: socket, + }) + } + + pub fn connect_unix(address: &str) -> Result { + let mut socket = zeromq::PushSocket::new(); + + socket.connect(&format!("ipc://{}", address)).wait()?; + + Ok(PushSocket { + socket: socket, + }) + } + + pub fn send(&mut self, message: IpcMessage) -> Result<()> { + Ok(self.socket.send( message.into() ).wait()?) + } +} + +impl PullSocket { + pub fn bind_tcp(address: &str, port: u16) -> Result { + let mut socket = zeromq::PullSocket::new(); + + socket.bind(&format!("tcp://{}:{}", address, port)).wait()?; + + Ok(PullSocket { + socket: socket, + }) + } + + pub fn bind_unix(address: &str) -> Result { + let mut socket = zeromq::PullSocket::new(); + + socket.bind(&format!("ipc://{}", address)).wait()?; + + Ok(PullSocket { + socket: socket, + }) + } + + pub fn recv(&mut self) -> Result { + Ok(self.socket.recv().wait()?.into()) + } +} \ No newline at end of file diff --git a/rs-ipc/src/lib.rs b/rs-ipc/src/lib.rs new file mode 100644 index 0000000..0582df2 --- /dev/null +++ b/rs-ipc/src/lib.rs @@ -0,0 +1,4 @@ +mod ipc; + +pub use ipc::IpcMessage; +pub use ipc::{SubSocket, PubSocket, PushSocket, PullSocket, Result}; \ No newline at end of file diff --git a/src/entitymanager/entity_manager.rs b/src/entitymanager/entity_manager.rs index 1cfbe89..470876d 100644 --- a/src/entitymanager/entity_manager.rs +++ b/src/entitymanager/entity_manager.rs @@ -3,7 +3,7 @@ use std::thread; use std::collections::{HashMap, HashSet}; use std::collections::hash_map::Entry::{Occupied, Vacant}; -use crate::server::IpcMessage; +use rs_ipc::{IpcMessage, PushSocket}; use prost::Message; @@ -18,9 +18,10 @@ use crate::{DatabaseManager, JsonManager, LuaManager}; use crate::utils::{IdManager, TimeManager}; use crate::luamanager::Vector; +use crate::node::NodeConfig; use super::entities::Entity; -#[derive(Debug, Clone)] +#[derive(Debug)] struct Player { player_id: u32, pos: Vector, @@ -30,7 +31,7 @@ struct Player { lua_manager: Arc, json_manager: Arc, db_manager: Arc, - packets_to_send_tx: Sender, + packets_to_send_tx: PushSocket, } impl Player { @@ -38,7 +39,7 @@ impl Player { const SPAWN_DISTANCE: f32 = Self::DESPAWN_DISTANCE * 0.8; const RESPAWN_TIME: i32 = 10; // In seconds - pub fn despawn_everything(&self) { + pub fn despawn_everything(&mut self) { let entity_list: Vec = self.entities.iter().map(|(k, v)| *k).collect(); if entity_list.len() > 0 { @@ -118,7 +119,7 @@ impl Player { } } - pub fn enter_scene(&self, enter_type: &proto::EnterType, token: u32) { + pub fn enter_scene(&mut self, enter_type: &proto::EnterType, token: u32) { let world_level = self.db_manager.get_player_world_level(self.player_id).unwrap() as u32; let player_id = self.player_id; @@ -153,7 +154,7 @@ impl Player { } pub struct EntityManager { - packets_to_send_tx: Sender, + packets_to_send_tx: PushSocket, players: Arc>>, players_moved: Sender, lua_manager: Arc, @@ -162,11 +163,11 @@ pub struct EntityManager { } impl EntityManager { - pub fn new(lua_manager: Arc, json_manager: Arc, db_manager: Arc, packets_to_send_tx: Sender) -> Self { + pub fn new(lua_manager: Arc, json_manager: Arc, db_manager: Arc, node_config: &NodeConfig) -> Self { let (tx, rx): (Sender, Receiver) = mpsc::channel(); let mut es = Self { - packets_to_send_tx: packets_to_send_tx, + packets_to_send_tx: node_config.connect_out_queue().unwrap(), players_moved: tx, players: Arc::new(Mutex::new(HashMap::new())), lua_manager: lua_manager, @@ -252,7 +253,7 @@ impl EntityManager { player.enter_scene(reason, token); }, Vacant(entry) => { - let player = Player { + let mut player = Player { player_id: user_id, pos: pos, current_block: 0, @@ -261,7 +262,7 @@ impl EntityManager { lua_manager: self.lua_manager.clone(), json_manager: self.json_manager.clone(), db_manager: self.db_manager.clone(), - packets_to_send_tx: self.packets_to_send_tx.clone(), + packets_to_send_tx: NodeConfig::new().connect_out_queue().unwrap(),//self.packets_to_send_tx.clone(), }; player.enter_scene(reason, token); diff --git a/src/main.rs b/src/main.rs index 634bb16..c0f3c80 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,6 +7,8 @@ extern crate num_derive; use std::thread; +mod node; + mod server; mod utils; mod dbmanager; @@ -26,6 +28,8 @@ use entitymanager::EntityManager; fn main() { //pretty_env_logger::init(); + //let mut rt_main = tokio::runtime::Runtime::new().unwrap(); + tracing_subscriber::fmt() .with_max_level(tracing::Level::DEBUG) .with_test_writer() diff --git a/src/node/config.rs b/src/node/config.rs new file mode 100644 index 0000000..dd889fa --- /dev/null +++ b/src/node/config.rs @@ -0,0 +1,35 @@ +use rs_ipc::{PubSocket, PullSocket, PushSocket, Result, SubSocket}; + +pub struct NodeConfig { + pub in_queue_addr: String, + pub in_queue_port: u16, + pub out_queue_addr: String, + pub out_queue_port: u16, +} + +impl NodeConfig { + pub fn new() -> Self { + NodeConfig { + in_queue_addr: "127.0.0.1".to_string(), + in_queue_port: 9012, + out_queue_addr: "127.0.0.1".to_string(), + out_queue_port: 9014, + } + } + + pub fn bind_in_queue(&self) -> Result { + PubSocket::bind_tcp(&self.in_queue_addr, self.in_queue_port) + } + + pub fn bind_out_queue(&self) -> Result { + PullSocket::bind_tcp(&self.out_queue_addr, self.out_queue_port) + } + + pub fn connect_in_queue(&self) -> Result { + SubSocket::connect_tcp(&self.in_queue_addr, self.in_queue_port) + } + + pub fn connect_out_queue(&self) -> Result { + PushSocket::connect_tcp(&self.out_queue_addr, self.out_queue_port) + } +} \ No newline at end of file diff --git a/src/node/mod.rs b/src/node/mod.rs new file mode 100644 index 0000000..6368087 --- /dev/null +++ b/src/node/mod.rs @@ -0,0 +1,3 @@ +mod config; + +pub use config::NodeConfig; \ No newline at end of file diff --git a/src/server/auth_manager.rs b/src/server/auth_manager.rs index 84c093b..9619014 100644 --- a/src/server/auth_manager.rs +++ b/src/server/auth_manager.rs @@ -8,28 +8,30 @@ use std::convert::TryInto; use prost::Message; -use crate::server::IpcMessage; +use rs_ipc::{IpcMessage, PushSocket}; use packet_processor_macro::*; #[macro_use] use packet_processor::*; +use crate::node::NodeConfig; #[packet_processor(GetPlayerTokenReq)] pub struct AuthManager { conv_to_user: HashMap, user_to_conv: HashMap, - packets_to_send_tx: mpsc::Sender, + //packets_to_send_tx: mpsc::Sender, + packets_to_send_tx: PushSocket, } impl AuthManager { pub const SPOOFED_PLAYER_UID: u32 = 1337; - pub fn new(packets_to_send_tx: mpsc::Sender) -> AuthManager { + pub fn new(node_config: &NodeConfig) -> AuthManager { let mut am = AuthManager { conv_to_user: HashMap::new(), user_to_conv: HashMap::new(), packet_callbacks: HashMap::new(), - packets_to_send_tx: packets_to_send_tx, + packets_to_send_tx: node_config.connect_out_queue().unwrap(), }; am.register(); diff --git a/src/server/game_server.rs b/src/server/game_server.rs index 76d171c..61b42b9 100644 --- a/src/server/game_server.rs +++ b/src/server/game_server.rs @@ -1,11 +1,12 @@ -use std::sync::mpsc; +use std::sync::{mpsc, Mutex}; use std::thread; use std::collections::HashMap; use std::collections::hash_map::Entry::{Occupied, Vacant}; +use rs_ipc::{SubSocket, IpcMessage, PushSocket}; + use crate::server::GameWorld; use packet_processor::PacketProcessor; -use crate::server::IpcMessage; use crate::{DatabaseManager, EntitySubsystem}; use crate::JsonManager; @@ -13,12 +14,32 @@ use crate::LuaManager; use crate::server::LoginManager; use std::sync::Arc; use crate::entitymanager::EntityManager; +use crate::node::NodeConfig; use crate::subsystems::{InventorySubsystem, NpcSubsystem, ShopSubsystem}; use crate::subsystems::misc::{PauseSubsystem, SceneSubsystem, SocialSubsystem, TeleportSubsystem}; +/* + This is used to convert async operations into sync ones + */ +trait Block { + fn wait(self) -> ::Output + where Self: Sized, Self: futures::Future + { + futures::executor::block_on(self) + } +} + +impl Block for F + where F: futures::Future +{} + +// ------------- + pub struct GameServer { - packets_to_process_rx: mpsc::Receiver, - packets_to_send_tx: mpsc::Sender, + //packets_to_process_rx: mpsc::Receiver, + packets_to_process_rx: SubSocket, + //packets_to_send_tx: mpsc::Sender, + //packets_to_send_tx: PushSocket, worlds: HashMap, login_manager: LoginManager, database_manager: Arc, @@ -27,26 +48,30 @@ pub struct GameServer { } impl GameServer { - pub fn new(packets_to_process_rx: mpsc::Receiver, packets_to_send_tx: mpsc::Sender) -> GameServer { + //pub fn new(packets_to_process_rx: mpsc::Receiver, packets_to_send_tx: mpsc::Sender) -> GameServer { + pub fn new(node_config: &NodeConfig) -> GameServer { let jm = Arc::new(JsonManager::new("./data/json")); let db = Arc::new(DatabaseManager::new("sqlite://./database.db3", jm.clone())); let lum = Arc::new(LuaManager::new("./data/lua", &jm.clone())); - let em = Arc::new(EntityManager::new(lum.clone(),jm.clone(), db.clone(), packets_to_send_tx.clone())); - let lm = LoginManager::new(db.clone(), jm.clone(), em.clone(),packets_to_send_tx.clone()); + let em = Arc::new(EntityManager::new(lum.clone(),jm.clone(), db.clone(), node_config)); + let lm = LoginManager::new(db.clone(), jm.clone(), em.clone(),node_config); - let inv = Arc::new(InventorySubsystem::new(jm.clone(), db.clone(), packets_to_send_tx.clone())); + let inv = InventorySubsystem::new(jm.clone(), db.clone(), node_config); - let es = EntitySubsystem::new(lum.clone(), jm.clone(), db.clone(), em.clone(), packets_to_send_tx.clone()); - let nt = NpcSubsystem::new(packets_to_send_tx.clone()); - let ss = ShopSubsystem::new(jm.clone(), db.clone(), inv.clone(), packets_to_send_tx.clone()); - let scs = SceneSubsystem::new(db.clone(), packets_to_send_tx.clone()); - let ps = PauseSubsystem::new(packets_to_send_tx.clone()); - let socs = SocialSubsystem::new(db.clone(), packets_to_send_tx.clone()); - let ts = TeleportSubsystem::new(jm.clone(), db.clone(), em.clone(), packets_to_send_tx.clone()); + let es = EntitySubsystem::new(lum.clone(), jm.clone(), db.clone(), em.clone(), node_config); + let nt = NpcSubsystem::new(node_config); + let ss = ShopSubsystem::new(jm.clone(), db.clone(), Mutex::new(inv), node_config); + let scs = SceneSubsystem::new(db.clone(), node_config); + let ps = PauseSubsystem::new(node_config); + let socs = SocialSubsystem::new(db.clone(), node_config); + let ts = TeleportSubsystem::new(jm.clone(), db.clone(), em.clone(), node_config); + + let mut packets_to_process_rx = node_config.connect_in_queue().unwrap(); + packets_to_process_rx.subscribe_all(); + //let mut packets_to_send_tx = PushSocket::connect_tcp("127.0.0.1", 9014).unwrap(); let gs = GameServer { packets_to_process_rx: packets_to_process_rx, - packets_to_send_tx: packets_to_send_tx, worlds: HashMap::new(), login_manager: lm, database_manager: db.clone(), @@ -66,7 +91,7 @@ impl GameServer { }); loop { - let IpcMessage(user_id, packet_id, metadata, data) = self.packets_to_process_rx.recv().unwrap(); + let IpcMessage(packet_id, user_id, metadata, data) = self.packets_to_process_rx.recv().unwrap(); if (self.login_manager.is_supported(&packet_id)) { self.login_manager.process(user_id, packet_id, metadata, data); @@ -75,7 +100,7 @@ impl GameServer { let world = match self.worlds.entry(user_id) { Occupied(world) => world.into_mut(), Vacant(entry) => { - let world = GameWorld::new(self.database_manager.clone(),self.json_manager.clone(), self.packets_to_send_tx.clone()); + let world = GameWorld::new(self.database_manager.clone(),self.json_manager.clone()/*, self.packets_to_send_tx.clone()*/); entry.insert(world) }, }; diff --git a/src/server/game_world.rs b/src/server/game_world.rs index 66cd2b0..4b5f4b3 100644 --- a/src/server/game_world.rs +++ b/src/server/game_world.rs @@ -7,7 +7,7 @@ use prost::Message; use chrono::Datelike; -use crate::server::IpcMessage; +use rs_ipc::{IpcMessage, PushSocket}; use crate::utils::{AvatarBuilder, Remapper}; @@ -40,13 +40,16 @@ macro_rules! collection { EnterWorldAreaReq, )] pub struct GameWorld { - packets_to_send_tx: mpsc::Sender, + //packets_to_send_tx: mpsc::Sender, + packets_to_send_tx: PushSocket, db: Arc, jm: Arc, } impl GameWorld { - pub fn new(db: Arc, jm: Arc, packets_to_send_tx: mpsc::Sender) -> GameWorld { + pub fn new(db: Arc, jm: Arc/*, packets_to_send_tx: mpsc::Sender*/) -> GameWorld { + let mut packets_to_send_tx = PushSocket::connect_tcp("127.0.0.1", 9014).unwrap(); + let mut gw = GameWorld { packets_to_send_tx: packets_to_send_tx, db: db.clone(), @@ -63,7 +66,7 @@ impl GameWorld { rsp.client_time = req.client_time; } - fn process_enter_scene_ready(&self, user_id: u32, metadata: &proto::PacketHead, req: &proto::EnterSceneReadyReq, rsp: &mut proto::EnterSceneReadyRsp) { + fn process_enter_scene_ready(&mut self, user_id: u32, metadata: &proto::PacketHead, req: &proto::EnterSceneReadyReq, rsp: &mut proto::EnterSceneReadyRsp) { rsp.enter_scene_token = req.enter_scene_token; let current_scene_info = match self.db.get_player_scene_info(user_id) { @@ -79,7 +82,7 @@ impl GameWorld { }); } - fn process_scene_init_finish(&self, user_id: u32, metadata: &proto::PacketHead, req: &proto::SceneInitFinishReq, rsp: &mut proto::SceneInitFinishRsp) { + fn process_scene_init_finish(&mut self, user_id: u32, metadata: &proto::PacketHead, req: &proto::SceneInitFinishReq, rsp: &mut proto::SceneInitFinishRsp) { let (current_avatar_guid, current_team_id) = match self.db.get_player_team_selection(user_id) { Some(team_selection) => (team_selection.avatar, team_selection.team), None => panic!("Team selection info not found for user {}!", user_id), @@ -205,7 +208,7 @@ impl GameWorld { }); } - fn process_enter_scene_done(&self, user_id: u32, metadata: &proto::PacketHead, req: &proto::EnterSceneDoneReq, rsp: &mut proto::EnterSceneDoneRsp) { + fn process_enter_scene_done(&mut self, user_id: u32, metadata: &proto::PacketHead, req: &proto::EnterSceneDoneReq, rsp: &mut proto::EnterSceneDoneRsp) { rsp.enter_scene_token = req.enter_scene_token; build_and_send!(self, user_id, metadata, SceneEntityAppearNotify { diff --git a/src/server/login_manager.rs b/src/server/login_manager.rs index ce30de3..ec142ea 100644 --- a/src/server/login_manager.rs +++ b/src/server/login_manager.rs @@ -3,7 +3,7 @@ use std::collections::HashMap; use prost::Message; -use crate::server::IpcMessage; +use rs_ipc::{IpcMessage, PushSocket}; use packet_processor_macro::*; #[macro_use] @@ -17,20 +17,21 @@ use crate::utils::TimeManager; use crate::dbmanager::database_manager::AvatarInfo as DbAvatarInfo; use crate::entitymanager::EntityManager; +use crate::node::NodeConfig; #[packet_processor(PlayerLoginReq)] pub struct LoginManager { - packets_to_send_tx: mpsc::Sender, + packets_to_send_tx: PushSocket, db: Arc, jm: Arc, em: Arc, } impl LoginManager { - pub fn new(db: Arc, jm: Arc, em: Arc, packets_to_send_tx: mpsc::Sender) -> LoginManager { + pub fn new(db: Arc, jm: Arc, em: Arc, node_config: &NodeConfig) -> LoginManager { let mut lm = LoginManager { packet_callbacks: HashMap::new(), - packets_to_send_tx: packets_to_send_tx, + packets_to_send_tx: node_config.connect_out_queue().unwrap(), db: db, jm: jm, em: em, @@ -41,7 +42,7 @@ impl LoginManager { return lm; } - fn process_player_login(&self, user_id: u32, metadata: &proto::PacketHead, req: &proto::PlayerLoginReq, rsp: &mut proto::PlayerLoginRsp) { + fn process_player_login(&mut self, user_id: u32, metadata: &proto::PacketHead, req: &proto::PlayerLoginReq, rsp: &mut proto::PlayerLoginRsp) { let user = match self.db.get_player_info(user_id) { Some(user) => user, None => panic!("User {} not found!", user_id), diff --git a/src/server/mod.rs b/src/server/mod.rs index b82347a..df302c5 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -4,7 +4,6 @@ mod game_world; mod auth_manager; mod login_manager; mod client_connection; -mod ipc_message; pub use self::network_server::NetworkServer; pub use self::game_server::GameServer; @@ -12,4 +11,3 @@ pub use self::game_world::GameWorld; pub use self::auth_manager::AuthManager; pub use self::login_manager::LoginManager; pub use self::client_connection::ClientConnection; -pub use self::ipc_message::IpcMessage; diff --git a/src/server/network_server.rs b/src/server/network_server.rs index 0a1dfbf..5961fcc 100644 --- a/src/server/network_server.rs +++ b/src/server/network_server.rs @@ -18,7 +18,7 @@ use crate::server::ClientConnection; use crate::server::GameServer; use crate::server::AuthManager; -use crate::server::IpcMessage; +use rs_ipc::{IpcMessage, PullSocket, PushSocket}; use proto::PacketHead; use proto::GetPlayerTokenRsp; @@ -26,15 +26,35 @@ use proto::get_player_token_rsp; use prost::Message; +use rs_ipc::{SubSocket, PubSocket}; + use packet_processor::{PacketProcessor, EasilyUnpackable}; +use crate::node::NodeConfig; extern crate kcp; +/* + This is used to convert async operations into sync ones + */ +trait Block { + fn wait(self) -> ::Output + where Self: Sized, Self: futures::Future + { + futures::executor::block_on(self) + } +} + +impl Block for F + where F: futures::Future +{} + +// ------------- + pub struct NetworkServer { socket: UdpSocket, clients: Arc>>, - packets_to_process_tx: Option>, - packets_to_send_tx: Option>, + node_config: NodeConfig, + packets_to_process_tx: PubSocket, } #[derive(Debug, Clone)] @@ -56,14 +76,18 @@ impl fmt::Display for NetworkServerError { impl NetworkServer { pub fn new(host: &str, port: i16) -> Result { + let node_config = NodeConfig::new(); + + let mut packets_to_process_tx = node_config.bind_in_queue().unwrap(); + let gs = NetworkServer { socket: match UdpSocket::bind(format!("{}:{}", host, port).to_string()) { Ok(socket) => socket, Err(e) => return Err(NetworkServerError::new(format!("Failed to bind socket: {}", e).as_str())), }, clients: Arc::new(Mutex::new(HashMap::new())), - packets_to_process_tx: None, - packets_to_send_tx: None, + node_config: node_config, + packets_to_process_tx: packets_to_process_tx, }; print!("Connection established\n"); @@ -74,22 +98,15 @@ impl NetworkServer { pub fn run(&mut self) -> Result { print!("Starting server\n"); - // Channel for relaying packets from network thread to processing thread - let (packets_to_process_tx, packets_to_process_rx) = mpsc::channel(); - - // Channel for relaying packets from network thread to processing thread - let (packets_to_send_tx, packets_to_send_rx) = mpsc::channel(); - - self.packets_to_process_tx = Some(packets_to_process_tx); - self.packets_to_send_tx = Some(packets_to_send_tx.clone()); // TODO: hack! + let mut packets_to_send_rx = self.node_config.bind_out_queue().unwrap(); let clients = self.clients.clone(); - let mut auth_manager = Arc::new(Mutex::new(AuthManager::new(packets_to_send_tx.clone()))); + let mut auth_manager = Arc::new(Mutex::new(AuthManager::new(&self.node_config))); let am = auth_manager.clone(); let packet_relaying_thread = thread::spawn(move || { loop { - let IpcMessage(user_id, packet_id, metadata, data) = packets_to_send_rx.recv().unwrap(); + let IpcMessage(packet_id, user_id, metadata, data) = packets_to_send_rx.recv().unwrap(); let conv = match packet_id { proto::PacketId::GetPlayerTokenRsp => user_id, // Mapping is not performed on those @@ -115,7 +132,8 @@ impl NetworkServer { }); let game_thread = thread::spawn(move || { - let mut gs = GameServer::new(packets_to_process_rx, packets_to_send_tx); + let node_config = NodeConfig::new(); + let mut gs = GameServer::new(&node_config); gs.run(); }); @@ -222,10 +240,10 @@ impl NetworkServer { fn send_packet_to_process(&mut self, user_id: u32, packet_id: u16, metadata: &[u8], data: &[u8]) { - let sender = match &self.packets_to_process_tx { - Some(sender) => sender, + /*let sender: &mut PubSocket = match &self.packets_to_process_tx { + Some(mut sender) => &mut sender, None => panic!("Processing queue wasn't set up!"), - }; + };*/ let packet_id: proto::PacketId = match FromPrimitive::from_u16(packet_id) { Some(packet_id) => packet_id, @@ -236,6 +254,6 @@ impl NetworkServer { }; println!("Got packet {:?}", packet_id); - sender.send( IpcMessage(user_id, packet_id, metadata.to_vec(), data.to_vec()) ).unwrap(); + self.packets_to_process_tx.send( IpcMessage(packet_id, user_id, metadata.to_vec(), data.to_vec()) ); } } diff --git a/src/subsystems/entity_subsystem/entity_subsystem.rs b/src/subsystems/entity_subsystem/entity_subsystem.rs index b1cd3f0..8b8fc0b 100644 --- a/src/subsystems/entity_subsystem/entity_subsystem.rs +++ b/src/subsystems/entity_subsystem/entity_subsystem.rs @@ -3,7 +3,7 @@ use std::thread; use std::collections::{HashMap, HashSet}; use std::collections::hash_map::Entry::{Occupied, Vacant}; -use crate::server::IpcMessage; +use rs_ipc::{IpcMessage, PushSocket}; use prost::Message; @@ -19,12 +19,13 @@ use crate::entitymanager::EntityManager; use crate::utils::{IdManager, TimeManager}; use crate::luamanager::Vector; +use crate::node::NodeConfig; #[packet_processor( CombatInvocationsNotify, )] pub struct EntitySubsystem { - packets_to_send_tx: Sender, + packets_to_send_tx: PushSocket, lua_manager: Arc, json_manager: Arc, db_manager: Arc, @@ -32,9 +33,9 @@ pub struct EntitySubsystem { } impl EntitySubsystem { - pub fn new(lua_manager: Arc, json_manager: Arc, db_manager: Arc, entity_manager: Arc, packets_to_send_tx: Sender) -> EntitySubsystem { + pub fn new(lua_manager: Arc, json_manager: Arc, db_manager: Arc, entity_manager: Arc, node_config: &NodeConfig) -> EntitySubsystem { let mut es = EntitySubsystem { - packets_to_send_tx: packets_to_send_tx, + packets_to_send_tx: node_config.connect_out_queue().unwrap(), packet_callbacks: HashMap::new(), lua_manager: lua_manager, json_manager: json_manager, @@ -47,7 +48,7 @@ impl EntitySubsystem { return es; } - fn process_combat_invocations(&self, user_id: u32, metadata: &proto::PacketHead, notify: &proto::CombatInvocationsNotify) { + fn process_combat_invocations(&mut self, user_id: u32, metadata: &proto::PacketHead, notify: &proto::CombatInvocationsNotify) { for invoke in notify.invoke_list.iter() { self.handle_invoke(user_id, metadata, invoke); self.forward_invoke(user_id, metadata, invoke); @@ -117,7 +118,7 @@ impl EntitySubsystem { */ // Main function - fn forward_invoke(&self, user_id: u32, metadata: &proto::PacketHead, invoke: &proto::CombatInvokeEntry) { + fn forward_invoke(&mut self, user_id: u32, metadata: &proto::PacketHead, invoke: &proto::CombatInvokeEntry) { match ForwardType::from_i32(invoke.forward_type).unwrap() { // Panics in case of unknown (undescribed in protobuf file) forward type ForwardType::ForwardLocal => self.fw_default(user_id, metadata, invoke), ForwardType::ForwardToAll => self.fw_to_all(user_id, metadata, invoke), @@ -132,12 +133,12 @@ impl EntitySubsystem { } } - fn fw_default(&self, user_id: u32, metadata: &proto::PacketHead, invoke: &proto::CombatInvokeEntry) { + fn fw_default(&mut self, user_id: u32, metadata: &proto::PacketHead, invoke: &proto::CombatInvokeEntry) { // TODO: this handler is just a stub! println!("Unhandled CIN forward: {:?}", invoke); } - fn fw_to_all(&self, user_id: u32, metadata: &proto::PacketHead, invoke: &proto::CombatInvokeEntry) { + fn fw_to_all(&mut self, user_id: u32, metadata: &proto::PacketHead, invoke: &proto::CombatInvokeEntry) { // TODO: this handler sends data only back to the user itself for now! build_and_send!(self, user_id, metadata, CombatInvocationsNotify { invoke_list: vec![invoke.clone()], diff --git a/src/subsystems/inventory_subsystem/inventory_subsystem.rs b/src/subsystems/inventory_subsystem/inventory_subsystem.rs index 7e685dd..df177fe 100644 --- a/src/subsystems/inventory_subsystem/inventory_subsystem.rs +++ b/src/subsystems/inventory_subsystem/inventory_subsystem.rs @@ -1,27 +1,28 @@ use std::sync::{Arc, mpsc}; -use crate::server::IpcMessage; +use rs_ipc::{IpcMessage, PushSocket}; use crate::{DatabaseManager, JsonManager}; #[macro_use] use packet_processor::*; +use crate::node::NodeConfig; pub struct InventorySubsystem { - packets_to_send_tx: mpsc::Sender, + packets_to_send_tx: PushSocket, db: Arc, jm: Arc, } impl InventorySubsystem { - pub fn new(jm: Arc, db: Arc, packets_to_send_tx: mpsc::Sender) -> Self { + pub fn new(jm: Arc, db: Arc, node_config: &NodeConfig) -> Self { Self { - packets_to_send_tx: packets_to_send_tx.clone(), + packets_to_send_tx: node_config.connect_out_queue().unwrap(), db: db.clone(), jm: jm.clone(), } } - pub fn add_item(&self, user_id: u32, metadata: &proto::PacketHead, item_id: u32, count: u32, reason: &proto::ActionReasonType, inform_user: bool) { + pub fn add_item(&mut self, user_id: u32, metadata: &proto::PacketHead, item_id: u32, count: u32, reason: &proto::ActionReasonType, inform_user: bool) { let (item, is_new) = if self.jm.is_item_weapon(item_id) || self.jm.is_item_reliquary(item_id) { assert!(count == 1); (self.db.add_equip(user_id, item_id).unwrap(), false) // TODO: is new equip considered a new item? @@ -48,7 +49,7 @@ impl InventorySubsystem { }); } - pub fn sub_item(&self, user_id: u32, metadata: &proto::PacketHead, item_id: u32, count: u32, reason: &proto::ActionReasonType) { + pub fn sub_item(&mut self, user_id: u32, metadata: &proto::PacketHead, item_id: u32, count: u32, reason: &proto::ActionReasonType) { let old_amount = self.db.get_item_count_by_item_id(user_id, item_id); assert!(old_amount >= count); diff --git a/src/subsystems/misc/npc.rs b/src/subsystems/misc/npc.rs index cd6ac74..e0f273a 100644 --- a/src/subsystems/misc/npc.rs +++ b/src/subsystems/misc/npc.rs @@ -3,7 +3,7 @@ use std::thread; use std::collections::{HashMap, HashSet}; use std::collections::hash_map::Entry::{Occupied, Vacant}; -use crate::server::IpcMessage; +use rs_ipc::{IpcMessage, PushSocket}; use prost::Message; @@ -15,19 +15,20 @@ use packet_processor_macro::*; use packet_processor::*; use serde_json::de::Read; use crate::{DatabaseManager, JsonManager, LuaManager}; +use crate::node::NodeConfig; use crate::utils::{IdManager, TimeManager}; #[packet_processor( NpcTalkReq, )] pub struct NpcSubsystem { - packets_to_send_tx: Sender, + packets_to_send_tx: PushSocket, } impl NpcSubsystem { - pub fn new(packets_to_send_tx: Sender) -> Self { + pub fn new(node_config: &NodeConfig) -> Self { let mut nt = Self { - packets_to_send_tx: packets_to_send_tx, + packets_to_send_tx: node_config.connect_out_queue().unwrap(), packet_callbacks: HashMap::new(), }; @@ -44,4 +45,4 @@ impl NpcSubsystem { rsp.cur_talk_id = req.talk_id; rsp.entity_id = req.entity_id; } -} \ No newline at end of file +} diff --git a/src/subsystems/misc/pause.rs b/src/subsystems/misc/pause.rs index 5deeee8..03a9b03 100644 --- a/src/subsystems/misc/pause.rs +++ b/src/subsystems/misc/pause.rs @@ -3,7 +3,7 @@ use std::thread; use std::collections::{HashMap, HashSet}; use std::collections::hash_map::Entry::{Occupied, Vacant}; -use crate::server::IpcMessage; +use rs_ipc::{IpcMessage, PushSocket}; use prost::Message; @@ -15,19 +15,20 @@ use packet_processor_macro::*; use packet_processor::*; use serde_json::de::Read; use crate::{DatabaseManager, JsonManager, LuaManager}; +use crate::node::NodeConfig; use crate::utils::{IdManager, TimeManager}; #[packet_processor( PlayerSetPauseReq, )] pub struct PauseSubsystem { - packets_to_send_tx: Sender, + packets_to_send_tx: PushSocket, } impl PauseSubsystem { - pub fn new(packets_to_send_tx: Sender) -> Self { + pub fn new(node_config: &NodeConfig) -> Self { let mut ps = Self { - packets_to_send_tx: packets_to_send_tx, + packets_to_send_tx: node_config.connect_out_queue().unwrap(), packet_callbacks: HashMap::new(), }; @@ -39,4 +40,4 @@ impl PauseSubsystem { fn process_player_set_pause(&self, user_id: u32, metadata: &proto::PacketHead, req: &proto::PlayerSetPauseReq, rsp: &mut proto::PlayerSetPauseRsp) { // Nothing to do here, maybe check req.is_paused } -} \ No newline at end of file +} diff --git a/src/subsystems/misc/scene.rs b/src/subsystems/misc/scene.rs index f25be39..57e25c4 100644 --- a/src/subsystems/misc/scene.rs +++ b/src/subsystems/misc/scene.rs @@ -3,7 +3,7 @@ use std::thread; use std::collections::{HashMap, HashSet}; use std::collections::hash_map::Entry::{Occupied, Vacant}; -use crate::server::IpcMessage; +use rs_ipc::{IpcMessage, PushSocket}; use prost::Message; @@ -15,6 +15,7 @@ use packet_processor_macro::*; use packet_processor::*; use serde_json::de::Read; use crate::{DatabaseManager, JsonManager, LuaManager}; +use crate::node::NodeConfig; use crate::utils::{IdManager, TimeManager}; #[packet_processor( @@ -22,14 +23,14 @@ GetSceneAreaReq, GetScenePointReq, )] pub struct SceneSubsystem { - packets_to_send_tx: Sender, + packets_to_send_tx: PushSocket, db: Arc, } impl SceneSubsystem { - pub fn new(db: Arc, packets_to_send_tx: Sender) -> Self { + pub fn new(db: Arc, node_config: &NodeConfig) -> Self { let mut scs = Self { - packets_to_send_tx: packets_to_send_tx, + packets_to_send_tx: node_config.connect_out_queue().unwrap(), packet_callbacks: HashMap::new(), db: db, }; @@ -56,12 +57,12 @@ impl SceneSubsystem { rsp.scene_id = scene_id; // TODO: implemented but for the sake of debugging we hardcode it for now - rsp.unlocked_point_list = (1..300).collect(); + rsp.unlocked_point_list = (1..500).collect(); //rsp.unlocked_point_list = self.db.get_scene_trans_points(user_id, scene_id); // TODO: hardcoded data! - rsp.unlock_area_list = (1..20).collect(); + rsp.unlock_area_list = (1..50).collect(); //locked_point_list=vec![]; } -} \ No newline at end of file +} diff --git a/src/subsystems/misc/shop.rs b/src/subsystems/misc/shop.rs index a79e8c7..8349f81 100644 --- a/src/subsystems/misc/shop.rs +++ b/src/subsystems/misc/shop.rs @@ -3,7 +3,7 @@ use std::thread; use std::collections::{HashMap, HashSet}; use std::collections::hash_map::Entry::{Occupied, Vacant}; -use crate::server::IpcMessage; +use rs_ipc::{IpcMessage, PushSocket}; use prost::Message; @@ -15,6 +15,7 @@ use packet_processor_macro::*; use packet_processor::*; use serde_json::de::Read; use crate::{DatabaseManager, JsonManager, LuaManager}; +use crate::node::NodeConfig; use crate::subsystems::InventorySubsystem; use crate::utils::{IdManager, TimeManager}; @@ -23,20 +24,20 @@ GetShopReq, BuyGoodsReq, )] pub struct ShopSubsystem { - packets_to_send_tx: Sender, + packets_to_send_tx: PushSocket, json_manager: Arc, db_manager: Arc, - inventory: Arc, + inventory: Mutex, } impl ShopSubsystem { - pub fn new(jm: Arc, db: Arc, inv: Arc, packets_to_send_tx: Sender) -> Self { + pub fn new(jm: Arc, db: Arc, inv: Mutex, node_config: &NodeConfig) -> Self { let mut ss = Self { - packets_to_send_tx: packets_to_send_tx, + packets_to_send_tx: node_config.connect_out_queue().unwrap(), packet_callbacks: HashMap::new(), json_manager: jm.clone(), db_manager: db.clone(), - inventory: inv.clone(), + inventory: inv, }; ss.register(); @@ -115,7 +116,7 @@ impl ShopSubsystem { })); } - fn process_buy_goods(&self, user_id: u32, metadata: &proto::PacketHead, req: &proto::BuyGoodsReq, rsp: &mut proto::BuyGoodsRsp) { + fn process_buy_goods(&mut self, user_id: u32, metadata: &proto::PacketHead, req: &proto::BuyGoodsReq, rsp: &mut proto::BuyGoodsRsp) { // Buying goods can produce the following packets: // 1) Response packet // 2) AddHintNotify (to show nice graphical image to user) @@ -143,7 +144,7 @@ impl ShopSubsystem { let total_count = goods_item.count * req.buy_count; // Ok, now add item to user's inventory and show nice graphical hint - self.inventory.add_item(user_id, metadata, goods_item.item_id, total_count, &proto::ActionReasonType::ActionReasonShop, true); + self.inventory.lock().unwrap().add_item(user_id, metadata, goods_item.item_id, total_count, &proto::ActionReasonType::ActionReasonShop, true); // Tell the client to update / delete currency used @@ -155,4 +156,4 @@ impl ShopSubsystem { // TODO: handle daily, weekly and monthly updates (TimeManager::timestamp() + 86400) as u32 } -} \ No newline at end of file +} diff --git a/src/subsystems/misc/social.rs b/src/subsystems/misc/social.rs index b0a4bcc..dce135e 100644 --- a/src/subsystems/misc/social.rs +++ b/src/subsystems/misc/social.rs @@ -3,7 +3,7 @@ use std::thread; use std::collections::{HashMap, HashSet}; use std::collections::hash_map::Entry::{Occupied, Vacant}; -use crate::server::IpcMessage; +use rs_ipc::{IpcMessage, PushSocket}; use chrono::Datelike; @@ -17,6 +17,7 @@ use packet_processor_macro::*; use packet_processor::*; use serde_json::de::Read; use crate::{DatabaseManager, JsonManager, LuaManager}; +use crate::node::NodeConfig; use crate::utils::{IdManager, TimeManager}; #[packet_processor( @@ -25,14 +26,14 @@ GetPlayerFriendListReq, GetPlayerSocialDetailReq, )] pub struct SocialSubsystem { - packets_to_send_tx: Sender, + packets_to_send_tx: PushSocket, db: Arc, } impl SocialSubsystem { - pub fn new(db: Arc, packets_to_send_tx: Sender) -> Self { + pub fn new(db: Arc, node_config: &NodeConfig) -> Self { let mut socs = Self { - packets_to_send_tx: packets_to_send_tx, + packets_to_send_tx: node_config.connect_out_queue().unwrap(), packet_callbacks: HashMap::new(), db: db.clone(), }; @@ -90,4 +91,4 @@ impl SocialSubsystem { rsp.detail_data = Some(details); } -} \ No newline at end of file +} diff --git a/src/subsystems/misc/teleport.rs b/src/subsystems/misc/teleport.rs index 644b285..2338a6b 100644 --- a/src/subsystems/misc/teleport.rs +++ b/src/subsystems/misc/teleport.rs @@ -3,7 +3,7 @@ use std::thread; use std::collections::{HashMap, HashSet}; use std::collections::hash_map::Entry::{Occupied, Vacant}; -use crate::server::IpcMessage; +use rs_ipc::{IpcMessage, PushSocket}; use prost::Message; @@ -17,6 +17,7 @@ use serde_json::de::Read; use crate::{DatabaseManager, JsonManager, LuaManager}; use crate::entitymanager::EntityManager; use crate::luamanager::Vector; +use crate::node::NodeConfig; use crate::utils::{IdManager, TimeManager}; #[packet_processor( @@ -24,16 +25,16 @@ SceneTransToPointReq, UnlockTransPointReq, )] pub struct TeleportSubsystem { - packets_to_send_tx: Sender, + packets_to_send_tx: PushSocket, jm: Arc, em: Arc, db: Arc } impl TeleportSubsystem { - pub fn new(jm: Arc, db: Arc, em: Arc, packets_to_send_tx: Sender) -> Self { + pub fn new(jm: Arc, db: Arc, em: Arc, node_config: &NodeConfig) -> Self { let mut nt = Self { - packets_to_send_tx: packets_to_send_tx, + packets_to_send_tx: node_config.connect_out_queue().unwrap(), packet_callbacks: HashMap::new(), jm: jm, em: em, @@ -77,7 +78,7 @@ impl TeleportSubsystem { self.em.player_teleported(user_id, pos, s_id, scene_info.scene_token, &proto::EnterType::EnterGoto); } - pub fn process_unlock_trans_point(&self, user_id: u32, metadata: &proto::PacketHead, req: &proto::UnlockTransPointReq, rsp: &mut proto::UnlockTransPointRsp) { + pub fn process_unlock_trans_point(&mut self, user_id: u32, metadata: &proto::PacketHead, req: &proto::UnlockTransPointReq, rsp: &mut proto::UnlockTransPointRsp) { let scene_id = req.scene_id; let point_id = req.point_id; @@ -90,4 +91,4 @@ impl TeleportSubsystem { point_list: vec![point_id], }); } -} \ No newline at end of file +}