diff --git a/src/shrpx_connect_blocker.cc b/src/shrpx_connect_blocker.cc index ae6a6c60..d70fe5ca 100644 --- a/src/shrpx_connect_blocker.cc +++ b/src/shrpx_connect_blocker.cc @@ -35,7 +35,7 @@ void connect_blocker_cb(struct ev_loop *loop, ev_timer *w, int revents) { } // namespace ConnectBlocker::ConnectBlocker(std::mt19937 &gen, struct ev_loop *loop) - : gen_(gen), loop_(loop), fail_count_(0) { + : gen_(gen), loop_(loop), fail_count_(0), offline_(false) { ev_timer_init(&timer_, connect_blocker_cb, 0., 0.); } @@ -81,6 +81,12 @@ void ConnectBlocker::on_failure() { size_t ConnectBlocker::get_fail_count() const { return fail_count_; } void ConnectBlocker::offline() { + if (offline_) { + return; + } + + offline_ = true; + ev_timer_stop(loop_, &timer_); ev_timer_set(&timer_, std::numeric_limits::max(), 0.); ev_timer_start(loop_, &timer_); @@ -90,6 +96,10 @@ void ConnectBlocker::online() { ev_timer_stop(loop_, &timer_); fail_count_ = 0; + + offline_ = false; } +bool ConnectBlocker::in_offline() const { return offline_; } + } // namespace shrpx diff --git a/src/shrpx_connect_blocker.h b/src/shrpx_connect_blocker.h index 9d60697f..441ac277 100644 --- a/src/shrpx_connect_blocker.h +++ b/src/shrpx_connect_blocker.h @@ -57,6 +57,9 @@ public: // Peer is now considered online void online(); + // Returns true if peer is considered offline. + bool in_offline() const; + private: std::mt19937 gen_; ev_timer timer_; @@ -64,6 +67,8 @@ private: // The number of consecutive connection failure. Reset to 0 on // success. size_t fail_count_; + // true if peer is considered offline. + bool offline_; }; } // namespace diff --git a/src/shrpx_http2_session.cc b/src/shrpx_http2_session.cc index f33ac031..ba72a2c2 100644 --- a/src/shrpx_http2_session.cc +++ b/src/shrpx_http2_session.cc @@ -92,6 +92,9 @@ void settings_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) { auto http2session = static_cast(w->data); http2session->stop_settings_timer(); SSLOG(INFO, http2session) << "SETTINGS timeout"; + + downstream_failure(http2session->get_addr()); + if (http2session->terminate_session(NGHTTP2_SETTINGS_TIMEOUT) != 0) { delete http2session; @@ -290,8 +293,6 @@ int Http2Session::initiate_connection() { } } - auto &connect_blocker = addr_->connect_blocker; - const auto &proxy = get_config()->downstream_http_proxy; if (!proxy.host.empty() && state_ == DISCONNECTED) { if (LOG_ENABLED(INFO)) { @@ -311,8 +312,6 @@ int Http2Session::initiate_connection() { return -1; } - worker_blocker->on_success(); - rv = connect(conn_.fd, &proxy.addr.su.sa, proxy.addr.len); if (rv != 0 && errno != EINPROGRESS) { auto error = errno; @@ -320,10 +319,13 @@ int Http2Session::initiate_connection() { << util::to_numeric_addr(&proxy.addr) << ", errno=" << error; - connect_blocker->on_failure(); + worker_blocker->on_failure(); + return -1; } + worker_blocker->on_success(); + ev_io_set(&conn_.rev, conn_.fd, EV_READ); ev_io_set(&conn_.wev, conn_.fd, EV_WRITE); @@ -405,7 +407,7 @@ int Http2Session::initiate_connection() { << util::to_numeric_addr(&addr_->addr) << ", errno=" << error; - connect_blocker->on_failure(); + downstream_failure(addr_); return -1; } @@ -442,7 +444,7 @@ int Http2Session::initiate_connection() { << util::to_numeric_addr(&addr_->addr) << ", errno=" << error; - connect_blocker->on_failure(); + downstream_failure(addr_); return -1; } @@ -754,6 +756,10 @@ int on_stream_close_callback(nghttp2_session *session, int32_t stream_id, } // namespace void Http2Session::start_settings_timer() { + if (ev_is_active(&settings_timer_)) { + return; + } + ev_timer_again(conn_.loop, &settings_timer_); } @@ -1152,12 +1158,20 @@ int on_frame_recv_callback(nghttp2_session *session, const nghttp2_frame *frame, } return 0; } - case NGHTTP2_SETTINGS: + case NGHTTP2_SETTINGS: { if ((frame->hd.flags & NGHTTP2_FLAG_ACK) == 0) { return 0; } + http2session->stop_settings_timer(); + + auto addr = http2session->get_addr(); + auto &connect_blocker = addr->connect_blocker; + + connect_blocker->on_success(); + return 0; + } case NGHTTP2_PING: if (frame->hd.flags & NGHTTP2_FLAG_ACK) { if (LOG_ENABLED(INFO)) { @@ -1526,6 +1540,8 @@ int Http2Session::connection_made() { auto must_terminate = shared_addr->tls && !nghttp2::ssl::check_http2_requirement(conn_.tls.ssl); + reset_connection_check_timer(CONNCHK_TIMEOUT); + if (must_terminate) { if (LOG_ENABLED(INFO)) { LOG(INFO) << "TLSv1.2 was not negotiated. HTTP/2 must not be negotiated."; @@ -1536,16 +1552,10 @@ int Http2Session::connection_made() { if (rv != 0) { return -1; } + } else { + submit_pending_requests(); } - if (must_terminate) { - return 0; - } - - reset_connection_check_timer(CONNCHK_TIMEOUT); - - submit_pending_requests(); - signal_write(); return 0; } @@ -1564,7 +1574,7 @@ int Http2Session::downstream_read(const uint8_t *data, size_t datalen) { rv = nghttp2_session_mem_recv(session_, data, datalen); if (rv < 0) { - SSLOG(ERROR, this) << "nghttp2_session_recv() returned error: " + SSLOG(ERROR, this) << "nghttp2_session_mem_recv() returned error: " << nghttp2_strerror(rv); return -1; } @@ -1763,8 +1773,6 @@ int Http2Session::read_noop(const uint8_t *data, size_t datalen) { return 0; } int Http2Session::write_noop() { return 0; } int Http2Session::connected() { - auto &connect_blocker = addr_->connect_blocker; - if (!util::check_socket_connected(conn_.fd)) { if (LOG_ENABLED(INFO)) { SSLOG(INFO, this) << "Backend connect failed; addr=" @@ -1776,8 +1784,6 @@ int Http2Session::connected() { return -1; } - connect_blocker->on_success(); - if (LOG_ENABLED(INFO)) { SSLOG(INFO, this) << "Connection established"; } diff --git a/src/shrpx_http2_upstream.cc b/src/shrpx_http2_upstream.cc index 1fa49a3a..6ee1acab 100644 --- a/src/shrpx_http2_upstream.cc +++ b/src/shrpx_http2_upstream.cc @@ -146,6 +146,9 @@ int Http2Upstream::upgrade_upstream(HttpsUpstream *http) { } void Http2Upstream::start_settings_timer() { + if (ev_is_active(&settings_timer_)) { + return; + } ev_timer_start(handler_->get_loop(), &settings_timer_); } @@ -930,7 +933,7 @@ int Http2Upstream::on_read() { rv = nghttp2_session_mem_recv(session_, rb->pos, rb->rleft()); if (rv < 0) { if (rv != NGHTTP2_ERR_BAD_CLIENT_MAGIC) { - ULOG(ERROR, this) << "nghttp2_session_recv() returned error: " + ULOG(ERROR, this) << "nghttp2_session_mem_recv() returned error: " << nghttp2_strerror(rv); } return -1; diff --git a/src/shrpx_http_downstream_connection.cc b/src/shrpx_http_downstream_connection.cc index 144b49f6..8afdce58 100644 --- a/src/shrpx_http_downstream_connection.cc +++ b/src/shrpx_http_downstream_connection.cc @@ -220,7 +220,8 @@ int HttpDownstreamConnection::attach_downstream(Downstream *downstream) { << util::to_numeric_addr(&addr.addr) << ", errno=" << error; - connect_blocker->on_failure(); + downstream_failure(&addr); + close(conn_.fd); conn_.fd = -1; diff --git a/src/shrpx_live_check.cc b/src/shrpx_live_check.cc index 0c128ebf..fc2cc4a2 100644 --- a/src/shrpx_live_check.cc +++ b/src/shrpx_live_check.cc @@ -29,6 +29,10 @@ namespace shrpx { +namespace { +constexpr size_t MAX_BUFFER_SIZE = 4_k; +} // namespace + namespace { void readcb(struct ev_loop *loop, ev_io *w, int revents) { int rv; @@ -79,6 +83,18 @@ void backoff_timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) { } } // namespace +namespace { +void settings_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) { + auto live_check = static_cast(w->data); + + if (LOG_ENABLED(INFO)) { + LOG(INFO) << "SETTINGS timeout"; + } + + live_check->on_failure(); +} +} // namespace + LiveCheck::LiveCheck(struct ev_loop *loop, SSL_CTX *ssl_ctx, Worker *worker, DownstreamAddrGroup *group, DownstreamAddr *addr, std::mt19937 &gen) @@ -87,6 +103,7 @@ LiveCheck::LiveCheck(struct ev_loop *loop, SSL_CTX *ssl_ctx, Worker *worker, get_config()->conn.downstream.timeout.read, {}, {}, writecb, readcb, timeoutcb, this, get_config()->tls.dyn_rec.warmup_threshold, get_config()->tls.dyn_rec.idle_timeout, PROTO_NONE), + wb_(worker->get_mcpool()), gen_(gen), read_(&LiveCheck::noop), write_(&LiveCheck::noop), @@ -94,10 +111,18 @@ LiveCheck::LiveCheck(struct ev_loop *loop, SSL_CTX *ssl_ctx, Worker *worker, ssl_ctx_(ssl_ctx), group_(group), addr_(addr), + session_(nullptr), success_count_(0), - fail_count_(0) { + fail_count_(0), + settings_ack_received_(false), + session_closing_(false) { ev_timer_init(&backoff_timer_, backoff_timeoutcb, 0., 0.); backoff_timer_.data = this; + + // SETTINGS ACK must be received in a short timeout. Otherwise, we + // assume that connection is broken. + ev_timer_init(&settings_timer_, settings_timeout_cb, 0., 0.); + settings_timer_.data = this; } LiveCheck::~LiveCheck() { @@ -110,9 +135,19 @@ void LiveCheck::disconnect() { conn_.rlimit.stopw(); conn_.wlimit.stopw(); + ev_timer_stop(conn_.loop, &settings_timer_); + read_ = write_ = &LiveCheck::noop; conn_.disconnect(); + + nghttp2_session_del(session_); + session_ = nullptr; + + settings_ack_received_ = false; + session_closing_ = false; + + wb_.reset(); } // Use the similar backoff algorithm described in @@ -243,8 +278,21 @@ int LiveCheck::connected() { return do_write(); } + const auto &shared_addr = group_->shared_addr; + if (shared_addr->proto == PROTO_HTTP2) { + // For HTTP/2, we try to read SETTINGS ACK from server to make + // sure it is really alive, and serving HTTP/2. + read_ = &LiveCheck::read_clear; + write_ = &LiveCheck::write_clear; + + if (connection_made() != 0) { + return -1; + } + + return 0; + } + on_success(); - disconnect(); return 0; } @@ -305,7 +353,16 @@ int LiveCheck::tls_handshake() { return -1; case PROTO_HTTP2: if (util::check_h2_is_selected(proto)) { - break; + // For HTTP/2, we try to read SETTINGS ACK from server to make + // sure it is really alive, and serving HTTP/2. + read_ = &LiveCheck::read_tls; + write_ = &LiveCheck::write_tls; + + if (connection_made() != 0) { + return -1; + } + + return 0; } return -1; default: @@ -313,7 +370,209 @@ int LiveCheck::tls_handshake() { } on_success(); - disconnect(); + + return 0; +} + +int LiveCheck::read_tls() { + ev_timer_again(conn_.loop, &conn_.rt); + + std::array buf; + + ERR_clear_error(); + + for (;;) { + auto nread = conn_.read_tls(buf.data(), buf.size()); + + if (nread == 0) { + return 0; + } + + if (nread < 0) { + return nread; + } + + if (on_read(buf.data(), nread) != 0) { + return -1; + } + } +} + +int LiveCheck::write_tls() { + ev_timer_again(conn_.loop, &conn_.rt); + + ERR_clear_error(); + + struct iovec iov; + + for (;;) { + if (wb_.rleft() > 0) { + auto iovcnt = wb_.riovec(&iov, 1); + assert(iovcnt == 1); + auto nwrite = conn_.write_tls(iov.iov_base, iov.iov_len); + + if (nwrite == 0) { + return 0; + } + + if (nwrite < 0) { + return nwrite; + } + + wb_.drain(nwrite); + + continue; + } + + if (on_write() != 0) { + return -1; + } + + if (wb_.rleft() == 0) { + conn_.start_tls_write_idle(); + break; + } + } + + conn_.wlimit.stopw(); + ev_timer_stop(conn_.loop, &conn_.wt); + + if (settings_ack_received_) { + on_success(); + } + + return 0; +} + +int LiveCheck::read_clear() { + ev_timer_again(conn_.loop, &conn_.rt); + + std::array buf; + + for (;;) { + auto nread = conn_.read_clear(buf.data(), buf.size()); + + if (nread == 0) { + return 0; + } + + if (nread < 0) { + return nread; + } + + if (on_read(buf.data(), nread) != 0) { + return -1; + } + } +} + +int LiveCheck::write_clear() { + ev_timer_again(conn_.loop, &conn_.rt); + + struct iovec iov; + + for (;;) { + if (wb_.rleft() > 0) { + auto iovcnt = wb_.riovec(&iov, 1); + assert(iovcnt == 1); + auto nwrite = conn_.write_clear(iov.iov_base, iov.iov_len); + + if (nwrite == 0) { + return 0; + } + + if (nwrite < 0) { + return nwrite; + } + + wb_.drain(nwrite); + + continue; + } + + if (on_write() != 0) { + return -1; + } + + if (wb_.rleft() == 0) { + break; + } + } + + conn_.wlimit.stopw(); + ev_timer_stop(conn_.loop, &conn_.wt); + + if (settings_ack_received_) { + on_success(); + } + + return 0; +} + +int LiveCheck::on_read(const uint8_t *data, size_t len) { + ssize_t rv; + + rv = nghttp2_session_mem_recv(session_, data, len); + if (rv < 0) { + LOG(ERROR) << "nghttp2_session_mem_recv() returned error: " + << nghttp2_strerror(rv); + return -1; + } + + if (settings_ack_received_ && !session_closing_) { + session_closing_ = true; + rv = nghttp2_session_terminate_session(session_, NGHTTP2_NO_ERROR); + if (rv != 0) { + return -1; + } + } + + if (nghttp2_session_want_read(session_) == 0 && + nghttp2_session_want_write(session_) == 0 && wb_.rleft() == 0) { + if (LOG_ENABLED(INFO)) { + LOG(INFO) << "No more read/write for this session"; + } + + return -1; + } + + signal_write(); + + return 0; +} + +int LiveCheck::on_write() { + for (;;) { + const uint8_t *data; + auto datalen = nghttp2_session_mem_send(session_, &data); + + if (datalen < 0) { + LOG(ERROR) << "nghttp2_session_mem_send() returned error: " + << nghttp2_strerror(datalen); + return -1; + } + if (datalen == 0) { + break; + } + wb_.append(data, datalen); + + if (wb_.rleft() >= MAX_BUFFER_SIZE) { + break; + } + } + + if (nghttp2_session_want_read(session_) == 0 && + nghttp2_session_want_write(session_) == 0 && wb_.rleft() == 0) { + if (LOG_ENABLED(INFO)) { + LOG(INFO) << "No more read/write for this session"; + } + + if (settings_ack_received_) { + return 0; + } + + return -1; + } return 0; } @@ -360,4 +619,102 @@ void LiveCheck::on_success() { int LiveCheck::noop() { return 0; } +void LiveCheck::start_settings_timer() { + if (ev_is_active(&settings_timer_)) { + return; + } + + ev_timer_set(&settings_timer_, 10., 0.); + ev_timer_start(conn_.loop, &settings_timer_); +} + +void LiveCheck::stop_settings_timer() { + ev_timer_stop(conn_.loop, &settings_timer_); +} + +void LiveCheck::settings_ack_received() { settings_ack_received_ = true; } + +namespace { +int on_frame_send_callback(nghttp2_session *session, const nghttp2_frame *frame, + void *user_data) { + auto live_check = static_cast(user_data); + + if (frame->hd.type != NGHTTP2_SETTINGS || + (frame->hd.flags & NGHTTP2_FLAG_ACK)) { + return 0; + } + + live_check->start_settings_timer(); + + return 0; +} +} // namespace + +namespace { +int on_frame_recv_callback(nghttp2_session *session, const nghttp2_frame *frame, + void *user_data) { + auto live_check = static_cast(user_data); + + if (frame->hd.type != NGHTTP2_SETTINGS || + (frame->hd.flags & NGHTTP2_FLAG_ACK) == 0) { + return 0; + } + + live_check->stop_settings_timer(); + live_check->settings_ack_received(); + + return 0; +} +} // namespace + +int LiveCheck::connection_made() { + int rv; + + nghttp2_session_callbacks *callbacks; + rv = nghttp2_session_callbacks_new(&callbacks); + if (rv != 0) { + return -1; + } + + nghttp2_session_callbacks_set_on_frame_send_callback(callbacks, + on_frame_send_callback); + nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks, + on_frame_recv_callback); + + rv = nghttp2_session_client_new(&session_, callbacks, this); + + nghttp2_session_callbacks_del(callbacks); + + if (rv != 0) { + return -1; + } + + rv = nghttp2_submit_settings(session_, NGHTTP2_FLAG_NONE, nullptr, 0); + if (rv != 0) { + return -1; + } + + auto &shared_addr = group_->shared_addr; + auto must_terminate = + shared_addr->tls && !nghttp2::ssl::check_http2_requirement(conn_.tls.ssl); + + if (must_terminate) { + if (LOG_ENABLED(INFO)) { + LOG(INFO) << "TLSv1.2 was not negotiated. HTTP/2 must not be negotiated."; + } + + rv = nghttp2_session_terminate_session(session_, + NGHTTP2_INADEQUATE_SECURITY); + if (rv != 0) { + return -1; + } + } + + signal_write(); + + return 0; +} + +void LiveCheck::signal_write() { conn_.wlimit.startw(); } + } // namespace shrpx diff --git a/src/shrpx_live_check.h b/src/shrpx_live_check.h index c2afce0a..fa85141d 100644 --- a/src/shrpx_live_check.h +++ b/src/shrpx_live_check.h @@ -34,6 +34,8 @@ #include +#include + #include "shrpx_connection.h" namespace shrpx { @@ -64,14 +66,37 @@ public: int noop(); int connected(); int tls_handshake(); + int read_tls(); + int write_tls(); + int read_clear(); + int write_clear(); int do_read(); int do_write(); + // These functions are used to feed / extract data to + // nghttp2_session object. + int on_read(const uint8_t *data, size_t len); + int on_write(); + + // Call this function when HTTP/2 connection was established. We + // don't call this function for HTTP/1 at the moment. + int connection_made(); + + void start_settings_timer(); + void stop_settings_timer(); + + // Call this function when SETTINGS ACK was received from server. + void settings_ack_received(); + + void signal_write(); + private: Connection conn_; + DefaultMemchunks wb_; std::mt19937 &gen_; ev_timer backoff_timer_; + ev_timer settings_timer_; std::function read_, write_; Worker *worker_; // nullptr if no TLS is configured @@ -79,10 +104,15 @@ private: DownstreamAddrGroup *group_; // Address of remote endpoint DownstreamAddr *addr_; + nghttp2_session *session_; // The number of successful connect attempt in a row. size_t success_count_; // The number of unsuccessful connect attempt in a row. size_t fail_count_; + // true when SETTINGS ACK has been received from server. + bool settings_ack_received_; + // true when GOAWAY has been queued. + bool session_closing_; }; } // namespace shrpx diff --git a/src/shrpx_worker.cc b/src/shrpx_worker.cc index 1a620788..d88f48ea 100644 --- a/src/shrpx_worker.cc +++ b/src/shrpx_worker.cc @@ -478,6 +478,10 @@ size_t match_downstream_addr_group( void downstream_failure(DownstreamAddr *addr) { const auto &connect_blocker = addr->connect_blocker; + if (connect_blocker->in_offline()) { + return; + } + connect_blocker->on_failure(); if (addr->fall == 0) {