From 98253b1d0d570dd3bee96fd085f638f54d571102 Mon Sep 17 00:00:00 2001 From: Tatsuhiro Tsujikawa Date: Tue, 26 Jan 2016 23:04:53 +0900 Subject: [PATCH] nghttpx: Use DefaultMemchunks as HTTP/2 and SPDY frontend response buffer --- src/memchunk.h | 46 ++++++++++-- src/shrpx_client_handler.cc | 18 +---- src/shrpx_http2_upstream.cc | 145 ++++++------------------------------ src/shrpx_http2_upstream.h | 29 +------- src/shrpx_spdy_upstream.cc | 24 +++--- src/shrpx_spdy_upstream.h | 6 +- 6 files changed, 78 insertions(+), 190 deletions(-) diff --git a/src/memchunk.h b/src/memchunk.h index 43d2ebb9..81e4ef11 100644 --- a/src/memchunk.h +++ b/src/memchunk.h @@ -41,6 +41,14 @@ namespace nghttp2 { +#define DEFAULT_WR_IOVCNT 16 + +#if defined(IOV_MAX) && IOV_MAX < DEFAULT_WR_IOVCNT +#define MAX_WR_IOVCNT IOV_MAX +#else // !defined(IOV_MAX) || IOV_MAX >= DEFAULT_WR_IOVCNT +#define MAX_WR_IOVCNT DEFAULT_WR_IOVCNT +#endif // !defined(IOV_MAX) || IOV_MAX >= DEFAULT_WR_IOVCNT + template struct Memchunk { Memchunk(std::unique_ptr next_chunk) : pos(std::begin(buf)), last(pos), knext(std::move(next_chunk)), @@ -199,6 +207,36 @@ template struct Memchunks { return first - static_cast(dest); } + size_t remove(Memchunks &dest, size_t count) { + if (!tail || count == 0) { + return 0; + } + + auto left = count; + auto m = head; + + while (m) { + auto next = m->next; + auto n = std::min(left, m->len()); + + assert(m->len()); + dest.append(m->pos, n); + m->pos += n; + len -= n; + left -= n; + if (m->len() > 0) { + break; + } + pool->recycle(m); + m = next; + } + head = m; + if (head == nullptr) { + tail = nullptr; + } + + return count - left; + } size_t drain(size_t count) { auto ndata = count; auto m = head; @@ -374,14 +412,6 @@ using MemchunkPool = Pool; using DefaultMemchunks = Memchunks; using DefaultPeekMemchunks = PeekMemchunks; -#define DEFAULT_WR_IOVCNT 16 - -#if defined(IOV_MAX) && IOV_MAX < DEFAULT_WR_IOVCNT -#define MAX_WR_IOVCNT IOV_MAX -#else // !defined(IOV_MAX) || IOV_MAX >= DEFAULT_WR_IOVCNT -#define MAX_WR_IOVCNT DEFAULT_WR_IOVCNT -#endif // !defined(IOV_MAX) || IOV_MAX >= DEFAULT_WR_IOVCNT - inline int limit_iovec(struct iovec *iov, int iovcnt, size_t max) { if (max == 0) { return 0; diff --git a/src/shrpx_client_handler.cc b/src/shrpx_client_handler.cc index de15f4b4..01e2955d 100644 --- a/src/shrpx_client_handler.cc +++ b/src/shrpx_client_handler.cc @@ -761,17 +761,6 @@ int ClientHandler::perform_http2_upgrade(HttpsUpstream *http) { "Upgrade: " NGHTTP2_CLEARTEXT_PROTO_VERSION_ID "\r\n" "\r\n"; - auto required_size = str_size(res) + input->rleft(); - - if (output->wleft() < required_size) { - if (LOG_ENABLED(INFO)) { - CLOG(INFO, this) - << "HTTP Upgrade failed because of insufficient buffer space: need " - << required_size << ", available " << output->wleft(); - } - return -1; - } - if (upstream->upgrade_upstream(http) != 0) { return -1; } @@ -783,11 +772,8 @@ int ClientHandler::perform_http2_upgrade(HttpsUpstream *http) { on_read_ = &ClientHandler::upstream_http2_connhd_read; write_ = &ClientHandler::write_clear; - auto nread = - downstream->get_response_buf()->remove(output->last, output->wleft()); - output->write(nread); - - output->write(res, str_size(res)); + input->remove(*output, input->rleft()); + output->append(res, str_size(res)); upstream_ = std::move(upstream); signal_write(); diff --git a/src/shrpx_http2_upstream.cc b/src/shrpx_http2_upstream.cc index 2f175e65..2c525a74 100644 --- a/src/shrpx_http2_upstream.cc +++ b/src/shrpx_http2_upstream.cc @@ -50,6 +50,10 @@ using namespace nghttp2; namespace shrpx { +namespace { +constexpr size_t MAX_BUFFER_SIZE = 32_k; +} // namespace + namespace { int on_stream_close_callback(nghttp2_session *session, int32_t stream_id, uint32_t error_code, void *user_data) { @@ -661,13 +665,6 @@ int on_frame_not_send_callback(nghttp2_session *session, } } // namespace -void Http2Upstream::set_pending_data_downstream(Downstream *downstream, - size_t n, size_t padlen) { - pending_data_downstream_ = downstream; - data_pendinglen_ = n; - padding_pendinglen_ = padlen; -} - namespace { constexpr auto PADDING = std::array{}; } // namespace @@ -682,51 +679,21 @@ int send_data_callback(nghttp2_session *session, nghttp2_frame *frame, auto wb = upstream->get_response_buf(); - size_t padlen; + size_t padlen = 0; - if (frame->data.padlen == 0) { - if (wb->wleft() < 9) { - return NGHTTP2_ERR_WOULDBLOCK; - } - - wb->write(framehd, 9); - padlen = 0; - } else { - if (wb->wleft() < 10) { - return NGHTTP2_ERR_WOULDBLOCK; - } - - wb->write(framehd, 9); + wb->append(framehd, 9); + if (frame->data.padlen > 0) { padlen = frame->data.padlen - 1; - *wb->last++ = padlen; + wb->append(static_cast(padlen)); } - size_t npadwrite = 0; - auto nwrite = std::min(length, wb->wleft()); - body->remove(wb->last, nwrite); - wb->write(nwrite); - if (nwrite < length) { - // We must store unsent amount of data to somewhere. We just tell - // libnghttp2 that we wrote everything, so downstream could be - // deleted. We handle this situation in - // Http2Upstream::remove_downstream(). - upstream->set_pending_data_downstream(downstream, length - nwrite, padlen); - } else if (padlen > 0) { - npadwrite = std::min(padlen, wb->wleft()); - wb->write(PADDING.data(), npadwrite); + body->remove(*wb, length); - if (npadwrite < padlen) { - upstream->set_pending_data_downstream(nullptr, 0, padlen - npadwrite); - } - } + wb->append(PADDING.data(), padlen); - if (wb->rleft() == 0) { - downstream->disable_upstream_wtimer(); - } else { - downstream->reset_upstream_wtimer(); - } + downstream->reset_upstream_wtimer(); - if (nwrite > 0 && downstream->resume_read(SHRPX_NO_BUFFER, nwrite) != 0) { + if (length > 0 && downstream->resume_read(SHRPX_NO_BUFFER, length) != 0) { return NGHTTP2_ERR_CALLBACK_FAILURE; } @@ -734,7 +701,7 @@ int send_data_callback(nghttp2_session *session, nghttp2_frame *frame, // data transferred. downstream->response_sent_body_length += length; - return (nwrite < length || npadwrite < padlen) ? NGHTTP2_ERR_PAUSE : 0; + return wb->rleft() >= MAX_BUFFER_SIZE ? NGHTTP2_ERR_PAUSE : 0; } } // namespace @@ -851,17 +818,15 @@ nghttp2_session_callbacks *create_http2_upstream_callbacks() { } Http2Upstream::Http2Upstream(ClientHandler *handler) - : downstream_queue_( + : wb_(handler->get_worker()->get_mcpool()), + downstream_queue_( get_config()->http2_proxy ? get_config()->conn.downstream.connections_per_host : get_config()->conn.downstream.proto == PROTO_HTTP ? get_config()->conn.downstream.connections_per_frontend : 0, !get_config()->http2_proxy), - pending_response_buf_(handler->get_worker()->get_mcpool()), - pending_data_downstream_(nullptr), handler_(handler), session_(nullptr), - data_pending_(nullptr), data_pendinglen_(0), padding_pendinglen_(0), - shutdown_handled_(false) { + handler_(handler), session_(nullptr), shutdown_handled_(false) { int rv; @@ -963,66 +928,11 @@ int Http2Upstream::on_read() { // After this function call, downstream may be deleted. int Http2Upstream::on_write() { - if (wb_.rleft() == 0) { - wb_.reset(); - } - - if (data_pendinglen_ > 0) { - if (data_pending_) { - auto n = std::min(wb_.wleft(), data_pendinglen_); - wb_.write(data_pending_, n); - data_pending_ += n; - data_pendinglen_ -= n; - - if (data_pendinglen_ > 0) { - return 0; - } - - data_pending_ = nullptr; - } else { - auto nwrite = std::min(wb_.wleft(), data_pendinglen_); - DefaultMemchunks *body; - if (pending_data_downstream_) { - body = pending_data_downstream_->get_response_buf(); - } else { - body = &pending_response_buf_; - } - body->remove(wb_.last, nwrite); - wb_.write(nwrite); - data_pendinglen_ -= nwrite; - - if (pending_data_downstream_ && nwrite > 0) { - if (pending_data_downstream_->resume_read(SHRPX_NO_BUFFER, nwrite) != - 0) { - return -1; - } - } - - if (data_pendinglen_ > 0) { - return 0; - } - - if (pending_data_downstream_) { - pending_data_downstream_ = nullptr; - } else { - // Downstream was already deleted, and we don't need its - // response data. - body->reset(); - } - } - } - - if (padding_pendinglen_ > 0) { - auto nwrite = std::min(wb_.wleft(), padding_pendinglen_); - wb_.write(PADDING.data(), nwrite); - padding_pendinglen_ -= nwrite; - - if (padding_pendinglen_ > 0) { + for (;;) { + if (wb_.rleft() >= MAX_BUFFER_SIZE) { return 0; } - } - for (;;) { const uint8_t *data; auto datalen = nghttp2_session_mem_send(session_, &data); @@ -1034,12 +944,7 @@ int Http2Upstream::on_write() { if (datalen == 0) { break; } - auto n = wb_.write(data, datalen); - if (n < static_cast(datalen)) { - data_pending_ = data + n; - data_pendinglen_ = datalen - n; - return 0; - } + wb_.append(data, datalen); } if (nghttp2_session_want_read(session_) == 0 && @@ -1409,11 +1314,6 @@ void Http2Upstream::remove_downstream(Downstream *downstream) { nghttp2_session_set_stream_user_data(session_, downstream->get_stream_id(), nullptr); - if (downstream == pending_data_downstream_) { - pending_data_downstream_ = nullptr; - pending_response_buf_ = downstream->pop_response_buf(); - } - auto next_downstream = downstream_queue_.remove_and_get_blocked(downstream); if (next_downstream) { @@ -1999,17 +1899,14 @@ int Http2Upstream::response_riovec(struct iovec *iov, int iovcnt) const { return 0; } - iov->iov_base = wb_.pos; - iov->iov_len = wb_.rleft(); - - return 1; + return wb_.riovec(iov, iovcnt); } void Http2Upstream::response_drain(size_t n) { wb_.drain(n); } bool Http2Upstream::response_empty() const { return wb_.rleft() == 0; } -Http2Upstream::WriteBuffer *Http2Upstream::get_response_buf() { return &wb_; } +DefaultMemchunks *Http2Upstream::get_response_buf() { return &wb_; } Downstream * Http2Upstream::on_downstream_push_promise(Downstream *downstream, diff --git a/src/shrpx_http2_upstream.h b/src/shrpx_http2_upstream.h index 0839dc66..d090f149 100644 --- a/src/shrpx_http2_upstream.h +++ b/src/shrpx_http2_upstream.h @@ -118,46 +118,21 @@ public: int on_request_headers(Downstream *downstream, const nghttp2_frame *frame); - using WriteBuffer = Buffer<32_k>; - - WriteBuffer *get_response_buf(); - - void set_pending_data_downstream(Downstream *downstream, size_t n, - size_t padlen); + DefaultMemchunks *get_response_buf(); // Changes stream priority of |downstream|, which is assumed to be a // pushed stream. int adjust_pushed_stream_priority(Downstream *downstream); private: - WriteBuffer wb_; + DefaultMemchunks wb_; std::unique_ptr pre_upstream_; DownstreamQueue downstream_queue_; ev_timer settings_timer_; ev_timer shutdown_timer_; ev_prepare prep_; - // A response buffer used to belong to Downstream object. This is - // moved here when response is partially written to wb_ in - // send_data_callback, but before writing them all, Downstream - // object was destroyed. On destruction of Downstream, - // pending_data_downstream_ becomes nullptr. - DefaultMemchunks pending_response_buf_; - // Downstream object whose DATA frame payload is partillay written - // to wb_ in send_data_callback. This field exists to keep track of - // its lifetime. When it is destroyed, its response buffer is - // transferred to pending_response_buf_, and this field becomes - // nullptr. - Downstream *pending_data_downstream_; ClientHandler *handler_; nghttp2_session *session_; - const uint8_t *data_pending_; - // The length of lending data to be written into wb_. If - // data_pending_ is not nullptr, data_pending_ points to the data to - // write. Otherwise, pending_data_downstream_->get_response_buf() - // if pending_data_downstream_ is not nullptr, or - // pending_response_buf_ holds data to write. - size_t data_pendinglen_; - size_t padding_pendinglen_; bool flow_control_; bool shutdown_handled_; }; diff --git a/src/shrpx_spdy_upstream.cc b/src/shrpx_spdy_upstream.cc index 4858fa7d..d9e8df4a 100644 --- a/src/shrpx_spdy_upstream.cc +++ b/src/shrpx_spdy_upstream.cc @@ -49,19 +49,23 @@ using namespace nghttp2; namespace shrpx { +namespace { +constexpr size_t MAX_BUFFER_SIZE = 32_k; +} // namespace + namespace { ssize_t send_callback(spdylay_session *session, const uint8_t *data, size_t len, int flags, void *user_data) { auto upstream = static_cast(user_data); auto wb = upstream->get_response_buf(); - if (wb->wleft() == 0) { + if (wb->rleft() >= MAX_BUFFER_SIZE) { return SPDYLAY_ERR_WOULDBLOCK; } - auto nread = wb->write(data, len); + wb->append(data, len); - return nread; + return len; } } // namespace @@ -492,7 +496,8 @@ uint32_t infer_upstream_rst_stream_status_code(uint32_t downstream_error_code) { } // namespace SpdyUpstream::SpdyUpstream(uint16_t version, ClientHandler *handler) - : downstream_queue_( + : wb_(handler->get_worker()->get_mcpool()), + downstream_queue_( get_config()->http2_proxy ? get_config()->conn.downstream.connections_per_host : get_config()->conn.downstream.proto == PROTO_HTTP @@ -586,8 +591,8 @@ int SpdyUpstream::on_read() { int SpdyUpstream::on_write() { int rv = 0; - if (wb_.rleft() == 0) { - wb_.reset(); + if (wb_.rleft() >= MAX_BUFFER_SIZE) { + return 0; } rv = spdylay_session_send(session_); @@ -1248,17 +1253,14 @@ int SpdyUpstream::response_riovec(struct iovec *iov, int iovcnt) const { return 0; } - iov->iov_base = wb_.pos; - iov->iov_len = wb_.rleft(); - - return 1; + return wb_.riovec(iov, iovcnt); } void SpdyUpstream::response_drain(size_t n) { wb_.drain(n); } bool SpdyUpstream::response_empty() const { return wb_.rleft() == 0; } -SpdyUpstream::WriteBuffer *SpdyUpstream::get_response_buf() { return &wb_; } +DefaultMemchunks *SpdyUpstream::get_response_buf() { return &wb_; } Downstream * SpdyUpstream::on_downstream_push_promise(Downstream *downstream, diff --git a/src/shrpx_spdy_upstream.h b/src/shrpx_spdy_upstream.h index 1a40e6dc..403c3e5a 100644 --- a/src/shrpx_spdy_upstream.h +++ b/src/shrpx_spdy_upstream.h @@ -97,12 +97,10 @@ public: void start_downstream(Downstream *downstream); void initiate_downstream(Downstream *downstream); - using WriteBuffer = Buffer<32_k>; - - WriteBuffer *get_response_buf(); + DefaultMemchunks *get_response_buf(); private: - WriteBuffer wb_; + DefaultMemchunks wb_; DownstreamQueue downstream_queue_; ClientHandler *handler_; spdylay_session *session_;