h2load: Distribute MAX_SAMPLES across workers

This commit is contained in:
Tatsuhiro Tsujikawa 2016-01-07 22:41:37 +09:00
parent 425c794f89
commit 027256d0b1
2 changed files with 37 additions and 22 deletions

View File

@ -102,13 +102,10 @@ constexpr size_t MAX_SAMPLES = 1000000;
} // namespace } // namespace
Stats::Stats(size_t req_todo, size_t nclients) Stats::Stats(size_t req_todo, size_t nclients)
: req_todo(0), req_started(0), req_done(0), req_success(0), : req_todo(req_todo), req_started(0), req_done(0), req_success(0),
req_status_success(0), req_failed(0), req_error(0), req_timedout(0), req_status_success(0), req_failed(0), req_error(0), req_timedout(0),
bytes_total(0), bytes_head(0), bytes_head_decomp(0), bytes_body(0), bytes_total(0), bytes_head(0), bytes_head_decomp(0), bytes_body(0),
status() { status() {}
req_stats.reserve(std::min(req_todo, MAX_SAMPLES));
client_stats.reserve(std::min(nclients, MAX_SAMPLES));
}
Stream::Stream() : req_stat{}, status_success(-1) {} Stream::Stream() : req_stat{}, status_success(-1) {}
@ -1092,13 +1089,12 @@ void Client::signal_write() { ev_io_start(worker->loop, &wev); }
void Client::try_new_connection() { new_connection_requested = true; } void Client::try_new_connection() { new_connection_requested = true; }
Worker::Worker(uint32_t id, SSL_CTX *ssl_ctx, size_t req_todo, size_t nclients, Worker::Worker(uint32_t id, SSL_CTX *ssl_ctx, size_t req_todo, size_t nclients,
size_t rate, Config *config) size_t rate, size_t max_samples, Config *config)
: stats(req_todo, nclients), loop(ev_loop_new(0)), ssl_ctx(ssl_ctx), : stats(req_todo, nclients), loop(ev_loop_new(0)), ssl_ctx(ssl_ctx),
config(config), id(id), tls_info_report_done(false), config(config), id(id), tls_info_report_done(false),
app_info_report_done(false), nconns_made(0), nclients(nclients), app_info_report_done(false), nconns_made(0), nclients(nclients),
nreqs_per_client(req_todo / nclients), nreqs_rem(req_todo % nclients), nreqs_per_client(req_todo / nclients), nreqs_rem(req_todo % nclients),
rate(rate), next_client_id(0) { rate(rate), max_samples(max_samples), next_client_id(0) {
stats.req_todo = req_todo;
if (!config->is_rate_mode()) { if (!config->is_rate_mode()) {
progress_interval = std::max(static_cast<size_t>(1), req_todo / 10); progress_interval = std::max(static_cast<size_t>(1), req_todo / 10);
} else { } else {
@ -1110,8 +1106,11 @@ Worker::Worker(uint32_t id, SSL_CTX *ssl_ctx, size_t req_todo, size_t nclients,
config->rate_period); config->rate_period);
timeout_watcher.data = this; timeout_watcher.data = this;
sampling_init(request_times_smp, req_todo, MAX_SAMPLES); stats.req_stats.reserve(std::min(req_todo, max_samples));
sampling_init(client_smp, nclients, MAX_SAMPLES); stats.client_stats.reserve(std::min(nclients, max_samples));
sampling_init(request_times_smp, req_todo, max_samples);
sampling_init(client_smp, nclients, max_samples);
} }
Worker::~Worker() { Worker::~Worker() {
@ -1146,12 +1145,12 @@ void Worker::run() {
void Worker::sample_req_stat(RequestStat *req_stat) { void Worker::sample_req_stat(RequestStat *req_stat) {
stats.req_stats.push_back(*req_stat); stats.req_stats.push_back(*req_stat);
assert(stats.req_stats.size() <= MAX_SAMPLES); assert(stats.req_stats.size() <= max_samples);
} }
void Worker::sample_client_stat(ClientStat *cstat) { void Worker::sample_client_stat(ClientStat *cstat) {
stats.client_stats.push_back(*cstat); stats.client_stats.push_back(*cstat);
assert(stats.client_stats.size() <= MAX_SAMPLES); assert(stats.client_stats.size() <= max_samples);
} }
void Worker::report_progress() { void Worker::report_progress() {
@ -1228,21 +1227,28 @@ namespace {
SDStats SDStats
process_time_stats(const std::vector<std::unique_ptr<Worker>> &workers) { process_time_stats(const std::vector<std::unique_ptr<Worker>> &workers) {
auto request_times_sampling = false; auto request_times_sampling = false;
auto client_times_sampling = false;
size_t nrequest_times = 0; size_t nrequest_times = 0;
size_t nclient_times = 0;
for (const auto &w : workers) { for (const auto &w : workers) {
nrequest_times += w->stats.req_stats.size(); nrequest_times += w->stats.req_stats.size();
if (w->request_times_smp.interval != 0.) { if (w->request_times_smp.interval != 0.) {
request_times_sampling = true; request_times_sampling = true;
} }
nclient_times += w->stats.client_stats.size();
if (w->client_smp.interval != 0.) {
client_times_sampling = true;
}
} }
std::vector<double> request_times; std::vector<double> request_times;
request_times.reserve(nrequest_times); request_times.reserve(nrequest_times);
std::vector<double> connect_times, ttfb_times, rps_values; std::vector<double> connect_times, ttfb_times, rps_values;
connect_times.reserve(config.nclients); connect_times.reserve(nclient_times);
ttfb_times.reserve(config.nclients); ttfb_times.reserve(nclient_times);
rps_values.reserve(config.nclients); rps_values.reserve(nclient_times);
for (const auto &w : workers) { for (const auto &w : workers) {
for (const auto &req_stat : w->stats.req_stats) { for (const auto &req_stat : w->stats.req_stats) {
@ -1287,8 +1293,9 @@ process_time_stats(const std::vector<std::unique_ptr<Worker>> &workers) {
} }
return {compute_time_stat(request_times, request_times_sampling), return {compute_time_stat(request_times, request_times_sampling),
compute_time_stat(connect_times), compute_time_stat(ttfb_times), compute_time_stat(connect_times, client_times_sampling),
compute_time_stat(rps_values)}; compute_time_stat(ttfb_times, client_times_sampling),
compute_time_stat(rps_values, client_times_sampling)};
} }
} // namespace } // namespace
@ -1466,7 +1473,7 @@ void read_script_from_file(std::istream &infile,
namespace { namespace {
std::unique_ptr<Worker> create_worker(uint32_t id, SSL_CTX *ssl_ctx, std::unique_ptr<Worker> create_worker(uint32_t id, SSL_CTX *ssl_ctx,
size_t nreqs, size_t nclients, size_t nreqs, size_t nclients,
size_t rate) { size_t rate, size_t max_samples) {
std::stringstream rate_report; std::stringstream rate_report;
if (config.is_rate_mode() && nclients > rate) { if (config.is_rate_mode() && nclients > rate) {
rate_report << "Up to " << rate << " client(s) will be created every " rate_report << "Up to " << rate << " client(s) will be created every "
@ -1477,7 +1484,8 @@ std::unique_ptr<Worker> create_worker(uint32_t id, SSL_CTX *ssl_ctx,
<< " total client(s). " << rate_report.str() << nreqs << " total client(s). " << rate_report.str() << nreqs
<< " total requests" << std::endl; << " total requests" << std::endl;
return make_unique<Worker>(id, ssl_ctx, nreqs, nclients, rate, &config); return make_unique<Worker>(id, ssl_ctx, nreqs, nclients, rate, max_samples,
&config);
} }
} // namespace } // namespace
@ -2183,6 +2191,9 @@ int main(int argc, char **argv) {
size_t rate_per_thread = config.rate / config.nthreads; size_t rate_per_thread = config.rate / config.nthreads;
ssize_t rate_per_thread_rem = config.rate % config.nthreads; ssize_t rate_per_thread_rem = config.rate % config.nthreads;
size_t max_samples_per_thread =
std::max(static_cast<size_t>(256), MAX_SAMPLES / config.nthreads);
std::mutex mu; std::mutex mu;
std::condition_variable cv; std::condition_variable cv;
auto ready = false; auto ready = false;
@ -2215,7 +2226,8 @@ int main(int argc, char **argv) {
} }
} }
workers.push_back(create_worker(i, ssl_ctx, nreqs, nclients, rate)); workers.push_back(create_worker(i, ssl_ctx, nreqs, nclients, rate,
max_samples_per_thread));
auto &worker = workers.back(); auto &worker = workers.back();
futures.push_back( futures.push_back(
std::async(std::launch::async, [&worker, &mu, &cv, &ready]() { std::async(std::launch::async, [&worker, &mu, &cv, &ready]() {
@ -2245,7 +2257,8 @@ int main(int argc, char **argv) {
auto nreqs = auto nreqs =
config.timing_script ? config.nreqs * config.nclients : config.nreqs; config.timing_script ? config.nreqs * config.nclients : config.nreqs;
workers.push_back(create_worker(0, ssl_ctx, nreqs, nclients, rate)); workers.push_back(
create_worker(0, ssl_ctx, nreqs, nclients, rate, MAX_SAMPLES));
auto start = std::chrono::steady_clock::now(); auto start = std::chrono::steady_clock::now();

View File

@ -237,12 +237,14 @@ struct Worker {
// at most nreqs_rem clients get an extra request // at most nreqs_rem clients get an extra request
size_t nreqs_rem; size_t nreqs_rem;
size_t rate; size_t rate;
// maximum number of samples in this worker thread
size_t max_samples;
ev_timer timeout_watcher; ev_timer timeout_watcher;
// The next client ID this worker assigns // The next client ID this worker assigns
uint32_t next_client_id; uint32_t next_client_id;
Worker(uint32_t id, SSL_CTX *ssl_ctx, size_t nreq_todo, size_t nclients, Worker(uint32_t id, SSL_CTX *ssl_ctx, size_t nreq_todo, size_t nclients,
size_t rate, Config *config); size_t rate, size_t max_samples, Config *config);
~Worker(); ~Worker();
Worker(Worker &&o) = default; Worker(Worker &&o) = default;
void run(); void run();