From c29dd0b80f75dc8cdd6bede5cf2f6ceaa3a386d6 Mon Sep 17 00:00:00 2001 From: Tatsuhiro Tsujikawa Date: Sun, 10 Jun 2012 01:36:30 +0900 Subject: [PATCH] Added SPDY/3 flow control --- examples/shrpx_downstream.cc | 18 ++++++- examples/shrpx_downstream.h | 4 ++ examples/shrpx_spdy_upstream.cc | 86 +++++++++++++++++++++++++++++---- examples/shrpx_spdy_upstream.h | 6 +++ 4 files changed, 103 insertions(+), 11 deletions(-) diff --git a/examples/shrpx_downstream.cc b/examples/shrpx_downstream.cc index 3c804b29..9411f050 100644 --- a/examples/shrpx_downstream.cc +++ b/examples/shrpx_downstream.cc @@ -56,7 +56,8 @@ Downstream::Downstream(Upstream *upstream, int stream_id, int priority) chunked_response_(false), response_connection_close_(false), response_htp_(htparser_new()), - response_body_buf_(0) + response_body_buf_(0), + recv_window_size_(0) { htparser_init(response_htp_, htp_type_response); htparser_set_userdata(response_htp_, this); @@ -531,4 +532,19 @@ void Downstream::set_priority(int pri) priority_ = pri; } +int32_t Downstream::get_recv_window_size() const +{ + return recv_window_size_; +} + +void Downstream::inc_recv_window_size(int32_t amount) +{ + recv_window_size_ += amount; +} + +void Downstream::set_recv_window_size(int32_t new_size) +{ + recv_window_size_ = new_size; +} + } // namespace shrpx diff --git a/examples/shrpx_downstream.h b/examples/shrpx_downstream.h index a0c0c4a8..5fd7b29a 100644 --- a/examples/shrpx_downstream.h +++ b/examples/shrpx_downstream.h @@ -63,6 +63,9 @@ public: // Returns true if output buffer is full. If underlying dconn_ is // NULL, this function always returns false. bool get_output_buffer_full(); + int32_t get_recv_window_size() const; + void inc_recv_window_size(int32_t amount); + void set_recv_window_size(int32_t new_size); // downstream request API const Headers& get_request_headers() const; void add_request_header(const std::string& name, const std::string& value); @@ -132,6 +135,7 @@ private: // This buffer is used to temporarily store downstream response // body. Spdylay reads data from this in the callback. evbuffer *response_body_buf_; + int32_t recv_window_size_; }; } // namespace shrpx diff --git a/examples/shrpx_spdy_upstream.cc b/examples/shrpx_spdy_upstream.cc index d7c853d9..16b2c509 100644 --- a/examples/shrpx_spdy_upstream.cc +++ b/examples/shrpx_spdy_upstream.cc @@ -195,14 +195,24 @@ void on_data_chunk_recv_callback(spdylay_session *session, const uint8_t *data, size_t len, void *user_data) { - if(ENABLE_LOG) { - LOG(INFO) << "Upstream spdy received upstream DATA data stream_id=" - << stream_id; - } SpdyUpstream *upstream = reinterpret_cast(user_data); Downstream *downstream = upstream->find_downstream(stream_id); if(downstream) { downstream->push_upload_data_chunk(data, len); + if(upstream->get_flow_control()) { + downstream->inc_recv_window_size(len); + if(downstream->get_recv_window_size() > + upstream->get_initial_window_size()) { + if(ENABLE_LOG) { + LOG(INFO) << "Flow control error: recv_window_size=" + << downstream->get_recv_window_size() + << ", initial_window_size=" + << upstream->get_initial_window_size(); + } + upstream->rst_stream(downstream, SPDYLAY_FLOW_CONTROL_ERROR); + return; + } + } if(flags & SPDYLAY_DATA_FLAG_FIN) { if(ENABLE_LOG) { LOG(INFO) << "Upstream spdy " @@ -234,13 +244,32 @@ SpdyUpstream::SpdyUpstream(uint16_t version, ClientHandler *handler) int rv; rv = spdylay_session_server_new(&session_, version, &callbacks, this); assert(rv == 0); + + if(version == SPDYLAY_PROTO_SPDY3) { + int val = 1; + flow_control_ = true; + initial_window_size_ = 64*1024; // specified by SPDY/3 spec. + rv = spdylay_session_set_option(session_, + SPDYLAY_OPT_NO_AUTO_WINDOW_UPDATE, &val, + sizeof(val)); + assert(rv == 0); + } else { + flow_control_ = false; + initial_window_size_ = 0; + } // TODO Maybe call from outside? - spdylay_settings_entry entry; - entry.settings_id = SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS; - entry.value = get_config()->spdy_max_concurrent_streams; - entry.flags = SPDYLAY_ID_FLAG_SETTINGS_NONE; - rv = spdylay_submit_settings(session_, SPDYLAY_FLAG_SETTINGS_NONE, - &entry, 1); + spdylay_settings_entry entry[2]; + entry[0].settings_id = SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS; + entry[0].value = get_config()->spdy_max_concurrent_streams; + entry[0].flags = SPDYLAY_ID_FLAG_SETTINGS_NONE; + + entry[1].settings_id = SPDYLAY_SETTINGS_INITIAL_WINDOW_SIZE; + entry[1].value = initial_window_size_; + entry[1].flags = SPDYLAY_ID_FLAG_SETTINGS_NONE; + + rv = spdylay_submit_settings + (session_, SPDYLAY_FLAG_SETTINGS_NONE, + entry, sizeof(entry)/sizeof(spdylay_settings_entry)); assert(rv == 0); // TODO Maybe call from outside? send(); @@ -332,6 +361,16 @@ void spdy_downstream_readcb(bufferevent *bev, void *ptr) namespace { void spdy_downstream_writecb(bufferevent *bev, void *ptr) { + DownstreamConnection *dconn = reinterpret_cast(ptr); + Downstream *downstream = dconn->get_downstream(); + SpdyUpstream *upstream; + upstream = static_cast(downstream->get_upstream()); + if(upstream->get_flow_control()) { + if(downstream->get_recv_window_size() >= + upstream->get_initial_window_size()/2) { + upstream->window_update(downstream); + } + } } } // namespace @@ -414,6 +453,10 @@ void spdy_downstream_eventcb(bufferevent *bev, short events, void *ptr) int SpdyUpstream::rst_stream(Downstream *downstream, int status_code) { + if(ENABLE_LOG) { + LOG(INFO) << "RST_STREAM stream_id=" + << downstream->get_stream_id(); + } int rv; rv = spdylay_submit_rst_stream(session_, downstream->get_stream_id(), status_code); @@ -424,6 +467,19 @@ int SpdyUpstream::rst_stream(Downstream *downstream, int status_code) } } +int SpdyUpstream::window_update(Downstream *downstream) +{ + int rv; + rv = spdylay_submit_window_update(session_, downstream->get_stream_id(), + downstream->get_recv_window_size()); + downstream->set_recv_window_size(0); + if(rv < SPDYLAY_ERR_FATAL) { + DIE(); + } else { + return 0; + } +} + namespace { ssize_t spdy_data_read_callback(spdylay_session *session, int32_t stream_id, @@ -595,4 +651,14 @@ int SpdyUpstream::on_downstream_body_complete(Downstream *downstream) return 0; } +bool SpdyUpstream::get_flow_control() const +{ + return flow_control_; +} + +int32_t SpdyUpstream::get_initial_window_size() const +{ + return initial_window_size_; +} + } // namespace shrpx diff --git a/examples/shrpx_spdy_upstream.h b/examples/shrpx_spdy_upstream.h index 90ffc858..d2400e3f 100644 --- a/examples/shrpx_spdy_upstream.h +++ b/examples/shrpx_spdy_upstream.h @@ -55,15 +55,21 @@ public: spdylay_session* get_spdy_session(); int rst_stream(Downstream *downstream, int status_code); + int window_update(Downstream *downstream); int error_reply(Downstream *downstream, int status_code); virtual int on_downstream_header_complete(Downstream *downstream); virtual int on_downstream_body(Downstream *downstream, const uint8_t *data, size_t len); virtual int on_downstream_body_complete(Downstream *downstream); + + bool get_flow_control() const; + int32_t get_initial_window_size() const; private: ClientHandler *handler_; spdylay_session *session_; + bool flow_control_; + int32_t initial_window_size_; DownstreamQueue downstream_queue_; };