diff --git a/examples/Makefile.am b/examples/Makefile.am index f54b3460..11715c60 100644 --- a/examples/Makefile.am +++ b/examples/Makefile.am @@ -59,7 +59,7 @@ endif # HAVE_EPOLL if ENABLE_ASIO_LIB -noinst_PROGRAMS += asio-sv asio-sv2 +noinst_PROGRAMS += asio-sv asio-sv2 asio-sv3 ASIOCPPFLAGS = ${BOOST_CPPFLAGS} ${AM_CPPFLAGS} ASIOLDFLAGS = @JEMALLOC_LIBS@ @@ -75,6 +75,11 @@ asio_sv2_CPPFLAGS = ${ASIOCPPFLAGS} asio_sv2_LDFLAGS = ${ASIOLDFLAGS} asio_sv2_LDADD = ${ASIOLDADD} +asio_sv3_SOURCES = asio-sv3.cc +asio_sv3_CPPFLAGS = ${ASIOCPPFLAGS} +asio_sv3_LDFLAGS = ${ASIOLDFLAGS} +asio_sv3_LDADD = ${ASIOLDADD} + endif # ENABLE_ASIO_LIB endif # ENABLE_EXAMPLES diff --git a/examples/asio-sv3.cc b/examples/asio-sv3.cc new file mode 100644 index 00000000..a49ee3d8 --- /dev/null +++ b/examples/asio-sv3.cc @@ -0,0 +1,150 @@ +/* + * nghttp2 - HTTP/2 C Library + * + * Copyright (c) 2014 Tatsuhiro Tsujikawa + * + * Permission is hereby granted, free of charge, to any person obtaining + * a copy of this software and associated documentation files (the + * "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, + * distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to + * the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +// We wrote this code based on the original code which has the +// following license: +// +// main.cpp +// ~~~~~~~~ +// +// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#include +#include +#include +#include + +#include + +using namespace nghttp2::asio_http2; +using namespace nghttp2::asio_http2::server; + +int main(int argc, char* argv[]) +{ + try { + // Check command line arguments. + if (argc < 4) { + std::cerr << "Usage: asio-sv3 " + << " \n"; + return 1; + } + + uint16_t port = std::stoi(argv[1]); + std::size_t num_threads = std::stoi(argv[2]); + std::size_t num_concurrent_tasks = std::stoi(argv[3]); + + http2 server; + + server.num_threads(num_threads); + + if(argc >= 5) { + server.tls(argv[4], argv[5]); + } + + server.num_concurrent_tasks(num_concurrent_tasks); + + server.listen + ("*", port, + [](std::shared_ptr req, std::shared_ptr res) + { + res->write_head(200); + + auto msgq = std::make_shared>(); + + res->end + ([msgq](uint8_t *buf, std::size_t len) -> std::pair + { + if(msgq->empty()) { + // if msgq is empty, tells the library that don't call + // this callback until we call res->resume(). This is + // done by returing std::make_pair(0, false). + return std::make_pair(0, false); + } + auto msg = std::move(msgq->front()); + msgq->pop_front(); + + if(msg.empty()) { + // The empty message signals the end of response in + // this simple protocol. + return std::make_pair(0, true); + } + + auto nwrite = std::min(len, msg.size()); + std::copy(std::begin(msg), std::begin(msg) + nwrite, buf); + if(msg.size() > nwrite) { + msgq->push_front(msg.substr(nwrite)); + } + return std::make_pair(nwrite, false); + }); + + req->run_task + ([res, msgq](channel& channel) + { + // executed in different thread from request callback + // was called. + + // Using res and msgq is not safe inside this callback. + // But using them in callback passed to channel::post is + // safe. + + // We just emit simple message "message N\n" in every 1 + // second and 3 times in total. + for(std::size_t i = 0; i < 3; ++i) { + msgq->push_back("message " + std::to_string(i + 1) + "\n"); + + channel.post([res]() + { + // executed in same thread where + // request callback was called. + + // Tells library we have new message. + res->resume(); + }); + + sleep(1); + } + + // Send empty message to signal the end of response + // body. + msgq->push_back(""); + + channel.post([res]() + { + // executed in same thread where request + // callback was called. + res->resume(); + }); + + }); + + }); + } catch (std::exception& e) { + std::cerr << "exception: " << e.what() << "\n"; + } + + return 0; +} diff --git a/src/asio_connection.h b/src/asio_connection.h index 4736517b..1416ae06 100644 --- a/src/asio_connection.h +++ b/src/asio_connection.h @@ -64,9 +64,12 @@ class connection public: /// Construct a connection with the given io_service. template - explicit connection(request_cb cb, SocketArgs&&... args) + explicit connection(request_cb cb, + boost::asio::io_service& task_io_service, + SocketArgs&&... args) : socket_(std::forward(args)...), request_cb_(std::move(cb)), + task_io_service_(task_io_service), writing_(false) {} @@ -75,6 +78,7 @@ public: { handler_ = std::make_shared (socket_.get_io_service(), + task_io_service_, [this]() { do_write(); @@ -168,6 +172,8 @@ private: request_cb request_cb_; + boost::asio::io_service& task_io_service_; + std::shared_ptr handler_; /// Buffer for incoming data. diff --git a/src/asio_http2_handler.cc b/src/asio_http2_handler.cc index 7bbe4a27..d6ba0e01 100644 --- a/src/asio_http2_handler.cc +++ b/src/asio_http2_handler.cc @@ -97,6 +97,11 @@ void request::on_end(void_cb cb) return impl_->on_end(std::move(cb)); } +bool request::run_task(thread_cb start) +{ + return impl_->run_task(std::move(start)); +} + request_impl& request::impl() { return *impl_; @@ -122,6 +127,11 @@ void response::end(read_cb cb) impl_->end(std::move(cb)); } +void response::resume() +{ + impl_->resume(); +} + unsigned int response::status_code() const { return impl_->status_code(); @@ -209,7 +219,7 @@ void request_impl::path(std::string arg) bool request_impl::push(std::string method, std::string path, std::vector
headers) { - if(handler_.expired() || stream_.expired()) { + if(closed()) { return false; } @@ -245,6 +255,17 @@ void request_impl::on_end(void_cb cb) on_end_cb_ = std::move(cb); } +bool request_impl::run_task(thread_cb start) +{ + if(closed()) { + return false; + } + + auto handler = handler_.lock(); + + return handler->run_task(std::move(start)); +} + void request_impl::handler(std::weak_ptr h) { handler_ = std::move(h); @@ -311,7 +332,7 @@ void response_impl::end(std::string data) void response_impl::end(read_cb cb) { - if(started_ || handler_.expired() || stream_.expired()) { + if(started_ || closed()) { return; } @@ -331,6 +352,26 @@ void response_impl::end(read_cb cb) } } +bool response_impl::closed() const +{ + return handler_.expired() || stream_.expired(); +} + +void response_impl::resume() +{ + if(closed()) { + return; + } + + auto handler = handler_.lock(); + auto stream = stream_.lock(); + handler->resume(*stream); + + if(!handler->inside_callback()) { + handler->initiate_write(); + } +} + bool response_impl::started() const { return started_; @@ -361,6 +402,34 @@ std::pair response_impl::call_read return std::make_pair(0, true); } +channel::channel() + : impl_(util::make_unique()) +{} + +void channel::post(void_cb cb) +{ + impl_->post(std::move(cb)); +} + +channel_impl& channel::impl() +{ + return *impl_; +} + +channel_impl::channel_impl() + : strand_(nullptr) +{} + +void channel_impl::post(void_cb cb) +{ + strand_->post(std::move(cb)); +} + +void channel_impl::strand(boost::asio::io_service::strand *strand) +{ + strand_ = strand; +} + http2_stream::http2_stream(int32_t stream_id) : request_(std::make_shared()), response_(std::make_shared()), @@ -608,10 +677,14 @@ int on_frame_not_send_callback } // namespace http2_handler::http2_handler -(boost::asio::io_service& io_service, connection_write writefun, request_cb cb) +(boost::asio::io_service& io_service, + boost::asio::io_service& task_io_service_, + connection_write writefun, request_cb cb) : writefun_(writefun), request_cb_(std::move(cb)), io_service_(io_service), + task_io_service_(task_io_service_), + strand_(std::make_shared(io_service_)), session_(nullptr), buf_(nullptr), buflen_(0), @@ -789,6 +862,11 @@ void http2_handler::initiate_write() writefun_(); } +void http2_handler::resume(http2_stream& stream) +{ + nghttp2_session_resume_data(session_, stream.get_stream_id()); +} + int http2_handler::push_promise(http2_stream& stream, std::string method, std::string path, std::vector
headers) @@ -837,6 +915,26 @@ int http2_handler::push_promise(http2_stream& stream, std::string method, return 0; } +bool http2_handler::run_task(thread_cb start) +{ + auto strand = strand_; + + try { + task_io_service_.post + ([start, strand]() + { + channel chan; + chan.impl().strand(strand.get()); + + start(chan); + }); + + return true; + } catch(std::exception& ex) { + return false; + } +} + boost::asio::io_service& http2_handler::io_service() { return io_service_; diff --git a/src/asio_http2_handler.h b/src/asio_http2_handler.h index af59896c..cbeed227 100644 --- a/src/asio_http2_handler.h +++ b/src/asio_http2_handler.h @@ -65,6 +65,8 @@ public: void on_data(data_cb cb); void on_end(void_cb cb); + bool run_task(thread_cb start); + void set_header(std::vector
headers); void add_header(std::string name, std::string value); void method(std::string method); @@ -97,6 +99,8 @@ public: void write_head(unsigned int status_code, std::vector
headers = {}); void end(std::string data = ""); void end(read_cb cb); + void resume(); + bool closed() const; unsigned int status_code() const; const std::vector
& headers() const; @@ -113,6 +117,15 @@ private: bool started_; }; +class channel_impl { +public: + channel_impl(); + void post(void_cb cb); + void strand(boost::asio::io_service::strand *strand); +private: + boost::asio::io_service::strand *strand_; +}; + class http2_stream { public: http2_stream(int32_t stream_id); @@ -137,6 +150,7 @@ typedef std::function connection_write; class http2_handler : public std::enable_shared_from_this { public: http2_handler(boost::asio::io_service& io_service, + boost::asio::io_service& task_io_service, connection_write writefun, request_cb cb); @@ -162,10 +176,14 @@ public: void leave_callback(); bool inside_callback() const; + void resume(http2_stream& stream); + int push_promise(http2_stream& stream, std::string method, std::string path, std::vector
headers); + bool run_task(thread_cb start); + boost::asio::io_service& io_service(); template @@ -231,6 +249,8 @@ private: connection_write writefun_; request_cb request_cb_; boost::asio::io_service& io_service_; + boost::asio::io_service& task_io_service_; + std::shared_ptr strand_; nghttp2_session *session_; const uint8_t *buf_; std::size_t buflen_; diff --git a/src/asio_http2_impl.cc b/src/asio_http2_impl.cc index ded3b1d3..728b67a4 100644 --- a/src/asio_http2_impl.cc +++ b/src/asio_http2_impl.cc @@ -63,8 +63,14 @@ void http2::tls(std::string private_key_file, impl_->tls(std::move(private_key_file), std::move(certificate_file)); } +void http2::num_concurrent_tasks(size_t num_concurrent_tasks) +{ + impl_->num_concurrent_tasks(num_concurrent_tasks); +} + http2_impl::http2_impl() - : num_threads_(1) + : num_threads_(1), + num_concurrent_tasks_(1) {} namespace { @@ -126,7 +132,8 @@ void http2_impl::listen(const std::string& address, uint16_t port, }, nullptr); } - server(address, port, num_threads_, std::move(cb), std::move(ssl_ctx)).run(); + server(address, port, num_threads_, num_concurrent_tasks_, + std::move(cb), std::move(ssl_ctx)).run(); } void http2_impl::num_threads(size_t num_threads) @@ -141,6 +148,11 @@ void http2_impl::tls(std::string private_key_file, certificate_file_ = std::move(certificate_file); } +void http2_impl::num_concurrent_tasks(size_t num_concurrent_tasks) +{ + num_concurrent_tasks_ = num_concurrent_tasks; +} + template std::shared_ptr> defer_shared(T&& t, F f) { diff --git a/src/asio_http2_impl.h b/src/asio_http2_impl.h index 0e1e84c9..9cca408d 100644 --- a/src/asio_http2_impl.h +++ b/src/asio_http2_impl.h @@ -44,11 +44,13 @@ public: request_cb cb); void num_threads(size_t num_threads); void tls(std::string private_key_file, std::string certificate_file); + void num_concurrent_tasks(size_t num_concurrent_tasks); private: std::string private_key_file_; std::string certificate_file_; std::unique_ptr server_; std::size_t num_threads_; + std::size_t num_concurrent_tasks_; }; } // namespace server diff --git a/src/asio_io_service_pool.cc b/src/asio_io_service_pool.cc index 103e5bd7..aa7ffd96 100644 --- a/src/asio_io_service_pool.cc +++ b/src/asio_io_service_pool.cc @@ -46,8 +46,10 @@ namespace asio_http2 { namespace server { -io_service_pool::io_service_pool(std::size_t pool_size) - : next_io_service_(0) +io_service_pool::io_service_pool(std::size_t pool_size, + std::size_t thread_pool_size) + : next_io_service_(0), + thread_pool_size_(thread_pool_size) { if (pool_size == 0) { throw std::runtime_error("io_service_pool size is 0"); @@ -55,17 +57,28 @@ io_service_pool::io_service_pool(std::size_t pool_size) // Give all the io_services work to do so that their run() functions will not // exit until they are explicitly stopped. - for (std::size_t i = 0; i < pool_size; ++i) - { + for (std::size_t i = 0; i < pool_size; ++i) { auto io_service = std::make_shared(); auto work = std::make_shared(*io_service); io_services_.push_back(io_service); work_.push_back(work); } + + auto work = std::make_shared + (task_io_service_); + work_.push_back(work); } void io_service_pool::run() { + for(std::size_t i = 0; i < thread_pool_size_; ++i) { + thread_pool_.create_thread + ([this]() + { + task_io_service_.run(); + }); + } + // Create a pool of threads to run all of the io_services. auto futs = std::vector>(); @@ -81,6 +94,8 @@ void io_service_pool::run() for (auto& fut : futs) { fut.get(); } + + thread_pool_.join_all(); } void io_service_pool::stop() @@ -89,6 +104,8 @@ void io_service_pool::stop() for (auto& iosv : io_services_) { iosv->stop(); } + + task_io_service_.stop(); } boost::asio::io_service& io_service_pool::get_io_service() @@ -102,6 +119,11 @@ boost::asio::io_service& io_service_pool::get_io_service() return io_service; } +boost::asio::io_service& io_service_pool::get_task_io_service() +{ + return task_io_service_; +} + } // namespace server } // namespace asio_http2 diff --git a/src/asio_io_service_pool.h b/src/asio_io_service_pool.h index 5cc9b31f..a5984954 100644 --- a/src/asio_io_service_pool.h +++ b/src/asio_io_service_pool.h @@ -43,6 +43,7 @@ #include #include #include +#include #include @@ -58,7 +59,8 @@ class io_service_pool { public: /// Construct the io_service pool. - explicit io_service_pool(std::size_t pool_size); + explicit io_service_pool(std::size_t pool_size, + std::size_t thread_pool_size); /// Run all io_service objects in the pool. void run(); @@ -69,6 +71,8 @@ public: /// Get an io_service to use. boost::asio::io_service& get_io_service(); + boost::asio::io_service& get_task_io_service(); + private: typedef std::shared_ptr io_service_ptr; typedef std::shared_ptr work_ptr; @@ -76,11 +80,16 @@ private: /// The pool of io_services. std::vector io_services_; + boost::asio::io_service task_io_service_; + boost::thread_group thread_pool_; + /// The work that keeps the io_services running. std::vector work_; /// The next io_service to use for a connection. std::size_t next_io_service_; + + std::size_t thread_pool_size_; }; } // namespace server diff --git a/src/asio_server.cc b/src/asio_server.cc index 50b28575..cbf9fd35 100644 --- a/src/asio_server.cc +++ b/src/asio_server.cc @@ -44,9 +44,10 @@ namespace server { server::server(const std::string& address, uint16_t port, std::size_t io_service_pool_size, + std::size_t thread_pool_size, request_cb cb, std::unique_ptr ssl_ctx) - : io_service_pool_(io_service_pool_size), + : io_service_pool_(io_service_pool_size, thread_pool_size), signals_(io_service_pool_.get_io_service()), tick_timer_(io_service_pool_.get_io_service(), boost::posix_time::seconds(1)), @@ -116,7 +117,8 @@ void server::start_accept() if(ssl_ctx_) { auto new_connection = std::make_shared> - (request_cb_, io_service_pool_.get_io_service(), *ssl_ctx_); + (request_cb_, io_service_pool_.get_task_io_service(), + io_service_pool_.get_io_service(), *ssl_ctx_); acceptor_.async_accept (new_connection->socket().lowest_layer(), @@ -140,7 +142,8 @@ void server::start_accept() } else { auto new_connection = std::make_shared> - (request_cb_, io_service_pool_.get_io_service()); + (request_cb_, io_service_pool_.get_task_io_service(), + io_service_pool_.get_io_service()); acceptor_.async_accept (new_connection->socket(), diff --git a/src/asio_server.h b/src/asio_server.h index a69e9650..1d2b9b67 100644 --- a/src/asio_server.h +++ b/src/asio_server.h @@ -66,6 +66,7 @@ public: /// serve up files from the given directory. explicit server(const std::string& address, uint16_t port, std::size_t io_service_pool_size, + std::size_t thread_pool_size, request_cb cb, std::unique_ptr ssl_ctx); diff --git a/src/includes/nghttp2/asio_http2.h b/src/includes/nghttp2/asio_http2.h index 616e94cd..cee2da3f 100644 --- a/src/includes/nghttp2/asio_http2.h +++ b/src/includes/nghttp2/asio_http2.h @@ -53,11 +53,36 @@ typedef std::function void_cb; // return value is pair of written bytes and bool value indicating // that this is the end of the body. If the end of the body was // reached, return true. If there is error and application wants to -// terminate stream, return std::make_pair(-1, false). Currently, -// returning std::make_pair(0, false) is reserved for future use. +// terminate stream, return std::make_pair(-1, false). Returning +// std::make_pair(0, false) tells the library that don't call this +// callback until application calls response::resume(). This is +// useful when there is no data to send at the moment but there will +// be more to come in near future. typedef std::function (uint8_t *buf, std::size_t len)> read_cb; +class channel_impl; + +class channel { +public: + // Application must not call this directly. + channel(); + + // Schedules the execution of callback |cb| in the same thread where + // request callback is called. Therefore, it is same to use request + // or response object in |cb|. The callbacks are executed in the + // same order they are posted though same channel object if they are + // posted from the same thread. + void post(void_cb cb); + + // Application must not call this directly. + channel_impl& impl(); +private: + std::unique_ptr impl_; +}; + +typedef std::function thread_cb; + class request { public: // Application must not call this directly. @@ -104,6 +129,17 @@ public: // Returns true if stream has been closed. bool closed() const; + // Runs function |start| in one of background threads. Returns true + // if scheduling task was done successfully. + // + // Since |start| is called in different thread, calling any method + // of request or response object in the callback may cause undefined + // behavior. To safely use them, use channel::post(). A callback + // passed to channel::post() is executed in the same thread where + // request callback is called, so it is safe to use request or + // response object. Example:: + bool run_task(thread_cb start); + // Application must not call this directly. request_impl& impl(); private: @@ -127,7 +163,7 @@ public: // further call of end() is allowed. void end(read_cb cb); - // Resumes deferred response. Not implemented yet. + // Resumes deferred response. void resume(); // Returns status code. @@ -142,6 +178,8 @@ private: std::unique_ptr impl_; }; +// This is so called request callback. Called every time request is +// received. typedef std::function, std::shared_ptr)> request_cb; @@ -157,12 +195,19 @@ public: void listen(const std::string& address, uint16_t port, request_cb cb); - // Sets number of native threads. + // Sets number of native threads to handle incoming HTTP request. + // It defaults to 1. void num_threads(size_t num_threads); // Sets TLS private key file and certificate file. Both files must // be in PEM format. void tls(std::string private_key_file, std::string certificate_file); + + // Sets number of background threads to run concurrent tasks (see + // request::run_task()). It defaults to 1. This is not the number + // of thread to handle incoming HTTP request. For this purpose, see + // num_threads(). + void num_concurrent_tasks(size_t num_concurrent_tasks); private: std::unique_ptr impl_; };