diff --git a/src/h2load.cc b/src/h2load.cc index 8d72e3ba..f57c3d32 100644 --- a/src/h2load.cc +++ b/src/h2load.cc @@ -90,6 +90,8 @@ Config::Config() connection_window_bits(30), rate(0), rate_period(1.0), + duration(0.0), + warm_up_time(0.0), conn_active_timeout(0.), conn_inactivity_timeout(0.), no_tls_proto(PROTO_HTTP2), @@ -118,6 +120,7 @@ Config::~Config() { } bool Config::is_rate_mode() const { return (this->rate != 0); } +bool Config::is_timing_based_mode() const { return (this->duration > 0); } bool Config::has_base_uri() const { return (!this->base_uri.empty()); } Config config; @@ -169,6 +172,7 @@ void writecb(struct ev_loop *loop, ev_io *w, int revents) { rv = client->connect(); if (rv != 0) { client->fail(); + client->worker->free_client(client); delete client; return; } @@ -176,6 +180,7 @@ void writecb(struct ev_loop *loop, ev_io *w, int revents) { } if (rv != 0) { client->fail(); + client->worker->free_client(client); delete client; } } @@ -189,6 +194,7 @@ void readcb(struct ev_loop *loop, ev_io *w, int revents) { if (client->try_again_or_fail() == 0) { return; } + client->worker->free_client(client); delete client; return; } @@ -220,14 +226,69 @@ void rate_period_timeout_w_cb(struct ev_loop *loop, ev_timer *w, int revents) { std::cerr << "client could not connect to host" << std::endl; client->fail(); } else { - client.release(); + if (worker->config->is_timing_based_mode()) { + worker->clients.push_back(client.release()); + } else { + client.release(); + } } worker->report_rate_progress(); } - if (worker->nconns_made >= worker->nclients) { - ev_timer_stop(worker->loop, w); + if (!worker->config->is_timing_based_mode()) { + if (worker->nconns_made >= worker->nclients) { + ev_timer_stop(worker->loop, w); + } + } else { + // To check whether all created clients are pushed correctly + assert(worker->nclients == worker->clients.size()); } } +} // namespace + +namespace { +// Called when the duration for infinite number of requests are over +void duration_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) { + auto worker = static_cast(w->data); + + worker->current_phase = Phase::DURATION_OVER; + + std::cout << "Main benchmark duration is over for thread #" << worker->id + << ". Stopping all clients." << std::endl; + worker->stop_all_clients(); + std::cout << "Stopped all clients for thread #" << worker->id << std::endl; +} +} // namespace + +namespace { +// Called when the warmup duration for infinite number of requests are over +void warmup_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) { + auto worker = static_cast(w->data); + + std::cout << "Warm-up phase is over for thread #" << worker->id << "." + << std::endl; + std::cout << "Main benchmark duration is started for thread #" << worker->id + << "." << std::endl; + assert(worker->stats.req_started == 0); + assert(worker->stats.req_done == 0); + + for (auto client : worker->clients) { + if (client) { + assert(client->req_todo == 0); + assert(client->req_left == 1); + assert(client->req_inflight == 0); + assert(client->req_started == 0); + assert(client->req_done == 0); + + client->record_client_start_time(); + client->clear_connect_times(); + client->record_connect_start_time(); + } + } + + worker->current_phase = Phase::MAIN_DURATION; + + ev_timer_start(worker->loop, &worker->duration_watcher); +} } // namespace namespace { @@ -315,6 +376,11 @@ Client::Client(uint32_t id, Worker *worker, size_t req_todo) fd(-1), new_connection_requested(false), final(false) { + if (req_todo == 0) { // this means infinite number of requests are to be made + // This ensures that number of requests are unbounded + // Just a positive number is fine, we chose the first positive number + req_left = 1; + } ev_io_init(&wev, writecb, 0, EV_WRITE); ev_io_init(&rev, readcb, 0, EV_READ); @@ -383,9 +449,17 @@ int Client::make_socket(addrinfo *addr) { int Client::connect() { int rv; - record_client_start_time(); - clear_connect_times(); - record_connect_start_time(); + if (!worker->config->is_timing_based_mode() + || worker->current_phase == Phase::MAIN_DURATION) { + record_client_start_time(); + clear_connect_times(); + record_connect_start_time(); + } else if (worker->current_phase == Phase::INITIAL_IDLE) { + worker->current_phase = Phase::WARM_UP; + std::cout << "Warm-up started for thread #" << worker->id + << "." << std::endl; + ev_timer_start(worker->loop, &worker->warmup_watcher); + } if (worker->config->conn_inactivity_timeout > 0.) { ev_timer_again(worker->loop, &conn_inactivity_watcher); @@ -443,13 +517,17 @@ int Client::try_again_or_fail() { if (new_connection_requested) { new_connection_requested = false; - if (req_left) { - // At the moment, we don't have a facility to re-start request - // already in in-flight. Make them fail. - worker->stats.req_failed += req_inflight; - worker->stats.req_error += req_inflight; - req_inflight = 0; + if (req_left) { + + if (worker->current_phase == Phase::MAIN_DURATION) { + // At the moment, we don't have a facility to re-start request + // already in in-flight. Make them fail. + worker->stats.req_failed += req_inflight; + worker->stats.req_error += req_inflight; + + req_inflight = 0; + } // Keep using current address if (connect() == 0) { @@ -505,11 +583,16 @@ int Client::submit_request() { return -1; } + if (worker->current_phase != Phase::MAIN_DURATION) { + return 0; + } + ++worker->stats.req_started; - --req_left; ++req_started; ++req_inflight; - + if (!worker->config->is_timing_based_mode()) { + --req_left; + } // if an active timeout is set and this is the last request to be submitted // on this connection, start the active timeout. if (worker->config->conn_active_timeout > 0. && req_left == 0) { @@ -520,6 +603,10 @@ int Client::submit_request() { } void Client::process_timedout_streams() { + if (worker->current_phase != Phase::MAIN_DURATION) { + return; + } + for (auto &p : streams) { auto &req_stat = p.second.req_stat; if (!req_stat.completed) { @@ -533,6 +620,10 @@ void Client::process_timedout_streams() { } void Client::process_abandoned_streams() { + if (worker->current_phase != Phase::MAIN_DURATION) { + return; + } + auto req_abandoned = req_inflight + req_left; worker->stats.req_failed += req_abandoned; @@ -543,6 +634,10 @@ void Client::process_abandoned_streams() { } void Client::process_request_failure() { + if (worker->current_phase != Phase::MAIN_DURATION) { + return; + } + worker->stats.req_failed += req_left; worker->stats.req_error += req_left; @@ -551,6 +646,7 @@ void Client::process_request_failure() { if (req_inflight == 0) { terminate_session(); } + std::cout << "Process Request Failure:" << worker->stats.req_failed << std::endl; } namespace { @@ -629,6 +725,15 @@ void Client::on_header(int32_t stream_id, const uint8_t *name, size_t namelen, return; } auto &stream = (*itr).second; + + if (worker->current_phase != Phase::MAIN_DURATION) { + // If the stream is for warm-up phase, then mark as a success + // But we do not update the count for 2xx, 3xx, etc status codes + // Same has been done in on_status_code function + stream.status_success = 1; + return; + } + if (stream.status_success == -1 && namelen == 7 && util::streq_l(":status", name, namelen)) { int status = 0; @@ -644,7 +749,7 @@ void Client::on_header(int32_t stream_id, const uint8_t *name, size_t namelen, break; } } - + if (status >= 200 && status < 300) { ++worker->stats.status[2]; stream.status_success = 1; @@ -667,6 +772,11 @@ void Client::on_status_code(int32_t stream_id, uint16_t status) { } auto &stream = (*itr).second; + if (worker->current_phase != Phase::MAIN_DURATION) { + stream.status_success = 1; + return; + } + if (status >= 200 && status < 300) { ++worker->stats.status[2]; stream.status_success = 1; @@ -682,37 +792,44 @@ void Client::on_status_code(int32_t stream_id, uint16_t status) { } void Client::on_stream_close(int32_t stream_id, bool success, bool final) { - ++req_done; - --req_inflight; - - auto req_stat = get_req_stat(stream_id); - if (!req_stat) { - return; - } - - req_stat->stream_close_time = std::chrono::steady_clock::now(); - if (success) { - req_stat->completed = true; - ++worker->stats.req_success; - ++cstat.req_success; - - if (streams[stream_id].status_success == 1) { - ++worker->stats.req_status_success; - } else { - ++worker->stats.req_failed; + if (worker->current_phase == Phase::MAIN_DURATION) { + if (req_inflight > 0) { + --req_inflight; + } + auto req_stat = get_req_stat(stream_id); + if (!req_stat) { + return; } - worker->sample_req_stat(req_stat); + req_stat->stream_close_time = std::chrono::steady_clock::now(); + if (success) { + req_stat->completed = true; + ++worker->stats.req_success; + ++cstat.req_success; - // Count up in successful cases only - ++worker->request_times_smp.n; - } else { - ++worker->stats.req_failed; - ++worker->stats.req_error; + if (streams[stream_id].status_success == 1) { + ++worker->stats.req_status_success; + } else { + ++worker->stats.req_failed; + } + + if (sampling_should_pick(worker->request_times_smp)) { + sampling_advance_point(worker->request_times_smp); + worker->sample_req_stat(req_stat); + } + + // Count up in successful cases only + ++worker->request_times_smp.n; + } else { + ++worker->stats.req_failed; + ++worker->stats.req_error; + } + // To avoid overflow error + assert(worker->stats.req_done <= worker->max_samples); + ++worker->stats.req_done; + ++req_done; } - ++worker->stats.req_done; - worker->report_progress(); streams.erase(stream_id); if (req_left == 0 && req_inflight == 0) { @@ -838,7 +955,9 @@ int Client::connection_made() { record_connect_time(); if (!config.timing_script) { - auto nreq = std::min(req_left, session->max_concurrent_streams()); + auto nreq = config.is_timing_based_mode() + ? std::max(req_left, session->max_concurrent_streams()) + : std::min(req_left, session->max_concurrent_streams()); for (; nreq > 0; --nreq) { if (submit_request() != 0) { process_request_failure(); @@ -879,7 +998,9 @@ int Client::on_read(const uint8_t *data, size_t len) { if (rv != 0) { return -1; } - worker->stats.bytes_total += len; + if (worker->current_phase == Phase::MAIN_DURATION) { + worker->stats.bytes_total += len; + } signal_write(); return 0; } @@ -1109,7 +1230,7 @@ void Client::record_client_start_time() { if (recorded(cstat.client_start_time)) { return; } - + cstat.client_start_time = std::chrono::steady_clock::now(); } @@ -1149,37 +1270,78 @@ Worker::Worker(uint32_t id, SSL_CTX *ssl_ctx, size_t req_todo, size_t nclients, rate(rate), max_samples(max_samples), next_client_id(0) { - if (!config->is_rate_mode()) { + if (!config->is_rate_mode() && !config->is_timing_based_mode()) { progress_interval = std::max(static_cast(1), req_todo / 10); } else { progress_interval = std::max(static_cast(1), nclients / 10); } + // Below timeout is not needed in case of timing-based benchmarking // create timer that will go off every rate_period ev_timer_init(&timeout_watcher, rate_period_timeout_w_cb, 0., config->rate_period); timeout_watcher.data = this; - stats.req_stats.reserve(std::min(req_todo, max_samples)); - stats.client_stats.reserve(std::min(nclients, max_samples)); + if (config->is_timing_based_mode()) { + stats.req_stats.reserve(std::max(req_todo, max_samples)); + stats.client_stats.reserve(std::max(nclients, max_samples)); + } else { + stats.req_stats.reserve(std::min(req_todo, max_samples)); + stats.client_stats.reserve(std::min(nclients, max_samples)); + } sampling_init(request_times_smp, max_samples); sampling_init(client_smp, max_samples); + + ev_timer_init(&duration_watcher, duration_timeout_cb, config->duration, 0.); + duration_watcher.data = this; + + ev_timer_init(&warmup_watcher, warmup_timeout_cb, config->warm_up_time, 0.); + warmup_watcher.data = this; + + if (config->is_timing_based_mode()) { + current_phase = Phase::INITIAL_IDLE; + } else { + current_phase = Phase::MAIN_DURATION; + } } Worker::~Worker() { ev_timer_stop(loop, &timeout_watcher); + ev_timer_stop(loop, &duration_watcher); + ev_timer_stop(loop, &warmup_watcher); ev_loop_destroy(loop); } +void Worker::stop_all_clients() { + for (auto client : clients) { + if (client && client->session) { + client->terminate_session(); + } + } +} + +void Worker::free_client(Client *deleted_client) { + for (auto &client : clients) { + if (client == deleted_client) { + client->req_todo = client->req_done; + stats.req_todo += client->req_todo; + auto index = &client - &clients[0]; + clients[index] = NULL; + return; + } + } +} + void Worker::run() { - if (!config->is_rate_mode()) { + if (!config->is_rate_mode() && !config->is_timing_based_mode()) { for (size_t i = 0; i < nclients; ++i) { auto req_todo = nreqs_per_client; if (nreqs_rem > 0) { ++req_todo; --nreqs_rem; } + auto client = make_unique(next_client_id++, this, req_todo); if (client->connect() != 0) { std::cerr << "client could not connect to host" << std::endl; @@ -1188,11 +1350,14 @@ void Worker::run() { client.release(); } } - } else { + } else if (config->is_rate_mode()){ ev_timer_again(loop, &timeout_watcher); // call callback so that we don't waste the first rate_period rate_period_timeout_w_cb(loop, &timeout_watcher, 0); + } else { + // call the callback to start for one single time + rate_period_timeout_w_cb(loop, &timeout_watcher, 0); } ev_run(loop, 0); } @@ -1222,7 +1387,8 @@ void Worker::sample_client_stat(ClientStat *cstat) { } void Worker::report_progress() { - if (id != 0 || config->is_rate_mode() || stats.req_done % progress_interval) { + if (id != 0 || config->is_rate_mode() || stats.req_done % progress_interval + || config->is_timing_based_mode()) { return; } @@ -1564,12 +1730,27 @@ std::unique_ptr create_worker(uint32_t id, SSL_CTX *ssl_ctx, << util::duration_str(config.rate_period) << " "; } - std::cout << "spawning thread #" << id << ": " << nclients - << " total client(s). " << rate_report.str() << nreqs - << " total requests" << std::endl; + if (config.is_timing_based_mode()) { + std::cout << "spawning thread #" << id << ": " << nclients + << " total client(s). Timing-based test with " + << config.warm_up_time << "s of warm-up time and " + << config.duration << "s of main duration for measurements." + << std::endl; + } else { + std::cout << "spawning thread #" << id << ": " << nclients + << " total client(s). " << rate_report.str() << nreqs + << " total requests" << std::endl; + } - return make_unique(id, ssl_ctx, nreqs, nclients, rate, max_samples, - &config); + if (config.is_rate_mode()) { + return make_unique(id, ssl_ctx, nreqs, nclients, rate, max_samples, + &config); + } else { + // Here rate is same as client because the rate_timeout callback + // will be called only once + return make_unique(id, ssl_ctx, nreqs, nclients, nclients, max_samples, + &config); + } } } // namespace @@ -1720,6 +1901,13 @@ Options: length of the period in time. This option is ignored if the rate option is not used. The default value for this option is 1s. + -D, --duration= + Specifies the main duration for the measurements in case + of timing-based benchmarking. + --warm-up-time= + Specifies the time period before starting the actual + measurements, in case of timing-based benchmarking. + Needs to provided along with -D option. -T, --connection-active-timeout= Specifies the maximum time that h2load is willing to keep a connection open, regardless of the activity on @@ -1833,6 +2021,7 @@ int main(int argc, char **argv) { {"rate", required_argument, nullptr, 'r'}, {"connection-active-timeout", required_argument, nullptr, 'T'}, {"connection-inactivity-timeout", required_argument, nullptr, 'N'}, + {"duration", required_argument, nullptr, 'D'}, {"timing-script-file", required_argument, &flag, 3}, {"base-uri", required_argument, nullptr, 'B'}, {"npn-list", required_argument, &flag, 4}, @@ -1840,10 +2029,11 @@ int main(int argc, char **argv) { {"h1", no_argument, &flag, 6}, {"header-table-size", required_argument, &flag, 7}, {"encoder-header-table-size", required_argument, &flag, 8}, + {"warm-up-time", required_argument, &flag, 9}, {nullptr, 0, nullptr, 0}}; int option_index = 0; auto c = - getopt_long(argc, argv, "hvW:c:d:m:n:p:t:w:H:i:r:T:N:B:", long_options, + getopt_long(argc, argv, "hvW:c:d:m:n:p:t:w:H:i:r:T:N:D:B:", long_options, &option_index); if (c == -1) { break; @@ -1999,6 +2189,14 @@ int main(int argc, char **argv) { config.base_uri = arg.str(); break; } + case 'D': + config.duration = strtoul(optarg, nullptr, 10); + if (config.duration == 0) { + std::cerr << "-D: the main duration for timing-based benchmarking " + << "must be positive." << std::endl; + exit(EXIT_FAILURE); + } + break; case 'v': config.verbose = true; break; @@ -2055,6 +2253,14 @@ int main(int argc, char **argv) { exit(EXIT_FAILURE); } break; + case 9: + // --warm-up-time + config.warm_up_time = util::parse_duration_with_unit(optarg); + if (!std::isfinite(config.warm_up_time)) { + std::cerr << "--warm-up-time: value error " << optarg << std::endl; + exit(EXIT_FAILURE); + } + break; } break; default: @@ -2140,8 +2346,9 @@ int main(int argc, char **argv) { exit(EXIT_FAILURE); } - if (config.nreqs == 0) { - std::cerr << "-n: the number of requests must be strictly greater than 0." + if (config.nreqs == 0 && !config.is_timing_based_mode()) { + std::cerr << "-n: the number of requests must be strictly greater than 0," + << "timing-based test is not being run." << std::endl; exit(EXIT_FAILURE); } @@ -2165,7 +2372,7 @@ int main(int argc, char **argv) { // With timing script, we don't distribute config.nreqs to each // client or thread. - if (!config.timing_script && config.nreqs < config.nclients) { + if (!config.timing_script && config.nreqs < config.nclients && !config.is_timing_based_mode()) { std::cerr << "-n, -c: the number of requests must be greater than or " << "equal to the clients." << std::endl; exit(EXIT_FAILURE); @@ -2173,11 +2380,20 @@ int main(int argc, char **argv) { if (config.nclients < config.nthreads) { std::cerr << "-c, -t: the number of clients must be greater than or equal " - "to the number of threads." + << "to the number of threads." << std::endl; exit(EXIT_FAILURE); } + if (config.is_timing_based_mode()) { + if (config.nreqs != 0) { + std::cerr << "-n: the number of requests needs to be zero (0) for timing-" + << "based test. Default value is 1." + << std::endl; + exit(EXIT_FAILURE); + } + } + if (config.is_rate_mode()) { if (config.rate < config.nthreads) { std::cerr << "-r, -t: the connection rate must be greater than or equal " @@ -2511,7 +2727,7 @@ int main(int argc, char **argv) { // Requests which have not been issued due to connection errors, are // counted towards req_failed and req_error. auto req_not_issued = - stats.req_todo - stats.req_status_success - stats.req_failed; + (stats.req_todo - stats.req_status_success - stats.req_failed); stats.req_failed += req_not_issued; stats.req_error += req_not_issued; @@ -2522,10 +2738,16 @@ int main(int argc, char **argv) { double rps = 0; int64_t bps = 0; if (duration.count() > 0) { - auto secd = std::chrono::duration_cast< - std::chrono::duration>(duration); - rps = stats.req_success / secd.count(); - bps = stats.bytes_total / secd.count(); + if (config.is_timing_based_mode()) { + // we only want to consider the main duration if warm-up is given + rps = stats.req_success / config.duration; + bps = stats.bytes_total / config.duration; + } else { + auto secd = std::chrono::duration_cast< + std::chrono::duration>(duration); + rps = stats.req_success / secd.count(); + bps = stats.bytes_total / secd.count(); + } } double header_space_savings = 0.; diff --git a/src/h2load.h b/src/h2load.h index cd194bfd..55b1130d 100644 --- a/src/h2load.h +++ b/src/h2load.h @@ -85,6 +85,10 @@ struct Config { // rate at which connections should be made size_t rate; ev_tstamp rate_period; + // amount of time for main measurements in timing-based test + ev_tstamp duration; + // amount of time to wait before starting measurements in timing-based test + ev_tstamp warm_up_time; // amount of time to wait for activity on a given connection ev_tstamp conn_active_timeout; // amount of time to wait after the last request is made on a connection @@ -118,6 +122,7 @@ struct Config { ~Config(); bool is_rate_mode() const; + bool is_timing_based_mode() const; bool has_base_uri() const; }; @@ -215,6 +220,15 @@ struct Stats { enum ClientState { CLIENT_IDLE, CLIENT_CONNECTED }; +// This type tells whether the client is in warmup phase or not or is over +enum class Phase { + INITIAL_IDLE, // Initial idle state before warm-up phase + WARM_UP, // Warm up phase when no measurements are done + MAIN_DURATION, // Main measurement phase; if timing-based + // test is not run, this is the default phase + DURATION_OVER // This phase occurs after the measurements are over +}; + struct Client; // We use reservoir sampling method @@ -250,6 +264,13 @@ struct Worker { ev_timer timeout_watcher; // The next client ID this worker assigns uint32_t next_client_id; + // Keeps track of the current phase (for timing-based experiment) for the worker + Phase current_phase; + // We need to keep track of the clients in order to stop them when needed + std::vector clients; + // This is only active when there is not a bounded number of requests specified + ev_timer duration_watcher; + ev_timer warmup_watcher; Worker(uint32_t id, SSL_CTX *ssl_ctx, size_t nreq_todo, size_t nclients, size_t rate, size_t max_samples, Config *config); @@ -260,6 +281,10 @@ struct Worker { void sample_client_stat(ClientStat *cstat); void report_progress(); void report_rate_progress(); + // This function calls the destructors of all the clients. + void stop_all_clients(); + // This function frees a client from the list of clients for this Worker. + void free_client(Client*); }; struct Stream {