diff --git a/src/h2load.cc b/src/h2load.cc index 334b1e2b..74832bc2 100644 --- a/src/h2load.cc +++ b/src/h2load.cc @@ -116,6 +116,7 @@ Config::~Config() { bool Config::is_rate_mode() const { return (this->rate != 0); } bool Config::is_timing_based_mode() const { return (this->duration > 0); } bool Config::has_base_uri() const { return (!this->base_uri.empty()); } +bool Config::is_qps_mode() const { return (this->qps != 0); } Config config; namespace { @@ -246,6 +247,10 @@ void duration_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) { worker->current_phase = Phase::DURATION_OVER; + if (worker->config->is_qps_mode()) { + ev_periodic_stop(worker->loop, &worker->qps_watcher); + } + std::cout << "Main benchmark duration is over for thread #" << worker->id << ". Stopping all clients." << std::endl; worker->stop_all_clients(); @@ -282,6 +287,9 @@ void warmup_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) { worker->current_phase = Phase::MAIN_DURATION; ev_timer_start(worker->loop, &worker->duration_watcher); + if (worker->config->is_qps_mode()) { + ev_periodic_start(worker->loop, &worker->qps_watcher); + } } } // namespace @@ -577,6 +585,15 @@ void Client::disconnect() { } int Client::submit_request() { + if (config.is_qps_mode()) { + if (worker->qps_left == 0) { + worker->clients_blocked_due_to_qps.push(this); + return 0; + } else { + --worker->qps_left; + } + } + if (session->submit_request() != 0) { return -1; } @@ -1262,6 +1279,33 @@ int get_ev_loop_flags() { } } // namespace +namespace { +void update_worker_qpsLeft(struct ev_loop *loop, ev_periodic *w, int revents) { + auto worker = static_cast(w->data); + + // update qpsLeft + worker->qps_left += worker->qps_counts[worker->qps_count_index]; + worker->qps_count_index = + (worker->qps_count_index + 1) % worker->qps_counts.size(); + + // wake up clients which are blocked due to qps limit + while (worker->qps_left && !worker->clients_blocked_due_to_qps.empty()) { + Client *c = worker->clients_blocked_due_to_qps.front(); + worker->clients_blocked_due_to_qps.pop(); + if (c->submit_request() != 0) { + c->process_request_failure(); + } + c->signal_write(); + } +} +} // namespace + +namespace { +// update 'qpsLeft' every 5ms +constexpr size_t qps_update_period_ms = 5; +constexpr size_t qps_update_per_second = 1000 / qps_update_period_ms; +} // namespace + Worker::Worker(uint32_t id, SSL_CTX *ssl_ctx, size_t req_todo, size_t nclients, size_t rate, size_t max_samples, Config *config) : stats(req_todo, nclients), @@ -1277,7 +1321,9 @@ Worker::Worker(uint32_t id, SSL_CTX *ssl_ctx, size_t req_todo, size_t nclients, nreqs_rem(req_todo % nclients), rate(rate), max_samples(max_samples), - next_client_id(0) { + next_client_id(0), + qps_left(0), + qps_count_index(0) { if (!config->is_rate_mode() && !config->is_timing_based_mode()) { progress_interval = std::max(static_cast(1), req_todo / 10); } else { @@ -1307,6 +1353,12 @@ Worker::Worker(uint32_t id, SSL_CTX *ssl_ctx, size_t req_todo, size_t nclients, ev_timer_init(&warmup_watcher, warmup_timeout_cb, config->warm_up_time, 0.); warmup_watcher.data = this; + if (config->is_qps_mode()) { + ev_periodic_init(&qps_watcher, update_worker_qpsLeft, 0., + (double)qps_update_period_ms / 1000.0, 0); + qps_watcher.data = this; + } + if (config->is_timing_based_mode()) { current_phase = Phase::INITIAL_IDLE; } else { @@ -1413,6 +1465,8 @@ void Worker::report_rate_progress() { << "% of clients started" << std::endl; } +void Worker::set_qps_counts(std::vector qc) { qps_counts = qc; } + namespace { // Returns percentage of number of samples within mean +/- sd. double within_sd(const std::vector &samples, double mean, double sd) { @@ -1905,6 +1959,9 @@ Options: Specifies the time period before starting the actual measurements, in case of timing-based benchmarking. Needs to provided along with -D option. + --qps= + Specifies the query-per-second for benchmarking. Needs + to provided alone with -D option. -T, --connection-active-timeout= Specifies the maximum time that h2load is willing to keep a connection open, regardless of the activity on @@ -2037,6 +2094,7 @@ int main(int argc, char **argv) { {"encoder-header-table-size", required_argument, &flag, 8}, {"warm-up-time", required_argument, &flag, 9}, {"log-file", required_argument, &flag, 10}, + {"qps", required_argument, &flag, 11}, {nullptr, 0, nullptr, 0}}; int option_index = 0; auto c = getopt_long(argc, argv, @@ -2264,6 +2322,14 @@ int main(int argc, char **argv) { // --log-file logfile = optarg; break; + case 11: + // --qps + config.qps = strtoul(optarg, nullptr, 10); + if (config.qps == 0) { + std::cerr << "--qps: the qps must be positive" << std::endl; + exit(EXIT_FAILURE); + } + break; } break; default: @@ -2354,6 +2420,11 @@ int main(int argc, char **argv) { exit(EXIT_FAILURE); } + if (config.is_qps_mode() && !config.is_timing_based_mode()) { + std::cerr << "--qps needs to provided alone with -D option." << std::endl; + exit(EXIT_FAILURE); + } + if (config.nreqs == 0 && !config.is_timing_based_mode()) { std::cerr << "-n: the number of requests must be strictly greater than 0 " "if timing-based test is not being run." @@ -2644,6 +2715,19 @@ int main(int argc, char **argv) { workers.push_back(create_worker(i, ssl_ctx, nreqs, nclients, rate, max_samples_per_thread)); auto &worker = workers.back(); + + if (config.is_qps_mode()) { + size_t nqps = config.qps / config.nthreads; + if (i < config.qps % config.nthreads) + ++nqps; + // distribute request among qps update period + std::vector qps_counts(qps_update_per_second, 0); + for (size_t q = 0; q < nqps; q++) { + qps_counts[std::rand() % qps_counts.size()]++; + } + worker->set_qps_counts(qps_counts); + } + futures.push_back( std::async(std::launch::async, [&worker, &mu, &cv, &ready]() { { diff --git a/src/h2load.h b/src/h2load.h index a5de4613..6d9efbdc 100644 --- a/src/h2load.h +++ b/src/h2load.h @@ -37,6 +37,7 @@ #include #include +#include #include #include #include @@ -113,12 +114,15 @@ struct Config { // preference. std::vector npn_list; + size_t qps; + Config(); ~Config(); bool is_rate_mode() const; bool is_timing_based_mode() const; bool has_base_uri() const; + bool is_qps_mode() const; }; struct RequestStat { @@ -273,6 +277,19 @@ struct Worker { ev_timer duration_watcher; ev_timer warmup_watcher; + // number of requests to send in qps mode + size_t qps_left; + // periodic timer for updating 'qps_left' + ev_periodic qps_watcher; + + // clients blocked due to qps limit + std::queue clients_blocked_due_to_qps; + + // we randomly distribute qps request among each second for smoother request + // flow especially when --qps is small + size_t qps_count_index; + std::vector qps_counts; + Worker(uint32_t id, SSL_CTX *ssl_ctx, size_t nreq_todo, size_t nclients, size_t rate, size_t max_samples, Config *config); ~Worker(); @@ -286,6 +303,8 @@ struct Worker { void stop_all_clients(); // This function frees a client from the list of clients for this Worker. void free_client(Client *); + + void set_qps_counts(std::vector qps_counts); }; struct Stream {