nghttpx: Use DefaultMemchunks as HTTP/2 and SPDY frontend response buffer
This commit is contained in:
parent
ad93cea544
commit
98253b1d0d
|
@ -41,6 +41,14 @@
|
|||
|
||||
namespace nghttp2 {
|
||||
|
||||
#define DEFAULT_WR_IOVCNT 16
|
||||
|
||||
#if defined(IOV_MAX) && IOV_MAX < DEFAULT_WR_IOVCNT
|
||||
#define MAX_WR_IOVCNT IOV_MAX
|
||||
#else // !defined(IOV_MAX) || IOV_MAX >= DEFAULT_WR_IOVCNT
|
||||
#define MAX_WR_IOVCNT DEFAULT_WR_IOVCNT
|
||||
#endif // !defined(IOV_MAX) || IOV_MAX >= DEFAULT_WR_IOVCNT
|
||||
|
||||
template <size_t N> struct Memchunk {
|
||||
Memchunk(std::unique_ptr<Memchunk> next_chunk)
|
||||
: pos(std::begin(buf)), last(pos), knext(std::move(next_chunk)),
|
||||
|
@ -199,6 +207,36 @@ template <typename Memchunk> struct Memchunks {
|
|||
|
||||
return first - static_cast<uint8_t *>(dest);
|
||||
}
|
||||
size_t remove(Memchunks &dest, size_t count) {
|
||||
if (!tail || count == 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
auto left = count;
|
||||
auto m = head;
|
||||
|
||||
while (m) {
|
||||
auto next = m->next;
|
||||
auto n = std::min(left, m->len());
|
||||
|
||||
assert(m->len());
|
||||
dest.append(m->pos, n);
|
||||
m->pos += n;
|
||||
len -= n;
|
||||
left -= n;
|
||||
if (m->len() > 0) {
|
||||
break;
|
||||
}
|
||||
pool->recycle(m);
|
||||
m = next;
|
||||
}
|
||||
head = m;
|
||||
if (head == nullptr) {
|
||||
tail = nullptr;
|
||||
}
|
||||
|
||||
return count - left;
|
||||
}
|
||||
size_t drain(size_t count) {
|
||||
auto ndata = count;
|
||||
auto m = head;
|
||||
|
@ -374,14 +412,6 @@ using MemchunkPool = Pool<Memchunk16K>;
|
|||
using DefaultMemchunks = Memchunks<Memchunk16K>;
|
||||
using DefaultPeekMemchunks = PeekMemchunks<Memchunk16K>;
|
||||
|
||||
#define DEFAULT_WR_IOVCNT 16
|
||||
|
||||
#if defined(IOV_MAX) && IOV_MAX < DEFAULT_WR_IOVCNT
|
||||
#define MAX_WR_IOVCNT IOV_MAX
|
||||
#else // !defined(IOV_MAX) || IOV_MAX >= DEFAULT_WR_IOVCNT
|
||||
#define MAX_WR_IOVCNT DEFAULT_WR_IOVCNT
|
||||
#endif // !defined(IOV_MAX) || IOV_MAX >= DEFAULT_WR_IOVCNT
|
||||
|
||||
inline int limit_iovec(struct iovec *iov, int iovcnt, size_t max) {
|
||||
if (max == 0) {
|
||||
return 0;
|
||||
|
|
|
@ -761,17 +761,6 @@ int ClientHandler::perform_http2_upgrade(HttpsUpstream *http) {
|
|||
"Upgrade: " NGHTTP2_CLEARTEXT_PROTO_VERSION_ID "\r\n"
|
||||
"\r\n";
|
||||
|
||||
auto required_size = str_size(res) + input->rleft();
|
||||
|
||||
if (output->wleft() < required_size) {
|
||||
if (LOG_ENABLED(INFO)) {
|
||||
CLOG(INFO, this)
|
||||
<< "HTTP Upgrade failed because of insufficient buffer space: need "
|
||||
<< required_size << ", available " << output->wleft();
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (upstream->upgrade_upstream(http) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -783,11 +772,8 @@ int ClientHandler::perform_http2_upgrade(HttpsUpstream *http) {
|
|||
on_read_ = &ClientHandler::upstream_http2_connhd_read;
|
||||
write_ = &ClientHandler::write_clear;
|
||||
|
||||
auto nread =
|
||||
downstream->get_response_buf()->remove(output->last, output->wleft());
|
||||
output->write(nread);
|
||||
|
||||
output->write(res, str_size(res));
|
||||
input->remove(*output, input->rleft());
|
||||
output->append(res, str_size(res));
|
||||
upstream_ = std::move(upstream);
|
||||
|
||||
signal_write();
|
||||
|
|
|
@ -50,6 +50,10 @@ using namespace nghttp2;
|
|||
|
||||
namespace shrpx {
|
||||
|
||||
namespace {
|
||||
constexpr size_t MAX_BUFFER_SIZE = 32_k;
|
||||
} // namespace
|
||||
|
||||
namespace {
|
||||
int on_stream_close_callback(nghttp2_session *session, int32_t stream_id,
|
||||
uint32_t error_code, void *user_data) {
|
||||
|
@ -661,13 +665,6 @@ int on_frame_not_send_callback(nghttp2_session *session,
|
|||
}
|
||||
} // namespace
|
||||
|
||||
void Http2Upstream::set_pending_data_downstream(Downstream *downstream,
|
||||
size_t n, size_t padlen) {
|
||||
pending_data_downstream_ = downstream;
|
||||
data_pendinglen_ = n;
|
||||
padding_pendinglen_ = padlen;
|
||||
}
|
||||
|
||||
namespace {
|
||||
constexpr auto PADDING = std::array<uint8_t, 256>{};
|
||||
} // namespace
|
||||
|
@ -682,51 +679,21 @@ int send_data_callback(nghttp2_session *session, nghttp2_frame *frame,
|
|||
|
||||
auto wb = upstream->get_response_buf();
|
||||
|
||||
size_t padlen;
|
||||
size_t padlen = 0;
|
||||
|
||||
if (frame->data.padlen == 0) {
|
||||
if (wb->wleft() < 9) {
|
||||
return NGHTTP2_ERR_WOULDBLOCK;
|
||||
}
|
||||
|
||||
wb->write(framehd, 9);
|
||||
padlen = 0;
|
||||
} else {
|
||||
if (wb->wleft() < 10) {
|
||||
return NGHTTP2_ERR_WOULDBLOCK;
|
||||
}
|
||||
|
||||
wb->write(framehd, 9);
|
||||
wb->append(framehd, 9);
|
||||
if (frame->data.padlen > 0) {
|
||||
padlen = frame->data.padlen - 1;
|
||||
*wb->last++ = padlen;
|
||||
wb->append(static_cast<uint8_t>(padlen));
|
||||
}
|
||||
|
||||
size_t npadwrite = 0;
|
||||
auto nwrite = std::min(length, wb->wleft());
|
||||
body->remove(wb->last, nwrite);
|
||||
wb->write(nwrite);
|
||||
if (nwrite < length) {
|
||||
// We must store unsent amount of data to somewhere. We just tell
|
||||
// libnghttp2 that we wrote everything, so downstream could be
|
||||
// deleted. We handle this situation in
|
||||
// Http2Upstream::remove_downstream().
|
||||
upstream->set_pending_data_downstream(downstream, length - nwrite, padlen);
|
||||
} else if (padlen > 0) {
|
||||
npadwrite = std::min(padlen, wb->wleft());
|
||||
wb->write(PADDING.data(), npadwrite);
|
||||
body->remove(*wb, length);
|
||||
|
||||
if (npadwrite < padlen) {
|
||||
upstream->set_pending_data_downstream(nullptr, 0, padlen - npadwrite);
|
||||
}
|
||||
}
|
||||
wb->append(PADDING.data(), padlen);
|
||||
|
||||
if (wb->rleft() == 0) {
|
||||
downstream->disable_upstream_wtimer();
|
||||
} else {
|
||||
downstream->reset_upstream_wtimer();
|
||||
}
|
||||
downstream->reset_upstream_wtimer();
|
||||
|
||||
if (nwrite > 0 && downstream->resume_read(SHRPX_NO_BUFFER, nwrite) != 0) {
|
||||
if (length > 0 && downstream->resume_read(SHRPX_NO_BUFFER, length) != 0) {
|
||||
return NGHTTP2_ERR_CALLBACK_FAILURE;
|
||||
}
|
||||
|
||||
|
@ -734,7 +701,7 @@ int send_data_callback(nghttp2_session *session, nghttp2_frame *frame,
|
|||
// data transferred.
|
||||
downstream->response_sent_body_length += length;
|
||||
|
||||
return (nwrite < length || npadwrite < padlen) ? NGHTTP2_ERR_PAUSE : 0;
|
||||
return wb->rleft() >= MAX_BUFFER_SIZE ? NGHTTP2_ERR_PAUSE : 0;
|
||||
}
|
||||
} // namespace
|
||||
|
||||
|
@ -851,17 +818,15 @@ nghttp2_session_callbacks *create_http2_upstream_callbacks() {
|
|||
}
|
||||
|
||||
Http2Upstream::Http2Upstream(ClientHandler *handler)
|
||||
: downstream_queue_(
|
||||
: wb_(handler->get_worker()->get_mcpool()),
|
||||
downstream_queue_(
|
||||
get_config()->http2_proxy
|
||||
? get_config()->conn.downstream.connections_per_host
|
||||
: get_config()->conn.downstream.proto == PROTO_HTTP
|
||||
? get_config()->conn.downstream.connections_per_frontend
|
||||
: 0,
|
||||
!get_config()->http2_proxy),
|
||||
pending_response_buf_(handler->get_worker()->get_mcpool()),
|
||||
pending_data_downstream_(nullptr), handler_(handler), session_(nullptr),
|
||||
data_pending_(nullptr), data_pendinglen_(0), padding_pendinglen_(0),
|
||||
shutdown_handled_(false) {
|
||||
handler_(handler), session_(nullptr), shutdown_handled_(false) {
|
||||
|
||||
int rv;
|
||||
|
||||
|
@ -963,66 +928,11 @@ int Http2Upstream::on_read() {
|
|||
|
||||
// After this function call, downstream may be deleted.
|
||||
int Http2Upstream::on_write() {
|
||||
if (wb_.rleft() == 0) {
|
||||
wb_.reset();
|
||||
}
|
||||
|
||||
if (data_pendinglen_ > 0) {
|
||||
if (data_pending_) {
|
||||
auto n = std::min(wb_.wleft(), data_pendinglen_);
|
||||
wb_.write(data_pending_, n);
|
||||
data_pending_ += n;
|
||||
data_pendinglen_ -= n;
|
||||
|
||||
if (data_pendinglen_ > 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
data_pending_ = nullptr;
|
||||
} else {
|
||||
auto nwrite = std::min(wb_.wleft(), data_pendinglen_);
|
||||
DefaultMemchunks *body;
|
||||
if (pending_data_downstream_) {
|
||||
body = pending_data_downstream_->get_response_buf();
|
||||
} else {
|
||||
body = &pending_response_buf_;
|
||||
}
|
||||
body->remove(wb_.last, nwrite);
|
||||
wb_.write(nwrite);
|
||||
data_pendinglen_ -= nwrite;
|
||||
|
||||
if (pending_data_downstream_ && nwrite > 0) {
|
||||
if (pending_data_downstream_->resume_read(SHRPX_NO_BUFFER, nwrite) !=
|
||||
0) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
if (data_pendinglen_ > 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (pending_data_downstream_) {
|
||||
pending_data_downstream_ = nullptr;
|
||||
} else {
|
||||
// Downstream was already deleted, and we don't need its
|
||||
// response data.
|
||||
body->reset();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (padding_pendinglen_ > 0) {
|
||||
auto nwrite = std::min(wb_.wleft(), padding_pendinglen_);
|
||||
wb_.write(PADDING.data(), nwrite);
|
||||
padding_pendinglen_ -= nwrite;
|
||||
|
||||
if (padding_pendinglen_ > 0) {
|
||||
for (;;) {
|
||||
if (wb_.rleft() >= MAX_BUFFER_SIZE) {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
for (;;) {
|
||||
const uint8_t *data;
|
||||
auto datalen = nghttp2_session_mem_send(session_, &data);
|
||||
|
||||
|
@ -1034,12 +944,7 @@ int Http2Upstream::on_write() {
|
|||
if (datalen == 0) {
|
||||
break;
|
||||
}
|
||||
auto n = wb_.write(data, datalen);
|
||||
if (n < static_cast<decltype(n)>(datalen)) {
|
||||
data_pending_ = data + n;
|
||||
data_pendinglen_ = datalen - n;
|
||||
return 0;
|
||||
}
|
||||
wb_.append(data, datalen);
|
||||
}
|
||||
|
||||
if (nghttp2_session_want_read(session_) == 0 &&
|
||||
|
@ -1409,11 +1314,6 @@ void Http2Upstream::remove_downstream(Downstream *downstream) {
|
|||
nghttp2_session_set_stream_user_data(session_, downstream->get_stream_id(),
|
||||
nullptr);
|
||||
|
||||
if (downstream == pending_data_downstream_) {
|
||||
pending_data_downstream_ = nullptr;
|
||||
pending_response_buf_ = downstream->pop_response_buf();
|
||||
}
|
||||
|
||||
auto next_downstream = downstream_queue_.remove_and_get_blocked(downstream);
|
||||
|
||||
if (next_downstream) {
|
||||
|
@ -1999,17 +1899,14 @@ int Http2Upstream::response_riovec(struct iovec *iov, int iovcnt) const {
|
|||
return 0;
|
||||
}
|
||||
|
||||
iov->iov_base = wb_.pos;
|
||||
iov->iov_len = wb_.rleft();
|
||||
|
||||
return 1;
|
||||
return wb_.riovec(iov, iovcnt);
|
||||
}
|
||||
|
||||
void Http2Upstream::response_drain(size_t n) { wb_.drain(n); }
|
||||
|
||||
bool Http2Upstream::response_empty() const { return wb_.rleft() == 0; }
|
||||
|
||||
Http2Upstream::WriteBuffer *Http2Upstream::get_response_buf() { return &wb_; }
|
||||
DefaultMemchunks *Http2Upstream::get_response_buf() { return &wb_; }
|
||||
|
||||
Downstream *
|
||||
Http2Upstream::on_downstream_push_promise(Downstream *downstream,
|
||||
|
|
|
@ -118,46 +118,21 @@ public:
|
|||
|
||||
int on_request_headers(Downstream *downstream, const nghttp2_frame *frame);
|
||||
|
||||
using WriteBuffer = Buffer<32_k>;
|
||||
|
||||
WriteBuffer *get_response_buf();
|
||||
|
||||
void set_pending_data_downstream(Downstream *downstream, size_t n,
|
||||
size_t padlen);
|
||||
DefaultMemchunks *get_response_buf();
|
||||
|
||||
// Changes stream priority of |downstream|, which is assumed to be a
|
||||
// pushed stream.
|
||||
int adjust_pushed_stream_priority(Downstream *downstream);
|
||||
|
||||
private:
|
||||
WriteBuffer wb_;
|
||||
DefaultMemchunks wb_;
|
||||
std::unique_ptr<HttpsUpstream> pre_upstream_;
|
||||
DownstreamQueue downstream_queue_;
|
||||
ev_timer settings_timer_;
|
||||
ev_timer shutdown_timer_;
|
||||
ev_prepare prep_;
|
||||
// A response buffer used to belong to Downstream object. This is
|
||||
// moved here when response is partially written to wb_ in
|
||||
// send_data_callback, but before writing them all, Downstream
|
||||
// object was destroyed. On destruction of Downstream,
|
||||
// pending_data_downstream_ becomes nullptr.
|
||||
DefaultMemchunks pending_response_buf_;
|
||||
// Downstream object whose DATA frame payload is partillay written
|
||||
// to wb_ in send_data_callback. This field exists to keep track of
|
||||
// its lifetime. When it is destroyed, its response buffer is
|
||||
// transferred to pending_response_buf_, and this field becomes
|
||||
// nullptr.
|
||||
Downstream *pending_data_downstream_;
|
||||
ClientHandler *handler_;
|
||||
nghttp2_session *session_;
|
||||
const uint8_t *data_pending_;
|
||||
// The length of lending data to be written into wb_. If
|
||||
// data_pending_ is not nullptr, data_pending_ points to the data to
|
||||
// write. Otherwise, pending_data_downstream_->get_response_buf()
|
||||
// if pending_data_downstream_ is not nullptr, or
|
||||
// pending_response_buf_ holds data to write.
|
||||
size_t data_pendinglen_;
|
||||
size_t padding_pendinglen_;
|
||||
bool flow_control_;
|
||||
bool shutdown_handled_;
|
||||
};
|
||||
|
|
|
@ -49,19 +49,23 @@ using namespace nghttp2;
|
|||
|
||||
namespace shrpx {
|
||||
|
||||
namespace {
|
||||
constexpr size_t MAX_BUFFER_SIZE = 32_k;
|
||||
} // namespace
|
||||
|
||||
namespace {
|
||||
ssize_t send_callback(spdylay_session *session, const uint8_t *data, size_t len,
|
||||
int flags, void *user_data) {
|
||||
auto upstream = static_cast<SpdyUpstream *>(user_data);
|
||||
auto wb = upstream->get_response_buf();
|
||||
|
||||
if (wb->wleft() == 0) {
|
||||
if (wb->rleft() >= MAX_BUFFER_SIZE) {
|
||||
return SPDYLAY_ERR_WOULDBLOCK;
|
||||
}
|
||||
|
||||
auto nread = wb->write(data, len);
|
||||
wb->append(data, len);
|
||||
|
||||
return nread;
|
||||
return len;
|
||||
}
|
||||
} // namespace
|
||||
|
||||
|
@ -492,7 +496,8 @@ uint32_t infer_upstream_rst_stream_status_code(uint32_t downstream_error_code) {
|
|||
} // namespace
|
||||
|
||||
SpdyUpstream::SpdyUpstream(uint16_t version, ClientHandler *handler)
|
||||
: downstream_queue_(
|
||||
: wb_(handler->get_worker()->get_mcpool()),
|
||||
downstream_queue_(
|
||||
get_config()->http2_proxy
|
||||
? get_config()->conn.downstream.connections_per_host
|
||||
: get_config()->conn.downstream.proto == PROTO_HTTP
|
||||
|
@ -586,8 +591,8 @@ int SpdyUpstream::on_read() {
|
|||
int SpdyUpstream::on_write() {
|
||||
int rv = 0;
|
||||
|
||||
if (wb_.rleft() == 0) {
|
||||
wb_.reset();
|
||||
if (wb_.rleft() >= MAX_BUFFER_SIZE) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
rv = spdylay_session_send(session_);
|
||||
|
@ -1248,17 +1253,14 @@ int SpdyUpstream::response_riovec(struct iovec *iov, int iovcnt) const {
|
|||
return 0;
|
||||
}
|
||||
|
||||
iov->iov_base = wb_.pos;
|
||||
iov->iov_len = wb_.rleft();
|
||||
|
||||
return 1;
|
||||
return wb_.riovec(iov, iovcnt);
|
||||
}
|
||||
|
||||
void SpdyUpstream::response_drain(size_t n) { wb_.drain(n); }
|
||||
|
||||
bool SpdyUpstream::response_empty() const { return wb_.rleft() == 0; }
|
||||
|
||||
SpdyUpstream::WriteBuffer *SpdyUpstream::get_response_buf() { return &wb_; }
|
||||
DefaultMemchunks *SpdyUpstream::get_response_buf() { return &wb_; }
|
||||
|
||||
Downstream *
|
||||
SpdyUpstream::on_downstream_push_promise(Downstream *downstream,
|
||||
|
|
|
@ -97,12 +97,10 @@ public:
|
|||
void start_downstream(Downstream *downstream);
|
||||
void initiate_downstream(Downstream *downstream);
|
||||
|
||||
using WriteBuffer = Buffer<32_k>;
|
||||
|
||||
WriteBuffer *get_response_buf();
|
||||
DefaultMemchunks *get_response_buf();
|
||||
|
||||
private:
|
||||
WriteBuffer wb_;
|
||||
DefaultMemchunks wb_;
|
||||
DownstreamQueue downstream_queue_;
|
||||
ClientHandler *handler_;
|
||||
spdylay_session *session_;
|
||||
|
|
Loading…
Reference in New Issue