nghttpx: Cancel backend request when frontend HTTP/1 connection is lost
This commit is contained in:
parent
a473641e3f
commit
011e3b325d
|
@ -107,14 +107,15 @@ int ClientHandler::read_clear() {
|
||||||
ev_timer_again(conn_.loop, &conn_.rt);
|
ev_timer_again(conn_.loop, &conn_.rt);
|
||||||
|
|
||||||
for (;;) {
|
for (;;) {
|
||||||
// we should process buffered data first before we read EOF.
|
|
||||||
if (rb_.rleft() && on_read() != 0) {
|
if (rb_.rleft() && on_read() != 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
if (rb_.rleft()) {
|
if (rb_.rleft() == 0) {
|
||||||
|
rb_.reset();
|
||||||
|
} else if (rb_.wleft() == 0) {
|
||||||
|
conn_.rlimit.stopw();
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
rb_.reset();
|
|
||||||
|
|
||||||
auto nread = conn_.read_clear(rb_.last, rb_.wleft());
|
auto nread = conn_.read_clear(rb_.last, rb_.wleft());
|
||||||
|
|
||||||
|
@ -199,10 +200,12 @@ int ClientHandler::read_tls() {
|
||||||
if (rb_.rleft() && on_read() != 0) {
|
if (rb_.rleft() && on_read() != 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
if (rb_.rleft()) {
|
if (rb_.rleft() == 0) {
|
||||||
|
rb_.reset();
|
||||||
|
} else if (rb_.wleft() == 0) {
|
||||||
|
conn_.rlimit.stopw();
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
rb_.reset();
|
|
||||||
|
|
||||||
auto nread = conn_.read_tls(rb_.last, rb_.wleft());
|
auto nread = conn_.read_tls(rb_.last, rb_.wleft());
|
||||||
|
|
||||||
|
@ -292,6 +295,7 @@ int ClientHandler::upstream_http2_connhd_read() {
|
||||||
|
|
||||||
left_connhd_len_ -= nread;
|
left_connhd_len_ -= nread;
|
||||||
rb_.drain(nread);
|
rb_.drain(nread);
|
||||||
|
conn_.rlimit.startw();
|
||||||
|
|
||||||
if (left_connhd_len_ == 0) {
|
if (left_connhd_len_ == 0) {
|
||||||
on_read_ = &ClientHandler::upstream_read;
|
on_read_ = &ClientHandler::upstream_read;
|
||||||
|
@ -330,6 +334,7 @@ int ClientHandler::upstream_http1_connhd_read() {
|
||||||
|
|
||||||
left_connhd_len_ -= nread;
|
left_connhd_len_ -= nread;
|
||||||
rb_.drain(nread);
|
rb_.drain(nread);
|
||||||
|
conn_.rlimit.startw();
|
||||||
|
|
||||||
if (left_connhd_len_ == 0) {
|
if (left_connhd_len_ == 0) {
|
||||||
if (LOG_ENABLED(INFO)) {
|
if (LOG_ENABLED(INFO)) {
|
||||||
|
|
|
@ -842,7 +842,7 @@ int on_response_headers(Http2Session *http2session, Downstream *downstream,
|
||||||
if (downstream->get_upgraded()) {
|
if (downstream->get_upgraded()) {
|
||||||
downstream->set_response_connection_close(true);
|
downstream->set_response_connection_close(true);
|
||||||
// On upgrade sucess, both ends can send data
|
// On upgrade sucess, both ends can send data
|
||||||
if (upstream->resume_read(SHRPX_MSG_BLOCK, downstream, 0) != 0) {
|
if (upstream->resume_read(SHRPX_NO_BUFFER, downstream, 0) != 0) {
|
||||||
// If resume_read fails, just drop connection. Not ideal.
|
// If resume_read fails, just drop connection. Not ideal.
|
||||||
delete upstream->get_client_handler();
|
delete upstream->get_client_handler();
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -811,6 +811,7 @@ Http2Upstream::~Http2Upstream() {
|
||||||
int Http2Upstream::on_read() {
|
int Http2Upstream::on_read() {
|
||||||
ssize_t rv = 0;
|
ssize_t rv = 0;
|
||||||
auto rb = handler_->get_rb();
|
auto rb = handler_->get_rb();
|
||||||
|
auto rlimit = handler_->get_rlimit();
|
||||||
|
|
||||||
if (rb->rleft()) {
|
if (rb->rleft()) {
|
||||||
rv = nghttp2_session_mem_recv(session_, rb->pos, rb->rleft());
|
rv = nghttp2_session_mem_recv(session_, rb->pos, rb->rleft());
|
||||||
|
@ -826,6 +827,7 @@ int Http2Upstream::on_read() {
|
||||||
// success.
|
// success.
|
||||||
assert(static_cast<size_t>(rv) == rb->rleft());
|
assert(static_cast<size_t>(rv) == rb->rleft());
|
||||||
rb->reset();
|
rb->reset();
|
||||||
|
rlimit->startw();
|
||||||
}
|
}
|
||||||
|
|
||||||
auto wb = handler_->get_wb();
|
auto wb = handler_->get_wb();
|
||||||
|
|
|
@ -526,7 +526,7 @@ int htp_hdrs_completecb(http_parser *htp) {
|
||||||
|
|
||||||
if (downstream->get_upgraded()) {
|
if (downstream->get_upgraded()) {
|
||||||
// Upgrade complete, read until EOF in both ends
|
// Upgrade complete, read until EOF in both ends
|
||||||
if (upstream->resume_read(SHRPX_MSG_BLOCK, downstream, 0) != 0) {
|
if (upstream->resume_read(SHRPX_NO_BUFFER, downstream, 0) != 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
downstream->set_request_state(Downstream::HEADER_COMPLETE);
|
downstream->set_request_state(Downstream::HEADER_COMPLETE);
|
||||||
|
|
|
@ -253,6 +253,7 @@ http_parser_settings htp_hooks = {
|
||||||
// one http request is fully received.
|
// one http request is fully received.
|
||||||
int HttpsUpstream::on_read() {
|
int HttpsUpstream::on_read() {
|
||||||
auto rb = handler_->get_rb();
|
auto rb = handler_->get_rb();
|
||||||
|
auto rlimit = handler_->get_rlimit();
|
||||||
auto downstream = get_downstream();
|
auto downstream = get_downstream();
|
||||||
|
|
||||||
if (rb->rleft() == 0) {
|
if (rb->rleft() == 0) {
|
||||||
|
@ -270,6 +271,7 @@ int HttpsUpstream::on_read() {
|
||||||
}
|
}
|
||||||
|
|
||||||
rb->reset();
|
rb->reset();
|
||||||
|
rlimit->startw();
|
||||||
|
|
||||||
if (downstream->request_buf_full()) {
|
if (downstream->request_buf_full()) {
|
||||||
if (LOG_ENABLED(INFO)) {
|
if (LOG_ENABLED(INFO)) {
|
||||||
|
@ -283,10 +285,22 @@ int HttpsUpstream::on_read() {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (downstream) {
|
||||||
|
// To avoid reading next pipelined request
|
||||||
|
switch (downstream->get_request_state()) {
|
||||||
|
case Downstream::INITIAL:
|
||||||
|
case Downstream::HEADER_COMPLETE:
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
auto nread = http_parser_execute(
|
auto nread = http_parser_execute(
|
||||||
&htp_, &htp_hooks, reinterpret_cast<const char *>(rb->pos), rb->rleft());
|
&htp_, &htp_hooks, reinterpret_cast<const char *>(rb->pos), rb->rleft());
|
||||||
|
|
||||||
rb->drain(nread);
|
rb->drain(nread);
|
||||||
|
rlimit->startw();
|
||||||
|
|
||||||
// Well, actually header length + some body bytes
|
// Well, actually header length + some body bytes
|
||||||
current_header_length_ += nread;
|
current_header_length_ += nread;
|
||||||
|
@ -303,8 +317,6 @@ int HttpsUpstream::on_read() {
|
||||||
assert(downstream);
|
assert(downstream);
|
||||||
|
|
||||||
if (downstream->get_request_state() == Downstream::CONNECT_FAIL) {
|
if (downstream->get_request_state() == Downstream::CONNECT_FAIL) {
|
||||||
// Following paues_read is needed to avoid reading next data.
|
|
||||||
pause_read(SHRPX_MSG_BLOCK);
|
|
||||||
error_reply(503);
|
error_reply(503);
|
||||||
handler_->signal_write();
|
handler_->signal_write();
|
||||||
// Downstream gets deleted after response body is read.
|
// Downstream gets deleted after response body is read.
|
||||||
|
@ -333,8 +345,6 @@ int HttpsUpstream::on_read() {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
pause_read(SHRPX_MSG_BLOCK);
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -345,8 +355,6 @@ int HttpsUpstream::on_read() {
|
||||||
<< http_errno_description(htperr);
|
<< http_errno_description(htperr);
|
||||||
}
|
}
|
||||||
|
|
||||||
pause_read(SHRPX_MSG_BLOCK);
|
|
||||||
|
|
||||||
unsigned int status_code;
|
unsigned int status_code;
|
||||||
|
|
||||||
if (downstream &&
|
if (downstream &&
|
||||||
|
@ -423,7 +431,7 @@ int HttpsUpstream::on_write() {
|
||||||
// We need this if response ends before request.
|
// We need this if response ends before request.
|
||||||
if (downstream->get_request_state() == Downstream::MSG_COMPLETE) {
|
if (downstream->get_request_state() == Downstream::MSG_COMPLETE) {
|
||||||
delete_downstream();
|
delete_downstream();
|
||||||
return resume_read(SHRPX_MSG_BLOCK, nullptr, 0);
|
return resume_read(SHRPX_NO_BUFFER, nullptr, 0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -441,7 +449,7 @@ void HttpsUpstream::pause_read(IOCtrlReason reason) {
|
||||||
|
|
||||||
int HttpsUpstream::resume_read(IOCtrlReason reason, Downstream *downstream,
|
int HttpsUpstream::resume_read(IOCtrlReason reason, Downstream *downstream,
|
||||||
size_t consumed) {
|
size_t consumed) {
|
||||||
// downstream could be nullptr if reason is SHRPX_MSG_BLOCK.
|
// downstream could be nullptr
|
||||||
if (downstream && downstream->request_buf_full()) {
|
if (downstream && downstream->request_buf_full()) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -68,6 +68,7 @@ ssize_t recv_callback(spdylay_session *session, uint8_t *buf, size_t len,
|
||||||
auto upstream = static_cast<SpdyUpstream *>(user_data);
|
auto upstream = static_cast<SpdyUpstream *>(user_data);
|
||||||
auto handler = upstream->get_client_handler();
|
auto handler = upstream->get_client_handler();
|
||||||
auto rb = handler->get_rb();
|
auto rb = handler->get_rb();
|
||||||
|
auto rlimit = handler->get_rlimit();
|
||||||
|
|
||||||
if (rb->rleft() == 0) {
|
if (rb->rleft() == 0) {
|
||||||
return SPDYLAY_ERR_WOULDBLOCK;
|
return SPDYLAY_ERR_WOULDBLOCK;
|
||||||
|
@ -77,6 +78,7 @@ ssize_t recv_callback(spdylay_session *session, uint8_t *buf, size_t len,
|
||||||
|
|
||||||
memcpy(buf, rb->pos, nread);
|
memcpy(buf, rb->pos, nread);
|
||||||
rb->drain(nread);
|
rb->drain(nread);
|
||||||
|
rlimit->startw();
|
||||||
|
|
||||||
return nread;
|
return nread;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue