Use destroy_on_scheduler in WebhookActor.

This commit is contained in:
levlam 2022-11-27 01:31:00 +03:00
parent a016d13c15
commit ed0532bcf7
6 changed files with 37 additions and 30 deletions

View File

@ -187,9 +187,9 @@ Client::Client(td::ActorShared<> parent, const td::string &bot_token, bool is_te
} }
Client::~Client() { Client::~Client() {
td::Scheduler::instance()->destroy_on_scheduler(get_file_gc_scheduler_id(), messages_, users_, groups_, supergroups_, td::Scheduler::instance()->destroy_on_scheduler(SharedData::get_file_gc_scheduler_id(), messages_, users_, groups_,
chats_, reply_message_ids_, yet_unsent_reply_message_ids_, supergroups_, chats_, reply_message_ids_,
sticker_set_names_); yet_unsent_reply_message_ids_, sticker_set_names_);
} }
bool Client::init_methods() { bool Client::init_methods() {
@ -4833,7 +4833,7 @@ void Client::on_update(object_ptr<td_api::Object> result) {
deleted_messages.push_back(std::move(deleted_message)); deleted_messages.push_back(std::move(deleted_message));
} }
} }
td::Scheduler::instance()->destroy_on_scheduler(get_file_gc_scheduler_id(), deleted_messages); td::Scheduler::instance()->destroy_on_scheduler(SharedData::get_file_gc_scheduler_id(), deleted_messages);
break; break;
} }
case td_api::updateFile::ID: { case td_api::updateFile::ID: {
@ -5168,7 +5168,7 @@ void Client::on_closed() {
if (logging_out_) { if (logging_out_) {
parameters_->shared_data_->webhook_db_->erase(bot_token_with_dc_); parameters_->shared_data_->webhook_db_->erase(bot_token_with_dc_);
td::Scheduler::instance()->run_on_scheduler(get_file_gc_scheduler_id(), td::Scheduler::instance()->run_on_scheduler(SharedData::get_file_gc_scheduler_id(),
[actor_id = actor_id(this), dir = dir_](td::Unit) { [actor_id = actor_id(this), dir = dir_](td::Unit) {
CHECK(dir.size() >= 24); CHECK(dir.size() >= 24);
CHECK(dir.back() == TD_DIR_SLASH); CHECK(dir.back() == TD_DIR_SLASH);
@ -5194,16 +5194,6 @@ void Client::timeout_expired() {
stop(); stop();
} }
td::int32 Client::get_database_scheduler_id() {
// the same scheduler as for database in Td
return 1;
}
td::int32 Client::get_file_gc_scheduler_id() {
// the same scheduler as for file GC in Td
return 2;
}
void Client::clear_tqueue() { void Client::clear_tqueue() {
CHECK(webhook_id_.empty()); CHECK(webhook_id_.empty());
auto &tqueue = parameters_->shared_data_->tqueue_; auto &tqueue = parameters_->shared_data_->tqueue_;
@ -9034,7 +9024,7 @@ void Client::webhook_error(Status status) {
void Client::webhook_closed(Status status) { void Client::webhook_closed(Status status) {
if (has_webhook_certificate_) { if (has_webhook_certificate_) {
td::Scheduler::instance()->run_on_scheduler(get_database_scheduler_id(), td::Scheduler::instance()->run_on_scheduler(SharedData::get_database_scheduler_id(),
[actor_id = actor_id(this), path = get_webhook_certificate_path(), [actor_id = actor_id(this), path = get_webhook_certificate_path(),
status = std::move(status)](td::Unit) mutable { status = std::move(status)](td::Unit) mutable {
LOG(INFO) << "Unlink certificate " << path; LOG(INFO) << "Unlink certificate " << path;
@ -9147,8 +9137,9 @@ void Client::do_set_webhook(PromisedQueryPtr query, bool was_deleted) {
CHECK(!webhook_set_query_); CHECK(!webhook_set_query_);
active_webhook_set_query_ = std::move(query); active_webhook_set_query_ = std::move(query);
td::Scheduler::instance()->run_on_scheduler( td::Scheduler::instance()->run_on_scheduler(
get_database_scheduler_id(), [actor_id = actor_id(this), from_path = cert_file_ptr->temp_file_name, SharedData::get_database_scheduler_id(),
to_path = get_webhook_certificate_path(), size](td::Unit) mutable { [actor_id = actor_id(this), from_path = cert_file_ptr->temp_file_name,
to_path = get_webhook_certificate_path(), size](td::Unit) mutable {
LOG(INFO) << "Copy certificate to " << to_path; LOG(INFO) << "Copy certificate to " << to_path;
auto status = td::copy_file(from_path, to_path, size); auto status = td::copy_file(from_path, to_path, size);
send_closure(actor_id, &Client::on_webhook_certificate_copied, std::move(status)); send_closure(actor_id, &Client::on_webhook_certificate_copied, std::move(status));

View File

@ -320,10 +320,6 @@ class Client final : public WebhookActor::Callback {
void on_closed(); void on_closed();
void finish_closing(); void finish_closing();
static int32 get_database_scheduler_id();
static int32 get_file_gc_scheduler_id();
void clear_tqueue(); void clear_tqueue();
bool allow_update_before_authorization(const td_api::Object *update) const; bool allow_update_before_authorization(const td_api::Object *update) const;

View File

@ -236,7 +236,7 @@ void ClientManager::get_stats(td::Promise<td::BufferSlice> promise,
} }
sb << "buffer_memory\t" << td::format::as_size(td::BufferAllocator::get_buffer_mem()) << '\n'; sb << "buffer_memory\t" << td::format::as_size(td::BufferAllocator::get_buffer_mem()) << '\n';
sb << "active_webhook_connections\t" << WebhookActor::get_total_connections_count() << '\n'; sb << "active_webhook_connections\t" << WebhookActor::get_total_connection_count() << '\n';
sb << "active_requests\t" << parameters_->shared_data_->query_count_.load(std::memory_order_relaxed) << '\n'; sb << "active_requests\t" << parameters_->shared_data_->query_count_.load(std::memory_order_relaxed) << '\n';
sb << "active_network_queries\t" << td::get_pending_network_query_count(*parameters_->net_query_stats_) << '\n'; sb << "active_network_queries\t" << td::get_pending_network_query_count(*parameters_->net_query_stats_) << '\n';
auto stats = stat_.as_vector(now); auto stats = stat_.as_vector(now);

View File

@ -52,6 +52,16 @@ struct SharedData {
} }
return static_cast<td::int32>(result); return static_cast<td::int32>(result);
} }
static td::int32 get_database_scheduler_id() {
// the same scheduler as for database in Td
return 1;
}
static td::int32 get_file_gc_scheduler_id() {
// the same scheduler as for file GC in Td
return 2;
}
}; };
struct ClientParameters { struct ClientParameters {

View File

@ -35,7 +35,7 @@ namespace telegram_bot_api {
static int VERBOSITY_NAME(webhook) = VERBOSITY_NAME(DEBUG); static int VERBOSITY_NAME(webhook) = VERBOSITY_NAME(DEBUG);
std::atomic<td::uint64> WebhookActor::total_connections_count_{0}; std::atomic<td::uint64> WebhookActor::total_connection_count_{0};
WebhookActor::WebhookActor(td::ActorShared<Callback> callback, td::int64 tqueue_id, td::HttpUrl url, WebhookActor::WebhookActor(td::ActorShared<Callback> callback, td::int64 tqueue_id, td::HttpUrl url,
td::string cert_path, td::int32 max_connections, bool from_db_flag, td::string cert_path, td::int32 max_connections, bool from_db_flag,
@ -73,6 +73,11 @@ WebhookActor::WebhookActor(td::ActorShared<Callback> callback, td::int64 tqueue_
<< ", max_connections = " << max_connections_; << ", max_connections = " << max_connections_;
} }
WebhookActor::~WebhookActor() {
td::Scheduler::instance()->destroy_on_scheduler(SharedData::get_file_gc_scheduler_id(), update_map_, queue_updates_,
queues_);
}
void WebhookActor::relax_wakeup_at(double wakeup_at, const char *source) { void WebhookActor::relax_wakeup_at(double wakeup_at, const char *source) {
if (wakeup_at_ == 0 || wakeup_at < wakeup_at_) { if (wakeup_at_ == 0 || wakeup_at < wakeup_at_) {
VLOG(webhook) << "Wake up in " << wakeup_at - td::Time::now() << " from " << source; VLOG(webhook) << "Wake up in " << wakeup_at - td::Time::now() << " from " << source;
@ -232,7 +237,7 @@ td::Status WebhookActor::create_connection(td::BufferedFd<td::SocketFd> fd) {
conn->event_id_ = {}; conn->event_id_ = {};
conn->id_ = id; conn->id_ = id;
ready_connections_.put(conn->to_list_node()); ready_connections_.put(conn->to_list_node());
total_connections_count_.fetch_add(1, std::memory_order_relaxed); total_connection_count_.fetch_add(1, std::memory_order_relaxed);
if (!was_checked_) { if (!was_checked_) {
was_checked_ = true; was_checked_ = true;
@ -657,7 +662,7 @@ void WebhookActor::handle(td::unique_ptr<td::HttpQuery> response) {
if (need_close || close_connection) { if (need_close || close_connection) {
VLOG(webhook) << "Close connection " << connection_id; VLOG(webhook) << "Close connection " << connection_id;
connections_.erase(connection_ptr->id_); connections_.erase(connection_ptr->id_);
total_connections_count_.fetch_sub(1, std::memory_order_relaxed); total_connection_count_.fetch_sub(1, std::memory_order_relaxed);
} else { } else {
ready_connections_.put(connection_ptr->to_list_node()); ready_connections_.put(connection_ptr->to_list_node());
} }
@ -725,7 +730,7 @@ void WebhookActor::close() {
} }
void WebhookActor::tear_down() { void WebhookActor::tear_down() {
total_connections_count_.fetch_sub(connections_.size(), std::memory_order_relaxed); total_connection_count_.fetch_sub(connections_.size(), std::memory_order_relaxed);
} }
void WebhookActor::on_webhook_verified() { void WebhookActor::on_webhook_verified() {

View File

@ -55,13 +55,18 @@ class WebhookActor final : public td::HttpOutboundConnection::Callback {
WebhookActor(td::ActorShared<Callback> callback, td::int64 tqueue_id, td::HttpUrl url, td::string cert_path, WebhookActor(td::ActorShared<Callback> callback, td::int64 tqueue_id, td::HttpUrl url, td::string cert_path,
td::int32 max_connections, bool from_db_flag, td::string cached_ip_address, bool fix_ip_address, td::int32 max_connections, bool from_db_flag, td::string cached_ip_address, bool fix_ip_address,
td::string secret_token, std::shared_ptr<const ClientParameters> parameters); td::string secret_token, std::shared_ptr<const ClientParameters> parameters);
WebhookActor(const WebhookActor &) = delete;
WebhookActor &operator=(const WebhookActor &) = delete;
WebhookActor(WebhookActor &&) = delete;
WebhookActor &operator=(WebhookActor &&) = delete;
~WebhookActor();
void update(); void update();
void close(); void close();
static td::int64 get_total_connections_count() { static td::int64 get_total_connection_count() {
return total_connections_count_; return total_connection_count_;
} }
private: private:
@ -70,7 +75,7 @@ class WebhookActor final : public td::HttpOutboundConnection::Callback {
static constexpr int WEBHOOK_MAX_RESEND_TIMEOUT = 60; static constexpr int WEBHOOK_MAX_RESEND_TIMEOUT = 60;
static constexpr int WEBHOOK_DROP_TIMEOUT = 60 * 60 * 23; static constexpr int WEBHOOK_DROP_TIMEOUT = 60 * 60 * 23;
static std::atomic<td::uint64> total_connections_count_; static std::atomic<td::uint64> total_connection_count_;
td::ActorShared<Callback> callback_; td::ActorShared<Callback> callback_;
td::int64 tqueue_id_; td::int64 tqueue_id_;