h2load: Handle request submission failure

If request submission is failed, make all remaining requests for that
client fail.
This commit is contained in:
Tatsuhiro Tsujikawa 2015-10-29 22:31:03 +09:00
parent e3878b619f
commit ad395f0603
9 changed files with 74 additions and 19 deletions

View File

@ -198,7 +198,11 @@ namespace {
void client_request_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) { void client_request_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) {
auto client = static_cast<Client *>(w->data); auto client = static_cast<Client *>(w->data);
client->submit_request(); if (client->submit_request() != 0) {
ev_timer_stop(client->worker->loop, w);
client->process_request_failure();
return;
}
client->signal_write(); client->signal_write();
if (check_stop_client_request_timeout(client, w)) { if (check_stop_client_request_timeout(client, w)) {
@ -209,7 +213,11 @@ void client_request_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) {
config.timings[client->reqidx] - config.timings[client->reqidx - 1]; config.timings[client->reqidx] - config.timings[client->reqidx - 1];
while (duration < 1e-9) { while (duration < 1e-9) {
client->submit_request(); if (client->submit_request() != 0) {
ev_timer_stop(client->worker->loop, w);
client->process_request_failure();
return;
}
client->signal_write(); client->signal_write();
if (check_stop_client_request_timeout(client, w)) { if (check_stop_client_request_timeout(client, w)) {
return; return;
@ -388,9 +396,13 @@ void Client::disconnect() {
} }
} }
void Client::submit_request() { int Client::submit_request() {
auto req_stat = &worker->stats.req_stats[worker->stats.req_started++]; auto req_stat = &worker->stats.req_stats[worker->stats.req_started++];
session->submit_request(req_stat);
if (session->submit_request(req_stat) != 0) {
return -1;
}
++req_started; ++req_started;
// if an active timeout is set and this is the last request to be submitted // if an active timeout is set and this is the last request to be submitted
@ -398,6 +410,8 @@ void Client::submit_request() {
if (worker->config->conn_active_timeout > 0. && req_started >= req_todo) { if (worker->config->conn_active_timeout > 0. && req_started >= req_todo) {
ev_timer_start(worker->loop, &conn_active_watcher); ev_timer_start(worker->loop, &conn_active_watcher);
} }
return 0;
} }
void Client::process_timedout_streams() { void Client::process_timedout_streams() {
@ -423,6 +437,21 @@ void Client::process_abandoned_streams() {
req_done = req_todo; req_done = req_todo;
} }
void Client::process_request_failure() {
auto req_abandoned = req_todo - req_started;
worker->stats.req_failed += req_abandoned;
worker->stats.req_error += req_abandoned;
worker->stats.req_done += req_abandoned;
req_done += req_abandoned;
if (req_done == req_todo) {
terminate_session();
return;
}
}
void Client::report_progress() { void Client::report_progress() {
if (!worker->config->is_rate_mode() && worker->id == 0 && if (!worker->config->is_rate_mode() && worker->id == 0 &&
worker->stats.req_done % worker->progress_interval == 0) { worker->stats.req_done % worker->progress_interval == 0) {
@ -488,7 +517,11 @@ void Client::report_app_info() {
} }
} }
void Client::terminate_session() { session->terminate(); } void Client::terminate_session() {
session->terminate();
// http1 session needs writecb to tear down session.
signal_write();
}
void Client::on_request(int32_t stream_id) { streams[stream_id] = Stream(); } void Client::on_request(int32_t stream_id) { streams[stream_id] = Stream(); }
@ -579,7 +612,9 @@ void Client::on_stream_close(int32_t stream_id, bool success,
if (!config.timing_script && !final) { if (!config.timing_script && !final) {
if (req_started < req_todo) { if (req_started < req_todo) {
submit_request(); if (submit_request() != 0) {
process_request_failure();
}
return; return;
} }
} }
@ -692,14 +727,20 @@ int Client::connection_made() {
std::min(req_todo - req_started, (size_t)config.max_concurrent_streams); std::min(req_todo - req_started, (size_t)config.max_concurrent_streams);
for (; nreq > 0; --nreq) { for (; nreq > 0; --nreq) {
submit_request(); if (submit_request() != 0) {
process_request_failure();
break;
}
} }
} else { } else {
ev_tstamp duration = config.timings[reqidx]; ev_tstamp duration = config.timings[reqidx];
while (duration < 1e-9) { while (duration < 1e-9) {
submit_request(); if (submit_request() != 0) {
process_request_failure();
break;
}
duration = config.timings[reqidx]; duration = config.timings[reqidx];
} }

View File

@ -260,7 +260,8 @@ struct Client {
void fail(); void fail();
void timeout(); void timeout();
void restart_timeout(); void restart_timeout();
void submit_request(); int submit_request();
void process_request_failure();
void process_timedout_streams(); void process_timedout_streams();
void process_abandoned_streams(); void process_abandoned_streams();
void report_progress(); void report_progress();

View File

@ -140,7 +140,7 @@ http_parser_settings htp_hooks = {
void Http1Session::on_connect() { client_->signal_write(); } void Http1Session::on_connect() { client_->signal_write(); }
void Http1Session::submit_request(RequestStat *req_stat) { int Http1Session::submit_request(RequestStat *req_stat) {
auto config = client_->worker->config; auto config = client_->worker->config;
auto req = config->h1reqs[client_->reqidx]; auto req = config->h1reqs[client_->reqidx];
client_->reqidx++; client_->reqidx++;
@ -158,6 +158,8 @@ void Http1Session::submit_request(RequestStat *req_stat) {
// increment for next request // increment for next request
stream_req_counter_ += 2; stream_req_counter_ += 2;
return 0;
} }
int Http1Session::on_read(const uint8_t *data, size_t len) { int Http1Session::on_read(const uint8_t *data, size_t len) {

View File

@ -38,7 +38,7 @@ public:
Http1Session(Client *client); Http1Session(Client *client);
virtual ~Http1Session(); virtual ~Http1Session();
virtual void on_connect(); virtual void on_connect();
virtual void submit_request(RequestStat *req_stat); virtual int submit_request(RequestStat *req_stat);
virtual int on_read(const uint8_t *data, size_t len); virtual int on_read(const uint8_t *data, size_t len);
virtual int on_write(); virtual int on_write();
virtual void terminate(); virtual void terminate();

View File

@ -209,7 +209,7 @@ void Http2Session::on_connect() {
client_->signal_write(); client_->signal_write();
} }
void Http2Session::submit_request(RequestStat *req_stat) { int Http2Session::submit_request(RequestStat *req_stat) {
auto config = client_->worker->config; auto config = client_->worker->config;
auto &nva = config->nva[client_->reqidx++]; auto &nva = config->nva[client_->reqidx++];
@ -222,7 +222,11 @@ void Http2Session::submit_request(RequestStat *req_stat) {
auto stream_id = auto stream_id =
nghttp2_submit_request(session_, nullptr, nva.data(), nva.size(), nghttp2_submit_request(session_, nullptr, nva.data(), nva.size(),
config->data_fd == -1 ? nullptr : &prd, req_stat); config->data_fd == -1 ? nullptr : &prd, req_stat);
assert(stream_id > 0); if (stream_id < 0) {
return -1;
}
return 0;
} }
int Http2Session::on_read(const uint8_t *data, size_t len) { int Http2Session::on_read(const uint8_t *data, size_t len) {

View File

@ -38,7 +38,7 @@ public:
Http2Session(Client *client); Http2Session(Client *client);
virtual ~Http2Session(); virtual ~Http2Session();
virtual void on_connect(); virtual void on_connect();
virtual void submit_request(RequestStat *req_stat); virtual int submit_request(RequestStat *req_stat);
virtual int on_read(const uint8_t *data, size_t len); virtual int on_read(const uint8_t *data, size_t len);
virtual int on_write(); virtual int on_write();
virtual void terminate(); virtual void terminate();

View File

@ -41,7 +41,7 @@ public:
// Called when the connection was made. // Called when the connection was made.
virtual void on_connect() = 0; virtual void on_connect() = 0;
// Called when one request must be issued. // Called when one request must be issued.
virtual void submit_request(RequestStat *req_stat) = 0; virtual int submit_request(RequestStat *req_stat) = 0;
// Called when incoming bytes are available. The subclass has to // Called when incoming bytes are available. The subclass has to
// return the number of bytes read. // return the number of bytes read.
virtual int on_read(const uint8_t *data, size_t len) = 0; virtual int on_read(const uint8_t *data, size_t len) = 0;

View File

@ -178,7 +178,8 @@ void SpdySession::on_connect() {
client_->signal_write(); client_->signal_write();
} }
void SpdySession::submit_request(RequestStat *req_stat) { int SpdySession::submit_request(RequestStat *req_stat) {
int rv;
auto config = client_->worker->config; auto config = client_->worker->config;
auto &nv = config->nv[client_->reqidx++]; auto &nv = config->nv[client_->reqidx++];
@ -188,8 +189,14 @@ void SpdySession::submit_request(RequestStat *req_stat) {
spdylay_data_provider prd{{0}, file_read_callback}; spdylay_data_provider prd{{0}, file_read_callback};
spdylay_submit_request(session_, 0, nv.data(), rv = spdylay_submit_request(session_, 0, nv.data(),
config->data_fd == -1 ? nullptr : &prd, req_stat); config->data_fd == -1 ? nullptr : &prd, req_stat);
if (rv != 0) {
return -1;
}
return 0;
} }
int SpdySession::on_read(const uint8_t *data, size_t len) { int SpdySession::on_read(const uint8_t *data, size_t len) {

View File

@ -40,7 +40,7 @@ public:
SpdySession(Client *client, uint16_t spdy_version); SpdySession(Client *client, uint16_t spdy_version);
virtual ~SpdySession(); virtual ~SpdySession();
virtual void on_connect(); virtual void on_connect();
virtual void submit_request(RequestStat *req_stat); virtual int submit_request(RequestStat *req_stat);
virtual int on_read(const uint8_t *data, size_t len); virtual int on_read(const uint8_t *data, size_t len);
virtual int on_write(); virtual int on_write();
virtual void terminate(); virtual void terminate();