Merge branch 'sunxiaoguang-asio_graceful_shutdown'

This commit is contained in:
Tatsuhiro Tsujikawa 2015-04-22 21:43:52 +09:00
commit a200bb1084
8 changed files with 74 additions and 21 deletions

View File

@ -35,8 +35,6 @@
// //
#include "asio_io_service_pool.h" #include "asio_io_service_pool.h"
#include <future>
namespace nghttp2 { namespace nghttp2 {
namespace asio_http2 { namespace asio_http2 {
@ -56,19 +54,23 @@ io_service_pool::io_service_pool(std::size_t pool_size) : next_io_service_(0) {
} }
} }
void io_service_pool::run() { void io_service_pool::run(bool asynchronous) {
// Create a pool of threads to run all of the io_services. // Create a pool of threads to run all of the io_services.
auto futs = std::vector<std::future<std::size_t>>();
for (std::size_t i = 0; i < io_services_.size(); ++i) { for (std::size_t i = 0; i < io_services_.size(); ++i) {
futs.push_back(std::async(std::launch::async, futures_.push_back(std::async(std::launch::async,
(size_t (boost::asio::io_service::*)(void)) & (size_t (boost::asio::io_service::*)(void)) &
boost::asio::io_service::run, boost::asio::io_service::run,
io_services_[i])); io_services_[i]));
} }
if (!asynchronous) {
join();
}
}
void io_service_pool::join() {
// Wait for all threads in the pool to exit. // Wait for all threads in the pool to exit.
for (auto &fut : futs) { for (auto &fut : futures_) {
fut.get(); fut.get();
} }
} }

View File

@ -41,6 +41,7 @@
#include <vector> #include <vector>
#include <memory> #include <memory>
#include <future>
#include <boost/noncopyable.hpp> #include <boost/noncopyable.hpp>
#include <boost/thread.hpp> #include <boost/thread.hpp>
@ -58,11 +59,14 @@ public:
explicit io_service_pool(std::size_t pool_size); explicit io_service_pool(std::size_t pool_size);
/// Run all io_service objects in the pool. /// Run all io_service objects in the pool.
void run(); void run(bool asynchronous = false);
/// Stop all io_service objects in the pool. /// Stop all io_service objects in the pool.
void stop(); void stop();
/// Join on all io_service objects in the pool.
void join();
/// Get an io_service to use. /// Get an io_service to use.
boost::asio::io_service &get_io_service(); boost::asio::io_service &get_io_service();
@ -75,6 +79,9 @@ private:
/// The next io_service to use for a connection. /// The next io_service to use for a connection.
std::size_t next_io_service_; std::size_t next_io_service_;
/// Futures to all the io_service objects
std::vector<std::future<std::size_t>> futures_;
}; };
} // namespace asio_http2 } // namespace asio_http2

View File

@ -50,7 +50,7 @@ boost::system::error_code
server::listen_and_serve(boost::system::error_code &ec, server::listen_and_serve(boost::system::error_code &ec,
boost::asio::ssl::context *tls_context, boost::asio::ssl::context *tls_context,
const std::string &address, const std::string &port, const std::string &address, const std::string &port,
int backlog, serve_mux &mux) { int backlog, serve_mux &mux, bool asynchronous) {
ec.clear(); ec.clear();
if (bind_and_listen(ec, address, port, backlog)) { if (bind_and_listen(ec, address, port, backlog)) {
@ -65,7 +65,7 @@ server::listen_and_serve(boost::system::error_code &ec,
} }
} }
io_service_pool_.run(); io_service_pool_.run(asynchronous);
return ec; return ec;
} }
@ -156,6 +156,16 @@ void server::start_accept(tcp::acceptor &acceptor, serve_mux &mux) {
}); });
} }
void server::stop()
{
io_service_pool_.stop();
}
void server::join()
{
io_service_pool_.join();
}
} // namespace server } // namespace server
} // namespace asio_http2 } // namespace asio_http2
} // namespace nghttp2 } // namespace nghttp2

View File

@ -69,7 +69,9 @@ public:
listen_and_serve(boost::system::error_code &ec, listen_and_serve(boost::system::error_code &ec,
boost::asio::ssl::context *tls_context, boost::asio::ssl::context *tls_context,
const std::string &address, const std::string &port, const std::string &address, const std::string &port,
int backlog, serve_mux &mux); int backlog, serve_mux &mux, bool asynchronous = false);
void join();
void stop();
private: private:
/// Initiate an asynchronous accept operation. /// Initiate an asynchronous accept operation.

View File

@ -54,15 +54,17 @@ http2 &http2::operator=(http2 &&other) noexcept {
boost::system::error_code http2::listen_and_serve(boost::system::error_code &ec, boost::system::error_code http2::listen_and_serve(boost::system::error_code &ec,
const std::string &address, const std::string &address,
const std::string &port) { const std::string &port,
return impl_->listen_and_serve(ec, nullptr, address, port); bool asynchronous) {
return impl_->listen_and_serve(ec, nullptr, address, port, asynchronous);
} }
boost::system::error_code boost::system::error_code
http2::listen_and_serve(boost::system::error_code &ec, http2::listen_and_serve(boost::system::error_code &ec,
boost::asio::ssl::context &tls_context, boost::asio::ssl::context &tls_context,
const std::string &address, const std::string &port) { const std::string &address, const std::string &port,
return impl_->listen_and_serve(ec, &tls_context, address, port); bool asynchronous) {
return impl_->listen_and_serve(ec, &tls_context, address, port, asynchronous);
} }
void http2::num_threads(size_t num_threads) { impl_->num_threads(num_threads); } void http2::num_threads(size_t num_threads) { impl_->num_threads(num_threads); }
@ -73,6 +75,16 @@ bool http2::handle(std::string pattern, request_cb cb) {
return impl_->handle(std::move(pattern), std::move(cb)); return impl_->handle(std::move(pattern), std::move(cb));
} }
void http2::stop()
{
impl_->stop();
}
void http2::join()
{
return impl_->join();
}
} // namespace server } // namespace server
} // namespace asio_http2 } // namespace asio_http2

View File

@ -41,9 +41,9 @@ http2_impl::http2_impl() : num_threads_(1), backlog_(-1) {}
boost::system::error_code http2_impl::listen_and_serve( boost::system::error_code http2_impl::listen_and_serve(
boost::system::error_code &ec, boost::asio::ssl::context *tls_context, boost::system::error_code &ec, boost::asio::ssl::context *tls_context,
const std::string &address, const std::string &port) { const std::string &address, const std::string &port, bool asynchronous) {
return server(num_threads_) server_.reset(new server(num_threads_));
.listen_and_serve(ec, tls_context, address, port, backlog_, mux_); return server_->listen_and_serve(ec, tls_context, address, port, backlog_, mux_, asynchronous);
} }
void http2_impl::num_threads(size_t num_threads) { num_threads_ = num_threads; } void http2_impl::num_threads(size_t num_threads) { num_threads_ = num_threads; }
@ -54,6 +54,15 @@ bool http2_impl::handle(std::string pattern, request_cb cb) {
return mux_.handle(std::move(pattern), std::move(cb)); return mux_.handle(std::move(pattern), std::move(cb));
} }
void http2_impl::stop() {
return server_->stop();
}
void http2_impl::join()
{
return server_->join();
}
} // namespace server } // namespace server
} // namespace asio_http2 } // namespace asio_http2

View File

@ -45,10 +45,13 @@ public:
boost::system::error_code boost::system::error_code
listen_and_serve(boost::system::error_code &ec, listen_and_serve(boost::system::error_code &ec,
boost::asio::ssl::context *tls_context, boost::asio::ssl::context *tls_context,
const std::string &address, const std::string &port); const std::string &address, const std::string &port,
bool asynchronous);
void num_threads(size_t num_threads); void num_threads(size_t num_threads);
void backlog(int backlog); void backlog(int backlog);
bool handle(std::string pattern, request_cb cb); bool handle(std::string pattern, request_cb cb);
void stop();
void join();
private: private:
std::unique_ptr<server> server_; std::unique_ptr<server> server_;

View File

@ -139,14 +139,16 @@ public:
// incoming requests in cleartext TCP connection. // incoming requests in cleartext TCP connection.
boost::system::error_code listen_and_serve(boost::system::error_code &ec, boost::system::error_code listen_and_serve(boost::system::error_code &ec,
const std::string &address, const std::string &address,
const std::string &port); const std::string &port,
bool asynchronous = false);
// Starts listening connection on given address and port and serves // Starts listening connection on given address and port and serves
// incoming requests in SSL/TLS encrypted connection. // incoming requests in SSL/TLS encrypted connection.
boost::system::error_code boost::system::error_code
listen_and_serve(boost::system::error_code &ec, listen_and_serve(boost::system::error_code &ec,
boost::asio::ssl::context &tls_context, boost::asio::ssl::context &tls_context,
const std::string &address, const std::string &port); const std::string &address, const std::string &port,
bool asynchronous = false);
// Registers request handler |cb| with path pattern |pattern|. This // Registers request handler |cb| with path pattern |pattern|. This
// function will fail and returns false if same pattern has been // function will fail and returns false if same pattern has been
@ -187,6 +189,12 @@ public:
// connections. // connections.
void backlog(int backlog); void backlog(int backlog);
// Gracefully stop http2 server
void stop();
// Join on http2 server and wait for it to fully stop
void join();
private: private:
std::unique_ptr<http2_impl> impl_; std::unique_ptr<http2_impl> impl_;
}; };