diff --git a/src/shrpx_downstream.cc b/src/shrpx_downstream.cc index 14c01498..69394c98 100644 --- a/src/shrpx_downstream.cc +++ b/src/shrpx_downstream.cc @@ -33,6 +33,7 @@ #include "shrpx_config.h" #include "shrpx_error.h" #include "shrpx_downstream_connection.h" +#include "shrpx_downstream_queue.h" #include "util.h" #include "http2.h" @@ -106,24 +107,27 @@ void downstream_wtimeoutcb(struct ev_loop *loop, ev_timer *w, int revents) { // upstream could be nullptr for unittests Downstream::Downstream(Upstream *upstream, int32_t stream_id, int32_t priority) - : request_start_time_(std::chrono::high_resolution_clock::now()), + : dlnext(nullptr), dlprev(nullptr), + request_start_time_(std::chrono::high_resolution_clock::now()), request_buf_(upstream ? upstream->get_mcpool() : nullptr), response_buf_(upstream ? upstream->get_mcpool() : nullptr), request_bodylen_(0), response_bodylen_(0), response_sent_bodylen_(0), request_content_length_(-1), response_content_length_(-1), - upstream_(upstream), request_headers_sum_(0), response_headers_sum_(0), - request_datalen_(0), response_datalen_(0), num_retry_(0), - stream_id_(stream_id), priority_(priority), downstream_stream_id_(-1), + upstream_(upstream), blocked_link_(nullptr), request_headers_sum_(0), + response_headers_sum_(0), request_datalen_(0), response_datalen_(0), + num_retry_(0), stream_id_(stream_id), priority_(priority), + downstream_stream_id_(-1), response_rst_stream_error_code_(NGHTTP2_NO_ERROR), request_state_(INITIAL), request_major_(1), request_minor_(1), response_state_(INITIAL), response_http_status_(0), response_major_(1), - response_minor_(1), upgrade_request_(false), upgraded_(false), - http2_upgrade_seen_(false), chunked_request_(false), - request_connection_close_(false), request_header_key_prev_(false), - request_trailer_key_prev_(false), request_http2_expect_body_(false), - chunked_response_(false), response_connection_close_(false), - response_header_key_prev_(false), response_trailer_key_prev_(false), - expect_final_response_(false), request_pending_(false) { + response_minor_(1), dispatch_state_(DISPATCH_NONE), + upgrade_request_(false), upgraded_(false), http2_upgrade_seen_(false), + chunked_request_(false), request_connection_close_(false), + request_header_key_prev_(false), request_trailer_key_prev_(false), + request_http2_expect_body_(false), chunked_response_(false), + response_connection_close_(false), response_header_key_prev_(false), + response_trailer_key_prev_(false), expect_final_response_(false), + request_pending_(false) { ev_timer_init(&upstream_rtimer_, &upstream_rtimeoutcb, 0., get_config()->stream_read_timeout); @@ -148,6 +152,10 @@ Downstream::~Downstream() { DLOG(INFO, this) << "Deleting"; } + if (blocked_link_) { + detach_blocked_link(blocked_link_); + } + // check nullptr for unittest if (upstream_) { auto loop = upstream_->get_client_handler()->get_loop(); @@ -1176,4 +1184,23 @@ bool Downstream::request_submission_ready() const { request_pending_ && response_state_ == Downstream::INITIAL; } +int Downstream::get_dispatch_state() const { return dispatch_state_; } + +void Downstream::set_dispatch_state(int s) { dispatch_state_ = s; } + +void Downstream::attach_blocked_link(BlockedLink *l) { + assert(!blocked_link_); + + l->downstream = this; + blocked_link_ = l; +} + +void Downstream::detach_blocked_link(BlockedLink *l) { + assert(blocked_link_); + assert(l->downstream == this); + + l->downstream = nullptr; + blocked_link_ = nullptr; +} + } // namespace shrpx diff --git a/src/shrpx_downstream.h b/src/shrpx_downstream.h index e1a51f4c..cf22162d 100644 --- a/src/shrpx_downstream.h +++ b/src/shrpx_downstream.h @@ -48,6 +48,7 @@ namespace shrpx { class Upstream; class DownstreamConnection; +struct BlockedLink; class Downstream { public: @@ -319,11 +320,27 @@ public: // true if retry attempt should not be done. bool no_more_retry() const; + int get_dispatch_state() const; + void set_dispatch_state(int s); + + void attach_blocked_link(BlockedLink *l); + void detach_blocked_link(BlockedLink *l); + enum { EVENT_ERROR = 0x1, EVENT_TIMEOUT = 0x2, }; + enum { + DISPATCH_NONE, + DISPATCH_PENDING, + DISPATCH_BLOCKED, + DISPATCH_ACTIVE, + DISPATCH_FAILURE, + }; + + Downstream *dlnext, *dlprev; + private: Headers request_headers_; Headers response_headers_; @@ -370,6 +387,9 @@ private: Upstream *upstream_; std::unique_ptr dconn_; + // only used by HTTP/2 or SPDY upstream + BlockedLink *blocked_link_; + size_t request_headers_sum_; size_t response_headers_sum_; @@ -396,6 +416,9 @@ private: int response_major_; int response_minor_; + // only used by HTTP/2 or SPDY upstream + int dispatch_state_; + http2::HeaderIndex request_hdidx_; http2::HeaderIndex response_hdidx_; diff --git a/src/shrpx_downstream_queue.cc b/src/shrpx_downstream_queue.cc index 1393aafa..32ff4e68 100644 --- a/src/shrpx_downstream_queue.cc +++ b/src/shrpx_downstream_queue.cc @@ -39,16 +39,21 @@ DownstreamQueue::DownstreamQueue(size_t conn_max_per_host, bool unified_host) : conn_max_per_host), unified_host_(unified_host) {} -DownstreamQueue::~DownstreamQueue() {} - -void DownstreamQueue::add_pending(std::unique_ptr downstream) { - auto stream_id = downstream->get_stream_id(); - pending_downstreams_[stream_id] = std::move(downstream); +DownstreamQueue::~DownstreamQueue() { + dlist_delete_all(downstreams_); + for (auto &p : host_entries_) { + auto &ent = p.second; + dlist_delete_all(ent.blocked); + } } -void DownstreamQueue::add_failure(std::unique_ptr downstream) { - auto stream_id = downstream->get_stream_id(); - failure_downstreams_[stream_id] = std::move(downstream); +void DownstreamQueue::add_pending(std::unique_ptr downstream) { + downstream->set_dispatch_state(Downstream::DISPATCH_PENDING); + downstreams_.append(downstream.release()); +} + +void DownstreamQueue::mark_failure(Downstream *downstream) { + downstream->set_dispatch_state(Downstream::DISPATCH_FAILURE); } DownstreamQueue::HostEntry & @@ -76,19 +81,21 @@ DownstreamQueue::make_host_key(Downstream *downstream) const { return make_host_key(downstream->get_request_http2_authority()); } -void DownstreamQueue::add_active(std::unique_ptr downstream) { - auto &ent = find_host_entry(make_host_key(downstream.get())); +void DownstreamQueue::mark_active(Downstream *downstream) { + auto &ent = find_host_entry(make_host_key(downstream)); ++ent.num_active; - auto stream_id = downstream->get_stream_id(); - active_downstreams_[stream_id] = std::move(downstream); + downstream->set_dispatch_state(Downstream::DISPATCH_ACTIVE); } -void DownstreamQueue::add_blocked(std::unique_ptr downstream) { - auto &ent = find_host_entry(make_host_key(downstream.get())); - auto stream_id = downstream->get_stream_id(); - ent.blocked.insert(stream_id); - blocked_downstreams_[stream_id] = std::move(downstream); +void DownstreamQueue::mark_blocked(Downstream *downstream) { + auto &ent = find_host_entry(make_host_key(downstream)); + + downstream->set_dispatch_state(Downstream::DISPATCH_BLOCKED); + + auto link = new BlockedLink{}; + downstream->attach_blocked_link(link); + ent.blocked.append(link); } bool DownstreamQueue::can_activate(const std::string &host) const { @@ -100,16 +107,6 @@ bool DownstreamQueue::can_activate(const std::string &host) const { return ent.num_active < conn_max_per_host_; } -namespace { -std::unique_ptr -pop_downstream(DownstreamQueue::DownstreamMap::iterator i, - DownstreamQueue::DownstreamMap &downstreams) { - auto downstream = std::move((*i).second); - downstreams.erase(i); - return downstream; -} -} // namespace - namespace { bool remove_host_entry_if_empty(const DownstreamQueue::HostEntry &ent, DownstreamQueue::HostEntryMap &host_entries, @@ -122,106 +119,49 @@ bool remove_host_entry_if_empty(const DownstreamQueue::HostEntry &ent, } } // namespace -std::unique_ptr DownstreamQueue::pop_pending(int32_t stream_id) { - auto itr = pending_downstreams_.find(stream_id); - if (itr == std::end(pending_downstreams_)) { +Downstream *DownstreamQueue::remove_and_get_blocked(Downstream *downstream) { + // Delete downstream when this function returns. + auto delptr = std::unique_ptr(downstream); + + if (downstream->get_dispatch_state() != Downstream::DISPATCH_ACTIVE) { + assert(downstream->get_dispatch_state() != Downstream::DISPATCH_NONE); + downstreams_.remove(downstream); return nullptr; } - return pop_downstream(itr, pending_downstreams_); -} -std::unique_ptr -DownstreamQueue::remove_and_pop_blocked(int32_t stream_id) { - auto kv = active_downstreams_.find(stream_id); + downstreams_.remove(downstream); - if (kv != std::end(active_downstreams_)) { - auto downstream = pop_downstream(kv, active_downstreams_); - auto &host = make_host_key(downstream.get()); - auto &ent = find_host_entry(host); - --ent.num_active; + auto &host = make_host_key(downstream); + auto &ent = find_host_entry(host); + --ent.num_active; - if (remove_host_entry_if_empty(ent, host_entries_, host)) { - return nullptr; + if (remove_host_entry_if_empty(ent, host_entries_, host)) { + return nullptr; + } + + if (ent.num_active >= conn_max_per_host_) { + return nullptr; + } + + for (auto link = ent.blocked.head; link;) { + auto next = link->dlnext; + if (!link->downstream) { + ent.blocked.remove(link); + link = next; + continue; } - - if (ent.blocked.empty() || ent.num_active >= conn_max_per_host_) { - return nullptr; - } - - auto next_stream_id = *std::begin(ent.blocked); - ent.blocked.erase(std::begin(ent.blocked)); - - auto itr = blocked_downstreams_.find(next_stream_id); - assert(itr != std::end(blocked_downstreams_)); - - auto next_downstream = pop_downstream(itr, blocked_downstreams_); - + auto next_downstream = link->downstream; + next_downstream->detach_blocked_link(link); + ent.blocked.remove(link); + delete link; remove_host_entry_if_empty(ent, host_entries_, host); - return next_downstream; } - - kv = blocked_downstreams_.find(stream_id); - - if (kv != std::end(blocked_downstreams_)) { - auto downstream = pop_downstream(kv, blocked_downstreams_); - auto &host = make_host_key(downstream.get()); - auto &ent = find_host_entry(host); - ent.blocked.erase(stream_id); - - remove_host_entry_if_empty(ent, host_entries_, host); - - return nullptr; - } - - kv = pending_downstreams_.find(stream_id); - - if (kv != std::end(pending_downstreams_)) { - pop_downstream(kv, pending_downstreams_); - return nullptr; - } - - kv = failure_downstreams_.find(stream_id); - - if (kv != std::end(failure_downstreams_)) { - pop_downstream(kv, failure_downstreams_); - return nullptr; - } - return nullptr; } -Downstream *DownstreamQueue::find(int32_t stream_id) { - auto kv = active_downstreams_.find(stream_id); - - if (kv != std::end(active_downstreams_)) { - return (*kv).second.get(); - } - - kv = blocked_downstreams_.find(stream_id); - - if (kv != std::end(blocked_downstreams_)) { - return (*kv).second.get(); - } - - kv = pending_downstreams_.find(stream_id); - - if (kv != std::end(pending_downstreams_)) { - return (*kv).second.get(); - } - - kv = failure_downstreams_.find(stream_id); - - if (kv != std::end(failure_downstreams_)) { - return (*kv).second.get(); - } - - return nullptr; -} - -const DownstreamQueue::DownstreamMap & -DownstreamQueue::get_active_downstreams() const { - return active_downstreams_; +Downstream *DownstreamQueue::get_downstreams() const { + return downstreams_.head; } } // namespace shrpx diff --git a/src/shrpx_downstream_queue.h b/src/shrpx_downstream_queue.h index 17b5bce0..ee5028bd 100644 --- a/src/shrpx_downstream_queue.h +++ b/src/shrpx_downstream_queue.h @@ -33,17 +33,27 @@ #include #include +#include "template.h" + +using namespace nghttp2; + namespace shrpx { class Downstream; +// Link entry in HostEntry.blocked and downstream because downstream +// could be deleted in anytime and we'd like to find Downstream in +// O(1). Downstream has field to link back to this object. +struct BlockedLink { + Downstream *downstream; + BlockedLink *dlnext, *dlprev; +}; + class DownstreamQueue { public: - typedef std::map> DownstreamMap; - struct HostEntry { // Set of stream ID that blocked by conn_max_per_host_. - std::set blocked; + DList blocked; // The number of connections currently made to this host. size_t num_active; HostEntry(); @@ -54,52 +64,38 @@ public: // conn_max_per_host == 0 means no limit for downstream connection. DownstreamQueue(size_t conn_max_per_host = 0, bool unified_host = true); ~DownstreamQueue(); + // Add |downstream| to this queue. This is entry point for + // Downstream object. void add_pending(std::unique_ptr downstream); - void add_failure(std::unique_ptr downstream); - // Adds |downstream| to active_downstreams_, which means that - // downstream connection has been started. - void add_active(std::unique_ptr downstream); - // Adds |downstream| to blocked_downstreams_, which means that - // download connection was blocked because conn_max_per_host_ limit. - void add_blocked(std::unique_ptr downstream); + // Set |downstream| to failure state, which means that downstream + // failed to connect to backend. + void mark_failure(Downstream *downstream); + // Set |downstream| to active state, which means that downstream + // connection has started. + void mark_active(Downstream *downstream); + // Set |downstream| to blocked state, which means that download + // connection was blocked because conn_max_per_host_ limit. + void mark_blocked(Downstream *downstream); // Returns true if we can make downstream connection to given // |host|. bool can_activate(const std::string &host) const; - // Removes pending Downstream object whose stream ID is |stream_id| - // from pending_downstreams_ and returns it. - std::unique_ptr pop_pending(int32_t stream_id); - // Removes Downstream object whose stream ID is |stream_id| from - // either pending_downstreams_, active_downstreams_, - // blocked_downstreams_ or failure_downstreams_. If a Downstream - // object is removed from active_downstreams_, this function may - // return Downstream object with the same target host in - // blocked_downstreams_ if its connection is now not blocked by - // conn_max_per_host_ limit. - std::unique_ptr remove_and_pop_blocked(int32_t stream_id); - // Finds Downstream object denoted by |stream_id| either in - // pending_downstreams_, active_downstreams_, blocked_downstreams_ - // or failure_downstreams_. - Downstream *find(int32_t stream_id); - const DownstreamMap &get_active_downstreams() const; + // Removes and frees |downstream| object. If |downstream| is in + // Downstream::DISPATCH_ACTIVE, this function may return Downstream + // object with the same target host in Downstream::DISPATCH_BLOCKED + // if its connection is now not blocked by conn_max_per_host_ limit. + Downstream *remove_and_get_blocked(Downstream *downstream); + Downstream *get_downstreams() const; HostEntry &find_host_entry(const std::string &host); const std::string &make_host_key(const std::string &host) const; const std::string &make_host_key(Downstream *downstream) const; - // Maximum number of concurrent connections to the same host. - size_t conn_max_per_host_; - private: // Per target host structure to keep track of the number of // connections to the same host. std::map host_entries_; - // Downstream objects, not processed yet - DownstreamMap pending_downstreams_; - // Downstream objects, failed to connect to downstream server - DownstreamMap failure_downstreams_; - // Downstream objects, downstream connection started - DownstreamMap active_downstreams_; - // Downstream objects, blocked by conn_max_per_host_ - DownstreamMap blocked_downstreams_; + DList downstreams_; + // Maximum number of concurrent connections to the same host. + size_t conn_max_per_host_; // true if downstream host is treated as the same. Used for reverse // proxying. bool unified_host_; diff --git a/src/shrpx_http2_upstream.cc b/src/shrpx_http2_upstream.cc index 25fe4c59..2727e8b6 100644 --- a/src/shrpx_http2_upstream.cc +++ b/src/shrpx_http2_upstream.cc @@ -55,7 +55,8 @@ int on_stream_close_callback(nghttp2_session *session, int32_t stream_id, << " is being closed"; } - auto downstream = upstream->find_downstream(stream_id); + auto downstream = static_cast( + nghttp2_session_get_stream_user_data(session, stream_id)); if (!downstream) { return 0; @@ -161,7 +162,9 @@ int Http2Upstream::upgrade_upstream(HttpsUpstream *http) { downstream->set_request_http2_authority(authority); downstream->set_request_http2_scheme(scheme); - downstream_queue_.add_active(std::move(downstream)); + auto ptr = downstream.get(); + downstream_queue_.add_pending(std::move(downstream)); + downstream_queue_.mark_active(ptr); if (LOG_ENABLED(INFO)) { ULOG(INFO, this) << "Connection upgraded to HTTP/2"; @@ -191,7 +194,8 @@ int on_header_callback(nghttp2_session *session, const nghttp2_frame *frame, return 0; } auto upstream = static_cast(user_data); - auto downstream = upstream->find_downstream(frame->hd.stream_id); + auto downstream = static_cast( + nghttp2_session_get_stream_user_data(session, frame->hd.stream_id)); if (!downstream) { return 0; } @@ -254,6 +258,8 @@ int on_begin_headers_callback(nghttp2_session *session, // TODO Use priority 0 for now auto downstream = make_unique(upstream, frame->hd.stream_id, 0); + nghttp2_session_set_stream_user_data(session, frame->hd.stream_id, + downstream.get()); downstream->reset_upstream_rtimer(); @@ -268,9 +274,8 @@ int on_begin_headers_callback(nghttp2_session *session, } } // namespace -namespace { -int on_request_headers(Http2Upstream *upstream, Downstream *downstream, - nghttp2_session *session, const nghttp2_frame *frame) { +int Http2Upstream::on_request_headers(Downstream *downstream, + const nghttp2_frame *frame) { if (downstream->get_response_state() == Downstream::MSG_COMPLETE) { return 0; } @@ -282,8 +287,8 @@ int on_request_headers(Http2Upstream *upstream, Downstream *downstream, for (auto &nv : nva) { ss << TTY_HTTP_HD << nv.name << TTY_RST << ": " << nv.value << "\n"; } - ULOG(INFO, upstream) << "HTTP request headers. stream_id=" - << downstream->get_stream_id() << "\n" << ss.str(); + ULOG(INFO, this) << "HTTP request headers. stream_id=" + << downstream->get_stream_id() << "\n" << ss.str(); } if (get_config()->http2_upstream_dump_request_header) { @@ -299,7 +304,7 @@ int on_request_headers(Http2Upstream *upstream, Downstream *downstream, // For HTTP/2 proxy, we request :authority. if (method->value != "CONNECT" && get_config()->http2_proxy && !authority) { - upstream->rst_stream(downstream, NGHTTP2_PROTOCOL_ERROR); + rst_stream(downstream, NGHTTP2_PROTOCOL_ERROR); return 0; } @@ -321,57 +326,51 @@ int on_request_headers(Http2Upstream *upstream, Downstream *downstream, downstream->set_request_state(Downstream::MSG_COMPLETE); } - upstream->start_downstream(downstream); + start_downstream(downstream); return 0; } -} // namespace void Http2Upstream::start_downstream(Downstream *downstream) { - auto next_downstream = - downstream_queue_.pop_pending(downstream->get_stream_id()); - assert(next_downstream); - if (downstream_queue_.can_activate( downstream->get_request_http2_authority())) { - initiate_downstream(std::move(next_downstream)); + initiate_downstream(downstream); return; } - downstream_queue_.add_blocked(std::move(next_downstream)); + downstream_queue_.mark_blocked(downstream); } -void -Http2Upstream::initiate_downstream(std::unique_ptr downstream) { +void Http2Upstream::initiate_downstream(Downstream *downstream) { int rv; rv = downstream->attach_downstream_connection( handler_->get_downstream_connection()); if (rv != 0) { // downstream connection fails, send error page - if (error_reply(downstream.get(), 503) != 0) { - rst_stream(downstream.get(), NGHTTP2_INTERNAL_ERROR); + if (error_reply(downstream, 503) != 0) { + rst_stream(downstream, NGHTTP2_INTERNAL_ERROR); } downstream->set_request_state(Downstream::CONNECT_FAIL); - downstream_queue_.add_failure(std::move(downstream)); + downstream_queue_.mark_failure(downstream); return; } rv = downstream->push_request_headers(); if (rv != 0) { - if (error_reply(downstream.get(), 503) != 0) { - rst_stream(downstream.get(), NGHTTP2_INTERNAL_ERROR); + if (error_reply(downstream, 503) != 0) { + rst_stream(downstream, NGHTTP2_INTERNAL_ERROR); } - downstream_queue_.add_failure(std::move(downstream)); + downstream_queue_.mark_failure(downstream); return; } - downstream_queue_.add_active(std::move(downstream)); + downstream_queue_.mark_active(downstream); return; } @@ -386,7 +385,8 @@ int on_frame_recv_callback(nghttp2_session *session, const nghttp2_frame *frame, switch (frame->hd.type) { case NGHTTP2_DATA: { - auto downstream = upstream->find_downstream(frame->hd.stream_id); + auto downstream = static_cast( + nghttp2_session_get_stream_user_data(session, frame->hd.stream_id)); if (!downstream) { return 0; } @@ -401,7 +401,8 @@ int on_frame_recv_callback(nghttp2_session *session, const nghttp2_frame *frame, return 0; } case NGHTTP2_HEADERS: { - auto downstream = upstream->find_downstream(frame->hd.stream_id); + auto downstream = static_cast( + nghttp2_session_get_stream_user_data(session, frame->hd.stream_id)); if (!downstream) { return 0; } @@ -409,7 +410,7 @@ int on_frame_recv_callback(nghttp2_session *session, const nghttp2_frame *frame, if (frame->headers.cat == NGHTTP2_HCAT_REQUEST) { downstream->reset_upstream_rtimer(); - return on_request_headers(upstream, downstream, session, frame); + return upstream->on_request_headers(downstream, frame); } if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) { @@ -449,7 +450,8 @@ int on_data_chunk_recv_callback(nghttp2_session *session, uint8_t flags, int32_t stream_id, const uint8_t *data, size_t len, void *user_data) { auto upstream = static_cast(user_data); - auto downstream = upstream->find_downstream(stream_id); + auto downstream = static_cast( + nghttp2_session_get_stream_user_data(session, stream_id)); if (!downstream || !downstream->get_downstream_connection()) { if (upstream->consume(stream_id, len) != 0) { @@ -566,7 +568,8 @@ int on_frame_not_send_callback(nghttp2_session *session, lib_error_code != NGHTTP2_ERR_STREAM_CLOSED && lib_error_code != NGHTTP2_ERR_STREAM_CLOSING) { // To avoid stream hanging around, issue RST_STREAM. - auto downstream = upstream->find_downstream(frame->hd.stream_id); + auto downstream = static_cast( + nghttp2_session_get_stream_user_data(session, frame->hd.stream_id)); if (downstream) { upstream->rst_stream(downstream, NGHTTP2_INTERNAL_ERROR); } @@ -1188,18 +1191,16 @@ void Http2Upstream::remove_downstream(Downstream *downstream) { handler_->write_accesslog(downstream); } - auto next_downstream = - downstream_queue_.remove_and_pop_blocked(downstream->get_stream_id()); + nghttp2_session_set_stream_user_data(session_, downstream->get_stream_id(), + nullptr); + + auto next_downstream = downstream_queue_.remove_and_get_blocked(downstream); if (next_downstream) { - initiate_downstream(std::move(next_downstream)); + initiate_downstream(next_downstream); } } -Downstream *Http2Upstream::find_downstream(int32_t stream_id) { - return downstream_queue_.find(stream_id); -} - // WARNING: Never call directly or indirectly nghttp2_session_send or // nghttp2_session_recv. These calls may delete downstream. int Http2Upstream::on_downstream_header_complete(Downstream *downstream) { @@ -1447,9 +1448,10 @@ int Http2Upstream::on_timeout(Downstream *downstream) { } void Http2Upstream::on_handler_delete() { - for (auto &ent : downstream_queue_.get_active_downstreams()) { - if (ent.second->accesslog_ready()) { - handler_->write_accesslog(ent.second.get()); + for (auto d = downstream_queue_.get_downstreams(); d; d = d->dlnext) { + if (d->get_dispatch_state() == Downstream::DISPATCH_ACTIVE && + d->accesslog_ready()) { + handler_->write_accesslog(d); } } } @@ -1457,8 +1459,12 @@ void Http2Upstream::on_handler_delete() { int Http2Upstream::on_downstream_reset(bool no_retry) { int rv; - for (auto &ent : downstream_queue_.get_active_downstreams()) { - auto downstream = ent.second.get(); + for (auto downstream = downstream_queue_.get_downstreams(); downstream; + downstream = downstream->dlnext) { + if (downstream->get_dispatch_state() != Downstream::DISPATCH_ACTIVE) { + continue; + } + if (!downstream->request_submission_ready()) { rst_stream(downstream, NGHTTP2_INTERNAL_ERROR); downstream->pop_downstream_connection(); diff --git a/src/shrpx_http2_upstream.h b/src/shrpx_http2_upstream.h index 5b4c087c..81b42563 100644 --- a/src/shrpx_http2_upstream.h +++ b/src/shrpx_http2_upstream.h @@ -62,7 +62,6 @@ public: void add_pending_downstream(std::unique_ptr downstream); void remove_downstream(Downstream *downstream); - Downstream *find_downstream(int32_t stream_id); int rst_stream(Downstream *downstream, uint32_t error_code); int terminate_session(uint32_t error_code); @@ -93,7 +92,7 @@ public: void log_response_headers(Downstream *downstream, const std::vector &nva) const; void start_downstream(Downstream *downstream); - void initiate_downstream(std::unique_ptr downstream); + void initiate_downstream(Downstream *downstream); void submit_goaway(); void check_shutdown(); @@ -101,6 +100,8 @@ public: int prepare_push_promise(Downstream *downstream); int submit_push_promise(const std::string &path, Downstream *downstream); + int on_request_headers(Downstream *downstream, const nghttp2_frame *frame); + private: // must be put before downstream_queue_ std::unique_ptr pre_upstream_; diff --git a/src/shrpx_spdy_upstream.cc b/src/shrpx_spdy_upstream.cc index 5e35808f..4968a413 100644 --- a/src/shrpx_spdy_upstream.cc +++ b/src/shrpx_spdy_upstream.cc @@ -92,7 +92,8 @@ void on_stream_close_callback(spdylay_session *session, int32_t stream_id, ULOG(INFO, upstream) << "Stream stream_id=" << stream_id << " is being closed"; } - auto downstream = upstream->find_downstream(stream_id); + auto downstream = static_cast( + spdylay_session_get_stream_user_data(session, stream_id)); if (!downstream) { return; } @@ -223,41 +224,37 @@ void on_ctrl_recv_callback(spdylay_session *session, spdylay_frame_type type, } // namespace void SpdyUpstream::start_downstream(Downstream *downstream) { - auto next_downstream = - downstream_queue_.pop_pending(downstream->get_stream_id()); - assert(next_downstream); - if (downstream_queue_.can_activate( downstream->get_request_http2_authority())) { - initiate_downstream(std::move(next_downstream)); + initiate_downstream(downstream); return; } - downstream_queue_.add_blocked(std::move(next_downstream)); + downstream_queue_.mark_blocked(downstream); } -void SpdyUpstream::initiate_downstream(std::unique_ptr downstream) { +void SpdyUpstream::initiate_downstream(Downstream *downstream) { int rv = downstream->attach_downstream_connection( handler_->get_downstream_connection()); if (rv != 0) { // If downstream connection fails, issue RST_STREAM. - rst_stream(downstream.get(), SPDYLAY_INTERNAL_ERROR); + rst_stream(downstream, SPDYLAY_INTERNAL_ERROR); downstream->set_request_state(Downstream::CONNECT_FAIL); - downstream_queue_.add_failure(std::move(downstream)); + downstream_queue_.mark_failure(downstream); return; } rv = downstream->push_request_headers(); if (rv != 0) { - rst_stream(downstream.get(), SPDYLAY_INTERNAL_ERROR); + rst_stream(downstream, SPDYLAY_INTERNAL_ERROR); - downstream_queue_.add_failure(std::move(downstream)); + downstream_queue_.mark_failure(downstream); return; } - downstream_queue_.add_active(std::move(downstream)); + downstream_queue_.mark_active(downstream); } namespace { @@ -265,7 +262,8 @@ void on_data_chunk_recv_callback(spdylay_session *session, uint8_t flags, int32_t stream_id, const uint8_t *data, size_t len, void *user_data) { auto upstream = static_cast(user_data); - auto downstream = upstream->find_downstream(stream_id); + auto downstream = static_cast( + spdylay_session_get_stream_user_data(session, stream_id)); if (!downstream) { upstream->consume(stream_id, len); @@ -323,7 +321,8 @@ namespace { void on_data_recv_callback(spdylay_session *session, uint8_t flags, int32_t stream_id, int32_t length, void *user_data) { auto upstream = static_cast(user_data); - auto downstream = upstream->find_downstream(stream_id); + auto downstream = static_cast( + spdylay_session_get_stream_user_data(session, stream_id)); if (downstream && (flags & SPDYLAY_DATA_FLAG_FIN)) { if (!downstream->validate_request_bodylen()) { upstream->rst_stream(downstream, SPDYLAY_PROTOCOL_ERROR); @@ -351,7 +350,9 @@ void on_ctrl_not_send_callback(spdylay_session *session, error_code != SPDYLAY_ERR_STREAM_CLOSING) { // To avoid stream hanging around, issue RST_STREAM. auto stream_id = frame->syn_reply.stream_id; - auto downstream = upstream->find_downstream(stream_id); + // TODO Could be always nullptr + auto downstream = static_cast( + spdylay_session_get_stream_user_data(session, stream_id)); if (downstream) { upstream->rst_stream(downstream, SPDYLAY_INTERNAL_ERROR); } @@ -797,6 +798,7 @@ int SpdyUpstream::error_reply(Downstream *downstream, Downstream *SpdyUpstream::add_pending_downstream(int32_t stream_id, int32_t priority) { auto downstream = make_unique(this, stream_id, priority); + spdylay_session_set_stream_user_data(session_, stream_id, downstream.get()); auto res = downstream.get(); downstream_queue_.add_pending(std::move(downstream)); @@ -809,18 +811,16 @@ void SpdyUpstream::remove_downstream(Downstream *downstream) { handler_->write_accesslog(downstream); } - auto next_downstream = - downstream_queue_.remove_and_pop_blocked(downstream->get_stream_id()); + spdylay_session_set_stream_user_data(session_, downstream->get_stream_id(), + nullptr); + + auto next_downstream = downstream_queue_.remove_and_get_blocked(downstream); if (next_downstream) { - initiate_downstream(std::move(next_downstream)); + initiate_downstream(next_downstream); } } -Downstream *SpdyUpstream::find_downstream(int32_t stream_id) { - return downstream_queue_.find(stream_id); -} - // WARNING: Never call directly or indirectly spdylay_session_send or // spdylay_session_recv. These calls may delete downstream. int SpdyUpstream::on_downstream_header_complete(Downstream *downstream) { @@ -1026,9 +1026,10 @@ int SpdyUpstream::on_timeout(Downstream *downstream) { } void SpdyUpstream::on_handler_delete() { - for (auto &ent : downstream_queue_.get_active_downstreams()) { - if (ent.second->accesslog_ready()) { - handler_->write_accesslog(ent.second.get()); + for (auto d = downstream_queue_.get_downstreams(); d; d = d->dlnext) { + if (d->get_dispatch_state() == Downstream::DISPATCH_ACTIVE && + d->accesslog_ready()) { + handler_->write_accesslog(d); } } } @@ -1036,8 +1037,12 @@ void SpdyUpstream::on_handler_delete() { int SpdyUpstream::on_downstream_reset(bool no_retry) { int rv; - for (auto &ent : downstream_queue_.get_active_downstreams()) { - auto downstream = ent.second.get(); + for (auto downstream = downstream_queue_.get_downstreams(); downstream; + downstream = downstream->dlnext) { + if (downstream->get_dispatch_state() != Downstream::DISPATCH_ACTIVE) { + continue; + } + if (!downstream->request_submission_ready()) { rst_stream(downstream, SPDYLAY_INTERNAL_ERROR); downstream->pop_downstream_connection(); diff --git a/src/shrpx_spdy_upstream.h b/src/shrpx_spdy_upstream.h index d35fadc1..2a6e37e1 100644 --- a/src/shrpx_spdy_upstream.h +++ b/src/shrpx_spdy_upstream.h @@ -58,7 +58,6 @@ public: virtual int downstream_error(DownstreamConnection *dconn, int events); Downstream *add_pending_downstream(int32_t stream_id, int32_t priority); void remove_downstream(Downstream *downstream); - Downstream *find_downstream(int32_t stream_id); int rst_stream(Downstream *downstream, int status_code); int error_reply(Downstream *downstream, unsigned int status_code); @@ -82,7 +81,7 @@ public: int consume(int32_t stream_id, size_t len); void start_downstream(Downstream *downstream); - void initiate_downstream(std::unique_ptr downstream); + void initiate_downstream(Downstream *downstream); private: // must be put before downstream_queue_ diff --git a/src/template.h b/src/template.h index 4bade2b5..144aed1b 100644 --- a/src/template.h +++ b/src/template.h @@ -132,9 +132,19 @@ template struct DList { t->dlprev = t->dlnext = nullptr; } + bool empty() const { return head == nullptr; } + T *head, *tail; }; +template void dlist_delete_all(DList &dl) { + for (auto e = dl.head; e;) { + auto next = e->dlnext; + delete e; + e = next; + } +} + } // namespace nghttp2 #endif // TEMPLATE_H