diff --git a/src/memchunk.h b/src/memchunk.h index 991a190a..44daf164 100644 --- a/src/memchunk.h +++ b/src/memchunk.h @@ -434,6 +434,102 @@ inline int limit_iovec(struct iovec *iov, int iovcnt, size_t max) { return iovcnt; } +// MemchunkBuffer is similar to Buffer, but it uses pooled Memchunk +// for its underlying buffer. +template struct MemchunkBuffer { + MemchunkBuffer(Pool *pool) : pool(pool), chunk(nullptr) {} + MemchunkBuffer(const MemchunkBuffer &) = delete; + MemchunkBuffer(MemchunkBuffer &&other) noexcept + : pool(other.pool), chunk(other.chunk) { + other.chunk = nullptr; + } + MemchunkBuffer &operator=(const MemchunkBuffer &) = delete; + MemchunkBuffer &operator=(MemchunkBuffer &&other) noexcept { + if (this == &other) { + return *this; + } + + pool = other.pool; + chunk = other.chunk; + + other.chunk = nullptr; + + return *this; + } + + ~MemchunkBuffer() { + if (!pool || !chunk) { + return; + } + pool->recycle(chunk); + } + + // Ensures that the underlying buffer is allocated. + void ensure_chunk() { + if (chunk) { + return; + } + chunk = pool->get(); + } + + // Releases the underlying buffer. + void release_chunk() { + if (!chunk) { + return; + } + pool->recycle(chunk); + chunk = nullptr; + } + + // Returns true if the underlying buffer is allocated. + bool chunk_avail() const { return chunk != nullptr; } + + // The functions below must be called after the underlying buffer is + // allocated (use ensure_chunk). + + // MemchunkBuffer provides the same interface functions with Buffer. + // Since we has chunk as a member variable, pos and last are + // implemented as wrapper functions. + + uint8_t *pos() const { return chunk->pos; } + uint8_t *last() const { return chunk->last; } + + size_t rleft() const { return chunk->len(); } + size_t wleft() const { return chunk->left(); } + size_t write(const void *src, size_t count) { + count = std::min(count, wleft()); + auto p = static_cast(src); + chunk->last = std::copy_n(p, count, chunk->last); + return count; + } + size_t write(size_t count) { + count = std::min(count, wleft()); + chunk->last += count; + return count; + } + size_t drain(size_t count) { + count = std::min(count, rleft()); + chunk->pos += count; + return count; + } + size_t drain_reset(size_t count) { + count = std::min(count, rleft()); + std::copy(chunk->pos + count, chunk->last, std::begin(chunk->buf)); + chunk->last = std::begin(chunk->buf) + (chunk->last - (chunk->pos + count)); + chunk->pos = std::begin(chunk->buf); + return count; + } + void reset() { chunk->reset(); } + uint8_t *begin() { return std::begin(chunk->buf); } + uint8_t &operator[](size_t n) { return chunk->buf[n]; } + const uint8_t &operator[](size_t n) const { return chunk->buf[n]; } + + Pool *pool; + Memchunk *chunk; +}; + +using DefaultMemchunkBuffer = MemchunkBuffer; + } // namespace nghttp2 #endif // MEMCHUNK_H diff --git a/src/shrpx_client_handler.cc b/src/shrpx_client_handler.cc index 1280d452..3f75ada4 100644 --- a/src/shrpx_client_handler.cc +++ b/src/shrpx_client_handler.cc @@ -117,6 +117,7 @@ void writecb(struct ev_loop *loop, ev_io *w, int revents) { int ClientHandler::noop() { return 0; } int ClientHandler::read_clear() { + rb_.ensure_chunk(); for (;;) { if (rb_.rleft() && on_read() != 0) { return -1; @@ -132,9 +133,12 @@ int ClientHandler::read_clear() { return 0; } - auto nread = conn_.read_clear(rb_.last, rb_.wleft()); + auto nread = conn_.read_clear(rb_.last(), rb_.wleft()); if (nread == 0) { + if (rb_.rleft() == 0) { + rb_.release_chunk(); + } return 0; } @@ -209,6 +213,8 @@ int ClientHandler::tls_handshake() { int ClientHandler::read_tls() { ERR_clear_error(); + rb_.ensure_chunk(); + for (;;) { // we should process buffered data first before we read EOF. if (rb_.rleft() && on_read() != 0) { @@ -225,9 +231,12 @@ int ClientHandler::read_tls() { return 0; } - auto nread = conn_.read_tls(rb_.last, rb_.wleft()); + auto nread = conn_.read_tls(rb_.last(), rb_.wleft()); if (nread == 0) { + if (rb_.rleft() == 0) { + rb_.release_chunk(); + } return 0; } @@ -303,7 +312,7 @@ int ClientHandler::upstream_write() { int ClientHandler::upstream_http2_connhd_read() { auto nread = std::min(left_connhd_len_, rb_.rleft()); if (memcmp(NGHTTP2_CLIENT_MAGIC + NGHTTP2_CLIENT_MAGIC_LEN - left_connhd_len_, - rb_.pos, nread) != 0) { + rb_.pos(), nread) != 0) { // There is no downgrade path here. Just drop the connection. if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "invalid client connection header"; @@ -332,7 +341,7 @@ int ClientHandler::upstream_http2_connhd_read() { int ClientHandler::upstream_http1_connhd_read() { auto nread = std::min(left_connhd_len_, rb_.rleft()); if (memcmp(NGHTTP2_CLIENT_MAGIC + NGHTTP2_CLIENT_MAGIC_LEN - left_connhd_len_, - rb_.pos, nread) != 0) { + rb_.pos(), nread) != 0) { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "This is HTTP/1.1 connection, " << "but may be upgraded to HTTP/2 later."; @@ -386,6 +395,7 @@ ClientHandler::ClientHandler(Worker *worker, int fd, SSL *ssl, // so the required space is 64 + 48 + 16 + 48 + 16 + 16 + 16 + // 32 + 8 + 8 * 8 = 328. balloc_(512, 512), + rb_(worker->get_mcpool()), conn_(worker->get_loop(), fd, ssl, worker->get_mcpool(), get_config()->conn.upstream.timeout.write, get_config()->conn.upstream.timeout.read, @@ -647,9 +657,11 @@ int ClientHandler::do_read() { return read_(*this); } int ClientHandler::do_write() { return write_(*this); } int ClientHandler::on_read() { - auto rv = on_read_(*this); - if (rv != 0) { - return rv; + if (rb_.chunk_avail()) { + auto rv = on_read_(*this); + if (rv != 0) { + return rv; + } } conn_.handle_tls_pending_read(); return 0; @@ -1325,7 +1337,7 @@ ssize_t parse_proxy_line_port(const uint8_t *first, const uint8_t *last) { int ClientHandler::on_proxy_protocol_finish() { if (conn_.tls.ssl) { - conn_.tls.rbuf.append(rb_.pos, rb_.rleft()); + conn_.tls.rbuf.append(rb_.pos(), rb_.rleft()); rb_.reset(); } @@ -1346,7 +1358,7 @@ int ClientHandler::proxy_protocol_read() { CLOG(INFO, this) << "PROXY-protocol: Started"; } - auto first = rb_.pos; + auto first = rb_.pos(); // NULL character really destroys functions which expects NULL // terminated string. We won't expect it in PROXY protocol line, so @@ -1355,12 +1367,12 @@ int ClientHandler::proxy_protocol_read() { constexpr size_t MAX_PROXY_LINELEN = 107; - auto bufend = rb_.pos + std::min(MAX_PROXY_LINELEN, rb_.rleft()); + auto bufend = rb_.pos() + std::min(MAX_PROXY_LINELEN, rb_.rleft()); auto end = - std::find_first_of(rb_.pos, bufend, std::begin(chrs), std::end(chrs)); + std::find_first_of(rb_.pos(), bufend, std::begin(chrs), std::end(chrs)); - if (end == bufend || *end == '\0' || end == rb_.pos || *(end - 1) != '\r') { + if (end == bufend || *end == '\0' || end == rb_.pos() || *(end - 1) != '\r') { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol-v1: No ending CR LF sequence found"; } @@ -1371,14 +1383,14 @@ int ClientHandler::proxy_protocol_read() { constexpr auto HEADER = StringRef::from_lit("PROXY "); - if (static_cast(end - rb_.pos) < HEADER.size()) { + if (static_cast(end - rb_.pos()) < HEADER.size()) { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol-v1: PROXY version 1 ID not found"; } return -1; } - if (!util::streq(HEADER, StringRef{rb_.pos, HEADER.size()})) { + if (!util::streq(HEADER, StringRef{rb_.pos(), HEADER.size()})) { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol-v1: Bad PROXY protocol version 1 ID"; } @@ -1389,22 +1401,22 @@ int ClientHandler::proxy_protocol_read() { int family; - if (rb_.pos[0] == 'T') { - if (end - rb_.pos < 5) { + if (rb_.pos()[0] == 'T') { + if (end - rb_.pos() < 5) { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol-v1: INET protocol family not found"; } return -1; } - if (rb_.pos[1] != 'C' || rb_.pos[2] != 'P') { + if (rb_.pos()[1] != 'C' || rb_.pos()[2] != 'P') { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol-v1: Unknown INET protocol family"; } return -1; } - switch (rb_.pos[3]) { + switch (rb_.pos()[3]) { case '4': family = AF_INET; break; @@ -1420,26 +1432,26 @@ int ClientHandler::proxy_protocol_read() { rb_.drain(5); } else { - if (end - rb_.pos < 7) { + if (end - rb_.pos() < 7) { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol-v1: INET protocol family not found"; } return -1; } - if (!util::streq_l("UNKNOWN", rb_.pos, 7)) { + if (!util::streq_l("UNKNOWN", rb_.pos(), 7)) { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol-v1: Unknown INET protocol family"; } return -1; } - rb_.drain(end + 2 - rb_.pos); + rb_.drain(end + 2 - rb_.pos()); return on_proxy_protocol_finish(); } // source address - auto token_end = std::find(rb_.pos, end, ' '); + auto token_end = std::find(rb_.pos(), end, ' '); if (token_end == end) { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol-v1: Source address not found"; @@ -1448,20 +1460,20 @@ int ClientHandler::proxy_protocol_read() { } *token_end = '\0'; - if (!util::numeric_host(reinterpret_cast(rb_.pos), family)) { + if (!util::numeric_host(reinterpret_cast(rb_.pos()), family)) { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol-v1: Invalid source address"; } return -1; } - auto src_addr = rb_.pos; - auto src_addrlen = token_end - rb_.pos; + auto src_addr = rb_.pos(); + auto src_addrlen = token_end - rb_.pos(); - rb_.drain(token_end - rb_.pos + 1); + rb_.drain(token_end - rb_.pos() + 1); // destination address - token_end = std::find(rb_.pos, end, ' '); + token_end = std::find(rb_.pos(), end, ' '); if (token_end == end) { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol-v1: Destination address not found"; @@ -1470,7 +1482,7 @@ int ClientHandler::proxy_protocol_read() { } *token_end = '\0'; - if (!util::numeric_host(reinterpret_cast(rb_.pos), family)) { + if (!util::numeric_host(reinterpret_cast(rb_.pos()), family)) { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol-v1: Invalid destination address"; } @@ -1479,26 +1491,26 @@ int ClientHandler::proxy_protocol_read() { // Currently we don't use destination address - rb_.drain(token_end - rb_.pos + 1); + rb_.drain(token_end - rb_.pos() + 1); // source port - auto n = parse_proxy_line_port(rb_.pos, end); - if (n <= 0 || *(rb_.pos + n) != ' ') { + auto n = parse_proxy_line_port(rb_.pos(), end); + if (n <= 0 || *(rb_.pos() + n) != ' ') { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol-v1: Invalid source port"; } return -1; } - rb_.pos[n] = '\0'; - auto src_port = rb_.pos; + rb_.pos()[n] = '\0'; + auto src_port = rb_.pos(); auto src_portlen = n; rb_.drain(n + 1); // destination port - n = parse_proxy_line_port(rb_.pos, end); - if (n <= 0 || rb_.pos + n != end) { + n = parse_proxy_line_port(rb_.pos(), end); + if (n <= 0 || rb_.pos() + n != end) { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "PROXY-protocol-v1: Invalid destination port"; } @@ -1507,14 +1519,14 @@ int ClientHandler::proxy_protocol_read() { // Currently we don't use destination port - rb_.drain(end + 2 - rb_.pos); + rb_.drain(end + 2 - rb_.pos()); ipaddr_ = make_string_ref(balloc_, StringRef{src_addr, src_addr + src_addrlen}); port_ = make_string_ref(balloc_, StringRef{src_port, src_port + src_portlen}); if (LOG_ENABLED(INFO)) { - CLOG(INFO, this) << "PROXY-protocol-v1: Finished, " << (rb_.pos - first) + CLOG(INFO, this) << "PROXY-protocol-v1: Finished, " << (rb_.pos() - first) << " bytes read"; } diff --git a/src/shrpx_client_handler.h b/src/shrpx_client_handler.h index 120a2a88..4185d649 100644 --- a/src/shrpx_client_handler.h +++ b/src/shrpx_client_handler.h @@ -124,7 +124,7 @@ public: int64_t body_bytes_sent); Worker *get_worker() const; - using ReadBuf = Buffer<16_k>; + using ReadBuf = DefaultMemchunkBuffer; ReadBuf *get_rb(); @@ -171,6 +171,7 @@ private: // sure that the allocations must be bounded, and not proportional // to the number of requests. BlockAllocator balloc_; + DefaultMemchunkBuffer rb_; Connection conn_; ev_timer reneg_shutdown_timer_; std::unique_ptr upstream_; @@ -197,7 +198,6 @@ private: bool should_close_after_write_; // true if affinity_hash_ is computed bool affinity_hash_computed_; - ReadBuf rb_; }; } // namespace shrpx diff --git a/src/shrpx_http2_upstream.cc b/src/shrpx_http2_upstream.cc index 68ab81c5..e55e3451 100644 --- a/src/shrpx_http2_upstream.cc +++ b/src/shrpx_http2_upstream.cc @@ -1055,7 +1055,7 @@ int Http2Upstream::on_read() { auto rlimit = handler_->get_rlimit(); if (rb->rleft()) { - rv = nghttp2_session_mem_recv(session_, rb->pos, rb->rleft()); + rv = nghttp2_session_mem_recv(session_, rb->pos(), rb->rleft()); if (rv < 0) { if (rv != NGHTTP2_ERR_BAD_CLIENT_MAGIC) { ULOG(ERROR, this) << "nghttp2_session_mem_recv() returned error: " diff --git a/src/shrpx_https_upstream.cc b/src/shrpx_https_upstream.cc index 6b315c4e..b9a5df00 100644 --- a/src/shrpx_https_upstream.cc +++ b/src/shrpx_https_upstream.cc @@ -532,7 +532,7 @@ int HttpsUpstream::on_read() { // callback chain called by http_parser_execute() if (downstream && downstream->get_upgraded()) { - auto rv = downstream->push_upload_data_chunk(rb->pos, rb->rleft()); + auto rv = downstream->push_upload_data_chunk(rb->pos(), rb->rleft()); if (rv != 0) { return -1; @@ -565,8 +565,9 @@ int HttpsUpstream::on_read() { } // http_parser_execute() does nothing once it entered error state. - auto nread = http_parser_execute( - &htp_, &htp_hooks, reinterpret_cast(rb->pos), rb->rleft()); + auto nread = http_parser_execute(&htp_, &htp_hooks, + reinterpret_cast(rb->pos()), + rb->rleft()); rb->drain(nread); rlimit->startw(); diff --git a/src/shrpx_spdy_upstream.cc b/src/shrpx_spdy_upstream.cc index b4cbac81..1bc9bfa7 100644 --- a/src/shrpx_spdy_upstream.cc +++ b/src/shrpx_spdy_upstream.cc @@ -105,7 +105,7 @@ ssize_t recv_callback(spdylay_session *session, uint8_t *buf, size_t len, auto nread = std::min(rb->rleft(), len); - memcpy(buf, rb->pos, nread); + memcpy(buf, rb->pos(), nread); rb->drain(nread); rlimit->startw();