feat: get thread_info before asio server start

add set_on_all_threads_created_callback for asio server,
this callback is called after all threads created and
before threads enter blocking loop.
This commit is contained in:
Jacky_Yin 2021-08-16 19:50:03 +08:00
parent 8cee15bc5a
commit d7f98c48d1
9 changed files with 74 additions and 11 deletions

View File

@ -33,6 +33,9 @@
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <algorithm>
#include "asio_io_service_pool.h"
namespace nghttp2 {
@ -52,15 +55,39 @@ io_service_pool::io_service_pool(std::size_t pool_size) : next_io_service_(0) {
io_services_.push_back(io_service);
work_.push_back(work);
}
// Pre-allocate vector space.
io_services_thread_info_.resize(pool_size);
}
void io_service_pool::run(bool asynchronous) {
size_t io_service_pool::io_thread_run(size_t thread_index) {
auto io_service_thread_info = std::make_shared<thread_info>(gettid());
io_services_thread_info_[thread_index] = io_service_thread_info;
return io_services_[thread_index]->run();
}
void io_service_pool::run(bool asynchronous,
on_all_threads_created_cb all_threads_created_cb) {
// Create a pool of threads to run all of the io_services.
for (std::size_t i = 0; i < io_services_.size(); ++i) {
futures_.push_back(std::async(std::launch::async,
(size_t(boost::asio::io_service::*)(void)) &
boost::asio::io_service::run,
io_services_[i]));
(size_t(io_service_pool::*)(size_t)) &
io_service_pool::io_thread_run,
this, i));
}
if (all_threads_created_cb) {
// Wait until all thread created.
while (1) {
if (std::all_of(io_services_thread_info_.cbegin(),
io_services_thread_info_.cend(),
[](auto i) { return i && i != nullptr; })) {
break;
}
usleep(10);
}
// Execute callback before blocking.
all_threads_created_cb(io_services_thread_info_);
}
if (!asynchronous) {

View File

@ -59,7 +59,8 @@ public:
explicit io_service_pool(std::size_t pool_size);
/// Run all io_service objects in the pool.
void run(bool asynchronous = false);
void run(bool asynchronous = false,
on_all_threads_created_cb all_threads_created_cb = nullptr);
/// Stop all io_service objects in the pool.
void force_stop();
@ -87,8 +88,14 @@ private:
/// The next io_service to use for a connection.
std::size_t next_io_service_;
/// Futures to all the io_service objects
/// Futures to all the io_service objects.
std::vector<std::future<std::size_t>> futures_;
/// Execute this function when new thread is created.
size_t io_thread_run(size_t index);
/// Hold the thread info for all io_services.
std::vector<std::shared_ptr<thread_info>> io_services_thread_info_;
};
} // namespace asio_http2

View File

@ -55,7 +55,8 @@ boost::system::error_code
server::listen_and_serve(boost::system::error_code &ec,
boost::asio::ssl::context *tls_context,
const std::string &address, const std::string &port,
int backlog, serve_mux &mux, bool asynchronous) {
int backlog, serve_mux &mux, bool asynchronous,
on_all_threads_created_cb all_threads_created_cb) {
ec.clear();
if (bind_and_listen(ec, address, port, backlog)) {
@ -70,7 +71,7 @@ server::listen_and_serve(boost::system::error_code &ec,
}
}
io_service_pool_.run(asynchronous);
io_service_pool_.run(asynchronous, all_threads_created_cb);
return ec;
}

View File

@ -71,7 +71,8 @@ public:
listen_and_serve(boost::system::error_code &ec,
boost::asio::ssl::context *tls_context,
const std::string &address, const std::string &port,
int backlog, serve_mux &mux, bool asynchronous = false);
int backlog, serve_mux &mux, bool asynchronous = false,
on_all_threads_created_cb all_threads_created_cb = nullptr);
void join();
void stop();

View File

@ -69,6 +69,10 @@ void http2::num_threads(size_t num_threads) { impl_->num_threads(num_threads); }
void http2::backlog(int backlog) { impl_->backlog(backlog); }
void http2::set_on_all_threads_created_callback(on_all_threads_created_cb cb) {
impl_->set_on_all_threads_created_callback(cb);
}
void http2::tls_handshake_timeout(const boost::posix_time::time_duration &t) {
impl_->tls_handshake_timeout(t);
}

View File

@ -41,7 +41,8 @@ http2_impl::http2_impl()
: num_threads_(1),
backlog_(-1),
tls_handshake_timeout_(boost::posix_time::seconds(60)),
read_timeout_(boost::posix_time::seconds(60)) {}
read_timeout_(boost::posix_time::seconds(60)),
all_threads_created_cb_(nullptr) {}
boost::system::error_code http2_impl::listen_and_serve(
boost::system::error_code &ec, boost::asio::ssl::context *tls_context,
@ -49,13 +50,18 @@ boost::system::error_code http2_impl::listen_and_serve(
server_.reset(
new server(num_threads_, tls_handshake_timeout_, read_timeout_));
return server_->listen_and_serve(ec, tls_context, address, port, backlog_,
mux_, asynchronous);
mux_, asynchronous, all_threads_created_cb_);
}
void http2_impl::num_threads(size_t num_threads) { num_threads_ = num_threads; }
void http2_impl::backlog(int backlog) { backlog_ = backlog; }
void http2_impl::set_on_all_threads_created_callback(
on_all_threads_created_cb cb) {
all_threads_created_cb_ = cb;
}
void http2_impl::tls_handshake_timeout(
const boost::posix_time::time_duration &t) {
tls_handshake_timeout_ = t;

View File

@ -47,6 +47,7 @@ public:
const std::string &address, const std::string &port, bool asynchronous);
void num_threads(size_t num_threads);
void backlog(int backlog);
void set_on_all_threads_created_callback(on_all_threads_created_cb cb);
void tls_handshake_timeout(const boost::posix_time::time_duration &t);
void read_timeout(const boost::posix_time::time_duration &t);
bool handle(std::string pattern, request_cb cb);
@ -63,6 +64,7 @@ private:
serve_mux mux_;
boost::posix_time::time_duration tls_handshake_timeout_;
boost::posix_time::time_duration read_timeout_;
on_all_threads_created_cb all_threads_created_cb_;
};
} // namespace server

View File

@ -31,6 +31,7 @@
#include <vector>
#include <functional>
#include <map>
#include <unistd.h>
#include <boost/system/error_code.hpp>
#include <boost/asio.hpp>
@ -128,6 +129,16 @@ enum nghttp2_asio_error {
NGHTTP2_ASIO_ERR_TLS_NO_APP_PROTO_NEGOTIATED = 1,
};
struct thread_info {
thread_info(pid_t id) : pid(id) {}
// Thread info is ready to be read.
bool ready;
pid_t pid;
};
typedef std::function<void(std::vector<std::shared_ptr<thread_info>>)>
on_all_threads_created_cb;
} // namespace asio_http2
} // namespace nghttp2

View File

@ -198,6 +198,10 @@ public:
// connections.
void backlog(int backlog);
// Sets the callback to be executed after all threads
// of |num_threads| is created.
void set_on_all_threads_created_callback(on_all_threads_created_cb cb);
// Sets TLS handshake timeout, which defaults to 60 seconds.
void tls_handshake_timeout(const boost::posix_time::time_duration &t);