diff --git a/src/h2load.cc b/src/h2load.cc index 492d8d9f..8840fca8 100644 --- a/src/h2load.cc +++ b/src/h2load.cc @@ -90,6 +90,8 @@ Config::Config() connection_window_bits(30), rate(0), rate_period(1.0), + duration(0.0), + warm_up_time(0.0), conn_active_timeout(0.), conn_inactivity_timeout(0.), no_tls_proto(PROTO_HTTP2), @@ -118,6 +120,7 @@ Config::~Config() { } bool Config::is_rate_mode() const { return (this->rate != 0); } +bool Config::is_timing_based_mode() const { return (this->duration > 0); } bool Config::has_base_uri() const { return (!this->base_uri.empty()); } Config config; @@ -225,6 +228,11 @@ void rate_period_timeout_w_cb(struct ev_loop *loop, ev_timer *w, int revents) { auto nclients_per_second = worker->rate; auto conns_remaining = worker->nclients - worker->nconns_made; auto nclients = std::min(nclients_per_second, conns_remaining); + if (worker->config->is_timing_based_mode() > 0) { + worker->current_phase = Phase::INITIAL_IDLE; + } else { + worker->current_phase = Phase::MAIN_DURATION; + } for (size_t i = 0; i < nclients; ++i) { auto req_todo = worker->nreqs_per_client; @@ -236,6 +244,7 @@ void rate_period_timeout_w_cb(struct ev_loop *loop, ev_timer *w, int revents) { make_unique(worker->next_client_id++, worker, req_todo); ++worker->nconns_made; + worker->clients.push_back(client.get()); if (client->connect() != 0) { std::cerr << "client could not connect to host" << std::endl; @@ -251,6 +260,60 @@ void rate_period_timeout_w_cb(struct ev_loop *loop, ev_timer *w, int revents) { } } // namespace +namespace { +// Called when the duration for infinite number of requests are over +void duration_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) { + auto worker = static_cast(w->data); + + for (auto client: worker->clients) { + client->req_todo = client->req_done; // there was no finite "req_todo" + worker->stats.req_todo += client->req_todo; + client->req_inflight = 0; + client->req_left = 0; + } + + worker->current_phase = Phase::DURATION_OVER; + + std::cout << "Main benchmark duration is over for thread #" << worker->id + << ". Stopping all clients." << std::endl; + worker->stop_all_clients(); + std::cout << "Stopped all clients for thread #" << worker->id << std::endl; +} +} // namespace + +namespace { +// Called when the warmup duration for infinite number of requests are over +void warmup_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) { + auto worker = static_cast(w->data); + + std::cout << "Warm-up phase is over for thread #" << worker->id << "." + << std::endl; + std::cout << "Main benchmark duration is started for thread #" << worker->id + << "." << std::endl; + assert(worker->stats.req_started == 0); + assert(worker->stats.req_done == 0); + + for (auto client : worker->clients) { + assert(client->req_todo == 0); + assert(client->req_left == 1); + assert(client->req_inflight == 0); + assert(client->req_started == 0); + assert(client->req_done == 0); + + client->record_client_start_time(); + client->clear_connect_times(); + client->record_connect_start_time(); + } + + worker->current_phase = Phase::MAIN_DURATION; + + ev_timer_init(&worker->duration_watcher, duration_timeout_cb, + worker->config->duration, 0.); + + ev_timer_start(worker->loop, &worker->duration_watcher); +} +} // namespace + namespace { // Called when an a connection has been inactive for a set period of time // or a fixed amount of time after all requests have been made on a @@ -336,6 +399,11 @@ Client::Client(uint32_t id, Worker *worker, size_t req_todo) fd(-1), new_connection_requested(false), final(false) { + if (req_todo == 0) { // this means infinite number of requests are to be made + // This ensures that number of requests are unbounded + // Just a positive number is fine, we chose the first positive number + req_left = 1; + } ev_io_init(&wev, writecb, 0, EV_WRITE); ev_io_init(&rev, readcb, 0, EV_READ); @@ -407,9 +475,17 @@ int Client::make_socket(addrinfo *addr) { int Client::connect() { int rv; - record_client_start_time(); - clear_connect_times(); - record_connect_start_time(); + if (!worker->config->is_timing_based_mode() + || worker->current_phase == Phase::MAIN_DURATION) { + record_client_start_time(); + clear_connect_times(); + record_connect_start_time(); + } else if (worker->current_phase == Phase::INITIAL_IDLE) { + worker->current_phase = Phase::WARM_UP; + std::cout << "Warm-up started for thread #" << worker->id + << "." << std::endl; + ev_timer_start(worker->loop, &worker->warmup_watcher); + } if (worker->config->conn_inactivity_timeout > 0.) { ev_timer_again(worker->loop, &conn_inactivity_watcher); @@ -467,13 +543,17 @@ int Client::try_again_or_fail() { if (new_connection_requested) { new_connection_requested = false; - if (req_left) { - // At the moment, we don't have a facility to re-start request - // already in in-flight. Make them fail. - worker->stats.req_failed += req_inflight; - worker->stats.req_error += req_inflight; - req_inflight = 0; + if (req_left) { + + if (worker->current_phase == Phase::MAIN_DURATION) { + // At the moment, we don't have a facility to re-start request + // already in in-flight. Make them fail. + worker->stats.req_failed += req_inflight; + worker->stats.req_error += req_inflight; + + req_inflight = 0; + } // Keep using current address if (connect() == 0) { @@ -529,11 +609,16 @@ int Client::submit_request() { return -1; } + if (worker->current_phase != Phase::MAIN_DURATION) { + return 0; + } + ++worker->stats.req_started; - --req_left; ++req_started; ++req_inflight; - + if (!worker->config->is_timing_based_mode()) { + --req_left; + } // if an active timeout is set and this is the last request to be submitted // on this connection, start the active timeout. if (worker->config->conn_active_timeout > 0. && req_left == 0) { @@ -544,6 +629,10 @@ int Client::submit_request() { } void Client::process_timedout_streams() { + if (worker->current_phase != Phase::MAIN_DURATION) { + return; + } + for (auto &p : streams) { auto &req_stat = p.second.req_stat; if (!req_stat.completed) { @@ -557,6 +646,10 @@ void Client::process_timedout_streams() { } void Client::process_abandoned_streams() { + if (worker->current_phase != Phase::MAIN_DURATION) { + return; + } + auto req_abandoned = req_inflight + req_left; worker->stats.req_failed += req_abandoned; @@ -567,6 +660,10 @@ void Client::process_abandoned_streams() { } void Client::process_request_failure() { + if (worker->current_phase != Phase::MAIN_DURATION) { + return; + } + worker->stats.req_failed += req_left; worker->stats.req_error += req_left; @@ -575,6 +672,7 @@ void Client::process_request_failure() { if (req_inflight == 0) { terminate_session(); } + std::cout << "Process Request Failure:" << worker->stats.req_failed << std::endl; } namespace { @@ -648,6 +746,9 @@ void Client::on_request(int32_t stream_id) { streams[stream_id] = Stream(); } void Client::on_header(int32_t stream_id, const uint8_t *name, size_t namelen, const uint8_t *value, size_t valuelen) { + if (worker->current_phase != Phase::MAIN_DURATION) { + return; + } auto itr = streams.find(stream_id); if (itr == std::end(streams)) { return; @@ -668,7 +769,7 @@ void Client::on_header(int32_t stream_id, const uint8_t *name, size_t namelen, break; } } - + if (status >= 200 && status < 300) { ++worker->stats.status[2]; stream.status_success = 1; @@ -685,6 +786,10 @@ void Client::on_header(int32_t stream_id, const uint8_t *name, size_t namelen, } void Client::on_status_code(int32_t stream_id, uint16_t status) { + if (worker->current_phase != Phase::MAIN_DURATION) { + return; + } + auto itr = streams.find(stream_id); if (itr == std::end(streams)) { return; @@ -706,40 +811,42 @@ void Client::on_status_code(int32_t stream_id, uint16_t status) { } void Client::on_stream_close(int32_t stream_id, bool success, bool final) { - ++req_done; - --req_inflight; + if (worker->current_phase == Phase::MAIN_DURATION) { + ++req_done; + if (req_inflight > 0) { + --req_inflight; + } + auto req_stat = get_req_stat(stream_id); + if (!req_stat) { + return; + } - auto req_stat = get_req_stat(stream_id); - if (!req_stat) { - return; - } + req_stat->stream_close_time = std::chrono::steady_clock::now(); + if (success) { + req_stat->completed = true; + ++worker->stats.req_success; + ++cstat.req_success; + + if (streams[stream_id].status_success == 1) { + ++worker->stats.req_status_success; + } else { + ++worker->stats.req_failed; + } - req_stat->stream_close_time = std::chrono::steady_clock::now(); - if (success) { - req_stat->completed = true; - ++worker->stats.req_success; - ++cstat.req_success; + if (sampling_should_pick(worker->request_times_smp)) { + sampling_advance_point(worker->request_times_smp); + worker->sample_req_stat(req_stat); + } - if (streams[stream_id].status_success == 1) { - ++worker->stats.req_status_success; + // Count up in successful cases only + ++worker->request_times_smp.n; } else { ++worker->stats.req_failed; + ++worker->stats.req_error; } - - if (sampling_should_pick(worker->request_times_smp)) { - sampling_advance_point(worker->request_times_smp); - worker->sample_req_stat(req_stat); - } - - // Count up in successful cases only - ++worker->request_times_smp.n; - } else { - ++worker->stats.req_failed; - ++worker->stats.req_error; + ++worker->stats.req_done; } - ++worker->stats.req_done; - worker->report_progress(); streams.erase(stream_id); if (req_left == 0 && req_inflight == 0) { @@ -906,7 +1013,9 @@ int Client::on_read(const uint8_t *data, size_t len) { if (rv != 0) { return -1; } - worker->stats.bytes_total += len; + if (worker->current_phase == Phase::MAIN_DURATION) { + worker->stats.bytes_total += len; + } signal_write(); return 0; } @@ -1136,7 +1245,7 @@ void Client::record_client_start_time() { if (recorded(cstat.client_start_time)) { return; } - + cstat.client_start_time = std::chrono::steady_clock::now(); } @@ -1176,12 +1285,13 @@ Worker::Worker(uint32_t id, SSL_CTX *ssl_ctx, size_t req_todo, size_t nclients, rate(rate), max_samples(max_samples), next_client_id(0) { - if (!config->is_rate_mode()) { + if (!config->is_rate_mode() && !config->is_timing_based_mode()) { progress_interval = std::max(static_cast(1), req_todo / 10); } else { progress_interval = std::max(static_cast(1), nclients / 10); } + // Below timeout is not needed in case of timing-based benchmarking // create timer that will go off every rate_period ev_timer_init(&timeout_watcher, rate_period_timeout_w_cb, 0., config->rate_period); @@ -1192,6 +1302,14 @@ Worker::Worker(uint32_t id, SSL_CTX *ssl_ctx, size_t req_todo, size_t nclients, sampling_init(request_times_smp, req_todo, max_samples); sampling_init(client_smp, nclients, max_samples); + + if (config->is_timing_based_mode()) { + duration_watcher.data = this; + + ev_timer_init(&warmup_watcher, warmup_timeout_cb, + config->warm_up_time, 0.); + warmup_watcher.data = this; + } } Worker::~Worker() { @@ -1199,14 +1317,23 @@ Worker::~Worker() { ev_loop_destroy(loop); } +void Worker::stop_all_clients() { + for (auto client : clients) { + if (client->session) { + client->terminate_session(); + } + } +} + void Worker::run() { - if (!config->is_rate_mode()) { + if (!config->is_rate_mode() && !config->is_timing_based_mode()) { for (size_t i = 0; i < nclients; ++i) { auto req_todo = nreqs_per_client; if (nreqs_rem > 0) { ++req_todo; --nreqs_rem; } + auto client = make_unique(next_client_id++, this, req_todo); if (client->connect() != 0) { std::cerr << "client could not connect to host" << std::endl; @@ -1215,11 +1342,14 @@ void Worker::run() { client.release(); } } - } else { + } else if (config->is_rate_mode()){ ev_timer_again(loop, &timeout_watcher); // call callback so that we don't waste the first rate_period rate_period_timeout_w_cb(loop, &timeout_watcher, 0); + } else { + // call the callback to start for one single time + rate_period_timeout_w_cb(loop, &timeout_watcher, 0); } ev_run(loop, 0); } @@ -1235,7 +1365,8 @@ void Worker::sample_client_stat(ClientStat *cstat) { } void Worker::report_progress() { - if (id != 0 || config->is_rate_mode() || stats.req_done % progress_interval) { + if (id != 0 || config->is_rate_mode() || stats.req_done % progress_interval + || config->is_timing_based_mode()) { return; } @@ -1581,12 +1712,27 @@ std::unique_ptr create_worker(uint32_t id, SSL_CTX *ssl_ctx, << util::duration_str(config.rate_period) << " "; } - std::cout << "spawning thread #" << id << ": " << nclients - << " total client(s). " << rate_report.str() << nreqs - << " total requests" << std::endl; + if (config.is_timing_based_mode()) { + std::cout << "spawning thread #" << id << ": " << nclients + << " total client(s). Timing-based test with " + << config.warm_up_time << "s of warm-up time and " + << config.duration << "s of main duration for measurements." + << std::endl; + } else { + std::cout << "spawning thread #" << id << ": " << nclients + << " total client(s). " << rate_report.str() << nreqs + << " total requests" << std::endl; + } - return make_unique(id, ssl_ctx, nreqs, nclients, rate, max_samples, - &config); + if (config.is_rate_mode()) { + return make_unique(id, ssl_ctx, nreqs, nclients, rate, max_samples, + &config); + } else { + // Here rate is same as client because the rate_timeout callback + // will be called only once + return make_unique(id, ssl_ctx, nreqs, nclients, nclients, max_samples, + &config); + } } } // namespace @@ -1737,6 +1883,13 @@ Options: length of the period in time. This option is ignored if the rate option is not used. The default value for this option is 1s. + -D, --duration= + Specifies the main duration for the measurements in case + of timing-based benchmarking. + --warm-up-time= + Specifies the time period before starting the actual + measurements, in case of timing-based benchmarking. + Needs to provided along with -D option. -T, --connection-active-timeout= Specifies the maximum time that h2load is willing to keep a connection open, regardless of the activity on @@ -1850,6 +2003,7 @@ int main(int argc, char **argv) { {"rate", required_argument, nullptr, 'r'}, {"connection-active-timeout", required_argument, nullptr, 'T'}, {"connection-inactivity-timeout", required_argument, nullptr, 'N'}, + {"duration", required_argument, &flag, 'D'}, {"timing-script-file", required_argument, &flag, 3}, {"base-uri", required_argument, nullptr, 'B'}, {"npn-list", required_argument, &flag, 4}, @@ -1857,10 +2011,11 @@ int main(int argc, char **argv) { {"h1", no_argument, &flag, 6}, {"header-table-size", required_argument, &flag, 7}, {"encoder-header-table-size", required_argument, &flag, 8}, + {"warm-up-time", required_argument, &flag, 9}, {nullptr, 0, nullptr, 0}}; int option_index = 0; auto c = - getopt_long(argc, argv, "hvW:c:d:m:n:p:t:w:H:i:r:T:N:B:", long_options, + getopt_long(argc, argv, "hvW:c:d:m:n:p:t:w:H:i:r:T:N:D:B:", long_options, &option_index); if (c == -1) { break; @@ -2016,6 +2171,14 @@ int main(int argc, char **argv) { config.base_uri = arg.str(); break; } + case 'D': + config.duration = strtoul(optarg, nullptr, 10); + if (config.duration == 0) { + std::cerr << "-D: the main duration for timing-based benchmarking " + << "must be positive." << std::endl; + exit(EXIT_FAILURE); + } + break; case 'v': config.verbose = true; break; @@ -2072,6 +2235,14 @@ int main(int argc, char **argv) { exit(EXIT_FAILURE); } break; + case 9: + // --warm-up-time + config.warm_up_time = util::parse_duration_with_unit(optarg); + if (!std::isfinite(config.warm_up_time)) { + std::cerr << "--warm-up-time: value error " << optarg << std::endl; + exit(EXIT_FAILURE); + } + break; } break; default: @@ -2157,8 +2328,9 @@ int main(int argc, char **argv) { exit(EXIT_FAILURE); } - if (config.nreqs == 0) { - std::cerr << "-n: the number of requests must be strictly greater than 0." + if (config.nreqs == 0 && !config.is_timing_based_mode()) { + std::cerr << "-n: the number of requests must be strictly greater than 0," + << "timing-based test is not being run." << std::endl; exit(EXIT_FAILURE); } @@ -2182,7 +2354,7 @@ int main(int argc, char **argv) { // With timing script, we don't distribute config.nreqs to each // client or thread. - if (!config.timing_script && config.nreqs < config.nclients) { + if (!config.timing_script && config.nreqs < config.nclients && !config.is_timing_based_mode()) { std::cerr << "-n, -c: the number of requests must be greater than or " << "equal to the clients." << std::endl; exit(EXIT_FAILURE); @@ -2190,11 +2362,20 @@ int main(int argc, char **argv) { if (config.nclients < config.nthreads) { std::cerr << "-c, -t: the number of clients must be greater than or equal " - "to the number of threads." + << "to the number of threads." << std::endl; exit(EXIT_FAILURE); } + if (config.is_timing_based_mode()) { + if (config.nreqs != 0) { + std::cerr << "-n: the number of requests needs to be zero (0) for timing-" + << "based test. Default value is 1." + << std::endl; + exit(EXIT_FAILURE); + } + } + if (config.is_rate_mode()) { if (config.rate < config.nthreads) { std::cerr << "-r, -t: the connection rate must be greater than or equal " @@ -2528,7 +2709,7 @@ int main(int argc, char **argv) { // Requests which have not been issued due to connection errors, are // counted towards req_failed and req_error. auto req_not_issued = - stats.req_todo - stats.req_status_success - stats.req_failed; + (stats.req_todo - stats.req_status_success - stats.req_failed); stats.req_failed += req_not_issued; stats.req_error += req_not_issued; @@ -2539,10 +2720,16 @@ int main(int argc, char **argv) { double rps = 0; int64_t bps = 0; if (duration.count() > 0) { - auto secd = std::chrono::duration_cast< - std::chrono::duration>(duration); - rps = stats.req_success / secd.count(); - bps = stats.bytes_total / secd.count(); + if (config.is_timing_based_mode()) { + // we only want to consider the main duration if warm-up is given + rps = stats.req_success / config.duration; + bps = stats.bytes_total / config.duration; + } else { + auto secd = std::chrono::duration_cast< + std::chrono::duration>(duration); + rps = stats.req_success / secd.count(); + bps = stats.bytes_total / secd.count(); + } } double header_space_savings = 0.; diff --git a/src/h2load.h b/src/h2load.h index db50472d..077f5752 100644 --- a/src/h2load.h +++ b/src/h2load.h @@ -85,6 +85,10 @@ struct Config { // rate at which connections should be made size_t rate; ev_tstamp rate_period; + // amount of time for main measurements in timing-based test + ev_tstamp duration; + // amount of time to wait before starting measurements in timing-based test + ev_tstamp warm_up_time; // amount of time to wait for activity on a given connection ev_tstamp conn_active_timeout; // amount of time to wait after the last request is made on a connection @@ -118,6 +122,7 @@ struct Config { ~Config(); bool is_rate_mode() const; + bool is_timing_based_mode() const; bool has_base_uri() const; }; @@ -215,6 +220,15 @@ struct Stats { enum ClientState { CLIENT_IDLE, CLIENT_CONNECTED }; +// This type tells whether the client is in warmup phase or not or is over +enum class Phase { + INITIAL_IDLE, // Initial idle state before warm-up phase + WARM_UP, // Warm up phase when no measurements are done + MAIN_DURATION, // Main measurement phase; if timing-based + // test is not run, this is the default phase + DURATION_OVER // This phase occurs after the measurements are over +}; + struct Client; // We use systematic sampling method @@ -253,6 +267,13 @@ struct Worker { ev_timer timeout_watcher; // The next client ID this worker assigns uint32_t next_client_id; + // Keeps track of the current phase (for timing-based experiment) for the worker + Phase current_phase; + // We need to keep track of the clients in order to stop them when needed + std::vector clients; + // This is only active when there is not a bounded number of requests specified + ev_timer duration_watcher; + ev_timer warmup_watcher; Worker(uint32_t id, SSL_CTX *ssl_ctx, size_t nreq_todo, size_t nclients, size_t rate, size_t max_samples, Config *config); @@ -263,6 +284,8 @@ struct Worker { void sample_client_stat(ClientStat *cstat); void report_progress(); void report_rate_progress(); + // This function calls the destructors of all the clients. + void stop_all_clients(); }; struct Stream {