Fix stream reset if data from client is arrived before dconn is attached

This commit is contained in:
Tatsuhiro Tsujikawa 2018-08-22 22:32:25 +09:00
parent 11d822c2a7
commit 9d5b781df6
7 changed files with 92 additions and 3 deletions

View File

@ -251,6 +251,29 @@ template <typename Memchunk> struct Memchunks {
return count - left;
}
size_t remove(Memchunks &dest) {
assert(pool == dest.pool);
if (head == nullptr) {
return 0;
}
auto n = len;
if (dest.tail == nullptr) {
dest.head = head;
} else {
dest.tail->next = head;
}
dest.tail = tail;
dest.len += len;
head = tail = nullptr;
len = 0;
return n;
}
size_t drain(size_t count) {
auto ndata = count;
auto m = head;

View File

@ -121,6 +121,7 @@ Downstream::Downstream(Upstream *upstream, MemchunkPool *mcpool,
req_(balloc_),
resp_(balloc_),
request_start_time_(std::chrono::high_resolution_clock::now()),
blocked_request_buf_(mcpool),
request_buf_(mcpool),
response_buf_(mcpool),
upstream_(upstream),
@ -142,7 +143,8 @@ Downstream::Downstream(Upstream *upstream, MemchunkPool *mcpool,
request_pending_(false),
request_header_sent_(false),
accesslog_written_(false),
new_affinity_cookie_(false) {
new_affinity_cookie_(false),
blocked_request_data_eof_(false) {
auto &timeoutconf = get_config()->http2.timeout;
@ -605,6 +607,12 @@ int Downstream::push_request_headers() {
int Downstream::push_upload_data_chunk(const uint8_t *data, size_t datalen) {
req_.recv_body_length += datalen;
if (!request_header_sent_) {
blocked_request_buf_.append(data, datalen);
req_.unconsumed_body_length += datalen;
return 0;
}
// Assumes that request headers have already been pushed to output
// buffer using push_request_headers().
if (!dconn_) {
@ -621,6 +629,10 @@ int Downstream::push_upload_data_chunk(const uint8_t *data, size_t datalen) {
}
int Downstream::end_upload_data() {
if (!request_header_sent_) {
blocked_request_data_eof_ = true;
return 0;
}
if (!dconn_) {
DLOG(INFO, this) << "dconn_ is NULL";
return -1;
@ -1052,4 +1064,12 @@ uint32_t Downstream::get_affinity_cookie_to_send() const {
return 0;
}
DefaultMemchunks *Downstream::get_blocked_request_buf() {
return &blocked_request_buf_;
}
bool Downstream::get_blocked_request_data_eof() const {
return blocked_request_data_eof_;
}
} // namespace shrpx

View File

@ -356,6 +356,9 @@ public:
// get_request_pending() returns false.
bool request_submission_ready() const;
DefaultMemchunks *get_blocked_request_buf();
bool get_blocked_request_data_eof() const;
// downstream response API
const Response &response() const { return resp_; }
Response &response() { return resp_; }
@ -491,6 +494,9 @@ private:
// or not.
StringRef request_downstream_host_;
// Data arrived in frontend before sending header fields to backend
// are stored in this buffer.
DefaultMemchunks blocked_request_buf_;
DefaultMemchunks request_buf_;
DefaultMemchunks response_buf_;
@ -547,6 +553,9 @@ private:
bool accesslog_written_;
// true if affinity cookie is generated for this request.
bool new_affinity_cookie_;
// true if eof is received from client before sending header fields
// to backend.
bool blocked_request_data_eof_;
};
} // namespace shrpx

View File

@ -1468,6 +1468,15 @@ int on_frame_send_callback(nghttp2_session *session, const nghttp2_frame *frame,
if (frame->hd.type == NGHTTP2_HEADERS &&
frame->headers.cat == NGHTTP2_HCAT_REQUEST) {
downstream->set_request_header_sent(true);
auto src = downstream->get_blocked_request_buf();
if (src->rleft()) {
auto dest = downstream->get_request_buf();
src->remove(*dest);
if (http2session->resume_data(sd->dconn) != 0) {
return NGHTTP2_ERR_CALLBACK_FAILURE;
}
downstream->ensure_downstream_wtimer();
}
}
if ((frame->hd.flags & NGHTTP2_FLAG_END_STREAM) == 0) {

View File

@ -590,7 +590,7 @@ int on_data_chunk_recv_callback(nghttp2_session *session, uint8_t flags,
auto downstream = static_cast<Downstream *>(
nghttp2_session_get_stream_user_data(session, stream_id));
if (!downstream || !downstream->get_downstream_connection()) {
if (!downstream) {
if (upstream->consume(stream_id, len) != 0) {
return NGHTTP2_ERR_CALLBACK_FAILURE;
}

View File

@ -693,11 +693,37 @@ int HttpDownstreamConnection::push_request_headers() {
// Don't call signal_write() if we anticipate request body. We call
// signal_write() when we received request body chunk, and it
// enables us to send headers and data in one writev system call.
if (connect_method ||
if (connect_method || downstream_->get_blocked_request_buf()->rleft() ||
(!req.http2_expect_body && req.fs.content_length == 0)) {
signal_write();
}
return process_blocked_request_buf();
}
int HttpDownstreamConnection::process_blocked_request_buf() {
auto src = downstream_->get_blocked_request_buf();
if (src->rleft()) {
auto dest = downstream_->get_request_buf();
auto chunked = downstream_->get_chunked_request();
if (chunked) {
auto chunk_size_hex = util::utox(src->rleft());
dest->append(chunk_size_hex);
dest->append("\r\n");
}
src->remove(*dest);
if (chunked) {
dest->append("\r\n");
}
}
if (downstream_->get_blocked_request_data_eof()) {
return end_upload_data();
}
return 0;
}

View File

@ -89,6 +89,8 @@ public:
int noop();
int process_blocked_request_buf();
private:
Connection conn_;
std::function<int(HttpDownstreamConnection &)> on_read_, on_write_,