enable asio client to attach to an existing stream

This commit is contained in:
Zhiyong Lin 2021-06-23 10:50:57 -04:00
parent 3b17a659f6
commit 76a9ac67b5
8 changed files with 86 additions and 22 deletions

View File

@ -87,6 +87,18 @@ session::session(boost::asio::io_service &io_service,
impl_->start_resolve(host, service); impl_->start_resolve(host, service);
} }
session::session(boost::asio::io_service &io_service,
std::shared_ptr<http_stream> stream)
: impl_(std::make_shared<session_tcp_impl>(io_service, stream)) {
impl_->attached();
}
session::session(boost::asio::io_service &io_service,
std::shared_ptr<https_stream> stream)
: impl_(std::make_shared<session_tls_impl>(io_service, stream)) {
impl_->attached();
}
session::~session() {} session::~session() {}
session::session(session &&other) noexcept : impl_(std::move(other.impl_)) {} session::session(session &&other) noexcept : impl_(std::move(other.impl_)) {}

View File

@ -124,6 +124,19 @@ void session_impl::handle_ping(const boost::system::error_code &ec) {
start_ping(); start_ping();
} }
void session_impl::attached() {
if (!setup_session()) {
return;
}
socket().set_option(boost::asio::ip::tcp::no_delay(true));
do_write();
do_read();
start_ping();
}
void session_impl::connected(tcp::resolver::iterator endpoint_it) { void session_impl::connected(tcp::resolver::iterator endpoint_it) {
if (!setup_session()) { if (!setup_session()) {
return; return;
@ -329,6 +342,13 @@ int on_frame_recv_callback(nghttp2_session *session, const nghttp2_frame *frame,
break; break;
} }
case NGHTTP2_GOAWAY: {
if (!sess->stopped()) {
auto ec = make_error_code(static_cast<nghttp2_error>(NGHTTP2_ERR_EOF));
sess->call_error_cb(ec);
}
break;
}
} }
return 0; return 0;
} }

View File

@ -51,6 +51,7 @@ public:
void connected(tcp::resolver::iterator endpoint_it); void connected(tcp::resolver::iterator endpoint_it);
void not_connected(const boost::system::error_code &ec); void not_connected(const boost::system::error_code &ec);
void attached();
void on_connect(connect_cb cb); void on_connect(connect_cb cb);
void on_error(error_cb cb); void on_error(error_cb cb);
@ -99,6 +100,8 @@ public:
void stop(); void stop();
bool stopped() const; bool stopped() const;
void call_error_cb(const boost::system::error_code &ec);
protected: protected:
boost::array<uint8_t, 8_k> rb_; boost::array<uint8_t, 8_k> rb_;
boost::array<uint8_t, 64_k> wb_; boost::array<uint8_t, 64_k> wb_;
@ -107,7 +110,6 @@ protected:
private: private:
bool should_stop() const; bool should_stop() const;
bool setup_session(); bool setup_session();
void call_error_cb(const boost::system::error_code &ec);
void handle_deadline(); void handle_deadline();
void start_ping(); void start_ping();
void handle_ping(const boost::system::error_code &ec); void handle_ping(const boost::system::error_code &ec);

View File

@ -32,25 +32,32 @@ session_tcp_impl::session_tcp_impl(
boost::asio::io_service &io_service, const std::string &host, boost::asio::io_service &io_service, const std::string &host,
const std::string &service, const std::string &service,
const boost::posix_time::time_duration &connect_timeout) const boost::posix_time::time_duration &connect_timeout)
: session_impl(io_service, connect_timeout), socket_(io_service) {} : session_impl(io_service, connect_timeout),
socket_(std::make_shared<tcp::socket>(io_service)) {}
session_tcp_impl::session_tcp_impl( session_tcp_impl::session_tcp_impl(
boost::asio::io_service &io_service, boost::asio::io_service &io_service,
const boost::asio::ip::tcp::endpoint &local_endpoint, const boost::asio::ip::tcp::endpoint &local_endpoint,
const std::string &host, const std::string &service, const std::string &host, const std::string &service,
const boost::posix_time::time_duration &connect_timeout) const boost::posix_time::time_duration &connect_timeout)
: session_impl(io_service, connect_timeout), socket_(io_service) { : session_impl(io_service, connect_timeout),
socket_.open(local_endpoint.protocol()); socket_(std::make_shared<tcp::socket>(io_service)) {
socket_->open(local_endpoint.protocol());
boost::asio::socket_base::reuse_address option(true); boost::asio::socket_base::reuse_address option(true);
socket_.set_option(option); socket_->set_option(option);
socket_.bind(local_endpoint); socket_->bind(local_endpoint);
} }
session_tcp_impl::session_tcp_impl(boost::asio::io_service &io_service,
std::shared_ptr<tcp::socket> socket)
: session_impl(io_service, boost::posix_time::seconds(0)),
socket_(socket) {}
session_tcp_impl::~session_tcp_impl() {} session_tcp_impl::~session_tcp_impl() {}
void session_tcp_impl::start_connect(tcp::resolver::iterator endpoint_it) { void session_tcp_impl::start_connect(tcp::resolver::iterator endpoint_it) {
auto self = shared_from_this(); auto self = shared_from_this();
socket_.async_connect( socket_->async_connect(
*endpoint_it, [self, endpoint_it](const boost::system::error_code &ec) { *endpoint_it, [self, endpoint_it](const boost::system::error_code &ec) {
if (self->stopped()) { if (self->stopped()) {
return; return;
@ -65,21 +72,21 @@ void session_tcp_impl::start_connect(tcp::resolver::iterator endpoint_it) {
}); });
} }
tcp::socket &session_tcp_impl::socket() { return socket_; } tcp::socket &session_tcp_impl::socket() { return *socket_; }
void session_tcp_impl::read_socket( void session_tcp_impl::read_socket(
std::function<void(const boost::system::error_code &ec, std::size_t n)> h) { std::function<void(const boost::system::error_code &ec, std::size_t n)> h) {
socket_.async_read_some(boost::asio::buffer(rb_), h); socket_->async_read_some(boost::asio::buffer(rb_), h);
} }
void session_tcp_impl::write_socket( void session_tcp_impl::write_socket(
std::function<void(const boost::system::error_code &ec, std::size_t n)> h) { std::function<void(const boost::system::error_code &ec, std::size_t n)> h) {
boost::asio::async_write(socket_, boost::asio::buffer(wb_, wblen_), h); boost::asio::async_write(*socket_, boost::asio::buffer(wb_, wblen_), h);
} }
void session_tcp_impl::shutdown_socket() { void session_tcp_impl::shutdown_socket() {
boost::system::error_code ignored_ec; boost::system::error_code ignored_ec;
socket_.close(ignored_ec); socket_->close(ignored_ec);
} }
} // namespace client } // namespace client

View File

@ -44,6 +44,9 @@ public:
const boost::asio::ip::tcp::endpoint &local_endpoint, const boost::asio::ip::tcp::endpoint &local_endpoint,
const std::string &host, const std::string &service, const std::string &host, const std::string &service,
const boost::posix_time::time_duration &connect_timeout); const boost::posix_time::time_duration &connect_timeout);
session_tcp_impl(boost::asio::io_service &io_service,
std::shared_ptr<tcp::socket> socket);
virtual ~session_tcp_impl(); virtual ~session_tcp_impl();
virtual void start_connect(tcp::resolver::iterator endpoint_it); virtual void start_connect(tcp::resolver::iterator endpoint_it);
@ -57,7 +60,7 @@ public:
virtual void shutdown_socket(); virtual void shutdown_socket();
private: private:
tcp::socket socket_; std::shared_ptr<tcp::socket> socket_;
}; };
} // namespace client } // namespace client

View File

@ -33,17 +33,23 @@ session_tls_impl::session_tls_impl(
boost::asio::io_service &io_service, boost::asio::ssl::context &tls_ctx, boost::asio::io_service &io_service, boost::asio::ssl::context &tls_ctx,
const std::string &host, const std::string &service, const std::string &host, const std::string &service,
const boost::posix_time::time_duration &connect_timeout) const boost::posix_time::time_duration &connect_timeout)
: session_impl(io_service, connect_timeout), socket_(io_service, tls_ctx) { : session_impl(io_service, connect_timeout),
socket_(std::make_shared<ssl_socket>(io_service, tls_ctx)) {
// this callback setting is no effect is // this callback setting is no effect is
// ssl::context::set_verify_mode(boost::asio::ssl::verify_peer) is // ssl::context::set_verify_mode(boost::asio::ssl::verify_peer) is
// not used, which is what we want. // not used, which is what we want.
socket_.set_verify_callback(boost::asio::ssl::rfc2818_verification(host)); socket_->set_verify_callback(boost::asio::ssl::rfc2818_verification(host));
auto ssl = socket_.native_handle(); auto ssl = socket_->native_handle();
if (!util::numeric_host(host.c_str())) { if (!util::numeric_host(host.c_str())) {
SSL_set_tlsext_host_name(ssl, host.c_str()); SSL_set_tlsext_host_name(ssl, host.c_str());
} }
} }
session_tls_impl::session_tls_impl(boost::asio::io_service &io_service,
std::shared_ptr<ssl_socket> socket)
: session_impl(io_service, boost::posix_time::seconds{0}),
socket_(socket) {}
session_tls_impl::~session_tls_impl() {} session_tls_impl::~session_tls_impl() {}
void session_tls_impl::start_connect(tcp::resolver::iterator endpoint_it) { void session_tls_impl::start_connect(tcp::resolver::iterator endpoint_it) {
@ -61,7 +67,7 @@ void session_tls_impl::start_connect(tcp::resolver::iterator endpoint_it) {
return; return;
} }
self->socket_.async_handshake( self->socket_->async_handshake(
boost::asio::ssl::stream_base::client, boost::asio::ssl::stream_base::client,
[self, endpoint_it](const boost::system::error_code &ec) { [self, endpoint_it](const boost::system::error_code &ec) {
if (self->stopped()) { if (self->stopped()) {
@ -73,7 +79,7 @@ void session_tls_impl::start_connect(tcp::resolver::iterator endpoint_it) {
return; return;
} }
if (!tls_h2_negotiated(self->socket_)) { if (!tls_h2_negotiated(*(self->socket_))) {
self->not_connected(make_error_code( self->not_connected(make_error_code(
NGHTTP2_ASIO_ERR_TLS_NO_APP_PROTO_NEGOTIATED)); NGHTTP2_ASIO_ERR_TLS_NO_APP_PROTO_NEGOTIATED));
return; return;
@ -84,21 +90,21 @@ void session_tls_impl::start_connect(tcp::resolver::iterator endpoint_it) {
}); });
} }
tcp::socket &session_tls_impl::socket() { return socket_.next_layer(); } tcp::socket &session_tls_impl::socket() { return socket_->next_layer(); }
void session_tls_impl::read_socket( void session_tls_impl::read_socket(
std::function<void(const boost::system::error_code &ec, std::size_t n)> h) { std::function<void(const boost::system::error_code &ec, std::size_t n)> h) {
socket_.async_read_some(boost::asio::buffer(rb_), h); socket_->async_read_some(boost::asio::buffer(rb_), h);
} }
void session_tls_impl::write_socket( void session_tls_impl::write_socket(
std::function<void(const boost::system::error_code &ec, std::size_t n)> h) { std::function<void(const boost::system::error_code &ec, std::size_t n)> h) {
boost::asio::async_write(socket_, boost::asio::buffer(wb_, wblen_), h); boost::asio::async_write(*socket_, boost::asio::buffer(wb_, wblen_), h);
} }
void session_tls_impl::shutdown_socket() { void session_tls_impl::shutdown_socket() {
boost::system::error_code ignored_ec; boost::system::error_code ignored_ec;
socket_.lowest_layer().close(ignored_ec); socket_->lowest_layer().close(ignored_ec);
} }
} // namespace client } // namespace client

View File

@ -43,6 +43,9 @@ public:
boost::asio::ssl::context &tls_ctx, const std::string &host, boost::asio::ssl::context &tls_ctx, const std::string &host,
const std::string &service, const std::string &service,
const boost::posix_time::time_duration &connect_timeout); const boost::posix_time::time_duration &connect_timeout);
session_tls_impl(boost::asio::io_service &io_service,
std::shared_ptr<ssl_socket> socket);
virtual ~session_tls_impl(); virtual ~session_tls_impl();
virtual void start_connect(tcp::resolver::iterator endpoint_it); virtual void start_connect(tcp::resolver::iterator endpoint_it);
@ -56,7 +59,7 @@ public:
virtual void shutdown_socket(); virtual void shutdown_socket();
private: private:
ssl_socket socket_; std::shared_ptr<ssl_socket> socket_;
}; };
} // namespace client } // namespace client

View File

@ -26,6 +26,7 @@
#define ASIO_HTTP2_CLIENT_H #define ASIO_HTTP2_CLIENT_H
#include <nghttp2/asio_http2.h> #include <nghttp2/asio_http2.h>
#include <memory>
namespace nghttp2 { namespace nghttp2 {
@ -142,6 +143,9 @@ private:
class session_impl; class session_impl;
using http_stream = boost::asio::ip::tcp::socket;
using https_stream = boost::asio::ssl::stream<http_stream>;
class session { class session {
public: public:
// Starts HTTP/2 session by connecting to |host| and |service| // Starts HTTP/2 session by connecting to |host| and |service|
@ -183,6 +187,13 @@ public:
const std::string &service, const std::string &service,
const boost::posix_time::time_duration &connect_timeout); const boost::posix_time::time_duration &connect_timeout);
// Starts HTTP/2 session by attaching to a stream. The stream must out live
// the session.
session(boost::asio::io_service &io_service,
std::shared_ptr<http_stream> stream);
session(boost::asio::io_service &io_service,
std::shared_ptr<https_stream> stream);
~session(); ~session();
session(session &&other) noexcept; session(session &&other) noexcept;