From 4fa150c494372a61717f3fe42d976c0ed9f60e6a Mon Sep 17 00:00:00 2001 From: Tatsuhiro Tsujikawa Date: Sun, 8 Jan 2017 16:41:02 +0900 Subject: [PATCH] nghttpx: Use Memchunk based read buffer for frontend connection Previously, we have dedicated read buffer for each frontend connection. With this commit, the buffer spaces are only used when needed, and pooled if they are not used. This reduces memory usage for idle client connections. --- src/memchunk.h | 96 +++++++++++++++++++++++++++++++++++++ src/shrpx_client_handler.cc | 86 +++++++++++++++++++-------------- src/shrpx_client_handler.h | 4 +- src/shrpx_http2_upstream.cc | 2 +- src/shrpx_https_upstream.cc | 7 +-- src/shrpx_spdy_upstream.cc | 2 +- 6 files changed, 153 insertions(+), 44 deletions(-) diff --git a/src/memchunk.h b/src/memchunk.h index 991a190a..44daf164 100644 --- a/src/memchunk.h +++ b/src/memchunk.h @@ -434,6 +434,102 @@ inline int limit_iovec(struct iovec *iov, int iovcnt, size_t max) { return iovcnt; } +// MemchunkBuffer is similar to Buffer, but it uses pooled Memchunk +// for its underlying buffer. +template struct MemchunkBuffer { + MemchunkBuffer(Pool *pool) : pool(pool), chunk(nullptr) {} + MemchunkBuffer(const MemchunkBuffer &) = delete; + MemchunkBuffer(MemchunkBuffer &&other) noexcept + : pool(other.pool), chunk(other.chunk) { + other.chunk = nullptr; + } + MemchunkBuffer &operator=(const MemchunkBuffer &) = delete; + MemchunkBuffer &operator=(MemchunkBuffer &&other) noexcept { + if (this == &other) { + return *this; + } + + pool = other.pool; + chunk = other.chunk; + + other.chunk = nullptr; + + return *this; + } + + ~MemchunkBuffer() { + if (!pool || !chunk) { + return; + } + pool->recycle(chunk); + } + + // Ensures that the underlying buffer is allocated. + void ensure_chunk() { + if (chunk) { + return; + } + chunk = pool->get(); + } + + // Releases the underlying buffer. + void release_chunk() { + if (!chunk) { + return; + } + pool->recycle(chunk); + chunk = nullptr; + } + + // Returns true if the underlying buffer is allocated. + bool chunk_avail() const { return chunk != nullptr; } + + // The functions below must be called after the underlying buffer is + // allocated (use ensure_chunk). + + // MemchunkBuffer provides the same interface functions with Buffer. + // Since we has chunk as a member variable, pos and last are + // implemented as wrapper functions. + + uint8_t *pos() const { return chunk->pos; } + uint8_t *last() const { return chunk->last; } + + size_t rleft() const { return chunk->len(); } + size_t wleft() const { return chunk->left(); } + size_t write(const void *src, size_t count) { + count = std::min(count, wleft()); + auto p = static_cast(src); + chunk->last = std::copy_n(p, count, chunk->last); + return count; + } + size_t write(size_t count) { + count = std::min(count, wleft()); + chunk->last += count; + return count; + } + size_t drain(size_t count) { + count = std::min(count, rleft()); + chunk->pos += count; + return count; + } + size_t drain_reset(size_t count) { + count = std::min(count, rleft()); + std::copy(chunk->pos + count, chunk->last, std::begin(chunk->buf)); + chunk->last = std::begin(chunk->buf) + (chunk->last - (chunk->pos + count)); + chunk->pos = std::begin(chunk->buf); + return count; + } + void reset() { chunk->reset(); } + uint8_t *begin() { return std::begin(chunk->buf); } + uint8_t &operator[](size_t n) { return chunk->buf[n]; } + const uint8_t &operator[](size_t n) const { return chunk->buf[n]; } + + Pool *pool; + Memchunk *chunk; +}; + +using DefaultMemchunkBuffer = MemchunkBuffer; + } // namespace nghttp2 #endif // MEMCHUNK_H diff --git a/src/shrpx_client_handler.cc b/src/shrpx_client_handler.cc index 1280d452..3f75ada4 100644 --- a/src/shrpx_client_handler.cc +++ b/src/shrpx_client_handler.cc @@ -117,6 +117,7 @@ void writecb(struct ev_loop *loop, ev_io *w, int revents) { int ClientHandler::noop() { return 0; } int ClientHandler::read_clear() { + rb_.ensure_chunk(); for (;;) { if (rb_.rleft() && on_read() != 0) { return -1; @@ -132,9 +133,12 @@ int ClientHandler::read_clear() { return 0; } - auto nread = conn_.read_clear(rb_.last, rb_.wleft()); + auto nread = conn_.read_clear(rb_.last(), rb_.wleft()); if (nread == 0) { + if (rb_.rleft() == 0) { + rb_.release_chunk(); + } return 0; } @@ -209,6 +213,8 @@ int ClientHandler::tls_handshake() { int ClientHandler::read_tls() { ERR_clear_error(); + rb_.ensure_chunk(); + for (;;) { // we should process buffered data first before we read EOF. if (rb_.rleft() && on_read() != 0) { @@ -225,9 +231,12 @@ int ClientHandler::read_tls() { return 0; } - auto nread = conn_.read_tls(rb_.last, rb_.wleft()); + auto nread = conn_.read_tls(rb_.last(), rb_.wleft()); if (nread == 0) { + if (rb_.rleft() == 0) { + rb_.release_chunk(); + } return 0; } @@ -303,7 +312,7 @@ int ClientHandler::upstream_write() { int ClientHandler::upstream_http2_connhd_read() { auto nread = std::min(left_connhd_len_, rb_.rleft()); if (memcmp(NGHTTP2_CLIENT_MAGIC + NGHTTP2_CLIENT_MAGIC_LEN - left_connhd_len_, - rb_.pos, nread) != 0) { + rb_.pos(), nread) != 0) { // There is no downgrade path here. Just drop the connection. if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "invalid client connection header"; @@ -332,7 +341,7 @@ int ClientHandler::upstream_http2_connhd_read() { int ClientHandler::upstream_http1_connhd_read() { auto nread = std::min(left_connhd_len_, rb_.rleft()); if (memcmp(NGHTTP2_CLIENT_MAGIC + NGHTTP2_CLIENT_MAGIC_LEN - left_connhd_len_, - rb_.pos, nread) != 0) { + rb_.pos(), nread) != 0) { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "This is HTTP/1.1 connection, " << "but may be upgraded to HTTP/2 later."; @@ -386,6 +395,7 @@ ClientHandler::ClientHandler(Worker *worker, int fd, SSL *ssl, // so the required space is 64 + 48 + 16 + 48 + 16 + 16 + 16 + // 32 + 8 + 8 * 8 = 328. balloc_(512, 512), + rb_(worker->get_mcpool()), conn_(worker->get_loop(), fd, ssl, worker->get_mcpool(), get_config()->conn.upstream.timeout.write, get_config()->conn.upstream.timeout.read, @@ -647,9 +657,11 @@ int ClientHandler::do_read() { return read_(*this); } int ClientHandler::do_write() { return write_(*this); } int ClientHandler::on_read() { - auto rv = on_read_(*this); - if (rv != 0) { - return rv; + if (rb_.chunk_avail()) { + auto rv = on_read_(*this); + if (rv != 0) { + return rv; + } } conn_.handle_tls_pending_read(); return 0; @@ -1325,7 +1337,7 @@ ssize_t parse_proxy_line_port(const uint8_t *first, const uint8_t *last) { int ClientHandler::on_proxy_protocol_finish() { if (conn_.tls.ssl) { - conn_.tls.rbuf.append(rb_.pos, rb_.rleft()); + conn_.tls.rbuf.append(rb_.pos(), rb_.rleft()); rb_.reset(); } @@ -1346,7 +1358,7 @@ int ClientHandler::proxy_protocol_read() { CLOG(INFO, this) << "PROXY-protocol: Started"; } - auto first = rb_.pos; + auto first = rb_.pos(); // NULL character really destroys functions which expects NULL // terminated string. We won't expect it in PROXY protocol line, so @@ -1355,12 +1367,12 @@ int ClientHandler::proxy_protocol_read() { constexpr size_t MAX_PROXY_LINELEN = 107; - auto bufend = rb_.pos + std::min(MAX_PROXY_LINELEN, rb_.rleft()); + auto bufend = rb_.pos() + std::min(MAX_PROXY_LINELEN, rb_.rleft()); auto end = - std::find_first_of(rb_.pos, bufend, std::begin(chrs), std::end(chrs)); + std::find_first_of(rb_.pos(), bufend, std::begin(chrs), std::end(chrs)); - if (end == bufend || *end == '\0' || end == rb_.pos || *(end - 1) != '\r') { + if (end == bufend || *end == '\0' || end == rb_.pos() || *(end - 1) != '\r') { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol-v1: No ending CR LF sequence found"; } @@ -1371,14 +1383,14 @@ int ClientHandler::proxy_protocol_read() { constexpr auto HEADER = StringRef::from_lit("PROXY "); - if (static_cast(end - rb_.pos) < HEADER.size()) { + if (static_cast(end - rb_.pos()) < HEADER.size()) { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol-v1: PROXY version 1 ID not found"; } return -1; } - if (!util::streq(HEADER, StringRef{rb_.pos, HEADER.size()})) { + if (!util::streq(HEADER, StringRef{rb_.pos(), HEADER.size()})) { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol-v1: Bad PROXY protocol version 1 ID"; } @@ -1389,22 +1401,22 @@ int ClientHandler::proxy_protocol_read() { int family; - if (rb_.pos[0] == 'T') { - if (end - rb_.pos < 5) { + if (rb_.pos()[0] == 'T') { + if (end - rb_.pos() < 5) { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol-v1: INET protocol family not found"; } return -1; } - if (rb_.pos[1] != 'C' || rb_.pos[2] != 'P') { + if (rb_.pos()[1] != 'C' || rb_.pos()[2] != 'P') { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol-v1: Unknown INET protocol family"; } return -1; } - switch (rb_.pos[3]) { + switch (rb_.pos()[3]) { case '4': family = AF_INET; break; @@ -1420,26 +1432,26 @@ int ClientHandler::proxy_protocol_read() { rb_.drain(5); } else { - if (end - rb_.pos < 7) { + if (end - rb_.pos() < 7) { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol-v1: INET protocol family not found"; } return -1; } - if (!util::streq_l("UNKNOWN", rb_.pos, 7)) { + if (!util::streq_l("UNKNOWN", rb_.pos(), 7)) { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol-v1: Unknown INET protocol family"; } return -1; } - rb_.drain(end + 2 - rb_.pos); + rb_.drain(end + 2 - rb_.pos()); return on_proxy_protocol_finish(); } // source address - auto token_end = std::find(rb_.pos, end, ' '); + auto token_end = std::find(rb_.pos(), end, ' '); if (token_end == end) { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol-v1: Source address not found"; @@ -1448,20 +1460,20 @@ int ClientHandler::proxy_protocol_read() { } *token_end = '\0'; - if (!util::numeric_host(reinterpret_cast(rb_.pos), family)) { + if (!util::numeric_host(reinterpret_cast(rb_.pos()), family)) { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol-v1: Invalid source address"; } return -1; } - auto src_addr = rb_.pos; - auto src_addrlen = token_end - rb_.pos; + auto src_addr = rb_.pos(); + auto src_addrlen = token_end - rb_.pos(); - rb_.drain(token_end - rb_.pos + 1); + rb_.drain(token_end - rb_.pos() + 1); // destination address - token_end = std::find(rb_.pos, end, ' '); + token_end = std::find(rb_.pos(), end, ' '); if (token_end == end) { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol-v1: Destination address not found"; @@ -1470,7 +1482,7 @@ int ClientHandler::proxy_protocol_read() { } *token_end = '\0'; - if (!util::numeric_host(reinterpret_cast(rb_.pos), family)) { + if (!util::numeric_host(reinterpret_cast(rb_.pos()), family)) { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol-v1: Invalid destination address"; } @@ -1479,26 +1491,26 @@ int ClientHandler::proxy_protocol_read() { // Currently we don't use destination address - rb_.drain(token_end - rb_.pos + 1); + rb_.drain(token_end - rb_.pos() + 1); // source port - auto n = parse_proxy_line_port(rb_.pos, end); - if (n <= 0 || *(rb_.pos + n) != ' ') { + auto n = parse_proxy_line_port(rb_.pos(), end); + if (n <= 0 || *(rb_.pos() + n) != ' ') { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol-v1: Invalid source port"; } return -1; } - rb_.pos[n] = '\0'; - auto src_port = rb_.pos; + rb_.pos()[n] = '\0'; + auto src_port = rb_.pos(); auto src_portlen = n; rb_.drain(n + 1); // destination port - n = parse_proxy_line_port(rb_.pos, end); - if (n <= 0 || rb_.pos + n != end) { + n = parse_proxy_line_port(rb_.pos(), end); + if (n <= 0 || rb_.pos() + n != end) { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol-v1: Invalid destination port"; } @@ -1507,14 +1519,14 @@ int ClientHandler::proxy_protocol_read() { // Currently we don't use destination port - rb_.drain(end + 2 - rb_.pos); + rb_.drain(end + 2 - rb_.pos()); ipaddr_ = make_string_ref(balloc_, StringRef{src_addr, src_addr + src_addrlen}); port_ = make_string_ref(balloc_, StringRef{src_port, src_port + src_portlen}); if (LOG_ENABLED(INFO)) { - CLOG(INFO, this) << "PROXY-protocol-v1: Finished, " << (rb_.pos - first) + CLOG(INFO, this) << "PROXY-protocol-v1: Finished, " << (rb_.pos() - first) << " bytes read"; } diff --git a/src/shrpx_client_handler.h b/src/shrpx_client_handler.h index 120a2a88..4185d649 100644 --- a/src/shrpx_client_handler.h +++ b/src/shrpx_client_handler.h @@ -124,7 +124,7 @@ public: int64_t body_bytes_sent); Worker *get_worker() const; - using ReadBuf = Buffer<16_k>; + using ReadBuf = DefaultMemchunkBuffer; ReadBuf *get_rb(); @@ -171,6 +171,7 @@ private: // sure that the allocations must be bounded, and not proportional // to the number of requests. BlockAllocator balloc_; + DefaultMemchunkBuffer rb_; Connection conn_; ev_timer reneg_shutdown_timer_; std::unique_ptr upstream_; @@ -197,7 +198,6 @@ private: bool should_close_after_write_; // true if affinity_hash_ is computed bool affinity_hash_computed_; - ReadBuf rb_; }; } // namespace shrpx diff --git a/src/shrpx_http2_upstream.cc b/src/shrpx_http2_upstream.cc index 68ab81c5..e55e3451 100644 --- a/src/shrpx_http2_upstream.cc +++ b/src/shrpx_http2_upstream.cc @@ -1055,7 +1055,7 @@ int Http2Upstream::on_read() { auto rlimit = handler_->get_rlimit(); if (rb->rleft()) { - rv = nghttp2_session_mem_recv(session_, rb->pos, rb->rleft()); + rv = nghttp2_session_mem_recv(session_, rb->pos(), rb->rleft()); if (rv < 0) { if (rv != NGHTTP2_ERR_BAD_CLIENT_MAGIC) { ULOG(ERROR, this) << "nghttp2_session_mem_recv() returned error: " diff --git a/src/shrpx_https_upstream.cc b/src/shrpx_https_upstream.cc index 6b315c4e..b9a5df00 100644 --- a/src/shrpx_https_upstream.cc +++ b/src/shrpx_https_upstream.cc @@ -532,7 +532,7 @@ int HttpsUpstream::on_read() { // callback chain called by http_parser_execute() if (downstream && downstream->get_upgraded()) { - auto rv = downstream->push_upload_data_chunk(rb->pos, rb->rleft()); + auto rv = downstream->push_upload_data_chunk(rb->pos(), rb->rleft()); if (rv != 0) { return -1; @@ -565,8 +565,9 @@ int HttpsUpstream::on_read() { } // http_parser_execute() does nothing once it entered error state. - auto nread = http_parser_execute( - &htp_, &htp_hooks, reinterpret_cast(rb->pos), rb->rleft()); + auto nread = http_parser_execute(&htp_, &htp_hooks, + reinterpret_cast(rb->pos()), + rb->rleft()); rb->drain(nread); rlimit->startw(); diff --git a/src/shrpx_spdy_upstream.cc b/src/shrpx_spdy_upstream.cc index b4cbac81..1bc9bfa7 100644 --- a/src/shrpx_spdy_upstream.cc +++ b/src/shrpx_spdy_upstream.cc @@ -105,7 +105,7 @@ ssize_t recv_callback(spdylay_session *session, uint8_t *buf, size_t len, auto nread = std::min(rb->rleft(), len); - memcpy(buf, rb->pos, nread); + memcpy(buf, rb->pos(), nread); rb->drain(nread); rlimit->startw();