nghttpx: Replace RingBuf with sequential Buffer

It turns out that we don't need circular buffer functionality.  We
replaced RingBuf with simple sequential Buffer.
This commit is contained in:
Tatsuhiro Tsujikawa 2015-01-29 22:47:37 +09:00
parent 147bc45658
commit 6b1ef95d3f
10 changed files with 384 additions and 233 deletions

View File

@ -119,7 +119,7 @@ NGHTTPX_SRCS = \
shrpx_connect_blocker.cc shrpx_connect_blocker.h \
shrpx_downstream_connection_pool.cc shrpx_downstream_connection_pool.h \
shrpx_rate_limit.cc shrpx_rate_limit.h \
ringbuf.h memchunk.h
buffer.h memchunk.h
if HAVE_SPDYLAY
NGHTTPX_SRCS += shrpx_spdy_upstream.cc shrpx_spdy_upstream.h
@ -142,6 +142,7 @@ nghttpx_unittest_SOURCES = shrpx-unittest.cc \
nghttp2_gzip_test.c nghttp2_gzip_test.h \
nghttp2_gzip.c nghttp2_gzip.h \
ringbuf_test.cc ringbuf_test.h \
buffer_test.cc buffer_test.h \
memchunk_test.cc memchunk_test.h
nghttpx_unittest_CPPFLAGS = ${AM_CPPFLAGS}\
-DNGHTTP2_TESTS_DIR=\"$(top_srcdir)/tests\"

67
src/buffer.h Normal file
View File

@ -0,0 +1,67 @@
/*
* nghttp2 - HTTP/2 C Library
*
* Copyright (c) 2014 Tatsuhiro Tsujikawa
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef BUFFER_H
#define BUFFER_H
#include "nghttp2_config.h"
#include <cstring>
#include <algorithm>
namespace nghttp2 {
template <size_t N> struct Buffer {
Buffer() : pos(begin), last(begin) {}
// Returns the number of bytes to read.
size_t rleft() const { return last - pos; }
// Returns the number of bytes this buffer can store.
size_t wleft() const { return begin + N - last; }
// Writes up to min(wleft(), |count|) bytes from buffer pointed by
// |buf|. Returns number of bytes written.
size_t write(const void *buf, size_t count) {
count = std::min(count, wleft());
memcpy(last, buf, count);
last += count;
return count;
}
size_t write(size_t count) {
count = std::min(count, wleft());
last += count;
return count;
}
// Drains min(rleft(), |count|) bytes from start of the buffer.
size_t drain(size_t count) {
count = std::min(count, rleft());
pos += count;
return count;
}
void reset() { pos = last = begin; }
uint8_t begin[N];
uint8_t *pos, *last;
};
} // namespace nghttp2
#endif // RINGBUF_H

78
src/buffer_test.cc Normal file
View File

@ -0,0 +1,78 @@
/*
* nghttp2 - HTTP/2 C Library
*
* Copyright (c) 2015 Tatsuhiro Tsujikawa
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include "buffer_test.h"
#include <cstring>
#include <iostream>
#include <tuple>
#include <CUnit/CUnit.h>
#include <nghttp2/nghttp2.h>
#include "buffer.h"
namespace nghttp2 {
void test_buffer_write(void) {
Buffer<16> b;
CU_ASSERT(0 == b.rleft());
CU_ASSERT(16 == b.wleft());
b.write("012", 3);
CU_ASSERT(3 == b.rleft());
CU_ASSERT(13 == b.wleft());
CU_ASSERT(0 == b.pos - b.begin);
b.drain(3);
CU_ASSERT(0 == b.rleft());
CU_ASSERT(13 == b.wleft());
CU_ASSERT(3 == b.pos - b.begin);
auto n = b.write("0123456789ABCDEF", 16);
CU_ASSERT(n == 13);
CU_ASSERT(13 == b.rleft());
CU_ASSERT(0 == b.wleft());
CU_ASSERT(3 == b.pos - b.begin);
CU_ASSERT(0 == memcmp(b.pos, "0123456789ABC", 13));
b.reset();
CU_ASSERT(0 == b.rleft());
CU_ASSERT(0 == b.wleft());
CU_ASSERT(0 == b.pos - b.begin);
b.write(5);
CU_ASSERT(5 == b.rleft());
CU_ASSERT(11 == b.wleft());
CU_ASSERT(0 == b.pos - b.begin);
}
} // namespace nghttp2

34
src/buffer_test.h Normal file
View File

@ -0,0 +1,34 @@
/*
* nghttp2 - HTTP/2 C Library
*
* Copyright (c) 2015 Tatsuhiro Tsujikawa
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef BUFFER_TEST_H
#define BUFFER_TEST_H
namespace nghttp2 {
void test_buffer_write(void);
} // namespace nghttp2
#endif // BUFFER_TEST_H

View File

@ -39,6 +39,7 @@
#include "util_test.h"
#include "nghttp2_gzip_test.h"
#include "ringbuf_test.h"
#include "buffer_test.h"
#include "memchunk_test.h"
#include "shrpx_config.h"
@ -139,6 +140,7 @@ int main(int argc, char *argv[]) {
!CU_add_test(pSuite, "gzip_inflate", test_nghttp2_gzip_inflate) ||
!CU_add_test(pSuite, "ringbuf_write", nghttp2::test_ringbuf_write) ||
!CU_add_test(pSuite, "ringbuf_iovec", nghttp2::test_ringbuf_iovec) ||
!CU_add_test(pSuite, "buffer_write", nghttp2::test_ringbuf_write) ||
!CU_add_test(pSuite, "pool_recycle", nghttp2::test_pool_recycle) ||
!CU_add_test(pSuite, "memchunk_append", nghttp2::test_memchunks_append) ||
!CU_add_test(pSuite, "memchunk_drain", nghttp2::test_memchunks_drain) ||

View File

@ -105,15 +105,13 @@ int ClientHandler::read_clear() {
return 0;
}
rb_.reset();
struct iovec iov[2];
auto iovcnt = rb_.wiovec(iov);
iovcnt = limit_iovec(iov, iovcnt, rlimit_.avail());
if (iovcnt == 0) {
ssize_t nread = std::min(rb_.wleft(), rlimit_.avail());
if (nread == 0) {
break;
}
ssize_t nread;
while ((nread = readv(fd_, iov, iovcnt)) == -1 && errno == EINTR)
while ((nread = read(fd_, rb_.last, nread)) == -1 && errno == EINTR)
;
if (nread == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
@ -138,15 +136,12 @@ int ClientHandler::write_clear() {
for (;;) {
if (wb_.rleft() > 0) {
struct iovec iov[2];
auto iovcnt = wb_.riovec(iov);
iovcnt = limit_iovec(iov, iovcnt, wlimit_.avail());
if (iovcnt == 0) {
ssize_t nwrite = std::min(wb_.rleft(), wlimit_.avail());
if (nwrite == 0) {
return 0;
}
ssize_t nwrite;
while ((nwrite = writev(fd_, iov, iovcnt)) == -1 && errno == EINTR)
while ((nwrite = write(fd_, wb_.pos, nwrite)) == -1 && errno == EINTR)
;
if (nwrite == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
@ -238,8 +233,8 @@ int ClientHandler::read_tls() {
return 0;
}
rb_.reset();
struct iovec iov[2];
auto iovcnt = rb_.wiovec(iov);
ssize_t nread;
// SSL_read requires the same arguments (buf pointer and its
// length) on SSL_ERROR_WANT_READ or SSL_ERROR_WANT_WRITE.
// rlimit_.avail() or rlimit_.avail() may return different length
@ -248,16 +243,16 @@ int ClientHandler::read_tls() {
// to SSL_read to tls_last_readlen_ if SSL_read indicated I/O
// blocking.
if (tls_last_readlen_ == 0) {
iovcnt = limit_iovec(iov, iovcnt, rlimit_.avail());
if (iovcnt == 0) {
nread = std::min(rb_.wleft(), rlimit_.avail());
if (nread == 0) {
return 0;
}
} else {
assert(iov[0].iov_len == tls_last_readlen_);
nread = tls_last_readlen_;
tls_last_readlen_ = 0;
}
auto rv = SSL_read(ssl_, iov[0].iov_base, iov[0].iov_len);
auto rv = SSL_read(ssl_, rb_.last, nread);
if (rv == 0) {
return -1;
@ -267,7 +262,7 @@ int ClientHandler::read_tls() {
auto err = SSL_get_error(ssl_, rv);
switch (err) {
case SSL_ERROR_WANT_READ:
tls_last_readlen_ = iov[0].iov_len;
tls_last_readlen_ = nread;
return 0;
case SSL_ERROR_WANT_WRITE:
if (LOG_ENABLED(INFO)) {
@ -294,10 +289,7 @@ int ClientHandler::write_tls() {
for (;;) {
if (wb_.rleft() > 0) {
const void *p;
size_t len;
std::tie(p, len) = wb_.get();
ssize_t nwrite;
// SSL_write requires the same arguments (buf pointer and its
// length) on SSL_ERROR_WANT_READ or SSL_ERROR_WANT_WRITE.
// get_write_limit() may return smaller length than previously
@ -305,23 +297,21 @@ int ClientHandler::write_tls() {
// avoid this, we keep last legnth passed to SSL_write to
// tls_last_writelen_ if SSL_write indicated I/O blocking.
if (tls_last_writelen_ == 0) {
len = std::min(len, wlimit_.avail());
if (len == 0) {
nwrite = std::min(wb_.rleft(), wlimit_.avail());
if (nwrite == 0) {
return 0;
}
auto limit = get_write_limit();
if (limit != -1) {
len = std::min(len, static_cast<size_t>(limit));
nwrite = std::min(nwrite, limit);
}
} else {
assert(len >= tls_last_writelen_);
len = tls_last_writelen_;
nwrite = tls_last_writelen_;
tls_last_writelen_ = 0;
}
auto rv = SSL_write(ssl_, p, len);
auto rv = SSL_write(ssl_, wb_.pos, nwrite);
if (rv == 0) {
return -1;
@ -338,7 +328,7 @@ int ClientHandler::write_tls() {
}
return -1;
case SSL_ERROR_WANT_WRITE:
tls_last_writelen_ = len;
tls_last_writelen_ = nwrite;
wlimit_.startw();
ev_timer_again(loop_, &wt_);
return 0;
@ -396,85 +386,75 @@ int ClientHandler::upstream_write() {
}
int ClientHandler::upstream_http2_connhd_read() {
struct iovec iov[2];
auto iovcnt = rb_.riovec(iov);
for (int i = 0; i < iovcnt; ++i) {
auto nread =
std::min(left_connhd_len_, static_cast<size_t>(iov[i].iov_len));
if (memcmp(NGHTTP2_CLIENT_CONNECTION_PREFACE +
NGHTTP2_CLIENT_CONNECTION_PREFACE_LEN - left_connhd_len_,
iov[i].iov_base, nread) != 0) {
// There is no downgrade path here. Just drop the connection.
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "invalid client connection header";
}
auto nread = std::min(left_connhd_len_, rb_.rleft());
if (memcmp(NGHTTP2_CLIENT_CONNECTION_PREFACE +
NGHTTP2_CLIENT_CONNECTION_PREFACE_LEN - left_connhd_len_,
rb_.pos, nread) != 0) {
// There is no downgrade path here. Just drop the connection.
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "invalid client connection header";
}
return -1;
}
left_connhd_len_ -= nread;
rb_.drain(nread);
if (left_connhd_len_ == 0) {
on_read_ = &ClientHandler::upstream_read;
// Run on_read to process data left in buffer since they are not
// notified further
if (on_read() != 0) {
return -1;
}
left_connhd_len_ -= nread;
rb_.drain(nread);
if (left_connhd_len_ == 0) {
on_read_ = &ClientHandler::upstream_read;
// Run on_read to process data left in buffer since they are not
// notified further
if (on_read() != 0) {
return -1;
}
return 0;
}
return 0;
}
return 0;
}
int ClientHandler::upstream_http1_connhd_read() {
struct iovec iov[2];
auto iovcnt = rb_.riovec(iov);
for (int i = 0; i < iovcnt; ++i) {
auto nread =
std::min(left_connhd_len_, static_cast<size_t>(iov[i].iov_len));
if (memcmp(NGHTTP2_CLIENT_CONNECTION_PREFACE +
NGHTTP2_CLIENT_CONNECTION_PREFACE_LEN - left_connhd_len_,
iov[i].iov_base, nread) != 0) {
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "This is HTTP/1.1 connection, "
<< "but may be upgraded to HTTP/2 later.";
}
// Reset header length for later HTTP/2 upgrade
left_connhd_len_ = NGHTTP2_CLIENT_CONNECTION_PREFACE_LEN;
on_read_ = &ClientHandler::upstream_read;
on_write_ = &ClientHandler::upstream_write;
if (on_read() != 0) {
return -1;
}
return 0;
auto nread = std::min(left_connhd_len_, rb_.rleft());
if (memcmp(NGHTTP2_CLIENT_CONNECTION_PREFACE +
NGHTTP2_CLIENT_CONNECTION_PREFACE_LEN - left_connhd_len_,
rb_.pos, nread) != 0) {
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "This is HTTP/1.1 connection, "
<< "but may be upgraded to HTTP/2 later.";
}
left_connhd_len_ -= nread;
rb_.drain(nread);
// Reset header length for later HTTP/2 upgrade
left_connhd_len_ = NGHTTP2_CLIENT_CONNECTION_PREFACE_LEN;
on_read_ = &ClientHandler::upstream_read;
on_write_ = &ClientHandler::upstream_write;
if (left_connhd_len_ == 0) {
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "direct HTTP/2 connection";
}
direct_http2_upgrade();
on_read_ = &ClientHandler::upstream_read;
on_write_ = &ClientHandler::upstream_write;
// Run on_read to process data left in buffer since they are not
// notified further
if (on_read() != 0) {
return -1;
}
return 0;
if (on_read() != 0) {
return -1;
}
return 0;
}
left_connhd_len_ -= nread;
rb_.drain(nread);
if (left_connhd_len_ == 0) {
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "direct HTTP/2 connection";
}
direct_http2_upgrade();
on_read_ = &ClientHandler::upstream_read;
on_write_ = &ClientHandler::upstream_write;
// Run on_read to process data left in buffer since they are not
// notified further
if (on_read() != 0) {
return -1;
}
return 0;
}
return 0;

View File

@ -34,7 +34,7 @@
#include <openssl/ssl.h>
#include "shrpx_rate_limit.h"
#include "ringbuf.h"
#include "buffer.h"
using namespace nghttp2;
@ -135,8 +135,8 @@ public:
int64_t body_bytes_sent);
WorkerStat *get_worker_stat() const;
using WriteBuf = RingBuf<65536>;
using ReadBuf = RingBuf<8192>;
using WriteBuf = Buffer<65536>;
using ReadBuf = Buffer<8192>;
WriteBuf *get_wb();
ReadBuf *get_rb();

View File

@ -777,23 +777,18 @@ int Http2Upstream::on_read() {
ssize_t rv = 0;
auto rb = handler_->get_rb();
for (;;) {
const void *data;
size_t nread;
std::tie(data, nread) = rb->get();
if (nread == 0) {
break;
}
rv = nghttp2_session_mem_recv(
session_, reinterpret_cast<const uint8_t *>(data), nread);
if (rb->rleft()) {
rv = nghttp2_session_mem_recv(session_, rb->pos, rb->rleft());
if (rv < 0) {
ULOG(ERROR, this) << "nghttp2_session_recv() returned error: "
<< nghttp2_strerror(rv);
return -1;
}
rb->drain(nread);
// nghttp2_session_mem_recv should consume all input bytes on
// success.
assert(static_cast<size_t>(rv) == rb->rleft());
rb->reset();
}
auto wb = handler_->get_wb();

View File

@ -248,135 +248,127 @@ http_parser_settings htp_hooks = {
int HttpsUpstream::on_read() {
auto rb = handler_->get_rb();
auto downstream = get_downstream();
const void *data;
size_t datalen;
if (rb->rleft() == 0) {
return 0;
}
// downstream can be nullptr here, because it is initialized in the
// callback chain called by http_parser_execute()
if (downstream && downstream->get_upgraded()) {
for (;;) {
std::tie(data, datalen) = rb->get();
if (datalen == 0) {
return 0;
}
auto rv = downstream->push_upload_data_chunk(
reinterpret_cast<const uint8_t *>(data), datalen);
auto rv = downstream->push_upload_data_chunk(rb->pos, rb->rleft());
if (rv != 0) {
return -1;
}
rb->drain(datalen);
if (downstream->request_buf_full()) {
if (LOG_ENABLED(INFO)) {
ULOG(INFO, this) << "Downstream request buf is full";
}
pause_read(SHRPX_NO_BUFFER);
return 0;
}
if (rv != 0) {
return -1;
}
rb->reset();
if (downstream->request_buf_full()) {
if (LOG_ENABLED(INFO)) {
ULOG(INFO, this) << "Downstream request buf is full";
}
pause_read(SHRPX_NO_BUFFER);
return 0;
}
return 0;
}
for (;;) {
std::tie(data, datalen) = rb->get();
if (datalen == 0) {
auto nread = http_parser_execute(
&htp_, &htp_hooks, reinterpret_cast<const char *>(rb->pos), rb->rleft());
rb->drain(nread);
// Well, actually header length + some body bytes
current_header_length_ += nread;
// Get downstream again because it may be initialized in http parser
// execution
downstream = get_downstream();
auto handler = get_client_handler();
auto htperr = HTTP_PARSER_ERRNO(&htp_);
if (htperr == HPE_PAUSED) {
assert(downstream);
if (downstream->get_request_state() == Downstream::CONNECT_FAIL) {
// Following paues_read is needed to avoid reading next data.
pause_read(SHRPX_MSG_BLOCK);
error_reply(503);
handler_->signal_write();
// Downstream gets deleted after response body is read.
return 0;
}
auto nread = http_parser_execute(
&htp_, &htp_hooks, reinterpret_cast<const char *>(data), datalen);
assert(downstream->get_request_state() == Downstream::MSG_COMPLETE);
rb->drain(nread);
// Well, actually header length + some body bytes
current_header_length_ += nread;
// Get downstream again because it may be initialized in http parser
// execution
downstream = get_downstream();
auto handler = get_client_handler();
auto htperr = HTTP_PARSER_ERRNO(&htp_);
if (htperr == HPE_PAUSED) {
assert(downstream);
if (downstream->get_request_state() == Downstream::CONNECT_FAIL) {
// Following paues_read is needed to avoid reading next data.
pause_read(SHRPX_MSG_BLOCK);
error_reply(503);
handler_->signal_write();
// Downstream gets deleted after response body is read.
return 0;
}
assert(downstream->get_request_state() == Downstream::MSG_COMPLETE);
if (downstream->get_downstream_connection() == nullptr) {
// Error response has already be sent
assert(downstream->get_response_state() == Downstream::MSG_COMPLETE);
delete_downstream();
return 0;
}
if (handler->get_http2_upgrade_allowed() &&
downstream->get_http2_upgrade_request()) {
if (handler->perform_http2_upgrade(this) != 0) {
return -1;
}
handler_->signal_write();
return 0;
}
pause_read(SHRPX_MSG_BLOCK);
if (downstream->get_downstream_connection() == nullptr) {
// Error response has already be sent
assert(downstream->get_response_state() == Downstream::MSG_COMPLETE);
delete_downstream();
return 0;
}
if (htperr != HPE_OK) {
if (LOG_ENABLED(INFO)) {
ULOG(INFO, this) << "HTTP parse failure: "
<< "(" << http_errno_name(htperr) << ") "
<< http_errno_description(htperr);
if (handler->get_http2_upgrade_allowed() &&
downstream->get_http2_upgrade_request()) {
if (handler->perform_http2_upgrade(this) != 0) {
return -1;
}
pause_read(SHRPX_MSG_BLOCK);
unsigned int status_code;
if (downstream &&
downstream->get_request_state() == Downstream::CONNECT_FAIL) {
status_code = 503;
} else {
status_code = 400;
}
error_reply(status_code);
handler_->signal_write();
return 0;
}
// downstream can be NULL here.
if (downstream && downstream->request_buf_full()) {
if (LOG_ENABLED(INFO)) {
ULOG(INFO, this) << "Downstream request buffer is full";
}
pause_read(SHRPX_MSG_BLOCK);
pause_read(SHRPX_NO_BUFFER);
return 0;
}
return 0;
}
if (htperr != HPE_OK) {
if (LOG_ENABLED(INFO)) {
ULOG(INFO, this) << "HTTP parse failure: "
<< "(" << http_errno_name(htperr) << ") "
<< http_errno_description(htperr);
}
pause_read(SHRPX_MSG_BLOCK);
unsigned int status_code;
if (downstream &&
downstream->get_request_state() == Downstream::CONNECT_FAIL) {
status_code = 503;
} else {
status_code = 400;
}
error_reply(status_code);
handler_->signal_write();
return 0;
}
// downstream can be NULL here.
if (downstream && downstream->request_buf_full()) {
if (LOG_ENABLED(INFO)) {
ULOG(INFO, this) << "Downstream request buffer is full";
}
pause_read(SHRPX_NO_BUFFER);
return 0;
}
return 0;
}
int HttpsUpstream::on_write() {
@ -384,24 +376,29 @@ int HttpsUpstream::on_write() {
if (!downstream) {
return 0;
}
auto dconn = downstream->get_downstream_connection();
auto wb = handler_->get_wb();
if (wb->rleft() == 0 && dconn &&
if (wb->wleft() == 0) {
return 0;
}
auto dconn = downstream->get_downstream_connection();
auto output = downstream->get_response_buf();
if (output->rleft() == 0 && dconn &&
downstream->get_response_state() != Downstream::MSG_COMPLETE) {
if (downstream->resume_read(SHRPX_NO_BUFFER,
downstream->get_response_datalen()) != 0) {
return -1;
}
if (downstream_read(dconn) != 0) {
return -1;
}
}
struct iovec iov[2];
auto iovcnt = wb->wiovec(iov);
if (iovcnt == 0) {
return 0;
}
auto output = downstream->get_response_buf();
for (int i = 0; i < iovcnt; ++i) {
auto n = output->remove(iov[i].iov_base, iov[i].iov_len);
wb->write(n);
}
auto n = output->remove(wb->last, wb->wleft());
wb->write(n);
if (wb->rleft() > 0) {
return 0;
}

View File

@ -69,17 +69,14 @@ ssize_t recv_callback(spdylay_session *session, uint8_t *buf, size_t len,
auto upstream = static_cast<SpdyUpstream *>(user_data);
auto handler = upstream->get_client_handler();
auto rb = handler->get_rb();
const void *data;
size_t nread;
std::tie(data, nread) = rb->get();
if (nread == 0) {
if (rb->rleft() == 0) {
return SPDYLAY_ERR_WOULDBLOCK;
}
nread = std::min(nread, len);
auto nread = std::min(rb->rleft(), len);
memcpy(buf, data, nread);
memcpy(buf, rb->pos, nread);
rb->drain(nread);
return nread;