From 81adb6bc7fd3a5c8ce9234fee0931ac40ebe9267 Mon Sep 17 00:00:00 2001 From: Tatsuhiro Tsujikawa Date: Wed, 21 Nov 2012 23:47:48 +0900 Subject: [PATCH] shrpx: Implement downstream SPDY flow control --- src/shrpx.cc | 16 +++++- src/shrpx_config.cc | 19 +++++-- src/shrpx_config.h | 2 + src/shrpx_downstream.cc | 9 ++++ src/shrpx_downstream.h | 1 + src/shrpx_downstream_connection.h | 1 + src/shrpx_http_downstream_connection.cc | 5 ++ src/shrpx_http_downstream_connection.h | 1 + src/shrpx_https_upstream.cc | 4 +- src/shrpx_spdy_downstream_connection.cc | 32 ++++++++++- src/shrpx_spdy_downstream_connection.h | 5 ++ src/shrpx_spdy_session.cc | 70 ++++++++++++++++++++++--- src/shrpx_spdy_session.h | 9 +++- 13 files changed, 159 insertions(+), 15 deletions(-) diff --git a/src/shrpx.cc b/src/shrpx.cc index 0d165a71..8f3a8cca 100644 --- a/src/shrpx.cc +++ b/src/shrpx.cc @@ -325,9 +325,10 @@ void fill_default_config() // Timeout for pooled (idle) connections mod_config()->downstream_idle_read_timeout.tv_sec = 60; - // window bits for SPDY upstream connection - // 2**16 = 64KiB, which is SPDY/3 default. + // window bits for SPDY upstream/downstream connection. 2**16 = + // 64KiB, which is SPDY/3 default. mod_config()->spdy_upstream_window_bits = 16; + mod_config()->spdy_downstream_window_bits = 16; set_config_str(&mod_config()->downstream_host, "127.0.0.1"); mod_config()->downstream_port = 80; @@ -443,6 +444,11 @@ void print_help(std::ostream& out) << " frontend connection to 2**.\n" << " Default: " << get_config()->spdy_upstream_window_bits << "\n" + << " --backend-spdy-window-bits=\n" + << " Sets the initial window size of SPDY\n" + << " backend connection to 2**.\n" + << " Default: " + << get_config()->spdy_downstream_window_bits << "\n" << " --pid-file= Set path to save PID of this program.\n" << " --user= Run this program as USER. This option is\n" << " intended to be used to drop root privileges.\n" @@ -499,6 +505,7 @@ int main(int argc, char **argv) {"backlog", required_argument, &flag, 15 }, {"ciphers", required_argument, &flag, 16 }, {"client", no_argument, &flag, 17 }, + {"backend-spdy-window-bits", required_argument, &flag, 18 }, {"help", no_argument, 0, 'h' }, {0, 0, 0, 0 } }; @@ -614,6 +621,11 @@ int main(int argc, char **argv) // --client cmdcfgs.push_back(std::make_pair(SHRPX_OPT_CLIENT, "yes")); break; + case 18: + // --backend-spdy-window-bits + cmdcfgs.push_back(std::make_pair(SHRPX_OPT_BACKEND_SPDY_WINDOW_BITS, + optarg)); + break; default: break; } diff --git a/src/shrpx_config.cc b/src/shrpx_config.cc index 0035a526..69c69166 100644 --- a/src/shrpx_config.cc +++ b/src/shrpx_config.cc @@ -63,6 +63,7 @@ const char SHRPX_OPT_ACCESSLOG[] = "accesslog"; const char SHRPX_OPT_BACKEND_KEEP_ALIVE_TIMEOUT[] = "backend-keep-alive-timeout"; const char SHRPX_OPT_FRONTEND_SPDY_WINDOW_BITS[] = "frontend-spdy-window-bits"; +const char SHRPX_OPT_BACKEND_SPDY_WINDOW_BITS[] = "backend-spdy-window-bits"; const char SHRPX_OPT_PID_FILE[] = "pid-file"; const char SHRPX_OPT_USER[] = "user"; const char SHRPX_OPT_SYSLOG[] = "syslog"; @@ -91,6 +92,7 @@ Config::Config() add_x_forwarded_for(false), accesslog(false), spdy_upstream_window_bits(0), + spdy_downstream_window_bits(0), pid_file(0), uid(0), gid(0), @@ -216,13 +218,24 @@ int parse_config(const char *opt, const char *optarg) } else if(util::strieq(opt, SHRPX_OPT_BACKEND_KEEP_ALIVE_TIMEOUT)) { timeval tv = {strtol(optarg, 0, 10), 0}; mod_config()->downstream_idle_read_timeout = tv; - } else if(util::strieq(opt, SHRPX_OPT_FRONTEND_SPDY_WINDOW_BITS)) { + } else if(util::strieq(opt, SHRPX_OPT_FRONTEND_SPDY_WINDOW_BITS) || + util::strieq(opt, SHRPX_OPT_BACKEND_SPDY_WINDOW_BITS)) { + size_t *resp; + const char *optname; + if(util::strieq(opt, SHRPX_OPT_FRONTEND_SPDY_WINDOW_BITS)) { + resp = &mod_config()->spdy_upstream_window_bits; + optname = SHRPX_OPT_FRONTEND_SPDY_WINDOW_BITS; + } else { + resp = &mod_config()->spdy_downstream_window_bits; + optname = SHRPX_OPT_BACKEND_SPDY_WINDOW_BITS; + } errno = 0; unsigned long int n = strtoul(optarg, 0, 10); if(errno == 0 && n < 31) { - mod_config()->spdy_upstream_window_bits = n; + *resp = n; } else { - LOG(ERROR) << "-w: specify the integer in the range [0, 30], inclusive"; + LOG(ERROR) << "--" << optname + << " specify the integer in the range [0, 30], inclusive"; return -1; } } else if(util::strieq(opt, SHRPX_OPT_PID_FILE)) { diff --git a/src/shrpx_config.h b/src/shrpx_config.h index 5eb1934a..4413f279 100644 --- a/src/shrpx_config.h +++ b/src/shrpx_config.h @@ -55,6 +55,7 @@ extern const char SHRPX_OPT_BACKEND_WRITE_TIMEOUT[]; extern const char SHRPX_OPT_ACCESSLOG[]; extern const char SHRPX_OPT_BACKEND_KEEP_ALIVE_TIMEOUT[]; extern const char SHRPX_OPT_FRONTEND_SPDY_WINDOW_BITS[]; +extern const char SHRPX_OPT_BACKEND_SPDY_WINDOW_BITS[]; extern const char SHRPX_OPT_PID_FILE[]; extern const char SHRPX_OPT_USER[]; extern const char SHRPX_OPT_SYSLOG[]; @@ -97,6 +98,7 @@ struct Config { bool add_x_forwarded_for; bool accesslog; size_t spdy_upstream_window_bits; + size_t spdy_downstream_window_bits; char *pid_file; uid_t uid; gid_t gid; diff --git a/src/shrpx_downstream.cc b/src/shrpx_downstream.cc index 147c424e..c967dcfb 100644 --- a/src/shrpx_downstream.cc +++ b/src/shrpx_downstream.cc @@ -487,4 +487,13 @@ int32_t Downstream::get_downstream_stream_id() const return downstream_stream_id_; } +int Downstream::on_upstream_write() +{ + if(dconn_) { + return dconn_->on_upstream_write(); + } else { + return 0; + } +} + } // namespace shrpx diff --git a/src/shrpx_downstream.h b/src/shrpx_downstream.h index 254a9d92..2cdbe05f 100644 --- a/src/shrpx_downstream.h +++ b/src/shrpx_downstream.h @@ -132,6 +132,7 @@ public: // Call this method when there is incoming data in downstream // connection. int on_read(); + int on_upstream_write(); static const size_t OUTPUT_UPPER_THRES = 64*1024; private: diff --git a/src/shrpx_downstream_connection.h b/src/shrpx_downstream_connection.h index 30bf7326..98082f1b 100644 --- a/src/shrpx_downstream_connection.h +++ b/src/shrpx_downstream_connection.h @@ -53,6 +53,7 @@ public: virtual int on_read() = 0; virtual int on_write() = 0; + virtual int on_upstream_write() = 0; ClientHandler* get_client_handler(); Downstream* get_downstream(); diff --git a/src/shrpx_http_downstream_connection.cc b/src/shrpx_http_downstream_connection.cc index 5140ab26..116bcfe3 100644 --- a/src/shrpx_http_downstream_connection.cc +++ b/src/shrpx_http_downstream_connection.cc @@ -447,4 +447,9 @@ int HttpDownstreamConnection::on_write() return 0; } +int HttpDownstreamConnection::on_upstream_write() +{ + return 0; +} + } // namespace shrpx diff --git a/src/shrpx_http_downstream_connection.h b/src/shrpx_http_downstream_connection.h index f99bf80f..7fc9d53f 100644 --- a/src/shrpx_http_downstream_connection.h +++ b/src/shrpx_http_downstream_connection.h @@ -56,6 +56,7 @@ public: virtual int on_read(); virtual int on_write(); + virtual int on_upstream_write(); bufferevent* get_bev(); private: diff --git a/src/shrpx_https_upstream.cc b/src/shrpx_https_upstream.cc index 651e9e9a..2ff6bbe2 100644 --- a/src/shrpx_https_upstream.cc +++ b/src/shrpx_https_upstream.cc @@ -311,11 +311,13 @@ int HttpsUpstream::on_read() int HttpsUpstream::on_write() { + int rv = 0; Downstream *downstream = get_downstream(); if(downstream) { downstream->resume_read(SHRPX_NO_BUFFER); + rv = downstream->on_upstream_write(); } - return 0; + return rv; } int HttpsUpstream::on_event() diff --git a/src/shrpx_spdy_downstream_connection.cc b/src/shrpx_spdy_downstream_connection.cc index b6c78c49..b74e72f8 100644 --- a/src/shrpx_spdy_downstream_connection.cc +++ b/src/shrpx_spdy_downstream_connection.cc @@ -50,7 +50,8 @@ SpdyDownstreamConnection::SpdyDownstreamConnection : DownstreamConnection(client_handler), spdy_(client_handler->get_spdy_session()), request_body_buf_(0), - sd_(0) + sd_(0), + recv_window_size_(0) {} SpdyDownstreamConnection::~SpdyDownstreamConnection() @@ -101,7 +102,7 @@ int SpdyDownstreamConnection::attach_downstream(Downstream *downstream) } downstream->set_downstream_connection(this); downstream_ = downstream; - + recv_window_size_ = 0; return 0; } @@ -362,6 +363,23 @@ int SpdyDownstreamConnection::on_write() return 0; } +int SpdyDownstreamConnection::on_upstream_write() +{ + int rv; + if(spdy_->get_state() == SpdySession::CONNECTED && + spdy_->get_flow_control() && + downstream_ && downstream_->get_downstream_stream_id() != -1 && + recv_window_size_ >= spdy_->get_initial_window_size()/2) { + rv = spdy_->submit_window_update(this, recv_window_size_); + if(rv == -1) { + return -1; + } + spdy_->notify(); + recv_window_size_ = 0; + } + return 0; +} + evbuffer* SpdyDownstreamConnection::get_request_body_buf() const { return request_body_buf_; @@ -396,4 +414,14 @@ bool SpdyDownstreamConnection::get_output_buffer_full() } } +int32_t SpdyDownstreamConnection::get_recv_window_size() const +{ + return recv_window_size_; +} + +void SpdyDownstreamConnection::inc_recv_window_size(int32_t amount) +{ + recv_window_size_ += amount; +} + } // namespace shrpx diff --git a/src/shrpx_spdy_downstream_connection.h b/src/shrpx_spdy_downstream_connection.h index 450bd08a..f273bc6b 100644 --- a/src/shrpx_spdy_downstream_connection.h +++ b/src/shrpx_spdy_downstream_connection.h @@ -57,6 +57,7 @@ public: virtual int on_read(); virtual int on_write(); + virtual int on_upstream_write(); int send(); int init_request_body_buf(); @@ -64,10 +65,14 @@ public: void attach_stream_data(StreamData *sd); StreamData* detach_stream_data(); + + int32_t get_recv_window_size() const; + void inc_recv_window_size(int32_t amount); private: SpdySession *spdy_; evbuffer *request_body_buf_; StreamData *sd_; + int32_t recv_window_size_; }; } // namespace shrpx diff --git a/src/shrpx_spdy_session.cc b/src/shrpx_spdy_session.cc index 8a650a6b..ba4693db 100644 --- a/src/shrpx_spdy_session.cc +++ b/src/shrpx_spdy_session.cc @@ -52,7 +52,8 @@ SpdySession::SpdySession(event_base *evbase, SSL_CTX *ssl_ctx) state_(DISCONNECTED), notified_(false), wrbev_(0), - rdbev_(0) + rdbev_(0), + flow_control_(false) {} SpdySession::~SpdySession() @@ -313,7 +314,7 @@ int SpdySession::submit_request(SpdyDownstreamConnection *dconn, return 0; } -int SpdySession::submit_rst_stream(SpdyDownstreamConnection *docnn, +int SpdySession::submit_rst_stream(SpdyDownstreamConnection *dconn, int32_t stream_id, uint32_t status_code) { assert(state_ == CONNECTED); @@ -326,6 +327,32 @@ int SpdySession::submit_rst_stream(SpdyDownstreamConnection *docnn, return 0; } +int SpdySession::submit_window_update(SpdyDownstreamConnection *dconn, + int32_t amount) +{ + assert(state_ == CONNECTED); + int rv; + int32_t stream_id; + stream_id = dconn->get_downstream()->get_downstream_stream_id(); + rv = spdylay_submit_window_update(session_, stream_id, amount); + if(rv < SPDYLAY_ERR_FATAL) { + LOG(FATAL) << "spdylay_submit_window_update() failed: " + << spdylay_strerror(rv); + return -1; + } + return 0; +} + +int32_t SpdySession::get_initial_window_size() const +{ + return 1 << get_config()->spdy_downstream_window_bits; +} + +bool SpdySession::get_flow_control() const +{ + return flow_control_; +} + int SpdySession::resume_data(SpdyDownstreamConnection *dconn) { assert(state_ == CONNECTED); @@ -585,7 +612,24 @@ void on_data_chunk_recv_callback(spdylay_session *session, spdylay_submit_rst_stream(session, stream_id, SPDYLAY_INTERNAL_ERROR); return; } - // TODO No manual flow control at the moment. + + if(spdy->get_flow_control()) { + sd->dconn->inc_recv_window_size(len); + if(sd->dconn->get_recv_window_size() > spdy->get_initial_window_size()) { + if(ENABLE_LOG) { + LOG(INFO) << "Flow control error: recv_window_size=" + << sd->dconn->get_recv_window_size() + << ", initial_window_size=" + << spdy->get_initial_window_size(); + } + spdylay_submit_rst_stream(session, stream_id, + SPDYLAY_FLOW_CONTROL_ERROR); + downstream->set_response_state(Downstream::MSG_RESET); + call_downstream_readcb(spdy, downstream); + return; + } + } + Upstream *upstream = downstream->get_upstream(); rv = upstream->on_downstream_body(downstream, data, len); if(rv != 0) { @@ -719,12 +763,26 @@ int SpdySession::on_connect() return -1; } - // TODO Send initial window size when manual flow control is - // implemented. - spdylay_settings_entry entry[1]; + if(version == SPDYLAY_PROTO_SPDY3) { + int val = 1; + flow_control_ = true; + rv = spdylay_session_set_option(session_, + SPDYLAY_OPT_NO_AUTO_WINDOW_UPDATE, &val, + sizeof(val)); + assert(rv == 0); + } else { + flow_control_ = false; + } + + spdylay_settings_entry entry[2]; entry[0].settings_id = SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS; entry[0].value = get_config()->spdy_max_concurrent_streams; entry[0].flags = SPDYLAY_ID_FLAG_SETTINGS_NONE; + + entry[1].settings_id = SPDYLAY_SETTINGS_INITIAL_WINDOW_SIZE; + entry[1].value = get_initial_window_size(); + entry[1].flags = SPDYLAY_ID_FLAG_SETTINGS_NONE; + rv = spdylay_submit_settings (session_, SPDYLAY_FLAG_SETTINGS_NONE, entry, sizeof(entry)/sizeof(spdylay_settings_entry)); diff --git a/src/shrpx_spdy_session.h b/src/shrpx_spdy_session.h index 83a4ef99..57870446 100644 --- a/src/shrpx_spdy_session.h +++ b/src/shrpx_spdy_session.h @@ -64,9 +64,15 @@ public: uint8_t pri, const char **nv, const spdylay_data_provider *data_prd); - int submit_rst_stream(SpdyDownstreamConnection *docnn, + int submit_rst_stream(SpdyDownstreamConnection *dconn, int32_t stream_id, uint32_t status_code); + int submit_window_update(SpdyDownstreamConnection *dconn, int32_t amount); + + int32_t get_initial_window_size() const; + + bool get_flow_control() const; + int resume_data(SpdyDownstreamConnection *dconn); int on_connect(); @@ -99,6 +105,7 @@ private: bool notified_; bufferevent *wrbev_; bufferevent *rdbev_; + bool flow_control_; }; } // namespace shrpx