nghttpx: Limit # of downstream connections per host when h2 proxy is used

This commit limits the number of concurrent HTTP/1 downstream
connections to same host.  By defualt, it is limited to 8 connections.
--backend-connections-per-frontend option was replaced with
--backend-http1-connections-per-host, which changes the maximum number
of connections per host.  This limitation only kicks in when h2 proxy
is used (-s option).
This commit is contained in:
Tatsuhiro Tsujikawa 2014-12-05 01:07:00 +09:00
parent f178b78816
commit 9614611969
9 changed files with 247 additions and 143 deletions

View File

@ -776,7 +776,7 @@ void fill_default_config() {
mod_config()->no_location_rewrite = false; mod_config()->no_location_rewrite = false;
mod_config()->argc = 0; mod_config()->argc = 0;
mod_config()->argv = nullptr; mod_config()->argv = nullptr;
mod_config()->max_downstream_connections = 100; mod_config()->downstream_connections_per_host = 8;
mod_config()->listener_disable_timeout = {0, 0}; mod_config()->listener_disable_timeout = {0, 0};
} }
} // namespace } // namespace
@ -899,13 +899,12 @@ Performance:
Set maximum number of simultaneous connections Set maximum number of simultaneous connections
frontend accepts. Setting 0 means unlimited. frontend accepts. Setting 0 means unlimited.
Default: 0 Default: 0
--backend-connections-per-frontend=<NUM> --backend-http1-connections-per-host=<NUM>
Set maximum number of backend simultaneous Set maximum number of backend concurrent HTTP/1
connections per frontend. This option is connections per host. This option is meaningful
meaningful when the combination of HTTP/2 or SPDY when -s option is used.
frontend and HTTP/1 backend is used. Default: )"
Default: )" << get_config()->max_downstream_connections << get_config()->downstream_connections_per_host << R"(
<< R"(
Timeout: Timeout:
--frontend-http2-read-timeout=<SEC> --frontend-http2-read-timeout=<SEC>
@ -1276,7 +1275,7 @@ int main(int argc, char **argv) {
{"stream-read-timeout", required_argument, &flag, 60}, {"stream-read-timeout", required_argument, &flag, 60},
{"stream-write-timeout", required_argument, &flag, 61}, {"stream-write-timeout", required_argument, &flag, 61},
{"no-location-rewrite", no_argument, &flag, 62}, {"no-location-rewrite", no_argument, &flag, 62},
{"backend-connections-per-frontend", required_argument, &flag, 63}, {"backend-http1-connections-per-host", required_argument, &flag, 63},
{"listener-disable-timeout", required_argument, &flag, 64}, {"listener-disable-timeout", required_argument, &flag, 64},
{"strip-incoming-x-forwarded-for", no_argument, &flag, 65}, {"strip-incoming-x-forwarded-for", no_argument, &flag, 65},
{"accesslog-format", required_argument, &flag, 66}, {"accesslog-format", required_argument, &flag, 66},
@ -1568,8 +1567,8 @@ int main(int argc, char **argv) {
cmdcfgs.emplace_back(SHRPX_OPT_NO_LOCATION_REWRITE, "yes"); cmdcfgs.emplace_back(SHRPX_OPT_NO_LOCATION_REWRITE, "yes");
break; break;
case 63: case 63:
// --backend-connections-per-frontend // --backend-http1-connections-per-host
cmdcfgs.emplace_back(SHRPX_OPT_BACKEND_CONNECTIONS_PER_FRONTEND, cmdcfgs.emplace_back(SHRPX_OPT_BACKEND_HTTP1_CONNECTIONS_PER_HOST,
optarg); optarg);
break; break;
case 64: case 64:

View File

@ -132,8 +132,8 @@ const char SHRPX_OPT_ADD_RESPONSE_HEADER[] = "add-response-header";
const char SHRPX_OPT_WORKER_FRONTEND_CONNECTIONS[] = const char SHRPX_OPT_WORKER_FRONTEND_CONNECTIONS[] =
"worker-frontend-connections"; "worker-frontend-connections";
const char SHRPX_OPT_NO_LOCATION_REWRITE[] = "no-location-rewrite"; const char SHRPX_OPT_NO_LOCATION_REWRITE[] = "no-location-rewrite";
const char SHRPX_OPT_BACKEND_CONNECTIONS_PER_FRONTEND[] = const char SHRPX_OPT_BACKEND_HTTP1_CONNECTIONS_PER_HOST[] =
"backend-connections-per-frontend"; "backend-http1-connections-per-host";
const char SHRPX_OPT_LISTENER_DISABLE_TIMEOUT[] = "listener-disable-timeout"; const char SHRPX_OPT_LISTENER_DISABLE_TIMEOUT[] = "listener-disable-timeout";
namespace { namespace {
@ -1006,7 +1006,7 @@ int parse_config(const char *opt, const char *optarg) {
return 0; return 0;
} }
if (util::strieq(opt, SHRPX_OPT_BACKEND_CONNECTIONS_PER_FRONTEND)) { if (util::strieq(opt, SHRPX_OPT_BACKEND_HTTP1_CONNECTIONS_PER_HOST)) {
int n; int n;
if (parse_uint(&n, opt, optarg) != 0) { if (parse_uint(&n, opt, optarg) != 0) {
@ -1019,7 +1019,7 @@ int parse_config(const char *opt, const char *optarg) {
return -1; return -1;
} }
mod_config()->max_downstream_connections = n; mod_config()->downstream_connections_per_host = n;
return 0; return 0;
} }

View File

@ -123,7 +123,7 @@ extern const char SHRPX_OPT_ALTSVC[];
extern const char SHRPX_OPT_ADD_RESPONSE_HEADER[]; extern const char SHRPX_OPT_ADD_RESPONSE_HEADER[];
extern const char SHRPX_OPT_WORKER_FRONTEND_CONNECTIONS[]; extern const char SHRPX_OPT_WORKER_FRONTEND_CONNECTIONS[];
extern const char SHRPX_OPT_NO_LOCATION_REWRITE[]; extern const char SHRPX_OPT_NO_LOCATION_REWRITE[];
extern const char SHRPX_OPT_BACKEND_CONNECTIONS_PER_FRONTEND[]; extern const char SHRPX_OPT_BACKEND_HTTP1_CONNECTIONS_PER_HOST[];
extern const char SHRPX_OPT_LISTENER_DISABLE_TIMEOUT[]; extern const char SHRPX_OPT_LISTENER_DISABLE_TIMEOUT[];
union sockaddr_union { union sockaddr_union {
@ -222,7 +222,7 @@ struct Config {
size_t http2_downstream_window_bits; size_t http2_downstream_window_bits;
size_t http2_upstream_connection_window_bits; size_t http2_upstream_connection_window_bits;
size_t http2_downstream_connection_window_bits; size_t http2_downstream_connection_window_bits;
size_t max_downstream_connections; size_t downstream_connections_per_host;
// actual size of downstream_http_proxy_addr // actual size of downstream_http_proxy_addr
size_t downstream_http_proxy_addrlen; size_t downstream_http_proxy_addrlen;
size_t read_rate; size_t read_rate;

View File

@ -25,12 +25,18 @@
#include "shrpx_downstream_queue.h" #include "shrpx_downstream_queue.h"
#include <cassert> #include <cassert>
#include <limits>
#include "shrpx_downstream.h" #include "shrpx_downstream.h"
namespace shrpx { namespace shrpx {
DownstreamQueue::DownstreamQueue() {} DownstreamQueue::HostEntry::HostEntry() : num_active(0) {}
DownstreamQueue::DownstreamQueue(size_t conn_max_per_host)
: conn_max_per_host_(conn_max_per_host == 0
? std::numeric_limits<size_t>::max()
: conn_max_per_host) {}
DownstreamQueue::~DownstreamQueue() {} DownstreamQueue::~DownstreamQueue() {}
@ -44,49 +50,146 @@ void DownstreamQueue::add_failure(std::unique_ptr<Downstream> downstream) {
failure_downstreams_[stream_id] = std::move(downstream); failure_downstreams_[stream_id] = std::move(downstream);
} }
DownstreamQueue::HostEntry &
DownstreamQueue::find_host_entry(const std::string &host) {
auto itr = host_entries_.find(host);
if (itr == std::end(host_entries_)) {
std::tie(itr, std::ignore) = host_entries_.emplace(host, HostEntry());
}
return (*itr).second;
}
void DownstreamQueue::add_active(std::unique_ptr<Downstream> downstream) { void DownstreamQueue::add_active(std::unique_ptr<Downstream> downstream) {
auto &ent = find_host_entry(downstream->get_request_http2_authority());
++ent.num_active;
auto stream_id = downstream->get_stream_id(); auto stream_id = downstream->get_stream_id();
active_downstreams_[stream_id] = std::move(downstream); active_downstreams_[stream_id] = std::move(downstream);
} }
std::unique_ptr<Downstream> DownstreamQueue::remove(int32_t stream_id) { void DownstreamQueue::add_blocked(std::unique_ptr<Downstream> downstream) {
auto kv = pending_downstreams_.find(stream_id); auto &ent = find_host_entry(downstream->get_request_http2_authority());
auto stream_id = downstream->get_stream_id();
ent.blocked.insert(stream_id);
blocked_downstreams_[stream_id] = std::move(downstream);
}
if (kv != std::end(pending_downstreams_)) { bool DownstreamQueue::can_activate(const std::string &host) const {
auto downstream = std::move((*kv).second); auto itr = host_entries_.find(host);
pending_downstreams_.erase(kv); if (itr == std::end(host_entries_)) {
return downstream; return true;
} }
auto &ent = (*itr).second;
return ent.num_active < conn_max_per_host_;
}
kv = active_downstreams_.find(stream_id); namespace {
std::unique_ptr<Downstream>
pop_downstream(DownstreamQueue::DownstreamMap::iterator i,
DownstreamQueue::DownstreamMap &downstreams) {
auto downstream = std::move((*i).second);
downstreams.erase(i);
return downstream;
}
} // namespace
namespace {
bool remove_host_entry_if_empty(const DownstreamQueue::HostEntry &ent,
DownstreamQueue::HostEntryMap &host_entries,
const std::string &host) {
if (ent.blocked.empty() && ent.num_active == 0) {
host_entries.erase(host);
return true;
}
return false;
}
} // namespace
std::unique_ptr<Downstream> DownstreamQueue::pop_pending(int32_t stream_id) {
auto itr = pending_downstreams_.find(stream_id);
if (itr == std::end(pending_downstreams_)) {
return nullptr;
}
return pop_downstream(itr, pending_downstreams_);
}
std::unique_ptr<Downstream>
DownstreamQueue::remove_and_pop_blocked(int32_t stream_id) {
auto kv = active_downstreams_.find(stream_id);
if (kv != std::end(active_downstreams_)) { if (kv != std::end(active_downstreams_)) {
auto downstream = std::move((*kv).second); auto downstream = pop_downstream(kv, active_downstreams_);
active_downstreams_.erase(kv); auto &host = downstream->get_request_http2_authority();
return downstream; auto &ent = find_host_entry(host);
--ent.num_active;
if (remove_host_entry_if_empty(ent, host_entries_, host)) {
return nullptr;
}
if (ent.blocked.empty() || ent.num_active >= conn_max_per_host_) {
return nullptr;
}
auto next_stream_id = *std::begin(ent.blocked);
ent.blocked.erase(std::begin(ent.blocked));
auto itr = blocked_downstreams_.find(next_stream_id);
assert(itr != std::end(blocked_downstreams_));
auto next_downstream = pop_downstream(itr, blocked_downstreams_);
remove_host_entry_if_empty(ent, host_entries_, host);
return next_downstream;
}
kv = blocked_downstreams_.find(stream_id);
if (kv != std::end(blocked_downstreams_)) {
auto downstream = pop_downstream(kv, blocked_downstreams_);
auto &host = downstream->get_request_http2_authority();
auto &ent = find_host_entry(host);
ent.blocked.erase(stream_id);
remove_host_entry_if_empty(ent, host_entries_, host);
return nullptr;
}
kv = pending_downstreams_.find(stream_id);
if (kv != std::end(pending_downstreams_)) {
pop_downstream(kv, pending_downstreams_);
return nullptr;
} }
kv = failure_downstreams_.find(stream_id); kv = failure_downstreams_.find(stream_id);
if (kv != std::end(failure_downstreams_)) { if (kv != std::end(failure_downstreams_)) {
auto downstream = std::move((*kv).second); pop_downstream(kv, failure_downstreams_);
failure_downstreams_.erase(kv); return nullptr;
return downstream;
} }
return nullptr; return nullptr;
} }
Downstream *DownstreamQueue::find(int32_t stream_id) { Downstream *DownstreamQueue::find(int32_t stream_id) {
auto kv = pending_downstreams_.find(stream_id); auto kv = active_downstreams_.find(stream_id);
if (kv != std::end(pending_downstreams_)) { if (kv != std::end(active_downstreams_)) {
return (*kv).second.get(); return (*kv).second.get();
} }
kv = active_downstreams_.find(stream_id); kv = blocked_downstreams_.find(stream_id);
if (kv != std::end(active_downstreams_)) { if (kv != std::end(blocked_downstreams_)) {
return (*kv).second.get();
}
kv = pending_downstreams_.find(stream_id);
if (kv != std::end(pending_downstreams_)) {
return (*kv).second.get(); return (*kv).second.get();
} }
@ -99,41 +202,14 @@ Downstream *DownstreamQueue::find(int32_t stream_id) {
return nullptr; return nullptr;
} }
std::unique_ptr<Downstream> DownstreamQueue::pop_pending() { const DownstreamQueue::DownstreamMap &
auto i = std::begin(pending_downstreams_);
if (i == std::end(pending_downstreams_)) {
return nullptr;
}
auto downstream = std::move((*i).second);
pending_downstreams_.erase(i);
return downstream;
}
Downstream *DownstreamQueue::pending_top() const {
auto i = std::begin(pending_downstreams_);
if (i == std::end(pending_downstreams_)) {
return nullptr;
}
return (*i).second.get();
}
size_t DownstreamQueue::num_active() const {
return active_downstreams_.size();
}
bool DownstreamQueue::pending_empty() const {
return pending_downstreams_.empty();
}
const std::map<int32_t, std::unique_ptr<Downstream>> &
DownstreamQueue::get_active_downstreams() const { DownstreamQueue::get_active_downstreams() const {
return active_downstreams_; return active_downstreams_;
} }
const DownstreamQueue::DownstreamMap &
DownstreamQueue::get_blocked_downstreams() const {
return blocked_downstreams_;
}
} // namespace shrpx } // namespace shrpx

View File

@ -30,6 +30,7 @@
#include <stdint.h> #include <stdint.h>
#include <map> #include <map>
#include <set>
#include <memory> #include <memory>
namespace shrpx { namespace shrpx {
@ -38,39 +39,67 @@ class Downstream;
class DownstreamQueue { class DownstreamQueue {
public: public:
DownstreamQueue(); typedef std::map<int32_t, std::unique_ptr<Downstream>> DownstreamMap;
struct HostEntry {
// Set of stream ID that blocked by conn_max_per_host_.
std::set<int32_t> blocked;
// The number of connections currently made to this host.
size_t num_active;
HostEntry();
};
typedef std::map<std::string, HostEntry> HostEntryMap;
// conn_max_per_host == 0 means no limit for downstream connection.
DownstreamQueue(size_t conn_max_per_host = 0);
~DownstreamQueue(); ~DownstreamQueue();
void add_pending(std::unique_ptr<Downstream> downstream); void add_pending(std::unique_ptr<Downstream> downstream);
void add_failure(std::unique_ptr<Downstream> downstream); void add_failure(std::unique_ptr<Downstream> downstream);
// Adds |downstream| to active_downstreams_, which means that
// downstream connection has been started.
void add_active(std::unique_ptr<Downstream> downstream); void add_active(std::unique_ptr<Downstream> downstream);
// Removes |downstream| from either pending_downstreams_, // Adds |downstream| to blocked_downstreams_, which means that
// active_downstreams_ or failure_downstreams_ and returns it // download connection was blocked because conn_max_per_host_ limit.
// wrapped in std::unique_ptr. void add_blocked(std::unique_ptr<Downstream> downstream);
std::unique_ptr<Downstream> remove(int32_t stream_id); // Returns true if we can make downstream connection to given
// |host|.
bool can_activate(const std::string &host) const;
// Removes pending Downstream object whose stream ID is |stream_id|
// from pending_downstreams_ and returns it.
std::unique_ptr<Downstream> pop_pending(int32_t stream_id);
// Removes Downstream object whose stream ID is |stream_id| from
// either pending_downstreams_, active_downstreams_,
// blocked_downstreams_ or failure_downstreams_. If a Downstream
// object is removed from active_downstreams_, this function may
// return Downstream object with the same target host in
// blocked_downstreams_ if its connection is now not blocked by
// conn_max_per_host_ limit.
std::unique_ptr<Downstream> remove_and_pop_blocked(int32_t stream_id);
// Finds Downstream object denoted by |stream_id| either in // Finds Downstream object denoted by |stream_id| either in
// pending_downstreams_, active_downstreams_ or // pending_downstreams_, active_downstreams_, blocked_downstreams_
// failure_downstreams_. // or failure_downstreams_.
Downstream *find(int32_t stream_id); Downstream *find(int32_t stream_id);
// Returns the number of active Downstream objects. const DownstreamMap &get_active_downstreams() const;
size_t num_active() const; const DownstreamMap &get_blocked_downstreams() const;
// Returns true if pending_downstreams_ is empty.
bool pending_empty() const; HostEntry &find_host_entry(const std::string &host);
// Pops first Downstream object in pending_downstreams_ and returns
// it. // Maximum number of concurrent connections to the same host.
std::unique_ptr<Downstream> pop_pending(); size_t conn_max_per_host_;
// Returns first Downstream object in pending_downstreams_. This
// does not pop the first one. If queue is empty, returns nullptr.
Downstream *pending_top() const;
const std::map<int32_t, std::unique_ptr<Downstream>> &
get_active_downstreams() const;
private: private:
// Per target host structure to keep track of the number of
// connections to the same host.
std::map<std::string, HostEntry> host_entries_;
// Downstream objects, not processed yet // Downstream objects, not processed yet
std::map<int32_t, std::unique_ptr<Downstream>> pending_downstreams_; DownstreamMap pending_downstreams_;
// Downstream objects in use, consuming downstream concurrency limit
std::map<int32_t, std::unique_ptr<Downstream>> active_downstreams_;
// Downstream objects, failed to connect to downstream server // Downstream objects, failed to connect to downstream server
std::map<int32_t, std::unique_ptr<Downstream>> failure_downstreams_; DownstreamMap failure_downstreams_;
// Downstream objects, downstream connection started
DownstreamMap active_downstreams_;
// Downstream objects, blocked by conn_max_per_host_
DownstreamMap blocked_downstreams_;
}; };
} // namespace shrpx } // namespace shrpx

View File

@ -352,35 +352,24 @@ int on_request_headers(Http2Upstream *upstream, Downstream *downstream,
downstream->set_request_state(Downstream::MSG_COMPLETE); downstream->set_request_state(Downstream::MSG_COMPLETE);
} }
upstream->maintain_downstream_concurrency(); upstream->start_downstream(downstream);
return 0; return 0;
} }
} // namespace } // namespace
void Http2Upstream::maintain_downstream_concurrency() { void Http2Upstream::start_downstream(Downstream *downstream) {
while (get_config()->max_downstream_connections > auto next_downstream =
downstream_queue_.num_active()) { downstream_queue_.pop_pending(downstream->get_stream_id());
if (downstream_queue_.pending_empty()) { assert(next_downstream);
break;
if (downstream_queue_.can_activate(
downstream->get_request_http2_authority())) {
initiate_downstream(std::move(next_downstream));
return;
} }
{ downstream_queue_.add_blocked(std::move(next_downstream));
auto downstream = downstream_queue_.pending_top();
if (downstream->get_request_state() != Downstream::HEADER_COMPLETE &&
downstream->get_request_state() != Downstream::MSG_COMPLETE) {
break;
}
}
auto downstream = downstream_queue_.pop_pending();
if (!downstream) {
break;
}
initiate_downstream(std::move(downstream));
}
} }
void void
@ -610,7 +599,10 @@ uint32_t infer_upstream_rst_stream_error_code(uint32_t downstream_error_code) {
} // namespace } // namespace
Http2Upstream::Http2Upstream(ClientHandler *handler) Http2Upstream::Http2Upstream(ClientHandler *handler)
: handler_(handler), session_(nullptr), settings_timerev_(nullptr) { : downstream_queue_(get_config()->http2_proxy
? get_config()->downstream_connections_per_host
: 0),
handler_(handler), session_(nullptr), settings_timerev_(nullptr) {
reset_timeouts(); reset_timeouts();
int rv; int rv;
@ -1154,9 +1146,12 @@ void Http2Upstream::remove_downstream(Downstream *downstream) {
handler_->write_accesslog(downstream); handler_->write_accesslog(downstream);
} }
downstream_queue_.remove(downstream->get_stream_id()); auto next_downstream =
downstream_queue_.remove_and_pop_blocked(downstream->get_stream_id());
maintain_downstream_concurrency(); if (next_downstream) {
initiate_downstream(std::move(next_downstream));
}
} }
Downstream *Http2Upstream::find_downstream(int32_t stream_id) { Downstream *Http2Upstream::find_downstream(int32_t stream_id) {
@ -1400,6 +1395,11 @@ void Http2Upstream::on_handler_delete() {
handler_->write_accesslog(ent.second.get()); handler_->write_accesslog(ent.second.get());
} }
} }
for (auto &ent : downstream_queue_.get_blocked_downstreams()) {
if (ent.second->accesslog_ready()) {
handler_->write_accesslog(ent.second.get());
}
}
} }
} // namespace shrpx } // namespace shrpx

View File

@ -88,7 +88,7 @@ public:
int consume(int32_t stream_id, size_t len); int consume(int32_t stream_id, size_t len);
void log_response_headers(Downstream *downstream, void log_response_headers(Downstream *downstream,
const std::vector<nghttp2_nv> &nva) const; const std::vector<nghttp2_nv> &nva) const;
void maintain_downstream_concurrency(); void start_downstream(Downstream *downstream);
void initiate_downstream(std::unique_ptr<Downstream> downstream); void initiate_downstream(std::unique_ptr<Downstream> downstream);
nghttp2::util::EvbufferBuffer sendbuf; nghttp2::util::EvbufferBuffer sendbuf;

View File

@ -232,7 +232,7 @@ void on_ctrl_recv_callback(spdylay_session *session, spdylay_frame_type type,
downstream->set_request_state(Downstream::MSG_COMPLETE); downstream->set_request_state(Downstream::MSG_COMPLETE);
} }
upstream->maintain_downstream_concurrency(); upstream->start_downstream(downstream);
break; break;
} }
@ -242,29 +242,18 @@ void on_ctrl_recv_callback(spdylay_session *session, spdylay_frame_type type,
} }
} // namespace } // namespace
void SpdyUpstream::maintain_downstream_concurrency() { void SpdyUpstream::start_downstream(Downstream *downstream) {
while (get_config()->max_downstream_connections > auto next_downstream =
downstream_queue_.num_active()) { downstream_queue_.pop_pending(downstream->get_stream_id());
if (downstream_queue_.pending_empty()) { assert(next_downstream);
break;
if (downstream_queue_.can_activate(
downstream->get_request_http2_authority())) {
initiate_downstream(std::move(next_downstream));
return;
} }
{ downstream_queue_.add_blocked(std::move(next_downstream));
auto downstream = downstream_queue_.pending_top();
if (downstream->get_request_state() != Downstream::HEADER_COMPLETE &&
downstream->get_request_state() != Downstream::MSG_COMPLETE) {
break;
}
}
auto downstream = downstream_queue_.pop_pending();
if (!downstream) {
break;
}
initiate_downstream(std::move(downstream));
}
} }
void SpdyUpstream::initiate_downstream(std::unique_ptr<Downstream> downstream) { void SpdyUpstream::initiate_downstream(std::unique_ptr<Downstream> downstream) {
@ -425,7 +414,10 @@ uint32_t infer_upstream_rst_stream_status_code(uint32_t downstream_error_code) {
} // namespace } // namespace
SpdyUpstream::SpdyUpstream(uint16_t version, ClientHandler *handler) SpdyUpstream::SpdyUpstream(uint16_t version, ClientHandler *handler)
: handler_(handler), session_(nullptr) { : downstream_queue_(get_config()->http2_proxy
? get_config()->downstream_connections_per_host
: 0),
handler_(handler), session_(nullptr) {
// handler->set_bev_cb(spdy_readcb, 0, spdy_eventcb); // handler->set_bev_cb(spdy_readcb, 0, spdy_eventcb);
reset_timeouts(); reset_timeouts();
@ -875,9 +867,12 @@ void SpdyUpstream::remove_downstream(Downstream *downstream) {
handler_->write_accesslog(downstream); handler_->write_accesslog(downstream);
} }
downstream_queue_.remove(downstream->get_stream_id()); auto next_downstream =
downstream_queue_.remove_and_pop_blocked(downstream->get_stream_id());
maintain_downstream_concurrency(); if (next_downstream) {
initiate_downstream(std::move(next_downstream));
}
} }
Downstream *SpdyUpstream::find_downstream(int32_t stream_id) { Downstream *SpdyUpstream::find_downstream(int32_t stream_id) {
@ -1094,6 +1089,11 @@ void SpdyUpstream::on_handler_delete() {
handler_->write_accesslog(ent.second.get()); handler_->write_accesslog(ent.second.get());
} }
} }
for (auto &ent : downstream_queue_.get_blocked_downstreams()) {
if (ent.second->accesslog_ready()) {
handler_->write_accesslog(ent.second.get());
}
}
} }
} // namespace shrpx } // namespace shrpx

View File

@ -81,7 +81,7 @@ public:
int consume(int32_t stream_id, size_t len); int consume(int32_t stream_id, size_t len);
void maintain_downstream_concurrency(); void start_downstream(Downstream *downstream);
void initiate_downstream(std::unique_ptr<Downstream> downstream); void initiate_downstream(std::unique_ptr<Downstream> downstream);
nghttp2::util::EvbufferBuffer sendbuf; nghttp2::util::EvbufferBuffer sendbuf;