Added SPDY/3 flow control
This commit is contained in:
parent
c2785955ca
commit
c29dd0b80f
|
@ -56,7 +56,8 @@ Downstream::Downstream(Upstream *upstream, int stream_id, int priority)
|
||||||
chunked_response_(false),
|
chunked_response_(false),
|
||||||
response_connection_close_(false),
|
response_connection_close_(false),
|
||||||
response_htp_(htparser_new()),
|
response_htp_(htparser_new()),
|
||||||
response_body_buf_(0)
|
response_body_buf_(0),
|
||||||
|
recv_window_size_(0)
|
||||||
{
|
{
|
||||||
htparser_init(response_htp_, htp_type_response);
|
htparser_init(response_htp_, htp_type_response);
|
||||||
htparser_set_userdata(response_htp_, this);
|
htparser_set_userdata(response_htp_, this);
|
||||||
|
@ -531,4 +532,19 @@ void Downstream::set_priority(int pri)
|
||||||
priority_ = pri;
|
priority_ = pri;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t Downstream::get_recv_window_size() const
|
||||||
|
{
|
||||||
|
return recv_window_size_;
|
||||||
|
}
|
||||||
|
|
||||||
|
void Downstream::inc_recv_window_size(int32_t amount)
|
||||||
|
{
|
||||||
|
recv_window_size_ += amount;
|
||||||
|
}
|
||||||
|
|
||||||
|
void Downstream::set_recv_window_size(int32_t new_size)
|
||||||
|
{
|
||||||
|
recv_window_size_ = new_size;
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace shrpx
|
} // namespace shrpx
|
||||||
|
|
|
@ -63,6 +63,9 @@ public:
|
||||||
// Returns true if output buffer is full. If underlying dconn_ is
|
// Returns true if output buffer is full. If underlying dconn_ is
|
||||||
// NULL, this function always returns false.
|
// NULL, this function always returns false.
|
||||||
bool get_output_buffer_full();
|
bool get_output_buffer_full();
|
||||||
|
int32_t get_recv_window_size() const;
|
||||||
|
void inc_recv_window_size(int32_t amount);
|
||||||
|
void set_recv_window_size(int32_t new_size);
|
||||||
// downstream request API
|
// downstream request API
|
||||||
const Headers& get_request_headers() const;
|
const Headers& get_request_headers() const;
|
||||||
void add_request_header(const std::string& name, const std::string& value);
|
void add_request_header(const std::string& name, const std::string& value);
|
||||||
|
@ -132,6 +135,7 @@ private:
|
||||||
// This buffer is used to temporarily store downstream response
|
// This buffer is used to temporarily store downstream response
|
||||||
// body. Spdylay reads data from this in the callback.
|
// body. Spdylay reads data from this in the callback.
|
||||||
evbuffer *response_body_buf_;
|
evbuffer *response_body_buf_;
|
||||||
|
int32_t recv_window_size_;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace shrpx
|
} // namespace shrpx
|
||||||
|
|
|
@ -195,14 +195,24 @@ void on_data_chunk_recv_callback(spdylay_session *session,
|
||||||
const uint8_t *data, size_t len,
|
const uint8_t *data, size_t len,
|
||||||
void *user_data)
|
void *user_data)
|
||||||
{
|
{
|
||||||
if(ENABLE_LOG) {
|
|
||||||
LOG(INFO) << "Upstream spdy received upstream DATA data stream_id="
|
|
||||||
<< stream_id;
|
|
||||||
}
|
|
||||||
SpdyUpstream *upstream = reinterpret_cast<SpdyUpstream*>(user_data);
|
SpdyUpstream *upstream = reinterpret_cast<SpdyUpstream*>(user_data);
|
||||||
Downstream *downstream = upstream->find_downstream(stream_id);
|
Downstream *downstream = upstream->find_downstream(stream_id);
|
||||||
if(downstream) {
|
if(downstream) {
|
||||||
downstream->push_upload_data_chunk(data, len);
|
downstream->push_upload_data_chunk(data, len);
|
||||||
|
if(upstream->get_flow_control()) {
|
||||||
|
downstream->inc_recv_window_size(len);
|
||||||
|
if(downstream->get_recv_window_size() >
|
||||||
|
upstream->get_initial_window_size()) {
|
||||||
|
if(ENABLE_LOG) {
|
||||||
|
LOG(INFO) << "Flow control error: recv_window_size="
|
||||||
|
<< downstream->get_recv_window_size()
|
||||||
|
<< ", initial_window_size="
|
||||||
|
<< upstream->get_initial_window_size();
|
||||||
|
}
|
||||||
|
upstream->rst_stream(downstream, SPDYLAY_FLOW_CONTROL_ERROR);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
if(flags & SPDYLAY_DATA_FLAG_FIN) {
|
if(flags & SPDYLAY_DATA_FLAG_FIN) {
|
||||||
if(ENABLE_LOG) {
|
if(ENABLE_LOG) {
|
||||||
LOG(INFO) << "Upstream spdy "
|
LOG(INFO) << "Upstream spdy "
|
||||||
|
@ -234,13 +244,32 @@ SpdyUpstream::SpdyUpstream(uint16_t version, ClientHandler *handler)
|
||||||
int rv;
|
int rv;
|
||||||
rv = spdylay_session_server_new(&session_, version, &callbacks, this);
|
rv = spdylay_session_server_new(&session_, version, &callbacks, this);
|
||||||
assert(rv == 0);
|
assert(rv == 0);
|
||||||
|
|
||||||
|
if(version == SPDYLAY_PROTO_SPDY3) {
|
||||||
|
int val = 1;
|
||||||
|
flow_control_ = true;
|
||||||
|
initial_window_size_ = 64*1024; // specified by SPDY/3 spec.
|
||||||
|
rv = spdylay_session_set_option(session_,
|
||||||
|
SPDYLAY_OPT_NO_AUTO_WINDOW_UPDATE, &val,
|
||||||
|
sizeof(val));
|
||||||
|
assert(rv == 0);
|
||||||
|
} else {
|
||||||
|
flow_control_ = false;
|
||||||
|
initial_window_size_ = 0;
|
||||||
|
}
|
||||||
// TODO Maybe call from outside?
|
// TODO Maybe call from outside?
|
||||||
spdylay_settings_entry entry;
|
spdylay_settings_entry entry[2];
|
||||||
entry.settings_id = SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS;
|
entry[0].settings_id = SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS;
|
||||||
entry.value = get_config()->spdy_max_concurrent_streams;
|
entry[0].value = get_config()->spdy_max_concurrent_streams;
|
||||||
entry.flags = SPDYLAY_ID_FLAG_SETTINGS_NONE;
|
entry[0].flags = SPDYLAY_ID_FLAG_SETTINGS_NONE;
|
||||||
rv = spdylay_submit_settings(session_, SPDYLAY_FLAG_SETTINGS_NONE,
|
|
||||||
&entry, 1);
|
entry[1].settings_id = SPDYLAY_SETTINGS_INITIAL_WINDOW_SIZE;
|
||||||
|
entry[1].value = initial_window_size_;
|
||||||
|
entry[1].flags = SPDYLAY_ID_FLAG_SETTINGS_NONE;
|
||||||
|
|
||||||
|
rv = spdylay_submit_settings
|
||||||
|
(session_, SPDYLAY_FLAG_SETTINGS_NONE,
|
||||||
|
entry, sizeof(entry)/sizeof(spdylay_settings_entry));
|
||||||
assert(rv == 0);
|
assert(rv == 0);
|
||||||
// TODO Maybe call from outside?
|
// TODO Maybe call from outside?
|
||||||
send();
|
send();
|
||||||
|
@ -332,6 +361,16 @@ void spdy_downstream_readcb(bufferevent *bev, void *ptr)
|
||||||
namespace {
|
namespace {
|
||||||
void spdy_downstream_writecb(bufferevent *bev, void *ptr)
|
void spdy_downstream_writecb(bufferevent *bev, void *ptr)
|
||||||
{
|
{
|
||||||
|
DownstreamConnection *dconn = reinterpret_cast<DownstreamConnection*>(ptr);
|
||||||
|
Downstream *downstream = dconn->get_downstream();
|
||||||
|
SpdyUpstream *upstream;
|
||||||
|
upstream = static_cast<SpdyUpstream*>(downstream->get_upstream());
|
||||||
|
if(upstream->get_flow_control()) {
|
||||||
|
if(downstream->get_recv_window_size() >=
|
||||||
|
upstream->get_initial_window_size()/2) {
|
||||||
|
upstream->window_update(downstream);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} // namespace
|
} // namespace
|
||||||
|
|
||||||
|
@ -414,6 +453,10 @@ void spdy_downstream_eventcb(bufferevent *bev, short events, void *ptr)
|
||||||
|
|
||||||
int SpdyUpstream::rst_stream(Downstream *downstream, int status_code)
|
int SpdyUpstream::rst_stream(Downstream *downstream, int status_code)
|
||||||
{
|
{
|
||||||
|
if(ENABLE_LOG) {
|
||||||
|
LOG(INFO) << "RST_STREAM stream_id="
|
||||||
|
<< downstream->get_stream_id();
|
||||||
|
}
|
||||||
int rv;
|
int rv;
|
||||||
rv = spdylay_submit_rst_stream(session_, downstream->get_stream_id(),
|
rv = spdylay_submit_rst_stream(session_, downstream->get_stream_id(),
|
||||||
status_code);
|
status_code);
|
||||||
|
@ -424,6 +467,19 @@ int SpdyUpstream::rst_stream(Downstream *downstream, int status_code)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int SpdyUpstream::window_update(Downstream *downstream)
|
||||||
|
{
|
||||||
|
int rv;
|
||||||
|
rv = spdylay_submit_window_update(session_, downstream->get_stream_id(),
|
||||||
|
downstream->get_recv_window_size());
|
||||||
|
downstream->set_recv_window_size(0);
|
||||||
|
if(rv < SPDYLAY_ERR_FATAL) {
|
||||||
|
DIE();
|
||||||
|
} else {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
namespace {
|
namespace {
|
||||||
ssize_t spdy_data_read_callback(spdylay_session *session,
|
ssize_t spdy_data_read_callback(spdylay_session *session,
|
||||||
int32_t stream_id,
|
int32_t stream_id,
|
||||||
|
@ -595,4 +651,14 @@ int SpdyUpstream::on_downstream_body_complete(Downstream *downstream)
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool SpdyUpstream::get_flow_control() const
|
||||||
|
{
|
||||||
|
return flow_control_;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t SpdyUpstream::get_initial_window_size() const
|
||||||
|
{
|
||||||
|
return initial_window_size_;
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace shrpx
|
} // namespace shrpx
|
||||||
|
|
|
@ -55,15 +55,21 @@ public:
|
||||||
spdylay_session* get_spdy_session();
|
spdylay_session* get_spdy_session();
|
||||||
|
|
||||||
int rst_stream(Downstream *downstream, int status_code);
|
int rst_stream(Downstream *downstream, int status_code);
|
||||||
|
int window_update(Downstream *downstream);
|
||||||
int error_reply(Downstream *downstream, int status_code);
|
int error_reply(Downstream *downstream, int status_code);
|
||||||
|
|
||||||
virtual int on_downstream_header_complete(Downstream *downstream);
|
virtual int on_downstream_header_complete(Downstream *downstream);
|
||||||
virtual int on_downstream_body(Downstream *downstream,
|
virtual int on_downstream_body(Downstream *downstream,
|
||||||
const uint8_t *data, size_t len);
|
const uint8_t *data, size_t len);
|
||||||
virtual int on_downstream_body_complete(Downstream *downstream);
|
virtual int on_downstream_body_complete(Downstream *downstream);
|
||||||
|
|
||||||
|
bool get_flow_control() const;
|
||||||
|
int32_t get_initial_window_size() const;
|
||||||
private:
|
private:
|
||||||
ClientHandler *handler_;
|
ClientHandler *handler_;
|
||||||
spdylay_session *session_;
|
spdylay_session *session_;
|
||||||
|
bool flow_control_;
|
||||||
|
int32_t initial_window_size_;
|
||||||
DownstreamQueue downstream_queue_;
|
DownstreamQueue downstream_queue_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue