From e9bfc8db9bde332d3db234a45034bb7bd7598b07 Mon Sep 17 00:00:00 2001 From: levlam Date: Thu, 30 Jun 2022 19:59:30 +0300 Subject: [PATCH] Use LambdaPromise instead of PromiseActor. --- td | 2 +- telegram-bot-api/ClientManager.cpp | 4 +- telegram-bot-api/ClientManager.h | 2 +- telegram-bot-api/HttpConnection.cpp | 17 +++--- telegram-bot-api/HttpConnection.h | 5 +- telegram-bot-api/HttpStatConnection.cpp | 18 +++---- telegram-bot-api/HttpStatConnection.h | 8 +-- telegram-bot-api/Query.h | 8 +-- telegram-bot-api/WebhookActor.cpp | 72 ++++++++++++------------- telegram-bot-api/WebhookActor.h | 3 +- 10 files changed, 65 insertions(+), 74 deletions(-) diff --git a/td b/td index b393215..9a061c3 160000 --- a/td +++ b/td @@ -1 +1 @@ -Subproject commit b393215d6671863b6baf2a589d343cff9474f6ba +Subproject commit 9a061c30c1f1928f12bdb3feb2108a8052b102be diff --git a/telegram-bot-api/ClientManager.cpp b/telegram-bot-api/ClientManager.cpp index 13bf98a..12074c0 100644 --- a/telegram-bot-api/ClientManager.cpp +++ b/telegram-bot-api/ClientManager.cpp @@ -145,7 +145,7 @@ void ClientManager::send(PromisedQueryPtr query) { std::move(query)); // will send 429 if the client is already closed } -void ClientManager::get_stats(td::PromiseActor promise, +void ClientManager::get_stats(td::Promise promise, td::vector> args) { if (close_flag_) { promise.set_value(td::BufferSlice("Closing")); @@ -394,7 +394,7 @@ PromisedQueryPtr ClientManager::get_webhook_restore_query(td::Slice token, td::S auto query = td::make_unique(std::move(containers), token, is_test_dc, method, std::move(args), td::vector>(), td::vector(), std::move(shared_data), td::IPAddress(), true); - return PromisedQueryPtr(query.release(), PromiseDeleter(td::PromiseActor>())); + return PromisedQueryPtr(query.release(), PromiseDeleter(td::Promise>())); } void ClientManager::raw_event(const td::Event::Raw &event) { diff --git a/telegram-bot-api/ClientManager.h b/telegram-bot-api/ClientManager.h index 1dcc093..9bf30c4 100644 --- a/telegram-bot-api/ClientManager.h +++ b/telegram-bot-api/ClientManager.h @@ -43,7 +43,7 @@ class ClientManager final : public td::Actor { void send(PromisedQueryPtr query); - void get_stats(td::PromiseActor promise, td::vector> args); + void get_stats(td::Promise promise, td::vector> args); void close(td::Promise &&promise); diff --git a/telegram-bot-api/HttpConnection.cpp b/telegram-bot-api/HttpConnection.cpp index 68dcdec..5d718f3 100644 --- a/telegram-bot-api/HttpConnection.cpp +++ b/telegram-bot-api/HttpConnection.cpp @@ -48,22 +48,17 @@ void HttpConnection::handle(td::unique_ptr http_query, std::move(http_query->args_), std::move(http_query->headers_), std::move(http_query->files_), shared_data_, http_query->peer_address_, false); - td::PromiseActor> promise; - td::FutureActor> future; - td::init_promise_future(&promise, &future); - future.set_event(td::EventCreator::yield(actor_id())); + auto promise = td::PromiseCreator::lambda([actor_id = actor_id(this)](td::Result> r_query) { + send_closure(actor_id, &HttpConnection::on_query_finished, std::move(r_query)); + }); auto promised_query = PromisedQueryPtr(query.release(), PromiseDeleter(std::move(promise))); send_closure(client_manager_, &ClientManager::send, std::move(promised_query)); - result_ = std::move(future); } -void HttpConnection::wakeup() { - if (result_.empty()) { - return; - } - LOG_CHECK(result_.is_ok()) << result_.move_as_error(); +void HttpConnection::on_query_finished(td::Result> r_query) { + LOG_CHECK(r_query.is_ok()) << r_query.error(); - auto query = result_.move_as_ok(); + auto query = r_query.move_as_ok(); send_response(query->http_status_code(), std::move(query->answer()), query->retry_after()); } diff --git a/telegram-bot-api/HttpConnection.h b/telegram-bot-api/HttpConnection.h index d631244..c2e1148 100644 --- a/telegram-bot-api/HttpConnection.h +++ b/telegram-bot-api/HttpConnection.h @@ -32,10 +32,7 @@ class HttpConnection final : public td::HttpInboundConnection::Callback { void handle(td::unique_ptr http_query, td::ActorOwn connection) final; - void wakeup() final; - private: - td::FutureActor> result_; td::ActorId client_manager_; td::ActorOwn connection_; std::shared_ptr shared_data_; @@ -45,6 +42,8 @@ class HttpConnection final : public td::HttpInboundConnection::Callback { stop(); } + void on_query_finished(td::Result> r_query); + void send_response(int http_status_code, td::BufferSlice &&content, int retry_after); void send_http_error(int http_status_code, td::Slice description); diff --git a/telegram-bot-api/HttpStatConnection.cpp b/telegram-bot-api/HttpStatConnection.cpp index 9d66aba..21ac451 100644 --- a/telegram-bot-api/HttpStatConnection.cpp +++ b/telegram-bot-api/HttpStatConnection.cpp @@ -18,22 +18,20 @@ void HttpStatConnection::handle(td::unique_ptr http_query, CHECK(connection_->empty()); connection_ = std::move(connection); - td::PromiseActor promise; - td::FutureActor future; - init_promise_future(&promise, &future); - future.set_event(td::EventCreator::yield(actor_id())); - LOG(DEBUG) << "SEND"; + auto promise = td::PromiseCreator::lambda([actor_id = actor_id(this)](td::Result result) { + send_closure(actor_id, &HttpStatConnection::on_result, std::move(result)); + }); send_closure(client_manager_, &ClientManager::get_stats, std::move(promise), http_query->get_args()); - result_ = std::move(future); } -void HttpStatConnection::wakeup() { - if (result_.empty()) { +void HttpStatConnection::on_result(td::Result result) { + if (result.is_error()) { + send_closure(connection_.release(), &td::HttpInboundConnection::write_error, + td::Status::Error(500, "Internal Server Error: closing")); return; } - LOG_CHECK(result_.is_ok()) << result_.move_as_error(); - auto content = result_.move_as_ok(); + auto content = result.move_as_ok(); td::HttpHeaderCreator hc; hc.init_status_line(200); hc.set_keep_alive(); diff --git a/telegram-bot-api/HttpStatConnection.h b/telegram-bot-api/HttpStatConnection.h index 614f5ca..dc1fa63 100644 --- a/telegram-bot-api/HttpStatConnection.h +++ b/telegram-bot-api/HttpStatConnection.h @@ -12,9 +12,9 @@ #include "td/net/HttpQuery.h" #include "td/actor/actor.h" -#include "td/actor/PromiseFuture.h" #include "td/utils/buffer.h" +#include "td/utils/Status.h" namespace telegram_bot_api { @@ -22,15 +22,15 @@ class HttpStatConnection final : public td::HttpInboundConnection::Callback { public: explicit HttpStatConnection(td::ActorId client_manager) : client_manager_(client_manager) { } + void handle(td::unique_ptr http_query, td::ActorOwn connection) final; - void wakeup() final; - private: - td::FutureActor result_; td::ActorId client_manager_; td::ActorOwn connection_; + void on_result(td::Result result); + void hangup() final { connection_.release(); stop(); diff --git a/telegram-bot-api/Query.h b/telegram-bot-api/Query.h index 96bd056..c1df8b5 100644 --- a/telegram-bot-api/Query.h +++ b/telegram-bot-api/Query.h @@ -227,7 +227,7 @@ class JsonQueryError final : public td::Jsonable { class PromiseDeleter { public: - explicit PromiseDeleter(td::PromiseActor> &&promise) : promise_(std::move(promise)) { + explicit PromiseDeleter(td::Promise> &&promise) : promise_(std::move(promise)) { } PromiseDeleter() = default; PromiseDeleter(const PromiseDeleter &) = delete; @@ -236,7 +236,7 @@ class PromiseDeleter { PromiseDeleter &operator=(PromiseDeleter &&) = default; void operator()(Query *raw_ptr) { td::unique_ptr query(raw_ptr); // now I cannot forget to delete this pointer - if (!promise_.empty_promise()) { + if (promise_) { if (!query->is_ready()) { query->set_retry_after_error(5); } @@ -245,11 +245,11 @@ class PromiseDeleter { } } ~PromiseDeleter() { - CHECK(promise_.empty()); + CHECK(!promise_); } private: - td::PromiseActor> promise_; + td::Promise> promise_; }; using PromisedQueryPtr = std::unique_ptr; diff --git a/telegram-bot-api/WebhookActor.cpp b/telegram-bot-api/WebhookActor.cpp index 68b9ac9..095b5cf 100644 --- a/telegram-bot-api/WebhookActor.cpp +++ b/telegram-bot-api/WebhookActor.cpp @@ -82,7 +82,7 @@ void WebhookActor::relax_wakeup_at(double wakeup_at, const char *source) { } void WebhookActor::resolve_ip_address() { - if (fix_ip_address_) { + if (fix_ip_address_ || is_ip_address_being_resolved_) { return; } if (td::Time::now() < next_ip_address_resolve_time_) { @@ -90,43 +90,42 @@ void WebhookActor::resolve_ip_address() { return; } - bool future_created = false; - if (future_ip_address_.empty()) { - td::PromiseActor promise; - init_promise_future(&promise, &future_ip_address_); - future_created = true; - send_closure(parameters_->get_host_by_name_actor_id_, &td::GetHostByNameActor::run, url_.host_, url_.port_, false, - td::PromiseCreator::from_promise_actor(std::move(promise))); + is_ip_address_being_resolved_ = true; + auto promise = td::PromiseCreator::lambda([actor_id = actor_id(this)](td::Result r_ip_address) { + send_closure(actor_id, &WebhookActor::on_resolved_ip_address, std::move(r_ip_address)); + }); + send_closure(parameters_->get_host_by_name_actor_id_, &td::GetHostByNameActor::run, url_.host_, url_.port_, false, + std::move(promise)); +} + +void WebhookActor::on_resolved_ip_address(td::Result r_ip_address) { + CHECK(is_ip_address_being_resolved_); + is_ip_address_being_resolved_ = false; + + next_ip_address_resolve_time_ = + td::Time::now() + IP_ADDRESS_CACHE_TIME + td::Random::fast(0, IP_ADDRESS_CACHE_TIME / 10); + relax_wakeup_at(next_ip_address_resolve_time_, "on_resolved_ip_address"); + + SCOPE_EXIT { + loop(); + }; + + if (r_ip_address.is_error()) { + return on_error(r_ip_address.move_as_error()); } - - if (future_ip_address_.is_ready()) { - next_ip_address_resolve_time_ = - td::Time::now() + IP_ADDRESS_CACHE_TIME + td::Random::fast(0, IP_ADDRESS_CACHE_TIME / 10); - relax_wakeup_at(next_ip_address_resolve_time_, "resolve_ip_address"); - - auto r_ip_address = future_ip_address_.move_as_result(); - if (r_ip_address.is_error()) { - CHECK(!(r_ip_address.error() == td::Status::Error::HANGUP_ERROR_CODE>())); - return on_error(r_ip_address.move_as_error()); - } - auto new_ip_address = r_ip_address.move_as_ok(); - if (!check_ip_address(new_ip_address)) { - return on_error(td::Status::Error(PSLICE() << "IP address " << new_ip_address.get_ip_str() << " is reserved")); - } - if (!(ip_address_ == new_ip_address)) { - VLOG(webhook) << "IP address has changed: " << ip_address_ << " --> " << new_ip_address; - ip_address_ = new_ip_address; - ip_generation_++; - if (was_checked_) { - on_webhook_verified(); - } - } - VLOG(webhook) << "IP address was verified"; - } else { - if (future_created) { - future_ip_address_.set_event(td::EventCreator::yield(actor_id())); + auto new_ip_address = r_ip_address.move_as_ok(); + if (!check_ip_address(new_ip_address)) { + return on_error(td::Status::Error(PSLICE() << "IP address " << new_ip_address.get_ip_str() << " is reserved")); + } + if (!(ip_address_ == new_ip_address)) { + VLOG(webhook) << "IP address has changed: " << ip_address_ << " --> " << new_ip_address; + ip_address_ = new_ip_address; + ip_generation_++; + if (was_checked_) { + on_webhook_verified(); } } + VLOG(webhook) << "IP address was verified"; } td::Status WebhookActor::create_connection() { @@ -605,8 +604,7 @@ void WebhookActor::handle(td::unique_ptr response) { td::MutableSlice(), std::move(response->args_), std::move(response->headers_), std::move(response->files_), parameters_->shared_data_, response->peer_address_, false); - auto promised_query = - PromisedQueryPtr(query.release(), PromiseDeleter(td::PromiseActor>())); + auto promised_query = PromisedQueryPtr(query.release(), PromiseDeleter(td::Promise>())); send_closure(callback_, &Callback::send, std::move(promised_query)); } first_error_410_time_ = 0; diff --git a/telegram-bot-api/WebhookActor.h b/telegram-bot-api/WebhookActor.h index d5e2499..4ca2c34 100644 --- a/telegram-bot-api/WebhookActor.h +++ b/telegram-bot-api/WebhookActor.h @@ -137,7 +137,7 @@ class WebhookActor final : public td::HttpOutboundConnection::Callback { td::IPAddress ip_address_; td::int32 ip_generation_ = 0; double next_ip_address_resolve_time_ = 0; - td::FutureActor future_ip_address_; + bool is_ip_address_being_resolved_ = false; class Connection final : public td::ListNode { public: @@ -175,6 +175,7 @@ class WebhookActor final : public td::HttpOutboundConnection::Callback { void relax_wakeup_at(double wakeup_at, const char *source); void resolve_ip_address(); + void on_resolved_ip_address(td::Result r_ip_address); td::Result create_ssl_stream(); td::Status create_connection() TD_WARN_UNUSED_RESULT;