Graceful shutdown and joinable server

This commit is contained in:
Xiaoguang Sun 2015-04-22 17:51:28 +08:00
parent 97648d257f
commit 92a1ca5917
8 changed files with 74 additions and 21 deletions

View File

@ -35,8 +35,6 @@
//
#include "asio_io_service_pool.h"
#include <future>
namespace nghttp2 {
namespace asio_http2 {
@ -56,19 +54,23 @@ io_service_pool::io_service_pool(std::size_t pool_size) : next_io_service_(0) {
}
}
void io_service_pool::run() {
void io_service_pool::run(bool asynchronous) {
// Create a pool of threads to run all of the io_services.
auto futs = std::vector<std::future<std::size_t>>();
for (std::size_t i = 0; i < io_services_.size(); ++i) {
futs.push_back(std::async(std::launch::async,
futures_.push_back(std::async(std::launch::async,
(size_t (boost::asio::io_service::*)(void)) &
boost::asio::io_service::run,
io_services_[i]));
}
if (!asynchronous) {
join();
}
}
void io_service_pool::join() {
// Wait for all threads in the pool to exit.
for (auto &fut : futs) {
for (auto &fut : futures_) {
fut.get();
}
}

View File

@ -41,6 +41,7 @@
#include <vector>
#include <memory>
#include <future>
#include <boost/noncopyable.hpp>
#include <boost/thread.hpp>
@ -58,11 +59,14 @@ public:
explicit io_service_pool(std::size_t pool_size);
/// Run all io_service objects in the pool.
void run();
void run(bool asynchronous = false);
/// Stop all io_service objects in the pool.
void stop();
/// Join on all io_service objects in the pool.
void join();
/// Get an io_service to use.
boost::asio::io_service &get_io_service();
@ -75,6 +79,9 @@ private:
/// The next io_service to use for a connection.
std::size_t next_io_service_;
/// Futures to all the io_service objects
std::vector<std::future<std::size_t>> futures_;
};
} // namespace asio_http2

View File

@ -50,7 +50,7 @@ boost::system::error_code
server::listen_and_serve(boost::system::error_code &ec,
boost::asio::ssl::context *tls_context,
const std::string &address, const std::string &port,
int backlog, serve_mux &mux) {
int backlog, serve_mux &mux, bool asynchronous) {
ec.clear();
if (bind_and_listen(ec, address, port, backlog)) {
@ -65,7 +65,7 @@ server::listen_and_serve(boost::system::error_code &ec,
}
}
io_service_pool_.run();
io_service_pool_.run(asynchronous);
return ec;
}
@ -156,6 +156,16 @@ void server::start_accept(tcp::acceptor &acceptor, serve_mux &mux) {
});
}
void server::stop()
{
io_service_pool_.stop();
}
void server::join()
{
io_service_pool_.join();
}
} // namespace server
} // namespace asio_http2
} // namespace nghttp2

View File

@ -69,7 +69,9 @@ public:
listen_and_serve(boost::system::error_code &ec,
boost::asio::ssl::context *tls_context,
const std::string &address, const std::string &port,
int backlog, serve_mux &mux);
int backlog, serve_mux &mux, bool asynchronous = false);
void join();
void stop();
private:
/// Initiate an asynchronous accept operation.

View File

@ -54,15 +54,17 @@ http2 &http2::operator=(http2 &&other) noexcept {
boost::system::error_code http2::listen_and_serve(boost::system::error_code &ec,
const std::string &address,
const std::string &port) {
return impl_->listen_and_serve(ec, nullptr, address, port);
const std::string &port,
bool asynchronous) {
return impl_->listen_and_serve(ec, nullptr, address, port, asynchronous);
}
boost::system::error_code
http2::listen_and_serve(boost::system::error_code &ec,
boost::asio::ssl::context &tls_context,
const std::string &address, const std::string &port) {
return impl_->listen_and_serve(ec, &tls_context, address, port);
const std::string &address, const std::string &port,
bool asynchronous) {
return impl_->listen_and_serve(ec, &tls_context, address, port, asynchronous);
}
void http2::num_threads(size_t num_threads) { impl_->num_threads(num_threads); }
@ -73,6 +75,16 @@ bool http2::handle(std::string pattern, request_cb cb) {
return impl_->handle(std::move(pattern), std::move(cb));
}
void http2::stop()
{
impl_->stop();
}
void http2::join()
{
return impl_->join();
}
} // namespace server
} // namespace asio_http2

View File

@ -41,9 +41,9 @@ http2_impl::http2_impl() : num_threads_(1), backlog_(-1) {}
boost::system::error_code http2_impl::listen_and_serve(
boost::system::error_code &ec, boost::asio::ssl::context *tls_context,
const std::string &address, const std::string &port) {
return server(num_threads_)
.listen_and_serve(ec, tls_context, address, port, backlog_, mux_);
const std::string &address, const std::string &port, bool asynchronous) {
server_.reset(new server(num_threads_));
return server_->listen_and_serve(ec, tls_context, address, port, backlog_, mux_, asynchronous);
}
void http2_impl::num_threads(size_t num_threads) { num_threads_ = num_threads; }
@ -54,6 +54,15 @@ bool http2_impl::handle(std::string pattern, request_cb cb) {
return mux_.handle(std::move(pattern), std::move(cb));
}
void http2_impl::stop() {
return server_->stop();
}
void http2_impl::join()
{
return server_->join();
}
} // namespace server
} // namespace asio_http2

View File

@ -45,10 +45,13 @@ public:
boost::system::error_code
listen_and_serve(boost::system::error_code &ec,
boost::asio::ssl::context *tls_context,
const std::string &address, const std::string &port);
const std::string &address, const std::string &port,
bool asynchronous);
void num_threads(size_t num_threads);
void backlog(int backlog);
bool handle(std::string pattern, request_cb cb);
void stop();
void join();
private:
std::unique_ptr<server> server_;

View File

@ -139,14 +139,16 @@ public:
// incoming requests in cleartext TCP connection.
boost::system::error_code listen_and_serve(boost::system::error_code &ec,
const std::string &address,
const std::string &port);
const std::string &port,
bool asynchronous = false);
// Starts listening connection on given address and port and serves
// incoming requests in SSL/TLS encrypted connection.
boost::system::error_code
listen_and_serve(boost::system::error_code &ec,
boost::asio::ssl::context &tls_context,
const std::string &address, const std::string &port);
const std::string &address, const std::string &port,
bool asynchronous = false);
// Registers request handler |cb| with path pattern |pattern|. This
// function will fail and returns false if same pattern has been
@ -187,6 +189,12 @@ public:
// connections.
void backlog(int backlog);
// Gracefully stop http2 server
void stop();
// Join on http2 server and wait for it to fully stop
void join();
private:
std::unique_ptr<http2_impl> impl_;
};