Merge branch 'master' of https://github.com/sohamm17/nghttp2 into sohamm17-master

This commit is contained in:
Tatsuhiro Tsujikawa 2017-08-23 19:16:40 +09:00
commit 5d9434eb09
2 changed files with 312 additions and 65 deletions

View File

@ -90,6 +90,8 @@ Config::Config()
connection_window_bits(30),
rate(0),
rate_period(1.0),
duration(0.0),
warm_up_time(0.0),
conn_active_timeout(0.),
conn_inactivity_timeout(0.),
no_tls_proto(PROTO_HTTP2),
@ -118,6 +120,7 @@ Config::~Config() {
}
bool Config::is_rate_mode() const { return (this->rate != 0); }
bool Config::is_timing_based_mode() const { return (this->duration > 0); }
bool Config::has_base_uri() const { return (!this->base_uri.empty()); }
Config config;
@ -169,6 +172,7 @@ void writecb(struct ev_loop *loop, ev_io *w, int revents) {
rv = client->connect();
if (rv != 0) {
client->fail();
client->worker->free_client(client);
delete client;
return;
}
@ -176,6 +180,7 @@ void writecb(struct ev_loop *loop, ev_io *w, int revents) {
}
if (rv != 0) {
client->fail();
client->worker->free_client(client);
delete client;
}
}
@ -189,6 +194,7 @@ void readcb(struct ev_loop *loop, ev_io *w, int revents) {
if (client->try_again_or_fail() == 0) {
return;
}
client->worker->free_client(client);
delete client;
return;
}
@ -219,14 +225,69 @@ void rate_period_timeout_w_cb(struct ev_loop *loop, ev_timer *w, int revents) {
if (client->connect() != 0) {
std::cerr << "client could not connect to host" << std::endl;
client->fail();
} else {
if (worker->config->is_timing_based_mode()) {
worker->clients.push_back(client.release());
} else {
client.release();
}
}
worker->report_rate_progress();
}
if (!worker->config->is_timing_based_mode()) {
if (worker->nconns_made >= worker->nclients) {
ev_timer_stop(worker->loop, w);
}
} else {
// To check whether all created clients are pushed correctly
assert(worker->nclients == worker->clients.size());
}
}
} // namespace
namespace {
// Called when the duration for infinite number of requests are over
void duration_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) {
auto worker = static_cast<Worker *>(w->data);
worker->current_phase = Phase::DURATION_OVER;
std::cout << "Main benchmark duration is over for thread #" << worker->id
<< ". Stopping all clients." << std::endl;
worker->stop_all_clients();
std::cout << "Stopped all clients for thread #" << worker->id << std::endl;
}
} // namespace
namespace {
// Called when the warmup duration for infinite number of requests are over
void warmup_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) {
auto worker = static_cast<Worker *>(w->data);
std::cout << "Warm-up phase is over for thread #" << worker->id << "."
<< std::endl;
std::cout << "Main benchmark duration is started for thread #" << worker->id
<< "." << std::endl;
assert(worker->stats.req_started == 0);
assert(worker->stats.req_done == 0);
for (auto client : worker->clients) {
if (client) {
assert(client->req_todo == 0);
assert(client->req_left == 1);
assert(client->req_inflight == 0);
assert(client->req_started == 0);
assert(client->req_done == 0);
client->record_client_start_time();
client->clear_connect_times();
client->record_connect_start_time();
}
}
worker->current_phase = Phase::MAIN_DURATION;
ev_timer_start(worker->loop, &worker->duration_watcher);
}
} // namespace
@ -315,6 +376,11 @@ Client::Client(uint32_t id, Worker *worker, size_t req_todo)
fd(-1),
new_connection_requested(false),
final(false) {
if (req_todo == 0) { // this means infinite number of requests are to be made
// This ensures that number of requests are unbounded
// Just a positive number is fine, we chose the first positive number
req_left = 1;
}
ev_io_init(&wev, writecb, 0, EV_WRITE);
ev_io_init(&rev, readcb, 0, EV_READ);
@ -383,9 +449,17 @@ int Client::make_socket(addrinfo *addr) {
int Client::connect() {
int rv;
if (!worker->config->is_timing_based_mode()
|| worker->current_phase == Phase::MAIN_DURATION) {
record_client_start_time();
clear_connect_times();
record_connect_start_time();
} else if (worker->current_phase == Phase::INITIAL_IDLE) {
worker->current_phase = Phase::WARM_UP;
std::cout << "Warm-up started for thread #" << worker->id
<< "." << std::endl;
ev_timer_start(worker->loop, &worker->warmup_watcher);
}
if (worker->config->conn_inactivity_timeout > 0.) {
ev_timer_again(worker->loop, &conn_inactivity_watcher);
@ -443,13 +517,17 @@ int Client::try_again_or_fail() {
if (new_connection_requested) {
new_connection_requested = false;
if (req_left) {
if (worker->current_phase == Phase::MAIN_DURATION) {
// At the moment, we don't have a facility to re-start request
// already in in-flight. Make them fail.
worker->stats.req_failed += req_inflight;
worker->stats.req_error += req_inflight;
req_inflight = 0;
}
// Keep using current address
if (connect() == 0) {
@ -505,11 +583,16 @@ int Client::submit_request() {
return -1;
}
if (worker->current_phase != Phase::MAIN_DURATION) {
return 0;
}
++worker->stats.req_started;
--req_left;
++req_started;
++req_inflight;
if (!worker->config->is_timing_based_mode()) {
--req_left;
}
// if an active timeout is set and this is the last request to be submitted
// on this connection, start the active timeout.
if (worker->config->conn_active_timeout > 0. && req_left == 0) {
@ -520,6 +603,10 @@ int Client::submit_request() {
}
void Client::process_timedout_streams() {
if (worker->current_phase != Phase::MAIN_DURATION) {
return;
}
for (auto &p : streams) {
auto &req_stat = p.second.req_stat;
if (!req_stat.completed) {
@ -533,6 +620,10 @@ void Client::process_timedout_streams() {
}
void Client::process_abandoned_streams() {
if (worker->current_phase != Phase::MAIN_DURATION) {
return;
}
auto req_abandoned = req_inflight + req_left;
worker->stats.req_failed += req_abandoned;
@ -543,6 +634,10 @@ void Client::process_abandoned_streams() {
}
void Client::process_request_failure() {
if (worker->current_phase != Phase::MAIN_DURATION) {
return;
}
worker->stats.req_failed += req_left;
worker->stats.req_error += req_left;
@ -551,6 +646,7 @@ void Client::process_request_failure() {
if (req_inflight == 0) {
terminate_session();
}
std::cout << "Process Request Failure:" << worker->stats.req_failed << std::endl;
}
namespace {
@ -629,6 +725,15 @@ void Client::on_header(int32_t stream_id, const uint8_t *name, size_t namelen,
return;
}
auto &stream = (*itr).second;
if (worker->current_phase != Phase::MAIN_DURATION) {
// If the stream is for warm-up phase, then mark as a success
// But we do not update the count for 2xx, 3xx, etc status codes
// Same has been done in on_status_code function
stream.status_success = 1;
return;
}
if (stream.status_success == -1 && namelen == 7 &&
util::streq_l(":status", name, namelen)) {
int status = 0;
@ -667,6 +772,11 @@ void Client::on_status_code(int32_t stream_id, uint16_t status) {
}
auto &stream = (*itr).second;
if (worker->current_phase != Phase::MAIN_DURATION) {
stream.status_success = 1;
return;
}
if (status >= 200 && status < 300) {
++worker->stats.status[2];
stream.status_success = 1;
@ -682,9 +792,10 @@ void Client::on_status_code(int32_t stream_id, uint16_t status) {
}
void Client::on_stream_close(int32_t stream_id, bool success, bool final) {
++req_done;
if (worker->current_phase == Phase::MAIN_DURATION) {
if (req_inflight > 0) {
--req_inflight;
}
auto req_stat = get_req_stat(stream_id);
if (!req_stat) {
return;
@ -702,7 +813,10 @@ void Client::on_stream_close(int32_t stream_id, bool success, bool final) {
++worker->stats.req_failed;
}
if (sampling_should_pick(worker->request_times_smp)) {
sampling_advance_point(worker->request_times_smp);
worker->sample_req_stat(req_stat);
}
// Count up in successful cases only
++worker->request_times_smp.n;
@ -710,8 +824,11 @@ void Client::on_stream_close(int32_t stream_id, bool success, bool final) {
++worker->stats.req_failed;
++worker->stats.req_error;
}
// To avoid overflow error
assert(worker->stats.req_done <= worker->max_samples);
++worker->stats.req_done;
++req_done;
}
worker->report_progress();
streams.erase(stream_id);
@ -838,7 +955,9 @@ int Client::connection_made() {
record_connect_time();
if (!config.timing_script) {
auto nreq = std::min(req_left, session->max_concurrent_streams());
auto nreq = config.is_timing_based_mode()
? std::max(req_left, session->max_concurrent_streams())
: std::min(req_left, session->max_concurrent_streams());
for (; nreq > 0; --nreq) {
if (submit_request() != 0) {
process_request_failure();
@ -879,7 +998,9 @@ int Client::on_read(const uint8_t *data, size_t len) {
if (rv != 0) {
return -1;
}
if (worker->current_phase == Phase::MAIN_DURATION) {
worker->stats.bytes_total += len;
}
signal_write();
return 0;
}
@ -1149,37 +1270,78 @@ Worker::Worker(uint32_t id, SSL_CTX *ssl_ctx, size_t req_todo, size_t nclients,
rate(rate),
max_samples(max_samples),
next_client_id(0) {
if (!config->is_rate_mode()) {
if (!config->is_rate_mode() && !config->is_timing_based_mode()) {
progress_interval = std::max(static_cast<size_t>(1), req_todo / 10);
} else {
progress_interval = std::max(static_cast<size_t>(1), nclients / 10);
}
// Below timeout is not needed in case of timing-based benchmarking
// create timer that will go off every rate_period
ev_timer_init(&timeout_watcher, rate_period_timeout_w_cb, 0.,
config->rate_period);
timeout_watcher.data = this;
if (config->is_timing_based_mode()) {
stats.req_stats.reserve(std::max(req_todo, max_samples));
stats.client_stats.reserve(std::max(nclients, max_samples));
} else {
stats.req_stats.reserve(std::min(req_todo, max_samples));
stats.client_stats.reserve(std::min(nclients, max_samples));
}
sampling_init(request_times_smp, max_samples);
sampling_init(client_smp, max_samples);
ev_timer_init(&duration_watcher, duration_timeout_cb, config->duration, 0.);
duration_watcher.data = this;
ev_timer_init(&warmup_watcher, warmup_timeout_cb, config->warm_up_time, 0.);
warmup_watcher.data = this;
if (config->is_timing_based_mode()) {
current_phase = Phase::INITIAL_IDLE;
} else {
current_phase = Phase::MAIN_DURATION;
}
}
Worker::~Worker() {
ev_timer_stop(loop, &timeout_watcher);
ev_timer_stop(loop, &duration_watcher);
ev_timer_stop(loop, &warmup_watcher);
ev_loop_destroy(loop);
}
void Worker::stop_all_clients() {
for (auto client : clients) {
if (client && client->session) {
client->terminate_session();
}
}
}
void Worker::free_client(Client *deleted_client) {
for (auto &client : clients) {
if (client == deleted_client) {
client->req_todo = client->req_done;
stats.req_todo += client->req_todo;
auto index = &client - &clients[0];
clients[index] = NULL;
return;
}
}
}
void Worker::run() {
if (!config->is_rate_mode()) {
if (!config->is_rate_mode() && !config->is_timing_based_mode()) {
for (size_t i = 0; i < nclients; ++i) {
auto req_todo = nreqs_per_client;
if (nreqs_rem > 0) {
++req_todo;
--nreqs_rem;
}
auto client = make_unique<Client>(next_client_id++, this, req_todo);
if (client->connect() != 0) {
std::cerr << "client could not connect to host" << std::endl;
@ -1188,11 +1350,14 @@ void Worker::run() {
client.release();
}
}
} else {
} else if (config->is_rate_mode()){
ev_timer_again(loop, &timeout_watcher);
// call callback so that we don't waste the first rate_period
rate_period_timeout_w_cb(loop, &timeout_watcher, 0);
} else {
// call the callback to start for one single time
rate_period_timeout_w_cb(loop, &timeout_watcher, 0);
}
ev_run(loop, 0);
}
@ -1222,7 +1387,8 @@ void Worker::sample_client_stat(ClientStat *cstat) {
}
void Worker::report_progress() {
if (id != 0 || config->is_rate_mode() || stats.req_done % progress_interval) {
if (id != 0 || config->is_rate_mode() || stats.req_done % progress_interval
|| config->is_timing_based_mode()) {
return;
}
@ -1564,12 +1730,27 @@ std::unique_ptr<Worker> create_worker(uint32_t id, SSL_CTX *ssl_ctx,
<< util::duration_str(config.rate_period) << " ";
}
if (config.is_timing_based_mode()) {
std::cout << "spawning thread #" << id << ": " << nclients
<< " total client(s). Timing-based test with "
<< config.warm_up_time << "s of warm-up time and "
<< config.duration << "s of main duration for measurements."
<< std::endl;
} else {
std::cout << "spawning thread #" << id << ": " << nclients
<< " total client(s). " << rate_report.str() << nreqs
<< " total requests" << std::endl;
}
if (config.is_rate_mode()) {
return make_unique<Worker>(id, ssl_ctx, nreqs, nclients, rate, max_samples,
&config);
} else {
// Here rate is same as client because the rate_timeout callback
// will be called only once
return make_unique<Worker>(id, ssl_ctx, nreqs, nclients, nclients, max_samples,
&config);
}
}
} // namespace
@ -1720,6 +1901,13 @@ Options:
length of the period in time. This option is ignored if
the rate option is not used. The default value for this
option is 1s.
-D, --duration=<N>
Specifies the main duration for the measurements in case
of timing-based benchmarking.
--warm-up-time=<DURATION>
Specifies the time period before starting the actual
measurements, in case of timing-based benchmarking.
Needs to provided along with -D option.
-T, --connection-active-timeout=<DURATION>
Specifies the maximum time that h2load is willing to
keep a connection open, regardless of the activity on
@ -1833,6 +2021,7 @@ int main(int argc, char **argv) {
{"rate", required_argument, nullptr, 'r'},
{"connection-active-timeout", required_argument, nullptr, 'T'},
{"connection-inactivity-timeout", required_argument, nullptr, 'N'},
{"duration", required_argument, nullptr, 'D'},
{"timing-script-file", required_argument, &flag, 3},
{"base-uri", required_argument, nullptr, 'B'},
{"npn-list", required_argument, &flag, 4},
@ -1840,10 +2029,11 @@ int main(int argc, char **argv) {
{"h1", no_argument, &flag, 6},
{"header-table-size", required_argument, &flag, 7},
{"encoder-header-table-size", required_argument, &flag, 8},
{"warm-up-time", required_argument, &flag, 9},
{nullptr, 0, nullptr, 0}};
int option_index = 0;
auto c =
getopt_long(argc, argv, "hvW:c:d:m:n:p:t:w:H:i:r:T:N:B:", long_options,
getopt_long(argc, argv, "hvW:c:d:m:n:p:t:w:H:i:r:T:N:D:B:", long_options,
&option_index);
if (c == -1) {
break;
@ -1999,6 +2189,14 @@ int main(int argc, char **argv) {
config.base_uri = arg.str();
break;
}
case 'D':
config.duration = strtoul(optarg, nullptr, 10);
if (config.duration == 0) {
std::cerr << "-D: the main duration for timing-based benchmarking "
<< "must be positive." << std::endl;
exit(EXIT_FAILURE);
}
break;
case 'v':
config.verbose = true;
break;
@ -2055,6 +2253,14 @@ int main(int argc, char **argv) {
exit(EXIT_FAILURE);
}
break;
case 9:
// --warm-up-time
config.warm_up_time = util::parse_duration_with_unit(optarg);
if (!std::isfinite(config.warm_up_time)) {
std::cerr << "--warm-up-time: value error " << optarg << std::endl;
exit(EXIT_FAILURE);
}
break;
}
break;
default:
@ -2140,8 +2346,9 @@ int main(int argc, char **argv) {
exit(EXIT_FAILURE);
}
if (config.nreqs == 0) {
std::cerr << "-n: the number of requests must be strictly greater than 0."
if (config.nreqs == 0 && !config.is_timing_based_mode()) {
std::cerr << "-n: the number of requests must be strictly greater than 0,"
<< "timing-based test is not being run."
<< std::endl;
exit(EXIT_FAILURE);
}
@ -2165,7 +2372,7 @@ int main(int argc, char **argv) {
// With timing script, we don't distribute config.nreqs to each
// client or thread.
if (!config.timing_script && config.nreqs < config.nclients) {
if (!config.timing_script && config.nreqs < config.nclients && !config.is_timing_based_mode()) {
std::cerr << "-n, -c: the number of requests must be greater than or "
<< "equal to the clients." << std::endl;
exit(EXIT_FAILURE);
@ -2173,11 +2380,20 @@ int main(int argc, char **argv) {
if (config.nclients < config.nthreads) {
std::cerr << "-c, -t: the number of clients must be greater than or equal "
"to the number of threads."
<< "to the number of threads."
<< std::endl;
exit(EXIT_FAILURE);
}
if (config.is_timing_based_mode()) {
if (config.nreqs != 0) {
std::cerr << "-n: the number of requests needs to be zero (0) for timing-"
<< "based test. Default value is 1."
<< std::endl;
exit(EXIT_FAILURE);
}
}
if (config.is_rate_mode()) {
if (config.rate < config.nthreads) {
std::cerr << "-r, -t: the connection rate must be greater than or equal "
@ -2511,7 +2727,7 @@ int main(int argc, char **argv) {
// Requests which have not been issued due to connection errors, are
// counted towards req_failed and req_error.
auto req_not_issued =
stats.req_todo - stats.req_status_success - stats.req_failed;
(stats.req_todo - stats.req_status_success - stats.req_failed);
stats.req_failed += req_not_issued;
stats.req_error += req_not_issued;
@ -2522,11 +2738,17 @@ int main(int argc, char **argv) {
double rps = 0;
int64_t bps = 0;
if (duration.count() > 0) {
if (config.is_timing_based_mode()) {
// we only want to consider the main duration if warm-up is given
rps = stats.req_success / config.duration;
bps = stats.bytes_total / config.duration;
} else {
auto secd = std::chrono::duration_cast<
std::chrono::duration<double, std::chrono::seconds::period>>(duration);
rps = stats.req_success / secd.count();
bps = stats.bytes_total / secd.count();
}
}
double header_space_savings = 0.;
if (stats.bytes_head_decomp > 0) {

View File

@ -85,6 +85,10 @@ struct Config {
// rate at which connections should be made
size_t rate;
ev_tstamp rate_period;
// amount of time for main measurements in timing-based test
ev_tstamp duration;
// amount of time to wait before starting measurements in timing-based test
ev_tstamp warm_up_time;
// amount of time to wait for activity on a given connection
ev_tstamp conn_active_timeout;
// amount of time to wait after the last request is made on a connection
@ -118,6 +122,7 @@ struct Config {
~Config();
bool is_rate_mode() const;
bool is_timing_based_mode() const;
bool has_base_uri() const;
};
@ -215,6 +220,15 @@ struct Stats {
enum ClientState { CLIENT_IDLE, CLIENT_CONNECTED };
// This type tells whether the client is in warmup phase or not or is over
enum class Phase {
INITIAL_IDLE, // Initial idle state before warm-up phase
WARM_UP, // Warm up phase when no measurements are done
MAIN_DURATION, // Main measurement phase; if timing-based
// test is not run, this is the default phase
DURATION_OVER // This phase occurs after the measurements are over
};
struct Client;
// We use reservoir sampling method
@ -250,6 +264,13 @@ struct Worker {
ev_timer timeout_watcher;
// The next client ID this worker assigns
uint32_t next_client_id;
// Keeps track of the current phase (for timing-based experiment) for the worker
Phase current_phase;
// We need to keep track of the clients in order to stop them when needed
std::vector<Client*> clients;
// This is only active when there is not a bounded number of requests specified
ev_timer duration_watcher;
ev_timer warmup_watcher;
Worker(uint32_t id, SSL_CTX *ssl_ctx, size_t nreq_todo, size_t nclients,
size_t rate, size_t max_samples, Config *config);
@ -260,6 +281,10 @@ struct Worker {
void sample_client_stat(ClientStat *cstat);
void report_progress();
void report_rate_progress();
// This function calls the destructors of all the clients.
void stop_all_clients();
// This function frees a client from the list of clients for this Worker.
void free_client(Client*);
};
struct Stream {