add support for requests until reconnect via -q

Joe Damato 2021-06-21 11:22:36 -07:00
parent 9e6c0685a2
commit b3564ba872
3 changed files with 56 additions and 17 deletions
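For orientation: the patch threads a new count from the command line down to each client. -q N (long form --requests-until-reconnect) is parsed into Config::requests_until_reconnect, copied into each Worker, and handed to every Client the worker spawns; the HTTP/1 message-complete callback then forces a reconnect once a client's countdown reaches zero. An illustrative invocation (the URI and the numbers are placeholders, not taken from this commit) would be:

    h2load -n 10000 -c 10 -q 100 https://example.org/

which should make each of the 10 clients drop and re-establish its connection after roughly every 100 completed requests while still contributing to the 10000-request total.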

View File

@@ -98,6 +98,7 @@ Config::Config()
       verbose(false),
       timing_script(false),
       base_uri_unix(false),
+      requests_until_reconnect(0),
       unix_addr{},
       rps(0.) {}
@@ -215,7 +216,7 @@ void rate_period_timeout_w_cb(struct ev_loop *loop, ev_timer *w, int revents) {
       --worker->nreqs_rem;
     }
 
     auto client =
-        std::make_unique<Client>(worker->next_client_id++, worker, req_todo);
+        std::make_unique<Client>(worker->next_client_id++, worker, req_todo, 0);
 
     ++worker->nconns_made;
@@ -404,7 +405,8 @@ void client_request_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) {
 }
 } // namespace
 
-Client::Client(uint32_t id, Worker *worker, size_t req_todo)
+Client::Client(uint32_t id, Worker *worker, size_t req_todo,
+               uint32_t requests_until_reconnect)
     : wb(&worker->mcpool),
       cstat{},
       worker(worker),
@@ -418,6 +420,8 @@ Client::Client(uint32_t id, Worker *worker, size_t req_todo)
       req_inflight(0),
       req_started(0),
       req_done(0),
+      req_until_reconnect(requests_until_reconnect),
+      req_until_reconnect_rearmed(false),
       id(id),
       fd(-1),
       new_connection_requested(false),
@@ -573,12 +577,16 @@ int Client::try_again_or_fail() {
   if (req_left) {
 
     if (worker->current_phase == Phase::MAIN_DURATION) {
+      if (req_inflight > 0) {
       // At the moment, we don't have a facility to re-start request
       // already in in-flight. Make them fail.
       worker->stats.req_failed += req_inflight;
       worker->stats.req_error += req_inflight;
 
       req_inflight = 0;
+      } else {
+        req_until_reconnect_rearmed = false;
+      }
     }
 
     // Keep using current address
@@ -1343,7 +1351,7 @@ int get_ev_loop_flags() {
 } // namespace
 
 Worker::Worker(uint32_t id, SSL_CTX *ssl_ctx, size_t req_todo, size_t nclients,
-               size_t rate, size_t max_samples, Config *config)
+               uint32_t requests_until_reconnect, size_t rate, size_t max_samples, Config *config)
     : stats(req_todo, nclients),
       loop(ev_loop_new(get_ev_loop_flags())),
       ssl_ctx(ssl_ctx),
@@ -1355,6 +1363,7 @@ Worker::Worker(uint32_t id, SSL_CTX *ssl_ctx, size_t req_todo, size_t nclients,
       nclients(nclients),
       nreqs_per_client(req_todo / nclients),
       nreqs_rem(req_todo % nclients),
+      requests_until_reconnect(requests_until_reconnect),
       rate(rate),
       max_samples(max_samples),
       next_client_id(0) {
@@ -1430,7 +1439,8 @@ void Worker::run() {
       --nreqs_rem;
     }
 
-    auto client = std::make_unique<Client>(next_client_id++, this, req_todo);
+    auto client = std::make_unique<Client>(next_client_id++, this, req_todo,
+                                           requests_until_reconnect);
     if (client->connect() != 0) {
       std::cerr << "client could not connect to host" << std::endl;
       client->fail();
@@ -1837,14 +1847,20 @@ std::unique_ptr<Worker> create_worker(uint32_t id, SSL_CTX *ssl_ctx,
               << " total requests" << std::endl;
   }
 
+  if (config.requests_until_reconnect > 0) {
+    std::cout << " number of requests until reconnect: "
+              << config.requests_until_reconnect << std::endl;
+  }
+
   if (config.is_rate_mode()) {
-    return std::make_unique<Worker>(id, ssl_ctx, nreqs, nclients, rate,
-                                    max_samples, &config);
+    return std::make_unique<Worker>(id, ssl_ctx, nreqs, nclients,
+                                    config.requests_until_reconnect, rate, max_samples, &config);
   } else {
     // Here rate is same as client because the rate_timeout callback
     // will be called only once
-    return std::make_unique<Worker>(id, ssl_ctx, nreqs, nclients, nclients,
-                                    max_samples, &config);
+    return std::make_unique<Worker>(id, ssl_ctx, nreqs, nclients,
+                                    config.requests_until_reconnect,
+                                    nclients, max_samples, &config);
   }
 }
 } // namespace
@@ -1921,6 +1937,10 @@ Options:
               Number of native threads.
               Default: )"
       << config.nthreads << R"(
+  -q, --requests-until-reconnect=<N>
+              Number of requests to issue per client before a reconnect is
+              done from that client. Default is 0, which means this behavior
+              is disabled.
   -i, --input-file=<PATH>
               Path of a file with multiple URIs are separated by EOLs.
               This option will disable URIs getting from command-line.
@@ -2130,10 +2150,11 @@ int main(int argc, char **argv) {
         {"log-file", required_argument, &flag, 10},
         {"connect-to", required_argument, &flag, 11},
         {"rps", required_argument, &flag, 12},
+        {"requests-until-reconnect", required_argument, nullptr, 'q'},
         {nullptr, 0, nullptr, 0}};
     int option_index = 0;
     auto c = getopt_long(argc, argv,
-                         "hvW:c:d:m:n:p:t:w:H:i:r:T:N:D:B:", long_options,
+                         "hvW:c:d:m:n:p:t:w:H:i:r:T:N:D:B:q:", long_options,
                          &option_index);
     if (c == -1) {
       break;
@@ -2143,6 +2164,9 @@ int main(int argc, char **argv) {
       config.nreqs = strtoul(optarg, nullptr, 10);
       nreqs_set_manually = true;
       break;
+    case 'q':
+      config.requests_until_reconnect = strtoul(optarg, nullptr, 10);
+      break;
     case 'c':
       config.nclients = strtoul(optarg, nullptr, 10);
       break;

View File

@@ -104,6 +104,7 @@ struct Config {
   uint16_t default_port;
   uint16_t connect_to_port;
   bool verbose;
+  uint32_t requests_until_reconnect;
   bool timing_script;
   std::string base_uri;
   // true if UNIX domain socket is used. In this case, base_uri is
@@ -268,6 +269,9 @@ struct Worker {
   ev_timer timeout_watcher;
   // The next client ID this worker assigns
   uint32_t next_client_id;
+  // the number of requests that should be processed before the client
+  // reconnects
+  const uint32_t requests_until_reconnect;
   // Keeps track of the current phase (for timing-based experiment) for the
   // worker
   Phase current_phase;
@@ -279,7 +283,7 @@ struct Worker {
   ev_timer warmup_watcher;
 
   Worker(uint32_t id, SSL_CTX *ssl_ctx, size_t nreq_todo, size_t nclients,
-         size_t rate, size_t max_samples, Config *config);
+         uint32_t reqs_until_reconnect, size_t rate, size_t max_samples, Config *config);
   ~Worker();
   Worker(Worker &&o) = default;
   void run();
@@ -329,6 +333,8 @@ struct Client {
   size_t req_started;
   // The number of requests this client has done so far.
   size_t req_done;
+  // The number of requests this client has left until a reconnect is done.
+  uint32_t req_until_reconnect;
   // The client id per worker
   uint32_t id;
   int fd;
@@ -336,6 +342,7 @@ struct Client {
   ev_timer conn_inactivity_watcher;
   std::string selected_proto;
   bool new_connection_requested;
+  bool req_until_reconnect_rearmed;
   // true if the current connection will be closed, and no more new
   // request cannot be processed.
   bool final;
@@ -356,7 +363,8 @@ struct Client {
   enum { ERR_CONNECT_FAIL = -100 };
 
-  Client(uint32_t id, Worker *worker, size_t req_todo);
+  Client(uint32_t id, Worker *worker, size_t req_todo,
+         uint32_t requests_until_reconnect);
   ~Client();
   int make_socket(addrinfo *addr);
   int connect();

View File

@@ -89,18 +89,25 @@ int htp_msg_completecb(llhttp_t *htp) {
   session->stream_resp_counter_ += 2;
 
-  if (client->final) {
+  if (client->final || (client->worker->requests_until_reconnect > 0 &&
+                        client->req_until_reconnect == 0)) {
     session->stream_req_counter_ = session->stream_resp_counter_;
 
     // Connection is going down. If we have still request to do,
     // create new connection and keep on doing the job.
     if (client->req_left) {
+      client->req_until_reconnect = client->worker->requests_until_reconnect;
+      client->req_until_reconnect_rearmed = true;
       client->try_new_connection();
     }
 
     return HPE_PAUSED;
   }
 
+  if (client->worker->requests_until_reconnect > 0) {
+    client->req_until_reconnect--;
+  }
+
   return 0;
 }
 } // namespace