* Add a timing-based load-testing option to h2load.

* Add more parameter checks.

* A worker thread controls its clients' warm-up and main measurement duration (sketched below).

* Change the warm-up flag to an enum variable.

* Remove an unnecessary call to ev_timer_stop.

* Run assertions before starting the main measurement phase.

* The phase variable is implemented only inside the Worker class.

* Convert the enum to an enum class.

* Correct else indentation.

* Add a check for timing-based tests when the duration callback is invoked explicitly.

* Introduce a new argument for timing-based benchmarking.

* Styling corrections.

* Move duration-watcher initialization into the warm-up timeout callback.

* Move the warm-up and duration timers from the clients to the Worker; both timers and the phase now belong to the Worker.

* Modify some client functions to return early when the phase is not MAIN_DURATION. Clients are not destructed, but their sessions are terminated.

* Adjust output per thread.

* Check that a session exists before terminating it.

* Formatting.

* More formatting.

* Formatting.
Soham Sinha 2017-07-28 17:08:20 -04:00 committed by GitHub
parent cc289972fc
commit 3c43e00d8a
2 changed files with 269 additions and 59 deletions
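
The bullet points above describe the mechanism this change builds: each Worker owns two one-shot libev timers, a warm-up timer that flips the phase to MAIN_DURATION and arms a duration timer, which in turn ends the measurement window and stops the clients. Below is a minimal standalone sketch of that chaining, assuming libev 4.x is available; the DemoWorker type and the hard-coded 5 s / 10 s values are invented for illustration, and in h2load the real values come from --warm-up-time and -D.

#include <ev.h>
#include <iostream>

// Mirrors the Phase enum this commit adds to h2load.h.
enum class Phase { INITIAL_IDLE, WARM_UP, MAIN_DURATION, DURATION_OVER };

// Stand-in for h2load's Worker; only the pieces relevant to the two timers.
struct DemoWorker {
  Phase current_phase = Phase::INITIAL_IDLE;
  ev_timer warmup_watcher;
  ev_timer duration_watcher;
  struct ev_loop *loop = EV_DEFAULT;
};

namespace {
void duration_timeout_cb(struct ev_loop *loop, ev_timer *w, int /*revents*/) {
  auto worker = static_cast<DemoWorker *>(w->data);
  worker->current_phase = Phase::DURATION_OVER;
  std::cout << "main measurement window is over" << std::endl;
  // h2load stops all of the worker's clients here; the demo just leaves the loop.
  ev_break(loop, EVBREAK_ALL);
}

void warmup_timeout_cb(struct ev_loop *loop, ev_timer *w, int /*revents*/) {
  auto worker = static_cast<DemoWorker *>(w->data);
  worker->current_phase = Phase::MAIN_DURATION;
  std::cout << "warm-up is over, measurements start" << std::endl;
  // The duration timer is armed only now, so the warm-up time never
  // eats into the measurement window (10 s stands in for -D).
  ev_timer_init(&worker->duration_watcher, duration_timeout_cb, 10., 0.);
  worker->duration_watcher.data = worker;
  ev_timer_start(loop, &worker->duration_watcher);
}
} // namespace

int main() {
  DemoWorker worker;
  worker.current_phase = Phase::WARM_UP;
  // 5 s stands in for --warm-up-time.
  ev_timer_init(&worker.warmup_watcher, warmup_timeout_cb, 5., 0.);
  worker.warmup_watcher.data = &worker;
  ev_timer_start(worker.loop, &worker.warmup_watcher);
  ev_run(worker.loop, 0);
  return 0;
}

Keeping both timers and the phase on the Worker rather than on each Client means every client spawned by a thread shares one warm-up boundary and one measurement window, so per-thread statistics stay comparable. Assuming a server is listening, a timing-based run would then be invoked along these lines (host, port, client and thread counts, and durations are purely illustrative; -n 0 is passed because the argument checks added later in this commit reject a non-zero request count in timing-based mode):

h2load -t 2 -c 100 -n 0 --warm-up-time=5s -D 30 https://localhost:8443/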

src/h2load.cc

@ -90,6 +90,8 @@ Config::Config()
connection_window_bits(30),
rate(0),
rate_period(1.0),
duration(0.0),
warm_up_time(0.0),
conn_active_timeout(0.),
conn_inactivity_timeout(0.),
no_tls_proto(PROTO_HTTP2),
@ -118,6 +120,7 @@ Config::~Config() {
}
bool Config::is_rate_mode() const { return (this->rate != 0); }
bool Config::is_timing_based_mode() const { return (this->duration > 0); }
bool Config::has_base_uri() const { return (!this->base_uri.empty()); }
Config config;
@ -225,6 +228,11 @@ void rate_period_timeout_w_cb(struct ev_loop *loop, ev_timer *w, int revents) {
auto nclients_per_second = worker->rate;
auto conns_remaining = worker->nclients - worker->nconns_made;
auto nclients = std::min(nclients_per_second, conns_remaining);
if (worker->config->is_timing_based_mode() > 0) {
worker->current_phase = Phase::INITIAL_IDLE;
} else {
worker->current_phase = Phase::MAIN_DURATION;
}
for (size_t i = 0; i < nclients; ++i) {
auto req_todo = worker->nreqs_per_client;
@ -236,6 +244,7 @@ void rate_period_timeout_w_cb(struct ev_loop *loop, ev_timer *w, int revents) {
make_unique<Client>(worker->next_client_id++, worker, req_todo);
++worker->nconns_made;
worker->clients.push_back(client.get());
if (client->connect() != 0) {
std::cerr << "client could not connect to host" << std::endl;
@ -251,6 +260,60 @@ void rate_period_timeout_w_cb(struct ev_loop *loop, ev_timer *w, int revents) {
}
} // namespace
namespace {
// Called when the duration for an infinite number of requests is over
void duration_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) {
auto worker = static_cast<Worker *>(w->data);
for (auto client : worker->clients) {
client->req_todo = client->req_done; // there was no finite "req_todo"
worker->stats.req_todo += client->req_todo;
client->req_inflight = 0;
client->req_left = 0;
}
worker->current_phase = Phase::DURATION_OVER;
std::cout << "Main benchmark duration is over for thread #" << worker->id
<< ". Stopping all clients." << std::endl;
worker->stop_all_clients();
std::cout << "Stopped all clients for thread #" << worker->id << std::endl;
}
} // namespace
namespace {
// Called when the warm-up duration for an infinite number of requests is over
void warmup_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) {
auto worker = static_cast<Worker *>(w->data);
std::cout << "Warm-up phase is over for thread #" << worker->id << "."
<< std::endl;
std::cout << "Main benchmark duration is started for thread #" << worker->id
<< "." << std::endl;
assert(worker->stats.req_started == 0);
assert(worker->stats.req_done == 0);
for (auto client : worker->clients) {
assert(client->req_todo == 0);
assert(client->req_left == 1);
assert(client->req_inflight == 0);
assert(client->req_started == 0);
assert(client->req_done == 0);
client->record_client_start_time();
client->clear_connect_times();
client->record_connect_start_time();
}
worker->current_phase = Phase::MAIN_DURATION;
ev_timer_init(&worker->duration_watcher, duration_timeout_cb,
worker->config->duration, 0.);
ev_timer_start(worker->loop, &worker->duration_watcher);
}
} // namespace
namespace {
// Called when a connection has been inactive for a set period of time
// or a fixed amount of time after all requests have been made on a
@ -336,6 +399,11 @@ Client::Client(uint32_t id, Worker *worker, size_t req_todo)
fd(-1),
new_connection_requested(false),
final(false) {
if (req_todo == 0) { // this means an infinite number of requests is to be made
// This ensures that the number of requests is unbounded
// Just a positive number is fine; we choose the first positive number
req_left = 1;
}
ev_io_init(&wev, writecb, 0, EV_WRITE);
ev_io_init(&rev, readcb, 0, EV_READ);
@ -407,9 +475,17 @@ int Client::make_socket(addrinfo *addr) {
int Client::connect() {
int rv;
record_client_start_time();
clear_connect_times();
record_connect_start_time();
if (!worker->config->is_timing_based_mode()
|| worker->current_phase == Phase::MAIN_DURATION) {
record_client_start_time();
clear_connect_times();
record_connect_start_time();
} else if (worker->current_phase == Phase::INITIAL_IDLE) {
worker->current_phase = Phase::WARM_UP;
std::cout << "Warm-up started for thread #" << worker->id
<< "." << std::endl;
ev_timer_start(worker->loop, &worker->warmup_watcher);
}
if (worker->config->conn_inactivity_timeout > 0.) {
ev_timer_again(worker->loop, &conn_inactivity_watcher);
@ -467,13 +543,17 @@ int Client::try_again_or_fail() {
if (new_connection_requested) {
new_connection_requested = false;
if (req_left) {
// At the moment, we don't have a facility to restart requests
// already in flight. Make them fail.
worker->stats.req_failed += req_inflight;
worker->stats.req_error += req_inflight;
req_inflight = 0;
if (req_left) {
if (worker->current_phase == Phase::MAIN_DURATION) {
// At the moment, we don't have a facility to restart requests
// already in flight. Make them fail.
worker->stats.req_failed += req_inflight;
worker->stats.req_error += req_inflight;
req_inflight = 0;
}
// Keep using current address
if (connect() == 0) {
@ -529,11 +609,16 @@ int Client::submit_request() {
return -1;
}
if (worker->current_phase != Phase::MAIN_DURATION) {
return 0;
}
++worker->stats.req_started;
--req_left;
++req_started;
++req_inflight;
if (!worker->config->is_timing_based_mode()) {
--req_left;
}
// if an active timeout is set and this is the last request to be submitted
// on this connection, start the active timeout.
if (worker->config->conn_active_timeout > 0. && req_left == 0) {
@ -544,6 +629,10 @@ int Client::submit_request() {
}
void Client::process_timedout_streams() {
if (worker->current_phase != Phase::MAIN_DURATION) {
return;
}
for (auto &p : streams) {
auto &req_stat = p.second.req_stat;
if (!req_stat.completed) {
@ -557,6 +646,10 @@ void Client::process_timedout_streams() {
}
void Client::process_abandoned_streams() {
if (worker->current_phase != Phase::MAIN_DURATION) {
return;
}
auto req_abandoned = req_inflight + req_left;
worker->stats.req_failed += req_abandoned;
@ -567,6 +660,10 @@ void Client::process_abandoned_streams() {
}
void Client::process_request_failure() {
if (worker->current_phase != Phase::MAIN_DURATION) {
return;
}
worker->stats.req_failed += req_left;
worker->stats.req_error += req_left;
@ -575,6 +672,7 @@ void Client::process_request_failure() {
if (req_inflight == 0) {
terminate_session();
}
std::cout << "Process Request Failure:" << worker->stats.req_failed << std::endl;
}
namespace {
@ -648,6 +746,9 @@ void Client::on_request(int32_t stream_id) { streams[stream_id] = Stream(); }
void Client::on_header(int32_t stream_id, const uint8_t *name, size_t namelen,
const uint8_t *value, size_t valuelen) {
if (worker->current_phase != Phase::MAIN_DURATION) {
return;
}
auto itr = streams.find(stream_id);
if (itr == std::end(streams)) {
return;
@ -668,7 +769,7 @@ void Client::on_header(int32_t stream_id, const uint8_t *name, size_t namelen,
break;
}
}
if (status >= 200 && status < 300) {
++worker->stats.status[2];
stream.status_success = 1;
@ -685,6 +786,10 @@ void Client::on_header(int32_t stream_id, const uint8_t *name, size_t namelen,
}
void Client::on_status_code(int32_t stream_id, uint16_t status) {
if (worker->current_phase != Phase::MAIN_DURATION) {
return;
}
auto itr = streams.find(stream_id);
if (itr == std::end(streams)) {
return;
@ -706,40 +811,42 @@ void Client::on_status_code(int32_t stream_id, uint16_t status) {
}
void Client::on_stream_close(int32_t stream_id, bool success, bool final) {
++req_done;
--req_inflight;
if (worker->current_phase == Phase::MAIN_DURATION) {
++req_done;
if (req_inflight > 0) {
--req_inflight;
}
auto req_stat = get_req_stat(stream_id);
if (!req_stat) {
return;
}
auto req_stat = get_req_stat(stream_id);
if (!req_stat) {
return;
}
req_stat->stream_close_time = std::chrono::steady_clock::now();
if (success) {
req_stat->completed = true;
++worker->stats.req_success;
++cstat.req_success;
if (streams[stream_id].status_success == 1) {
++worker->stats.req_status_success;
} else {
++worker->stats.req_failed;
}
req_stat->stream_close_time = std::chrono::steady_clock::now();
if (success) {
req_stat->completed = true;
++worker->stats.req_success;
++cstat.req_success;
if (sampling_should_pick(worker->request_times_smp)) {
sampling_advance_point(worker->request_times_smp);
worker->sample_req_stat(req_stat);
}
if (streams[stream_id].status_success == 1) {
++worker->stats.req_status_success;
// Count up in successful cases only
++worker->request_times_smp.n;
} else {
++worker->stats.req_failed;
++worker->stats.req_error;
}
if (sampling_should_pick(worker->request_times_smp)) {
sampling_advance_point(worker->request_times_smp);
worker->sample_req_stat(req_stat);
}
// Count up in successful cases only
++worker->request_times_smp.n;
} else {
++worker->stats.req_failed;
++worker->stats.req_error;
++worker->stats.req_done;
}
++worker->stats.req_done;
worker->report_progress();
streams.erase(stream_id);
if (req_left == 0 && req_inflight == 0) {
@ -906,7 +1013,9 @@ int Client::on_read(const uint8_t *data, size_t len) {
if (rv != 0) {
return -1;
}
worker->stats.bytes_total += len;
if (worker->current_phase == Phase::MAIN_DURATION) {
worker->stats.bytes_total += len;
}
signal_write();
return 0;
}
@ -1136,7 +1245,7 @@ void Client::record_client_start_time() {
if (recorded(cstat.client_start_time)) {
return;
}
cstat.client_start_time = std::chrono::steady_clock::now();
}
@ -1176,12 +1285,13 @@ Worker::Worker(uint32_t id, SSL_CTX *ssl_ctx, size_t req_todo, size_t nclients,
rate(rate),
max_samples(max_samples),
next_client_id(0) {
if (!config->is_rate_mode()) {
if (!config->is_rate_mode() && !config->is_timing_based_mode()) {
progress_interval = std::max(static_cast<size_t>(1), req_todo / 10);
} else {
progress_interval = std::max(static_cast<size_t>(1), nclients / 10);
}
// The timer below is not needed in case of timing-based benchmarking
// create timer that will go off every rate_period
ev_timer_init(&timeout_watcher, rate_period_timeout_w_cb, 0.,
config->rate_period);
@ -1192,6 +1302,14 @@ Worker::Worker(uint32_t id, SSL_CTX *ssl_ctx, size_t req_todo, size_t nclients,
sampling_init(request_times_smp, req_todo, max_samples);
sampling_init(client_smp, nclients, max_samples);
if (config->is_timing_based_mode()) {
duration_watcher.data = this;
ev_timer_init(&warmup_watcher, warmup_timeout_cb,
config->warm_up_time, 0.);
warmup_watcher.data = this;
}
}
Worker::~Worker() {
@ -1199,14 +1317,23 @@ Worker::~Worker() {
ev_loop_destroy(loop);
}
void Worker::stop_all_clients() {
for (auto client : clients) {
if (client->session) {
client->terminate_session();
}
}
}
void Worker::run() {
if (!config->is_rate_mode()) {
if (!config->is_rate_mode() && !config->is_timing_based_mode()) {
for (size_t i = 0; i < nclients; ++i) {
auto req_todo = nreqs_per_client;
if (nreqs_rem > 0) {
++req_todo;
--nreqs_rem;
}
auto client = make_unique<Client>(next_client_id++, this, req_todo);
if (client->connect() != 0) {
std::cerr << "client could not connect to host" << std::endl;
@ -1215,11 +1342,14 @@ void Worker::run() {
client.release();
}
}
} else {
} else if (config->is_rate_mode()) {
ev_timer_again(loop, &timeout_watcher);
// call callback so that we don't waste the first rate_period
rate_period_timeout_w_cb(loop, &timeout_watcher, 0);
} else {
// call the callback a single time to start all the clients
rate_period_timeout_w_cb(loop, &timeout_watcher, 0);
}
ev_run(loop, 0);
}
@ -1235,7 +1365,8 @@ void Worker::sample_client_stat(ClientStat *cstat) {
}
void Worker::report_progress() {
if (id != 0 || config->is_rate_mode() || stats.req_done % progress_interval) {
if (id != 0 || config->is_rate_mode() || stats.req_done % progress_interval
|| config->is_timing_based_mode()) {
return;
}
@ -1581,12 +1712,27 @@ std::unique_ptr<Worker> create_worker(uint32_t id, SSL_CTX *ssl_ctx,
<< util::duration_str(config.rate_period) << " ";
}
std::cout << "spawning thread #" << id << ": " << nclients
<< " total client(s). " << rate_report.str() << nreqs
<< " total requests" << std::endl;
if (config.is_timing_based_mode()) {
std::cout << "spawning thread #" << id << ": " << nclients
<< " total client(s). Timing-based test with "
<< config.warm_up_time << "s of warm-up time and "
<< config.duration << "s of main duration for measurements."
<< std::endl;
} else {
std::cout << "spawning thread #" << id << ": " << nclients
<< " total client(s). " << rate_report.str() << nreqs
<< " total requests" << std::endl;
}
return make_unique<Worker>(id, ssl_ctx, nreqs, nclients, rate, max_samples,
&config);
if (config.is_rate_mode()) {
return make_unique<Worker>(id, ssl_ctx, nreqs, nclients, rate, max_samples,
&config);
} else {
// Here the rate is the same as nclients because the rate_period timeout
// callback will be called only once
return make_unique<Worker>(id, ssl_ctx, nreqs, nclients, nclients, max_samples,
&config);
}
}
} // namespace
@ -1737,6 +1883,13 @@ Options:
length of the period in time. This option is ignored if
the rate option is not used. The default value for this
option is 1s.
-D, --duration=<N>
Specifies the main duration for the measurements in case
of timing-based benchmarking.
--warm-up-time=<DURATION>
Specifies the time period before starting the actual
measurements, in case of timing-based benchmarking.
Needs to be provided along with the -D option.
-T, --connection-active-timeout=<DURATION>
Specifies the maximum time that h2load is willing to
keep a connection open, regardless of the activity on
@ -1850,6 +2003,7 @@ int main(int argc, char **argv) {
{"rate", required_argument, nullptr, 'r'},
{"connection-active-timeout", required_argument, nullptr, 'T'},
{"connection-inactivity-timeout", required_argument, nullptr, 'N'},
{"duration", required_argument, &flag, 'D'},
{"timing-script-file", required_argument, &flag, 3},
{"base-uri", required_argument, nullptr, 'B'},
{"npn-list", required_argument, &flag, 4},
@ -1857,10 +2011,11 @@ int main(int argc, char **argv) {
{"h1", no_argument, &flag, 6},
{"header-table-size", required_argument, &flag, 7},
{"encoder-header-table-size", required_argument, &flag, 8},
{"warm-up-time", required_argument, &flag, 9},
{nullptr, 0, nullptr, 0}};
int option_index = 0;
auto c =
getopt_long(argc, argv, "hvW:c:d:m:n:p:t:w:H:i:r:T:N:B:", long_options,
getopt_long(argc, argv, "hvW:c:d:m:n:p:t:w:H:i:r:T:N:D:B:", long_options,
&option_index);
if (c == -1) {
break;
@ -2016,6 +2171,14 @@ int main(int argc, char **argv) {
config.base_uri = arg.str();
break;
}
case 'D':
config.duration = strtoul(optarg, nullptr, 10);
if (config.duration == 0) {
std::cerr << "-D: the main duration for timing-based benchmarking "
<< "must be positive." << std::endl;
exit(EXIT_FAILURE);
}
break;
case 'v':
config.verbose = true;
break;
@ -2072,6 +2235,14 @@ int main(int argc, char **argv) {
exit(EXIT_FAILURE);
}
break;
case 9:
// --warm-up-time
config.warm_up_time = util::parse_duration_with_unit(optarg);
if (!std::isfinite(config.warm_up_time)) {
std::cerr << "--warm-up-time: value error " << optarg << std::endl;
exit(EXIT_FAILURE);
}
break;
}
break;
default:
@ -2157,8 +2328,9 @@ int main(int argc, char **argv) {
exit(EXIT_FAILURE);
}
if (config.nreqs == 0) {
std::cerr << "-n: the number of requests must be strictly greater than 0."
if (config.nreqs == 0 && !config.is_timing_based_mode()) {
std::cerr << "-n: the number of requests must be strictly greater than 0,"
<< "timing-based test is not being run."
<< std::endl;
exit(EXIT_FAILURE);
}
@ -2182,7 +2354,7 @@ int main(int argc, char **argv) {
// With timing script, we don't distribute config.nreqs to each
// client or thread.
if (!config.timing_script && config.nreqs < config.nclients) {
if (!config.timing_script && config.nreqs < config.nclients && !config.is_timing_based_mode()) {
std::cerr << "-n, -c: the number of requests must be greater than or "
<< "equal to the clients." << std::endl;
exit(EXIT_FAILURE);
@ -2190,11 +2362,20 @@ int main(int argc, char **argv) {
if (config.nclients < config.nthreads) {
std::cerr << "-c, -t: the number of clients must be greater than or equal "
"to the number of threads."
<< "to the number of threads."
<< std::endl;
exit(EXIT_FAILURE);
}
if (config.is_timing_based_mode()) {
if (config.nreqs != 0) {
std::cerr << "-n: the number of requests needs to be zero (0) for timing-"
<< "based test. Default value is 1."
<< std::endl;
exit(EXIT_FAILURE);
}
}
if (config.is_rate_mode()) {
if (config.rate < config.nthreads) {
std::cerr << "-r, -t: the connection rate must be greater than or equal "
@ -2528,7 +2709,7 @@ int main(int argc, char **argv) {
// Requests which have not been issued due to connection errors, are
// counted towards req_failed and req_error.
auto req_not_issued =
stats.req_todo - stats.req_status_success - stats.req_failed;
(stats.req_todo - stats.req_status_success - stats.req_failed);
stats.req_failed += req_not_issued;
stats.req_error += req_not_issued;
@ -2539,10 +2720,16 @@ int main(int argc, char **argv) {
double rps = 0;
int64_t bps = 0;
if (duration.count() > 0) {
auto secd = std::chrono::duration_cast<
std::chrono::duration<double, std::chrono::seconds::period>>(duration);
rps = stats.req_success / secd.count();
bps = stats.bytes_total / secd.count();
if (config.is_timing_based_mode()) {
// we only want to consider the main duration if warm-up is given
rps = stats.req_success / config.duration;
bps = stats.bytes_total / config.duration;
} else {
auto secd = std::chrono::duration_cast<
std::chrono::duration<double, std::chrono::seconds::period>>(duration);
rps = stats.req_success / secd.count();
bps = stats.bytes_total / secd.count();
}
}
double header_space_savings = 0.;
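
One consequence visible in the final-report hunk above: in timing-based mode, requests per second and bytes per second are divided by the configured -D duration rather than by the measured wall-clock span, so the warm-up period is excluded from the denominator. A toy version of that branch of the calculation, with all numbers invented:

#include <cstdint>
#include <iostream>

int main() {
  // Invented sample of what one timing-based run might have accumulated.
  const double duration = 30.0;                 // seconds, i.e. -D 30
  const std::uint64_t req_success = 450000;     // requests completed inside the window
  const std::uint64_t bytes_total = 1200000000; // bytes received inside the window

  const double rps = req_success / duration;                          // 15000 req/s
  const auto bps = static_cast<std::int64_t>(bytes_total / duration); // 40000000 bytes/s
  std::cout << rps << " req/s, " << bps << " bytes/s" << std::endl;
  return 0;
}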

src/h2load.h

@ -85,6 +85,10 @@ struct Config {
// rate at which connections should be made
size_t rate;
ev_tstamp rate_period;
// amount of time for main measurements in timing-based test
ev_tstamp duration;
// amount of time to wait before starting measurements in timing-based test
ev_tstamp warm_up_time;
// amount of time to wait for activity on a given connection
ev_tstamp conn_active_timeout;
// amount of time to wait after the last request is made on a connection
@ -118,6 +122,7 @@ struct Config {
~Config();
bool is_rate_mode() const;
bool is_timing_based_mode() const;
bool has_base_uri() const;
};
@ -215,6 +220,15 @@ struct Stats {
enum ClientState { CLIENT_IDLE, CLIENT_CONNECTED };
// This type tells which phase of the timing-based test the worker is currently in
enum class Phase {
INITIAL_IDLE, // Initial idle state before warm-up phase
WARM_UP, // Warm up phase when no measurements are done
MAIN_DURATION, // Main measurement phase; if timing-based
// test is not run, this is the default phase
DURATION_OVER // This phase occurs after the measurements are over
};
struct Client;
// We use systematic sampling method
@ -253,6 +267,13 @@ struct Worker {
ev_timer timeout_watcher;
// The next client ID this worker assigns
uint32_t next_client_id;
// Keeps track of the worker's current phase (for timing-based experiments)
Phase current_phase;
// We need to keep track of the clients in order to stop them when needed
std::vector<Client*> clients;
// This is only active when no bounded number of requests is specified
ev_timer duration_watcher;
ev_timer warmup_watcher;
Worker(uint32_t id, SSL_CTX *ssl_ctx, size_t nreq_todo, size_t nclients,
size_t rate, size_t max_samples, Config *config);
@ -263,6 +284,8 @@ struct Worker {
void sample_client_stat(ClientStat *cstat);
void report_progress();
void report_rate_progress();
// This function stops all the clients by terminating their sessions (clients are not destructed).
void stop_all_clients();
};
struct Stream {