From 76a9ac67b5e81a02bf09cbfec7439a7561551068 Mon Sep 17 00:00:00 2001 From: Zhiyong Lin Date: Wed, 23 Jun 2021 10:50:57 -0400 Subject: [PATCH] enable asio client to attach to an existing stream --- src/asio_client_session.cc | 12 +++++++++++ src/asio_client_session_impl.cc | 20 ++++++++++++++++++ src/asio_client_session_impl.h | 4 +++- src/asio_client_session_tcp_impl.cc | 27 +++++++++++++++--------- src/asio_client_session_tcp_impl.h | 5 ++++- src/asio_client_session_tls_impl.cc | 24 +++++++++++++-------- src/asio_client_session_tls_impl.h | 5 ++++- src/includes/nghttp2/asio_http2_client.h | 11 ++++++++++ 8 files changed, 86 insertions(+), 22 deletions(-) diff --git a/src/asio_client_session.cc b/src/asio_client_session.cc index 5cf01c74..ed1f1ccf 100644 --- a/src/asio_client_session.cc +++ b/src/asio_client_session.cc @@ -87,6 +87,18 @@ session::session(boost::asio::io_service &io_service, impl_->start_resolve(host, service); } +session::session(boost::asio::io_service &io_service, + std::shared_ptr stream) + : impl_(std::make_shared(io_service, stream)) { + impl_->attached(); +} + +session::session(boost::asio::io_service &io_service, + std::shared_ptr stream) + : impl_(std::make_shared(io_service, stream)) { + impl_->attached(); +} + session::~session() {} session::session(session &&other) noexcept : impl_(std::move(other.impl_)) {} diff --git a/src/asio_client_session_impl.cc b/src/asio_client_session_impl.cc index b96824dd..c1f8d7c2 100644 --- a/src/asio_client_session_impl.cc +++ b/src/asio_client_session_impl.cc @@ -124,6 +124,19 @@ void session_impl::handle_ping(const boost::system::error_code &ec) { start_ping(); } +void session_impl::attached() { + if (!setup_session()) { + return; + } + + socket().set_option(boost::asio::ip::tcp::no_delay(true)); + + do_write(); + do_read(); + + start_ping(); +} + void session_impl::connected(tcp::resolver::iterator endpoint_it) { if (!setup_session()) { return; @@ -329,6 +342,13 @@ int on_frame_recv_callback(nghttp2_session *session, const nghttp2_frame *frame, break; } + case NGHTTP2_GOAWAY: { + if (!sess->stopped()) { + auto ec = make_error_code(static_cast(NGHTTP2_ERR_EOF)); + sess->call_error_cb(ec); + } + break; + } } return 0; } diff --git a/src/asio_client_session_impl.h b/src/asio_client_session_impl.h index 694ac208..12e29562 100644 --- a/src/asio_client_session_impl.h +++ b/src/asio_client_session_impl.h @@ -51,6 +51,7 @@ public: void connected(tcp::resolver::iterator endpoint_it); void not_connected(const boost::system::error_code &ec); + void attached(); void on_connect(connect_cb cb); void on_error(error_cb cb); @@ -99,6 +100,8 @@ public: void stop(); bool stopped() const; + void call_error_cb(const boost::system::error_code &ec); + protected: boost::array rb_; boost::array wb_; @@ -107,7 +110,6 @@ protected: private: bool should_stop() const; bool setup_session(); - void call_error_cb(const boost::system::error_code &ec); void handle_deadline(); void start_ping(); void handle_ping(const boost::system::error_code &ec); diff --git a/src/asio_client_session_tcp_impl.cc b/src/asio_client_session_tcp_impl.cc index 8fdf2118..05d87518 100644 --- a/src/asio_client_session_tcp_impl.cc +++ b/src/asio_client_session_tcp_impl.cc @@ -32,25 +32,32 @@ session_tcp_impl::session_tcp_impl( boost::asio::io_service &io_service, const std::string &host, const std::string &service, const boost::posix_time::time_duration &connect_timeout) - : session_impl(io_service, connect_timeout), socket_(io_service) {} + : session_impl(io_service, connect_timeout), + socket_(std::make_shared(io_service)) {} session_tcp_impl::session_tcp_impl( boost::asio::io_service &io_service, const boost::asio::ip::tcp::endpoint &local_endpoint, const std::string &host, const std::string &service, const boost::posix_time::time_duration &connect_timeout) - : session_impl(io_service, connect_timeout), socket_(io_service) { - socket_.open(local_endpoint.protocol()); + : session_impl(io_service, connect_timeout), + socket_(std::make_shared(io_service)) { + socket_->open(local_endpoint.protocol()); boost::asio::socket_base::reuse_address option(true); - socket_.set_option(option); - socket_.bind(local_endpoint); + socket_->set_option(option); + socket_->bind(local_endpoint); } +session_tcp_impl::session_tcp_impl(boost::asio::io_service &io_service, + std::shared_ptr socket) + : session_impl(io_service, boost::posix_time::seconds(0)), + socket_(socket) {} + session_tcp_impl::~session_tcp_impl() {} void session_tcp_impl::start_connect(tcp::resolver::iterator endpoint_it) { auto self = shared_from_this(); - socket_.async_connect( + socket_->async_connect( *endpoint_it, [self, endpoint_it](const boost::system::error_code &ec) { if (self->stopped()) { return; @@ -65,21 +72,21 @@ void session_tcp_impl::start_connect(tcp::resolver::iterator endpoint_it) { }); } -tcp::socket &session_tcp_impl::socket() { return socket_; } +tcp::socket &session_tcp_impl::socket() { return *socket_; } void session_tcp_impl::read_socket( std::function h) { - socket_.async_read_some(boost::asio::buffer(rb_), h); + socket_->async_read_some(boost::asio::buffer(rb_), h); } void session_tcp_impl::write_socket( std::function h) { - boost::asio::async_write(socket_, boost::asio::buffer(wb_, wblen_), h); + boost::asio::async_write(*socket_, boost::asio::buffer(wb_, wblen_), h); } void session_tcp_impl::shutdown_socket() { boost::system::error_code ignored_ec; - socket_.close(ignored_ec); + socket_->close(ignored_ec); } } // namespace client diff --git a/src/asio_client_session_tcp_impl.h b/src/asio_client_session_tcp_impl.h index 0b6ae93f..7d46b240 100644 --- a/src/asio_client_session_tcp_impl.h +++ b/src/asio_client_session_tcp_impl.h @@ -44,6 +44,9 @@ public: const boost::asio::ip::tcp::endpoint &local_endpoint, const std::string &host, const std::string &service, const boost::posix_time::time_duration &connect_timeout); + session_tcp_impl(boost::asio::io_service &io_service, + std::shared_ptr socket); + virtual ~session_tcp_impl(); virtual void start_connect(tcp::resolver::iterator endpoint_it); @@ -57,7 +60,7 @@ public: virtual void shutdown_socket(); private: - tcp::socket socket_; + std::shared_ptr socket_; }; } // namespace client diff --git a/src/asio_client_session_tls_impl.cc b/src/asio_client_session_tls_impl.cc index 377886ca..85bb5cf8 100644 --- a/src/asio_client_session_tls_impl.cc +++ b/src/asio_client_session_tls_impl.cc @@ -33,17 +33,23 @@ session_tls_impl::session_tls_impl( boost::asio::io_service &io_service, boost::asio::ssl::context &tls_ctx, const std::string &host, const std::string &service, const boost::posix_time::time_duration &connect_timeout) - : session_impl(io_service, connect_timeout), socket_(io_service, tls_ctx) { + : session_impl(io_service, connect_timeout), + socket_(std::make_shared(io_service, tls_ctx)) { // this callback setting is no effect is // ssl::context::set_verify_mode(boost::asio::ssl::verify_peer) is // not used, which is what we want. - socket_.set_verify_callback(boost::asio::ssl::rfc2818_verification(host)); - auto ssl = socket_.native_handle(); + socket_->set_verify_callback(boost::asio::ssl::rfc2818_verification(host)); + auto ssl = socket_->native_handle(); if (!util::numeric_host(host.c_str())) { SSL_set_tlsext_host_name(ssl, host.c_str()); } } +session_tls_impl::session_tls_impl(boost::asio::io_service &io_service, + std::shared_ptr socket) + : session_impl(io_service, boost::posix_time::seconds{0}), + socket_(socket) {} + session_tls_impl::~session_tls_impl() {} void session_tls_impl::start_connect(tcp::resolver::iterator endpoint_it) { @@ -61,7 +67,7 @@ void session_tls_impl::start_connect(tcp::resolver::iterator endpoint_it) { return; } - self->socket_.async_handshake( + self->socket_->async_handshake( boost::asio::ssl::stream_base::client, [self, endpoint_it](const boost::system::error_code &ec) { if (self->stopped()) { @@ -73,7 +79,7 @@ void session_tls_impl::start_connect(tcp::resolver::iterator endpoint_it) { return; } - if (!tls_h2_negotiated(self->socket_)) { + if (!tls_h2_negotiated(*(self->socket_))) { self->not_connected(make_error_code( NGHTTP2_ASIO_ERR_TLS_NO_APP_PROTO_NEGOTIATED)); return; @@ -84,21 +90,21 @@ void session_tls_impl::start_connect(tcp::resolver::iterator endpoint_it) { }); } -tcp::socket &session_tls_impl::socket() { return socket_.next_layer(); } +tcp::socket &session_tls_impl::socket() { return socket_->next_layer(); } void session_tls_impl::read_socket( std::function h) { - socket_.async_read_some(boost::asio::buffer(rb_), h); + socket_->async_read_some(boost::asio::buffer(rb_), h); } void session_tls_impl::write_socket( std::function h) { - boost::asio::async_write(socket_, boost::asio::buffer(wb_, wblen_), h); + boost::asio::async_write(*socket_, boost::asio::buffer(wb_, wblen_), h); } void session_tls_impl::shutdown_socket() { boost::system::error_code ignored_ec; - socket_.lowest_layer().close(ignored_ec); + socket_->lowest_layer().close(ignored_ec); } } // namespace client diff --git a/src/asio_client_session_tls_impl.h b/src/asio_client_session_tls_impl.h index 645c60f4..ac7ada84 100644 --- a/src/asio_client_session_tls_impl.h +++ b/src/asio_client_session_tls_impl.h @@ -43,6 +43,9 @@ public: boost::asio::ssl::context &tls_ctx, const std::string &host, const std::string &service, const boost::posix_time::time_duration &connect_timeout); + session_tls_impl(boost::asio::io_service &io_service, + std::shared_ptr socket); + virtual ~session_tls_impl(); virtual void start_connect(tcp::resolver::iterator endpoint_it); @@ -56,7 +59,7 @@ public: virtual void shutdown_socket(); private: - ssl_socket socket_; + std::shared_ptr socket_; }; } // namespace client diff --git a/src/includes/nghttp2/asio_http2_client.h b/src/includes/nghttp2/asio_http2_client.h index 59ba9b26..0d18354f 100644 --- a/src/includes/nghttp2/asio_http2_client.h +++ b/src/includes/nghttp2/asio_http2_client.h @@ -26,6 +26,7 @@ #define ASIO_HTTP2_CLIENT_H #include +#include namespace nghttp2 { @@ -142,6 +143,9 @@ private: class session_impl; +using http_stream = boost::asio::ip::tcp::socket; +using https_stream = boost::asio::ssl::stream; + class session { public: // Starts HTTP/2 session by connecting to |host| and |service| @@ -183,6 +187,13 @@ public: const std::string &service, const boost::posix_time::time_duration &connect_timeout); + // Starts HTTP/2 session by attaching to a stream. The stream must out live + // the session. + session(boost::asio::io_service &io_service, + std::shared_ptr stream); + session(boost::asio::io_service &io_service, + std::shared_ptr stream); + ~session(); session(session &&other) noexcept;