nghttpx: Use send_data_callback for higher throughput
This commit is contained in:
parent
45d4c9dece
commit
777e1ee2c5
|
@ -222,7 +222,7 @@ template <typename Memchunk> struct Memchunks {
|
|||
}
|
||||
return ndata - count;
|
||||
}
|
||||
int riovec(struct iovec *iov, int iovcnt) {
|
||||
int riovec(struct iovec *iov, int iovcnt) const {
|
||||
if (!head) {
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -731,7 +731,7 @@ int ClientHandler::perform_http2_upgrade(HttpsUpstream *http) {
|
|||
"Connection: Upgrade\r\n"
|
||||
"Upgrade: " NGHTTP2_CLEARTEXT_PROTO_VERSION_ID "\r\n"
|
||||
"\r\n";
|
||||
upstream->response_write(res, sizeof(res) - 1);
|
||||
upstream->get_response_buf()->write(res, sizeof(res) - 1);
|
||||
upstream_ = std::move(upstream);
|
||||
|
||||
signal_write();
|
||||
|
|
|
@ -1241,4 +1241,8 @@ bool Downstream::can_detach_downstream_connection() const {
|
|||
!response_connection_close_;
|
||||
}
|
||||
|
||||
DefaultMemchunks Downstream::pop_response_buf() {
|
||||
return std::move(response_buf_);
|
||||
}
|
||||
|
||||
} // namespace shrpx
|
||||
|
|
|
@ -342,6 +342,8 @@ public:
|
|||
// Returns true if downstream_connection can be detached and reused.
|
||||
bool can_detach_downstream_connection() const;
|
||||
|
||||
DefaultMemchunks pop_response_buf();
|
||||
|
||||
enum {
|
||||
EVENT_ERROR = 0x1,
|
||||
EVENT_TIMEOUT = 0x2,
|
||||
|
|
|
@ -647,6 +647,57 @@ int on_frame_not_send_callback(nghttp2_session *session,
|
|||
}
|
||||
} // namespace
|
||||
|
||||
void Http2Upstream::set_pending_data_downstream(Downstream *downstream,
|
||||
size_t n) {
|
||||
pending_data_downstream_ = downstream;
|
||||
data_pendinglen_ = n;
|
||||
}
|
||||
|
||||
namespace {
|
||||
int send_data_callback(nghttp2_session *session, nghttp2_frame *frame,
|
||||
const uint8_t *framehd, size_t length,
|
||||
nghttp2_data_source *source, void *user_data) {
|
||||
auto downstream = static_cast<Downstream *>(source->ptr);
|
||||
auto upstream = static_cast<Http2Upstream *>(downstream->get_upstream());
|
||||
auto body = downstream->get_response_buf();
|
||||
|
||||
auto wb = upstream->get_response_buf();
|
||||
|
||||
if (wb->wleft() < 9) {
|
||||
return NGHTTP2_ERR_WOULDBLOCK;
|
||||
}
|
||||
|
||||
wb->write(framehd, 9);
|
||||
|
||||
auto nwrite = std::min(length, wb->wleft());
|
||||
body->remove(wb->last, nwrite);
|
||||
wb->write(nwrite);
|
||||
if (nwrite < length) {
|
||||
// We must store unsent amount of data to somewhere. We just tell
|
||||
// libnghttp2 that we wrote everything, so downstream could be
|
||||
// deleted. We handle this situation in
|
||||
// Http2Upstream::remove_downstream().
|
||||
upstream->set_pending_data_downstream(downstream, length - nwrite);
|
||||
}
|
||||
|
||||
if (wb->rleft() == 0) {
|
||||
downstream->disable_upstream_wtimer();
|
||||
} else {
|
||||
downstream->reset_upstream_wtimer();
|
||||
}
|
||||
|
||||
if (length > 0 && downstream->resume_read(SHRPX_NO_BUFFER, length) != 0) {
|
||||
return NGHTTP2_ERR_CALLBACK_FAILURE;
|
||||
}
|
||||
|
||||
if (length > 0) {
|
||||
downstream->add_response_sent_bodylen(length);
|
||||
}
|
||||
|
||||
return nwrite < length ? NGHTTP2_ERR_PAUSE : 0;
|
||||
}
|
||||
} // namespace
|
||||
|
||||
namespace {
|
||||
uint32_t infer_upstream_rst_stream_error_code(uint32_t downstream_error_code) {
|
||||
// NGHTTP2_REFUSED_STREAM is important because it tells upstream
|
||||
|
@ -748,6 +799,9 @@ nghttp2_session_callbacks *create_http2_upstream_callbacks() {
|
|||
nghttp2_session_callbacks_set_on_begin_headers_callback(
|
||||
callbacks, on_begin_headers_callback);
|
||||
|
||||
nghttp2_session_callbacks_set_send_data_callback(callbacks,
|
||||
send_data_callback);
|
||||
|
||||
if (get_config()->padding) {
|
||||
nghttp2_session_callbacks_set_select_padding_callback(
|
||||
callbacks, http::select_padding_callback);
|
||||
|
@ -764,8 +818,9 @@ Http2Upstream::Http2Upstream(ClientHandler *handler)
|
|||
? get_config()->downstream_connections_per_frontend
|
||||
: 0,
|
||||
!get_config()->http2_proxy),
|
||||
handler_(handler), session_(nullptr), data_pending_(nullptr),
|
||||
data_pendinglen_(0), shutdown_handled_(false) {
|
||||
pending_response_buf_(handler->get_worker()->get_mcpool()),
|
||||
pending_data_downstream_(nullptr), handler_(handler), session_(nullptr),
|
||||
data_pending_(nullptr), data_pendinglen_(0), shutdown_handled_(false) {
|
||||
|
||||
int rv;
|
||||
|
||||
|
@ -870,17 +925,42 @@ int Http2Upstream::on_write() {
|
|||
wb_.reset();
|
||||
}
|
||||
|
||||
if (data_pending_) {
|
||||
auto n = std::min(wb_.wleft(), data_pendinglen_);
|
||||
wb_.write(data_pending_, n);
|
||||
if (n < data_pendinglen_) {
|
||||
if (data_pendinglen_ > 0) {
|
||||
if (data_pending_) {
|
||||
auto n = std::min(wb_.wleft(), data_pendinglen_);
|
||||
wb_.write(data_pending_, n);
|
||||
data_pending_ += n;
|
||||
data_pendinglen_ -= n;
|
||||
return 0;
|
||||
}
|
||||
|
||||
data_pending_ = nullptr;
|
||||
data_pendinglen_ = 0;
|
||||
if (data_pendinglen_ > 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
data_pending_ = nullptr;
|
||||
} else {
|
||||
auto n = std::min(wb_.wleft(), data_pendinglen_);
|
||||
DefaultMemchunks *body;
|
||||
if (pending_data_downstream_) {
|
||||
body = pending_data_downstream_->get_response_buf();
|
||||
} else {
|
||||
body = &pending_response_buf_;
|
||||
}
|
||||
body->remove(wb_.last, n);
|
||||
wb_.write(n);
|
||||
data_pendinglen_ -= n;
|
||||
|
||||
if (data_pendinglen_ > 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (pending_data_downstream_) {
|
||||
pending_data_downstream_ = nullptr;
|
||||
} else {
|
||||
// Downstream was already deleted, and we don't need its
|
||||
// response data.
|
||||
body->reset();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (;;) {
|
||||
|
@ -1118,8 +1198,10 @@ ssize_t downstream_data_read_callback(nghttp2_session *session,
|
|||
}
|
||||
}
|
||||
|
||||
auto nread = body->remove(buf, length);
|
||||
auto body_empty = body->rleft() == 0;
|
||||
auto nread = std::min(body->rleft(), length);
|
||||
auto body_empty = body->rleft() == nread;
|
||||
|
||||
*data_flags |= NGHTTP2_DATA_FLAG_NO_COPY;
|
||||
|
||||
if (body_empty &&
|
||||
downstream->get_response_state() == Downstream::MSG_COMPLETE) {
|
||||
|
@ -1147,24 +1229,10 @@ ssize_t downstream_data_read_callback(nghttp2_session *session,
|
|||
}
|
||||
}
|
||||
|
||||
if (body_empty) {
|
||||
downstream->disable_upstream_wtimer();
|
||||
} else {
|
||||
downstream->reset_upstream_wtimer();
|
||||
}
|
||||
|
||||
if (nread > 0 && downstream->resume_read(SHRPX_NO_BUFFER, nread) != 0) {
|
||||
return NGHTTP2_ERR_CALLBACK_FAILURE;
|
||||
}
|
||||
|
||||
if (nread == 0 && ((*data_flags) & NGHTTP2_DATA_FLAG_EOF) == 0) {
|
||||
return NGHTTP2_ERR_DEFERRED;
|
||||
}
|
||||
|
||||
if (nread > 0) {
|
||||
downstream->add_response_sent_bodylen(nread);
|
||||
}
|
||||
|
||||
return nread;
|
||||
}
|
||||
} // namespace
|
||||
|
@ -1275,6 +1343,11 @@ void Http2Upstream::remove_downstream(Downstream *downstream) {
|
|||
nghttp2_session_set_stream_user_data(session_, downstream->get_stream_id(),
|
||||
nullptr);
|
||||
|
||||
if (downstream == pending_data_downstream_) {
|
||||
pending_data_downstream_ = nullptr;
|
||||
pending_response_buf_ = downstream->pop_response_buf();
|
||||
}
|
||||
|
||||
auto next_downstream = downstream_queue_.remove_and_get_blocked(downstream);
|
||||
|
||||
if (next_downstream) {
|
||||
|
@ -1771,8 +1844,6 @@ void Http2Upstream::response_drain(size_t n) { wb_.drain(n); }
|
|||
|
||||
bool Http2Upstream::response_empty() const { return wb_.rleft() == 0; }
|
||||
|
||||
void Http2Upstream::response_write(void *data, size_t len) {
|
||||
wb_.write(data, len);
|
||||
}
|
||||
Http2Upstream::WriteBuffer *Http2Upstream::get_response_buf() { return &wb_; }
|
||||
|
||||
} // namespace shrpx
|
||||
|
|
|
@ -87,8 +87,6 @@ public:
|
|||
virtual void response_drain(size_t n);
|
||||
virtual bool response_empty() const;
|
||||
|
||||
void response_write(void *data, size_t len);
|
||||
|
||||
bool get_flow_control() const;
|
||||
// Perform HTTP/2 upgrade from |upstream|. On success, this object
|
||||
// takes ownership of the |upstream|. This function returns 0 if it
|
||||
|
@ -114,6 +112,10 @@ public:
|
|||
|
||||
using WriteBuffer = Buffer<32_k>;
|
||||
|
||||
WriteBuffer *get_response_buf();
|
||||
|
||||
void set_pending_data_downstream(Downstream *downstream, size_t n);
|
||||
|
||||
private:
|
||||
WriteBuffer wb_;
|
||||
std::unique_ptr<HttpsUpstream> pre_upstream_;
|
||||
|
@ -121,9 +123,26 @@ private:
|
|||
ev_timer settings_timer_;
|
||||
ev_timer shutdown_timer_;
|
||||
ev_prepare prep_;
|
||||
// A response buffer used to belong to Downstream object. This is
|
||||
// moved here when response is partially written to wb_ in
|
||||
// send_data_callback, but before writing them all, Downstream
|
||||
// object was destroyed. On destruction of Downstream,
|
||||
// pending_data_downstream_ becomes nullptr.
|
||||
DefaultMemchunks pending_response_buf_;
|
||||
// Downstream object whose DATA frame payload is partillay written
|
||||
// to wb_ in send_data_callback. This field exists to keep track of
|
||||
// its lifetime. When it is destroyed, its response buffer is
|
||||
// transferred to pending_response_buf_, and this field becomes
|
||||
// nullptr.
|
||||
Downstream *pending_data_downstream_;
|
||||
ClientHandler *handler_;
|
||||
nghttp2_session *session_;
|
||||
const uint8_t *data_pending_;
|
||||
// The length of lending data to be written into wb_. If
|
||||
// data_pending_ is not nullptr, data_pending_ points to the data to
|
||||
// write. Otherwise, pending_data_downstream_->get_response_buf()
|
||||
// if pending_data_downstream_ is not nullptr, or
|
||||
// pending_response_buf_ holds data to write.
|
||||
size_t data_pendinglen_;
|
||||
bool flow_control_;
|
||||
bool shutdown_handled_;
|
||||
|
|
|
@ -455,7 +455,7 @@ void HttpDownstreamConnection::pause_read(IOCtrlReason reason) {
|
|||
|
||||
int HttpDownstreamConnection::resume_read(IOCtrlReason reason,
|
||||
size_t consumed) {
|
||||
if (!downstream_->response_buf_full()) {
|
||||
if (downstream_->get_response_buf()->rleft() == 0) {
|
||||
ioctrl_.resume_read(reason);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue