shrpx: Implement downstream SPDY flow control

This commit is contained in:
Tatsuhiro Tsujikawa 2012-11-21 23:47:48 +09:00
parent 0bf15a7694
commit 81adb6bc7f
13 changed files with 159 additions and 15 deletions

View File

@ -325,9 +325,10 @@ void fill_default_config()
// Timeout for pooled (idle) connections
mod_config()->downstream_idle_read_timeout.tv_sec = 60;
// window bits for SPDY upstream connection
// 2**16 = 64KiB, which is SPDY/3 default.
// window bits for SPDY upstream/downstream connection. 2**16 =
// 64KiB, which is SPDY/3 default.
mod_config()->spdy_upstream_window_bits = 16;
mod_config()->spdy_downstream_window_bits = 16;
set_config_str(&mod_config()->downstream_host, "127.0.0.1");
mod_config()->downstream_port = 80;
@ -443,6 +444,11 @@ void print_help(std::ostream& out)
<< " frontend connection to 2**<N>.\n"
<< " Default: "
<< get_config()->spdy_upstream_window_bits << "\n"
<< " --backend-spdy-window-bits=<N>\n"
<< " Sets the initial window size of SPDY\n"
<< " backend connection to 2**<N>.\n"
<< " Default: "
<< get_config()->spdy_downstream_window_bits << "\n"
<< " --pid-file=<PATH> Set path to save PID of this program.\n"
<< " --user=<USER> Run this program as USER. This option is\n"
<< " intended to be used to drop root privileges.\n"
@ -499,6 +505,7 @@ int main(int argc, char **argv)
{"backlog", required_argument, &flag, 15 },
{"ciphers", required_argument, &flag, 16 },
{"client", no_argument, &flag, 17 },
{"backend-spdy-window-bits", required_argument, &flag, 18 },
{"help", no_argument, 0, 'h' },
{0, 0, 0, 0 }
};
@ -614,6 +621,11 @@ int main(int argc, char **argv)
// --client
cmdcfgs.push_back(std::make_pair(SHRPX_OPT_CLIENT, "yes"));
break;
case 18:
// --backend-spdy-window-bits
cmdcfgs.push_back(std::make_pair(SHRPX_OPT_BACKEND_SPDY_WINDOW_BITS,
optarg));
break;
default:
break;
}

View File

@ -63,6 +63,7 @@ const char SHRPX_OPT_ACCESSLOG[] = "accesslog";
const char
SHRPX_OPT_BACKEND_KEEP_ALIVE_TIMEOUT[] = "backend-keep-alive-timeout";
const char SHRPX_OPT_FRONTEND_SPDY_WINDOW_BITS[] = "frontend-spdy-window-bits";
const char SHRPX_OPT_BACKEND_SPDY_WINDOW_BITS[] = "backend-spdy-window-bits";
const char SHRPX_OPT_PID_FILE[] = "pid-file";
const char SHRPX_OPT_USER[] = "user";
const char SHRPX_OPT_SYSLOG[] = "syslog";
@ -91,6 +92,7 @@ Config::Config()
add_x_forwarded_for(false),
accesslog(false),
spdy_upstream_window_bits(0),
spdy_downstream_window_bits(0),
pid_file(0),
uid(0),
gid(0),
@ -216,13 +218,24 @@ int parse_config(const char *opt, const char *optarg)
} else if(util::strieq(opt, SHRPX_OPT_BACKEND_KEEP_ALIVE_TIMEOUT)) {
timeval tv = {strtol(optarg, 0, 10), 0};
mod_config()->downstream_idle_read_timeout = tv;
} else if(util::strieq(opt, SHRPX_OPT_FRONTEND_SPDY_WINDOW_BITS)) {
} else if(util::strieq(opt, SHRPX_OPT_FRONTEND_SPDY_WINDOW_BITS) ||
util::strieq(opt, SHRPX_OPT_BACKEND_SPDY_WINDOW_BITS)) {
size_t *resp;
const char *optname;
if(util::strieq(opt, SHRPX_OPT_FRONTEND_SPDY_WINDOW_BITS)) {
resp = &mod_config()->spdy_upstream_window_bits;
optname = SHRPX_OPT_FRONTEND_SPDY_WINDOW_BITS;
} else {
resp = &mod_config()->spdy_downstream_window_bits;
optname = SHRPX_OPT_BACKEND_SPDY_WINDOW_BITS;
}
errno = 0;
unsigned long int n = strtoul(optarg, 0, 10);
if(errno == 0 && n < 31) {
mod_config()->spdy_upstream_window_bits = n;
*resp = n;
} else {
LOG(ERROR) << "-w: specify the integer in the range [0, 30], inclusive";
LOG(ERROR) << "--" << optname
<< " specify the integer in the range [0, 30], inclusive";
return -1;
}
} else if(util::strieq(opt, SHRPX_OPT_PID_FILE)) {

View File

@ -55,6 +55,7 @@ extern const char SHRPX_OPT_BACKEND_WRITE_TIMEOUT[];
extern const char SHRPX_OPT_ACCESSLOG[];
extern const char SHRPX_OPT_BACKEND_KEEP_ALIVE_TIMEOUT[];
extern const char SHRPX_OPT_FRONTEND_SPDY_WINDOW_BITS[];
extern const char SHRPX_OPT_BACKEND_SPDY_WINDOW_BITS[];
extern const char SHRPX_OPT_PID_FILE[];
extern const char SHRPX_OPT_USER[];
extern const char SHRPX_OPT_SYSLOG[];
@ -97,6 +98,7 @@ struct Config {
bool add_x_forwarded_for;
bool accesslog;
size_t spdy_upstream_window_bits;
size_t spdy_downstream_window_bits;
char *pid_file;
uid_t uid;
gid_t gid;

View File

@ -487,4 +487,13 @@ int32_t Downstream::get_downstream_stream_id() const
return downstream_stream_id_;
}
int Downstream::on_upstream_write()
{
if(dconn_) {
return dconn_->on_upstream_write();
} else {
return 0;
}
}
} // namespace shrpx

View File

@ -132,6 +132,7 @@ public:
// Call this method when there is incoming data in downstream
// connection.
int on_read();
int on_upstream_write();
static const size_t OUTPUT_UPPER_THRES = 64*1024;
private:

View File

@ -53,6 +53,7 @@ public:
virtual int on_read() = 0;
virtual int on_write() = 0;
virtual int on_upstream_write() = 0;
ClientHandler* get_client_handler();
Downstream* get_downstream();

View File

@ -447,4 +447,9 @@ int HttpDownstreamConnection::on_write()
return 0;
}
int HttpDownstreamConnection::on_upstream_write()
{
return 0;
}
} // namespace shrpx

View File

@ -56,6 +56,7 @@ public:
virtual int on_read();
virtual int on_write();
virtual int on_upstream_write();
bufferevent* get_bev();
private:

View File

@ -311,11 +311,13 @@ int HttpsUpstream::on_read()
int HttpsUpstream::on_write()
{
int rv = 0;
Downstream *downstream = get_downstream();
if(downstream) {
downstream->resume_read(SHRPX_NO_BUFFER);
rv = downstream->on_upstream_write();
}
return 0;
return rv;
}
int HttpsUpstream::on_event()

View File

@ -50,7 +50,8 @@ SpdyDownstreamConnection::SpdyDownstreamConnection
: DownstreamConnection(client_handler),
spdy_(client_handler->get_spdy_session()),
request_body_buf_(0),
sd_(0)
sd_(0),
recv_window_size_(0)
{}
SpdyDownstreamConnection::~SpdyDownstreamConnection()
@ -101,7 +102,7 @@ int SpdyDownstreamConnection::attach_downstream(Downstream *downstream)
}
downstream->set_downstream_connection(this);
downstream_ = downstream;
recv_window_size_ = 0;
return 0;
}
@ -362,6 +363,23 @@ int SpdyDownstreamConnection::on_write()
return 0;
}
int SpdyDownstreamConnection::on_upstream_write()
{
int rv;
if(spdy_->get_state() == SpdySession::CONNECTED &&
spdy_->get_flow_control() &&
downstream_ && downstream_->get_downstream_stream_id() != -1 &&
recv_window_size_ >= spdy_->get_initial_window_size()/2) {
rv = spdy_->submit_window_update(this, recv_window_size_);
if(rv == -1) {
return -1;
}
spdy_->notify();
recv_window_size_ = 0;
}
return 0;
}
evbuffer* SpdyDownstreamConnection::get_request_body_buf() const
{
return request_body_buf_;
@ -396,4 +414,14 @@ bool SpdyDownstreamConnection::get_output_buffer_full()
}
}
int32_t SpdyDownstreamConnection::get_recv_window_size() const
{
return recv_window_size_;
}
void SpdyDownstreamConnection::inc_recv_window_size(int32_t amount)
{
recv_window_size_ += amount;
}
} // namespace shrpx

View File

@ -57,6 +57,7 @@ public:
virtual int on_read();
virtual int on_write();
virtual int on_upstream_write();
int send();
int init_request_body_buf();
@ -64,10 +65,14 @@ public:
void attach_stream_data(StreamData *sd);
StreamData* detach_stream_data();
int32_t get_recv_window_size() const;
void inc_recv_window_size(int32_t amount);
private:
SpdySession *spdy_;
evbuffer *request_body_buf_;
StreamData *sd_;
int32_t recv_window_size_;
};
} // namespace shrpx

View File

@ -52,7 +52,8 @@ SpdySession::SpdySession(event_base *evbase, SSL_CTX *ssl_ctx)
state_(DISCONNECTED),
notified_(false),
wrbev_(0),
rdbev_(0)
rdbev_(0),
flow_control_(false)
{}
SpdySession::~SpdySession()
@ -313,7 +314,7 @@ int SpdySession::submit_request(SpdyDownstreamConnection *dconn,
return 0;
}
int SpdySession::submit_rst_stream(SpdyDownstreamConnection *docnn,
int SpdySession::submit_rst_stream(SpdyDownstreamConnection *dconn,
int32_t stream_id, uint32_t status_code)
{
assert(state_ == CONNECTED);
@ -326,6 +327,32 @@ int SpdySession::submit_rst_stream(SpdyDownstreamConnection *docnn,
return 0;
}
int SpdySession::submit_window_update(SpdyDownstreamConnection *dconn,
int32_t amount)
{
assert(state_ == CONNECTED);
int rv;
int32_t stream_id;
stream_id = dconn->get_downstream()->get_downstream_stream_id();
rv = spdylay_submit_window_update(session_, stream_id, amount);
if(rv < SPDYLAY_ERR_FATAL) {
LOG(FATAL) << "spdylay_submit_window_update() failed: "
<< spdylay_strerror(rv);
return -1;
}
return 0;
}
int32_t SpdySession::get_initial_window_size() const
{
return 1 << get_config()->spdy_downstream_window_bits;
}
bool SpdySession::get_flow_control() const
{
return flow_control_;
}
int SpdySession::resume_data(SpdyDownstreamConnection *dconn)
{
assert(state_ == CONNECTED);
@ -585,7 +612,24 @@ void on_data_chunk_recv_callback(spdylay_session *session,
spdylay_submit_rst_stream(session, stream_id, SPDYLAY_INTERNAL_ERROR);
return;
}
// TODO No manual flow control at the moment.
if(spdy->get_flow_control()) {
sd->dconn->inc_recv_window_size(len);
if(sd->dconn->get_recv_window_size() > spdy->get_initial_window_size()) {
if(ENABLE_LOG) {
LOG(INFO) << "Flow control error: recv_window_size="
<< sd->dconn->get_recv_window_size()
<< ", initial_window_size="
<< spdy->get_initial_window_size();
}
spdylay_submit_rst_stream(session, stream_id,
SPDYLAY_FLOW_CONTROL_ERROR);
downstream->set_response_state(Downstream::MSG_RESET);
call_downstream_readcb(spdy, downstream);
return;
}
}
Upstream *upstream = downstream->get_upstream();
rv = upstream->on_downstream_body(downstream, data, len);
if(rv != 0) {
@ -719,12 +763,26 @@ int SpdySession::on_connect()
return -1;
}
// TODO Send initial window size when manual flow control is
// implemented.
spdylay_settings_entry entry[1];
if(version == SPDYLAY_PROTO_SPDY3) {
int val = 1;
flow_control_ = true;
rv = spdylay_session_set_option(session_,
SPDYLAY_OPT_NO_AUTO_WINDOW_UPDATE, &val,
sizeof(val));
assert(rv == 0);
} else {
flow_control_ = false;
}
spdylay_settings_entry entry[2];
entry[0].settings_id = SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS;
entry[0].value = get_config()->spdy_max_concurrent_streams;
entry[0].flags = SPDYLAY_ID_FLAG_SETTINGS_NONE;
entry[1].settings_id = SPDYLAY_SETTINGS_INITIAL_WINDOW_SIZE;
entry[1].value = get_initial_window_size();
entry[1].flags = SPDYLAY_ID_FLAG_SETTINGS_NONE;
rv = spdylay_submit_settings
(session_, SPDYLAY_FLAG_SETTINGS_NONE,
entry, sizeof(entry)/sizeof(spdylay_settings_entry));

View File

@ -64,9 +64,15 @@ public:
uint8_t pri, const char **nv,
const spdylay_data_provider *data_prd);
int submit_rst_stream(SpdyDownstreamConnection *docnn,
int submit_rst_stream(SpdyDownstreamConnection *dconn,
int32_t stream_id, uint32_t status_code);
int submit_window_update(SpdyDownstreamConnection *dconn, int32_t amount);
int32_t get_initial_window_size() const;
bool get_flow_control() const;
int resume_data(SpdyDownstreamConnection *dconn);
int on_connect();
@ -99,6 +105,7 @@ private:
bool notified_;
bufferevent *wrbev_;
bufferevent *rdbev_;
bool flow_control_;
};
} // namespace shrpx