/* * nghttp2 - HTTP/2 C Library * * Copyright (c) 2012 Tatsuhiro Tsujikawa * * Permission is hereby granted, free of charge, to any person obtaining * a copy of this software and associated documentation files (the * "Software"), to deal in the Software without restriction, including * without limitation the rights to use, copy, modify, merge, publish, * distribute, sublicense, and/or sell copies of the Software, and to * permit persons to whom the Software is furnished to do so, subject to * the following conditions: * * The above copyright notice and this permission notice shall be * included in all copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ #include "shrpx_client_handler.h" #ifdef HAVE_UNISTD_H # include #endif // HAVE_UNISTD_H #ifdef HAVE_SYS_SOCKET_H # include #endif // HAVE_SYS_SOCKET_H #ifdef HAVE_NETDB_H # include #endif // HAVE_NETDB_H #include #include "shrpx_upstream.h" #include "shrpx_http2_upstream.h" #include "shrpx_https_upstream.h" #include "shrpx_config.h" #include "shrpx_http_downstream_connection.h" #include "shrpx_http2_downstream_connection.h" #include "shrpx_tls.h" #include "shrpx_worker.h" #include "shrpx_downstream_connection_pool.h" #include "shrpx_downstream.h" #include "shrpx_http2_session.h" #include "shrpx_connect_blocker.h" #include "shrpx_api_downstream_connection.h" #include "shrpx_health_monitor_downstream_connection.h" #include "shrpx_null_downstream_connection.h" #ifdef ENABLE_HTTP3 # include "shrpx_http3_upstream.h" #endif // ENABLE_HTTP3 #include "shrpx_log.h" #include "util.h" #include "template.h" #include "tls.h" using namespace nghttp2; namespace shrpx { namespace { void timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) { auto conn = static_cast(w->data); auto handler = static_cast(conn->data); if (LOG_ENABLED(INFO)) { CLOG(INFO, handler) << "Time out"; } delete handler; } } // namespace namespace { void shutdowncb(struct ev_loop *loop, ev_timer *w, int revents) { auto handler = static_cast(w->data); if (LOG_ENABLED(INFO)) { CLOG(INFO, handler) << "Close connection due to TLS renegotiation"; } delete handler; } } // namespace namespace { void readcb(struct ev_loop *loop, ev_io *w, int revents) { auto conn = static_cast(w->data); auto handler = static_cast(conn->data); if (handler->do_read() != 0) { delete handler; return; } } } // namespace namespace { void writecb(struct ev_loop *loop, ev_io *w, int revents) { auto conn = static_cast(w->data); auto handler = static_cast(conn->data); if (handler->do_write() != 0) { delete handler; return; } } } // namespace int ClientHandler::noop() { return 0; } int ClientHandler::read_clear() { auto should_break = false; rb_.ensure_chunk(); for (;;) { if (rb_.rleft() && on_read() != 0) { return -1; } if (rb_.rleft() == 0) { rb_.reset(); } else if (rb_.wleft() == 0) { conn_.rlimit.stopw(); return 0; } if (!ev_is_active(&conn_.rev) || should_break) { return 0; } auto nread = conn_.read_clear(rb_.last(), rb_.wleft()); if (nread == 0) { if (rb_.rleft() == 0) { rb_.release_chunk(); } return 0; } if (nread < 0) { return -1; } rb_.write(nread); should_break = true; } } int ClientHandler::write_clear() { std::array iov; for (;;) { if (on_write() != 0) { return -1; } auto iovcnt = upstream_->response_riovec(iov.data(), iov.size()); if (iovcnt == 0) { break; } auto nwrite = conn_.writev_clear(iov.data(), iovcnt); if (nwrite < 0) { return -1; } if (nwrite == 0) { return 0; } upstream_->response_drain(nwrite); } conn_.wlimit.stopw(); ev_timer_stop(conn_.loop, &conn_.wt); return 0; } int ClientHandler::proxy_protocol_peek_clear() { rb_.ensure_chunk(); assert(rb_.rleft() == 0); auto nread = conn_.peek_clear(rb_.last(), rb_.wleft()); if (nread < 0) { return -1; } if (nread == 0) { return 0; } if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol: Peek " << nread << " bytes from socket"; } rb_.write(nread); if (on_read() != 0) { return -1; } rb_.reset(); return 0; } int ClientHandler::tls_handshake() { ev_timer_again(conn_.loop, &conn_.rt); ERR_clear_error(); auto rv = conn_.tls_handshake(); if (rv == SHRPX_ERR_INPROGRESS) { return 0; } if (rv < 0) { return -1; } if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "SSL/TLS handshake completed"; } if (validate_next_proto() != 0) { return -1; } read_ = &ClientHandler::read_tls; write_ = &ClientHandler::write_tls; return 0; } int ClientHandler::read_tls() { auto should_break = false; ERR_clear_error(); rb_.ensure_chunk(); for (;;) { // we should process buffered data first before we read EOF. if (rb_.rleft() && on_read() != 0) { return -1; } if (rb_.rleft() == 0) { rb_.reset(); } else if (rb_.wleft() == 0) { conn_.rlimit.stopw(); return 0; } if (!ev_is_active(&conn_.rev) || should_break) { return 0; } auto nread = conn_.read_tls(rb_.last(), rb_.wleft()); if (nread == 0) { if (rb_.rleft() == 0) { rb_.release_chunk(); } return 0; } if (nread < 0) { return -1; } rb_.write(nread); should_break = true; } } int ClientHandler::write_tls() { struct iovec iov; ERR_clear_error(); if (on_write() != 0) { return -1; } auto iovcnt = upstream_->response_riovec(&iov, 1); if (iovcnt == 0) { conn_.start_tls_write_idle(); conn_.wlimit.stopw(); ev_timer_stop(conn_.loop, &conn_.wt); return 0; } for (;;) { auto nwrite = conn_.write_tls(iov.iov_base, iov.iov_len); if (nwrite < 0) { return -1; } if (nwrite == 0) { return 0; } upstream_->response_drain(nwrite); iovcnt = upstream_->response_riovec(&iov, 1); if (iovcnt == 0) { return 0; } } } #ifdef ENABLE_HTTP3 int ClientHandler::read_quic(const UpstreamAddr *faddr, const Address &remote_addr, const Address &local_addr, const ngtcp2_pkt_info &pi, const uint8_t *data, size_t datalen) { auto upstream = static_cast(upstream_.get()); return upstream->on_read(faddr, remote_addr, local_addr, pi, data, datalen); } int ClientHandler::write_quic() { return upstream_->on_write(); } #endif // ENABLE_HTTP3 int ClientHandler::upstream_noop() { return 0; } int ClientHandler::upstream_read() { assert(upstream_); if (upstream_->on_read() != 0) { return -1; } return 0; } int ClientHandler::upstream_write() { assert(upstream_); if (upstream_->on_write() != 0) { return -1; } if (get_should_close_after_write() && upstream_->response_empty()) { return -1; } return 0; } int ClientHandler::upstream_http2_connhd_read() { auto nread = std::min(left_connhd_len_, rb_.rleft()); if (memcmp(&NGHTTP2_CLIENT_MAGIC[NGHTTP2_CLIENT_MAGIC_LEN - left_connhd_len_], rb_.pos(), nread) != 0) { // There is no downgrade path here. Just drop the connection. if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "invalid client connection header"; } return -1; } left_connhd_len_ -= nread; rb_.drain(nread); conn_.rlimit.startw(); if (left_connhd_len_ == 0) { on_read_ = &ClientHandler::upstream_read; // Run on_read to process data left in buffer since they are not // notified further if (on_read() != 0) { return -1; } return 0; } return 0; } int ClientHandler::upstream_http1_connhd_read() { auto nread = std::min(left_connhd_len_, rb_.rleft()); if (memcmp(&NGHTTP2_CLIENT_MAGIC[NGHTTP2_CLIENT_MAGIC_LEN - left_connhd_len_], rb_.pos(), nread) != 0) { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "This is HTTP/1.1 connection, " << "but may be upgraded to HTTP/2 later."; } // Reset header length for later HTTP/2 upgrade left_connhd_len_ = NGHTTP2_CLIENT_MAGIC_LEN; on_read_ = &ClientHandler::upstream_read; on_write_ = &ClientHandler::upstream_write; if (on_read() != 0) { return -1; } return 0; } left_connhd_len_ -= nread; rb_.drain(nread); conn_.rlimit.startw(); if (left_connhd_len_ == 0) { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "direct HTTP/2 connection"; } direct_http2_upgrade(); on_read_ = &ClientHandler::upstream_read; on_write_ = &ClientHandler::upstream_write; // Run on_read to process data left in buffer since they are not // notified further if (on_read() != 0) { return -1; } return 0; } return 0; } ClientHandler::ClientHandler(Worker *worker, int fd, SSL *ssl, const StringRef &ipaddr, const StringRef &port, int family, const UpstreamAddr *faddr) : // We use balloc_ for TLS session ID (64), ipaddr (IPv6) (39), // port (5), forwarded-for (IPv6) (41), alpn (5), proxyproto // ipaddr (15), proxyproto port (5), sni (32, estimated). we // need terminal NULL byte for each. We also require 8 bytes // header for each allocation. We align at 16 bytes boundary, // so the required space is 64 + 48 + 16 + 48 + 16 + 16 + 16 + // 32 + 8 + 8 * 8 = 328. balloc_(512, 512), rb_(worker->get_mcpool()), conn_(worker->get_loop(), fd, ssl, worker->get_mcpool(), get_config()->conn.upstream.timeout.write, get_config()->conn.upstream.timeout.read, get_config()->conn.upstream.ratelimit.write, get_config()->conn.upstream.ratelimit.read, writecb, readcb, timeoutcb, this, get_config()->tls.dyn_rec.warmup_threshold, get_config()->tls.dyn_rec.idle_timeout, faddr->quic ? Proto::HTTP3 : Proto::NONE), ipaddr_(make_string_ref(balloc_, ipaddr)), port_(make_string_ref(balloc_, port)), faddr_(faddr), worker_(worker), left_connhd_len_(NGHTTP2_CLIENT_MAGIC_LEN), affinity_hash_(0), should_close_after_write_(false), affinity_hash_computed_(false) { ++worker_->get_worker_stat()->num_connections; ev_timer_init(&reneg_shutdown_timer_, shutdowncb, 0., 0.); reneg_shutdown_timer_.data = this; if (!faddr->quic) { conn_.rlimit.startw(); } ev_timer_again(conn_.loop, &conn_.rt); auto config = get_config(); if (!faddr->quic) { if (faddr_->accept_proxy_protocol || config->conn.upstream.accept_proxy_protocol) { read_ = &ClientHandler::proxy_protocol_peek_clear; write_ = &ClientHandler::noop; on_read_ = &ClientHandler::proxy_protocol_read; on_write_ = &ClientHandler::upstream_noop; } else { setup_upstream_io_callback(); } } auto &fwdconf = config->http.forwarded; if (fwdconf.params & FORWARDED_FOR) { if (fwdconf.for_node_type == ForwardedNode::OBFUSCATED) { // 1 for '_' auto len = SHRPX_OBFUSCATED_NODE_LENGTH + 1; // 1 for terminating NUL. auto buf = make_byte_ref(balloc_, len + 1); auto p = buf.base; *p++ = '_'; p = util::random_alpha_digit(p, p + SHRPX_OBFUSCATED_NODE_LENGTH, worker_->get_randgen()); *p = '\0'; forwarded_for_ = StringRef{buf.base, p}; } else { init_forwarded_for(family, ipaddr_); } } } void ClientHandler::init_forwarded_for(int family, const StringRef &ipaddr) { if (family == AF_INET6) { // 2 for '[' and ']' auto len = 2 + ipaddr.size(); // 1 for terminating NUL. auto buf = make_byte_ref(balloc_, len + 1); auto p = buf.base; *p++ = '['; p = std::copy(std::begin(ipaddr), std::end(ipaddr), p); *p++ = ']'; *p = '\0'; forwarded_for_ = StringRef{buf.base, p}; } else { // family == AF_INET or family == AF_UNIX forwarded_for_ = ipaddr; } } void ClientHandler::setup_upstream_io_callback() { if (conn_.tls.ssl) { conn_.prepare_server_handshake(); read_ = write_ = &ClientHandler::tls_handshake; on_read_ = &ClientHandler::upstream_noop; on_write_ = &ClientHandler::upstream_write; } else { // For non-TLS version, first create HttpsUpstream. It may be // upgraded to HTTP/2 through HTTP Upgrade or direct HTTP/2 // connection. upstream_ = std::make_unique(this); alpn_ = StringRef::from_lit("http/1.1"); read_ = &ClientHandler::read_clear; write_ = &ClientHandler::write_clear; on_read_ = &ClientHandler::upstream_http1_connhd_read; on_write_ = &ClientHandler::upstream_noop; } } #ifdef ENABLE_HTTP3 void ClientHandler::setup_http3_upstream( std::unique_ptr &&upstream) { upstream_ = std::move(upstream); write_ = &ClientHandler::write_quic; auto config = get_config(); reset_upstream_read_timeout(config->conn.upstream.timeout.http3_read); } #endif // ENABLE_HTTP3 ClientHandler::~ClientHandler() { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "Deleting"; } if (upstream_) { upstream_->on_handler_delete(); } auto worker_stat = worker_->get_worker_stat(); --worker_stat->num_connections; if (worker_stat->num_connections == 0) { worker_->schedule_clear_mcpool(); } ev_timer_stop(conn_.loop, &reneg_shutdown_timer_); // TODO If backend is http/2, and it is in CONNECTED state, signal // it and make it loopbreak when output is zero. if (worker_->get_graceful_shutdown() && worker_stat->num_connections == 0 && worker_stat->num_close_waits == 0) { ev_break(conn_.loop); } if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "Deleted"; } } Upstream *ClientHandler::get_upstream() { return upstream_.get(); } struct ev_loop *ClientHandler::get_loop() const { return conn_.loop; } void ClientHandler::reset_upstream_read_timeout(ev_tstamp t) { conn_.rt.repeat = t; if (ev_is_active(&conn_.rt)) { ev_timer_again(conn_.loop, &conn_.rt); } } void ClientHandler::reset_upstream_write_timeout(ev_tstamp t) { conn_.wt.repeat = t; if (ev_is_active(&conn_.wt)) { ev_timer_again(conn_.loop, &conn_.wt); } } void ClientHandler::repeat_read_timer() { ev_timer_again(conn_.loop, &conn_.rt); } void ClientHandler::stop_read_timer() { ev_timer_stop(conn_.loop, &conn_.rt); } int ClientHandler::validate_next_proto() { const unsigned char *next_proto = nullptr; unsigned int next_proto_len = 0; // First set callback for catch all cases on_read_ = &ClientHandler::upstream_read; #ifndef OPENSSL_NO_NEXTPROTONEG SSL_get0_next_proto_negotiated(conn_.tls.ssl, &next_proto, &next_proto_len); #endif // !OPENSSL_NO_NEXTPROTONEG #if OPENSSL_VERSION_NUMBER >= 0x10002000L if (next_proto == nullptr) { SSL_get0_alpn_selected(conn_.tls.ssl, &next_proto, &next_proto_len); } #endif // OPENSSL_VERSION_NUMBER >= 0x10002000L StringRef proto; if (next_proto) { proto = StringRef{next_proto, next_proto_len}; if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "The negotiated next protocol: " << proto; } } else { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "No protocol negotiated. Fallback to HTTP/1.1"; } proto = StringRef::from_lit("http/1.1"); } if (!tls::in_proto_list(get_config()->tls.npn_list, proto)) { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "The negotiated protocol is not supported: " << proto; } return -1; } if (util::check_h2_is_selected(proto)) { on_read_ = &ClientHandler::upstream_http2_connhd_read; auto http2_upstream = std::make_unique(this); upstream_ = std::move(http2_upstream); alpn_ = make_string_ref(balloc_, proto); // At this point, input buffer is already filled with some bytes. // The read callback is not called until new data come. So consume // input buffer here. if (on_read() != 0) { return -1; } return 0; } if (proto == StringRef::from_lit("http/1.1")) { upstream_ = std::make_unique(this); alpn_ = StringRef::from_lit("http/1.1"); // At this point, input buffer is already filled with some bytes. // The read callback is not called until new data come. So consume // input buffer here. if (on_read() != 0) { return -1; } return 0; } if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "The negotiated protocol is not supported"; } return -1; } int ClientHandler::do_read() { return read_(*this); } int ClientHandler::do_write() { return write_(*this); } int ClientHandler::on_read() { if (rb_.chunk_avail()) { auto rv = on_read_(*this); if (rv != 0) { return rv; } } conn_.handle_tls_pending_read(); return 0; } int ClientHandler::on_write() { return on_write_(*this); } const StringRef &ClientHandler::get_ipaddr() const { return ipaddr_; } bool ClientHandler::get_should_close_after_write() const { return should_close_after_write_; } void ClientHandler::set_should_close_after_write(bool f) { should_close_after_write_ = f; } void ClientHandler::pool_downstream_connection( std::unique_ptr dconn) { if (!dconn->poolable()) { return; } dconn->set_client_handler(nullptr); auto &group = dconn->get_downstream_addr_group(); if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "Pooling downstream connection DCONN:" << dconn.get() << " in group " << group; } auto addr = dconn->get_addr(); auto &dconn_pool = addr->dconn_pool; dconn_pool->add_downstream_connection(std::move(dconn)); } namespace { // Computes 32bits hash for session affinity for IP address |ip|. uint32_t compute_affinity_from_ip(const StringRef &ip) { int rv; std::array buf; rv = util::sha256(buf.data(), ip); if (rv != 0) { // Not sure when sha256 failed. Just fall back to another // function. return util::hash32(ip); } return (static_cast(buf[0]) << 24) | (static_cast(buf[1]) << 16) | (static_cast(buf[2]) << 8) | static_cast(buf[3]); } } // namespace Http2Session *ClientHandler::get_http2_session( const std::shared_ptr &group, DownstreamAddr *addr) { auto &shared_addr = group->shared_addr; if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "Selected DownstreamAddr=" << addr << ", index=" << (addr - shared_addr->addrs.data()); } for (auto session = addr->http2_extra_freelist.head; session;) { auto next = session->dlnext; if (session->max_concurrency_reached(0)) { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "Maximum streams have been reached for Http2Session(" << session << "). Skip it"; } session->remove_from_freelist(); session = next; continue; } if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "Use Http2Session " << session << " from http2_extra_freelist"; } if (session->max_concurrency_reached(1)) { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "Maximum streams are reached for Http2Session(" << session << ")."; } session->remove_from_freelist(); } return session; } auto session = new Http2Session(conn_.loop, worker_->get_cl_ssl_ctx(), worker_, group, addr); if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "Create new Http2Session " << session; } session->add_to_extra_freelist(); return session; } uint32_t ClientHandler::get_affinity_cookie(Downstream *downstream, const StringRef &cookie_name) { auto h = downstream->find_affinity_cookie(cookie_name); if (h) { return h; } auto d = std::uniform_int_distribution( 1, std::numeric_limits::max()); auto rh = d(worker_->get_randgen()); h = util::hash32(StringRef{reinterpret_cast(&rh), reinterpret_cast(&rh) + sizeof(rh)}); downstream->renew_affinity_cookie(h); return h; } namespace { void reschedule_addr( std::priority_queue, DownstreamAddrEntryGreater> &pq, DownstreamAddr *addr) { auto penalty = MAX_DOWNSTREAM_ADDR_WEIGHT + addr->pending_penalty; addr->cycle += penalty / addr->weight; addr->pending_penalty = penalty % addr->weight; pq.push(DownstreamAddrEntry{addr, addr->seq, addr->cycle}); addr->queued = true; } } // namespace namespace { void reschedule_wg( std::priority_queue, WeightGroupEntryGreater> &pq, WeightGroup *wg) { auto penalty = MAX_DOWNSTREAM_ADDR_WEIGHT + wg->pending_penalty; wg->cycle += penalty / wg->weight; wg->pending_penalty = penalty % wg->weight; pq.push(WeightGroupEntry{wg, wg->seq, wg->cycle}); wg->queued = true; } } // namespace DownstreamAddr *ClientHandler::get_downstream_addr(int &err, DownstreamAddrGroup *group, Downstream *downstream) { err = 0; switch (faddr_->alt_mode) { case UpstreamAltMode::API: case UpstreamAltMode::HEALTHMON: assert(0); default: break; } auto &shared_addr = group->shared_addr; if (shared_addr->affinity.type != SessionAffinity::NONE) { uint32_t hash; switch (shared_addr->affinity.type) { case SessionAffinity::IP: if (!affinity_hash_computed_) { affinity_hash_ = compute_affinity_from_ip(ipaddr_); affinity_hash_computed_ = true; } hash = affinity_hash_; break; case SessionAffinity::COOKIE: if (shared_addr->affinity.cookie.stickiness == SessionAffinityCookieStickiness::STRICT) { return get_downstream_addr_strict_affinity(err, shared_addr, downstream); } hash = get_affinity_cookie(downstream, shared_addr->affinity.cookie.name); break; default: assert(0); } const auto &affinity_hash = shared_addr->affinity_hash; auto it = std::lower_bound( std::begin(affinity_hash), std::end(affinity_hash), hash, [](const AffinityHash &lhs, uint32_t rhs) { return lhs.hash < rhs; }); if (it == std::end(affinity_hash)) { it = std::begin(affinity_hash); } auto aff_idx = static_cast(std::distance(std::begin(affinity_hash), it)); auto idx = (*it).idx; auto addr = &shared_addr->addrs[idx]; if (addr->connect_blocker->blocked()) { size_t i; for (i = aff_idx + 1; i != aff_idx; ++i) { if (i == shared_addr->affinity_hash.size()) { i = 0; } addr = &shared_addr->addrs[shared_addr->affinity_hash[i].idx]; if (addr->connect_blocker->blocked()) { continue; } break; } if (i == aff_idx) { err = -1; return nullptr; } } return addr; } auto &wgpq = shared_addr->pq; for (;;) { if (wgpq.empty()) { CLOG(INFO, this) << "No working downstream address found"; err = -1; return nullptr; } auto wg = wgpq.top().wg; wgpq.pop(); wg->queued = false; for (;;) { if (wg->pq.empty()) { break; } auto addr = wg->pq.top().addr; wg->pq.pop(); addr->queued = false; if (addr->connect_blocker->blocked()) { continue; } reschedule_addr(wg->pq, addr); reschedule_wg(wgpq, wg); return addr; } } } DownstreamAddr *ClientHandler::get_downstream_addr_strict_affinity( int &err, const std::shared_ptr &shared_addr, Downstream *downstream) { const auto &affinity_hash = shared_addr->affinity_hash; auto h = downstream->find_affinity_cookie(shared_addr->affinity.cookie.name); if (h) { auto it = shared_addr->affinity_hash_map.find(h); if (it != std::end(shared_addr->affinity_hash_map)) { auto addr = &shared_addr->addrs[(*it).second]; if (!addr->connect_blocker->blocked()) { return addr; } } } else { auto d = std::uniform_int_distribution( 1, std::numeric_limits::max()); auto rh = d(worker_->get_randgen()); h = util::hash32(StringRef{reinterpret_cast(&rh), reinterpret_cast(&rh) + sizeof(rh)}); } // Client is not bound to a particular backend, or the bound backend // is not found, or is blocked. Find new backend using h. Using // existing h allows us to find new server in a deterministic way. // It is preferable because multiple concurrent requests with the // stale cookie might be in-flight. auto it = std::lower_bound( std::begin(affinity_hash), std::end(affinity_hash), h, [](const AffinityHash &lhs, uint32_t rhs) { return lhs.hash < rhs; }); if (it == std::end(affinity_hash)) { it = std::begin(affinity_hash); } auto aff_idx = static_cast(std::distance(std::begin(affinity_hash), it)); auto idx = (*it).idx; auto addr = &shared_addr->addrs[idx]; if (addr->connect_blocker->blocked()) { size_t i; for (i = aff_idx + 1; i != aff_idx; ++i) { if (i == shared_addr->affinity_hash.size()) { i = 0; } addr = &shared_addr->addrs[shared_addr->affinity_hash[i].idx]; if (addr->connect_blocker->blocked()) { continue; } break; } if (i == aff_idx) { err = -1; return nullptr; } } downstream->renew_affinity_cookie(addr->affinity_hash); return addr; } std::unique_ptr ClientHandler::get_downstream_connection(int &err, Downstream *downstream) { size_t group_idx; auto &downstreamconf = *worker_->get_downstream_config(); auto &routerconf = downstreamconf.router; auto catch_all = downstreamconf.addr_group_catch_all; auto &groups = worker_->get_downstream_addr_groups(); auto &req = downstream->request(); err = 0; switch (faddr_->alt_mode) { case UpstreamAltMode::API: { auto dconn = std::make_unique(worker_); dconn->set_client_handler(this); return dconn; } case UpstreamAltMode::HEALTHMON: { auto dconn = std::make_unique(); dconn->set_client_handler(this); return dconn; } default: break; } auto &balloc = downstream->get_block_allocator(); StringRef authority, path; if (req.forwarded_once) { if (groups.size() != 1) { authority = req.orig_authority; path = req.orig_path; } } else { if (faddr_->sni_fwd) { authority = sni_; } else if (!req.authority.empty()) { authority = req.authority; } else { auto h = req.fs.header(http2::HD_HOST); if (h) { authority = h->value; } } // CONNECT method does not have path. But we requires path in // host-path mapping. As workaround, we assume that path is // "/". if (!req.regular_connect_method()) { path = req.path; } // Cache the authority and path used for the first-time backend // selection because per-pattern mruby script can change them. req.orig_authority = authority; req.orig_path = path; req.forwarded_once = true; } // Fast path. If we have one group, it must be catch-all group. if (groups.size() == 1) { group_idx = 0; } else { group_idx = match_downstream_addr_group(routerconf, authority, path, groups, catch_all, balloc); } if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "Downstream address group_idx: " << group_idx; } if (groups[group_idx]->shared_addr->redirect_if_not_tls && !conn_.tls.ssl) { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "Downstream address group " << group_idx << " requires frontend TLS connection."; } err = SHRPX_ERR_TLS_REQUIRED; return nullptr; } auto &group = groups[group_idx]; if (group->shared_addr->dnf) { auto dconn = std::make_unique(group); dconn->set_client_handler(this); return dconn; } auto addr = get_downstream_addr(err, group.get(), downstream); if (addr == nullptr) { return nullptr; } if (addr->proto == Proto::HTTP1) { auto dconn = addr->dconn_pool->pop_downstream_connection(); if (dconn) { dconn->set_client_handler(this); return dconn; } if (worker_->get_connect_blocker()->blocked()) { if (LOG_ENABLED(INFO)) { DCLOG(INFO, this) << "Worker wide backend connection was blocked temporarily"; } return nullptr; } if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "Downstream connection pool is empty." << " Create new one"; } dconn = std::make_unique(group, addr, conn_.loop, worker_); dconn->set_client_handler(this); return dconn; } if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "Downstream connection pool is empty." << " Create new one"; } auto http2session = get_http2_session(group, addr); auto dconn = std::make_unique(http2session); dconn->set_client_handler(this); return dconn; } MemchunkPool *ClientHandler::get_mcpool() { return worker_->get_mcpool(); } SSL *ClientHandler::get_ssl() const { return conn_.tls.ssl; } void ClientHandler::direct_http2_upgrade() { upstream_ = std::make_unique(this); alpn_ = StringRef::from_lit(NGHTTP2_CLEARTEXT_PROTO_VERSION_ID); on_read_ = &ClientHandler::upstream_read; write_ = &ClientHandler::write_clear; } int ClientHandler::perform_http2_upgrade(HttpsUpstream *http) { auto upstream = std::make_unique(this); auto output = upstream->get_response_buf(); // We might have written non-final header in response_buf, in this // case, response_state is still INITIAL. If this non-final header // and upgrade header fit in output buffer, do upgrade. Otherwise, // to avoid to send this non-final header as response body in HTTP/2 // upstream, fail upgrade. auto downstream = http->get_downstream(); auto input = downstream->get_response_buf(); if (upstream->upgrade_upstream(http) != 0) { return -1; } // http pointer is now owned by upstream. upstream_.release(); // TODO We might get other version id in HTTP2-settings, if we // support aliasing for h2, but we just use library default for now. alpn_ = StringRef::from_lit(NGHTTP2_CLEARTEXT_PROTO_VERSION_ID); on_read_ = &ClientHandler::upstream_http2_connhd_read; write_ = &ClientHandler::write_clear; input->remove(*output, input->rleft()); constexpr auto res = StringRef::from_lit("HTTP/1.1 101 Switching Protocols\r\n" "Connection: Upgrade\r\n" "Upgrade: " NGHTTP2_CLEARTEXT_PROTO_VERSION_ID "\r\n" "\r\n"); output->append(res); upstream_ = std::move(upstream); signal_write(); return 0; } bool ClientHandler::get_http2_upgrade_allowed() const { return !conn_.tls.ssl; } StringRef ClientHandler::get_upstream_scheme() const { if (conn_.tls.ssl) { return StringRef::from_lit("https"); } else { return StringRef::from_lit("http"); } } void ClientHandler::start_immediate_shutdown() { ev_timer_start(conn_.loop, &reneg_shutdown_timer_); } void ClientHandler::write_accesslog(Downstream *downstream) { auto &req = downstream->request(); auto config = get_config(); if (!req.tstamp) { auto lgconf = log_config(); lgconf->update_tstamp(std::chrono::system_clock::now()); req.tstamp = lgconf->tstamp; } upstream_accesslog( config->logging.access.format, LogSpec{ downstream, ipaddr_, alpn_, sni_, conn_.tls.ssl, std::chrono::high_resolution_clock::now(), // request_end_time port_, faddr_->port, config->pid, }); } ClientHandler::ReadBuf *ClientHandler::get_rb() { return &rb_; } void ClientHandler::signal_write() { conn_.wlimit.startw(); } RateLimit *ClientHandler::get_rlimit() { return &conn_.rlimit; } RateLimit *ClientHandler::get_wlimit() { return &conn_.wlimit; } ev_io *ClientHandler::get_wev() { return &conn_.wev; } Worker *ClientHandler::get_worker() const { return worker_; } namespace { ssize_t parse_proxy_line_port(const uint8_t *first, const uint8_t *last) { auto p = first; int32_t port = 0; if (p == last) { return -1; } if (*p == '0') { if (p + 1 != last && util::is_digit(*(p + 1))) { return -1; } return 1; } for (; p != last && util::is_digit(*p); ++p) { port *= 10; port += *p - '0'; if (port > 65535) { return -1; } } return p - first; } } // namespace int ClientHandler::on_proxy_protocol_finish() { auto len = rb_.pos() - rb_.begin(); assert(len); if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol: Draining " << len << " bytes from socket"; } rb_.reset(); if (conn_.read_nolim_clear(rb_.pos(), len) < 0) { return -1; } rb_.reset(); setup_upstream_io_callback(); return 0; } namespace { // PROXY-protocol v2 header signature constexpr uint8_t PROXY_PROTO_V2_SIG[] = "\x0D\x0A\x0D\x0A\x00\x0D\x0A\x51\x55\x49\x54\x0A"; // PROXY-protocol v2 header length constexpr size_t PROXY_PROTO_V2_HDLEN = str_size(PROXY_PROTO_V2_SIG) + /* ver_cmd(1) + fam(1) + len(2) = */ 4; } // namespace // http://www.haproxy.org/download/1.5/doc/proxy-protocol.txt int ClientHandler::proxy_protocol_read() { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol: Started"; } auto first = rb_.pos(); if (rb_.rleft() >= PROXY_PROTO_V2_HDLEN && (*(first + str_size(PROXY_PROTO_V2_SIG)) & 0xf0) == 0x20) { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol: Detected v2 header signature"; } return proxy_protocol_v2_read(); } // NULL character really destroys functions which expects NULL // terminated string. We won't expect it in PROXY protocol line, so // find it here. auto chrs = std::array{'\n', '\0'}; constexpr size_t MAX_PROXY_LINELEN = 107; auto bufend = rb_.pos() + std::min(MAX_PROXY_LINELEN, rb_.rleft()); auto end = std::find_first_of(rb_.pos(), bufend, std::begin(chrs), std::end(chrs)); if (end == bufend || *end == '\0' || end == rb_.pos() || *(end - 1) != '\r') { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol-v1: No ending CR LF sequence found"; } return -1; } --end; constexpr auto HEADER = StringRef::from_lit("PROXY "); if (static_cast(end - rb_.pos()) < HEADER.size()) { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol-v1: PROXY version 1 ID not found"; } return -1; } if (!util::streq(HEADER, StringRef{rb_.pos(), HEADER.size()})) { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol-v1: Bad PROXY protocol version 1 ID"; } return -1; } rb_.drain(HEADER.size()); int family; if (rb_.pos()[0] == 'T') { if (end - rb_.pos() < 5) { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol-v1: INET protocol family not found"; } return -1; } if (rb_.pos()[1] != 'C' || rb_.pos()[2] != 'P') { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol-v1: Unknown INET protocol family"; } return -1; } switch (rb_.pos()[3]) { case '4': family = AF_INET; break; case '6': family = AF_INET6; break; default: if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol-v1: Unknown INET protocol family"; } return -1; } rb_.drain(5); } else { if (end - rb_.pos() < 7) { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol-v1: INET protocol family not found"; } return -1; } if (!util::streq_l("UNKNOWN", rb_.pos(), 7)) { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol-v1: Unknown INET protocol family"; } return -1; } rb_.drain(end + 2 - rb_.pos()); return on_proxy_protocol_finish(); } // source address auto token_end = std::find(rb_.pos(), end, ' '); if (token_end == end) { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol-v1: Source address not found"; } return -1; } *token_end = '\0'; if (!util::numeric_host(reinterpret_cast(rb_.pos()), family)) { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol-v1: Invalid source address"; } return -1; } auto src_addr = rb_.pos(); auto src_addrlen = token_end - rb_.pos(); rb_.drain(token_end - rb_.pos() + 1); // destination address token_end = std::find(rb_.pos(), end, ' '); if (token_end == end) { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol-v1: Destination address not found"; } return -1; } *token_end = '\0'; if (!util::numeric_host(reinterpret_cast(rb_.pos()), family)) { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol-v1: Invalid destination address"; } return -1; } // Currently we don't use destination address rb_.drain(token_end - rb_.pos() + 1); // source port auto n = parse_proxy_line_port(rb_.pos(), end); if (n <= 0 || *(rb_.pos() + n) != ' ') { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol-v1: Invalid source port"; } return -1; } rb_.pos()[n] = '\0'; auto src_port = rb_.pos(); auto src_portlen = n; rb_.drain(n + 1); // destination port n = parse_proxy_line_port(rb_.pos(), end); if (n <= 0 || rb_.pos() + n != end) { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol-v1: Invalid destination port"; } return -1; } // Currently we don't use destination port rb_.drain(end + 2 - rb_.pos()); ipaddr_ = make_string_ref(balloc_, StringRef{src_addr, src_addr + src_addrlen}); port_ = make_string_ref(balloc_, StringRef{src_port, src_port + src_portlen}); if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol-v1: Finished, " << (rb_.pos() - first) << " bytes read"; } auto config = get_config(); auto &fwdconf = config->http.forwarded; if ((fwdconf.params & FORWARDED_FOR) && fwdconf.for_node_type == ForwardedNode::IP) { init_forwarded_for(family, ipaddr_); } return on_proxy_protocol_finish(); } int ClientHandler::proxy_protocol_v2_read() { // Assume that first str_size(PROXY_PROTO_V2_SIG) octets match v2 // protocol signature and followed by the bytes which indicates v2. assert(rb_.rleft() >= PROXY_PROTO_V2_HDLEN); auto p = rb_.pos() + str_size(PROXY_PROTO_V2_SIG); assert(((*p) & 0xf0) == 0x20); enum { LOCAL, PROXY } cmd; auto cmd_bits = (*p++) & 0xf; switch (cmd_bits) { case 0x0: cmd = LOCAL; break; case 0x01: cmd = PROXY; break; default: if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol-v2: Unknown command " << log::hex << cmd_bits; } return -1; } auto fam = *p++; uint16_t len; memcpy(&len, p, sizeof(len)); len = ntohs(len); p += sizeof(len); if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol-v2: Detected family=" << log::hex << fam << ", len=" << log::dec << len; } if (rb_.last() - p < len) { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol-v2: Prematurely truncated header block; require " << len << " bytes, " << rb_.last() - p << " bytes left"; } return -1; } int family; std::array src_addr, dst_addr; size_t addrlen; switch (fam) { case 0x11: case 0x12: if (len < 12) { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol-v2: Too short AF_INET addresses"; } return -1; } family = AF_INET; addrlen = 4; break; case 0x21: case 0x22: if (len < 36) { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol-v2: Too short AF_INET6 addresses"; } return -1; } family = AF_INET6; addrlen = 16; break; case 0x31: case 0x32: if (len < 216) { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol-v2: Too short AF_UNIX addresses"; } return -1; } // fall through case 0x00: { // UNSPEC and UNIX are just ignored. if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol-v2: Ignore combination of address " "family and protocol " << log::hex << fam; } rb_.drain(PROXY_PROTO_V2_HDLEN + len); return on_proxy_protocol_finish(); } default: if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol-v2: Unknown combination of address " "family and protocol " << log::hex << fam; } return -1; } if (cmd != PROXY) { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol-v2: Ignore non-PROXY command"; } rb_.drain(PROXY_PROTO_V2_HDLEN + len); return on_proxy_protocol_finish(); } if (inet_ntop(family, p, src_addr.data(), src_addr.size()) == nullptr) { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol-v2: Unable to parse source address"; } return -1; } p += addrlen; if (inet_ntop(family, p, dst_addr.data(), dst_addr.size()) == nullptr) { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol-v2: Unable to parse destination address"; } return -1; } p += addrlen; uint16_t src_port; memcpy(&src_port, p, sizeof(src_port)); src_port = ntohs(src_port); // We don't use destination port. p += 4; ipaddr_ = make_string_ref(balloc_, StringRef{src_addr.data()}); port_ = util::make_string_ref_uint(balloc_, src_port); if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol-v2: Finished reading proxy addresses, " << p - rb_.pos() << " bytes read, " << PROXY_PROTO_V2_HDLEN + len - (p - rb_.pos()) << " bytes left"; } auto config = get_config(); auto &fwdconf = config->http.forwarded; if ((fwdconf.params & FORWARDED_FOR) && fwdconf.for_node_type == ForwardedNode::IP) { init_forwarded_for(family, ipaddr_); } rb_.drain(PROXY_PROTO_V2_HDLEN + len); return on_proxy_protocol_finish(); } StringRef ClientHandler::get_forwarded_by() const { auto &fwdconf = get_config()->http.forwarded; if (fwdconf.by_node_type == ForwardedNode::OBFUSCATED) { return fwdconf.by_obfuscated; } return faddr_->hostport; } StringRef ClientHandler::get_forwarded_for() const { return forwarded_for_; } const UpstreamAddr *ClientHandler::get_upstream_addr() const { return faddr_; } Connection *ClientHandler::get_connection() { return &conn_; }; void ClientHandler::set_tls_sni(const StringRef &sni) { sni_ = make_string_ref(balloc_, sni); } StringRef ClientHandler::get_tls_sni() const { return sni_; } StringRef ClientHandler::get_alpn() const { return alpn_; } BlockAllocator &ClientHandler::get_block_allocator() { return balloc_; } void ClientHandler::set_alpn_from_conn() { const unsigned char *alpn; unsigned int alpnlen; SSL_get0_alpn_selected(conn_.tls.ssl, &alpn, &alpnlen); alpn_ = make_string_ref(balloc_, StringRef{alpn, alpnlen}); } } // namespace shrpx