Merge branch 'nghttpx-response-buffer'
This commit is contained in:
commit
9fa5010eac
|
@ -1199,18 +1199,22 @@ typedef ssize_t (*nghttp2_send_callback)(nghttp2_session *session,
|
||||||
* The application has to send complete DATA frame in this callback.
|
* The application has to send complete DATA frame in this callback.
|
||||||
* If all data were written successfully, return 0.
|
* If all data were written successfully, return 0.
|
||||||
*
|
*
|
||||||
* If it cannot send it all, just return
|
* If it cannot send any data at all, just return
|
||||||
* :enum:`NGHTTP2_ERR_WOULDBLOCK`; the library will call this callback
|
* :enum:`NGHTTP2_ERR_WOULDBLOCK`; the library will call this callback
|
||||||
* with the same parameters later (It is recommended to send complete
|
* with the same parameters later (It is recommended to send complete
|
||||||
* DATA frame at once in this function to deal with error; if partial
|
* DATA frame at once in this function to deal with error; if partial
|
||||||
* frame data has already sent, it is impossible to send another data
|
* frame data has already sent, it is impossible to send another data
|
||||||
* in that state, and all we can do is tear down connection). If
|
* in that state, and all we can do is tear down connection). When
|
||||||
* application decided to reset this stream, return
|
* data is fully processed, but application wants to make
|
||||||
* :enum:`NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE`, then the library
|
* `nghttp2_session_mem_send()` or `nghttp2_session_send()` return
|
||||||
* will send RST_STREAM with INTERNAL_ERROR as error code. The
|
* immediately without processing next frames, return
|
||||||
* application can also return :enum:`NGHTTP2_ERR_CALLBACK_FAILURE`,
|
* :enum:`NGHTTP2_ERR_PAUSE`. If application decided to reset this
|
||||||
* which will result in connection closure. Returning any other value
|
* stream, return :enum:`NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE`, then
|
||||||
* is treated as :enum:`NGHTTP2_ERR_CALLBACK_FAILURE` is returned.
|
* the library will send RST_STREAM with INTERNAL_ERROR as error code.
|
||||||
|
* The application can also return
|
||||||
|
* :enum:`NGHTTP2_ERR_CALLBACK_FAILURE`, which will result in
|
||||||
|
* connection closure. Returning any other value is treated as
|
||||||
|
* :enum:`NGHTTP2_ERR_CALLBACK_FAILURE` is returned.
|
||||||
*/
|
*/
|
||||||
typedef int (*nghttp2_send_data_callback)(nghttp2_session *session,
|
typedef int (*nghttp2_send_data_callback)(nghttp2_session *session,
|
||||||
nghttp2_frame *frame,
|
nghttp2_frame *frame,
|
||||||
|
|
|
@ -2610,12 +2610,15 @@ static int session_call_send_data(nghttp2_session *session,
|
||||||
&aux_data->data_prd.source,
|
&aux_data->data_prd.source,
|
||||||
session->user_data);
|
session->user_data);
|
||||||
|
|
||||||
if (rv == 0 || rv == NGHTTP2_ERR_WOULDBLOCK ||
|
switch (rv) {
|
||||||
rv == NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE) {
|
case 0:
|
||||||
|
case NGHTTP2_ERR_WOULDBLOCK:
|
||||||
|
case NGHTTP2_ERR_PAUSE:
|
||||||
|
case NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE:
|
||||||
return rv;
|
return rv;
|
||||||
|
default:
|
||||||
|
return NGHTTP2_ERR_CALLBACK_FAILURE;
|
||||||
}
|
}
|
||||||
|
|
||||||
return NGHTTP2_ERR_CALLBACK_FAILURE;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static ssize_t nghttp2_session_mem_send_internal(nghttp2_session *session,
|
static ssize_t nghttp2_session_mem_send_internal(nghttp2_session *session,
|
||||||
|
@ -2790,6 +2793,7 @@ static ssize_t nghttp2_session_mem_send_internal(nghttp2_session *session,
|
||||||
case NGHTTP2_OB_SEND_NO_COPY: {
|
case NGHTTP2_OB_SEND_NO_COPY: {
|
||||||
nghttp2_stream *stream;
|
nghttp2_stream *stream;
|
||||||
nghttp2_frame *frame;
|
nghttp2_frame *frame;
|
||||||
|
int pause;
|
||||||
|
|
||||||
DEBUGF(fprintf(stderr, "send: no copy DATA\n"));
|
DEBUGF(fprintf(stderr, "send: no copy DATA\n"));
|
||||||
|
|
||||||
|
@ -2833,7 +2837,7 @@ static ssize_t nghttp2_session_mem_send_internal(nghttp2_session *session,
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(rv == 0);
|
pause = (rv == NGHTTP2_ERR_PAUSE);
|
||||||
|
|
||||||
rv = session_after_frame_sent1(session);
|
rv = session_after_frame_sent1(session);
|
||||||
if (rv < 0) {
|
if (rv < 0) {
|
||||||
|
@ -2848,6 +2852,10 @@ static ssize_t nghttp2_session_mem_send_internal(nghttp2_session *session,
|
||||||
|
|
||||||
/* We have already adjusted the next state */
|
/* We have already adjusted the next state */
|
||||||
|
|
||||||
|
if (pause) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case NGHTTP2_OB_SEND_CLIENT_MAGIC: {
|
case NGHTTP2_OB_SEND_CLIENT_MAGIC: {
|
||||||
|
|
|
@ -222,7 +222,7 @@ template <typename Memchunk> struct Memchunks {
|
||||||
}
|
}
|
||||||
return ndata - count;
|
return ndata - count;
|
||||||
}
|
}
|
||||||
int riovec(struct iovec *iov, int iovcnt) {
|
int riovec(struct iovec *iov, int iovcnt) const {
|
||||||
if (!head) {
|
if (!head) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -135,68 +135,30 @@ int ClientHandler::read_clear() {
|
||||||
}
|
}
|
||||||
|
|
||||||
int ClientHandler::write_clear() {
|
int ClientHandler::write_clear() {
|
||||||
|
std::array<iovec, 2> iov;
|
||||||
|
|
||||||
ev_timer_again(conn_.loop, &conn_.rt);
|
ev_timer_again(conn_.loop, &conn_.rt);
|
||||||
|
|
||||||
for (;;) {
|
for (;;) {
|
||||||
if (wb_.rleft() > 0) {
|
|
||||||
auto nwrite = conn_.write_clear(wb_.pos, wb_.rleft());
|
|
||||||
if (nwrite == 0) {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
if (nwrite < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
wb_.drain(nwrite);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
wb_.reset();
|
|
||||||
if (on_write() != 0) {
|
if (on_write() != 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
if (wb_.rleft() == 0) {
|
|
||||||
|
auto iovcnt = upstream_->response_riovec(iov.data(), iov.size());
|
||||||
|
if (iovcnt == 0) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
conn_.wlimit.stopw();
|
auto nwrite = conn_.writev_clear(iov.data(), iovcnt);
|
||||||
ev_timer_stop(conn_.loop, &conn_.wt);
|
if (nwrite < 0) {
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int ClientHandler::writev_clear() {
|
|
||||||
ev_timer_again(conn_.loop, &conn_.rt);
|
|
||||||
|
|
||||||
auto buf = upstream_->get_response_buf();
|
|
||||||
if (!buf) {
|
|
||||||
conn_.wlimit.stopw();
|
|
||||||
ev_timer_stop(conn_.loop, &conn_.wt);
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (;;) {
|
|
||||||
if (buf->rleft() > 0) {
|
|
||||||
std::array<iovec, 2> iov;
|
|
||||||
auto iovcnt = buf->riovec(iov.data(), iov.size());
|
|
||||||
auto nwrite = conn_.writev_clear(iov.data(), iovcnt);
|
|
||||||
if (nwrite == 0) {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
if (nwrite < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
buf->drain(nwrite);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (on_write() != 0) {
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
// buf may be destroyed inside on_write()
|
|
||||||
buf = upstream_->get_response_buf();
|
if (nwrite == 0) {
|
||||||
if (!buf || buf->rleft() == 0) {
|
return 0;
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
upstream_->response_drain(nwrite);
|
||||||
}
|
}
|
||||||
|
|
||||||
conn_.wlimit.stopw();
|
conn_.wlimit.stopw();
|
||||||
|
@ -229,12 +191,7 @@ int ClientHandler::tls_handshake() {
|
||||||
}
|
}
|
||||||
|
|
||||||
read_ = &ClientHandler::read_tls;
|
read_ = &ClientHandler::read_tls;
|
||||||
|
write_ = &ClientHandler::write_tls;
|
||||||
if (alpn_ == "http/1.1") {
|
|
||||||
write_ = &ClientHandler::writev_tls;
|
|
||||||
} else {
|
|
||||||
write_ = &ClientHandler::write_tls;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -271,85 +228,33 @@ int ClientHandler::read_tls() {
|
||||||
}
|
}
|
||||||
|
|
||||||
int ClientHandler::write_tls() {
|
int ClientHandler::write_tls() {
|
||||||
|
struct iovec iov;
|
||||||
|
|
||||||
ev_timer_again(conn_.loop, &conn_.rt);
|
ev_timer_again(conn_.loop, &conn_.rt);
|
||||||
|
|
||||||
ERR_clear_error();
|
ERR_clear_error();
|
||||||
|
|
||||||
for (;;) {
|
for (;;) {
|
||||||
if (wb_.rleft() > 0) {
|
|
||||||
auto nwrite = conn_.write_tls(wb_.pos, wb_.rleft());
|
|
||||||
|
|
||||||
if (nwrite == 0) {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (nwrite < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
wb_.drain(nwrite);
|
|
||||||
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
wb_.reset();
|
|
||||||
if (on_write() != 0) {
|
if (on_write() != 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
if (wb_.rleft() == 0) {
|
|
||||||
|
auto iovcnt = upstream_->response_riovec(&iov, 1);
|
||||||
|
if (iovcnt == 0) {
|
||||||
conn_.start_tls_write_idle();
|
conn_.start_tls_write_idle();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
conn_.wlimit.stopw();
|
auto nwrite = conn_.write_tls(iov.iov_base, iov.iov_len);
|
||||||
ev_timer_stop(conn_.loop, &conn_.wt);
|
if (nwrite < 0) {
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int ClientHandler::writev_tls() {
|
|
||||||
ev_timer_again(conn_.loop, &conn_.rt);
|
|
||||||
|
|
||||||
auto buf = upstream_->get_response_buf();
|
|
||||||
if (!buf) {
|
|
||||||
conn_.wlimit.stopw();
|
|
||||||
ev_timer_stop(conn_.loop, &conn_.wt);
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
ERR_clear_error();
|
|
||||||
|
|
||||||
for (;;) {
|
|
||||||
if (buf->rleft() > 0) {
|
|
||||||
iovec iov;
|
|
||||||
auto iovcnt = buf->riovec(&iov, 1);
|
|
||||||
if (iovcnt == 0) {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
auto nwrite = conn_.write_tls(iov.iov_base, iov.iov_len);
|
|
||||||
|
|
||||||
if (nwrite == 0) {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (nwrite < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
buf->drain(nwrite);
|
|
||||||
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (on_write() != 0) {
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
buf = upstream_->get_response_buf();
|
|
||||||
if (!buf || buf->rleft() == 0) {
|
if (nwrite == 0) {
|
||||||
conn_.start_tls_write_idle();
|
return 0;
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
upstream_->response_drain(nwrite);
|
||||||
}
|
}
|
||||||
|
|
||||||
conn_.wlimit.stopw();
|
conn_.wlimit.stopw();
|
||||||
|
@ -374,9 +279,7 @@ int ClientHandler::upstream_write() {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (get_should_close_after_write() && wb_.rleft() == 0 &&
|
if (get_should_close_after_write() && upstream_->response_empty()) {
|
||||||
(!upstream_->get_response_buf() ||
|
|
||||||
upstream_->get_response_buf()->rleft() == 0)) {
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -506,7 +409,7 @@ void ClientHandler::setup_upstream_io_callback() {
|
||||||
upstream_ = make_unique<HttpsUpstream>(this);
|
upstream_ = make_unique<HttpsUpstream>(this);
|
||||||
alpn_ = "http/1.1";
|
alpn_ = "http/1.1";
|
||||||
read_ = &ClientHandler::read_clear;
|
read_ = &ClientHandler::read_clear;
|
||||||
write_ = &ClientHandler::writev_clear;
|
write_ = &ClientHandler::write_clear;
|
||||||
on_read_ = &ClientHandler::upstream_http1_connhd_read;
|
on_read_ = &ClientHandler::upstream_http1_connhd_read;
|
||||||
on_write_ = &ClientHandler::upstream_noop;
|
on_write_ = &ClientHandler::upstream_noop;
|
||||||
}
|
}
|
||||||
|
@ -818,7 +721,6 @@ int ClientHandler::perform_http2_upgrade(HttpsUpstream *http) {
|
||||||
}
|
}
|
||||||
// http pointer is now owned by upstream.
|
// http pointer is now owned by upstream.
|
||||||
upstream_.release();
|
upstream_.release();
|
||||||
upstream_ = std::move(upstream);
|
|
||||||
// TODO We might get other version id in HTTP2-settings, if we
|
// TODO We might get other version id in HTTP2-settings, if we
|
||||||
// support aliasing for h2, but we just use library default for now.
|
// support aliasing for h2, but we just use library default for now.
|
||||||
alpn_ = NGHTTP2_CLEARTEXT_PROTO_VERSION_ID;
|
alpn_ = NGHTTP2_CLEARTEXT_PROTO_VERSION_ID;
|
||||||
|
@ -829,7 +731,9 @@ int ClientHandler::perform_http2_upgrade(HttpsUpstream *http) {
|
||||||
"Connection: Upgrade\r\n"
|
"Connection: Upgrade\r\n"
|
||||||
"Upgrade: " NGHTTP2_CLEARTEXT_PROTO_VERSION_ID "\r\n"
|
"Upgrade: " NGHTTP2_CLEARTEXT_PROTO_VERSION_ID "\r\n"
|
||||||
"\r\n";
|
"\r\n";
|
||||||
wb_.write(res, sizeof(res) - 1);
|
upstream->get_response_buf()->write(res, sizeof(res) - 1);
|
||||||
|
upstream_ = std::move(upstream);
|
||||||
|
|
||||||
signal_write();
|
signal_write();
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -932,8 +836,6 @@ void ClientHandler::write_accesslog(int major, int minor, unsigned int status,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
ClientHandler::WriteBuf *ClientHandler::get_wb() { return &wb_; }
|
|
||||||
|
|
||||||
ClientHandler::ReadBuf *ClientHandler::get_rb() { return &rb_; }
|
ClientHandler::ReadBuf *ClientHandler::get_rb() { return &rb_; }
|
||||||
|
|
||||||
void ClientHandler::signal_write() { conn_.wlimit.startw(); }
|
void ClientHandler::signal_write() { conn_.wlimit.startw(); }
|
||||||
|
|
|
@ -60,13 +60,11 @@ public:
|
||||||
// Performs clear text I/O
|
// Performs clear text I/O
|
||||||
int read_clear();
|
int read_clear();
|
||||||
int write_clear();
|
int write_clear();
|
||||||
int writev_clear();
|
|
||||||
// Performs TLS handshake
|
// Performs TLS handshake
|
||||||
int tls_handshake();
|
int tls_handshake();
|
||||||
// Performs TLS I/O
|
// Performs TLS I/O
|
||||||
int read_tls();
|
int read_tls();
|
||||||
int write_tls();
|
int write_tls();
|
||||||
int writev_tls();
|
|
||||||
|
|
||||||
int upstream_noop();
|
int upstream_noop();
|
||||||
int upstream_read();
|
int upstream_read();
|
||||||
|
@ -124,10 +122,8 @@ public:
|
||||||
int64_t body_bytes_sent);
|
int64_t body_bytes_sent);
|
||||||
Worker *get_worker() const;
|
Worker *get_worker() const;
|
||||||
|
|
||||||
using WriteBuf = Buffer<32768>;
|
|
||||||
using ReadBuf = Buffer<8_k>;
|
using ReadBuf = Buffer<8_k>;
|
||||||
|
|
||||||
WriteBuf *get_wb();
|
|
||||||
ReadBuf *get_rb();
|
ReadBuf *get_rb();
|
||||||
|
|
||||||
RateLimit *get_rlimit();
|
RateLimit *get_rlimit();
|
||||||
|
@ -153,7 +149,6 @@ private:
|
||||||
// The number of bytes of HTTP/2 client connection header to read
|
// The number of bytes of HTTP/2 client connection header to read
|
||||||
size_t left_connhd_len_;
|
size_t left_connhd_len_;
|
||||||
bool should_close_after_write_;
|
bool should_close_after_write_;
|
||||||
WriteBuf wb_;
|
|
||||||
ReadBuf rb_;
|
ReadBuf rb_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -1241,4 +1241,8 @@ bool Downstream::can_detach_downstream_connection() const {
|
||||||
!response_connection_close_;
|
!response_connection_close_;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
DefaultMemchunks Downstream::pop_response_buf() {
|
||||||
|
return std::move(response_buf_);
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace shrpx
|
} // namespace shrpx
|
||||||
|
|
|
@ -342,6 +342,8 @@ public:
|
||||||
// Returns true if downstream_connection can be detached and reused.
|
// Returns true if downstream_connection can be detached and reused.
|
||||||
bool can_detach_downstream_connection() const;
|
bool can_detach_downstream_connection() const;
|
||||||
|
|
||||||
|
DefaultMemchunks pop_response_buf();
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
EVENT_ERROR = 0x1,
|
EVENT_ERROR = 0x1,
|
||||||
EVENT_TIMEOUT = 0x2,
|
EVENT_TIMEOUT = 0x2,
|
||||||
|
|
|
@ -647,6 +647,57 @@ int on_frame_not_send_callback(nghttp2_session *session,
|
||||||
}
|
}
|
||||||
} // namespace
|
} // namespace
|
||||||
|
|
||||||
|
void Http2Upstream::set_pending_data_downstream(Downstream *downstream,
|
||||||
|
size_t n) {
|
||||||
|
pending_data_downstream_ = downstream;
|
||||||
|
data_pendinglen_ = n;
|
||||||
|
}
|
||||||
|
|
||||||
|
namespace {
|
||||||
|
int send_data_callback(nghttp2_session *session, nghttp2_frame *frame,
|
||||||
|
const uint8_t *framehd, size_t length,
|
||||||
|
nghttp2_data_source *source, void *user_data) {
|
||||||
|
auto downstream = static_cast<Downstream *>(source->ptr);
|
||||||
|
auto upstream = static_cast<Http2Upstream *>(downstream->get_upstream());
|
||||||
|
auto body = downstream->get_response_buf();
|
||||||
|
|
||||||
|
auto wb = upstream->get_response_buf();
|
||||||
|
|
||||||
|
if (wb->wleft() < 9) {
|
||||||
|
return NGHTTP2_ERR_WOULDBLOCK;
|
||||||
|
}
|
||||||
|
|
||||||
|
wb->write(framehd, 9);
|
||||||
|
|
||||||
|
auto nwrite = std::min(length, wb->wleft());
|
||||||
|
body->remove(wb->last, nwrite);
|
||||||
|
wb->write(nwrite);
|
||||||
|
if (nwrite < length) {
|
||||||
|
// We must store unsent amount of data to somewhere. We just tell
|
||||||
|
// libnghttp2 that we wrote everything, so downstream could be
|
||||||
|
// deleted. We handle this situation in
|
||||||
|
// Http2Upstream::remove_downstream().
|
||||||
|
upstream->set_pending_data_downstream(downstream, length - nwrite);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (wb->rleft() == 0) {
|
||||||
|
downstream->disable_upstream_wtimer();
|
||||||
|
} else {
|
||||||
|
downstream->reset_upstream_wtimer();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (length > 0 && downstream->resume_read(SHRPX_NO_BUFFER, length) != 0) {
|
||||||
|
return NGHTTP2_ERR_CALLBACK_FAILURE;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (length > 0) {
|
||||||
|
downstream->add_response_sent_bodylen(length);
|
||||||
|
}
|
||||||
|
|
||||||
|
return nwrite < length ? NGHTTP2_ERR_PAUSE : 0;
|
||||||
|
}
|
||||||
|
} // namespace
|
||||||
|
|
||||||
namespace {
|
namespace {
|
||||||
uint32_t infer_upstream_rst_stream_error_code(uint32_t downstream_error_code) {
|
uint32_t infer_upstream_rst_stream_error_code(uint32_t downstream_error_code) {
|
||||||
// NGHTTP2_REFUSED_STREAM is important because it tells upstream
|
// NGHTTP2_REFUSED_STREAM is important because it tells upstream
|
||||||
|
@ -748,6 +799,9 @@ nghttp2_session_callbacks *create_http2_upstream_callbacks() {
|
||||||
nghttp2_session_callbacks_set_on_begin_headers_callback(
|
nghttp2_session_callbacks_set_on_begin_headers_callback(
|
||||||
callbacks, on_begin_headers_callback);
|
callbacks, on_begin_headers_callback);
|
||||||
|
|
||||||
|
nghttp2_session_callbacks_set_send_data_callback(callbacks,
|
||||||
|
send_data_callback);
|
||||||
|
|
||||||
if (get_config()->padding) {
|
if (get_config()->padding) {
|
||||||
nghttp2_session_callbacks_set_select_padding_callback(
|
nghttp2_session_callbacks_set_select_padding_callback(
|
||||||
callbacks, http::select_padding_callback);
|
callbacks, http::select_padding_callback);
|
||||||
|
@ -764,8 +818,9 @@ Http2Upstream::Http2Upstream(ClientHandler *handler)
|
||||||
? get_config()->downstream_connections_per_frontend
|
? get_config()->downstream_connections_per_frontend
|
||||||
: 0,
|
: 0,
|
||||||
!get_config()->http2_proxy),
|
!get_config()->http2_proxy),
|
||||||
handler_(handler), session_(nullptr), data_pending_(nullptr),
|
pending_response_buf_(handler->get_worker()->get_mcpool()),
|
||||||
data_pendinglen_(0), shutdown_handled_(false) {
|
pending_data_downstream_(nullptr), handler_(handler), session_(nullptr),
|
||||||
|
data_pending_(nullptr), data_pendinglen_(0), shutdown_handled_(false) {
|
||||||
|
|
||||||
int rv;
|
int rv;
|
||||||
|
|
||||||
|
@ -852,9 +907,8 @@ int Http2Upstream::on_read() {
|
||||||
rlimit->startw();
|
rlimit->startw();
|
||||||
}
|
}
|
||||||
|
|
||||||
auto wb = handler_->get_wb();
|
|
||||||
if (nghttp2_session_want_read(session_) == 0 &&
|
if (nghttp2_session_want_read(session_) == 0 &&
|
||||||
nghttp2_session_want_write(session_) == 0 && wb->rleft() == 0) {
|
nghttp2_session_want_write(session_) == 0 && wb_.rleft() == 0) {
|
||||||
if (LOG_ENABLED(INFO)) {
|
if (LOG_ENABLED(INFO)) {
|
||||||
ULOG(INFO, this) << "No more read/write for this HTTP2 session";
|
ULOG(INFO, this) << "No more read/write for this HTTP2 session";
|
||||||
}
|
}
|
||||||
|
@ -867,19 +921,46 @@ int Http2Upstream::on_read() {
|
||||||
|
|
||||||
// After this function call, downstream may be deleted.
|
// After this function call, downstream may be deleted.
|
||||||
int Http2Upstream::on_write() {
|
int Http2Upstream::on_write() {
|
||||||
auto wb = handler_->get_wb();
|
if (wb_.rleft() == 0) {
|
||||||
|
wb_.reset();
|
||||||
|
}
|
||||||
|
|
||||||
if (data_pending_) {
|
if (data_pendinglen_ > 0) {
|
||||||
auto n = std::min(wb->wleft(), data_pendinglen_);
|
if (data_pending_) {
|
||||||
wb->write(data_pending_, n);
|
auto n = std::min(wb_.wleft(), data_pendinglen_);
|
||||||
if (n < data_pendinglen_) {
|
wb_.write(data_pending_, n);
|
||||||
data_pending_ += n;
|
data_pending_ += n;
|
||||||
data_pendinglen_ -= n;
|
data_pendinglen_ -= n;
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
data_pending_ = nullptr;
|
if (data_pendinglen_ > 0) {
|
||||||
data_pendinglen_ = 0;
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
data_pending_ = nullptr;
|
||||||
|
} else {
|
||||||
|
auto n = std::min(wb_.wleft(), data_pendinglen_);
|
||||||
|
DefaultMemchunks *body;
|
||||||
|
if (pending_data_downstream_) {
|
||||||
|
body = pending_data_downstream_->get_response_buf();
|
||||||
|
} else {
|
||||||
|
body = &pending_response_buf_;
|
||||||
|
}
|
||||||
|
body->remove(wb_.last, n);
|
||||||
|
wb_.write(n);
|
||||||
|
data_pendinglen_ -= n;
|
||||||
|
|
||||||
|
if (data_pendinglen_ > 0) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pending_data_downstream_) {
|
||||||
|
pending_data_downstream_ = nullptr;
|
||||||
|
} else {
|
||||||
|
// Downstream was already deleted, and we don't need its
|
||||||
|
// response data.
|
||||||
|
body->reset();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for (;;) {
|
for (;;) {
|
||||||
|
@ -894,7 +975,7 @@ int Http2Upstream::on_write() {
|
||||||
if (datalen == 0) {
|
if (datalen == 0) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
auto n = wb->write(data, datalen);
|
auto n = wb_.write(data, datalen);
|
||||||
if (n < static_cast<decltype(n)>(datalen)) {
|
if (n < static_cast<decltype(n)>(datalen)) {
|
||||||
data_pending_ = data + n;
|
data_pending_ = data + n;
|
||||||
data_pendinglen_ = datalen - n;
|
data_pendinglen_ = datalen - n;
|
||||||
|
@ -903,7 +984,7 @@ int Http2Upstream::on_write() {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (nghttp2_session_want_read(session_) == 0 &&
|
if (nghttp2_session_want_read(session_) == 0 &&
|
||||||
nghttp2_session_want_write(session_) == 0 && wb->rleft() == 0) {
|
nghttp2_session_want_write(session_) == 0 && wb_.rleft() == 0) {
|
||||||
if (LOG_ENABLED(INFO)) {
|
if (LOG_ENABLED(INFO)) {
|
||||||
ULOG(INFO, this) << "No more read/write for this HTTP2 session";
|
ULOG(INFO, this) << "No more read/write for this HTTP2 session";
|
||||||
}
|
}
|
||||||
|
@ -1117,8 +1198,10 @@ ssize_t downstream_data_read_callback(nghttp2_session *session,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
auto nread = body->remove(buf, length);
|
auto nread = std::min(body->rleft(), length);
|
||||||
auto body_empty = body->rleft() == 0;
|
auto body_empty = body->rleft() == nread;
|
||||||
|
|
||||||
|
*data_flags |= NGHTTP2_DATA_FLAG_NO_COPY;
|
||||||
|
|
||||||
if (body_empty &&
|
if (body_empty &&
|
||||||
downstream->get_response_state() == Downstream::MSG_COMPLETE) {
|
downstream->get_response_state() == Downstream::MSG_COMPLETE) {
|
||||||
|
@ -1146,24 +1229,10 @@ ssize_t downstream_data_read_callback(nghttp2_session *session,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (body_empty) {
|
|
||||||
downstream->disable_upstream_wtimer();
|
|
||||||
} else {
|
|
||||||
downstream->reset_upstream_wtimer();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (nread > 0 && downstream->resume_read(SHRPX_NO_BUFFER, nread) != 0) {
|
|
||||||
return NGHTTP2_ERR_CALLBACK_FAILURE;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (nread == 0 && ((*data_flags) & NGHTTP2_DATA_FLAG_EOF) == 0) {
|
if (nread == 0 && ((*data_flags) & NGHTTP2_DATA_FLAG_EOF) == 0) {
|
||||||
return NGHTTP2_ERR_DEFERRED;
|
return NGHTTP2_ERR_DEFERRED;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (nread > 0) {
|
|
||||||
downstream->add_response_sent_bodylen(nread);
|
|
||||||
}
|
|
||||||
|
|
||||||
return nread;
|
return nread;
|
||||||
}
|
}
|
||||||
} // namespace
|
} // namespace
|
||||||
|
@ -1274,6 +1343,11 @@ void Http2Upstream::remove_downstream(Downstream *downstream) {
|
||||||
nghttp2_session_set_stream_user_data(session_, downstream->get_stream_id(),
|
nghttp2_session_set_stream_user_data(session_, downstream->get_stream_id(),
|
||||||
nullptr);
|
nullptr);
|
||||||
|
|
||||||
|
if (downstream == pending_data_downstream_) {
|
||||||
|
pending_data_downstream_ = nullptr;
|
||||||
|
pending_response_buf_ = downstream->pop_response_buf();
|
||||||
|
}
|
||||||
|
|
||||||
auto next_downstream = downstream_queue_.remove_and_get_blocked(downstream);
|
auto next_downstream = downstream_queue_.remove_and_get_blocked(downstream);
|
||||||
|
|
||||||
if (next_downstream) {
|
if (next_downstream) {
|
||||||
|
@ -1755,4 +1829,21 @@ int Http2Upstream::initiate_push(Downstream *downstream, const char *uri,
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int Http2Upstream::response_riovec(struct iovec *iov, int iovcnt) const {
|
||||||
|
if (iovcnt == 0 || wb_.rleft() == 0) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
iov->iov_base = wb_.pos;
|
||||||
|
iov->iov_len = wb_.rleft();
|
||||||
|
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
void Http2Upstream::response_drain(size_t n) { wb_.drain(n); }
|
||||||
|
|
||||||
|
bool Http2Upstream::response_empty() const { return wb_.rleft() == 0; }
|
||||||
|
|
||||||
|
Http2Upstream::WriteBuffer *Http2Upstream::get_response_buf() { return &wb_; }
|
||||||
|
|
||||||
} // namespace shrpx
|
} // namespace shrpx
|
||||||
|
|
|
@ -36,6 +36,7 @@
|
||||||
#include "shrpx_upstream.h"
|
#include "shrpx_upstream.h"
|
||||||
#include "shrpx_downstream_queue.h"
|
#include "shrpx_downstream_queue.h"
|
||||||
#include "memchunk.h"
|
#include "memchunk.h"
|
||||||
|
#include "buffer.h"
|
||||||
|
|
||||||
using namespace nghttp2;
|
using namespace nghttp2;
|
||||||
|
|
||||||
|
@ -82,6 +83,9 @@ public:
|
||||||
size_t bodylen);
|
size_t bodylen);
|
||||||
virtual int initiate_push(Downstream *downstream, const char *uri,
|
virtual int initiate_push(Downstream *downstream, const char *uri,
|
||||||
size_t len);
|
size_t len);
|
||||||
|
virtual int response_riovec(struct iovec *iov, int iovcnt) const;
|
||||||
|
virtual void response_drain(size_t n);
|
||||||
|
virtual bool response_empty() const;
|
||||||
|
|
||||||
bool get_flow_control() const;
|
bool get_flow_control() const;
|
||||||
// Perform HTTP/2 upgrade from |upstream|. On success, this object
|
// Perform HTTP/2 upgrade from |upstream|. On success, this object
|
||||||
|
@ -106,15 +110,39 @@ public:
|
||||||
|
|
||||||
int on_request_headers(Downstream *downstream, const nghttp2_frame *frame);
|
int on_request_headers(Downstream *downstream, const nghttp2_frame *frame);
|
||||||
|
|
||||||
|
using WriteBuffer = Buffer<32_k>;
|
||||||
|
|
||||||
|
WriteBuffer *get_response_buf();
|
||||||
|
|
||||||
|
void set_pending_data_downstream(Downstream *downstream, size_t n);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
WriteBuffer wb_;
|
||||||
std::unique_ptr<HttpsUpstream> pre_upstream_;
|
std::unique_ptr<HttpsUpstream> pre_upstream_;
|
||||||
DownstreamQueue downstream_queue_;
|
DownstreamQueue downstream_queue_;
|
||||||
ev_timer settings_timer_;
|
ev_timer settings_timer_;
|
||||||
ev_timer shutdown_timer_;
|
ev_timer shutdown_timer_;
|
||||||
ev_prepare prep_;
|
ev_prepare prep_;
|
||||||
|
// A response buffer used to belong to Downstream object. This is
|
||||||
|
// moved here when response is partially written to wb_ in
|
||||||
|
// send_data_callback, but before writing them all, Downstream
|
||||||
|
// object was destroyed. On destruction of Downstream,
|
||||||
|
// pending_data_downstream_ becomes nullptr.
|
||||||
|
DefaultMemchunks pending_response_buf_;
|
||||||
|
// Downstream object whose DATA frame payload is partillay written
|
||||||
|
// to wb_ in send_data_callback. This field exists to keep track of
|
||||||
|
// its lifetime. When it is destroyed, its response buffer is
|
||||||
|
// transferred to pending_response_buf_, and this field becomes
|
||||||
|
// nullptr.
|
||||||
|
Downstream *pending_data_downstream_;
|
||||||
ClientHandler *handler_;
|
ClientHandler *handler_;
|
||||||
nghttp2_session *session_;
|
nghttp2_session *session_;
|
||||||
const uint8_t *data_pending_;
|
const uint8_t *data_pending_;
|
||||||
|
// The length of lending data to be written into wb_. If
|
||||||
|
// data_pending_ is not nullptr, data_pending_ points to the data to
|
||||||
|
// write. Otherwise, pending_data_downstream_->get_response_buf()
|
||||||
|
// if pending_data_downstream_ is not nullptr, or
|
||||||
|
// pending_response_buf_ holds data to write.
|
||||||
size_t data_pendinglen_;
|
size_t data_pendinglen_;
|
||||||
bool flow_control_;
|
bool flow_control_;
|
||||||
bool shutdown_handled_;
|
bool shutdown_handled_;
|
||||||
|
|
|
@ -455,7 +455,7 @@ void HttpDownstreamConnection::pause_read(IOCtrlReason reason) {
|
||||||
|
|
||||||
int HttpDownstreamConnection::resume_read(IOCtrlReason reason,
|
int HttpDownstreamConnection::resume_read(IOCtrlReason reason,
|
||||||
size_t consumed) {
|
size_t consumed) {
|
||||||
if (!downstream_->response_buf_full()) {
|
if (downstream_->get_response_buf()->rleft() == 0) {
|
||||||
ioctrl_.resume_read(reason);
|
ioctrl_.resume_read(reason);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1160,12 +1160,34 @@ int HttpsUpstream::initiate_push(Downstream *downstream, const char *uri,
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
DefaultMemchunks *HttpsUpstream::get_response_buf() const {
|
int HttpsUpstream::response_riovec(struct iovec *iov, int iovcnt) const {
|
||||||
if (!downstream_) {
|
if (!downstream_) {
|
||||||
return nullptr;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
return downstream_->get_response_buf();
|
auto buf = downstream_->get_response_buf();
|
||||||
|
|
||||||
|
return buf->riovec(iov, iovcnt);
|
||||||
|
}
|
||||||
|
|
||||||
|
void HttpsUpstream::response_drain(size_t n) {
|
||||||
|
if (!downstream_) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto buf = downstream_->get_response_buf();
|
||||||
|
|
||||||
|
buf->drain(n);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool HttpsUpstream::response_empty() const {
|
||||||
|
if (!downstream_) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto buf = downstream_->get_response_buf();
|
||||||
|
|
||||||
|
return buf->rleft() == 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace shrpx
|
} // namespace shrpx
|
||||||
|
|
|
@ -78,7 +78,9 @@ public:
|
||||||
size_t bodylen);
|
size_t bodylen);
|
||||||
virtual int initiate_push(Downstream *downstream, const char *uri,
|
virtual int initiate_push(Downstream *downstream, const char *uri,
|
||||||
size_t len);
|
size_t len);
|
||||||
virtual DefaultMemchunks *get_response_buf() const;
|
virtual int response_riovec(struct iovec *iov, int iovcnt) const;
|
||||||
|
virtual void response_drain(size_t n);
|
||||||
|
virtual bool response_empty() const;
|
||||||
|
|
||||||
void reset_current_header_length();
|
void reset_current_header_length();
|
||||||
void log_response_headers(DefaultMemchunks *buf) const;
|
void log_response_headers(DefaultMemchunks *buf) const;
|
||||||
|
|
|
@ -53,8 +53,7 @@ namespace {
|
||||||
ssize_t send_callback(spdylay_session *session, const uint8_t *data, size_t len,
|
ssize_t send_callback(spdylay_session *session, const uint8_t *data, size_t len,
|
||||||
int flags, void *user_data) {
|
int flags, void *user_data) {
|
||||||
auto upstream = static_cast<SpdyUpstream *>(user_data);
|
auto upstream = static_cast<SpdyUpstream *>(user_data);
|
||||||
auto handler = upstream->get_client_handler();
|
auto wb = upstream->get_response_buf();
|
||||||
auto wb = handler->get_wb();
|
|
||||||
|
|
||||||
if (wb->wleft() == 0) {
|
if (wb->wleft() == 0) {
|
||||||
return SPDYLAY_ERR_WOULDBLOCK;
|
return SPDYLAY_ERR_WOULDBLOCK;
|
||||||
|
@ -555,6 +554,10 @@ int SpdyUpstream::on_read() {
|
||||||
int SpdyUpstream::on_write() {
|
int SpdyUpstream::on_write() {
|
||||||
int rv = 0;
|
int rv = 0;
|
||||||
|
|
||||||
|
if (wb_.rleft() == 0) {
|
||||||
|
wb_.reset();
|
||||||
|
}
|
||||||
|
|
||||||
rv = spdylay_session_send(session_);
|
rv = spdylay_session_send(session_);
|
||||||
if (rv != 0) {
|
if (rv != 0) {
|
||||||
ULOG(ERROR, this) << "spdylay_session_send() returned error: "
|
ULOG(ERROR, this) << "spdylay_session_send() returned error: "
|
||||||
|
@ -563,8 +566,7 @@ int SpdyUpstream::on_write() {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (spdylay_session_want_read(session_) == 0 &&
|
if (spdylay_session_want_read(session_) == 0 &&
|
||||||
spdylay_session_want_write(session_) == 0 &&
|
spdylay_session_want_write(session_) == 0 && wb_.rleft() == 0) {
|
||||||
handler_->get_wb()->rleft() == 0) {
|
|
||||||
if (LOG_ENABLED(INFO)) {
|
if (LOG_ENABLED(INFO)) {
|
||||||
ULOG(INFO, this) << "No more read/write for this SPDY session";
|
ULOG(INFO, this) << "No more read/write for this SPDY session";
|
||||||
}
|
}
|
||||||
|
@ -1213,4 +1215,21 @@ int SpdyUpstream::initiate_push(Downstream *downstream, const char *uri,
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int SpdyUpstream::response_riovec(struct iovec *iov, int iovcnt) const {
|
||||||
|
if (iovcnt == 0 || wb_.rleft() == 0) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
iov->iov_base = wb_.pos;
|
||||||
|
iov->iov_len = wb_.rleft();
|
||||||
|
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
void SpdyUpstream::response_drain(size_t n) { wb_.drain(n); }
|
||||||
|
|
||||||
|
bool SpdyUpstream::response_empty() const { return wb_.rleft() == 0; }
|
||||||
|
|
||||||
|
SpdyUpstream::WriteBuffer *SpdyUpstream::get_response_buf() { return &wb_; }
|
||||||
|
|
||||||
} // namespace shrpx
|
} // namespace shrpx
|
||||||
|
|
|
@ -36,7 +36,7 @@
|
||||||
#include "shrpx_upstream.h"
|
#include "shrpx_upstream.h"
|
||||||
#include "shrpx_downstream_queue.h"
|
#include "shrpx_downstream_queue.h"
|
||||||
#include "memchunk.h"
|
#include "memchunk.h"
|
||||||
#include "util.h"
|
#include "buffer.h"
|
||||||
|
|
||||||
namespace shrpx {
|
namespace shrpx {
|
||||||
|
|
||||||
|
@ -78,6 +78,9 @@ public:
|
||||||
size_t bodylen);
|
size_t bodylen);
|
||||||
virtual int initiate_push(Downstream *downstream, const char *uri,
|
virtual int initiate_push(Downstream *downstream, const char *uri,
|
||||||
size_t len);
|
size_t len);
|
||||||
|
virtual int response_riovec(struct iovec *iov, int iovcnt) const;
|
||||||
|
virtual void response_drain(size_t n);
|
||||||
|
virtual bool response_empty() const;
|
||||||
|
|
||||||
bool get_flow_control() const;
|
bool get_flow_control() const;
|
||||||
|
|
||||||
|
@ -86,7 +89,12 @@ public:
|
||||||
void start_downstream(Downstream *downstream);
|
void start_downstream(Downstream *downstream);
|
||||||
void initiate_downstream(Downstream *downstream);
|
void initiate_downstream(Downstream *downstream);
|
||||||
|
|
||||||
|
using WriteBuffer = Buffer<32_k>;
|
||||||
|
|
||||||
|
WriteBuffer *get_response_buf();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
WriteBuffer wb_;
|
||||||
DownstreamQueue downstream_queue_;
|
DownstreamQueue downstream_queue_;
|
||||||
ClientHandler *handler_;
|
ClientHandler *handler_;
|
||||||
spdylay_session *session_;
|
spdylay_session *session_;
|
||||||
|
|
|
@ -71,9 +71,11 @@ public:
|
||||||
virtual int initiate_push(Downstream *downstream, const char *uri,
|
virtual int initiate_push(Downstream *downstream, const char *uri,
|
||||||
size_t len) = 0;
|
size_t len) = 0;
|
||||||
|
|
||||||
// Returns response buffer of Downstream directly. This exists for
|
// Fills response data in |iov| whose capacity is |iovcnt|. Returns
|
||||||
// optimization purpose for cleartext HttpsUpstream.
|
// the number of iovs filled.
|
||||||
virtual DefaultMemchunks *get_response_buf() const { return nullptr; }
|
virtual int response_riovec(struct iovec *iov, int iovcnt) const = 0;
|
||||||
|
virtual void response_drain(size_t n) = 0;
|
||||||
|
virtual bool response_empty() const = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace shrpx
|
} // namespace shrpx
|
||||||
|
|
Loading…
Reference in New Issue