/*
 * nghttp2 - HTTP/2 C Library
 *
 * Copyright (c) 2012 Tatsuhiro Tsujikawa
 *
 * Permission is hereby granted, free of charge, to any person obtaining
 * a copy of this software and associated documentation files (the
 * "Software"), to deal in the Software without restriction, including
 * without limitation the rights to use, copy, modify, merge, publish,
 * distribute, sublicense, and/or sell copies of the Software, and to
 * permit persons to whom the Software is furnished to do so, subject to
 * the following conditions:
 *
 * The above copyright notice and this permission notice shall be
 * included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 */
#include "shrpx_connection_handler.h"

// NOTE(review): the system header names and the template arguments in
// this file were restored from the symbols used below (read/close,
// waitpid, errno, std::async, std::uniform_real_distribution, ...).
// Confirm against the upstream file.
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif // HAVE_UNISTD_H
#include <sys/types.h>
#include <sys/wait.h>

#include <cerrno>
#include <thread>
#include <random>

#include "shrpx_client_handler.h"
#include "shrpx_ssl.h"
#include "shrpx_worker.h"
#include "shrpx_config.h"
#include "shrpx_http2_session.h"
#include "shrpx_connect_blocker.h"
#include "shrpx_downstream_connection.h"
#include "shrpx_accept_handler.h"
#include "shrpx_memcached_dispatcher.h"
#include "shrpx_signal.h"
#include "shrpx_log.h"
#include "util.h"
#include "template.h"

using namespace nghttp2;

namespace shrpx {

namespace {
// Timer callback for disable_acceptor_timer_.  Re-enables the
// acceptors once the sleep period set by sleep_acceptor() expires.
void acceptor_disable_cb(struct ev_loop *loop, ev_timer *w, int revent) {
  auto h = static_cast<ConnectionHandler *>(w->data);

  // If we are in graceful shutdown period, we must not enable
  // acceptors again.
  if (h->get_graceful_shutdown()) {
    return;
  }

  h->enable_acceptor();
}
} // namespace

namespace {
// Timer callback for ocsp_timer_.  Starts a new round of OCSP
// updates.
void ocsp_cb(struct ev_loop *loop, ev_timer *w, int revent) {
  auto h = static_cast<ConnectionHandler *>(w->data);

  // If we are in graceful shutdown period, we won't do ocsp query.
  if (h->get_graceful_shutdown()) {
    return;
  }

  LOG(NOTICE) << "Start ocsp update";

  h->proceed_next_cert_ocsp();
}
} // namespace

namespace {
// I/O callback fired when the OCSP query process pipe is readable.
void ocsp_read_cb(struct ev_loop *loop, ev_io *w, int revent) {
  auto h = static_cast<ConnectionHandler *>(w->data);

  h->read_ocsp_chunk();
}
} // namespace

namespace {
// Child watcher callback fired when the OCSP query process changes
// state.
void ocsp_chld_cb(struct ev_loop *loop, ev_child *w, int revent) {
  auto h = static_cast<ConnectionHandler *>(w->data);

  h->handle_ocsp_complete();
}
} // namespace

namespace {
// Async callback signalled by the thread started in
// graceful_shutdown_worker() after join_worker() returns; breaks out
// of the event loop.
void thread_join_async_cb(struct ev_loop *loop, ev_async *w, int revent) {
  ev_break(loop);
}
} // namespace

namespace {
// Async callback signalled by send_serial_event(); drains the queued
// serial events on the handler's loop.
void serial_event_async_cb(struct ev_loop *loop, ev_async *w, int revent) {
  auto h = static_cast<ConnectionHandler *>(w->data);

  h->handle_serial_event();
}
} // namespace

// Initializes all libev watchers owned by the handler.  Only the
// serial event async watcher is started here; the others are started
// on demand.  When the API is enabled, round robin dispatch starts
// from worker #1 because worker #0 is dedicated to API requests (see
// handle_connection()).
ConnectionHandler::ConnectionHandler(struct ev_loop *loop, std::mt19937 &gen)
    : gen_(gen),
      single_worker_(nullptr),
      loop_(loop),
      tls_ticket_key_memcached_get_retry_count_(0),
      tls_ticket_key_memcached_fail_count_(0),
      worker_round_robin_cnt_(get_config()->api.enabled ? 1 : 0),
      graceful_shutdown_(false) {
  ev_timer_init(&disable_acceptor_timer_, acceptor_disable_cb, 0., 0.);
  disable_acceptor_timer_.data = this;

  ev_timer_init(&ocsp_timer_, ocsp_cb, 0., 0.);
  ocsp_timer_.data = this;

  ev_io_init(&ocsp_.rev, ocsp_read_cb, -1, EV_READ);
  ocsp_.rev.data = this;

  ev_async_init(&thread_join_asyncev_, thread_join_async_cb);

  ev_async_init(&serial_event_asyncev_, serial_event_async_cb);
  serial_event_asyncev_.data = this;

  ev_async_start(loop_, &serial_event_asyncev_);

  ev_child_init(&ocsp_.chldev, ocsp_chld_cb, 0, 0);
  ocsp_.chldev.data = this;

  ocsp_.next = 0;
  // reset_ocsp() closes rfd unless it is -1, so it must be set before
  // the first call.
  ocsp_.proc.rfd = -1;

  reset_ocsp();
}

// Stops every watcher, frees all SSL_CTX objects (and their attached
// TLSContextData), then destroys the workers and their event loops.
ConnectionHandler::~ConnectionHandler() {
  ev_child_stop(loop_, &ocsp_.chldev);
  ev_async_stop(loop_, &serial_event_asyncev_);
  ev_async_stop(loop_, &thread_join_asyncev_);
  ev_io_stop(loop_, &ocsp_.rev);
  ev_timer_stop(loop_, &ocsp_timer_);
  ev_timer_stop(loop_, &disable_acceptor_timer_);

  for (auto ssl_ctx : all_ssl_ctx_) {
    auto tls_ctx_data =
        static_cast<ssl::TLSContextData *>(SSL_CTX_get_app_data(ssl_ctx));
    if (tls_ctx_data) {
      delete tls_ctx_data;
    }
    SSL_CTX_free(ssl_ctx);
  }

  // Free workers before destroying ev_loop
  workers_.clear();

  for (auto loop : worker_loops_) {
    ev_loop_destroy(loop);
  }
}

// Passes |ticket_keys| to every worker in workers_.
void ConnectionHandler::set_ticket_keys_to_worker(
    const std::shared_ptr<TicketKeys> &ticket_keys) {
  for (auto &worker : workers_) {
    worker->set_ticket_keys(ticket_keys);
  }
}

// Sends a REOPEN_LOG event to every worker in workers_.
void ConnectionHandler::worker_reopen_log_files() {
  WorkerEvent wev{};

  wev.type = REOPEN_LOG;

  for (auto &worker : workers_) {
    worker->send(wev);
  }
}

// Sends a REPLACE_DOWNSTREAM event carrying |downstreamconf| to every
// worker in workers_.
void ConnectionHandler::worker_replace_downstream(
    std::shared_ptr<DownstreamConfig> downstreamconf) {
  WorkerEvent wev{};

  wev.type = REPLACE_DOWNSTREAM;
  wev.downstreamconf = std::move(downstreamconf);

  for (auto &worker : workers_) {
    worker->send(wev);
  }
}

// Creates the SSL contexts and the single worker which runs on
// loop_.  Returns 0 on success, or -1 if the mruby context could not
// be created.
int ConnectionHandler::create_single_worker() {
  cert_tree_ = ssl::create_cert_lookup_tree();
  auto sv_ssl_ctx = ssl::setup_server_ssl_context(
      all_ssl_ctx_, indexed_ssl_ctx_, cert_tree_.get()
#ifdef HAVE_NEVERBLEED
                                          ,
      nb_.get()
#endif // HAVE_NEVERBLEED
          );
  auto cl_ssl_ctx = ssl::setup_downstream_client_ssl_context(
#ifdef HAVE_NEVERBLEED
      nb_.get()
#endif // HAVE_NEVERBLEED
          );

  // Track the client context so that the destructor frees it.
  if (cl_ssl_ctx) {
    all_ssl_ctx_.push_back(cl_ssl_ctx);
  }

  auto config = get_config();
  auto &tlsconf = config->tls;
  auto &memcachedconf = config->tls.session_cache.memcached;

  SSL_CTX *session_cache_ssl_ctx = nullptr;
  if (memcachedconf.tls) {
    session_cache_ssl_ctx = ssl::create_ssl_client_context(
#ifdef HAVE_NEVERBLEED
        nb_.get(),
#endif // HAVE_NEVERBLEED
        tlsconf.cacert, memcachedconf.cert_file, memcachedconf.private_key_file,
        nullptr);
    all_ssl_ctx_.push_back(session_cache_ssl_ctx);
  }

  single_worker_ = make_unique<Worker>(
      loop_, sv_ssl_ctx, cl_ssl_ctx, session_cache_ssl_ctx, cert_tree_.get(),
      ticket_keys_, this, config->conn.downstream);
#ifdef HAVE_MRUBY
  if (single_worker_->create_mruby_context() != 0) {
    return -1;
  }
#endif // HAVE_MRUBY

  return 0;
}

// Creates |num| workers (plus one dedicated to the API when it is
// enabled), each with its own event loop, and starts them.  Returns 0
// on success, or -1 if a mruby context could not be created.  This is
// a no-op returning 0 when NOTHREADS is defined.
int ConnectionHandler::create_worker_thread(size_t num) {
#ifndef NOTHREADS
  assert(workers_.size() == 0);

  cert_tree_ = ssl::create_cert_lookup_tree();
  auto sv_ssl_ctx = ssl::setup_server_ssl_context(
      all_ssl_ctx_, indexed_ssl_ctx_, cert_tree_.get()
#ifdef HAVE_NEVERBLEED
                                          ,
      nb_.get()
#endif // HAVE_NEVERBLEED
          );
  auto cl_ssl_ctx = ssl::setup_downstream_client_ssl_context(
#ifdef HAVE_NEVERBLEED
      nb_.get()
#endif // HAVE_NEVERBLEED
          );

  // Track the client context so that the destructor frees it.
  if (cl_ssl_ctx) {
    all_ssl_ctx_.push_back(cl_ssl_ctx);
  }

  auto config = get_config();
  auto &tlsconf = config->tls;
  auto &memcachedconf = config->tls.session_cache.memcached;
  auto &apiconf = config->api;

  // We have dedicated worker for API request processing.
  if (apiconf.enabled) {
    ++num;
  }

  for (size_t i = 0; i < num; ++i) {
    auto loop = ev_loop_new(config->ev_loop_flags);

    SSL_CTX *session_cache_ssl_ctx = nullptr;
    if (memcachedconf.tls) {
      session_cache_ssl_ctx = ssl::create_ssl_client_context(
#ifdef HAVE_NEVERBLEED
          nb_.get(),
#endif // HAVE_NEVERBLEED
          tlsconf.cacert, memcachedconf.cert_file,
          memcachedconf.private_key_file, nullptr);
      all_ssl_ctx_.push_back(session_cache_ssl_ctx);
    }
    auto worker = make_unique<Worker>(
        loop, sv_ssl_ctx, cl_ssl_ctx, session_cache_ssl_ctx, cert_tree_.get(),
        ticket_keys_, this, config->conn.downstream);
#ifdef HAVE_MRUBY
    if (worker->create_mruby_context() != 0) {
      // |loop| is not tracked in worker_loops_ yet, so the destructor
      // would never destroy it.  Free the worker first, in the same
      // order as the destructor, and then release the loop.
      worker.reset();
      ev_loop_destroy(loop);
      return -1;
    }
#endif // HAVE_MRUBY

    workers_.push_back(std::move(worker));
    worker_loops_.push_back(loop);

    LLOG(NOTICE, this) << "Created worker thread #" << workers_.size() - 1;
  }

  for (auto &worker : workers_) {
    worker->run_async();
  }
#endif // NOTHREADS

  return 0;
}

// Waits for every worker in workers_ in order.  This is a no-op when
// NOTHREADS is defined.
void ConnectionHandler::join_worker() {
#ifndef NOTHREADS
  int n = 0;

  if (LOG_ENABLED(INFO)) {
    LLOG(INFO, this) << "Waiting for worker thread to join: n="
                     << workers_.size();
  }

  for (auto &worker : workers_) {
    worker->wait();
    if (LOG_ENABLED(INFO)) {
      LLOG(INFO, this) << "Thread #" << n << " joined";
    }
    ++n;
  }
#endif // NOTHREADS
}

// Sends GRACEFUL_SHUTDOWN to every worker and, unless NOTHREADS is
// defined, launches a task which joins the workers and then signals
// thread_join_asyncev_.  Does nothing in single worker mode.
void ConnectionHandler::graceful_shutdown_worker() {
  if (single_worker_) {
    return;
  }

  WorkerEvent wev{};
  wev.type = GRACEFUL_SHUTDOWN;

  if (LOG_ENABLED(INFO)) {
    LLOG(INFO, this) << "Sending graceful shutdown signal to worker";
  }

  for (auto &worker : workers_) {
    worker->send(wev);
  }

#ifndef NOTHREADS
  ev_async_start(loop_, &thread_join_asyncev_);

  thread_join_fut_ = std::async(std::launch::async, [this]() {
    (void)reopen_log_files(get_config()->logging);
    join_worker();
    ev_async_send(get_loop(), &thread_join_asyncev_);
    delete_log_config();
  });
#endif // NOTHREADS
}

// Dispatches the accepted connection |fd| to a worker.  In single
// worker mode the connection is handled directly, and |fd| is closed
// on failure.  Otherwise a NEW_CONNECTION event is sent to worker #0
// for API frontends, or to the next worker in round robin order.
// Returns 0 on success, or -1 on failure (single worker mode only).
int ConnectionHandler::handle_connection(int fd, sockaddr *addr, int addrlen,
                                         const UpstreamAddr *faddr) {
  if (LOG_ENABLED(INFO)) {
    LLOG(INFO, this) << "Accepted connection from "
                     << util::numeric_name(addr, addrlen) << ", fd=" << fd;
  }

  auto config = get_config();

  if (single_worker_) {
    auto &upstreamconf = config->conn.upstream;
    if (single_worker_->get_worker_stat()->num_connections >=
        upstreamconf.worker_connections) {

      if (LOG_ENABLED(INFO)) {
        LLOG(INFO, this) << "Too many connections >="
                         << upstreamconf.worker_connections;
      }

      close(fd);
      return -1;
    }

    auto client =
        ssl::accept_connection(single_worker_.get(), fd, addr, addrlen, faddr);
    if (!client) {
      LLOG(ERROR, this) << "ClientHandler creation failed";

      close(fd);
      return -1;
    }

    return 0;
  }

  Worker *worker;

  if (faddr->alt_mode == ALTMODE_API) {
    worker = workers_[0].get();

    if (LOG_ENABLED(INFO)) {
      LOG(INFO) << "Dispatch connection to API worker #0";
    }
  } else {
    worker = workers_[worker_round_robin_cnt_].get();

    if (LOG_ENABLED(INFO)) {
      LOG(INFO) << "Dispatch connection to worker #" << worker_round_robin_cnt_;
    }

    // Wrap around, skipping worker #0 when it is reserved for the
    // API.
    if (++worker_round_robin_cnt_ == workers_.size()) {
      auto &apiconf = config->api;

      if (apiconf.enabled) {
        worker_round_robin_cnt_ = 1;
      } else {
        worker_round_robin_cnt_ = 0;
      }
    }
  }

  WorkerEvent wev{};
  wev.type = NEW_CONNECTION;
  wev.client_fd = fd;
  memcpy(&wev.client_addr, addr, addrlen);
  wev.client_addrlen = addrlen;
  wev.faddr = faddr;

  worker->send(wev);

  return 0;
}

// Returns the event loop this handler runs on.
struct ev_loop *ConnectionHandler::get_loop() const {
  return loop_;
}

// Returns the single worker, or nullptr if it has not been created.
Worker *ConnectionHandler::get_single_worker() const {
  return single_worker_.get();
}

// Takes ownership of the acceptor |h|.
void ConnectionHandler::add_acceptor(std::unique_ptr<AcceptHandler> h) {
  acceptors_.push_back(std::move(h));
}

// Enables every registered acceptor.
void ConnectionHandler::enable_acceptor() {
  for (auto &a : acceptors_) {
    a->enable();
  }
}

// Disables every registered acceptor.
void ConnectionHandler::disable_acceptor() {
  for (auto &a : acceptors_) {
    a->disable();
  }
}

// Disables the acceptors for |t| seconds.  Does nothing if |t| is 0
// or if a sleep is already in progress.
void ConnectionHandler::sleep_acceptor(ev_tstamp t) {
  if (t == 0. || ev_is_active(&disable_acceptor_timer_)) {
    return;
  }

  disable_acceptor();

  ev_timer_set(&disable_acceptor_timer_, t, 0.);
  ev_timer_start(loop_, &disable_acceptor_timer_);
}

// Calls accept_connection() on every registered acceptor.
void ConnectionHandler::accept_pending_connection() {
  for (auto &a : acceptors_) {
    a->accept_connection();
  }
}

// Stores |ticket_keys| and forwards them to the single worker if
// there is one.  Worker threads are updated separately via
// set_ticket_keys_to_worker().
void ConnectionHandler::set_ticket_keys(
    std::shared_ptr<TicketKeys> ticket_keys) {
  ticket_keys_ = std::move(ticket_keys);
  if (single_worker_) {
    single_worker_->set_ticket_keys(ticket_keys_);
  }
}

// Returns the currently stored ticket keys.
const std::shared_ptr<TicketKeys> &ConnectionHandler::get_ticket_keys() const {
  return ticket_keys_;
}

// Sets the graceful shutdown flag and forwards it to the single
// worker if there is one.
void ConnectionHandler::set_graceful_shutdown(bool f) {
  graceful_shutdown_ = f;
  if (single_worker_) {
    single_worker_->set_graceful_shutdown(f);
  }
}

// Returns the graceful shutdown flag.
bool ConnectionHandler::get_graceful_shutdown() const {
  return graceful_shutdown_;
}

// Sends SIGTERM to the running OCSP query process, if any, and waits
// for it to terminate.  Failures are logged only.
void ConnectionHandler::cancel_ocsp_update() {
  // pid == 0 means no query process is running (see reset_ocsp()).
  if (ocsp_.proc.pid == 0) {
    return;
  }

  int rv;

  rv = kill(ocsp_.proc.pid, SIGTERM);
  if (rv != 0) {
    auto error = errno;
    LOG(ERROR) << "Could not send signal to OCSP query process: errno="
               << error;
  }

  // Retry waitpid() if it is interrupted by a signal.
  while ((rv = waitpid(ocsp_.proc.pid, nullptr, 0)) == -1 && errno == EINTR)
    ;
  if (rv == -1) {
    auto error = errno;
    LOG(ERROR) << "Error occurred while we were waiting for the completion of "
                  "OCSP query process: errno="
               << error;
  }
}

// inspired by h2o_read_command function from h2o project:
// https://github.com/h2o/h2o
//
// Runs the configured fetch-ocsp-response command for |cert_file| and
// starts watching its output and its exit.  Returns 0 on success, or
// -1 if the command could not be started.
int ConnectionHandler::start_ocsp_update(const char *cert_file) {
  int rv;

  if (LOG_ENABLED(INFO)) {
    LOG(INFO) << "Start ocsp update for " << cert_file;
  }

  assert(!ev_is_active(&ocsp_.rev));
  assert(!ev_is_active(&ocsp_.chldev));

  char *const argv[] = {
      const_cast<char *>(
          get_config()->tls.ocsp.fetch_ocsp_response_file.c_str()),
      const_cast<char *>(cert_file), nullptr};

  Process proc;
  rv = exec_read_command(proc, argv);
  if (rv != 0) {
    return -1;
  }

  ocsp_.proc = proc;

  ev_io_set(&ocsp_.rev, ocsp_.proc.rfd, EV_READ);
  ev_io_start(loop_, &ocsp_.rev);

  ev_child_set(&ocsp_.chldev, ocsp_.proc.pid, 0);
  ev_child_start(loop_, &ocsp_.chldev);

  return 0;
}

// Reads available output of the OCSP query process into ocsp_.resp.
// Returns with the read watcher still active when read() reports
// EAGAIN/EWOULDBLOCK; stops the watcher on EOF or on a read error
// (which is recorded in ocsp_.error).
void ConnectionHandler::read_ocsp_chunk() {
  std::array<uint8_t, 4_k> buf;
  for (;;) {
    ssize_t n;
    // Retry read() if it is interrupted by a signal.
    while ((n = read(ocsp_.proc.rfd, buf.data(), buf.size())) == -1 &&
           errno == EINTR)
      ;

    if (n == -1) {
      if (errno == EAGAIN || errno == EWOULDBLOCK) {
        return;
      }
      auto error = errno;
      LOG(WARN) << "Reading from ocsp query command failed: errno=" << error;
      ocsp_.error = error;

      break;
    }

    if (n == 0) {
      break;
    }

    std::copy_n(std::begin(buf), n, std::back_inserter(ocsp_.resp));
  }

  ev_io_stop(loop_, &ocsp_.rev);
}

// Called when the OCSP query process has exited.  On success the
// collected response is installed for the SSL_CTX at index
// ocsp_.next; on failure a warning is logged.  In both cases the
// update then proceeds to the next certificate.
void ConnectionHandler::handle_ocsp_complete() {
  ev_io_stop(loop_, &ocsp_.rev);
  ev_child_stop(loop_, &ocsp_.chldev);

  assert(ocsp_.next < all_ssl_ctx_.size());

  auto ssl_ctx = all_ssl_ctx_[ocsp_.next];
  auto tls_ctx_data =
      static_cast<ssl::TLSContextData *>(SSL_CTX_get_app_data(ssl_ctx));

  auto rstatus = ocsp_.chldev.rstatus;
  auto status = WEXITSTATUS(rstatus);
  if (ocsp_.error || !WIFEXITED(rstatus) || status != 0) {
    LOG(WARN) << "ocsp query command for " << tls_ctx_data->cert_file
              << " failed: error=" << ocsp_.error << ", rstatus=" << rstatus
              << ", status=" << status;
    ++ocsp_.next;
    proceed_next_cert_ocsp();
    return;
  }

  if (LOG_ENABLED(INFO)) {
    LOG(INFO) << "ocsp update for " << tls_ctx_data->cert_file
              << " finished successfully";
  }

#ifndef OPENSSL_IS_BORINGSSL
  {
#ifdef HAVE_ATOMIC_STD_SHARED_PTR
    std::atomic_store_explicit(
        &tls_ctx_data->ocsp_data,
        std::make_shared<std::vector<uint8_t>>(std::move(ocsp_.resp)),
        std::memory_order_release);
#else  // !HAVE_ATOMIC_STD_SHARED_PTR
    std::lock_guard<std::mutex> g(tls_ctx_data->mu);
    tls_ctx_data->ocsp_data =
        std::make_shared<std::vector<uint8_t>>(std::move(ocsp_.resp));
#endif // !HAVE_ATOMIC_STD_SHARED_PTR
  }
#else  // OPENSSL_IS_BORINGSSL
  SSL_CTX_set_ocsp_response(ssl_ctx, ocsp_.resp.data(), ocsp_.resp.size());
#endif // OPENSSL_IS_BORINGSSL

  ++ocsp_.next;
  proceed_next_cert_ocsp();
}

// Closes the pipe to the OCSP query process if it is open, and clears
// the per-query state (pid, error, response buffer).  ocsp_.next is
// left untouched.
void ConnectionHandler::reset_ocsp() {
  if (ocsp_.proc.rfd != -1) {
    close(ocsp_.proc.rfd);
  }

  ocsp_.proc.rfd = -1;
  ocsp_.proc.pid = 0;
  ocsp_.error = 0;
  ocsp_.resp = std::vector<uint8_t>();
}

// Starts the OCSP query for the next eligible SSL_CTX, starting at
// index ocsp_.next.  Contexts without TLSContextData, or whose query
// fails to start, are skipped.  When all contexts have been visited,
// the next round is scheduled with ocsp_timer_.
void ConnectionHandler::proceed_next_cert_ocsp() {
  for (;;) {
    reset_ocsp();
    if (ocsp_.next == all_ssl_ctx_.size()) {
      ocsp_.next = 0;
      // We have updated all ocsp response, and schedule next update.
      ev_timer_set(&ocsp_timer_, get_config()->tls.ocsp.update_interval, 0.);
      ev_timer_start(loop_, &ocsp_timer_);
      return;
    }

    auto ssl_ctx = all_ssl_ctx_[ocsp_.next];
    auto tls_ctx_data =
        static_cast<ssl::TLSContextData *>(SSL_CTX_get_app_data(ssl_ctx));

    // client SSL_CTX is also included in all_ssl_ctx_, but has no
    // tls_ctx_data.
    if (!tls_ctx_data) {
      ++ocsp_.next;
      continue;
    }

    auto cert_file = tls_ctx_data->cert_file;

    if (start_ocsp_update(cert_file) != 0) {
      ++ocsp_.next;
      continue;
    }

    break;
  }
}

// Takes ownership of the memcached dispatcher used to fetch TLS
// ticket keys.
void ConnectionHandler::set_tls_ticket_key_memcached_dispatcher(
    std::unique_ptr<MemcachedDispatcher> dispatcher) {
  tls_ticket_key_memcached_dispatcher_ = std::move(dispatcher);
}

// Returns the memcached dispatcher used to fetch TLS ticket keys, or
// nullptr if none has been set.
MemcachedDispatcher *
ConnectionHandler::get_tls_ticket_key_memcached_dispatcher() const {
  return tls_ticket_key_memcached_dispatcher_.get();
}

// Use the similar backoff algorithm described in
// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md
namespace {
constexpr size_t MAX_BACKOFF_EXP = 10;
constexpr auto MULTIPLIER = 3.2;
constexpr auto JITTER = 0.2;
} // namespace

// Handles a network error while fetching TLS ticket keys from
// memcached.  Once the retry count reaches the configured max_retry,
// the attempt is treated as "not found"; otherwise |w| is rearmed
// with a randomized backoff.
void ConnectionHandler::on_tls_ticket_key_network_error(ev_timer *w) {
  if (++tls_ticket_key_memcached_get_retry_count_ >=
      get_config()->tls.ticket.memcached.max_retry) {
    LOG(WARN) << "Memcached: tls ticket get retry all failed "
              << tls_ticket_key_memcached_get_retry_count_ << " times.";

    on_tls_ticket_key_not_found(w);
    return;
  }

  // The exponent is capped at MAX_BACKOFF_EXP.
  auto base_backoff = util::int_pow(
      MULTIPLIER,
      std::min(MAX_BACKOFF_EXP, tls_ticket_key_memcached_get_retry_count_));
  // Add jitter in the range of +/- JITTER * base_backoff.
  auto dist = std::uniform_real_distribution<>(-JITTER * base_backoff,
                                               JITTER * base_backoff);

  auto backoff = base_backoff + dist(gen_);

  LOG(WARN)
      << "Memcached: tls ticket get failed due to network error, retrying in "
      << backoff << " seconds";

  ev_timer_set(w, backoff, 0.);
  ev_timer_start(loop_, w);
}

// Handles a failed TLS ticket key fetch.  Once the failure count
// reaches the configured max_fail, the ticket keys are cleared on the
// handler and on all workers.  The next fetch is always scheduled.
void ConnectionHandler::on_tls_ticket_key_not_found(ev_timer *w) {
  tls_ticket_key_memcached_get_retry_count_ = 0;

  if (++tls_ticket_key_memcached_fail_count_ >=
      get_config()->tls.ticket.memcached.max_fail) {
    LOG(WARN) << "Memcached: could not get tls ticket; disable tls ticket";

    tls_ticket_key_memcached_fail_count_ = 0;

    set_ticket_keys(nullptr);
    set_ticket_keys_to_worker(nullptr);
  }

  LOG(WARN) << "Memcached: tls ticket get failed, schedule next";
  schedule_next_tls_ticket_key_memcached_get(w);
}

// Handles a successful TLS ticket key fetch.  Resets the retry and
// failure counters, schedules the next fetch, and installs
// |ticket_keys| on the handler and on all workers.  Null or empty
// keys clear the ticket keys instead.
void ConnectionHandler::on_tls_ticket_key_get_success(
    const std::shared_ptr<TicketKeys> &ticket_keys, ev_timer *w) {
  LOG(NOTICE) << "Memcached: tls ticket get success";

  tls_ticket_key_memcached_get_retry_count_ = 0;
  tls_ticket_key_memcached_fail_count_ = 0;

  schedule_next_tls_ticket_key_memcached_get(w);

  if (!ticket_keys || ticket_keys->keys.empty()) {
    LOG(WARN) << "Memcached: tls ticket keys are empty; tls ticket disabled";
    set_ticket_keys(nullptr);
    set_ticket_keys_to_worker(nullptr);
    return;
  }

  if (LOG_ENABLED(INFO)) {
    LOG(INFO) << "ticket keys get done";
    LOG(INFO) << 0 << " enc+dec: "
              << util::format_hex(ticket_keys->keys[0].data.name);
    for (size_t i = 1; i < ticket_keys->keys.size(); ++i) {
      auto &key = ticket_keys->keys[i];
      LOG(INFO) << i << " dec: " << util::format_hex(key.data.name);
    }
  }

  set_ticket_keys(ticket_keys);
  set_ticket_keys_to_worker(ticket_keys);
}

// Rearms |w| to fire after the configured memcached ticket key
// interval.
void ConnectionHandler::schedule_next_tls_ticket_key_memcached_get(
    ev_timer *w) {
  ev_timer_set(w, get_config()->tls.ticket.memcached.interval, 0.);
  ev_timer_start(loop_, w);
}

// Creates the client SSL_CTX for the TLS ticket key memcached
// connection.  The context is also stored in all_ssl_ctx_, so the
// destructor frees it.
SSL_CTX *ConnectionHandler::create_tls_ticket_key_memcached_ssl_ctx() {
  auto config = get_config();
  auto &tlsconf = config->tls;
  auto &memcachedconf = config->tls.ticket.memcached;

  auto ssl_ctx = ssl::create_ssl_client_context(
#ifdef HAVE_NEVERBLEED
      nb_.get(),
#endif // HAVE_NEVERBLEED
      tlsconf.cacert, memcachedconf.cert_file, memcachedconf.private_key_file,
      nullptr);

  all_ssl_ctx_.push_back(ssl_ctx);

  return ssl_ctx;
}

#ifdef HAVE_NEVERBLEED
// Takes ownership of the neverbleed context |nb|.
void ConnectionHandler::set_neverbleed(std::unique_ptr<neverbleed_t> nb) {
  nb_ = std::move(nb);
}

// Returns the neverbleed context, or nullptr if none has been set.
neverbleed_t *ConnectionHandler::get_neverbleed() const { return nb_.get(); }

#endif // HAVE_NEVERBLEED

// Processes all queued serial events.  The queue is swapped out while
// serial_event_mu_ is held, and the events are handled after the lock
// is released.
void ConnectionHandler::handle_serial_event() {
  std::vector<SerialEvent> q;
  {
    std::lock_guard<std::mutex> g(serial_event_mu_);
    q.swap(serial_events_);
  }

  for (auto &sev : q) {
    switch (sev.type) {
    case SEV_REPLACE_DOWNSTREAM:
      // Make sure that none of worker uses
      // get_config()->conn.downstream
      mod_config()->conn.downstream = sev.downstreamconf;

      if (single_worker_) {
        single_worker_->replace_downstream_config(sev.downstreamconf);

        break;
      }

      worker_replace_downstream(sev.downstreamconf);

      break;
    }
  }
}

// Queues a SEV_REPLACE_DOWNSTREAM serial event carrying
// |downstreamconf|.
void ConnectionHandler::send_replace_downstream(
    const std::shared_ptr<DownstreamConfig> &downstreamconf) {
  send_serial_event(SerialEvent(SEV_REPLACE_DOWNSTREAM, downstreamconf));
}

// Appends |ev| to the serial event queue under serial_event_mu_, then
// signals serial_event_asyncev_ on loop_ so that
// handle_serial_event() runs.
void ConnectionHandler::send_serial_event(SerialEvent ev) {
  {
    std::lock_guard<std::mutex> g(serial_event_mu_);

    serial_events_.push_back(std::move(ev));
  }

  ev_async_send(loop_, &serial_event_asyncev_);
}

// Returns the SSL_CTX at index |idx| of all_ssl_ctx_.  No bounds
// check is performed.
SSL_CTX *ConnectionHandler::get_ssl_ctx(size_t idx) const {
  return all_ssl_ctx_[idx];
}

// Returns the SSL_CTX list at index |idx| of indexed_ssl_ctx_.  No
// bounds check is performed.
const std::vector<SSL_CTX *> &
ConnectionHandler::get_indexed_ssl_ctx(size_t idx) const {
  return indexed_ssl_ctx_[idx];
}

} // namespace shrpx