From 062b42918c864b0a5eb2d09eafc37e24f5f76cbd Mon Sep 17 00:00:00 2001 From: Tatsuhiro Tsujikawa Date: Wed, 4 Mar 2015 01:46:59 +0900 Subject: [PATCH] asio: Remove threaded task runner This is too complicated and not suitable for event driven server. We plan to expose io_service instead. --- examples/Makefile.am | 6 +- examples/asio-sv3.cc | 142 ------------------------------ src/asio_connection.h | 10 +-- src/asio_http2_handler.cc | 48 ---------- src/asio_http2_handler.h | 22 +---- src/asio_http2_impl.cc | 15 +--- src/asio_http2_impl.h | 2 - src/asio_io_service_pool.cc | 19 +--- src/asio_io_service_pool.h | 16 +--- src/asio_server.cc | 11 +-- src/asio_server.h | 3 +- src/includes/nghttp2/asio_http2.h | 40 --------- 12 files changed, 18 insertions(+), 316 deletions(-) delete mode 100644 examples/asio-sv3.cc diff --git a/examples/Makefile.am b/examples/Makefile.am index ea817276..e323384a 100644 --- a/examples/Makefile.am +++ b/examples/Makefile.am @@ -58,7 +58,7 @@ endif # ENABLE_TINY_NGHTTPD if ENABLE_ASIO_LIB -noinst_PROGRAMS += asio-sv asio-sv2 asio-sv3 asio-cl +noinst_PROGRAMS += asio-sv asio-sv2 asio-cl ASIOCPPFLAGS = ${BOOST_CPPFLAGS} ${AM_CPPFLAGS} ASIOLDADD = $(top_builddir)/lib/libnghttp2.la \ @@ -79,10 +79,6 @@ asio_sv2_SOURCES = asio-sv2.cc asio_sv2_CPPFLAGS = ${ASIOCPPFLAGS} asio_sv2_LDADD = ${ASIOLDADD} -asio_sv3_SOURCES = asio-sv3.cc -asio_sv3_CPPFLAGS = ${ASIOCPPFLAGS} -asio_sv3_LDADD = ${ASIOLDADD} - asio_cl_SOURCES = asio-cl.cc asio_cl_CPPFLAGS = ${ASIOCPPFLAGS} asio_cl_LDADD = ${ASIOLDADD} diff --git a/examples/asio-sv3.cc b/examples/asio-sv3.cc deleted file mode 100644 index 66a5e2b5..00000000 --- a/examples/asio-sv3.cc +++ /dev/null @@ -1,142 +0,0 @@ -/* - * nghttp2 - HTTP/2 C Library - * - * Copyright (c) 2014 Tatsuhiro Tsujikawa - * - * Permission is hereby granted, free of charge, to any person obtaining - * a copy of this software and associated documentation files (the - * "Software"), to deal in the Software without restriction, including - * without limitation the rights to use, copy, modify, merge, publish, - * distribute, sublicense, and/or sell copies of the Software, and to - * permit persons to whom the Software is furnished to do so, subject to - * the following conditions: - * - * The above copyright notice and this permission notice shall be - * included in all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, - * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF - * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND - * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE - * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION - * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION - * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - */ -// We wrote this code based on the original code which has the -// following license: -// -// main.cpp -// ~~~~~~~~ -// -// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com) -// -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) -// -#include -#include -#include -#include - -#include - -using namespace nghttp2::asio_http2; -using namespace nghttp2::asio_http2::server; - -int main(int argc, char *argv[]) { - try { - // Check command line arguments. - if (argc < 4) { - std::cerr << "Usage: asio-sv3 " - << " \n"; - return 1; - } - - uint16_t port = std::stoi(argv[1]); - std::size_t num_threads = std::stoi(argv[2]); - std::size_t num_concurrent_tasks = std::stoi(argv[3]); - - http2 server; - - server.num_threads(num_threads); - - if (argc >= 5) { - server.tls(argv[4], argv[5]); - } - - server.num_concurrent_tasks(num_concurrent_tasks); - - server.listen("*", port, [](const std::shared_ptr &req, - const std::shared_ptr &res) { - res->write_head(200); - - auto msgq = std::make_shared>(); - - res->end([msgq](uint8_t * buf, std::size_t len) - -> std::pair { - if (msgq->empty()) { - // if msgq is empty, tells the library that don't call - // this callback until we call res->resume(). This is - // done by returing std::make_pair(0, false). - return std::make_pair(0, false); - } - auto msg = std::move(msgq->front()); - msgq->pop_front(); - - if (msg.empty()) { - // The empty message signals the end of response in - // this simple protocol. - return std::make_pair(0, true); - } - - auto nwrite = std::min(len, msg.size()); - std::copy(std::begin(msg), std::begin(msg) + nwrite, buf); - if (msg.size() > nwrite) { - msgq->push_front(msg.substr(nwrite)); - } - return std::make_pair(nwrite, false); - }); - - req->run_task([res, msgq](channel &channel) { - // executed in different thread from request callback - // was called. - - // Using res and msgq is not safe inside this callback. - // But using them in callback passed to channel::post is - // safe. - - // We just emit simple message "message N\n" in every 1 - // second and 3 times in total. - for (std::size_t i = 0; i < 3; ++i) { - msgq->push_back("message " + std::to_string(i + 1) + "\n"); - - channel.post([res]() { - // executed in same thread where - // request callback was called. - - // Tells library we have new message. - res->resume(); - }); - - sleep(1); - } - - // Send empty message to signal the end of response - // body. - msgq->push_back(""); - - channel.post([res]() { - // executed in same thread where request - // callback was called. - res->resume(); - }); - - }); - - }); - } catch (std::exception &e) { - std::cerr << "exception: " << e.what() << "\n"; - } - - return 0; -} diff --git a/src/asio_connection.h b/src/asio_connection.h index 8be709c2..03e8f43f 100644 --- a/src/asio_connection.h +++ b/src/asio_connection.h @@ -62,16 +62,14 @@ class connection : public std::enable_shared_from_this>, public: /// Construct a connection with the given io_service. template - explicit connection(request_cb cb, boost::asio::io_service &task_io_service, - SocketArgs &&... args) + explicit connection(request_cb cb, SocketArgs &&... args) : socket_(std::forward(args)...), request_cb_(std::move(cb)), - task_io_service_(task_io_service), writing_(false) {} + writing_(false) {} /// Start the first asynchronous operation for the connection. void start() { handler_ = std::make_shared( - socket_.get_io_service(), task_io_service_, [this]() { do_write(); }, - request_cb_); + socket_.get_io_service(), [this]() { do_write(); }, request_cb_); if (handler_->start() != 0) { return; } @@ -151,8 +149,6 @@ private: request_cb request_cb_; - boost::asio::io_service &task_io_service_; - std::shared_ptr handler_; /// Buffer for incoming data. diff --git a/src/asio_http2_handler.cc b/src/asio_http2_handler.cc index f3302ec8..f9c54cea 100644 --- a/src/asio_http2_handler.cc +++ b/src/asio_http2_handler.cc @@ -35,20 +35,6 @@ namespace nghttp2 { namespace asio_http2 { -channel::channel() : impl_(make_unique()) {} - -void channel::post(void_cb cb) { impl_->post(std::move(cb)); } - -channel_impl &channel::impl() { return *impl_; } - -channel_impl::channel_impl() : strand_(nullptr) {} - -void channel_impl::post(void_cb cb) { strand_->post(std::move(cb)); } - -void channel_impl::strand(boost::asio::io_service::strand *strand) { - strand_ = strand; -} - namespace server { extern std::shared_ptr cached_date; @@ -80,10 +66,6 @@ void request::on_data(data_cb cb) { return impl_->on_data(std::move(cb)); } void request::on_end(void_cb cb) { return impl_->on_end(std::move(cb)); } -bool request::run_task(thread_cb start) { - return impl_->run_task(std::move(start)); -} - request_impl &request::impl() { return *impl_; } response::response() : impl_(make_unique()) {} @@ -162,16 +144,6 @@ void request_impl::on_data(data_cb cb) { on_data_cb_ = std::move(cb); } void request_impl::on_end(void_cb cb) { on_end_cb_ = std::move(cb); } -bool request_impl::run_task(thread_cb start) { - if (closed()) { - return false; - } - - auto handler = handler_.lock(); - - return handler->run_task(std::move(start)); -} - void request_impl::handler(std::weak_ptr h) { handler_ = std::move(h); } @@ -460,11 +432,8 @@ int on_frame_not_send_callback(nghttp2_session *session, } // namespace http2_handler::http2_handler(boost::asio::io_service &io_service, - boost::asio::io_service &task_io_service_, connection_write writefun, request_cb cb) : writefun_(writefun), request_cb_(std::move(cb)), io_service_(io_service), - task_io_service_(task_io_service_), - strand_(std::make_shared(io_service_)), session_(nullptr), buf_(nullptr), buflen_(0), inside_callback_(false) {} http2_handler::~http2_handler() { nghttp2_session_del(session_); } @@ -667,23 +636,6 @@ int http2_handler::push_promise(http2_stream &stream, std::string method, return 0; } -bool http2_handler::run_task(thread_cb start) { - auto strand = strand_; - - try { - task_io_service_.post([start, strand]() { - channel chan; - chan.impl().strand(strand.get()); - - start(chan); - }); - - return true; - } catch (std::exception &ex) { - return false; - } -} - boost::asio::io_service &http2_handler::io_service() { return io_service_; } callback_guard::callback_guard(http2_handler &h) : handler(h) { diff --git a/src/asio_http2_handler.h b/src/asio_http2_handler.h index 5dc71210..2e684965 100644 --- a/src/asio_http2_handler.h +++ b/src/asio_http2_handler.h @@ -40,17 +40,6 @@ namespace nghttp2 { namespace asio_http2 { - -class channel_impl { -public: - channel_impl(); - void post(void_cb cb); - void strand(boost::asio::io_service::strand *strand); - -private: - boost::asio::io_service::strand *strand_; -}; - namespace server { class http2_handler; @@ -76,8 +65,6 @@ public: void on_data(data_cb cb); void on_end(void_cb cb); - bool run_task(thread_cb start); - void set_header(std::vector
headers); void add_header(std::string name, std::string value); void method(std::string method); @@ -154,9 +141,8 @@ typedef std::function connection_write; class http2_handler : public std::enable_shared_from_this { public: - http2_handler(boost::asio::io_service &io_service, - boost::asio::io_service &task_io_service, - connection_write writefun, request_cb cb); + http2_handler(boost::asio::io_service &io_service, connection_write writefun, + request_cb cb); ~http2_handler(); @@ -185,8 +171,6 @@ public: int push_promise(http2_stream &stream, std::string method, std::string path, std::vector
headers); - bool run_task(thread_cb start); - boost::asio::io_service &io_service(); template @@ -250,8 +234,6 @@ private: connection_write writefun_; request_cb request_cb_; boost::asio::io_service &io_service_; - boost::asio::io_service &task_io_service_; - std::shared_ptr strand_; nghttp2_session *session_; const uint8_t *buf_; std::size_t buflen_; diff --git a/src/asio_http2_impl.cc b/src/asio_http2_impl.cc index ab575ac0..4a823349 100644 --- a/src/asio_http2_impl.cc +++ b/src/asio_http2_impl.cc @@ -55,14 +55,9 @@ void http2::tls(std::string private_key_file, std::string certificate_file) { impl_->tls(std::move(private_key_file), std::move(certificate_file)); } -void http2::num_concurrent_tasks(size_t num_concurrent_tasks) { - impl_->num_concurrent_tasks(num_concurrent_tasks); -} - void http2::backlog(int backlog) { impl_->backlog(backlog); } -http2_impl::http2_impl() - : num_threads_(1), num_concurrent_tasks_(1), backlog_(-1) {} +http2_impl::http2_impl() : num_threads_(1), backlog_(-1) {} namespace { std::vector &get_alpn_token() { @@ -114,8 +109,8 @@ void http2_impl::listen(const std::string &address, uint16_t port, nullptr); } - server(address, port, num_threads_, num_concurrent_tasks_, std::move(cb), - std::move(ssl_ctx), backlog_).run(); + server(address, port, num_threads_, std::move(cb), std::move(ssl_ctx), + backlog_).run(); } void http2_impl::num_threads(size_t num_threads) { num_threads_ = num_threads; } @@ -126,10 +121,6 @@ void http2_impl::tls(std::string private_key_file, certificate_file_ = std::move(certificate_file); } -void http2_impl::num_concurrent_tasks(size_t num_concurrent_tasks) { - num_concurrent_tasks_ = num_concurrent_tasks; -} - void http2_impl::backlog(int backlog) { backlog_ = backlog; } } // namespace server diff --git a/src/asio_http2_impl.h b/src/asio_http2_impl.h index 3b531056..eb99f4a1 100644 --- a/src/asio_http2_impl.h +++ b/src/asio_http2_impl.h @@ -43,7 +43,6 @@ public: void listen(const std::string &address, uint16_t port, request_cb cb); void num_threads(size_t num_threads); void tls(std::string private_key_file, std::string certificate_file); - void num_concurrent_tasks(size_t num_concurrent_tasks); void backlog(int backlog); private: @@ -51,7 +50,6 @@ private: std::string certificate_file_; std::unique_ptr server_; std::size_t num_threads_; - std::size_t num_concurrent_tasks_; int backlog_; }; diff --git a/src/asio_io_service_pool.cc b/src/asio_io_service_pool.cc index 4b9e39e3..90241b79 100644 --- a/src/asio_io_service_pool.cc +++ b/src/asio_io_service_pool.cc @@ -46,9 +46,7 @@ namespace asio_http2 { namespace server { -io_service_pool::io_service_pool(std::size_t pool_size, - std::size_t thread_pool_size) - : next_io_service_(0), thread_pool_size_(thread_pool_size) { +io_service_pool::io_service_pool(std::size_t pool_size) : next_io_service_(0) { if (pool_size == 0) { throw std::runtime_error("io_service_pool size is 0"); } @@ -61,16 +59,9 @@ io_service_pool::io_service_pool(std::size_t pool_size, io_services_.push_back(io_service); work_.push_back(work); } - - auto work = std::make_shared(task_io_service_); - work_.push_back(work); } void io_service_pool::run() { - for (std::size_t i = 0; i < thread_pool_size_; ++i) { - thread_pool_.create_thread([this]() { task_io_service_.run(); }); - } - // Create a pool of threads to run all of the io_services. auto futs = std::vector>(); @@ -85,8 +76,6 @@ void io_service_pool::run() { for (auto &fut : futs) { fut.get(); } - - thread_pool_.join_all(); } void io_service_pool::stop() { @@ -94,8 +83,6 @@ void io_service_pool::stop() { for (auto &iosv : io_services_) { iosv->stop(); } - - task_io_service_.stop(); } boost::asio::io_service &io_service_pool::get_io_service() { @@ -108,10 +95,6 @@ boost::asio::io_service &io_service_pool::get_io_service() { return io_service; } -boost::asio::io_service &io_service_pool::get_task_io_service() { - return task_io_service_; -} - } // namespace server } // namespace asio_http2 diff --git a/src/asio_io_service_pool.h b/src/asio_io_service_pool.h index f29fb4df..ac0a76c3 100644 --- a/src/asio_io_service_pool.h +++ b/src/asio_io_service_pool.h @@ -57,7 +57,7 @@ namespace server { class io_service_pool : private boost::noncopyable { public: /// Construct the io_service pool. - explicit io_service_pool(std::size_t pool_size, std::size_t thread_pool_size); + explicit io_service_pool(std::size_t pool_size); /// Run all io_service objects in the pool. void run(); @@ -68,25 +68,15 @@ public: /// Get an io_service to use. boost::asio::io_service &get_io_service(); - boost::asio::io_service &get_task_io_service(); - private: - typedef std::shared_ptr io_service_ptr; - typedef std::shared_ptr work_ptr; - /// The pool of io_services. - std::vector io_services_; - - boost::asio::io_service task_io_service_; - boost::thread_group thread_pool_; + std::vector> io_services_; /// The work that keeps the io_services running. - std::vector work_; + std::vector> work_; /// The next io_service to use for a connection. std::size_t next_io_service_; - - std::size_t thread_pool_size_; }; } // namespace server diff --git a/src/asio_server.cc b/src/asio_server.cc index 39effabc..811dcb5e 100644 --- a/src/asio_server.cc +++ b/src/asio_server.cc @@ -43,10 +43,9 @@ namespace asio_http2 { namespace server { server::server(const std::string &address, uint16_t port, - std::size_t io_service_pool_size, std::size_t thread_pool_size, - request_cb cb, + std::size_t io_service_pool_size, request_cb cb, std::unique_ptr ssl_ctx, int backlog) - : io_service_pool_(io_service_pool_size, thread_pool_size), + : io_service_pool_(io_service_pool_size), signals_(io_service_pool_.get_io_service()), tick_timer_(io_service_pool_.get_io_service(), boost::posix_time::seconds(1)), @@ -113,8 +112,7 @@ typedef boost::asio::ssl::stream ssl_socket; void server::start_accept() { if (ssl_ctx_) { auto new_connection = std::make_shared>( - request_cb_, io_service_pool_.get_task_io_service(), - io_service_pool_.get_io_service(), *ssl_ctx_); + request_cb_, io_service_pool_.get_io_service(), *ssl_ctx_); for (auto &acceptor : acceptors_) { acceptor.async_accept( @@ -138,8 +136,7 @@ void server::start_accept() { } else { auto new_connection = std::make_shared>( - request_cb_, io_service_pool_.get_task_io_service(), - io_service_pool_.get_io_service()); + request_cb_, io_service_pool_.get_io_service()); for (auto &acceptor : acceptors_) { acceptor.async_accept( diff --git a/src/asio_server.h b/src/asio_server.h index ac108272..69090948 100644 --- a/src/asio_server.h +++ b/src/asio_server.h @@ -63,8 +63,7 @@ public: /// Construct the server to listen on the specified TCP address and port, and /// serve up files from the given directory. explicit server(const std::string &address, uint16_t port, - std::size_t io_service_pool_size, - std::size_t thread_pool_size, request_cb cb, + std::size_t io_service_pool_size, request_cb cb, std::unique_ptr ssl_ctx, int backlog = -1); diff --git a/src/includes/nghttp2/asio_http2.h b/src/includes/nghttp2/asio_http2.h index 3d3b4959..34794f36 100644 --- a/src/includes/nghttp2/asio_http2.h +++ b/src/includes/nghttp2/asio_http2.h @@ -100,29 +100,6 @@ typedef std::function close_cb; typedef std::function(uint8_t *buf, std::size_t len)> read_cb; -class channel_impl; - -class channel { -public: - // Application must not call this directly. - channel(); - - // Schedules the execution of callback |cb| in the same thread where - // request callback is called. Therefore, it is same to use request - // or response object in |cb|. The callbacks are executed in the - // same order they are posted though same channel object if they are - // posted from the same thread. - void post(void_cb cb); - - // Application must not call this directly. - channel_impl &impl(); - -private: - std::unique_ptr impl_; -}; - -typedef std::function thread_cb; - namespace server { class request_impl; @@ -174,17 +151,6 @@ public: // Returns true if stream has been closed. bool closed() const; - // Runs function |start| in one of background threads. Returns true - // if scheduling task was done successfully. - // - // Since |start| is called in different thread, calling any method - // of request or response object in the callback may cause undefined - // behavior. To safely use them, use channel::post(). A callback - // passed to channel::post() is executed in the same thread where - // request callback is called, so it is safe to use request or - // response object. Example:: - bool run_task(thread_cb start); - // Application must not call this directly. request_impl &impl(); @@ -249,12 +215,6 @@ public: // be in PEM format. void tls(std::string private_key_file, std::string certificate_file); - // Sets number of background threads to run concurrent tasks (see - // request::run_task()). It defaults to 1. This is not the number - // of thread to handle incoming HTTP request. For this purpose, see - // num_threads(). - void num_concurrent_tasks(size_t num_concurrent_tasks); - // Sets the maximum length to which the queue of pending // connections. void backlog(int backlog);