From 7c5ef0613dc9f540021ea97f4a53ce2841d0866f Mon Sep 17 00:00:00 2001 From: Tatsuhiro Tsujikawa Date: Mon, 21 Dec 2015 23:38:32 +0900 Subject: [PATCH] asio: Add configurable connect/read timeout for client This commit includes backward incompatible change, since we change private field in public API class. --- src/asio_client_session.cc | 16 ++- src/asio_client_session_impl.cc | 127 +++++++++++++++++++---- src/asio_client_session_impl.h | 13 ++- src/asio_client_session_tcp_impl.cc | 9 +- src/asio_client_session_tls_impl.cc | 5 +- src/includes/nghttp2/asio_http2_client.h | 8 +- 6 files changed, 146 insertions(+), 32 deletions(-) diff --git a/src/asio_client_session.cc b/src/asio_client_session.cc index aaee1e77..bc835c38 100644 --- a/src/asio_client_session.cc +++ b/src/asio_client_session.cc @@ -39,12 +39,16 @@ using boost::asio::ip::tcp; session::session(boost::asio::io_service &io_service, const std::string &host, const std::string &service) - : impl_(make_unique(io_service, host, service)) {} + : impl_(std::make_shared(io_service, host, service)) { + impl_->start_resolve(host, service); +} session::session(boost::asio::io_service &io_service, boost::asio::ssl::context &tls_ctx, const std::string &host, const std::string &service) - : impl_(make_unique(io_service, tls_ctx, host, service)) { + : impl_(std::make_shared(io_service, tls_ctx, host, + service)) { + impl_->start_resolve(host, service); } session::~session() {} @@ -93,6 +97,14 @@ const request *session::submit(boost::system::error_code &ec, return impl_->submit(ec, method, uri, std::move(cb), std::move(h)); } +void session::connect_timeout(const boost::posix_time::time_duration &t) { + impl_->connect_timeout(t); +} + +void session::read_timeout(const boost::posix_time::time_duration &t) { + impl_->read_timeout(t); +} + } // namespace client } // namespace asio_http2 } // nghttp2 diff --git a/src/asio_client_session_impl.cc b/src/asio_client_session_impl.cc index 4aeee74d..a4cce25a 100644 --- a/src/asio_client_session_impl.cc +++ b/src/asio_client_session_impl.cc @@ -40,8 +40,10 @@ namespace client { session_impl::session_impl(boost::asio::io_service &io_service) : wblen_(0), io_service_(io_service), resolver_(io_service), - session_(nullptr), data_pending_(nullptr), data_pendinglen_(0), - writing_(false), inside_callback_(false) {} + deadline_(io_service), connect_timeout_(boost::posix_time::seconds(60)), + read_timeout_(boost::posix_time::seconds(60)), session_(nullptr), + data_pending_(nullptr), data_pendinglen_(0), writing_(false), + inside_callback_(false), stopped_(false) {} session_impl::~session_impl() { // finish up all active stream @@ -56,9 +58,13 @@ session_impl::~session_impl() { void session_impl::start_resolve(const std::string &host, const std::string &service) { + deadline_.expires_from_now(connect_timeout_); + + auto self = this->shared_from_this(); + resolver_.async_resolve({host, service}, - [this](const boost::system::error_code &ec, - tcp::resolver::iterator endpoint_it) { + [this, self](const boost::system::error_code &ec, + tcp::resolver::iterator endpoint_it) { if (ec) { not_connected(ec); return; @@ -66,6 +72,25 @@ void session_impl::start_resolve(const std::string &host, start_connect(endpoint_it); }); + + deadline_.async_wait(std::bind(&session_impl::handle_deadline, self)); +} + +void session_impl::handle_deadline() { + if (stopped_) { + return; + } + + if (deadline_.expires_at() <= + boost::asio::deadline_timer::traits_type::now()) { + call_error_cb(boost::asio::error::timed_out); + stop(); + deadline_.expires_at(boost::posix_time::pos_infin); + return; + } + + deadline_.async_wait( + std::bind(&session_impl::handle_deadline, this->shared_from_this())); } void session_impl::connected(tcp::resolver::iterator endpoint_it) { @@ -86,6 +111,7 @@ void session_impl::connected(tcp::resolver::iterator endpoint_it) { void session_impl::not_connected(const boost::system::error_code &ec) { call_error_cb(ec); + stop(); } void session_impl::on_connect(connect_cb cb) { connect_cb_ = std::move(cb); } @@ -97,6 +123,9 @@ const connect_cb &session_impl::on_connect() const { return connect_cb_; } const error_cb &session_impl::on_error() const { return error_cb_; } void session_impl::call_error_cb(const boost::system::error_code &ec) { + if (stopped_) { + return; + } auto &error_cb = on_error(); if (!error_cb) { return; @@ -350,12 +379,20 @@ int session_impl::write_trailer(stream &strm, header_map h) { } void session_impl::cancel(stream &strm, uint32_t error_code) { + if (stopped_) { + return; + } + nghttp2_submit_rst_stream(session_, NGHTTP2_FLAG_NONE, strm.stream_id(), error_code); signal_write(); } void session_impl::resume(stream &strm) { + if (stopped_) { + return; + } + nghttp2_session_resume_data(session_, strm.stream_id()); signal_write(); } @@ -396,6 +433,11 @@ const request *session_impl::submit(boost::system::error_code &ec, header_map h) { ec.clear(); + if (stopped_) { + ec = make_error_code(static_cast(NGHTTP2_INTERNAL_ERROR)); + return nullptr; + } + http_parser_url u{}; // TODO Handle CONNECT method if (http_parser_parse_url(uri.c_str(), uri.size(), 0, &u) != 0) { @@ -485,6 +527,10 @@ const request *session_impl::submit(boost::system::error_code &ec, } void session_impl::shutdown() { + if (stopped_) { + return; + } + nghttp2_session_terminate_session(session_, NGHTTP2_NO_ERROR); signal_write(); } @@ -522,14 +568,21 @@ void session_impl::leave_callback() { } void session_impl::do_read() { - read_socket([this](const boost::system::error_code &ec, - std::size_t bytes_transferred) { + if (stopped_) { + return; + } + + deadline_.expires_from_now(read_timeout_); + + auto self = this->shared_from_this(); + + read_socket([this, self](const boost::system::error_code &ec, + std::size_t bytes_transferred) { if (ec) { if (!should_stop()) { call_error_cb(ec); } - shutdown_socket(); - + stop(); return; } @@ -542,7 +595,7 @@ void session_impl::do_read() { if (rv != static_cast(bytes_transferred)) { call_error_cb(make_error_code( static_cast(rv < 0 ? rv : NGHTTP2_ERR_PROTO))); - shutdown_socket(); + stop(); return; } } @@ -550,7 +603,7 @@ void session_impl::do_read() { do_write(); if (should_stop()) { - shutdown_socket(); + stop(); return; } @@ -559,6 +612,10 @@ void session_impl::do_read() { } void session_impl::do_write() { + if (stopped_) { + return; + } + if (writing_) { return; } @@ -580,7 +637,7 @@ void session_impl::do_write() { auto n = nghttp2_session_mem_send(session_, &data); if (n < 0) { call_error_cb(make_error_code(static_cast(n))); - shutdown_socket(); + stop(); return; } @@ -602,23 +659,51 @@ void session_impl::do_write() { } if (wblen_ == 0) { + if (should_stop()) { + stop(); + } return; } writing_ = true; - write_socket([this](const boost::system::error_code &ec, std::size_t n) { - if (ec) { - call_error_cb(ec); - shutdown_socket(); - return; - } + // Reset read deadline here, because normally client is sending + // something, it does not expect timeout while doing it. + deadline_.expires_from_now(read_timeout_); - wblen_ = 0; - writing_ = false; + auto self = this->shared_from_this(); - do_write(); - }); + write_socket( + [this, self](const boost::system::error_code &ec, std::size_t n) { + if (ec) { + call_error_cb(ec); + stop(); + return; + } + + wblen_ = 0; + writing_ = false; + + do_write(); + }); +} + +void session_impl::stop() { + if (stopped_) { + return; + } + + shutdown_socket(); + deadline_.cancel(); + stopped_ = true; +} + +void session_impl::connect_timeout(const boost::posix_time::time_duration &t) { + connect_timeout_ = t; +} + +void session_impl::read_timeout(const boost::posix_time::time_duration &t) { + read_timeout_ = t; } } // namespace client diff --git a/src/asio_client_session_impl.h b/src/asio_client_session_impl.h index 37032d68..5e936c37 100644 --- a/src/asio_client_session_impl.h +++ b/src/asio_client_session_impl.h @@ -41,7 +41,7 @@ class stream; using boost::asio::ip::tcp; -class session_impl { +class session_impl : public std::enable_shared_from_this { public: session_impl(boost::asio::io_service &io_service); virtual ~session_impl(); @@ -91,6 +91,11 @@ public: void do_read(); void do_write(); + void connect_timeout(const boost::posix_time::time_duration &t); + void read_timeout(const boost::posix_time::time_duration &t); + + void stop(); + protected: boost::array rb_; boost::array wb_; @@ -100,6 +105,7 @@ private: bool should_stop() const; bool setup_session(); void call_error_cb(const boost::system::error_code &ec); + void handle_deadline(); boost::asio::io_service &io_service_; tcp::resolver resolver_; @@ -109,6 +115,10 @@ private: connect_cb connect_cb_; error_cb error_cb_; + boost::asio::deadline_timer deadline_; + boost::posix_time::time_duration connect_timeout_; + boost::posix_time::time_duration read_timeout_; + nghttp2_session *session_; const uint8_t *data_pending_; @@ -116,6 +126,7 @@ private: bool writing_; bool inside_callback_; + bool stopped_; }; } // namespace client diff --git a/src/asio_client_session_tcp_impl.cc b/src/asio_client_session_tcp_impl.cc index c20a2303..478f1c16 100644 --- a/src/asio_client_session_tcp_impl.cc +++ b/src/asio_client_session_tcp_impl.cc @@ -31,9 +31,7 @@ namespace client { session_tcp_impl::session_tcp_impl(boost::asio::io_service &io_service, const std::string &host, const std::string &service) - : session_impl(io_service), socket_(io_service) { - start_resolve(host, service); -} + : session_impl(io_service), socket_(io_service) {} session_tcp_impl::~session_tcp_impl() {} @@ -62,7 +60,10 @@ void session_tcp_impl::write_socket( boost::asio::async_write(socket_, boost::asio::buffer(wb_, wblen_), h); } -void session_tcp_impl::shutdown_socket() { socket_.close(); } +void session_tcp_impl::shutdown_socket() { + boost::system::error_code ignored_ec; + socket_.close(ignored_ec); +} } // namespace client } // namespace asio_http2 diff --git a/src/asio_client_session_tls_impl.cc b/src/asio_client_session_tls_impl.cc index 8dc32d57..486c52d7 100644 --- a/src/asio_client_session_tls_impl.cc +++ b/src/asio_client_session_tls_impl.cc @@ -38,8 +38,6 @@ session_tls_impl::session_tls_impl(boost::asio::io_service &io_service, // ssl::context::set_verify_mode(boost::asio::ssl::verify_peer) is // not used, which is what we want. socket_.set_verify_callback(boost::asio::ssl::rfc2818_verification(host)); - - start_resolve(host, service); } session_tls_impl::~session_tls_impl() {} @@ -85,7 +83,8 @@ void session_tls_impl::write_socket( } void session_tls_impl::shutdown_socket() { - socket_.async_shutdown([](const boost::system::error_code &ec) {}); + boost::system::error_code ignored_ec; + socket_.lowest_layer().close(ignored_ec); } } // namespace client diff --git a/src/includes/nghttp2/asio_http2_client.h b/src/includes/nghttp2/asio_http2_client.h index a2b51ad7..55ef9f68 100644 --- a/src/includes/nghttp2/asio_http2_client.h +++ b/src/includes/nghttp2/asio_http2_client.h @@ -144,6 +144,12 @@ public: // and session is terminated. void on_error(error_cb cb) const; + // Sets connect timeout, which defaults to 60 seconds. + void connect_timeout(const boost::posix_time::time_duration &t); + + // Sets read timeout, which defaults to 60 seconds. + void read_timeout(const boost::posix_time::time_duration &t); + // Shutdowns connection. void shutdown() const; @@ -177,7 +183,7 @@ public: generator_cb cb, header_map h = header_map{}) const; private: - std::unique_ptr impl_; + std::shared_ptr impl_; }; // configure |tls_ctx| for client use. Currently, we just set NPN