add qps mode for h2load

This commit is contained in:
antJack 2019-08-08 19:33:11 +08:00
parent 3980678d24
commit 90fc66fa79
2 changed files with 104 additions and 1 deletions

View File

@ -116,6 +116,7 @@ Config::~Config() {
bool Config::is_rate_mode() const { return (this->rate != 0); }
bool Config::is_timing_based_mode() const { return (this->duration > 0); }
bool Config::has_base_uri() const { return (!this->base_uri.empty()); }
bool Config::is_qps_mode() const { return (this->qps != 0); }
Config config;
namespace {
@ -246,6 +247,10 @@ void duration_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) {
worker->current_phase = Phase::DURATION_OVER;
if (worker->config->is_qps_mode()) {
ev_periodic_stop(worker->loop, &worker->qps_watcher);
}
std::cout << "Main benchmark duration is over for thread #" << worker->id
<< ". Stopping all clients." << std::endl;
worker->stop_all_clients();
@ -282,6 +287,9 @@ void warmup_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) {
worker->current_phase = Phase::MAIN_DURATION;
ev_timer_start(worker->loop, &worker->duration_watcher);
if (worker->config->is_qps_mode()) {
ev_periodic_start(worker->loop, &worker->qps_watcher);
}
}
} // namespace
@ -577,6 +585,15 @@ void Client::disconnect() {
}
int Client::submit_request() {
if (config.is_qps_mode()) {
if (worker->qps_left == 0) {
worker->clients_blocked_due_to_qps.push(this);
return 0;
} else {
--worker->qps_left;
}
}
if (session->submit_request() != 0) {
return -1;
}
@ -1262,6 +1279,33 @@ int get_ev_loop_flags() {
}
} // namespace
namespace {
void update_worker_qpsLeft(struct ev_loop *loop, ev_periodic *w, int revents) {
auto worker = static_cast<Worker *>(w->data);
// update qpsLeft
worker->qps_left += worker->qps_counts[worker->qps_count_index];
worker->qps_count_index =
(worker->qps_count_index + 1) % worker->qps_counts.size();
// wake up clients which are blocked due to qps limit
while (worker->qps_left && !worker->clients_blocked_due_to_qps.empty()) {
Client *c = worker->clients_blocked_due_to_qps.front();
worker->clients_blocked_due_to_qps.pop();
if (c->submit_request() != 0) {
c->process_request_failure();
}
c->signal_write();
}
}
} // namespace
namespace {
// update 'qpsLeft' every 5ms
constexpr size_t qps_update_period_ms = 5;
constexpr size_t qps_update_per_second = 1000 / qps_update_period_ms;
} // namespace
Worker::Worker(uint32_t id, SSL_CTX *ssl_ctx, size_t req_todo, size_t nclients,
size_t rate, size_t max_samples, Config *config)
: stats(req_todo, nclients),
@ -1277,7 +1321,9 @@ Worker::Worker(uint32_t id, SSL_CTX *ssl_ctx, size_t req_todo, size_t nclients,
nreqs_rem(req_todo % nclients),
rate(rate),
max_samples(max_samples),
next_client_id(0) {
next_client_id(0),
qps_left(0),
qps_count_index(0) {
if (!config->is_rate_mode() && !config->is_timing_based_mode()) {
progress_interval = std::max(static_cast<size_t>(1), req_todo / 10);
} else {
@ -1307,6 +1353,12 @@ Worker::Worker(uint32_t id, SSL_CTX *ssl_ctx, size_t req_todo, size_t nclients,
ev_timer_init(&warmup_watcher, warmup_timeout_cb, config->warm_up_time, 0.);
warmup_watcher.data = this;
if (config->is_qps_mode()) {
ev_periodic_init(&qps_watcher, update_worker_qpsLeft, 0.,
(double)qps_update_period_ms / 1000.0, 0);
qps_watcher.data = this;
}
if (config->is_timing_based_mode()) {
current_phase = Phase::INITIAL_IDLE;
} else {
@ -1413,6 +1465,8 @@ void Worker::report_rate_progress() {
<< "% of clients started" << std::endl;
}
void Worker::set_qps_counts(std::vector<size_t> qc) { qps_counts = qc; }
namespace {
// Returns percentage of number of samples within mean +/- sd.
double within_sd(const std::vector<double> &samples, double mean, double sd) {
@ -1905,6 +1959,9 @@ Options:
Specifies the time period before starting the actual
measurements, in case of timing-based benchmarking.
Needs to provided along with -D option.
--qps=<QPS>
Specifies the query-per-second for benchmarking. Needs
to provided alone with -D option.
-T, --connection-active-timeout=<DURATION>
Specifies the maximum time that h2load is willing to
keep a connection open, regardless of the activity on
@ -2037,6 +2094,7 @@ int main(int argc, char **argv) {
{"encoder-header-table-size", required_argument, &flag, 8},
{"warm-up-time", required_argument, &flag, 9},
{"log-file", required_argument, &flag, 10},
{"qps", required_argument, &flag, 11},
{nullptr, 0, nullptr, 0}};
int option_index = 0;
auto c = getopt_long(argc, argv,
@ -2264,6 +2322,14 @@ int main(int argc, char **argv) {
// --log-file
logfile = optarg;
break;
case 11:
// --qps
config.qps = strtoul(optarg, nullptr, 10);
if (config.qps == 0) {
std::cerr << "--qps: the qps must be positive" << std::endl;
exit(EXIT_FAILURE);
}
break;
}
break;
default:
@ -2354,6 +2420,11 @@ int main(int argc, char **argv) {
exit(EXIT_FAILURE);
}
if (config.is_qps_mode() && !config.is_timing_based_mode()) {
std::cerr << "--qps needs to provided alone with -D option." << std::endl;
exit(EXIT_FAILURE);
}
if (config.nreqs == 0 && !config.is_timing_based_mode()) {
std::cerr << "-n: the number of requests must be strictly greater than 0 "
"if timing-based test is not being run."
@ -2644,6 +2715,19 @@ int main(int argc, char **argv) {
workers.push_back(create_worker(i, ssl_ctx, nreqs, nclients, rate,
max_samples_per_thread));
auto &worker = workers.back();
if (config.is_qps_mode()) {
size_t nqps = config.qps / config.nthreads;
if (i < config.qps % config.nthreads)
++nqps;
// distribute request among qps update period
std::vector<size_t> qps_counts(qps_update_per_second, 0);
for (size_t q = 0; q < nqps; q++) {
qps_counts[std::rand() % qps_counts.size()]++;
}
worker->set_qps_counts(qps_counts);
}
futures.push_back(
std::async(std::launch::async, [&worker, &mu, &cv, &ready]() {
{

View File

@ -37,6 +37,7 @@
#include <sys/un.h>
#include <vector>
#include <queue>
#include <string>
#include <unordered_map>
#include <memory>
@ -113,12 +114,15 @@ struct Config {
// preference.
std::vector<std::string> npn_list;
size_t qps;
Config();
~Config();
bool is_rate_mode() const;
bool is_timing_based_mode() const;
bool has_base_uri() const;
bool is_qps_mode() const;
};
struct RequestStat {
@ -273,6 +277,19 @@ struct Worker {
ev_timer duration_watcher;
ev_timer warmup_watcher;
// number of requests to send in qps mode
size_t qps_left;
// periodic timer for updating 'qps_left'
ev_periodic qps_watcher;
// clients blocked due to qps limit
std::queue<Client *> clients_blocked_due_to_qps;
// we randomly distribute qps request among each second for smoother request
// flow especially when --qps is small
size_t qps_count_index;
std::vector<size_t> qps_counts;
Worker(uint32_t id, SSL_CTX *ssl_ctx, size_t nreq_todo, size_t nclients,
size_t rate, size_t max_samples, Config *config);
~Worker();
@ -286,6 +303,8 @@ struct Worker {
void stop_all_clients();
// This function frees a client from the list of clients for this Worker.
void free_client(Client *);
void set_qps_counts(std::vector<size_t> qps_counts);
};
struct Stream {