From 6b1ef95d3f8a19c78c530ab02571c93297e84aa0 Mon Sep 17 00:00:00 2001 From: Tatsuhiro Tsujikawa Date: Thu, 29 Jan 2015 22:47:37 +0900 Subject: [PATCH] nghttpx: Replace RingBuf with sequential Buffer It turns out that we don't need circular buffer functionality. We replaced RingBuf with simple sequential Buffer. --- src/Makefile.am | 3 +- src/buffer.h | 67 +++++++++++ src/buffer_test.cc | 78 +++++++++++++ src/buffer_test.h | 34 ++++++ src/shrpx-unittest.cc | 2 + src/shrpx_client_handler.cc | 174 ++++++++++++--------------- src/shrpx_client_handler.h | 6 +- src/shrpx_http2_upstream.cc | 17 +-- src/shrpx_https_upstream.cc | 227 ++++++++++++++++++------------------ src/shrpx_spdy_upstream.cc | 9 +- 10 files changed, 384 insertions(+), 233 deletions(-) create mode 100644 src/buffer.h create mode 100644 src/buffer_test.cc create mode 100644 src/buffer_test.h diff --git a/src/Makefile.am b/src/Makefile.am index 585c5042..da7ebacb 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -119,7 +119,7 @@ NGHTTPX_SRCS = \ shrpx_connect_blocker.cc shrpx_connect_blocker.h \ shrpx_downstream_connection_pool.cc shrpx_downstream_connection_pool.h \ shrpx_rate_limit.cc shrpx_rate_limit.h \ - ringbuf.h memchunk.h + buffer.h memchunk.h if HAVE_SPDYLAY NGHTTPX_SRCS += shrpx_spdy_upstream.cc shrpx_spdy_upstream.h @@ -142,6 +142,7 @@ nghttpx_unittest_SOURCES = shrpx-unittest.cc \ nghttp2_gzip_test.c nghttp2_gzip_test.h \ nghttp2_gzip.c nghttp2_gzip.h \ ringbuf_test.cc ringbuf_test.h \ + buffer_test.cc buffer_test.h \ memchunk_test.cc memchunk_test.h nghttpx_unittest_CPPFLAGS = ${AM_CPPFLAGS}\ -DNGHTTP2_TESTS_DIR=\"$(top_srcdir)/tests\" diff --git a/src/buffer.h b/src/buffer.h new file mode 100644 index 00000000..ab1c4b54 --- /dev/null +++ b/src/buffer.h @@ -0,0 +1,67 @@ +/* + * nghttp2 - HTTP/2 C Library + * + * Copyright (c) 2014 Tatsuhiro Tsujikawa + * + * Permission is hereby granted, free of charge, to any person obtaining + * a copy of this software and associated documentation files (the + * "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, + * distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to + * the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +#ifndef BUFFER_H +#define BUFFER_H + +#include "nghttp2_config.h" + +#include +#include + +namespace nghttp2 { + +template struct Buffer { + Buffer() : pos(begin), last(begin) {} + // Returns the number of bytes to read. + size_t rleft() const { return last - pos; } + // Returns the number of bytes this buffer can store. + size_t wleft() const { return begin + N - last; } + // Writes up to min(wleft(), |count|) bytes from buffer pointed by + // |buf|. Returns number of bytes written. + size_t write(const void *buf, size_t count) { + count = std::min(count, wleft()); + memcpy(last, buf, count); + last += count; + return count; + } + size_t write(size_t count) { + count = std::min(count, wleft()); + last += count; + return count; + } + // Drains min(rleft(), |count|) bytes from start of the buffer. + size_t drain(size_t count) { + count = std::min(count, rleft()); + pos += count; + return count; + } + void reset() { pos = last = begin; } + uint8_t begin[N]; + uint8_t *pos, *last; +}; + +} // namespace nghttp2 + +#endif // RINGBUF_H diff --git a/src/buffer_test.cc b/src/buffer_test.cc new file mode 100644 index 00000000..0c9354d9 --- /dev/null +++ b/src/buffer_test.cc @@ -0,0 +1,78 @@ +/* + * nghttp2 - HTTP/2 C Library + * + * Copyright (c) 2015 Tatsuhiro Tsujikawa + * + * Permission is hereby granted, free of charge, to any person obtaining + * a copy of this software and associated documentation files (the + * "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, + * distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to + * the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +#include "buffer_test.h" + +#include +#include +#include + +#include + +#include + +#include "buffer.h" + +namespace nghttp2 { + +void test_buffer_write(void) { + Buffer<16> b; + CU_ASSERT(0 == b.rleft()); + CU_ASSERT(16 == b.wleft()); + + b.write("012", 3); + + CU_ASSERT(3 == b.rleft()); + CU_ASSERT(13 == b.wleft()); + CU_ASSERT(0 == b.pos - b.begin); + + b.drain(3); + + CU_ASSERT(0 == b.rleft()); + CU_ASSERT(13 == b.wleft()); + CU_ASSERT(3 == b.pos - b.begin); + + auto n = b.write("0123456789ABCDEF", 16); + + CU_ASSERT(n == 13); + + CU_ASSERT(13 == b.rleft()); + CU_ASSERT(0 == b.wleft()); + CU_ASSERT(3 == b.pos - b.begin); + CU_ASSERT(0 == memcmp(b.pos, "0123456789ABC", 13)); + + b.reset(); + + CU_ASSERT(0 == b.rleft()); + CU_ASSERT(0 == b.wleft()); + CU_ASSERT(0 == b.pos - b.begin); + + b.write(5); + + CU_ASSERT(5 == b.rleft()); + CU_ASSERT(11 == b.wleft()); + CU_ASSERT(0 == b.pos - b.begin); +} + +} // namespace nghttp2 diff --git a/src/buffer_test.h b/src/buffer_test.h new file mode 100644 index 00000000..d1a889b5 --- /dev/null +++ b/src/buffer_test.h @@ -0,0 +1,34 @@ +/* + * nghttp2 - HTTP/2 C Library + * + * Copyright (c) 2015 Tatsuhiro Tsujikawa + * + * Permission is hereby granted, free of charge, to any person obtaining + * a copy of this software and associated documentation files (the + * "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, + * distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to + * the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +#ifndef BUFFER_TEST_H +#define BUFFER_TEST_H + +namespace nghttp2 { + +void test_buffer_write(void); + +} // namespace nghttp2 + +#endif // BUFFER_TEST_H diff --git a/src/shrpx-unittest.cc b/src/shrpx-unittest.cc index 900e8f4a..f513a630 100644 --- a/src/shrpx-unittest.cc +++ b/src/shrpx-unittest.cc @@ -39,6 +39,7 @@ #include "util_test.h" #include "nghttp2_gzip_test.h" #include "ringbuf_test.h" +#include "buffer_test.h" #include "memchunk_test.h" #include "shrpx_config.h" @@ -139,6 +140,7 @@ int main(int argc, char *argv[]) { !CU_add_test(pSuite, "gzip_inflate", test_nghttp2_gzip_inflate) || !CU_add_test(pSuite, "ringbuf_write", nghttp2::test_ringbuf_write) || !CU_add_test(pSuite, "ringbuf_iovec", nghttp2::test_ringbuf_iovec) || + !CU_add_test(pSuite, "buffer_write", nghttp2::test_ringbuf_write) || !CU_add_test(pSuite, "pool_recycle", nghttp2::test_pool_recycle) || !CU_add_test(pSuite, "memchunk_append", nghttp2::test_memchunks_append) || !CU_add_test(pSuite, "memchunk_drain", nghttp2::test_memchunks_drain) || diff --git a/src/shrpx_client_handler.cc b/src/shrpx_client_handler.cc index 8b5d449e..544fcc00 100644 --- a/src/shrpx_client_handler.cc +++ b/src/shrpx_client_handler.cc @@ -105,15 +105,13 @@ int ClientHandler::read_clear() { return 0; } rb_.reset(); - struct iovec iov[2]; - auto iovcnt = rb_.wiovec(iov); - iovcnt = limit_iovec(iov, iovcnt, rlimit_.avail()); - if (iovcnt == 0) { + + ssize_t nread = std::min(rb_.wleft(), rlimit_.avail()); + if (nread == 0) { break; } - ssize_t nread; - while ((nread = readv(fd_, iov, iovcnt)) == -1 && errno == EINTR) + while ((nread = read(fd_, rb_.last, nread)) == -1 && errno == EINTR) ; if (nread == -1) { if (errno == EAGAIN || errno == EWOULDBLOCK) { @@ -138,15 +136,12 @@ int ClientHandler::write_clear() { for (;;) { if (wb_.rleft() > 0) { - struct iovec iov[2]; - auto iovcnt = wb_.riovec(iov); - iovcnt = limit_iovec(iov, iovcnt, wlimit_.avail()); - if (iovcnt == 0) { + ssize_t nwrite = std::min(wb_.rleft(), wlimit_.avail()); + if (nwrite == 0) { return 0; } - ssize_t nwrite; - while ((nwrite = writev(fd_, iov, iovcnt)) == -1 && errno == EINTR) + while ((nwrite = write(fd_, wb_.pos, nwrite)) == -1 && errno == EINTR) ; if (nwrite == -1) { if (errno == EAGAIN || errno == EWOULDBLOCK) { @@ -238,8 +233,8 @@ int ClientHandler::read_tls() { return 0; } rb_.reset(); - struct iovec iov[2]; - auto iovcnt = rb_.wiovec(iov); + + ssize_t nread; // SSL_read requires the same arguments (buf pointer and its // length) on SSL_ERROR_WANT_READ or SSL_ERROR_WANT_WRITE. // rlimit_.avail() or rlimit_.avail() may return different length @@ -248,16 +243,16 @@ int ClientHandler::read_tls() { // to SSL_read to tls_last_readlen_ if SSL_read indicated I/O // blocking. if (tls_last_readlen_ == 0) { - iovcnt = limit_iovec(iov, iovcnt, rlimit_.avail()); - if (iovcnt == 0) { + nread = std::min(rb_.wleft(), rlimit_.avail()); + if (nread == 0) { return 0; } } else { - assert(iov[0].iov_len == tls_last_readlen_); + nread = tls_last_readlen_; tls_last_readlen_ = 0; } - auto rv = SSL_read(ssl_, iov[0].iov_base, iov[0].iov_len); + auto rv = SSL_read(ssl_, rb_.last, nread); if (rv == 0) { return -1; @@ -267,7 +262,7 @@ int ClientHandler::read_tls() { auto err = SSL_get_error(ssl_, rv); switch (err) { case SSL_ERROR_WANT_READ: - tls_last_readlen_ = iov[0].iov_len; + tls_last_readlen_ = nread; return 0; case SSL_ERROR_WANT_WRITE: if (LOG_ENABLED(INFO)) { @@ -294,10 +289,7 @@ int ClientHandler::write_tls() { for (;;) { if (wb_.rleft() > 0) { - const void *p; - size_t len; - std::tie(p, len) = wb_.get(); - + ssize_t nwrite; // SSL_write requires the same arguments (buf pointer and its // length) on SSL_ERROR_WANT_READ or SSL_ERROR_WANT_WRITE. // get_write_limit() may return smaller length than previously @@ -305,23 +297,21 @@ int ClientHandler::write_tls() { // avoid this, we keep last legnth passed to SSL_write to // tls_last_writelen_ if SSL_write indicated I/O blocking. if (tls_last_writelen_ == 0) { - len = std::min(len, wlimit_.avail()); - if (len == 0) { + nwrite = std::min(wb_.rleft(), wlimit_.avail()); + if (nwrite == 0) { return 0; } auto limit = get_write_limit(); if (limit != -1) { - len = std::min(len, static_cast(limit)); + nwrite = std::min(nwrite, limit); } } else { - assert(len >= tls_last_writelen_); - - len = tls_last_writelen_; + nwrite = tls_last_writelen_; tls_last_writelen_ = 0; } - auto rv = SSL_write(ssl_, p, len); + auto rv = SSL_write(ssl_, wb_.pos, nwrite); if (rv == 0) { return -1; @@ -338,7 +328,7 @@ int ClientHandler::write_tls() { } return -1; case SSL_ERROR_WANT_WRITE: - tls_last_writelen_ = len; + tls_last_writelen_ = nwrite; wlimit_.startw(); ev_timer_again(loop_, &wt_); return 0; @@ -396,85 +386,75 @@ int ClientHandler::upstream_write() { } int ClientHandler::upstream_http2_connhd_read() { - struct iovec iov[2]; - auto iovcnt = rb_.riovec(iov); - for (int i = 0; i < iovcnt; ++i) { - auto nread = - std::min(left_connhd_len_, static_cast(iov[i].iov_len)); - if (memcmp(NGHTTP2_CLIENT_CONNECTION_PREFACE + - NGHTTP2_CLIENT_CONNECTION_PREFACE_LEN - left_connhd_len_, - iov[i].iov_base, nread) != 0) { - // There is no downgrade path here. Just drop the connection. - if (LOG_ENABLED(INFO)) { - CLOG(INFO, this) << "invalid client connection header"; - } + auto nread = std::min(left_connhd_len_, rb_.rleft()); + if (memcmp(NGHTTP2_CLIENT_CONNECTION_PREFACE + + NGHTTP2_CLIENT_CONNECTION_PREFACE_LEN - left_connhd_len_, + rb_.pos, nread) != 0) { + // There is no downgrade path here. Just drop the connection. + if (LOG_ENABLED(INFO)) { + CLOG(INFO, this) << "invalid client connection header"; + } + return -1; + } + + left_connhd_len_ -= nread; + rb_.drain(nread); + + if (left_connhd_len_ == 0) { + on_read_ = &ClientHandler::upstream_read; + // Run on_read to process data left in buffer since they are not + // notified further + if (on_read() != 0) { return -1; } - - left_connhd_len_ -= nread; - rb_.drain(nread); - - if (left_connhd_len_ == 0) { - on_read_ = &ClientHandler::upstream_read; - // Run on_read to process data left in buffer since they are not - // notified further - if (on_read() != 0) { - return -1; - } - return 0; - } + return 0; } return 0; } int ClientHandler::upstream_http1_connhd_read() { - struct iovec iov[2]; - auto iovcnt = rb_.riovec(iov); - for (int i = 0; i < iovcnt; ++i) { - auto nread = - std::min(left_connhd_len_, static_cast(iov[i].iov_len)); - if (memcmp(NGHTTP2_CLIENT_CONNECTION_PREFACE + - NGHTTP2_CLIENT_CONNECTION_PREFACE_LEN - left_connhd_len_, - iov[i].iov_base, nread) != 0) { - if (LOG_ENABLED(INFO)) { - CLOG(INFO, this) << "This is HTTP/1.1 connection, " - << "but may be upgraded to HTTP/2 later."; - } - - // Reset header length for later HTTP/2 upgrade - left_connhd_len_ = NGHTTP2_CLIENT_CONNECTION_PREFACE_LEN; - on_read_ = &ClientHandler::upstream_read; - on_write_ = &ClientHandler::upstream_write; - - if (on_read() != 0) { - return -1; - } - - return 0; + auto nread = std::min(left_connhd_len_, rb_.rleft()); + if (memcmp(NGHTTP2_CLIENT_CONNECTION_PREFACE + + NGHTTP2_CLIENT_CONNECTION_PREFACE_LEN - left_connhd_len_, + rb_.pos, nread) != 0) { + if (LOG_ENABLED(INFO)) { + CLOG(INFO, this) << "This is HTTP/1.1 connection, " + << "but may be upgraded to HTTP/2 later."; } - left_connhd_len_ -= nread; - rb_.drain(nread); + // Reset header length for later HTTP/2 upgrade + left_connhd_len_ = NGHTTP2_CLIENT_CONNECTION_PREFACE_LEN; + on_read_ = &ClientHandler::upstream_read; + on_write_ = &ClientHandler::upstream_write; - if (left_connhd_len_ == 0) { - if (LOG_ENABLED(INFO)) { - CLOG(INFO, this) << "direct HTTP/2 connection"; - } - - direct_http2_upgrade(); - on_read_ = &ClientHandler::upstream_read; - on_write_ = &ClientHandler::upstream_write; - - // Run on_read to process data left in buffer since they are not - // notified further - if (on_read() != 0) { - return -1; - } - - return 0; + if (on_read() != 0) { + return -1; } + + return 0; + } + + left_connhd_len_ -= nread; + rb_.drain(nread); + + if (left_connhd_len_ == 0) { + if (LOG_ENABLED(INFO)) { + CLOG(INFO, this) << "direct HTTP/2 connection"; + } + + direct_http2_upgrade(); + on_read_ = &ClientHandler::upstream_read; + on_write_ = &ClientHandler::upstream_write; + + // Run on_read to process data left in buffer since they are not + // notified further + if (on_read() != 0) { + return -1; + } + + return 0; } return 0; diff --git a/src/shrpx_client_handler.h b/src/shrpx_client_handler.h index 9ad7bf47..3e7243d4 100644 --- a/src/shrpx_client_handler.h +++ b/src/shrpx_client_handler.h @@ -34,7 +34,7 @@ #include #include "shrpx_rate_limit.h" -#include "ringbuf.h" +#include "buffer.h" using namespace nghttp2; @@ -135,8 +135,8 @@ public: int64_t body_bytes_sent); WorkerStat *get_worker_stat() const; - using WriteBuf = RingBuf<65536>; - using ReadBuf = RingBuf<8192>; + using WriteBuf = Buffer<65536>; + using ReadBuf = Buffer<8192>; WriteBuf *get_wb(); ReadBuf *get_rb(); diff --git a/src/shrpx_http2_upstream.cc b/src/shrpx_http2_upstream.cc index 649fae00..bf586a3d 100644 --- a/src/shrpx_http2_upstream.cc +++ b/src/shrpx_http2_upstream.cc @@ -777,23 +777,18 @@ int Http2Upstream::on_read() { ssize_t rv = 0; auto rb = handler_->get_rb(); - for (;;) { - const void *data; - size_t nread; - std::tie(data, nread) = rb->get(); - if (nread == 0) { - break; - } - - rv = nghttp2_session_mem_recv( - session_, reinterpret_cast(data), nread); + if (rb->rleft()) { + rv = nghttp2_session_mem_recv(session_, rb->pos, rb->rleft()); if (rv < 0) { ULOG(ERROR, this) << "nghttp2_session_recv() returned error: " << nghttp2_strerror(rv); return -1; } - rb->drain(nread); + // nghttp2_session_mem_recv should consume all input bytes on + // success. + assert(static_cast(rv) == rb->rleft()); + rb->reset(); } auto wb = handler_->get_wb(); diff --git a/src/shrpx_https_upstream.cc b/src/shrpx_https_upstream.cc index fac33d81..701b2ab3 100644 --- a/src/shrpx_https_upstream.cc +++ b/src/shrpx_https_upstream.cc @@ -248,135 +248,127 @@ http_parser_settings htp_hooks = { int HttpsUpstream::on_read() { auto rb = handler_->get_rb(); auto downstream = get_downstream(); - const void *data; - size_t datalen; + + if (rb->rleft() == 0) { + return 0; + } // downstream can be nullptr here, because it is initialized in the // callback chain called by http_parser_execute() if (downstream && downstream->get_upgraded()) { - for (;;) { - std::tie(data, datalen) = rb->get(); - if (datalen == 0) { - return 0; - } - auto rv = downstream->push_upload_data_chunk( - reinterpret_cast(data), datalen); + auto rv = downstream->push_upload_data_chunk(rb->pos, rb->rleft()); - if (rv != 0) { - return -1; - } - - rb->drain(datalen); - - if (downstream->request_buf_full()) { - if (LOG_ENABLED(INFO)) { - ULOG(INFO, this) << "Downstream request buf is full"; - } - pause_read(SHRPX_NO_BUFFER); - - return 0; - } + if (rv != 0) { + return -1; } + + rb->reset(); + + if (downstream->request_buf_full()) { + if (LOG_ENABLED(INFO)) { + ULOG(INFO, this) << "Downstream request buf is full"; + } + pause_read(SHRPX_NO_BUFFER); + + return 0; + } + + return 0; } - for (;;) { - std::tie(data, datalen) = rb->get(); - if (datalen == 0) { + auto nread = http_parser_execute( + &htp_, &htp_hooks, reinterpret_cast(rb->pos), rb->rleft()); + + rb->drain(nread); + + // Well, actually header length + some body bytes + current_header_length_ += nread; + + // Get downstream again because it may be initialized in http parser + // execution + downstream = get_downstream(); + + auto handler = get_client_handler(); + auto htperr = HTTP_PARSER_ERRNO(&htp_); + + if (htperr == HPE_PAUSED) { + + assert(downstream); + + if (downstream->get_request_state() == Downstream::CONNECT_FAIL) { + // Following paues_read is needed to avoid reading next data. + pause_read(SHRPX_MSG_BLOCK); + error_reply(503); + handler_->signal_write(); + // Downstream gets deleted after response body is read. return 0; } - auto nread = http_parser_execute( - &htp_, &htp_hooks, reinterpret_cast(data), datalen); + assert(downstream->get_request_state() == Downstream::MSG_COMPLETE); - rb->drain(nread); - - // Well, actually header length + some body bytes - current_header_length_ += nread; - - // Get downstream again because it may be initialized in http parser - // execution - downstream = get_downstream(); - - auto handler = get_client_handler(); - auto htperr = HTTP_PARSER_ERRNO(&htp_); - - if (htperr == HPE_PAUSED) { - - assert(downstream); - - if (downstream->get_request_state() == Downstream::CONNECT_FAIL) { - // Following paues_read is needed to avoid reading next data. - pause_read(SHRPX_MSG_BLOCK); - error_reply(503); - handler_->signal_write(); - // Downstream gets deleted after response body is read. - return 0; - } - - assert(downstream->get_request_state() == Downstream::MSG_COMPLETE); - - if (downstream->get_downstream_connection() == nullptr) { - // Error response has already be sent - assert(downstream->get_response_state() == Downstream::MSG_COMPLETE); - delete_downstream(); - - return 0; - } - - if (handler->get_http2_upgrade_allowed() && - downstream->get_http2_upgrade_request()) { - - if (handler->perform_http2_upgrade(this) != 0) { - return -1; - } - - handler_->signal_write(); - - return 0; - } - - pause_read(SHRPX_MSG_BLOCK); + if (downstream->get_downstream_connection() == nullptr) { + // Error response has already be sent + assert(downstream->get_response_state() == Downstream::MSG_COMPLETE); + delete_downstream(); return 0; } - if (htperr != HPE_OK) { - if (LOG_ENABLED(INFO)) { - ULOG(INFO, this) << "HTTP parse failure: " - << "(" << http_errno_name(htperr) << ") " - << http_errno_description(htperr); + if (handler->get_http2_upgrade_allowed() && + downstream->get_http2_upgrade_request()) { + + if (handler->perform_http2_upgrade(this) != 0) { + return -1; } - pause_read(SHRPX_MSG_BLOCK); - - unsigned int status_code; - - if (downstream && - downstream->get_request_state() == Downstream::CONNECT_FAIL) { - status_code = 503; - } else { - status_code = 400; - } - - error_reply(status_code); - handler_->signal_write(); return 0; } - // downstream can be NULL here. - if (downstream && downstream->request_buf_full()) { - if (LOG_ENABLED(INFO)) { - ULOG(INFO, this) << "Downstream request buffer is full"; - } + pause_read(SHRPX_MSG_BLOCK); - pause_read(SHRPX_NO_BUFFER); - - return 0; - } + return 0; } + + if (htperr != HPE_OK) { + if (LOG_ENABLED(INFO)) { + ULOG(INFO, this) << "HTTP parse failure: " + << "(" << http_errno_name(htperr) << ") " + << http_errno_description(htperr); + } + + pause_read(SHRPX_MSG_BLOCK); + + unsigned int status_code; + + if (downstream && + downstream->get_request_state() == Downstream::CONNECT_FAIL) { + status_code = 503; + } else { + status_code = 400; + } + + error_reply(status_code); + + handler_->signal_write(); + + return 0; + } + + // downstream can be NULL here. + if (downstream && downstream->request_buf_full()) { + if (LOG_ENABLED(INFO)) { + ULOG(INFO, this) << "Downstream request buffer is full"; + } + + pause_read(SHRPX_NO_BUFFER); + + return 0; + } + + return 0; } int HttpsUpstream::on_write() { @@ -384,24 +376,29 @@ int HttpsUpstream::on_write() { if (!downstream) { return 0; } - auto dconn = downstream->get_downstream_connection(); auto wb = handler_->get_wb(); - if (wb->rleft() == 0 && dconn && + if (wb->wleft() == 0) { + return 0; + } + + auto dconn = downstream->get_downstream_connection(); + auto output = downstream->get_response_buf(); + + if (output->rleft() == 0 && dconn && downstream->get_response_state() != Downstream::MSG_COMPLETE) { + if (downstream->resume_read(SHRPX_NO_BUFFER, + downstream->get_response_datalen()) != 0) { + return -1; + } + if (downstream_read(dconn) != 0) { return -1; } } - struct iovec iov[2]; - auto iovcnt = wb->wiovec(iov); - if (iovcnt == 0) { - return 0; - } - auto output = downstream->get_response_buf(); - for (int i = 0; i < iovcnt; ++i) { - auto n = output->remove(iov[i].iov_base, iov[i].iov_len); - wb->write(n); - } + + auto n = output->remove(wb->last, wb->wleft()); + wb->write(n); + if (wb->rleft() > 0) { return 0; } diff --git a/src/shrpx_spdy_upstream.cc b/src/shrpx_spdy_upstream.cc index d7c5c04a..06b8414d 100644 --- a/src/shrpx_spdy_upstream.cc +++ b/src/shrpx_spdy_upstream.cc @@ -69,17 +69,14 @@ ssize_t recv_callback(spdylay_session *session, uint8_t *buf, size_t len, auto upstream = static_cast(user_data); auto handler = upstream->get_client_handler(); auto rb = handler->get_rb(); - const void *data; - size_t nread; - std::tie(data, nread) = rb->get(); - if (nread == 0) { + if (rb->rleft() == 0) { return SPDYLAY_ERR_WOULDBLOCK; } - nread = std::min(nread, len); + auto nread = std::min(rb->rleft(), len); - memcpy(buf, data, nread); + memcpy(buf, rb->pos, nread); rb->drain(nread); return nread;