Abstract message passing interface further, prepare for decoupling

This commit is contained in:
Nobody 2022-10-06 18:12:40 +05:00
parent fc7ecd2d24
commit 169d78f45d
26 changed files with 508 additions and 118 deletions

View File

@ -15,6 +15,7 @@ proto = { path = "proto" }
lua_serde = { path = "lua_serde" } lua_serde = { path = "lua_serde" }
packet-processor-macro = { path = "packet-processor-macro" } packet-processor-macro = { path = "packet-processor-macro" }
packet-processor = { path = "packet-processor" } packet-processor = { path = "packet-processor" }
rs-ipc = { path = "rs-ipc" }
prost = "0.8" prost = "0.8"
bytes = "1.1.0" bytes = "1.1.0"

View File

@ -16,7 +16,7 @@ macro_rules! register_callback {
slef.$handler(user_id, &metadata, &req, &mut rsp); slef.$handler(user_id, &metadata, &req, &mut rsp);
let message = IpcMessage::new_from_proto(user_id, proto::PacketId::$rsp, metadata, &rsp); let message = IpcMessage::new_from_proto(proto::PacketId::$rsp, user_id, metadata, &rsp);
slef.packets_to_send_tx.send(message).unwrap(); slef.packets_to_send_tx.send(message).unwrap();
}); });
}; };
@ -36,8 +36,8 @@ macro_rules! build_and_send {
($self:ident, $user_id: ident, $metadata:ident, $id:ident { $($i:ident : $e:expr,)* }) => {{ ($self:ident, $user_id: ident, $metadata:ident, $id:ident { $($i:ident : $e:expr,)* }) => {{
$self.packets_to_send_tx.send( $self.packets_to_send_tx.send(
IpcMessage::new_from_proto( IpcMessage::new_from_proto(
$user_id,
proto::PacketId::$id, proto::PacketId::$id,
$user_id,
$metadata, $metadata,
&proto::$id { $($i: $e,)* ..proto::$id::default() } &proto::$id { $($i: $e,)* ..proto::$id::default() }
) )

13
rs-ipc/.gitignore vendored Normal file
View File

@ -0,0 +1,13 @@
# Generated by Cargo
# will have compiled files and executables
/target/
# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries
# More information here http://doc.crates.io/guide.html#cargotoml-vs-cargolock
Cargo.lock
# These are backup files generated by rustfmt
**/*.rs.bk
# IDE files
.idea

14
rs-ipc/Cargo.toml Normal file
View File

@ -0,0 +1,14 @@
[package]
name = "rs-ipc"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
proto = { path = "../proto" }
num-traits = "0.2"
prost = "0.8"
futures = "0.3"
zeromq = { version = "0.3", default-features = false, features = ["async-std-runtime", "all-transport"] }

86
rs-ipc/src/ipc/message.rs Normal file
View File

@ -0,0 +1,86 @@
use std::convert::{From, TryInto};
use prost::Message;
use zeromq::ZmqMessage;
use num_traits::FromPrimitive;
pub struct IpcMessage(pub proto::PacketId, pub u32, pub Vec<u8>, pub Vec<u8>);
impl IpcMessage {
pub fn new_from_proto<M: prost::Message>(packet_id: proto::PacketId, user_id: u32, metadata: &proto::PacketHead, data: &M) -> IpcMessage {
println!("Replying with {:?}", packet_id);
println!("Data: {:?}", data);
let mut buf: Vec<u8> = vec!();
data.encode(&mut buf).unwrap();
let mut metabuf: Vec<u8> = vec!();
metadata.encode(&mut metabuf).unwrap();
return IpcMessage(
packet_id,
user_id,
metabuf,
buf
);
}
pub fn format_topic(topic: proto::PacketId) -> String {
format!("{:04x}", topic as u16)
}
}
impl From<Vec<u8>> for IpcMessage {
fn from (input: Vec<u8>) -> IpcMessage {
let packet_id = String::from_utf8_lossy(&input[0..4]);
let packet_id = u16::from_str_radix(&packet_id, 16).unwrap();
//let packet_id = (((input[0] - 48) as u16) << 12) | (((input[1] - 48) as u16) << 8) | (((input[2] - 48) as u16) << 4) | (((input[3] - 48) as u16) << 0);
let packet_id = FromPrimitive::from_u16(packet_id).unwrap(); // Should be 100% correct
let user_id: u32 = u32::from_le_bytes(input[4..8].try_into().unwrap());
let metadata_len: u32 = u32::from_le_bytes(input[8..12].try_into().unwrap());
let data_len: u32 = u32::from_le_bytes(input[12..16].try_into().unwrap());
let metadata = input[16..(metadata_len+16) as usize].to_owned();
let data = input[(metadata_len+16) as usize..(data_len+metadata_len+16) as usize].to_owned();
IpcMessage(packet_id, user_id, metadata, data)
}
}
impl From<ZmqMessage> for IpcMessage {
fn from (input: ZmqMessage) -> IpcMessage {
// ZmqMessage::into_vec returns a vector of Bytes object
// We flat_map them into Vec<u8>
let input: Vec<u8> = input.into_vec().iter().flat_map(|b| b.to_vec()).collect();
input.into()
}
}
impl From<IpcMessage> for Vec<u8> {
fn from (input: IpcMessage) -> Vec<u8> {
let mut data: Vec<u8> = vec![];
data.extend_from_slice(IpcMessage::format_topic(input.0).as_bytes());
data.extend_from_slice(&input.1.to_le_bytes());
data.extend_from_slice(&(input.2.len() as u32).to_le_bytes());
data.extend_from_slice(&(input.3.len() as u32).to_le_bytes());
data.extend_from_slice(&input.2);
data.extend_from_slice(&input.3);
data
}
}
impl From<IpcMessage> for ZmqMessage {
fn from (input: IpcMessage) -> ZmqMessage {
let input: Vec<u8> = input.into();
input.into()
}
}

5
rs-ipc/src/ipc/mod.rs Normal file
View File

@ -0,0 +1,5 @@
mod message;
mod socket;
pub use message::IpcMessage;
pub use socket::{SubSocket, PubSocket, PushSocket, PullSocket, Result};

169
rs-ipc/src/ipc/socket.rs Normal file
View File

@ -0,0 +1,169 @@
use std::fmt::{Debug, Error, Formatter};
use std::result::Result as StdResult;
use zeromq::{Socket, SocketRecv, SocketSend, ZmqResult};
use proto::PacketId;
use crate::IpcMessage;
pub type Result<T> = ZmqResult<T>;
/*
This is used to convert async operations into sync ones
*/
trait Block {
fn wait(self) -> <Self as futures::Future>::Output
where Self: Sized, Self: futures::Future
{
futures::executor::block_on(self)
}
}
impl<F,T> Block for F
where F: futures::Future<Output = T>
{}
// -------------
// Socket for client subscription; can only receive data
pub struct SubSocket {
socket: zeromq::SubSocket,
}
// Socket for server to publish the data; can only transmit data
pub struct PubSocket {
socket: zeromq::PubSocket,
}
// Socket for pushing the data towards the receiver
pub struct PushSocket {
socket: zeromq::PushSocket,
}
// Socket for pulling the data
pub struct PullSocket {
socket: zeromq::PullSocket,
}
impl Debug for PushSocket {
fn fmt(&self, _: &mut Formatter<'_>) -> StdResult<(), Error> {
return Ok(())
}
}
impl SubSocket {
pub fn connect_tcp(address: &str, port: u16) -> Result<Self> {
let mut socket = zeromq::SubSocket::new();
socket.connect(&format!("tcp://{}:{}", address, port)).wait()?;
Ok(SubSocket {
socket: socket,
})
}
pub fn connect_unix(address: &str) -> Result<Self> {
let mut socket = zeromq::SubSocket::new();
socket.connect(&format!("ipc://{}", address)).wait()?;
Ok(SubSocket {
socket: socket,
})
}
pub fn subscribe(&mut self, topics: Vec<PacketId>) -> Result<()> {
for topic in topics {
self.socket.subscribe(&IpcMessage::format_topic(topic)).wait()?;
}
Ok(())
}
pub fn subscribe_all(&mut self) -> Result<()> {
Ok(self.socket.subscribe("").wait()?)
}
pub fn recv(&mut self) -> Result<IpcMessage> {
Ok(self.socket.recv().wait()?.into())
}
}
impl PubSocket {
pub fn bind_tcp(address: &str, port: u16) -> Result<Self> {
let mut socket = zeromq::PubSocket::new();
socket.bind(&format!("tcp://{}:{}", address, port)).wait()?;
Ok(PubSocket {
socket: socket,
})
}
pub fn bind_unix(address: &str) -> Result<Self> {
let mut socket = zeromq::PubSocket::new();
socket.bind(&format!("ipc://{}", address)).wait()?;
Ok(PubSocket {
socket: socket,
})
}
pub fn send(&mut self, message: IpcMessage) -> Result<()> {
Ok(self.socket.send( message.into() ).wait()?)
}
}
impl PushSocket {
pub fn connect_tcp(address: &str, port: u16) -> Result<Self> {
let mut socket = zeromq::PushSocket::new();
socket.connect(&format!("tcp://{}:{}", address, port)).wait()?;
Ok(PushSocket {
socket: socket,
})
}
pub fn connect_unix(address: &str) -> Result<Self> {
let mut socket = zeromq::PushSocket::new();
socket.connect(&format!("ipc://{}", address)).wait()?;
Ok(PushSocket {
socket: socket,
})
}
pub fn send(&mut self, message: IpcMessage) -> Result<()> {
Ok(self.socket.send( message.into() ).wait()?)
}
}
impl PullSocket {
pub fn bind_tcp(address: &str, port: u16) -> Result<Self> {
let mut socket = zeromq::PullSocket::new();
socket.bind(&format!("tcp://{}:{}", address, port)).wait()?;
Ok(PullSocket {
socket: socket,
})
}
pub fn bind_unix(address: &str) -> Result<Self> {
let mut socket = zeromq::PullSocket::new();
socket.bind(&format!("ipc://{}", address)).wait()?;
Ok(PullSocket {
socket: socket,
})
}
pub fn recv(&mut self) -> Result<IpcMessage> {
Ok(self.socket.recv().wait()?.into())
}
}

4
rs-ipc/src/lib.rs Normal file
View File

@ -0,0 +1,4 @@
mod ipc;
pub use ipc::IpcMessage;
pub use ipc::{SubSocket, PubSocket, PushSocket, PullSocket, Result};

View File

@ -3,7 +3,7 @@ use std::thread;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::collections::hash_map::Entry::{Occupied, Vacant}; use std::collections::hash_map::Entry::{Occupied, Vacant};
use crate::server::IpcMessage; use rs_ipc::{IpcMessage, PushSocket};
use prost::Message; use prost::Message;
@ -18,9 +18,10 @@ use crate::{DatabaseManager, JsonManager, LuaManager};
use crate::utils::{IdManager, TimeManager}; use crate::utils::{IdManager, TimeManager};
use crate::luamanager::Vector; use crate::luamanager::Vector;
use crate::node::NodeConfig;
use super::entities::Entity; use super::entities::Entity;
#[derive(Debug, Clone)] #[derive(Debug)]
struct Player { struct Player {
player_id: u32, player_id: u32,
pos: Vector, pos: Vector,
@ -30,7 +31,7 @@ struct Player {
lua_manager: Arc<LuaManager>, lua_manager: Arc<LuaManager>,
json_manager: Arc<JsonManager>, json_manager: Arc<JsonManager>,
db_manager: Arc<DatabaseManager>, db_manager: Arc<DatabaseManager>,
packets_to_send_tx: Sender<IpcMessage>, packets_to_send_tx: PushSocket,
} }
impl Player { impl Player {
@ -38,7 +39,7 @@ impl Player {
const SPAWN_DISTANCE: f32 = Self::DESPAWN_DISTANCE * 0.8; const SPAWN_DISTANCE: f32 = Self::DESPAWN_DISTANCE * 0.8;
const RESPAWN_TIME: i32 = 10; // In seconds const RESPAWN_TIME: i32 = 10; // In seconds
pub fn despawn_everything(&self) { pub fn despawn_everything(&mut self) {
let entity_list: Vec<u32> = self.entities.iter().map(|(k, v)| *k).collect(); let entity_list: Vec<u32> = self.entities.iter().map(|(k, v)| *k).collect();
if entity_list.len() > 0 { if entity_list.len() > 0 {
@ -118,7 +119,7 @@ impl Player {
} }
} }
pub fn enter_scene(&self, enter_type: &proto::EnterType, token: u32) { pub fn enter_scene(&mut self, enter_type: &proto::EnterType, token: u32) {
let world_level = self.db_manager.get_player_world_level(self.player_id).unwrap() as u32; let world_level = self.db_manager.get_player_world_level(self.player_id).unwrap() as u32;
let player_id = self.player_id; let player_id = self.player_id;
@ -153,7 +154,7 @@ impl Player {
} }
pub struct EntityManager { pub struct EntityManager {
packets_to_send_tx: Sender<IpcMessage>, packets_to_send_tx: PushSocket,
players: Arc<Mutex<HashMap<u32, Player>>>, players: Arc<Mutex<HashMap<u32, Player>>>,
players_moved: Sender<u32>, players_moved: Sender<u32>,
lua_manager: Arc<LuaManager>, lua_manager: Arc<LuaManager>,
@ -162,11 +163,11 @@ pub struct EntityManager {
} }
impl EntityManager { impl EntityManager {
pub fn new(lua_manager: Arc<LuaManager>, json_manager: Arc<JsonManager>, db_manager: Arc<DatabaseManager>, packets_to_send_tx: Sender<IpcMessage>) -> Self { pub fn new(lua_manager: Arc<LuaManager>, json_manager: Arc<JsonManager>, db_manager: Arc<DatabaseManager>, node_config: &NodeConfig) -> Self {
let (tx, rx): (Sender<u32>, Receiver<u32>) = mpsc::channel(); let (tx, rx): (Sender<u32>, Receiver<u32>) = mpsc::channel();
let mut es = Self { let mut es = Self {
packets_to_send_tx: packets_to_send_tx, packets_to_send_tx: node_config.connect_out_queue().unwrap(),
players_moved: tx, players_moved: tx,
players: Arc::new(Mutex::new(HashMap::new())), players: Arc::new(Mutex::new(HashMap::new())),
lua_manager: lua_manager, lua_manager: lua_manager,
@ -252,7 +253,7 @@ impl EntityManager {
player.enter_scene(reason, token); player.enter_scene(reason, token);
}, },
Vacant(entry) => { Vacant(entry) => {
let player = Player { let mut player = Player {
player_id: user_id, player_id: user_id,
pos: pos, pos: pos,
current_block: 0, current_block: 0,
@ -261,7 +262,7 @@ impl EntityManager {
lua_manager: self.lua_manager.clone(), lua_manager: self.lua_manager.clone(),
json_manager: self.json_manager.clone(), json_manager: self.json_manager.clone(),
db_manager: self.db_manager.clone(), db_manager: self.db_manager.clone(),
packets_to_send_tx: self.packets_to_send_tx.clone(), packets_to_send_tx: NodeConfig::new().connect_out_queue().unwrap(),//self.packets_to_send_tx.clone(),
}; };
player.enter_scene(reason, token); player.enter_scene(reason, token);

View File

@ -7,6 +7,8 @@ extern crate num_derive;
use std::thread; use std::thread;
mod node;
mod server; mod server;
mod utils; mod utils;
mod dbmanager; mod dbmanager;
@ -26,6 +28,8 @@ use entitymanager::EntityManager;
fn main() { fn main() {
//pretty_env_logger::init(); //pretty_env_logger::init();
//let mut rt_main = tokio::runtime::Runtime::new().unwrap();
tracing_subscriber::fmt() tracing_subscriber::fmt()
.with_max_level(tracing::Level::DEBUG) .with_max_level(tracing::Level::DEBUG)
.with_test_writer() .with_test_writer()

35
src/node/config.rs Normal file
View File

@ -0,0 +1,35 @@
use rs_ipc::{PubSocket, PullSocket, PushSocket, Result, SubSocket};
pub struct NodeConfig {
pub in_queue_addr: String,
pub in_queue_port: u16,
pub out_queue_addr: String,
pub out_queue_port: u16,
}
impl NodeConfig {
pub fn new() -> Self {
NodeConfig {
in_queue_addr: "127.0.0.1".to_string(),
in_queue_port: 9012,
out_queue_addr: "127.0.0.1".to_string(),
out_queue_port: 9014,
}
}
pub fn bind_in_queue(&self) -> Result<PubSocket> {
PubSocket::bind_tcp(&self.in_queue_addr, self.in_queue_port)
}
pub fn bind_out_queue(&self) -> Result<PullSocket> {
PullSocket::bind_tcp(&self.out_queue_addr, self.out_queue_port)
}
pub fn connect_in_queue(&self) -> Result<SubSocket> {
SubSocket::connect_tcp(&self.in_queue_addr, self.in_queue_port)
}
pub fn connect_out_queue(&self) -> Result<PushSocket> {
PushSocket::connect_tcp(&self.out_queue_addr, self.out_queue_port)
}
}

3
src/node/mod.rs Normal file
View File

@ -0,0 +1,3 @@
mod config;
pub use config::NodeConfig;

View File

@ -8,28 +8,30 @@ use std::convert::TryInto;
use prost::Message; use prost::Message;
use crate::server::IpcMessage; use rs_ipc::{IpcMessage, PushSocket};
use packet_processor_macro::*; use packet_processor_macro::*;
#[macro_use] #[macro_use]
use packet_processor::*; use packet_processor::*;
use crate::node::NodeConfig;
#[packet_processor(GetPlayerTokenReq)] #[packet_processor(GetPlayerTokenReq)]
pub struct AuthManager { pub struct AuthManager {
conv_to_user: HashMap<u32, u32>, conv_to_user: HashMap<u32, u32>,
user_to_conv: HashMap<u32, u32>, user_to_conv: HashMap<u32, u32>,
packets_to_send_tx: mpsc::Sender<IpcMessage>, //packets_to_send_tx: mpsc::Sender<IpcMessage>,
packets_to_send_tx: PushSocket,
} }
impl AuthManager { impl AuthManager {
pub const SPOOFED_PLAYER_UID: u32 = 1337; pub const SPOOFED_PLAYER_UID: u32 = 1337;
pub fn new(packets_to_send_tx: mpsc::Sender<IpcMessage>) -> AuthManager { pub fn new(node_config: &NodeConfig) -> AuthManager {
let mut am = AuthManager { let mut am = AuthManager {
conv_to_user: HashMap::new(), conv_to_user: HashMap::new(),
user_to_conv: HashMap::new(), user_to_conv: HashMap::new(),
packet_callbacks: HashMap::new(), packet_callbacks: HashMap::new(),
packets_to_send_tx: packets_to_send_tx, packets_to_send_tx: node_config.connect_out_queue().unwrap(),
}; };
am.register(); am.register();

View File

@ -1,11 +1,12 @@
use std::sync::mpsc; use std::sync::{mpsc, Mutex};
use std::thread; use std::thread;
use std::collections::HashMap; use std::collections::HashMap;
use std::collections::hash_map::Entry::{Occupied, Vacant}; use std::collections::hash_map::Entry::{Occupied, Vacant};
use rs_ipc::{SubSocket, IpcMessage, PushSocket};
use crate::server::GameWorld; use crate::server::GameWorld;
use packet_processor::PacketProcessor; use packet_processor::PacketProcessor;
use crate::server::IpcMessage;
use crate::{DatabaseManager, EntitySubsystem}; use crate::{DatabaseManager, EntitySubsystem};
use crate::JsonManager; use crate::JsonManager;
@ -13,12 +14,32 @@ use crate::LuaManager;
use crate::server::LoginManager; use crate::server::LoginManager;
use std::sync::Arc; use std::sync::Arc;
use crate::entitymanager::EntityManager; use crate::entitymanager::EntityManager;
use crate::node::NodeConfig;
use crate::subsystems::{InventorySubsystem, NpcSubsystem, ShopSubsystem}; use crate::subsystems::{InventorySubsystem, NpcSubsystem, ShopSubsystem};
use crate::subsystems::misc::{PauseSubsystem, SceneSubsystem, SocialSubsystem, TeleportSubsystem}; use crate::subsystems::misc::{PauseSubsystem, SceneSubsystem, SocialSubsystem, TeleportSubsystem};
/*
This is used to convert async operations into sync ones
*/
trait Block {
fn wait(self) -> <Self as futures::Future>::Output
where Self: Sized, Self: futures::Future
{
futures::executor::block_on(self)
}
}
impl<F,T> Block for F
where F: futures::Future<Output = T>
{}
// -------------
pub struct GameServer { pub struct GameServer {
packets_to_process_rx: mpsc::Receiver<IpcMessage>, //packets_to_process_rx: mpsc::Receiver<IpcMessage>,
packets_to_send_tx: mpsc::Sender<IpcMessage>, packets_to_process_rx: SubSocket,
//packets_to_send_tx: mpsc::Sender<IpcMessage>,
//packets_to_send_tx: PushSocket,
worlds: HashMap<u32, GameWorld>, worlds: HashMap<u32, GameWorld>,
login_manager: LoginManager, login_manager: LoginManager,
database_manager: Arc<DatabaseManager>, database_manager: Arc<DatabaseManager>,
@ -27,26 +48,30 @@ pub struct GameServer {
} }
impl GameServer { impl GameServer {
pub fn new(packets_to_process_rx: mpsc::Receiver<IpcMessage>, packets_to_send_tx: mpsc::Sender<IpcMessage>) -> GameServer { //pub fn new(packets_to_process_rx: mpsc::Receiver<IpcMessage>, packets_to_send_tx: mpsc::Sender<IpcMessage>) -> GameServer {
pub fn new(node_config: &NodeConfig) -> GameServer {
let jm = Arc::new(JsonManager::new("./data/json")); let jm = Arc::new(JsonManager::new("./data/json"));
let db = Arc::new(DatabaseManager::new("sqlite://./database.db3", jm.clone())); let db = Arc::new(DatabaseManager::new("sqlite://./database.db3", jm.clone()));
let lum = Arc::new(LuaManager::new("./data/lua", &jm.clone())); let lum = Arc::new(LuaManager::new("./data/lua", &jm.clone()));
let em = Arc::new(EntityManager::new(lum.clone(),jm.clone(), db.clone(), packets_to_send_tx.clone())); let em = Arc::new(EntityManager::new(lum.clone(),jm.clone(), db.clone(), node_config));
let lm = LoginManager::new(db.clone(), jm.clone(), em.clone(),packets_to_send_tx.clone()); let lm = LoginManager::new(db.clone(), jm.clone(), em.clone(),node_config);
let inv = Arc::new(InventorySubsystem::new(jm.clone(), db.clone(), packets_to_send_tx.clone())); let inv = InventorySubsystem::new(jm.clone(), db.clone(), node_config);
let es = EntitySubsystem::new(lum.clone(), jm.clone(), db.clone(), em.clone(), packets_to_send_tx.clone()); let es = EntitySubsystem::new(lum.clone(), jm.clone(), db.clone(), em.clone(), node_config);
let nt = NpcSubsystem::new(packets_to_send_tx.clone()); let nt = NpcSubsystem::new(node_config);
let ss = ShopSubsystem::new(jm.clone(), db.clone(), inv.clone(), packets_to_send_tx.clone()); let ss = ShopSubsystem::new(jm.clone(), db.clone(), Mutex::new(inv), node_config);
let scs = SceneSubsystem::new(db.clone(), packets_to_send_tx.clone()); let scs = SceneSubsystem::new(db.clone(), node_config);
let ps = PauseSubsystem::new(packets_to_send_tx.clone()); let ps = PauseSubsystem::new(node_config);
let socs = SocialSubsystem::new(db.clone(), packets_to_send_tx.clone()); let socs = SocialSubsystem::new(db.clone(), node_config);
let ts = TeleportSubsystem::new(jm.clone(), db.clone(), em.clone(), packets_to_send_tx.clone()); let ts = TeleportSubsystem::new(jm.clone(), db.clone(), em.clone(), node_config);
let mut packets_to_process_rx = node_config.connect_in_queue().unwrap();
packets_to_process_rx.subscribe_all();
//let mut packets_to_send_tx = PushSocket::connect_tcp("127.0.0.1", 9014).unwrap();
let gs = GameServer { let gs = GameServer {
packets_to_process_rx: packets_to_process_rx, packets_to_process_rx: packets_to_process_rx,
packets_to_send_tx: packets_to_send_tx,
worlds: HashMap::new(), worlds: HashMap::new(),
login_manager: lm, login_manager: lm,
database_manager: db.clone(), database_manager: db.clone(),
@ -66,7 +91,7 @@ impl GameServer {
}); });
loop { loop {
let IpcMessage(user_id, packet_id, metadata, data) = self.packets_to_process_rx.recv().unwrap(); let IpcMessage(packet_id, user_id, metadata, data) = self.packets_to_process_rx.recv().unwrap();
if (self.login_manager.is_supported(&packet_id)) { if (self.login_manager.is_supported(&packet_id)) {
self.login_manager.process(user_id, packet_id, metadata, data); self.login_manager.process(user_id, packet_id, metadata, data);
@ -75,7 +100,7 @@ impl GameServer {
let world = match self.worlds.entry(user_id) { let world = match self.worlds.entry(user_id) {
Occupied(world) => world.into_mut(), Occupied(world) => world.into_mut(),
Vacant(entry) => { Vacant(entry) => {
let world = GameWorld::new(self.database_manager.clone(),self.json_manager.clone(), self.packets_to_send_tx.clone()); let world = GameWorld::new(self.database_manager.clone(),self.json_manager.clone()/*, self.packets_to_send_tx.clone()*/);
entry.insert(world) entry.insert(world)
}, },
}; };

View File

@ -7,7 +7,7 @@ use prost::Message;
use chrono::Datelike; use chrono::Datelike;
use crate::server::IpcMessage; use rs_ipc::{IpcMessage, PushSocket};
use crate::utils::{AvatarBuilder, Remapper}; use crate::utils::{AvatarBuilder, Remapper};
@ -40,13 +40,16 @@ macro_rules! collection {
EnterWorldAreaReq, EnterWorldAreaReq,
)] )]
pub struct GameWorld { pub struct GameWorld {
packets_to_send_tx: mpsc::Sender<IpcMessage>, //packets_to_send_tx: mpsc::Sender<IpcMessage>,
packets_to_send_tx: PushSocket,
db: Arc<DatabaseManager>, db: Arc<DatabaseManager>,
jm: Arc<JsonManager>, jm: Arc<JsonManager>,
} }
impl GameWorld { impl GameWorld {
pub fn new(db: Arc<DatabaseManager>, jm: Arc<JsonManager>, packets_to_send_tx: mpsc::Sender<IpcMessage>) -> GameWorld { pub fn new(db: Arc<DatabaseManager>, jm: Arc<JsonManager>/*, packets_to_send_tx: mpsc::Sender<IpcMessage>*/) -> GameWorld {
let mut packets_to_send_tx = PushSocket::connect_tcp("127.0.0.1", 9014).unwrap();
let mut gw = GameWorld { let mut gw = GameWorld {
packets_to_send_tx: packets_to_send_tx, packets_to_send_tx: packets_to_send_tx,
db: db.clone(), db: db.clone(),
@ -63,7 +66,7 @@ impl GameWorld {
rsp.client_time = req.client_time; rsp.client_time = req.client_time;
} }
fn process_enter_scene_ready(&self, user_id: u32, metadata: &proto::PacketHead, req: &proto::EnterSceneReadyReq, rsp: &mut proto::EnterSceneReadyRsp) { fn process_enter_scene_ready(&mut self, user_id: u32, metadata: &proto::PacketHead, req: &proto::EnterSceneReadyReq, rsp: &mut proto::EnterSceneReadyRsp) {
rsp.enter_scene_token = req.enter_scene_token; rsp.enter_scene_token = req.enter_scene_token;
let current_scene_info = match self.db.get_player_scene_info(user_id) { let current_scene_info = match self.db.get_player_scene_info(user_id) {
@ -79,7 +82,7 @@ impl GameWorld {
}); });
} }
fn process_scene_init_finish(&self, user_id: u32, metadata: &proto::PacketHead, req: &proto::SceneInitFinishReq, rsp: &mut proto::SceneInitFinishRsp) { fn process_scene_init_finish(&mut self, user_id: u32, metadata: &proto::PacketHead, req: &proto::SceneInitFinishReq, rsp: &mut proto::SceneInitFinishRsp) {
let (current_avatar_guid, current_team_id) = match self.db.get_player_team_selection(user_id) { let (current_avatar_guid, current_team_id) = match self.db.get_player_team_selection(user_id) {
Some(team_selection) => (team_selection.avatar, team_selection.team), Some(team_selection) => (team_selection.avatar, team_selection.team),
None => panic!("Team selection info not found for user {}!", user_id), None => panic!("Team selection info not found for user {}!", user_id),
@ -205,7 +208,7 @@ impl GameWorld {
}); });
} }
fn process_enter_scene_done(&self, user_id: u32, metadata: &proto::PacketHead, req: &proto::EnterSceneDoneReq, rsp: &mut proto::EnterSceneDoneRsp) { fn process_enter_scene_done(&mut self, user_id: u32, metadata: &proto::PacketHead, req: &proto::EnterSceneDoneReq, rsp: &mut proto::EnterSceneDoneRsp) {
rsp.enter_scene_token = req.enter_scene_token; rsp.enter_scene_token = req.enter_scene_token;
build_and_send!(self, user_id, metadata, SceneEntityAppearNotify { build_and_send!(self, user_id, metadata, SceneEntityAppearNotify {

View File

@ -3,7 +3,7 @@ use std::collections::HashMap;
use prost::Message; use prost::Message;
use crate::server::IpcMessage; use rs_ipc::{IpcMessage, PushSocket};
use packet_processor_macro::*; use packet_processor_macro::*;
#[macro_use] #[macro_use]
@ -17,20 +17,21 @@ use crate::utils::TimeManager;
use crate::dbmanager::database_manager::AvatarInfo as DbAvatarInfo; use crate::dbmanager::database_manager::AvatarInfo as DbAvatarInfo;
use crate::entitymanager::EntityManager; use crate::entitymanager::EntityManager;
use crate::node::NodeConfig;
#[packet_processor(PlayerLoginReq)] #[packet_processor(PlayerLoginReq)]
pub struct LoginManager { pub struct LoginManager {
packets_to_send_tx: mpsc::Sender<IpcMessage>, packets_to_send_tx: PushSocket,
db: Arc<DatabaseManager>, db: Arc<DatabaseManager>,
jm: Arc<JsonManager>, jm: Arc<JsonManager>,
em: Arc<EntityManager>, em: Arc<EntityManager>,
} }
impl LoginManager { impl LoginManager {
pub fn new(db: Arc<DatabaseManager>, jm: Arc<JsonManager>, em: Arc<EntityManager>, packets_to_send_tx: mpsc::Sender<IpcMessage>) -> LoginManager { pub fn new(db: Arc<DatabaseManager>, jm: Arc<JsonManager>, em: Arc<EntityManager>, node_config: &NodeConfig) -> LoginManager {
let mut lm = LoginManager { let mut lm = LoginManager {
packet_callbacks: HashMap::new(), packet_callbacks: HashMap::new(),
packets_to_send_tx: packets_to_send_tx, packets_to_send_tx: node_config.connect_out_queue().unwrap(),
db: db, db: db,
jm: jm, jm: jm,
em: em, em: em,
@ -41,7 +42,7 @@ impl LoginManager {
return lm; return lm;
} }
fn process_player_login(&self, user_id: u32, metadata: &proto::PacketHead, req: &proto::PlayerLoginReq, rsp: &mut proto::PlayerLoginRsp) { fn process_player_login(&mut self, user_id: u32, metadata: &proto::PacketHead, req: &proto::PlayerLoginReq, rsp: &mut proto::PlayerLoginRsp) {
let user = match self.db.get_player_info(user_id) { let user = match self.db.get_player_info(user_id) {
Some(user) => user, Some(user) => user,
None => panic!("User {} not found!", user_id), None => panic!("User {} not found!", user_id),

View File

@ -4,7 +4,6 @@ mod game_world;
mod auth_manager; mod auth_manager;
mod login_manager; mod login_manager;
mod client_connection; mod client_connection;
mod ipc_message;
pub use self::network_server::NetworkServer; pub use self::network_server::NetworkServer;
pub use self::game_server::GameServer; pub use self::game_server::GameServer;
@ -12,4 +11,3 @@ pub use self::game_world::GameWorld;
pub use self::auth_manager::AuthManager; pub use self::auth_manager::AuthManager;
pub use self::login_manager::LoginManager; pub use self::login_manager::LoginManager;
pub use self::client_connection::ClientConnection; pub use self::client_connection::ClientConnection;
pub use self::ipc_message::IpcMessage;

View File

@ -18,7 +18,7 @@ use crate::server::ClientConnection;
use crate::server::GameServer; use crate::server::GameServer;
use crate::server::AuthManager; use crate::server::AuthManager;
use crate::server::IpcMessage; use rs_ipc::{IpcMessage, PullSocket, PushSocket};
use proto::PacketHead; use proto::PacketHead;
use proto::GetPlayerTokenRsp; use proto::GetPlayerTokenRsp;
@ -26,15 +26,35 @@ use proto::get_player_token_rsp;
use prost::Message; use prost::Message;
use rs_ipc::{SubSocket, PubSocket};
use packet_processor::{PacketProcessor, EasilyUnpackable}; use packet_processor::{PacketProcessor, EasilyUnpackable};
use crate::node::NodeConfig;
extern crate kcp; extern crate kcp;
/*
This is used to convert async operations into sync ones
*/
trait Block {
fn wait(self) -> <Self as futures::Future>::Output
where Self: Sized, Self: futures::Future
{
futures::executor::block_on(self)
}
}
impl<F,T> Block for F
where F: futures::Future<Output = T>
{}
// -------------
pub struct NetworkServer { pub struct NetworkServer {
socket: UdpSocket, socket: UdpSocket,
clients: Arc<Mutex<HashMap<u32,ClientConnection>>>, clients: Arc<Mutex<HashMap<u32,ClientConnection>>>,
packets_to_process_tx: Option<mpsc::Sender<IpcMessage>>, node_config: NodeConfig,
packets_to_send_tx: Option<mpsc::Sender<IpcMessage>>, packets_to_process_tx: PubSocket,
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@ -56,14 +76,18 @@ impl fmt::Display for NetworkServerError {
impl NetworkServer { impl NetworkServer {
pub fn new(host: &str, port: i16) -> Result<NetworkServer, NetworkServerError> { pub fn new(host: &str, port: i16) -> Result<NetworkServer, NetworkServerError> {
let node_config = NodeConfig::new();
let mut packets_to_process_tx = node_config.bind_in_queue().unwrap();
let gs = NetworkServer { let gs = NetworkServer {
socket: match UdpSocket::bind(format!("{}:{}", host, port).to_string()) { socket: match UdpSocket::bind(format!("{}:{}", host, port).to_string()) {
Ok(socket) => socket, Ok(socket) => socket,
Err(e) => return Err(NetworkServerError::new(format!("Failed to bind socket: {}", e).as_str())), Err(e) => return Err(NetworkServerError::new(format!("Failed to bind socket: {}", e).as_str())),
}, },
clients: Arc::new(Mutex::new(HashMap::new())), clients: Arc::new(Mutex::new(HashMap::new())),
packets_to_process_tx: None, node_config: node_config,
packets_to_send_tx: None, packets_to_process_tx: packets_to_process_tx,
}; };
print!("Connection established\n"); print!("Connection established\n");
@ -74,22 +98,15 @@ impl NetworkServer {
pub fn run(&mut self) -> Result<i16, NetworkServerError> { pub fn run(&mut self) -> Result<i16, NetworkServerError> {
print!("Starting server\n"); print!("Starting server\n");
// Channel for relaying packets from network thread to processing thread let mut packets_to_send_rx = self.node_config.bind_out_queue().unwrap();
let (packets_to_process_tx, packets_to_process_rx) = mpsc::channel();
// Channel for relaying packets from network thread to processing thread
let (packets_to_send_tx, packets_to_send_rx) = mpsc::channel();
self.packets_to_process_tx = Some(packets_to_process_tx);
self.packets_to_send_tx = Some(packets_to_send_tx.clone()); // TODO: hack!
let clients = self.clients.clone(); let clients = self.clients.clone();
let mut auth_manager = Arc::new(Mutex::new(AuthManager::new(packets_to_send_tx.clone()))); let mut auth_manager = Arc::new(Mutex::new(AuthManager::new(&self.node_config)));
let am = auth_manager.clone(); let am = auth_manager.clone();
let packet_relaying_thread = thread::spawn(move || { let packet_relaying_thread = thread::spawn(move || {
loop { loop {
let IpcMessage(user_id, packet_id, metadata, data) = packets_to_send_rx.recv().unwrap(); let IpcMessage(packet_id, user_id, metadata, data) = packets_to_send_rx.recv().unwrap();
let conv = match packet_id { let conv = match packet_id {
proto::PacketId::GetPlayerTokenRsp => user_id, // Mapping is not performed on those proto::PacketId::GetPlayerTokenRsp => user_id, // Mapping is not performed on those
@ -115,7 +132,8 @@ impl NetworkServer {
}); });
let game_thread = thread::spawn(move || { let game_thread = thread::spawn(move || {
let mut gs = GameServer::new(packets_to_process_rx, packets_to_send_tx); let node_config = NodeConfig::new();
let mut gs = GameServer::new(&node_config);
gs.run(); gs.run();
}); });
@ -222,10 +240,10 @@ impl NetworkServer {
fn send_packet_to_process(&mut self, user_id: u32, packet_id: u16, metadata: &[u8], data: &[u8]) fn send_packet_to_process(&mut self, user_id: u32, packet_id: u16, metadata: &[u8], data: &[u8])
{ {
let sender = match &self.packets_to_process_tx { /*let sender: &mut PubSocket = match &self.packets_to_process_tx {
Some(sender) => sender, Some(mut sender) => &mut sender,
None => panic!("Processing queue wasn't set up!"), None => panic!("Processing queue wasn't set up!"),
}; };*/
let packet_id: proto::PacketId = match FromPrimitive::from_u16(packet_id) { let packet_id: proto::PacketId = match FromPrimitive::from_u16(packet_id) {
Some(packet_id) => packet_id, Some(packet_id) => packet_id,
@ -236,6 +254,6 @@ impl NetworkServer {
}; };
println!("Got packet {:?}", packet_id); println!("Got packet {:?}", packet_id);
sender.send( IpcMessage(user_id, packet_id, metadata.to_vec(), data.to_vec()) ).unwrap(); self.packets_to_process_tx.send( IpcMessage(packet_id, user_id, metadata.to_vec(), data.to_vec()) );
} }
} }

View File

@ -3,7 +3,7 @@ use std::thread;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::collections::hash_map::Entry::{Occupied, Vacant}; use std::collections::hash_map::Entry::{Occupied, Vacant};
use crate::server::IpcMessage; use rs_ipc::{IpcMessage, PushSocket};
use prost::Message; use prost::Message;
@ -19,12 +19,13 @@ use crate::entitymanager::EntityManager;
use crate::utils::{IdManager, TimeManager}; use crate::utils::{IdManager, TimeManager};
use crate::luamanager::Vector; use crate::luamanager::Vector;
use crate::node::NodeConfig;
#[packet_processor( #[packet_processor(
CombatInvocationsNotify, CombatInvocationsNotify,
)] )]
pub struct EntitySubsystem { pub struct EntitySubsystem {
packets_to_send_tx: Sender<IpcMessage>, packets_to_send_tx: PushSocket,
lua_manager: Arc<LuaManager>, lua_manager: Arc<LuaManager>,
json_manager: Arc<JsonManager>, json_manager: Arc<JsonManager>,
db_manager: Arc<DatabaseManager>, db_manager: Arc<DatabaseManager>,
@ -32,9 +33,9 @@ pub struct EntitySubsystem {
} }
impl EntitySubsystem { impl EntitySubsystem {
pub fn new(lua_manager: Arc<LuaManager>, json_manager: Arc<JsonManager>, db_manager: Arc<DatabaseManager>, entity_manager: Arc<EntityManager>, packets_to_send_tx: Sender<IpcMessage>) -> EntitySubsystem { pub fn new(lua_manager: Arc<LuaManager>, json_manager: Arc<JsonManager>, db_manager: Arc<DatabaseManager>, entity_manager: Arc<EntityManager>, node_config: &NodeConfig) -> EntitySubsystem {
let mut es = EntitySubsystem { let mut es = EntitySubsystem {
packets_to_send_tx: packets_to_send_tx, packets_to_send_tx: node_config.connect_out_queue().unwrap(),
packet_callbacks: HashMap::new(), packet_callbacks: HashMap::new(),
lua_manager: lua_manager, lua_manager: lua_manager,
json_manager: json_manager, json_manager: json_manager,
@ -47,7 +48,7 @@ impl EntitySubsystem {
return es; return es;
} }
fn process_combat_invocations(&self, user_id: u32, metadata: &proto::PacketHead, notify: &proto::CombatInvocationsNotify) { fn process_combat_invocations(&mut self, user_id: u32, metadata: &proto::PacketHead, notify: &proto::CombatInvocationsNotify) {
for invoke in notify.invoke_list.iter() { for invoke in notify.invoke_list.iter() {
self.handle_invoke(user_id, metadata, invoke); self.handle_invoke(user_id, metadata, invoke);
self.forward_invoke(user_id, metadata, invoke); self.forward_invoke(user_id, metadata, invoke);
@ -117,7 +118,7 @@ impl EntitySubsystem {
*/ */
// Main function // Main function
fn forward_invoke(&self, user_id: u32, metadata: &proto::PacketHead, invoke: &proto::CombatInvokeEntry) { fn forward_invoke(&mut self, user_id: u32, metadata: &proto::PacketHead, invoke: &proto::CombatInvokeEntry) {
match ForwardType::from_i32(invoke.forward_type).unwrap() { // Panics in case of unknown (undescribed in protobuf file) forward type match ForwardType::from_i32(invoke.forward_type).unwrap() { // Panics in case of unknown (undescribed in protobuf file) forward type
ForwardType::ForwardLocal => self.fw_default(user_id, metadata, invoke), ForwardType::ForwardLocal => self.fw_default(user_id, metadata, invoke),
ForwardType::ForwardToAll => self.fw_to_all(user_id, metadata, invoke), ForwardType::ForwardToAll => self.fw_to_all(user_id, metadata, invoke),
@ -132,12 +133,12 @@ impl EntitySubsystem {
} }
} }
fn fw_default(&self, user_id: u32, metadata: &proto::PacketHead, invoke: &proto::CombatInvokeEntry) { fn fw_default(&mut self, user_id: u32, metadata: &proto::PacketHead, invoke: &proto::CombatInvokeEntry) {
// TODO: this handler is just a stub! // TODO: this handler is just a stub!
println!("Unhandled CIN forward: {:?}", invoke); println!("Unhandled CIN forward: {:?}", invoke);
} }
fn fw_to_all(&self, user_id: u32, metadata: &proto::PacketHead, invoke: &proto::CombatInvokeEntry) { fn fw_to_all(&mut self, user_id: u32, metadata: &proto::PacketHead, invoke: &proto::CombatInvokeEntry) {
// TODO: this handler sends data only back to the user itself for now! // TODO: this handler sends data only back to the user itself for now!
build_and_send!(self, user_id, metadata, CombatInvocationsNotify { build_and_send!(self, user_id, metadata, CombatInvocationsNotify {
invoke_list: vec![invoke.clone()], invoke_list: vec![invoke.clone()],

View File

@ -1,27 +1,28 @@
use std::sync::{Arc, mpsc}; use std::sync::{Arc, mpsc};
use crate::server::IpcMessage; use rs_ipc::{IpcMessage, PushSocket};
use crate::{DatabaseManager, JsonManager}; use crate::{DatabaseManager, JsonManager};
#[macro_use] #[macro_use]
use packet_processor::*; use packet_processor::*;
use crate::node::NodeConfig;
pub struct InventorySubsystem { pub struct InventorySubsystem {
packets_to_send_tx: mpsc::Sender<IpcMessage>, packets_to_send_tx: PushSocket,
db: Arc<DatabaseManager>, db: Arc<DatabaseManager>,
jm: Arc<JsonManager>, jm: Arc<JsonManager>,
} }
impl InventorySubsystem { impl InventorySubsystem {
pub fn new(jm: Arc<JsonManager>, db: Arc<DatabaseManager>, packets_to_send_tx: mpsc::Sender<IpcMessage>) -> Self { pub fn new(jm: Arc<JsonManager>, db: Arc<DatabaseManager>, node_config: &NodeConfig) -> Self {
Self { Self {
packets_to_send_tx: packets_to_send_tx.clone(), packets_to_send_tx: node_config.connect_out_queue().unwrap(),
db: db.clone(), db: db.clone(),
jm: jm.clone(), jm: jm.clone(),
} }
} }
pub fn add_item(&self, user_id: u32, metadata: &proto::PacketHead, item_id: u32, count: u32, reason: &proto::ActionReasonType, inform_user: bool) { pub fn add_item(&mut self, user_id: u32, metadata: &proto::PacketHead, item_id: u32, count: u32, reason: &proto::ActionReasonType, inform_user: bool) {
let (item, is_new) = if self.jm.is_item_weapon(item_id) || self.jm.is_item_reliquary(item_id) { let (item, is_new) = if self.jm.is_item_weapon(item_id) || self.jm.is_item_reliquary(item_id) {
assert!(count == 1); assert!(count == 1);
(self.db.add_equip(user_id, item_id).unwrap(), false) // TODO: is new equip considered a new item? (self.db.add_equip(user_id, item_id).unwrap(), false) // TODO: is new equip considered a new item?
@ -48,7 +49,7 @@ impl InventorySubsystem {
}); });
} }
pub fn sub_item(&self, user_id: u32, metadata: &proto::PacketHead, item_id: u32, count: u32, reason: &proto::ActionReasonType) { pub fn sub_item(&mut self, user_id: u32, metadata: &proto::PacketHead, item_id: u32, count: u32, reason: &proto::ActionReasonType) {
let old_amount = self.db.get_item_count_by_item_id(user_id, item_id); let old_amount = self.db.get_item_count_by_item_id(user_id, item_id);
assert!(old_amount >= count); assert!(old_amount >= count);

View File

@ -3,7 +3,7 @@ use std::thread;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::collections::hash_map::Entry::{Occupied, Vacant}; use std::collections::hash_map::Entry::{Occupied, Vacant};
use crate::server::IpcMessage; use rs_ipc::{IpcMessage, PushSocket};
use prost::Message; use prost::Message;
@ -15,19 +15,20 @@ use packet_processor_macro::*;
use packet_processor::*; use packet_processor::*;
use serde_json::de::Read; use serde_json::de::Read;
use crate::{DatabaseManager, JsonManager, LuaManager}; use crate::{DatabaseManager, JsonManager, LuaManager};
use crate::node::NodeConfig;
use crate::utils::{IdManager, TimeManager}; use crate::utils::{IdManager, TimeManager};
#[packet_processor( #[packet_processor(
NpcTalkReq, NpcTalkReq,
)] )]
pub struct NpcSubsystem { pub struct NpcSubsystem {
packets_to_send_tx: Sender<IpcMessage>, packets_to_send_tx: PushSocket,
} }
impl NpcSubsystem { impl NpcSubsystem {
pub fn new(packets_to_send_tx: Sender<IpcMessage>) -> Self { pub fn new(node_config: &NodeConfig) -> Self {
let mut nt = Self { let mut nt = Self {
packets_to_send_tx: packets_to_send_tx, packets_to_send_tx: node_config.connect_out_queue().unwrap(),
packet_callbacks: HashMap::new(), packet_callbacks: HashMap::new(),
}; };

View File

@ -3,7 +3,7 @@ use std::thread;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::collections::hash_map::Entry::{Occupied, Vacant}; use std::collections::hash_map::Entry::{Occupied, Vacant};
use crate::server::IpcMessage; use rs_ipc::{IpcMessage, PushSocket};
use prost::Message; use prost::Message;
@ -15,19 +15,20 @@ use packet_processor_macro::*;
use packet_processor::*; use packet_processor::*;
use serde_json::de::Read; use serde_json::de::Read;
use crate::{DatabaseManager, JsonManager, LuaManager}; use crate::{DatabaseManager, JsonManager, LuaManager};
use crate::node::NodeConfig;
use crate::utils::{IdManager, TimeManager}; use crate::utils::{IdManager, TimeManager};
#[packet_processor( #[packet_processor(
PlayerSetPauseReq, PlayerSetPauseReq,
)] )]
pub struct PauseSubsystem { pub struct PauseSubsystem {
packets_to_send_tx: Sender<IpcMessage>, packets_to_send_tx: PushSocket,
} }
impl PauseSubsystem { impl PauseSubsystem {
pub fn new(packets_to_send_tx: Sender<IpcMessage>) -> Self { pub fn new(node_config: &NodeConfig) -> Self {
let mut ps = Self { let mut ps = Self {
packets_to_send_tx: packets_to_send_tx, packets_to_send_tx: node_config.connect_out_queue().unwrap(),
packet_callbacks: HashMap::new(), packet_callbacks: HashMap::new(),
}; };

View File

@ -3,7 +3,7 @@ use std::thread;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::collections::hash_map::Entry::{Occupied, Vacant}; use std::collections::hash_map::Entry::{Occupied, Vacant};
use crate::server::IpcMessage; use rs_ipc::{IpcMessage, PushSocket};
use prost::Message; use prost::Message;
@ -15,6 +15,7 @@ use packet_processor_macro::*;
use packet_processor::*; use packet_processor::*;
use serde_json::de::Read; use serde_json::de::Read;
use crate::{DatabaseManager, JsonManager, LuaManager}; use crate::{DatabaseManager, JsonManager, LuaManager};
use crate::node::NodeConfig;
use crate::utils::{IdManager, TimeManager}; use crate::utils::{IdManager, TimeManager};
#[packet_processor( #[packet_processor(
@ -22,14 +23,14 @@ GetSceneAreaReq,
GetScenePointReq, GetScenePointReq,
)] )]
pub struct SceneSubsystem { pub struct SceneSubsystem {
packets_to_send_tx: Sender<IpcMessage>, packets_to_send_tx: PushSocket,
db: Arc<DatabaseManager>, db: Arc<DatabaseManager>,
} }
impl SceneSubsystem { impl SceneSubsystem {
pub fn new(db: Arc<DatabaseManager>, packets_to_send_tx: Sender<IpcMessage>) -> Self { pub fn new(db: Arc<DatabaseManager>, node_config: &NodeConfig) -> Self {
let mut scs = Self { let mut scs = Self {
packets_to_send_tx: packets_to_send_tx, packets_to_send_tx: node_config.connect_out_queue().unwrap(),
packet_callbacks: HashMap::new(), packet_callbacks: HashMap::new(),
db: db, db: db,
}; };
@ -56,11 +57,11 @@ impl SceneSubsystem {
rsp.scene_id = scene_id; rsp.scene_id = scene_id;
// TODO: implemented but for the sake of debugging we hardcode it for now // TODO: implemented but for the sake of debugging we hardcode it for now
rsp.unlocked_point_list = (1..300).collect(); rsp.unlocked_point_list = (1..500).collect();
//rsp.unlocked_point_list = self.db.get_scene_trans_points(user_id, scene_id); //rsp.unlocked_point_list = self.db.get_scene_trans_points(user_id, scene_id);
// TODO: hardcoded data! // TODO: hardcoded data!
rsp.unlock_area_list = (1..20).collect(); rsp.unlock_area_list = (1..50).collect();
//locked_point_list=vec![]; //locked_point_list=vec![];
} }

View File

@ -3,7 +3,7 @@ use std::thread;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::collections::hash_map::Entry::{Occupied, Vacant}; use std::collections::hash_map::Entry::{Occupied, Vacant};
use crate::server::IpcMessage; use rs_ipc::{IpcMessage, PushSocket};
use prost::Message; use prost::Message;
@ -15,6 +15,7 @@ use packet_processor_macro::*;
use packet_processor::*; use packet_processor::*;
use serde_json::de::Read; use serde_json::de::Read;
use crate::{DatabaseManager, JsonManager, LuaManager}; use crate::{DatabaseManager, JsonManager, LuaManager};
use crate::node::NodeConfig;
use crate::subsystems::InventorySubsystem; use crate::subsystems::InventorySubsystem;
use crate::utils::{IdManager, TimeManager}; use crate::utils::{IdManager, TimeManager};
@ -23,20 +24,20 @@ GetShopReq,
BuyGoodsReq, BuyGoodsReq,
)] )]
pub struct ShopSubsystem { pub struct ShopSubsystem {
packets_to_send_tx: Sender<IpcMessage>, packets_to_send_tx: PushSocket,
json_manager: Arc<JsonManager>, json_manager: Arc<JsonManager>,
db_manager: Arc<DatabaseManager>, db_manager: Arc<DatabaseManager>,
inventory: Arc<InventorySubsystem>, inventory: Mutex<InventorySubsystem>,
} }
impl ShopSubsystem { impl ShopSubsystem {
pub fn new(jm: Arc<JsonManager>, db: Arc<DatabaseManager>, inv: Arc<InventorySubsystem>, packets_to_send_tx: Sender<IpcMessage>) -> Self { pub fn new(jm: Arc<JsonManager>, db: Arc<DatabaseManager>, inv: Mutex<InventorySubsystem>, node_config: &NodeConfig) -> Self {
let mut ss = Self { let mut ss = Self {
packets_to_send_tx: packets_to_send_tx, packets_to_send_tx: node_config.connect_out_queue().unwrap(),
packet_callbacks: HashMap::new(), packet_callbacks: HashMap::new(),
json_manager: jm.clone(), json_manager: jm.clone(),
db_manager: db.clone(), db_manager: db.clone(),
inventory: inv.clone(), inventory: inv,
}; };
ss.register(); ss.register();
@ -115,7 +116,7 @@ impl ShopSubsystem {
})); }));
} }
fn process_buy_goods(&self, user_id: u32, metadata: &proto::PacketHead, req: &proto::BuyGoodsReq, rsp: &mut proto::BuyGoodsRsp) { fn process_buy_goods(&mut self, user_id: u32, metadata: &proto::PacketHead, req: &proto::BuyGoodsReq, rsp: &mut proto::BuyGoodsRsp) {
// Buying goods can produce the following packets: // Buying goods can produce the following packets:
// 1) Response packet // 1) Response packet
// 2) AddHintNotify (to show nice graphical image to user) // 2) AddHintNotify (to show nice graphical image to user)
@ -143,7 +144,7 @@ impl ShopSubsystem {
let total_count = goods_item.count * req.buy_count; let total_count = goods_item.count * req.buy_count;
// Ok, now add item to user's inventory and show nice graphical hint // Ok, now add item to user's inventory and show nice graphical hint
self.inventory.add_item(user_id, metadata, goods_item.item_id, total_count, &proto::ActionReasonType::ActionReasonShop, true); self.inventory.lock().unwrap().add_item(user_id, metadata, goods_item.item_id, total_count, &proto::ActionReasonType::ActionReasonShop, true);
// Tell the client to update / delete currency used // Tell the client to update / delete currency used

View File

@ -3,7 +3,7 @@ use std::thread;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::collections::hash_map::Entry::{Occupied, Vacant}; use std::collections::hash_map::Entry::{Occupied, Vacant};
use crate::server::IpcMessage; use rs_ipc::{IpcMessage, PushSocket};
use chrono::Datelike; use chrono::Datelike;
@ -17,6 +17,7 @@ use packet_processor_macro::*;
use packet_processor::*; use packet_processor::*;
use serde_json::de::Read; use serde_json::de::Read;
use crate::{DatabaseManager, JsonManager, LuaManager}; use crate::{DatabaseManager, JsonManager, LuaManager};
use crate::node::NodeConfig;
use crate::utils::{IdManager, TimeManager}; use crate::utils::{IdManager, TimeManager};
#[packet_processor( #[packet_processor(
@ -25,14 +26,14 @@ GetPlayerFriendListReq,
GetPlayerSocialDetailReq, GetPlayerSocialDetailReq,
)] )]
pub struct SocialSubsystem { pub struct SocialSubsystem {
packets_to_send_tx: Sender<IpcMessage>, packets_to_send_tx: PushSocket,
db: Arc<DatabaseManager>, db: Arc<DatabaseManager>,
} }
impl SocialSubsystem { impl SocialSubsystem {
pub fn new(db: Arc<DatabaseManager>, packets_to_send_tx: Sender<IpcMessage>) -> Self { pub fn new(db: Arc<DatabaseManager>, node_config: &NodeConfig) -> Self {
let mut socs = Self { let mut socs = Self {
packets_to_send_tx: packets_to_send_tx, packets_to_send_tx: node_config.connect_out_queue().unwrap(),
packet_callbacks: HashMap::new(), packet_callbacks: HashMap::new(),
db: db.clone(), db: db.clone(),
}; };

View File

@ -3,7 +3,7 @@ use std::thread;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::collections::hash_map::Entry::{Occupied, Vacant}; use std::collections::hash_map::Entry::{Occupied, Vacant};
use crate::server::IpcMessage; use rs_ipc::{IpcMessage, PushSocket};
use prost::Message; use prost::Message;
@ -17,6 +17,7 @@ use serde_json::de::Read;
use crate::{DatabaseManager, JsonManager, LuaManager}; use crate::{DatabaseManager, JsonManager, LuaManager};
use crate::entitymanager::EntityManager; use crate::entitymanager::EntityManager;
use crate::luamanager::Vector; use crate::luamanager::Vector;
use crate::node::NodeConfig;
use crate::utils::{IdManager, TimeManager}; use crate::utils::{IdManager, TimeManager};
#[packet_processor( #[packet_processor(
@ -24,16 +25,16 @@ SceneTransToPointReq,
UnlockTransPointReq, UnlockTransPointReq,
)] )]
pub struct TeleportSubsystem { pub struct TeleportSubsystem {
packets_to_send_tx: Sender<IpcMessage>, packets_to_send_tx: PushSocket,
jm: Arc<JsonManager>, jm: Arc<JsonManager>,
em: Arc<EntityManager>, em: Arc<EntityManager>,
db: Arc<DatabaseManager> db: Arc<DatabaseManager>
} }
impl TeleportSubsystem { impl TeleportSubsystem {
pub fn new(jm: Arc<JsonManager>, db: Arc<DatabaseManager>, em: Arc<EntityManager>, packets_to_send_tx: Sender<IpcMessage>) -> Self { pub fn new(jm: Arc<JsonManager>, db: Arc<DatabaseManager>, em: Arc<EntityManager>, node_config: &NodeConfig) -> Self {
let mut nt = Self { let mut nt = Self {
packets_to_send_tx: packets_to_send_tx, packets_to_send_tx: node_config.connect_out_queue().unwrap(),
packet_callbacks: HashMap::new(), packet_callbacks: HashMap::new(),
jm: jm, jm: jm,
em: em, em: em,
@ -77,7 +78,7 @@ impl TeleportSubsystem {
self.em.player_teleported(user_id, pos, s_id, scene_info.scene_token, &proto::EnterType::EnterGoto); self.em.player_teleported(user_id, pos, s_id, scene_info.scene_token, &proto::EnterType::EnterGoto);
} }
pub fn process_unlock_trans_point(&self, user_id: u32, metadata: &proto::PacketHead, req: &proto::UnlockTransPointReq, rsp: &mut proto::UnlockTransPointRsp) { pub fn process_unlock_trans_point(&mut self, user_id: u32, metadata: &proto::PacketHead, req: &proto::UnlockTransPointReq, rsp: &mut proto::UnlockTransPointRsp) {
let scene_id = req.scene_id; let scene_id = req.scene_id;
let point_id = req.point_id; let point_id = req.point_id;