Move ClientManager to a separate thread.

This commit is contained in:
levlam 2022-10-04 18:27:50 +03:00
parent 2cca516445
commit 04825c4b70
6 changed files with 27 additions and 27 deletions

View File

@ -3893,8 +3893,8 @@ void Client::start_up() {
}; };
td::ClientActor::Options options; td::ClientActor::Options options;
options.net_query_stats = parameters_->net_query_stats_; options.net_query_stats = parameters_->net_query_stats_;
td_client_ = td::create_actor<td::ClientActor>("TdClientActor", td::make_unique<TdCallback>(actor_id(this)), td_client_ = td::create_actor_on_scheduler<td::ClientActor>(
std::move(options)); "TdClientActor", 0, td::make_unique<TdCallback>(actor_id(this)), std::move(options));
} }
void Client::send(PromisedQueryPtr query) { void Client::send(PromisedQueryPtr query) {
@ -4993,16 +4993,12 @@ void Client::timeout_expired() {
td::int32 Client::get_database_scheduler_id() { td::int32 Client::get_database_scheduler_id() {
// the same scheduler as for database in Td // the same scheduler as for database in Td
auto current_scheduler_id = td::Scheduler::instance()->sched_id(); return 1;
auto scheduler_count = td::Scheduler::instance()->sched_count();
return td::min(current_scheduler_id + 1, scheduler_count - 1);
} }
td::int32 Client::get_file_gc_scheduler_id() { td::int32 Client::get_file_gc_scheduler_id() {
// the same scheduler as for file GC in Td // the same scheduler as for file GC in Td
auto current_scheduler_id = td::Scheduler::instance()->sched_id(); return 2;
auto scheduler_count = td::Scheduler::instance()->sched_count();
return td::min(current_scheduler_id + 2, scheduler_count - 1);
} }
void Client::clear_tqueue() { void Client::clear_tqueue() {

View File

@ -232,7 +232,7 @@ void ClientManager::get_stats(td::Promise<td::BufferSlice> promise,
sb << "buffer_memory\t" << td::format::as_size(td::BufferAllocator::get_buffer_mem()) << '\n'; sb << "buffer_memory\t" << td::format::as_size(td::BufferAllocator::get_buffer_mem()) << '\n';
sb << "active_webhook_connections\t" << WebhookActor::get_total_connections_count() << '\n'; sb << "active_webhook_connections\t" << WebhookActor::get_total_connections_count() << '\n';
sb << "active_requests\t" << parameters_->shared_data_->query_count_.load() << '\n'; sb << "active_requests\t" << parameters_->shared_data_->query_count_.load(std::memory_order_relaxed) << '\n';
sb << "active_network_queries\t" << td::get_pending_network_query_count(*parameters_->net_query_stats_) << '\n'; sb << "active_network_queries\t" << td::get_pending_network_query_count(*parameters_->net_query_stats_) << '\n';
auto stats = stat_.as_vector(now); auto stats = stat_.as_vector(now);
for (auto &stat : stats) { for (auto &stat : stats) {
@ -294,9 +294,7 @@ td::int64 ClientManager::get_tqueue_id(td::int64 user_id, bool is_test_dc) {
void ClientManager::start_up() { void ClientManager::start_up() {
//NB: the same scheduler as for database in Td //NB: the same scheduler as for database in Td
auto current_scheduler_id = td::Scheduler::instance()->sched_id(); auto scheduler_id = 1;
auto scheduler_count = td::Scheduler::instance()->sched_count();
auto scheduler_id = td::min(current_scheduler_id + 1, scheduler_count - 1);
// init tqueue // init tqueue
{ {

View File

@ -29,10 +29,10 @@ namespace telegram_bot_api {
struct SharedData { struct SharedData {
std::atomic<td::uint64> query_count_{0}; std::atomic<td::uint64> query_count_{0};
std::atomic<size_t> query_list_size_{0};
std::atomic<int> next_verbosity_level_{-1}; std::atomic<int> next_verbosity_level_{-1};
// not thread-safe // not thread-safe, must be used from a single thread
size_t query_list_size_ = 0;
td::ListNode query_list_; td::ListNode query_list_;
td::unique_ptr<td::KeyValueSyncInterface> webhook_db_; td::unique_ptr<td::KeyValueSyncInterface> webhook_db_;
td::unique_ptr<td::TQueue> tqueue_; td::unique_ptr<td::TQueue> tqueue_;

View File

@ -45,9 +45,9 @@ Query::Query(td::vector<td::BufferSlice> &&container, td::Slice token, bool is_t
start_timestamp_ = td::Time::now(); start_timestamp_ = td::Time::now();
LOG(INFO) << "QUERY: create " << td::tag("ptr", this) << *this; LOG(INFO) << "QUERY: create " << td::tag("ptr", this) << *this;
if (shared_data_) { if (shared_data_) {
shared_data_->query_count_++; shared_data_->query_count_.fetch_add(1, std::memory_order_relaxed);
if (method_ != "getupdates") { if (method_ != "getupdates") {
shared_data_->query_list_size_++; shared_data_->query_list_size_.fetch_add(1, std::memory_order_relaxed);
shared_data_->query_list_.put(this); shared_data_->query_list_.put(this);
} }
} }

View File

@ -109,9 +109,9 @@ class Query final : public td::ListNode {
Query &operator=(Query &&) = delete; Query &operator=(Query &&) = delete;
~Query() { ~Query() {
if (shared_data_) { if (shared_data_) {
shared_data_->query_count_--; shared_data_->query_count_.fetch_sub(1, std::memory_order_relaxed);
if (!empty()) { if (!empty()) {
shared_data_->query_list_size_--; shared_data_->query_list_size_.fetch_sub(1, std::memory_order_relaxed);
} }
} }
} }

View File

@ -158,10 +158,10 @@ static void dump_statistics(const std::shared_ptr<SharedData> &shared_data,
LOG(WARNING) << td::tag("buffer_mem", td::format::as_size(td::BufferAllocator::get_buffer_mem())); LOG(WARNING) << td::tag("buffer_mem", td::format::as_size(td::BufferAllocator::get_buffer_mem()));
LOG(WARNING) << td::tag("buffer_slice_size", td::format::as_size(td::BufferAllocator::get_buffer_slice_size())); LOG(WARNING) << td::tag("buffer_slice_size", td::format::as_size(td::BufferAllocator::get_buffer_slice_size()));
auto query_list_size = shared_data->query_list_size_; auto query_list_size = shared_data->query_list_size_.load(std::memory_order_relaxed);
auto query_count = shared_data->query_count_.load(); auto query_count = shared_data->query_count_.load(std::memory_order_relaxed);
LOG(WARNING) << td::tag("pending queries", query_count) << td::tag("pending requests", query_list_size); LOG(WARNING) << td::tag("pending queries", query_count) << td::tag("pending requests", query_list_size);
/*
td::uint64 i = 0; td::uint64 i = 0;
bool was_gap = false; bool was_gap = false;
for (auto end = &shared_data->query_list_, cur = end->prev; cur != end; cur = cur->prev, i++) { for (auto end = &shared_data->query_list_, cur = end->prev; cur != end; cur = cur->prev, i++) {
@ -175,7 +175,7 @@ static void dump_statistics(const std::shared_ptr<SharedData> &shared_data,
was_gap = true; was_gap = true;
} }
} }
*/
td::dump_pending_network_queries(*net_query_stats); td::dump_pending_network_queries(*net_query_stats);
} }
@ -504,7 +504,12 @@ int main(int argc, char *argv[]) {
// << (td::GitInfo::is_dirty() ? "(dirty)" : "") << " started"; // << (td::GitInfo::is_dirty() ? "(dirty)" : "") << " started";
LOG(WARNING) << "Bot API " << parameters->version_ << " server started"; LOG(WARNING) << "Bot API " << parameters->version_ << " server started";
const int thread_count = 6; // +3 for Td, one for watchdog, one for slow HTTP connections, one for DNS resolving // +3 threads for Td
// one thread for ClientManager and all Clients
// one thread for watchdog
// one thread for slow HTTP connections
// one thread for DNS resolving
const int thread_count = 7;
td::ConcurrentScheduler sched(thread_count, cpu_affinity); td::ConcurrentScheduler sched(thread_count, cpu_affinity);
td::GetHostByNameActor::Options get_host_by_name_options; td::GetHostByNameActor::Options get_host_by_name_options;
@ -514,11 +519,12 @@ int main(int argc, char *argv[]) {
.release(); .release();
auto client_manager = auto client_manager =
sched.create_actor_unsafe<ClientManager>(0, "ClientManager", std::move(parameters), token_range).release(); sched.create_actor_unsafe<ClientManager>(thread_count - 3, "ClientManager", std::move(parameters), token_range)
.release();
sched sched
.create_actor_unsafe<HttpServer>( .create_actor_unsafe<HttpServer>(
0, "HttpServer", http_ip_address, http_port, thread_count - 3, "HttpServer", http_ip_address, http_port,
[client_manager, shared_data] { [client_manager, shared_data] {
return td::ActorOwn<td::HttpInboundConnection::Callback>( return td::ActorOwn<td::HttpInboundConnection::Callback>(
td::create_actor<HttpConnection>("HttpConnection", client_manager, shared_data)); td::create_actor<HttpConnection>("HttpConnection", client_manager, shared_data));
@ -528,7 +534,7 @@ int main(int argc, char *argv[]) {
if (http_stat_port != 0) { if (http_stat_port != 0) {
sched sched
.create_actor_unsafe<HttpServer>( .create_actor_unsafe<HttpServer>(
0, "HttpStatsServer", http_stat_ip_address, http_stat_port, thread_count - 3, "HttpStatsServer", http_stat_ip_address, http_stat_port,
[client_manager] { [client_manager] {
return td::ActorOwn<td::HttpInboundConnection::Callback>( return td::ActorOwn<td::HttpInboundConnection::Callback>(
td::create_actor<HttpStatConnection>("HttpStatConnection", client_manager)); td::create_actor<HttpStatConnection>("HttpStatConnection", client_manager));
@ -613,7 +619,7 @@ int main(int argc, char *argv[]) {
next_watchdog_kick_time = now + WATCHDOG_TIMEOUT / 2; next_watchdog_kick_time = now + WATCHDOG_TIMEOUT / 2;
} }
if (now > last_tqueue_gc_time + 60.0) { if (now > last_tqueue_gc_time + 60.0 && false) {
auto unix_time = shared_data->get_unix_time(now); auto unix_time = shared_data->get_unix_time(now);
LOG(INFO) << "Run TQueue GC at " << unix_time; LOG(INFO) << "Run TQueue GC at " << unix_time;
last_tqueue_gc_time = now; last_tqueue_gc_time = now;