libnghttp2_asio: Add request::run_task to execute task in separate thread

This commit is contained in:
Tatsuhiro Tsujikawa 2014-09-28 16:17:43 +09:00
parent 409316018d
commit 88d7abcc23
12 changed files with 392 additions and 19 deletions

View File

@ -59,7 +59,7 @@ endif # HAVE_EPOLL
if ENABLE_ASIO_LIB
noinst_PROGRAMS += asio-sv asio-sv2
noinst_PROGRAMS += asio-sv asio-sv2 asio-sv3
ASIOCPPFLAGS = ${BOOST_CPPFLAGS} ${AM_CPPFLAGS}
ASIOLDFLAGS = @JEMALLOC_LIBS@
@ -75,6 +75,11 @@ asio_sv2_CPPFLAGS = ${ASIOCPPFLAGS}
asio_sv2_LDFLAGS = ${ASIOLDFLAGS}
asio_sv2_LDADD = ${ASIOLDADD}
asio_sv3_SOURCES = asio-sv3.cc
asio_sv3_CPPFLAGS = ${ASIOCPPFLAGS}
asio_sv3_LDFLAGS = ${ASIOLDFLAGS}
asio_sv3_LDADD = ${ASIOLDADD}
endif # ENABLE_ASIO_LIB
endif # ENABLE_EXAMPLES

150
examples/asio-sv3.cc Normal file
View File

@ -0,0 +1,150 @@
/*
* nghttp2 - HTTP/2 C Library
*
* Copyright (c) 2014 Tatsuhiro Tsujikawa
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
// We wrote this code based on the original code which has the
// following license:
//
// main.cpp
// ~~~~~~~~
//
// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <unistd.h>
#include <iostream>
#include <string>
#include <deque>
#include <nghttp2/asio_http2.h>
using namespace nghttp2::asio_http2;
using namespace nghttp2::asio_http2::server;
int main(int argc, char* argv[])
{
try {
// Check command line arguments.
if (argc < 4) {
std::cerr << "Usage: asio-sv3 <port> <threads> <tasks> "
<< " <private-key-file> <cert-file>\n";
return 1;
}
uint16_t port = std::stoi(argv[1]);
std::size_t num_threads = std::stoi(argv[2]);
std::size_t num_concurrent_tasks = std::stoi(argv[3]);
http2 server;
server.num_threads(num_threads);
if(argc >= 5) {
server.tls(argv[4], argv[5]);
}
server.num_concurrent_tasks(num_concurrent_tasks);
server.listen
("*", port,
[](std::shared_ptr<request> req, std::shared_ptr<response> res)
{
res->write_head(200);
auto msgq = std::make_shared<std::deque<std::string>>();
res->end
([msgq](uint8_t *buf, std::size_t len) -> std::pair<ssize_t, bool>
{
if(msgq->empty()) {
// if msgq is empty, tells the library that don't call
// this callback until we call res->resume(). This is
// done by returing std::make_pair(0, false).
return std::make_pair(0, false);
}
auto msg = std::move(msgq->front());
msgq->pop_front();
if(msg.empty()) {
// The empty message signals the end of response in
// this simple protocol.
return std::make_pair(0, true);
}
auto nwrite = std::min(len, msg.size());
std::copy(std::begin(msg), std::begin(msg) + nwrite, buf);
if(msg.size() > nwrite) {
msgq->push_front(msg.substr(nwrite));
}
return std::make_pair(nwrite, false);
});
req->run_task
([res, msgq](channel& channel)
{
// executed in different thread from request callback
// was called.
// Using res and msgq is not safe inside this callback.
// But using them in callback passed to channel::post is
// safe.
// We just emit simple message "message N\n" in every 1
// second and 3 times in total.
for(std::size_t i = 0; i < 3; ++i) {
msgq->push_back("message " + std::to_string(i + 1) + "\n");
channel.post([res]()
{
// executed in same thread where
// request callback was called.
// Tells library we have new message.
res->resume();
});
sleep(1);
}
// Send empty message to signal the end of response
// body.
msgq->push_back("");
channel.post([res]()
{
// executed in same thread where request
// callback was called.
res->resume();
});
});
});
} catch (std::exception& e) {
std::cerr << "exception: " << e.what() << "\n";
}
return 0;
}

View File

@ -64,9 +64,12 @@ class connection
public:
/// Construct a connection with the given io_service.
template<typename... SocketArgs>
explicit connection(request_cb cb, SocketArgs&&... args)
explicit connection(request_cb cb,
boost::asio::io_service& task_io_service,
SocketArgs&&... args)
: socket_(std::forward<SocketArgs>(args)...),
request_cb_(std::move(cb)),
task_io_service_(task_io_service),
writing_(false)
{}
@ -75,6 +78,7 @@ public:
{
handler_ = std::make_shared<http2_handler>
(socket_.get_io_service(),
task_io_service_,
[this]()
{
do_write();
@ -168,6 +172,8 @@ private:
request_cb request_cb_;
boost::asio::io_service& task_io_service_;
std::shared_ptr<http2_handler> handler_;
/// Buffer for incoming data.

View File

@ -97,6 +97,11 @@ void request::on_end(void_cb cb)
return impl_->on_end(std::move(cb));
}
bool request::run_task(thread_cb start)
{
return impl_->run_task(std::move(start));
}
request_impl& request::impl()
{
return *impl_;
@ -122,6 +127,11 @@ void response::end(read_cb cb)
impl_->end(std::move(cb));
}
void response::resume()
{
impl_->resume();
}
unsigned int response::status_code() const
{
return impl_->status_code();
@ -209,7 +219,7 @@ void request_impl::path(std::string arg)
bool request_impl::push(std::string method, std::string path,
std::vector<header> headers)
{
if(handler_.expired() || stream_.expired()) {
if(closed()) {
return false;
}
@ -245,6 +255,17 @@ void request_impl::on_end(void_cb cb)
on_end_cb_ = std::move(cb);
}
bool request_impl::run_task(thread_cb start)
{
if(closed()) {
return false;
}
auto handler = handler_.lock();
return handler->run_task(std::move(start));
}
void request_impl::handler(std::weak_ptr<http2_handler> h)
{
handler_ = std::move(h);
@ -311,7 +332,7 @@ void response_impl::end(std::string data)
void response_impl::end(read_cb cb)
{
if(started_ || handler_.expired() || stream_.expired()) {
if(started_ || closed()) {
return;
}
@ -331,6 +352,26 @@ void response_impl::end(read_cb cb)
}
}
bool response_impl::closed() const
{
return handler_.expired() || stream_.expired();
}
void response_impl::resume()
{
if(closed()) {
return;
}
auto handler = handler_.lock();
auto stream = stream_.lock();
handler->resume(*stream);
if(!handler->inside_callback()) {
handler->initiate_write();
}
}
bool response_impl::started() const
{
return started_;
@ -361,6 +402,34 @@ std::pair<ssize_t, bool> response_impl::call_read
return std::make_pair(0, true);
}
channel::channel()
: impl_(util::make_unique<channel_impl>())
{}
void channel::post(void_cb cb)
{
impl_->post(std::move(cb));
}
channel_impl& channel::impl()
{
return *impl_;
}
channel_impl::channel_impl()
: strand_(nullptr)
{}
void channel_impl::post(void_cb cb)
{
strand_->post(std::move(cb));
}
void channel_impl::strand(boost::asio::io_service::strand *strand)
{
strand_ = strand;
}
http2_stream::http2_stream(int32_t stream_id)
: request_(std::make_shared<request>()),
response_(std::make_shared<response>()),
@ -608,10 +677,14 @@ int on_frame_not_send_callback
} // namespace
http2_handler::http2_handler
(boost::asio::io_service& io_service, connection_write writefun, request_cb cb)
(boost::asio::io_service& io_service,
boost::asio::io_service& task_io_service_,
connection_write writefun, request_cb cb)
: writefun_(writefun),
request_cb_(std::move(cb)),
io_service_(io_service),
task_io_service_(task_io_service_),
strand_(std::make_shared<boost::asio::io_service::strand>(io_service_)),
session_(nullptr),
buf_(nullptr),
buflen_(0),
@ -789,6 +862,11 @@ void http2_handler::initiate_write()
writefun_();
}
void http2_handler::resume(http2_stream& stream)
{
nghttp2_session_resume_data(session_, stream.get_stream_id());
}
int http2_handler::push_promise(http2_stream& stream, std::string method,
std::string path,
std::vector<header> headers)
@ -837,6 +915,26 @@ int http2_handler::push_promise(http2_stream& stream, std::string method,
return 0;
}
bool http2_handler::run_task(thread_cb start)
{
auto strand = strand_;
try {
task_io_service_.post
([start, strand]()
{
channel chan;
chan.impl().strand(strand.get());
start(chan);
});
return true;
} catch(std::exception& ex) {
return false;
}
}
boost::asio::io_service& http2_handler::io_service()
{
return io_service_;

View File

@ -65,6 +65,8 @@ public:
void on_data(data_cb cb);
void on_end(void_cb cb);
bool run_task(thread_cb start);
void set_header(std::vector<header> headers);
void add_header(std::string name, std::string value);
void method(std::string method);
@ -97,6 +99,8 @@ public:
void write_head(unsigned int status_code, std::vector<header> headers = {});
void end(std::string data = "");
void end(read_cb cb);
void resume();
bool closed() const;
unsigned int status_code() const;
const std::vector<header>& headers() const;
@ -113,6 +117,15 @@ private:
bool started_;
};
class channel_impl {
public:
channel_impl();
void post(void_cb cb);
void strand(boost::asio::io_service::strand *strand);
private:
boost::asio::io_service::strand *strand_;
};
class http2_stream {
public:
http2_stream(int32_t stream_id);
@ -137,6 +150,7 @@ typedef std::function<void(void)> connection_write;
class http2_handler : public std::enable_shared_from_this<http2_handler> {
public:
http2_handler(boost::asio::io_service& io_service,
boost::asio::io_service& task_io_service,
connection_write writefun,
request_cb cb);
@ -162,10 +176,14 @@ public:
void leave_callback();
bool inside_callback() const;
void resume(http2_stream& stream);
int push_promise(http2_stream& stream, std::string method,
std::string path,
std::vector<header> headers);
bool run_task(thread_cb start);
boost::asio::io_service& io_service();
template<size_t N>
@ -231,6 +249,8 @@ private:
connection_write writefun_;
request_cb request_cb_;
boost::asio::io_service& io_service_;
boost::asio::io_service& task_io_service_;
std::shared_ptr<boost::asio::io_service::strand> strand_;
nghttp2_session *session_;
const uint8_t *buf_;
std::size_t buflen_;

View File

@ -63,8 +63,14 @@ void http2::tls(std::string private_key_file,
impl_->tls(std::move(private_key_file), std::move(certificate_file));
}
void http2::num_concurrent_tasks(size_t num_concurrent_tasks)
{
impl_->num_concurrent_tasks(num_concurrent_tasks);
}
http2_impl::http2_impl()
: num_threads_(1)
: num_threads_(1),
num_concurrent_tasks_(1)
{}
namespace {
@ -126,7 +132,8 @@ void http2_impl::listen(const std::string& address, uint16_t port,
}, nullptr);
}
server(address, port, num_threads_, std::move(cb), std::move(ssl_ctx)).run();
server(address, port, num_threads_, num_concurrent_tasks_,
std::move(cb), std::move(ssl_ctx)).run();
}
void http2_impl::num_threads(size_t num_threads)
@ -141,6 +148,11 @@ void http2_impl::tls(std::string private_key_file,
certificate_file_ = std::move(certificate_file);
}
void http2_impl::num_concurrent_tasks(size_t num_concurrent_tasks)
{
num_concurrent_tasks_ = num_concurrent_tasks;
}
template<typename T, typename F>
std::shared_ptr<util::Defer<T, F>> defer_shared(T&& t, F f)
{

View File

@ -44,11 +44,13 @@ public:
request_cb cb);
void num_threads(size_t num_threads);
void tls(std::string private_key_file, std::string certificate_file);
void num_concurrent_tasks(size_t num_concurrent_tasks);
private:
std::string private_key_file_;
std::string certificate_file_;
std::unique_ptr<server> server_;
std::size_t num_threads_;
std::size_t num_concurrent_tasks_;
};
} // namespace server

View File

@ -46,8 +46,10 @@ namespace asio_http2 {
namespace server {
io_service_pool::io_service_pool(std::size_t pool_size)
: next_io_service_(0)
io_service_pool::io_service_pool(std::size_t pool_size,
std::size_t thread_pool_size)
: next_io_service_(0),
thread_pool_size_(thread_pool_size)
{
if (pool_size == 0) {
throw std::runtime_error("io_service_pool size is 0");
@ -55,17 +57,28 @@ io_service_pool::io_service_pool(std::size_t pool_size)
// Give all the io_services work to do so that their run() functions will not
// exit until they are explicitly stopped.
for (std::size_t i = 0; i < pool_size; ++i)
{
for (std::size_t i = 0; i < pool_size; ++i) {
auto io_service = std::make_shared<boost::asio::io_service>();
auto work = std::make_shared<boost::asio::io_service::work>(*io_service);
io_services_.push_back(io_service);
work_.push_back(work);
}
auto work = std::make_shared<boost::asio::io_service::work>
(task_io_service_);
work_.push_back(work);
}
void io_service_pool::run()
{
for(std::size_t i = 0; i < thread_pool_size_; ++i) {
thread_pool_.create_thread
([this]()
{
task_io_service_.run();
});
}
// Create a pool of threads to run all of the io_services.
auto futs = std::vector<std::future<std::size_t>>();
@ -81,6 +94,8 @@ void io_service_pool::run()
for (auto& fut : futs) {
fut.get();
}
thread_pool_.join_all();
}
void io_service_pool::stop()
@ -89,6 +104,8 @@ void io_service_pool::stop()
for (auto& iosv : io_services_) {
iosv->stop();
}
task_io_service_.stop();
}
boost::asio::io_service& io_service_pool::get_io_service()
@ -102,6 +119,11 @@ boost::asio::io_service& io_service_pool::get_io_service()
return io_service;
}
boost::asio::io_service& io_service_pool::get_task_io_service()
{
return task_io_service_;
}
} // namespace server
} // namespace asio_http2

View File

@ -43,6 +43,7 @@
#include <memory>
#include <boost/asio.hpp>
#include <boost/noncopyable.hpp>
#include <boost/thread.hpp>
#include <nghttp2/asio_http2.h>
@ -58,7 +59,8 @@ class io_service_pool
{
public:
/// Construct the io_service pool.
explicit io_service_pool(std::size_t pool_size);
explicit io_service_pool(std::size_t pool_size,
std::size_t thread_pool_size);
/// Run all io_service objects in the pool.
void run();
@ -69,6 +71,8 @@ public:
/// Get an io_service to use.
boost::asio::io_service& get_io_service();
boost::asio::io_service& get_task_io_service();
private:
typedef std::shared_ptr<boost::asio::io_service> io_service_ptr;
typedef std::shared_ptr<boost::asio::io_service::work> work_ptr;
@ -76,11 +80,16 @@ private:
/// The pool of io_services.
std::vector<io_service_ptr> io_services_;
boost::asio::io_service task_io_service_;
boost::thread_group thread_pool_;
/// The work that keeps the io_services running.
std::vector<work_ptr> work_;
/// The next io_service to use for a connection.
std::size_t next_io_service_;
std::size_t thread_pool_size_;
};
} // namespace server

View File

@ -44,9 +44,10 @@ namespace server {
server::server(const std::string& address, uint16_t port,
std::size_t io_service_pool_size,
std::size_t thread_pool_size,
request_cb cb,
std::unique_ptr<boost::asio::ssl::context> ssl_ctx)
: io_service_pool_(io_service_pool_size),
: io_service_pool_(io_service_pool_size, thread_pool_size),
signals_(io_service_pool_.get_io_service()),
tick_timer_(io_service_pool_.get_io_service(),
boost::posix_time::seconds(1)),
@ -116,7 +117,8 @@ void server::start_accept()
if(ssl_ctx_) {
auto new_connection =
std::make_shared<connection<ssl_socket>>
(request_cb_, io_service_pool_.get_io_service(), *ssl_ctx_);
(request_cb_, io_service_pool_.get_task_io_service(),
io_service_pool_.get_io_service(), *ssl_ctx_);
acceptor_.async_accept
(new_connection->socket().lowest_layer(),
@ -140,7 +142,8 @@ void server::start_accept()
} else {
auto new_connection =
std::make_shared<connection<boost::asio::ip::tcp::socket>>
(request_cb_, io_service_pool_.get_io_service());
(request_cb_, io_service_pool_.get_task_io_service(),
io_service_pool_.get_io_service());
acceptor_.async_accept
(new_connection->socket(),

View File

@ -66,6 +66,7 @@ public:
/// serve up files from the given directory.
explicit server(const std::string& address, uint16_t port,
std::size_t io_service_pool_size,
std::size_t thread_pool_size,
request_cb cb,
std::unique_ptr<boost::asio::ssl::context> ssl_ctx);

View File

@ -53,11 +53,36 @@ typedef std::function<void(void)> void_cb;
// return value is pair of written bytes and bool value indicating
// that this is the end of the body. If the end of the body was
// reached, return true. If there is error and application wants to
// terminate stream, return std::make_pair(-1, false). Currently,
// returning std::make_pair(0, false) is reserved for future use.
// terminate stream, return std::make_pair(-1, false). Returning
// std::make_pair(0, false) tells the library that don't call this
// callback until application calls response::resume(). This is
// useful when there is no data to send at the moment but there will
// be more to come in near future.
typedef std::function<std::pair<ssize_t, bool>
(uint8_t *buf, std::size_t len)> read_cb;
class channel_impl;
class channel {
public:
// Application must not call this directly.
channel();
// Schedules the execution of callback |cb| in the same thread where
// request callback is called. Therefore, it is same to use request
// or response object in |cb|. The callbacks are executed in the
// same order they are posted though same channel object if they are
// posted from the same thread.
void post(void_cb cb);
// Application must not call this directly.
channel_impl& impl();
private:
std::unique_ptr<channel_impl> impl_;
};
typedef std::function<void(channel&)> thread_cb;
class request {
public:
// Application must not call this directly.
@ -104,6 +129,17 @@ public:
// Returns true if stream has been closed.
bool closed() const;
// Runs function |start| in one of background threads. Returns true
// if scheduling task was done successfully.
//
// Since |start| is called in different thread, calling any method
// of request or response object in the callback may cause undefined
// behavior. To safely use them, use channel::post(). A callback
// passed to channel::post() is executed in the same thread where
// request callback is called, so it is safe to use request or
// response object. Example::
bool run_task(thread_cb start);
// Application must not call this directly.
request_impl& impl();
private:
@ -127,7 +163,7 @@ public:
// further call of end() is allowed.
void end(read_cb cb);
// Resumes deferred response. Not implemented yet.
// Resumes deferred response.
void resume();
// Returns status code.
@ -142,6 +178,8 @@ private:
std::unique_ptr<response_impl> impl_;
};
// This is so called request callback. Called every time request is
// received.
typedef std::function<void(std::shared_ptr<request>,
std::shared_ptr<response>)> request_cb;
@ -157,12 +195,19 @@ public:
void listen(const std::string& address, uint16_t port,
request_cb cb);
// Sets number of native threads.
// Sets number of native threads to handle incoming HTTP request.
// It defaults to 1.
void num_threads(size_t num_threads);
// Sets TLS private key file and certificate file. Both files must
// be in PEM format.
void tls(std::string private_key_file, std::string certificate_file);
// Sets number of background threads to run concurrent tasks (see
// request::run_task()). It defaults to 1. This is not the number
// of thread to handle incoming HTTP request. For this purpose, see
// num_threads().
void num_concurrent_tasks(size_t num_concurrent_tasks);
private:
std::unique_ptr<http2_impl> impl_;
};