Merge branch 'asio_threading' of https://github.com/clemahieu/nghttp2 into clemahieu-asio_threading

This commit is contained in:
Tatsuhiro Tsujikawa 2017-02-05 17:45:01 +09:00
commit 0df13452aa
9 changed files with 18 additions and 263 deletions

View File

@ -219,7 +219,6 @@ if(ENABLE_ASIO_LIB)
ssl.cc ssl.cc
timegm.c timegm.c
asio_common.cc asio_common.cc
asio_io_service_pool.cc
asio_server_http2.cc asio_server_http2.cc
asio_server_http2_impl.cc asio_server_http2_impl.cc
asio_server.cc asio_server.cc

View File

@ -1,102 +0,0 @@
/*
* nghttp2 - HTTP/2 C Library
*
* Copyright (c) 2014 Tatsuhiro Tsujikawa
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
// We wrote this code based on the original code which has the
// following license:
//
// io_service_pool.cpp
// ~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "asio_io_service_pool.h"
namespace nghttp2 {
namespace asio_http2 {
io_service_pool::io_service_pool(std::size_t pool_size) : next_io_service_(0) {
if (pool_size == 0) {
throw std::runtime_error("io_service_pool size is 0");
}
// Give all the io_services work to do so that their run() functions will not
// exit until they are explicitly stopped.
for (std::size_t i = 0; i < pool_size; ++i) {
auto io_service = std::make_shared<boost::asio::io_service>();
auto work = std::make_shared<boost::asio::io_service::work>(*io_service);
io_services_.push_back(io_service);
work_.push_back(work);
}
}
void io_service_pool::run(bool asynchronous) {
// Create a pool of threads to run all of the io_services.
for (std::size_t i = 0; i < io_services_.size(); ++i) {
futures_.push_back(std::async(std::launch::async,
(size_t(boost::asio::io_service::*)(void)) &
boost::asio::io_service::run,
io_services_[i]));
}
if (!asynchronous) {
join();
}
}
void io_service_pool::join() {
// Wait for all threads in the pool to exit.
for (auto &fut : futures_) {
fut.get();
}
}
void io_service_pool::stop() {
// Explicitly stop all io_services.
for (auto &iosv : io_services_) {
iosv->stop();
}
}
boost::asio::io_service &io_service_pool::get_io_service() {
// Use a round-robin scheme to choose the next io_service to use.
auto &io_service = *io_services_[next_io_service_];
++next_io_service_;
if (next_io_service_ == io_services_.size()) {
next_io_service_ = 0;
}
return io_service;
}
const std::vector<std::shared_ptr<boost::asio::io_service>> &
io_service_pool::io_services() const {
return io_services_;
}
} // namespace asio_http2
} // namespace nghttp2

View File

@ -1,95 +0,0 @@
/*
* nghttp2 - HTTP/2 C Library
*
* Copyright (c) 2014 Tatsuhiro Tsujikawa
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
// We wrote this code based on the original code which has the
// following license:
//
// io_service_pool.hpp
// ~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#ifndef ASIO_IO_SERVICE_POOL_H
#define ASIO_IO_SERVICE_POOL_H
#include "nghttp2_config.h"
#include <vector>
#include <memory>
#include <future>
#include <boost/noncopyable.hpp>
#include <boost/thread.hpp>
#include <nghttp2/asio_http2.h>
namespace nghttp2 {
namespace asio_http2 {
/// A pool of io_service objects.
class io_service_pool : private boost::noncopyable {
public:
/// Construct the io_service pool.
explicit io_service_pool(std::size_t pool_size);
/// Run all io_service objects in the pool.
void run(bool asynchronous = false);
/// Stop all io_service objects in the pool.
void stop();
/// Join on all io_service objects in the pool.
void join();
/// Get an io_service to use.
boost::asio::io_service &get_io_service();
/// Get access to all io_service objects.
const std::vector<std::shared_ptr<boost::asio::io_service>> &
io_services() const;
private:
/// The pool of io_services.
std::vector<std::shared_ptr<boost::asio::io_service>> io_services_;
/// The work that keeps the io_services running.
std::vector<std::shared_ptr<boost::asio::io_service::work>> work_;
/// The next io_service to use for a connection.
std::size_t next_io_service_;
/// Futures to all the io_service objects
std::vector<std::future<std::size_t>> futures_;
};
} // namespace asio_http2
} // namespace nghttp2
#endif // ASIO_IO_SERVICE_POOL_H

View File

@ -44,10 +44,10 @@ namespace nghttp2 {
namespace asio_http2 { namespace asio_http2 {
namespace server { namespace server {
server::server(std::size_t io_service_pool_size, server::server(boost::asio::io_service &service,
const boost::posix_time::time_duration &tls_handshake_timeout, const boost::posix_time::time_duration &tls_handshake_timeout,
const boost::posix_time::time_duration &read_timeout) const boost::posix_time::time_duration &read_timeout)
: io_service_pool_(io_service_pool_size), : service_(service),
tls_handshake_timeout_(tls_handshake_timeout), tls_handshake_timeout_(tls_handshake_timeout),
read_timeout_(read_timeout) {} read_timeout_(read_timeout) {}
@ -70,8 +70,6 @@ server::listen_and_serve(boost::system::error_code &ec,
} }
} }
io_service_pool_.run(asynchronous);
return ec; return ec;
} }
@ -81,7 +79,7 @@ boost::system::error_code server::bind_and_listen(boost::system::error_code &ec,
int backlog) { int backlog) {
// Open the acceptor with the option to reuse the address (i.e. // Open the acceptor with the option to reuse the address (i.e.
// SO_REUSEADDR). // SO_REUSEADDR).
tcp::resolver resolver(io_service_pool_.get_io_service()); tcp::resolver resolver(service_);
tcp::resolver::query query(address, port); tcp::resolver::query query(address, port);
auto it = resolver.resolve(query, ec); auto it = resolver.resolve(query, ec);
if (ec) { if (ec) {
@ -90,7 +88,7 @@ boost::system::error_code server::bind_and_listen(boost::system::error_code &ec,
for (; it != tcp::resolver::iterator(); ++it) { for (; it != tcp::resolver::iterator(); ++it) {
tcp::endpoint endpoint = *it; tcp::endpoint endpoint = *it;
auto acceptor = tcp::acceptor(io_service_pool_.get_io_service()); auto acceptor = tcp::acceptor(service_);
if (acceptor.open(endpoint.protocol(), ec)) { if (acceptor.open(endpoint.protocol(), ec)) {
continue; continue;
@ -126,7 +124,7 @@ void server::start_accept(boost::asio::ssl::context &tls_context,
tcp::acceptor &acceptor, serve_mux &mux) { tcp::acceptor &acceptor, serve_mux &mux) {
auto new_connection = std::make_shared<connection<ssl_socket>>( auto new_connection = std::make_shared<connection<ssl_socket>>(
mux, tls_handshake_timeout_, read_timeout_, mux, tls_handshake_timeout_, read_timeout_,
io_service_pool_.get_io_service(), tls_context); service_, tls_context);
acceptor.async_accept( acceptor.async_accept(
new_connection->socket().lowest_layer(), new_connection->socket().lowest_layer(),
@ -159,8 +157,7 @@ void server::start_accept(boost::asio::ssl::context &tls_context,
void server::start_accept(tcp::acceptor &acceptor, serve_mux &mux) { void server::start_accept(tcp::acceptor &acceptor, serve_mux &mux) {
auto new_connection = std::make_shared<connection<tcp::socket>>( auto new_connection = std::make_shared<connection<tcp::socket>>(
mux, tls_handshake_timeout_, read_timeout_, mux, tls_handshake_timeout_, read_timeout_, service_);
io_service_pool_.get_io_service());
acceptor.async_accept( acceptor.async_accept(
new_connection->socket(), [this, &acceptor, &mux, new_connection]( new_connection->socket(), [this, &acceptor, &mux, new_connection](
@ -177,19 +174,11 @@ void server::start_accept(tcp::acceptor &acceptor, serve_mux &mux) {
} }
void server::stop() { void server::stop() {
io_service_pool_.stop();
for (auto &acceptor : acceptors_) { for (auto &acceptor : acceptors_) {
acceptor.close(); acceptor.close();
} }
} }
void server::join() { io_service_pool_.join(); }
const std::vector<std::shared_ptr<boost::asio::io_service>> &
server::io_services() const {
return io_service_pool_.io_services();
}
} // namespace server } // namespace server
} // namespace asio_http2 } // namespace asio_http2
} // namespace nghttp2 } // namespace nghttp2

View File

@ -44,11 +44,10 @@
#include <memory> #include <memory>
#include <boost/noncopyable.hpp> #include <boost/noncopyable.hpp>
#include <boost/asio/io_service.hpp>
#include <nghttp2/asio_http2_server.h> #include <nghttp2/asio_http2_server.h>
#include "asio_io_service_pool.h"
namespace nghttp2 { namespace nghttp2 {
namespace asio_http2 { namespace asio_http2 {
@ -63,7 +62,7 @@ using ssl_socket = boost::asio::ssl::stream<tcp::socket>;
class server : private boost::noncopyable { class server : private boost::noncopyable {
public: public:
explicit server(std::size_t io_service_pool_size, explicit server(boost::asio::io_service &service,
const boost::posix_time::time_duration &tls_handshake_timeout, const boost::posix_time::time_duration &tls_handshake_timeout,
const boost::posix_time::time_duration &read_timeout); const boost::posix_time::time_duration &read_timeout);
@ -91,10 +90,7 @@ private:
const std::string &address, const std::string &address,
const std::string &port, const std::string &port,
int backlog); int backlog);
boost::asio::io_service &service_;
/// The pool of io_service objects used to perform asynchronous
/// operations.
io_service_pool io_service_pool_;
/// Acceptor used to listen for incoming connections. /// Acceptor used to listen for incoming connections.
std::vector<tcp::acceptor> acceptors_; std::vector<tcp::acceptor> acceptors_;

View File

@ -36,7 +36,7 @@ namespace asio_http2 {
namespace server { namespace server {
http2::http2() : impl_(make_unique<http2_impl>()) {} http2::http2(boost::asio::io_service &service) : impl_(make_unique<http2_impl>(service)) {}
http2::~http2() {} http2::~http2() {}
@ -65,8 +65,6 @@ boost::system::error_code http2::listen_and_serve(
return impl_->listen_and_serve(ec, &tls_context, address, port, asynchronous); return impl_->listen_and_serve(ec, &tls_context, address, port, asynchronous);
} }
void http2::num_threads(size_t num_threads) { impl_->num_threads(num_threads); }
void http2::backlog(int backlog) { impl_->backlog(backlog); } void http2::backlog(int backlog) { impl_->backlog(backlog); }
void http2::tls_handshake_timeout(const boost::posix_time::time_duration &t) { void http2::tls_handshake_timeout(const boost::posix_time::time_duration &t) {
@ -83,13 +81,6 @@ bool http2::handle(std::string pattern, request_cb cb) {
void http2::stop() { impl_->stop(); } void http2::stop() { impl_->stop(); }
void http2::join() { return impl_->join(); }
const std::vector<std::shared_ptr<boost::asio::io_service>> &
http2::io_services() const {
return impl_->io_services();
}
} // namespace server } // namespace server
} // namespace asio_http2 } // namespace asio_http2

View File

@ -37,8 +37,8 @@ namespace asio_http2 {
namespace server { namespace server {
http2_impl::http2_impl() http2_impl::http2_impl(boost::asio::io_service &service)
: num_threads_(1), : service_(service),
backlog_(-1), backlog_(-1),
tls_handshake_timeout_(boost::posix_time::seconds(60)), tls_handshake_timeout_(boost::posix_time::seconds(60)),
read_timeout_(boost::posix_time::seconds(60)) {} read_timeout_(boost::posix_time::seconds(60)) {}
@ -47,13 +47,11 @@ boost::system::error_code http2_impl::listen_and_serve(
boost::system::error_code &ec, boost::asio::ssl::context *tls_context, boost::system::error_code &ec, boost::asio::ssl::context *tls_context,
const std::string &address, const std::string &port, bool asynchronous) { const std::string &address, const std::string &port, bool asynchronous) {
server_.reset( server_.reset(
new server(num_threads_, tls_handshake_timeout_, read_timeout_)); new server(service_, tls_handshake_timeout_, read_timeout_));
return server_->listen_and_serve(ec, tls_context, address, port, backlog_, return server_->listen_and_serve(ec, tls_context, address, port, backlog_,
mux_, asynchronous); mux_, asynchronous);
} }
void http2_impl::num_threads(size_t num_threads) { num_threads_ = num_threads; }
void http2_impl::backlog(int backlog) { backlog_ = backlog; } void http2_impl::backlog(int backlog) { backlog_ = backlog; }
void http2_impl::tls_handshake_timeout( void http2_impl::tls_handshake_timeout(
@ -71,13 +69,6 @@ bool http2_impl::handle(std::string pattern, request_cb cb) {
void http2_impl::stop() { return server_->stop(); } void http2_impl::stop() { return server_->stop(); }
void http2_impl::join() { return server_->join(); }
const std::vector<std::shared_ptr<boost::asio::io_service>> &
http2_impl::io_services() const {
return server_->io_services();
}
} // namespace server } // namespace server
} // namespace asio_http2 } // namespace asio_http2

View File

@ -41,7 +41,7 @@ class server;
class http2_impl { class http2_impl {
public: public:
http2_impl(); http2_impl(boost::asio::io_service &service);
boost::system::error_code listen_and_serve( boost::system::error_code listen_and_serve(
boost::system::error_code &ec, boost::asio::ssl::context *tls_context, boost::system::error_code &ec, boost::asio::ssl::context *tls_context,
const std::string &address, const std::string &port, bool asynchronous); const std::string &address, const std::string &port, bool asynchronous);
@ -51,13 +51,10 @@ public:
void read_timeout(const boost::posix_time::time_duration &t); void read_timeout(const boost::posix_time::time_duration &t);
bool handle(std::string pattern, request_cb cb); bool handle(std::string pattern, request_cb cb);
void stop(); void stop();
void join();
const std::vector<std::shared_ptr<boost::asio::io_service>> &
io_services() const;
private: private:
std::unique_ptr<server> server_; std::unique_ptr<server> server_;
std::size_t num_threads_; boost::asio::io_service &service_;
int backlog_; int backlog_;
serve_mux mux_; serve_mux mux_;
boost::posix_time::time_duration tls_handshake_timeout_; boost::posix_time::time_duration tls_handshake_timeout_;

View File

@ -132,7 +132,7 @@ class http2_impl;
class http2 { class http2 {
public: public:
http2(); http2(boost::asio::io_service &service);
~http2(); ~http2();
http2(http2 &&other) noexcept; http2(http2 &&other) noexcept;
@ -190,10 +190,6 @@ public:
// equivalent .- and ..-free URL. // equivalent .- and ..-free URL.
bool handle(std::string pattern, request_cb cb); bool handle(std::string pattern, request_cb cb);
// Sets number of native threads to handle incoming HTTP request.
// It defaults to 1.
void num_threads(size_t num_threads);
// Sets the maximum length to which the queue of pending // Sets the maximum length to which the queue of pending
// connections. // connections.
void backlog(int backlog); void backlog(int backlog);
@ -207,13 +203,6 @@ public:
// Gracefully stop http2 server // Gracefully stop http2 server
void stop(); void stop();
// Join on http2 server and wait for it to fully stop
void join();
// Get access to the io_service objects.
const std::vector<std::shared_ptr<boost::asio::io_service>> &
io_services() const;
private: private:
std::unique_ptr<http2_impl> impl_; std::unique_ptr<http2_impl> impl_;
}; };