From 60bbb5cae058cc5133af00f14f8f6f56bc5b1d03 Mon Sep 17 00:00:00 2001 From: Tatsuhiro Tsujikawa Date: Tue, 5 Jan 2016 22:36:37 +0900 Subject: [PATCH] h2load: Perform sampling for request timings to reduce memory consumption --- src/h2load.cc | 62 ++++++++++++++++++++++++++++++------- src/h2load.h | 13 ++++++-- src/h2load_http1_session.cc | 18 ++++++----- src/h2load_http1_session.h | 3 +- src/h2load_http2_session.cc | 37 +++++++++++++++------- src/h2load_http2_session.h | 2 +- src/h2load_session.h | 2 +- src/h2load_spdy_session.cc | 15 +++------ src/h2load_spdy_session.h | 2 +- 9 files changed, 106 insertions(+), 48 deletions(-) diff --git a/src/h2load.cc b/src/h2load.cc index 1b8bd343..b1a5edda 100644 --- a/src/h2load.cc +++ b/src/h2load.cc @@ -98,11 +98,15 @@ Config config; RequestStat::RequestStat() : data_offset(0), completed(false) {} +constexpr size_t MAX_STATS = 1000000; + Stats::Stats(size_t req_todo, size_t nclients) : req_todo(0), req_started(0), req_done(0), req_success(0), req_status_success(0), req_failed(0), req_error(0), req_timedout(0), bytes_total(0), bytes_head(0), bytes_head_decomp(0), bytes_body(0), - status(), req_stats(req_todo), client_stats(nclients) {} + status(), client_stats(nclients) { + req_stats.reserve(std::min(req_todo, MAX_STATS)); +} Stream::Stream() : status_success(-1) {} @@ -420,9 +424,8 @@ void Client::disconnect() { } int Client::submit_request() { - auto req_stat = &worker->stats.req_stats[worker->stats.req_started++]; - - if (session->submit_request(req_stat) != 0) { + ++worker->stats.req_started; + if (session->submit_request() != 0) { return -1; } @@ -607,8 +610,12 @@ void Client::on_status_code(int32_t stream_id, uint16_t status) { } } -void Client::on_stream_close(int32_t stream_id, bool success, - RequestStat *req_stat, bool final) { +void Client::on_stream_close(int32_t stream_id, bool success, bool final) { + auto req_stat = get_req_stat(stream_id); + if (!req_stat) { + return; + } + req_stat->stream_close_time = std::chrono::steady_clock::now(); if (success) { req_stat->completed = true; @@ -628,6 +635,12 @@ void Client::on_stream_close(int32_t stream_id, bool success, ++worker->stats.req_failed; ++worker->stats.req_error; } + + if (req_stat->completed && + (worker->stats.req_done % worker->request_times_sampling_step) == 0) { + worker->sample_req_stat(req_stat); + } + report_progress(); streams.erase(stream_id); if (req_done == req_todo) { @@ -645,6 +658,15 @@ void Client::on_stream_close(int32_t stream_id, bool success, } } +RequestStat *Client::get_req_stat(int32_t stream_id) { + auto it = streams.find(stream_id); + if (it == std::end(streams)) { + return nullptr; + } + + return &(*it).second.req_stat; +} + int Client::connection_made() { if (ssl) { report_tls_info(); @@ -750,7 +772,6 @@ int Client::connection_made() { if (!config.timing_script) { auto nreq = std::min(req_todo - req_started, (size_t)config.max_concurrent_streams); - for (; nreq > 0; --nreq) { if (submit_request() != 0) { process_request_failure(); @@ -1060,6 +1081,10 @@ Worker::Worker(uint32_t id, SSL_CTX *ssl_ctx, size_t req_todo, size_t nclients, clients.push_back(make_unique(next_client_id++, this, req_todo)); } } + + auto request_times_max_stats = std::min(req_todo, MAX_STATS); + request_times_sampling_step = + (req_todo + request_times_max_stats - 1) / request_times_max_stats; } Worker::~Worker() { @@ -1088,6 +1113,11 @@ void Worker::run() { ev_run(loop, 0); } +void Worker::sample_req_stat(RequestStat *req_stat) { + stats.req_stats.push_back(*req_stat); + assert(stats.req_stats.size() <= MAX_STATS); +} + namespace { // Returns percentage of number of samples within mean +/- sd. double within_sd(const std::vector &samples, double mean, double sd) { @@ -1106,12 +1136,15 @@ double within_sd(const std::vector &samples, double mean, double sd) { namespace { // Computes statistics using |samples|. The min, max, mean, sd, and // percentage of number of samples within mean +/- sd are computed. -SDStat compute_time_stat(const std::vector &samples) { +// If |sampling| is true, this computes sample variance. Otherwise, +// population variance. +SDStat compute_time_stat(const std::vector &samples, + bool sampling = false) { if (samples.empty()) { return {0.0, 0.0, 0.0, 0.0, 0.0}; } // standard deviation calculated using Rapid calculation method: - // http://en.wikipedia.org/wiki/Standard_deviation#Rapid_calculation_methods + // https://en.wikipedia.org/wiki/Standard_deviation#Rapid_calculation_methods double a = 0, q = 0; size_t n = 0; double sum = 0; @@ -1130,7 +1163,7 @@ SDStat compute_time_stat(const std::vector &samples) { assert(n > 0); res.mean = sum / n; - res.sd = sqrt(q / n); + res.sd = sqrt(q / (sampling && n > 1 ? n - 1 : n)); res.within_sd = within_sd(samples, res.mean, res.sd); return res; @@ -1140,9 +1173,13 @@ SDStat compute_time_stat(const std::vector &samples) { namespace { SDStats process_time_stats(const std::vector> &workers) { + auto request_times_sampling = false; size_t nrequest_times = 0; for (const auto &w : workers) { nrequest_times += w->stats.req_stats.size(); + if (w->request_times_sampling_step != 1) { + request_times_sampling = true; + } } std::vector request_times; @@ -1195,8 +1232,9 @@ process_time_stats(const std::vector> &workers) { } } - return {compute_time_stat(request_times), compute_time_stat(connect_times), - compute_time_stat(ttfb_times), compute_time_stat(rps_values)}; + return {compute_time_stat(request_times, request_times_sampling), + compute_time_stat(connect_times), compute_time_stat(ttfb_times), + compute_time_stat(rps_values)}; } } // namespace diff --git a/src/h2load.h b/src/h2load.h index a7e07ea4..a6366f6d 100644 --- a/src/h2load.h +++ b/src/h2load.h @@ -226,6 +226,9 @@ struct Worker { // at most nreqs_rem clients get an extra request size_t nreqs_rem; size_t rate; + // every successful request_times_sampling_step-th request's + // req_stat will get sampled. + size_t request_times_sampling_step; ev_timer timeout_watcher; // The next client ID this worker assigns uint32_t next_client_id; @@ -235,9 +238,11 @@ struct Worker { ~Worker(); Worker(Worker &&o) = default; void run(); + void sample_req_stat(RequestStat *req_stat); }; struct Stream { + RequestStat req_stat; int status_success; Stream(); }; @@ -318,8 +323,12 @@ struct Client { // |success| == true means that the request/response was exchanged // |successfully, but it does not mean response carried successful // |HTTP status code. - void on_stream_close(int32_t stream_id, bool success, RequestStat *req_stat, - bool final = false); + void on_stream_close(int32_t stream_id, bool success, bool final = false); + // Returns RequestStat for |stream_id|. This function must be + // called after on_request(stream_id), and before + // on_stream_close(stream_id, ...). Otherwise, this will return + // nullptr. + RequestStat *get_req_stat(int32_t stream_id); void record_request_time(RequestStat *req_stat); void record_connect_start_time(); diff --git a/src/h2load_http1_session.cc b/src/h2load_http1_session.cc index aa43f90d..a295c625 100644 --- a/src/h2load_http1_session.cc +++ b/src/h2load_http1_session.cc @@ -80,9 +80,11 @@ int htp_msg_completecb(http_parser *htp) { auto client = session->get_client(); auto final = http_should_keep_alive(htp) == 0; - client->on_stream_close(session->stream_resp_counter_, true, - session->req_stats_[session->stream_resp_counter_], - final); + auto req_stat = client->get_req_stat(session->stream_resp_counter_); + + assert(req_stat); + + client->on_stream_close(session->stream_resp_counter_, true, final); session->stream_resp_counter_ += 2; @@ -150,7 +152,7 @@ http_parser_settings htp_hooks = { void Http1Session::on_connect() { client_->signal_write(); } -int Http1Session::submit_request(RequestStat *req_stat) { +int Http1Session::submit_request() { auto config = client_->worker->config; const auto &req = config->h1reqs[client_->reqidx]; client_->reqidx++; @@ -159,13 +161,13 @@ int Http1Session::submit_request(RequestStat *req_stat) { client_->reqidx = 0; } - assert(req_stat); + client_->on_request(stream_req_counter_); + + auto req_stat = client_->get_req_stat(stream_req_counter_); + client_->record_request_time(req_stat); client_->wb.write(req.c_str(), req.size()); - client_->on_request(stream_req_counter_); - req_stats_[stream_req_counter_] = req_stat; - // increment for next request stream_req_counter_ += 2; diff --git a/src/h2load_http1_session.h b/src/h2load_http1_session.h index a19ba65e..1b48c9c4 100644 --- a/src/h2load_http1_session.h +++ b/src/h2load_http1_session.h @@ -38,14 +38,13 @@ public: Http1Session(Client *client); virtual ~Http1Session(); virtual void on_connect(); - virtual int submit_request(RequestStat *req_stat); + virtual int submit_request(); virtual int on_read(const uint8_t *data, size_t len); virtual int on_write(); virtual void terminate(); Client *get_client(); int32_t stream_req_counter_; int32_t stream_resp_counter_; - std::unordered_map req_stats_; private: Client *client_; diff --git a/src/h2load_http2_session.cc b/src/h2load_http2_session.cc index 0bbbe8e8..7883131d 100644 --- a/src/h2load_http2_session.cc +++ b/src/h2load_http2_session.cc @@ -89,12 +89,25 @@ namespace { int on_stream_close_callback(nghttp2_session *session, int32_t stream_id, uint32_t error_code, void *user_data) { auto client = static_cast(user_data); - auto req_stat = static_cast( - nghttp2_session_get_stream_user_data(session, stream_id)); - if (!req_stat) { + client->on_stream_close(stream_id, error_code == NGHTTP2_NO_ERROR); + + return 0; +} +} // namespace + +namespace { +int on_frame_not_send_callback(nghttp2_session *session, + const nghttp2_frame *frame, int lib_error_code, + void *user_data) { + if (frame->hd.type != NGHTTP2_HEADERS || + frame->headers.cat != NGHTTP2_HCAT_REQUEST) { return 0; } - client->on_stream_close(stream_id, error_code == NGHTTP2_NO_ERROR, req_stat); + + auto client = static_cast(user_data); + // request was not sent. Mark it as error. + client->on_stream_close(frame->hd.stream_id, false); + return 0; } } // namespace @@ -108,9 +121,7 @@ int before_frame_send_callback(nghttp2_session *session, } auto client = static_cast(user_data); - client->on_request(frame->hd.stream_id); - auto req_stat = static_cast( - nghttp2_session_get_stream_user_data(session, frame->hd.stream_id)); + auto req_stat = client->get_req_stat(frame->hd.stream_id); assert(req_stat); client->record_request_time(req_stat); @@ -124,8 +135,7 @@ ssize_t file_read_callback(nghttp2_session *session, int32_t stream_id, nghttp2_data_source *source, void *user_data) { auto client = static_cast(user_data); auto config = client->worker->config; - auto req_stat = static_cast( - nghttp2_session_get_stream_user_data(session, stream_id)); + auto req_stat = client->get_req_stat(stream_id); assert(req_stat); ssize_t nread; while ((nread = pread(config->data_fd, buf, length, req_stat->data_offset)) == @@ -183,6 +193,9 @@ void Http2Session::on_connect() { nghttp2_session_callbacks_set_on_header_callback(callbacks, on_header_callback); + nghttp2_session_callbacks_set_on_frame_not_send_callback( + callbacks, on_frame_not_send_callback); + nghttp2_session_callbacks_set_before_frame_send_callback( callbacks, before_frame_send_callback); @@ -212,7 +225,7 @@ void Http2Session::on_connect() { client_->signal_write(); } -int Http2Session::submit_request(RequestStat *req_stat) { +int Http2Session::submit_request() { if (nghttp2_session_check_request_allowed(session_) == 0) { return -1; } @@ -228,11 +241,13 @@ int Http2Session::submit_request(RequestStat *req_stat) { auto stream_id = nghttp2_submit_request(session_, nullptr, nva.data(), nva.size(), - config->data_fd == -1 ? nullptr : &prd, req_stat); + config->data_fd == -1 ? nullptr : &prd, nullptr); if (stream_id < 0) { return -1; } + client_->on_request(stream_id); + return 0; } diff --git a/src/h2load_http2_session.h b/src/h2load_http2_session.h index c3a3344b..2d8b1fa8 100644 --- a/src/h2load_http2_session.h +++ b/src/h2load_http2_session.h @@ -38,7 +38,7 @@ public: Http2Session(Client *client); virtual ~Http2Session(); virtual void on_connect(); - virtual int submit_request(RequestStat *req_stat); + virtual int submit_request(); virtual int on_read(const uint8_t *data, size_t len); virtual int on_write(); virtual void terminate(); diff --git a/src/h2load_session.h b/src/h2load_session.h index 1f8ed60a..17103aa9 100644 --- a/src/h2load_session.h +++ b/src/h2load_session.h @@ -41,7 +41,7 @@ public: // Called when the connection was made. virtual void on_connect() = 0; // Called when one request must be issued. - virtual int submit_request(RequestStat *req_stat) = 0; + virtual int submit_request() = 0; // Called when incoming bytes are available. The subclass has to // return the number of bytes read. virtual int on_read(const uint8_t *data, size_t len) = 0; diff --git a/src/h2load_spdy_session.cc b/src/h2load_spdy_session.cc index a3b85591..4afa13e1 100644 --- a/src/h2load_spdy_session.cc +++ b/src/h2load_spdy_session.cc @@ -48,9 +48,7 @@ void before_ctrl_send_callback(spdylay_session *session, return; } client->on_request(frame->syn_stream.stream_id); - auto req_stat = - static_cast(spdylay_session_get_stream_user_data( - session, frame->syn_stream.stream_id)); + auto req_stat = client->get_req_stat(frame->syn_stream.stream_id); client->record_request_time(req_stat); } } // namespace @@ -104,9 +102,7 @@ void on_stream_close_callback(spdylay_session *session, int32_t stream_id, spdylay_status_code status_code, void *user_data) { auto client = static_cast(user_data); - auto req_stat = static_cast( - spdylay_session_get_stream_user_data(session, stream_id)); - client->on_stream_close(stream_id, status_code == SPDYLAY_OK, req_stat); + client->on_stream_close(stream_id, status_code == SPDYLAY_OK); } } // namespace @@ -130,8 +126,7 @@ ssize_t file_read_callback(spdylay_session *session, int32_t stream_id, spdylay_data_source *source, void *user_data) { auto client = static_cast(user_data); auto config = client->worker->config; - auto req_stat = static_cast( - spdylay_session_get_stream_user_data(session, stream_id)); + auto req_stat = client->get_req_stat(stream_id); ssize_t nread; while ((nread = pread(config->data_fd, buf, length, req_stat->data_offset)) == @@ -185,7 +180,7 @@ void SpdySession::on_connect() { client_->signal_write(); } -int SpdySession::submit_request(RequestStat *req_stat) { +int SpdySession::submit_request() { int rv; auto config = client_->worker->config; auto &nv = config->nv[client_->reqidx++]; @@ -197,7 +192,7 @@ int SpdySession::submit_request(RequestStat *req_stat) { spdylay_data_provider prd{{0}, file_read_callback}; rv = spdylay_submit_request(session_, 0, nv.data(), - config->data_fd == -1 ? nullptr : &prd, req_stat); + config->data_fd == -1 ? nullptr : &prd, nullptr); if (rv != 0) { return -1; diff --git a/src/h2load_spdy_session.h b/src/h2load_spdy_session.h index 288be1df..44fd4d55 100644 --- a/src/h2load_spdy_session.h +++ b/src/h2load_spdy_session.h @@ -40,7 +40,7 @@ public: SpdySession(Client *client, uint16_t spdy_version); virtual ~SpdySession(); virtual void on_connect(); - virtual int submit_request(RequestStat *req_stat); + virtual int submit_request(); virtual int on_read(const uint8_t *data, size_t len); virtual int on_write(); virtual void terminate();