From d7f98c48d114c21a837836eade59a0d61352e318 Mon Sep 17 00:00:00 2001 From: Jacky_Yin Date: Mon, 16 Aug 2021 19:50:03 +0800 Subject: [PATCH] feat: get thread_info before asio server start add set_on_all_threads_created_callback for asio server, this callback is called after all threads created and before threads enter blocking loop. --- src/asio_io_service_pool.cc | 35 +++++++++++++++++++++--- src/asio_io_service_pool.h | 11 ++++++-- src/asio_server.cc | 5 ++-- src/asio_server.h | 3 +- src/asio_server_http2.cc | 4 +++ src/asio_server_http2_impl.cc | 10 +++++-- src/asio_server_http2_impl.h | 2 ++ src/includes/nghttp2/asio_http2.h | 11 ++++++++ src/includes/nghttp2/asio_http2_server.h | 4 +++ 9 files changed, 74 insertions(+), 11 deletions(-) diff --git a/src/asio_io_service_pool.cc b/src/asio_io_service_pool.cc index 01483664..4de5f497 100644 --- a/src/asio_io_service_pool.cc +++ b/src/asio_io_service_pool.cc @@ -33,6 +33,9 @@ // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // + +#include + #include "asio_io_service_pool.h" namespace nghttp2 { @@ -52,15 +55,39 @@ io_service_pool::io_service_pool(std::size_t pool_size) : next_io_service_(0) { io_services_.push_back(io_service); work_.push_back(work); } + + // Pre-allocate vector space. + io_services_thread_info_.resize(pool_size); } -void io_service_pool::run(bool asynchronous) { +size_t io_service_pool::io_thread_run(size_t thread_index) { + auto io_service_thread_info = std::make_shared(gettid()); + io_services_thread_info_[thread_index] = io_service_thread_info; + return io_services_[thread_index]->run(); +} + +void io_service_pool::run(bool asynchronous, + on_all_threads_created_cb all_threads_created_cb) { // Create a pool of threads to run all of the io_services. for (std::size_t i = 0; i < io_services_.size(); ++i) { futures_.push_back(std::async(std::launch::async, - (size_t(boost::asio::io_service::*)(void)) & - boost::asio::io_service::run, - io_services_[i])); + (size_t(io_service_pool::*)(size_t)) & + io_service_pool::io_thread_run, + this, i)); + } + + if (all_threads_created_cb) { + // Wait until all thread created. + while (1) { + if (std::all_of(io_services_thread_info_.cbegin(), + io_services_thread_info_.cend(), + [](auto i) { return i && i != nullptr; })) { + break; + } + usleep(10); + } + // Execute callback before blocking. + all_threads_created_cb(io_services_thread_info_); } if (!asynchronous) { diff --git a/src/asio_io_service_pool.h b/src/asio_io_service_pool.h index 9c115338..616d56de 100644 --- a/src/asio_io_service_pool.h +++ b/src/asio_io_service_pool.h @@ -59,7 +59,8 @@ public: explicit io_service_pool(std::size_t pool_size); /// Run all io_service objects in the pool. - void run(bool asynchronous = false); + void run(bool asynchronous = false, + on_all_threads_created_cb all_threads_created_cb = nullptr); /// Stop all io_service objects in the pool. void force_stop(); @@ -87,8 +88,14 @@ private: /// The next io_service to use for a connection. std::size_t next_io_service_; - /// Futures to all the io_service objects + /// Futures to all the io_service objects. std::vector> futures_; + + /// Execute this function when new thread is created. + size_t io_thread_run(size_t index); + + /// Hold the thread info for all io_services. + std::vector> io_services_thread_info_; }; } // namespace asio_http2 diff --git a/src/asio_server.cc b/src/asio_server.cc index 74c92276..996a9986 100644 --- a/src/asio_server.cc +++ b/src/asio_server.cc @@ -55,7 +55,8 @@ boost::system::error_code server::listen_and_serve(boost::system::error_code &ec, boost::asio::ssl::context *tls_context, const std::string &address, const std::string &port, - int backlog, serve_mux &mux, bool asynchronous) { + int backlog, serve_mux &mux, bool asynchronous, + on_all_threads_created_cb all_threads_created_cb) { ec.clear(); if (bind_and_listen(ec, address, port, backlog)) { @@ -70,7 +71,7 @@ server::listen_and_serve(boost::system::error_code &ec, } } - io_service_pool_.run(asynchronous); + io_service_pool_.run(asynchronous, all_threads_created_cb); return ec; } diff --git a/src/asio_server.h b/src/asio_server.h index 1190e322..ea06be58 100644 --- a/src/asio_server.h +++ b/src/asio_server.h @@ -71,7 +71,8 @@ public: listen_and_serve(boost::system::error_code &ec, boost::asio::ssl::context *tls_context, const std::string &address, const std::string &port, - int backlog, serve_mux &mux, bool asynchronous = false); + int backlog, serve_mux &mux, bool asynchronous = false, + on_all_threads_created_cb all_threads_created_cb = nullptr); void join(); void stop(); diff --git a/src/asio_server_http2.cc b/src/asio_server_http2.cc index 02d3d197..459570a3 100644 --- a/src/asio_server_http2.cc +++ b/src/asio_server_http2.cc @@ -69,6 +69,10 @@ void http2::num_threads(size_t num_threads) { impl_->num_threads(num_threads); } void http2::backlog(int backlog) { impl_->backlog(backlog); } +void http2::set_on_all_threads_created_callback(on_all_threads_created_cb cb) { + impl_->set_on_all_threads_created_callback(cb); +} + void http2::tls_handshake_timeout(const boost::posix_time::time_duration &t) { impl_->tls_handshake_timeout(t); } diff --git a/src/asio_server_http2_impl.cc b/src/asio_server_http2_impl.cc index 00afdd65..2e1a6ad1 100644 --- a/src/asio_server_http2_impl.cc +++ b/src/asio_server_http2_impl.cc @@ -41,7 +41,8 @@ http2_impl::http2_impl() : num_threads_(1), backlog_(-1), tls_handshake_timeout_(boost::posix_time::seconds(60)), - read_timeout_(boost::posix_time::seconds(60)) {} + read_timeout_(boost::posix_time::seconds(60)), + all_threads_created_cb_(nullptr) {} boost::system::error_code http2_impl::listen_and_serve( boost::system::error_code &ec, boost::asio::ssl::context *tls_context, @@ -49,13 +50,18 @@ boost::system::error_code http2_impl::listen_and_serve( server_.reset( new server(num_threads_, tls_handshake_timeout_, read_timeout_)); return server_->listen_and_serve(ec, tls_context, address, port, backlog_, - mux_, asynchronous); + mux_, asynchronous, all_threads_created_cb_); } void http2_impl::num_threads(size_t num_threads) { num_threads_ = num_threads; } void http2_impl::backlog(int backlog) { backlog_ = backlog; } +void http2_impl::set_on_all_threads_created_callback( + on_all_threads_created_cb cb) { + all_threads_created_cb_ = cb; +} + void http2_impl::tls_handshake_timeout( const boost::posix_time::time_duration &t) { tls_handshake_timeout_ = t; diff --git a/src/asio_server_http2_impl.h b/src/asio_server_http2_impl.h index 93a6d2cc..b5a2c9d4 100644 --- a/src/asio_server_http2_impl.h +++ b/src/asio_server_http2_impl.h @@ -47,6 +47,7 @@ public: const std::string &address, const std::string &port, bool asynchronous); void num_threads(size_t num_threads); void backlog(int backlog); + void set_on_all_threads_created_callback(on_all_threads_created_cb cb); void tls_handshake_timeout(const boost::posix_time::time_duration &t); void read_timeout(const boost::posix_time::time_duration &t); bool handle(std::string pattern, request_cb cb); @@ -63,6 +64,7 @@ private: serve_mux mux_; boost::posix_time::time_duration tls_handshake_timeout_; boost::posix_time::time_duration read_timeout_; + on_all_threads_created_cb all_threads_created_cb_; }; } // namespace server diff --git a/src/includes/nghttp2/asio_http2.h b/src/includes/nghttp2/asio_http2.h index 57e55e1f..d17d8fd6 100644 --- a/src/includes/nghttp2/asio_http2.h +++ b/src/includes/nghttp2/asio_http2.h @@ -31,6 +31,7 @@ #include #include #include +#include #include #include @@ -128,6 +129,16 @@ enum nghttp2_asio_error { NGHTTP2_ASIO_ERR_TLS_NO_APP_PROTO_NEGOTIATED = 1, }; +struct thread_info { + thread_info(pid_t id) : pid(id) {} + // Thread info is ready to be read. + bool ready; + pid_t pid; +}; + +typedef std::function>)> + on_all_threads_created_cb; + } // namespace asio_http2 } // namespace nghttp2 diff --git a/src/includes/nghttp2/asio_http2_server.h b/src/includes/nghttp2/asio_http2_server.h index 651af690..9341b5b2 100644 --- a/src/includes/nghttp2/asio_http2_server.h +++ b/src/includes/nghttp2/asio_http2_server.h @@ -198,6 +198,10 @@ public: // connections. void backlog(int backlog); + // Sets the callback to be executed after all threads + // of |num_threads| is created. + void set_on_all_threads_created_callback(on_all_threads_created_cb cb); + // Sets TLS handshake timeout, which defaults to 60 seconds. void tls_handshake_timeout(const boost::posix_time::time_duration &t);