diff --git a/src/h2load.cc b/src/h2load.cc index 6e6a9cf9..7e0238ed 100644 --- a/src/h2load.cc +++ b/src/h2load.cc @@ -117,6 +117,7 @@ Config::~Config() { bool Config::is_rate_mode() const { return (this->rate != 0); } bool Config::is_timing_based_mode() const { return (this->duration > 0); } bool Config::has_base_uri() const { return (!this->base_uri.empty()); } +bool Config::rps_enabled() const { return this->rps > 0.0; } Config config; namespace { @@ -286,6 +287,51 @@ void warmup_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) { } } // namespace +namespace { +void rps_cb(struct ev_loop *loop, ev_timer *w, int revents) { + auto client = static_cast(w->data); + auto &session = client->session; + + assert(!config.timing_script); + + if (client->req_left == 0) { + ev_timer_stop(loop, w); + return; + } + + auto now = ev_now(loop); + auto d = now - client->rps_duration_started; + auto n = static_cast(round(d * config.rps)); + client->rps_req_pending += n; + client->rps_duration_started = now - d + static_cast(n) / config.rps; + + if (client->rps_req_pending == 0) { + return; + } + + auto nreq = session->max_concurrent_streams() - client->rps_req_inflight; + if (nreq == 0) { + return; + } + + nreq = config.is_timing_based_mode() ? std::max(nreq, client->req_left) + : std::min(nreq, client->req_left); + nreq = std::min(nreq, client->rps_req_pending); + + client->rps_req_inflight += nreq; + client->rps_req_pending -= nreq; + + for (; nreq > 0; --nreq) { + if (client->submit_request() != 0) { + client->process_request_failure(); + break; + } + } + + client->signal_write(); +} +} // namespace + namespace { // Called when an a connection has been inactive for a set period of time // or a fixed amount of time after all requests have been made on a @@ -374,7 +420,10 @@ Client::Client(uint32_t id, Worker *worker, size_t req_todo) id(id), fd(-1), new_connection_requested(false), - final(false) { + final(false), + rps_duration_started(0), + rps_req_pending(0), + rps_req_inflight(0) { if (req_todo == 0) { // this means infinite number of requests are to be made // This ensures that number of requests are unbounded // Just a positive number is fine, we chose the first positive number @@ -396,6 +445,9 @@ Client::Client(uint32_t id, Worker *worker, size_t req_todo) ev_timer_init(&request_timeout_watcher, client_request_timeout_cb, 0., 0.); request_timeout_watcher.data = this; + + ev_timer_init(&rps_watcher, rps_cb, 0., 0.); + rps_watcher.data = this; } Client::~Client() { @@ -552,6 +604,7 @@ void Client::disconnect() { ev_timer_stop(worker->loop, &conn_inactivity_watcher); ev_timer_stop(worker->loop, &conn_active_watcher); + ev_timer_stop(worker->loop, &rps_watcher); ev_timer_stop(worker->loop, &request_timeout_watcher); streams.clear(); session.reset(); @@ -866,8 +919,18 @@ void Client::on_stream_close(int32_t stream_id, bool success, bool final) { if (!ev_is_active(&request_timeout_watcher)) { ev_feed_event(worker->loop, &request_timeout_watcher, EV_TIMER); } - } else if (submit_request() != 0) { - process_request_failure(); + } else if (!config.rps_enabled()) { + if (submit_request() != 0) { + process_request_failure(); + } + } else if (rps_req_pending) { + --rps_req_pending; + if (submit_request() != 0) { + process_request_failure(); + } + } else { + assert(rps_req_inflight); + --rps_req_inflight; } } } @@ -962,10 +1025,25 @@ int Client::connection_made() { record_connect_time(); - if (!config.timing_script) { + if (config.rps_enabled()) { + rps_watcher.repeat = std::max(0.01, 1. / config.rps); + ev_timer_again(worker->loop, &rps_watcher); + rps_duration_started = ev_now(worker->loop); + } + + if (config.rps_enabled()) { + assert(req_left); + + ++rps_req_inflight; + + if (submit_request() != 0) { + process_request_failure(); + } + } else if (!config.timing_script) { auto nreq = config.is_timing_based_mode() ? std::max(req_left, session->max_concurrent_streams()) : std::min(req_left, session->max_concurrent_streams()); + for (; nreq > 0; --nreq) { if (submit_request() != 0) { process_request_failure(); @@ -1943,7 +2021,8 @@ Options: port defined in the first URI are used solely. Values contained in other URIs, if present, are ignored. Definition of a base URI overrides all scheme, host or - port values. + port values. --timing-script-file and --rps are + mutually exclusive. -B, --base-uri=(|unix:) Specify URI from which the scheme, host and port will be used for all requests. The base URI overrides all @@ -1988,6 +2067,8 @@ Options: --connect-to=[:] Host and port to connect instead of using the authority in . + --rps= Specify request per second for each client. --rps and + --timing-script-file are mutually exclusive. -v, --verbose Output debug information. --version Display version information and exit. @@ -2047,6 +2128,7 @@ int main(int argc, char **argv) { {"warm-up-time", required_argument, &flag, 9}, {"log-file", required_argument, &flag, 10}, {"connect-to", required_argument, &flag, 11}, + {"rps", required_argument, &flag, 12}, {nullptr, 0, nullptr, 0}}; int option_index = 0; auto c = getopt_long(argc, argv, @@ -2286,6 +2368,17 @@ int main(int argc, char **argv) { config.connect_to_port = port; break; } + case 12: { + char *end; + auto v = std::strtod(optarg, &end); + if (end == optarg || *end != '\0' || !std::isfinite(v) || + 1. / v < 1e-6) { + std::cerr << "--rps: Invalid value " << optarg << std::endl; + exit(EXIT_FAILURE); + } + config.rps = v; + break; + } } break; default: @@ -2376,6 +2469,12 @@ int main(int argc, char **argv) { exit(EXIT_FAILURE); } + if (config.timing_script && config.rps_enabled()) { + std::cerr << "--timing-script-file, --rps: they are mutually exclusive." + << std::endl; + exit(EXIT_FAILURE); + } + if (config.nreqs == 0 && !config.is_timing_based_mode()) { std::cerr << "-n: the number of requests must be strictly greater than 0 " "if timing-based test is not being run." diff --git a/src/h2load.h b/src/h2load.h index ca689976..89100e96 100644 --- a/src/h2load.h +++ b/src/h2load.h @@ -114,6 +114,8 @@ struct Config { // list of supported NPN/ALPN protocol strings in the order of // preference. std::vector npn_list; + // The number of request per second for each client. + double rps; Config(); ~Config(); @@ -121,6 +123,7 @@ struct Config { bool is_rate_mode() const; bool is_timing_based_mode() const; bool has_base_uri() const; + bool rps_enabled() const; }; struct RequestStat { @@ -336,6 +339,20 @@ struct Client { // true if the current connection will be closed, and no more new // request cannot be processed. bool final; + // rps_watcher is a timer to invoke callback periodically to + // generate a new request. + ev_timer rps_watcher; + // The timestamp that starts the period which contributes to the + // next request generation. + ev_tstamp rps_duration_started; + // The number of requests allowed by rps, but limited by stream + // concurrency. + size_t rps_req_pending; + // The number of in-flight streams. req_inflight has similar value + // but it only measures requests made during Phase::MAIN_DURATION. + // rps_req_inflight measures the number of requests in all phases, + // and it is only used if --rps is given. + size_t rps_req_inflight; enum { ERR_CONNECT_FAIL = -100 };