diff --git a/telegram-bot-api/Client.cpp b/telegram-bot-api/Client.cpp index efc40af..6a34008 100644 --- a/telegram-bot-api/Client.cpp +++ b/telegram-bot-api/Client.cpp @@ -3893,8 +3893,8 @@ void Client::start_up() { }; td::ClientActor::Options options; options.net_query_stats = parameters_->net_query_stats_; - td_client_ = td::create_actor("TdClientActor", td::make_unique(actor_id(this)), - std::move(options)); + td_client_ = td::create_actor_on_scheduler( + "TdClientActor", 0, td::make_unique(actor_id(this)), std::move(options)); } void Client::send(PromisedQueryPtr query) { @@ -4993,16 +4993,12 @@ void Client::timeout_expired() { td::int32 Client::get_database_scheduler_id() { // the same scheduler as for database in Td - auto current_scheduler_id = td::Scheduler::instance()->sched_id(); - auto scheduler_count = td::Scheduler::instance()->sched_count(); - return td::min(current_scheduler_id + 1, scheduler_count - 1); + return 1; } td::int32 Client::get_file_gc_scheduler_id() { // the same scheduler as for file GC in Td - auto current_scheduler_id = td::Scheduler::instance()->sched_id(); - auto scheduler_count = td::Scheduler::instance()->sched_count(); - return td::min(current_scheduler_id + 2, scheduler_count - 1); + return 2; } void Client::clear_tqueue() { diff --git a/telegram-bot-api/ClientManager.cpp b/telegram-bot-api/ClientManager.cpp index bbc7ecf..b7174e1 100644 --- a/telegram-bot-api/ClientManager.cpp +++ b/telegram-bot-api/ClientManager.cpp @@ -232,7 +232,7 @@ void ClientManager::get_stats(td::Promise promise, sb << "buffer_memory\t" << td::format::as_size(td::BufferAllocator::get_buffer_mem()) << '\n'; sb << "active_webhook_connections\t" << WebhookActor::get_total_connections_count() << '\n'; - sb << "active_requests\t" << parameters_->shared_data_->query_count_.load() << '\n'; + sb << "active_requests\t" << parameters_->shared_data_->query_count_.load(std::memory_order_relaxed) << '\n'; sb << "active_network_queries\t" << td::get_pending_network_query_count(*parameters_->net_query_stats_) << '\n'; auto stats = stat_.as_vector(now); for (auto &stat : stats) { @@ -294,9 +294,7 @@ td::int64 ClientManager::get_tqueue_id(td::int64 user_id, bool is_test_dc) { void ClientManager::start_up() { //NB: the same scheduler as for database in Td - auto current_scheduler_id = td::Scheduler::instance()->sched_id(); - auto scheduler_count = td::Scheduler::instance()->sched_count(); - auto scheduler_id = td::min(current_scheduler_id + 1, scheduler_count - 1); + auto scheduler_id = 1; // init tqueue { diff --git a/telegram-bot-api/ClientParameters.h b/telegram-bot-api/ClientParameters.h index 455e72f..b4d8f93 100644 --- a/telegram-bot-api/ClientParameters.h +++ b/telegram-bot-api/ClientParameters.h @@ -29,10 +29,10 @@ namespace telegram_bot_api { struct SharedData { std::atomic query_count_{0}; + std::atomic query_list_size_{0}; std::atomic next_verbosity_level_{-1}; - // not thread-safe - size_t query_list_size_ = 0; + // not thread-safe, must be used from a single thread td::ListNode query_list_; td::unique_ptr webhook_db_; td::unique_ptr tqueue_; diff --git a/telegram-bot-api/Query.cpp b/telegram-bot-api/Query.cpp index 32ff8b4..e917ba3 100644 --- a/telegram-bot-api/Query.cpp +++ b/telegram-bot-api/Query.cpp @@ -45,9 +45,9 @@ Query::Query(td::vector &&container, td::Slice token, bool is_t start_timestamp_ = td::Time::now(); LOG(INFO) << "QUERY: create " << td::tag("ptr", this) << *this; if (shared_data_) { - shared_data_->query_count_++; + shared_data_->query_count_.fetch_add(1, std::memory_order_relaxed); if (method_ != "getupdates") { - shared_data_->query_list_size_++; + shared_data_->query_list_size_.fetch_add(1, std::memory_order_relaxed); shared_data_->query_list_.put(this); } } diff --git a/telegram-bot-api/Query.h b/telegram-bot-api/Query.h index c315fd5..ca67fa3 100644 --- a/telegram-bot-api/Query.h +++ b/telegram-bot-api/Query.h @@ -109,9 +109,9 @@ class Query final : public td::ListNode { Query &operator=(Query &&) = delete; ~Query() { if (shared_data_) { - shared_data_->query_count_--; + shared_data_->query_count_.fetch_sub(1, std::memory_order_relaxed); if (!empty()) { - shared_data_->query_list_size_--; + shared_data_->query_list_size_.fetch_sub(1, std::memory_order_relaxed); } } } diff --git a/telegram-bot-api/telegram-bot-api.cpp b/telegram-bot-api/telegram-bot-api.cpp index ac91d78..0737655 100644 --- a/telegram-bot-api/telegram-bot-api.cpp +++ b/telegram-bot-api/telegram-bot-api.cpp @@ -158,10 +158,10 @@ static void dump_statistics(const std::shared_ptr &shared_data, LOG(WARNING) << td::tag("buffer_mem", td::format::as_size(td::BufferAllocator::get_buffer_mem())); LOG(WARNING) << td::tag("buffer_slice_size", td::format::as_size(td::BufferAllocator::get_buffer_slice_size())); - auto query_list_size = shared_data->query_list_size_; - auto query_count = shared_data->query_count_.load(); + auto query_list_size = shared_data->query_list_size_.load(std::memory_order_relaxed); + auto query_count = shared_data->query_count_.load(std::memory_order_relaxed); LOG(WARNING) << td::tag("pending queries", query_count) << td::tag("pending requests", query_list_size); - +/* td::uint64 i = 0; bool was_gap = false; for (auto end = &shared_data->query_list_, cur = end->prev; cur != end; cur = cur->prev, i++) { @@ -175,7 +175,7 @@ static void dump_statistics(const std::shared_ptr &shared_data, was_gap = true; } } - +*/ td::dump_pending_network_queries(*net_query_stats); } @@ -504,7 +504,12 @@ int main(int argc, char *argv[]) { // << (td::GitInfo::is_dirty() ? "(dirty)" : "") << " started"; LOG(WARNING) << "Bot API " << parameters->version_ << " server started"; - const int thread_count = 6; // +3 for Td, one for watchdog, one for slow HTTP connections, one for DNS resolving + // +3 threads for Td + // one thread for ClientManager and all Clients + // one thread for watchdog + // one thread for slow HTTP connections + // one thread for DNS resolving + const int thread_count = 7; td::ConcurrentScheduler sched(thread_count, cpu_affinity); td::GetHostByNameActor::Options get_host_by_name_options; @@ -514,11 +519,12 @@ int main(int argc, char *argv[]) { .release(); auto client_manager = - sched.create_actor_unsafe(0, "ClientManager", std::move(parameters), token_range).release(); + sched.create_actor_unsafe(thread_count - 3, "ClientManager", std::move(parameters), token_range) + .release(); sched .create_actor_unsafe( - 0, "HttpServer", http_ip_address, http_port, + thread_count - 3, "HttpServer", http_ip_address, http_port, [client_manager, shared_data] { return td::ActorOwn( td::create_actor("HttpConnection", client_manager, shared_data)); @@ -528,7 +534,7 @@ int main(int argc, char *argv[]) { if (http_stat_port != 0) { sched .create_actor_unsafe( - 0, "HttpStatsServer", http_stat_ip_address, http_stat_port, + thread_count - 3, "HttpStatsServer", http_stat_ip_address, http_stat_port, [client_manager] { return td::ActorOwn( td::create_actor("HttpStatConnection", client_manager)); @@ -613,7 +619,7 @@ int main(int argc, char *argv[]) { next_watchdog_kick_time = now + WATCHDOG_TIMEOUT / 2; } - if (now > last_tqueue_gc_time + 60.0) { + if (now > last_tqueue_gc_time + 60.0 && false) { auto unix_time = shared_data->get_unix_time(now); LOG(INFO) << "Run TQueue GC at " << unix_time; last_tqueue_gc_time = now;