asio: Remove threaded task runner

This is too complicated and not suitable for event driven server.  We
plan to expose io_service instead.
This commit is contained in:
Tatsuhiro Tsujikawa 2015-03-04 01:46:59 +09:00
parent 2fa28e790d
commit 062b42918c
12 changed files with 18 additions and 316 deletions

View File

@ -58,7 +58,7 @@ endif # ENABLE_TINY_NGHTTPD
if ENABLE_ASIO_LIB if ENABLE_ASIO_LIB
noinst_PROGRAMS += asio-sv asio-sv2 asio-sv3 asio-cl noinst_PROGRAMS += asio-sv asio-sv2 asio-cl
ASIOCPPFLAGS = ${BOOST_CPPFLAGS} ${AM_CPPFLAGS} ASIOCPPFLAGS = ${BOOST_CPPFLAGS} ${AM_CPPFLAGS}
ASIOLDADD = $(top_builddir)/lib/libnghttp2.la \ ASIOLDADD = $(top_builddir)/lib/libnghttp2.la \
@ -79,10 +79,6 @@ asio_sv2_SOURCES = asio-sv2.cc
asio_sv2_CPPFLAGS = ${ASIOCPPFLAGS} asio_sv2_CPPFLAGS = ${ASIOCPPFLAGS}
asio_sv2_LDADD = ${ASIOLDADD} asio_sv2_LDADD = ${ASIOLDADD}
asio_sv3_SOURCES = asio-sv3.cc
asio_sv3_CPPFLAGS = ${ASIOCPPFLAGS}
asio_sv3_LDADD = ${ASIOLDADD}
asio_cl_SOURCES = asio-cl.cc asio_cl_SOURCES = asio-cl.cc
asio_cl_CPPFLAGS = ${ASIOCPPFLAGS} asio_cl_CPPFLAGS = ${ASIOCPPFLAGS}
asio_cl_LDADD = ${ASIOLDADD} asio_cl_LDADD = ${ASIOLDADD}

View File

@ -1,142 +0,0 @@
/*
* nghttp2 - HTTP/2 C Library
*
* Copyright (c) 2014 Tatsuhiro Tsujikawa
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
// We wrote this code based on the original code which has the
// following license:
//
// main.cpp
// ~~~~~~~~
//
// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <unistd.h>
#include <iostream>
#include <string>
#include <deque>
#include <nghttp2/asio_http2.h>
using namespace nghttp2::asio_http2;
using namespace nghttp2::asio_http2::server;
int main(int argc, char *argv[]) {
try {
// Check command line arguments.
if (argc < 4) {
std::cerr << "Usage: asio-sv3 <port> <threads> <tasks> "
<< " <private-key-file> <cert-file>\n";
return 1;
}
uint16_t port = std::stoi(argv[1]);
std::size_t num_threads = std::stoi(argv[2]);
std::size_t num_concurrent_tasks = std::stoi(argv[3]);
http2 server;
server.num_threads(num_threads);
if (argc >= 5) {
server.tls(argv[4], argv[5]);
}
server.num_concurrent_tasks(num_concurrent_tasks);
server.listen("*", port, [](const std::shared_ptr<request> &req,
const std::shared_ptr<response> &res) {
res->write_head(200);
auto msgq = std::make_shared<std::deque<std::string>>();
res->end([msgq](uint8_t * buf, std::size_t len)
-> std::pair<ssize_t, bool> {
if (msgq->empty()) {
// if msgq is empty, tells the library that don't call
// this callback until we call res->resume(). This is
// done by returing std::make_pair(0, false).
return std::make_pair(0, false);
}
auto msg = std::move(msgq->front());
msgq->pop_front();
if (msg.empty()) {
// The empty message signals the end of response in
// this simple protocol.
return std::make_pair(0, true);
}
auto nwrite = std::min(len, msg.size());
std::copy(std::begin(msg), std::begin(msg) + nwrite, buf);
if (msg.size() > nwrite) {
msgq->push_front(msg.substr(nwrite));
}
return std::make_pair(nwrite, false);
});
req->run_task([res, msgq](channel &channel) {
// executed in different thread from request callback
// was called.
// Using res and msgq is not safe inside this callback.
// But using them in callback passed to channel::post is
// safe.
// We just emit simple message "message N\n" in every 1
// second and 3 times in total.
for (std::size_t i = 0; i < 3; ++i) {
msgq->push_back("message " + std::to_string(i + 1) + "\n");
channel.post([res]() {
// executed in same thread where
// request callback was called.
// Tells library we have new message.
res->resume();
});
sleep(1);
}
// Send empty message to signal the end of response
// body.
msgq->push_back("");
channel.post([res]() {
// executed in same thread where request
// callback was called.
res->resume();
});
});
});
} catch (std::exception &e) {
std::cerr << "exception: " << e.what() << "\n";
}
return 0;
}

View File

@ -62,16 +62,14 @@ class connection : public std::enable_shared_from_this<connection<socket_type>>,
public: public:
/// Construct a connection with the given io_service. /// Construct a connection with the given io_service.
template <typename... SocketArgs> template <typename... SocketArgs>
explicit connection(request_cb cb, boost::asio::io_service &task_io_service, explicit connection(request_cb cb, SocketArgs &&... args)
SocketArgs &&... args)
: socket_(std::forward<SocketArgs>(args)...), request_cb_(std::move(cb)), : socket_(std::forward<SocketArgs>(args)...), request_cb_(std::move(cb)),
task_io_service_(task_io_service), writing_(false) {} writing_(false) {}
/// Start the first asynchronous operation for the connection. /// Start the first asynchronous operation for the connection.
void start() { void start() {
handler_ = std::make_shared<http2_handler>( handler_ = std::make_shared<http2_handler>(
socket_.get_io_service(), task_io_service_, [this]() { do_write(); }, socket_.get_io_service(), [this]() { do_write(); }, request_cb_);
request_cb_);
if (handler_->start() != 0) { if (handler_->start() != 0) {
return; return;
} }
@ -151,8 +149,6 @@ private:
request_cb request_cb_; request_cb request_cb_;
boost::asio::io_service &task_io_service_;
std::shared_ptr<http2_handler> handler_; std::shared_ptr<http2_handler> handler_;
/// Buffer for incoming data. /// Buffer for incoming data.

View File

@ -35,20 +35,6 @@ namespace nghttp2 {
namespace asio_http2 { namespace asio_http2 {
channel::channel() : impl_(make_unique<channel_impl>()) {}
void channel::post(void_cb cb) { impl_->post(std::move(cb)); }
channel_impl &channel::impl() { return *impl_; }
channel_impl::channel_impl() : strand_(nullptr) {}
void channel_impl::post(void_cb cb) { strand_->post(std::move(cb)); }
void channel_impl::strand(boost::asio::io_service::strand *strand) {
strand_ = strand;
}
namespace server { namespace server {
extern std::shared_ptr<std::string> cached_date; extern std::shared_ptr<std::string> cached_date;
@ -80,10 +66,6 @@ void request::on_data(data_cb cb) { return impl_->on_data(std::move(cb)); }
void request::on_end(void_cb cb) { return impl_->on_end(std::move(cb)); } void request::on_end(void_cb cb) { return impl_->on_end(std::move(cb)); }
bool request::run_task(thread_cb start) {
return impl_->run_task(std::move(start));
}
request_impl &request::impl() { return *impl_; } request_impl &request::impl() { return *impl_; }
response::response() : impl_(make_unique<response_impl>()) {} response::response() : impl_(make_unique<response_impl>()) {}
@ -162,16 +144,6 @@ void request_impl::on_data(data_cb cb) { on_data_cb_ = std::move(cb); }
void request_impl::on_end(void_cb cb) { on_end_cb_ = std::move(cb); } void request_impl::on_end(void_cb cb) { on_end_cb_ = std::move(cb); }
bool request_impl::run_task(thread_cb start) {
if (closed()) {
return false;
}
auto handler = handler_.lock();
return handler->run_task(std::move(start));
}
void request_impl::handler(std::weak_ptr<http2_handler> h) { void request_impl::handler(std::weak_ptr<http2_handler> h) {
handler_ = std::move(h); handler_ = std::move(h);
} }
@ -460,11 +432,8 @@ int on_frame_not_send_callback(nghttp2_session *session,
} // namespace } // namespace
http2_handler::http2_handler(boost::asio::io_service &io_service, http2_handler::http2_handler(boost::asio::io_service &io_service,
boost::asio::io_service &task_io_service_,
connection_write writefun, request_cb cb) connection_write writefun, request_cb cb)
: writefun_(writefun), request_cb_(std::move(cb)), io_service_(io_service), : writefun_(writefun), request_cb_(std::move(cb)), io_service_(io_service),
task_io_service_(task_io_service_),
strand_(std::make_shared<boost::asio::io_service::strand>(io_service_)),
session_(nullptr), buf_(nullptr), buflen_(0), inside_callback_(false) {} session_(nullptr), buf_(nullptr), buflen_(0), inside_callback_(false) {}
http2_handler::~http2_handler() { nghttp2_session_del(session_); } http2_handler::~http2_handler() { nghttp2_session_del(session_); }
@ -667,23 +636,6 @@ int http2_handler::push_promise(http2_stream &stream, std::string method,
return 0; return 0;
} }
bool http2_handler::run_task(thread_cb start) {
auto strand = strand_;
try {
task_io_service_.post([start, strand]() {
channel chan;
chan.impl().strand(strand.get());
start(chan);
});
return true;
} catch (std::exception &ex) {
return false;
}
}
boost::asio::io_service &http2_handler::io_service() { return io_service_; } boost::asio::io_service &http2_handler::io_service() { return io_service_; }
callback_guard::callback_guard(http2_handler &h) : handler(h) { callback_guard::callback_guard(http2_handler &h) : handler(h) {

View File

@ -40,17 +40,6 @@
namespace nghttp2 { namespace nghttp2 {
namespace asio_http2 { namespace asio_http2 {
class channel_impl {
public:
channel_impl();
void post(void_cb cb);
void strand(boost::asio::io_service::strand *strand);
private:
boost::asio::io_service::strand *strand_;
};
namespace server { namespace server {
class http2_handler; class http2_handler;
@ -76,8 +65,6 @@ public:
void on_data(data_cb cb); void on_data(data_cb cb);
void on_end(void_cb cb); void on_end(void_cb cb);
bool run_task(thread_cb start);
void set_header(std::vector<header> headers); void set_header(std::vector<header> headers);
void add_header(std::string name, std::string value); void add_header(std::string name, std::string value);
void method(std::string method); void method(std::string method);
@ -154,9 +141,8 @@ typedef std::function<void(void)> connection_write;
class http2_handler : public std::enable_shared_from_this<http2_handler> { class http2_handler : public std::enable_shared_from_this<http2_handler> {
public: public:
http2_handler(boost::asio::io_service &io_service, http2_handler(boost::asio::io_service &io_service, connection_write writefun,
boost::asio::io_service &task_io_service, request_cb cb);
connection_write writefun, request_cb cb);
~http2_handler(); ~http2_handler();
@ -185,8 +171,6 @@ public:
int push_promise(http2_stream &stream, std::string method, std::string path, int push_promise(http2_stream &stream, std::string method, std::string path,
std::vector<header> headers); std::vector<header> headers);
bool run_task(thread_cb start);
boost::asio::io_service &io_service(); boost::asio::io_service &io_service();
template <size_t N> template <size_t N>
@ -250,8 +234,6 @@ private:
connection_write writefun_; connection_write writefun_;
request_cb request_cb_; request_cb request_cb_;
boost::asio::io_service &io_service_; boost::asio::io_service &io_service_;
boost::asio::io_service &task_io_service_;
std::shared_ptr<boost::asio::io_service::strand> strand_;
nghttp2_session *session_; nghttp2_session *session_;
const uint8_t *buf_; const uint8_t *buf_;
std::size_t buflen_; std::size_t buflen_;

View File

@ -55,14 +55,9 @@ void http2::tls(std::string private_key_file, std::string certificate_file) {
impl_->tls(std::move(private_key_file), std::move(certificate_file)); impl_->tls(std::move(private_key_file), std::move(certificate_file));
} }
void http2::num_concurrent_tasks(size_t num_concurrent_tasks) {
impl_->num_concurrent_tasks(num_concurrent_tasks);
}
void http2::backlog(int backlog) { impl_->backlog(backlog); } void http2::backlog(int backlog) { impl_->backlog(backlog); }
http2_impl::http2_impl() http2_impl::http2_impl() : num_threads_(1), backlog_(-1) {}
: num_threads_(1), num_concurrent_tasks_(1), backlog_(-1) {}
namespace { namespace {
std::vector<unsigned char> &get_alpn_token() { std::vector<unsigned char> &get_alpn_token() {
@ -114,8 +109,8 @@ void http2_impl::listen(const std::string &address, uint16_t port,
nullptr); nullptr);
} }
server(address, port, num_threads_, num_concurrent_tasks_, std::move(cb), server(address, port, num_threads_, std::move(cb), std::move(ssl_ctx),
std::move(ssl_ctx), backlog_).run(); backlog_).run();
} }
void http2_impl::num_threads(size_t num_threads) { num_threads_ = num_threads; } void http2_impl::num_threads(size_t num_threads) { num_threads_ = num_threads; }
@ -126,10 +121,6 @@ void http2_impl::tls(std::string private_key_file,
certificate_file_ = std::move(certificate_file); certificate_file_ = std::move(certificate_file);
} }
void http2_impl::num_concurrent_tasks(size_t num_concurrent_tasks) {
num_concurrent_tasks_ = num_concurrent_tasks;
}
void http2_impl::backlog(int backlog) { backlog_ = backlog; } void http2_impl::backlog(int backlog) { backlog_ = backlog; }
} // namespace server } // namespace server

View File

@ -43,7 +43,6 @@ public:
void listen(const std::string &address, uint16_t port, request_cb cb); void listen(const std::string &address, uint16_t port, request_cb cb);
void num_threads(size_t num_threads); void num_threads(size_t num_threads);
void tls(std::string private_key_file, std::string certificate_file); void tls(std::string private_key_file, std::string certificate_file);
void num_concurrent_tasks(size_t num_concurrent_tasks);
void backlog(int backlog); void backlog(int backlog);
private: private:
@ -51,7 +50,6 @@ private:
std::string certificate_file_; std::string certificate_file_;
std::unique_ptr<server> server_; std::unique_ptr<server> server_;
std::size_t num_threads_; std::size_t num_threads_;
std::size_t num_concurrent_tasks_;
int backlog_; int backlog_;
}; };

View File

@ -46,9 +46,7 @@ namespace asio_http2 {
namespace server { namespace server {
io_service_pool::io_service_pool(std::size_t pool_size, io_service_pool::io_service_pool(std::size_t pool_size) : next_io_service_(0) {
std::size_t thread_pool_size)
: next_io_service_(0), thread_pool_size_(thread_pool_size) {
if (pool_size == 0) { if (pool_size == 0) {
throw std::runtime_error("io_service_pool size is 0"); throw std::runtime_error("io_service_pool size is 0");
} }
@ -61,16 +59,9 @@ io_service_pool::io_service_pool(std::size_t pool_size,
io_services_.push_back(io_service); io_services_.push_back(io_service);
work_.push_back(work); work_.push_back(work);
} }
auto work = std::make_shared<boost::asio::io_service::work>(task_io_service_);
work_.push_back(work);
} }
void io_service_pool::run() { void io_service_pool::run() {
for (std::size_t i = 0; i < thread_pool_size_; ++i) {
thread_pool_.create_thread([this]() { task_io_service_.run(); });
}
// Create a pool of threads to run all of the io_services. // Create a pool of threads to run all of the io_services.
auto futs = std::vector<std::future<std::size_t>>(); auto futs = std::vector<std::future<std::size_t>>();
@ -85,8 +76,6 @@ void io_service_pool::run() {
for (auto &fut : futs) { for (auto &fut : futs) {
fut.get(); fut.get();
} }
thread_pool_.join_all();
} }
void io_service_pool::stop() { void io_service_pool::stop() {
@ -94,8 +83,6 @@ void io_service_pool::stop() {
for (auto &iosv : io_services_) { for (auto &iosv : io_services_) {
iosv->stop(); iosv->stop();
} }
task_io_service_.stop();
} }
boost::asio::io_service &io_service_pool::get_io_service() { boost::asio::io_service &io_service_pool::get_io_service() {
@ -108,10 +95,6 @@ boost::asio::io_service &io_service_pool::get_io_service() {
return io_service; return io_service;
} }
boost::asio::io_service &io_service_pool::get_task_io_service() {
return task_io_service_;
}
} // namespace server } // namespace server
} // namespace asio_http2 } // namespace asio_http2

View File

@ -57,7 +57,7 @@ namespace server {
class io_service_pool : private boost::noncopyable { class io_service_pool : private boost::noncopyable {
public: public:
/// Construct the io_service pool. /// Construct the io_service pool.
explicit io_service_pool(std::size_t pool_size, std::size_t thread_pool_size); explicit io_service_pool(std::size_t pool_size);
/// Run all io_service objects in the pool. /// Run all io_service objects in the pool.
void run(); void run();
@ -68,25 +68,15 @@ public:
/// Get an io_service to use. /// Get an io_service to use.
boost::asio::io_service &get_io_service(); boost::asio::io_service &get_io_service();
boost::asio::io_service &get_task_io_service();
private: private:
typedef std::shared_ptr<boost::asio::io_service> io_service_ptr;
typedef std::shared_ptr<boost::asio::io_service::work> work_ptr;
/// The pool of io_services. /// The pool of io_services.
std::vector<io_service_ptr> io_services_; std::vector<std::shared_ptr<boost::asio::io_service>> io_services_;
boost::asio::io_service task_io_service_;
boost::thread_group thread_pool_;
/// The work that keeps the io_services running. /// The work that keeps the io_services running.
std::vector<work_ptr> work_; std::vector<std::shared_ptr<boost::asio::io_service::work>> work_;
/// The next io_service to use for a connection. /// The next io_service to use for a connection.
std::size_t next_io_service_; std::size_t next_io_service_;
std::size_t thread_pool_size_;
}; };
} // namespace server } // namespace server

View File

@ -43,10 +43,9 @@ namespace asio_http2 {
namespace server { namespace server {
server::server(const std::string &address, uint16_t port, server::server(const std::string &address, uint16_t port,
std::size_t io_service_pool_size, std::size_t thread_pool_size, std::size_t io_service_pool_size, request_cb cb,
request_cb cb,
std::unique_ptr<boost::asio::ssl::context> ssl_ctx, int backlog) std::unique_ptr<boost::asio::ssl::context> ssl_ctx, int backlog)
: io_service_pool_(io_service_pool_size, thread_pool_size), : io_service_pool_(io_service_pool_size),
signals_(io_service_pool_.get_io_service()), signals_(io_service_pool_.get_io_service()),
tick_timer_(io_service_pool_.get_io_service(), tick_timer_(io_service_pool_.get_io_service(),
boost::posix_time::seconds(1)), boost::posix_time::seconds(1)),
@ -113,8 +112,7 @@ typedef boost::asio::ssl::stream<boost::asio::ip::tcp::socket> ssl_socket;
void server::start_accept() { void server::start_accept() {
if (ssl_ctx_) { if (ssl_ctx_) {
auto new_connection = std::make_shared<connection<ssl_socket>>( auto new_connection = std::make_shared<connection<ssl_socket>>(
request_cb_, io_service_pool_.get_task_io_service(), request_cb_, io_service_pool_.get_io_service(), *ssl_ctx_);
io_service_pool_.get_io_service(), *ssl_ctx_);
for (auto &acceptor : acceptors_) { for (auto &acceptor : acceptors_) {
acceptor.async_accept( acceptor.async_accept(
@ -138,8 +136,7 @@ void server::start_accept() {
} else { } else {
auto new_connection = auto new_connection =
std::make_shared<connection<boost::asio::ip::tcp::socket>>( std::make_shared<connection<boost::asio::ip::tcp::socket>>(
request_cb_, io_service_pool_.get_task_io_service(), request_cb_, io_service_pool_.get_io_service());
io_service_pool_.get_io_service());
for (auto &acceptor : acceptors_) { for (auto &acceptor : acceptors_) {
acceptor.async_accept( acceptor.async_accept(

View File

@ -63,8 +63,7 @@ public:
/// Construct the server to listen on the specified TCP address and port, and /// Construct the server to listen on the specified TCP address and port, and
/// serve up files from the given directory. /// serve up files from the given directory.
explicit server(const std::string &address, uint16_t port, explicit server(const std::string &address, uint16_t port,
std::size_t io_service_pool_size, std::size_t io_service_pool_size, request_cb cb,
std::size_t thread_pool_size, request_cb cb,
std::unique_ptr<boost::asio::ssl::context> ssl_ctx, std::unique_ptr<boost::asio::ssl::context> ssl_ctx,
int backlog = -1); int backlog = -1);

View File

@ -100,29 +100,6 @@ typedef std::function<void(uint32_t)> close_cb;
typedef std::function<std::pair<ssize_t, bool>(uint8_t *buf, std::size_t len)> typedef std::function<std::pair<ssize_t, bool>(uint8_t *buf, std::size_t len)>
read_cb; read_cb;
class channel_impl;
class channel {
public:
// Application must not call this directly.
channel();
// Schedules the execution of callback |cb| in the same thread where
// request callback is called. Therefore, it is same to use request
// or response object in |cb|. The callbacks are executed in the
// same order they are posted though same channel object if they are
// posted from the same thread.
void post(void_cb cb);
// Application must not call this directly.
channel_impl &impl();
private:
std::unique_ptr<channel_impl> impl_;
};
typedef std::function<void(channel &)> thread_cb;
namespace server { namespace server {
class request_impl; class request_impl;
@ -174,17 +151,6 @@ public:
// Returns true if stream has been closed. // Returns true if stream has been closed.
bool closed() const; bool closed() const;
// Runs function |start| in one of background threads. Returns true
// if scheduling task was done successfully.
//
// Since |start| is called in different thread, calling any method
// of request or response object in the callback may cause undefined
// behavior. To safely use them, use channel::post(). A callback
// passed to channel::post() is executed in the same thread where
// request callback is called, so it is safe to use request or
// response object. Example::
bool run_task(thread_cb start);
// Application must not call this directly. // Application must not call this directly.
request_impl &impl(); request_impl &impl();
@ -249,12 +215,6 @@ public:
// be in PEM format. // be in PEM format.
void tls(std::string private_key_file, std::string certificate_file); void tls(std::string private_key_file, std::string certificate_file);
// Sets number of background threads to run concurrent tasks (see
// request::run_task()). It defaults to 1. This is not the number
// of thread to handle incoming HTTP request. For this purpose, see
// num_threads().
void num_concurrent_tasks(size_t num_concurrent_tasks);
// Sets the maximum length to which the queue of pending // Sets the maximum length to which the queue of pending
// connections. // connections.
void backlog(int backlog); void backlog(int backlog);