h2load: Start thread execution using conditional variable

When thread is created, we pause them.  After all threads are created,
master thread sends signal to all worker threads and let them start to
benchmark.  This will make thread start almost at the same time since
we can avoid thread creation overhead.  It also exclude thread
creating time from benchmark time.  We also simplified thread creation
routine, and now we always use dedicted worker thread to issue
requests even if -t1.
This commit is contained in:
Tatsuhiro Tsujikawa 2015-10-09 21:36:28 +09:00
parent 0d27a89915
commit ca9e7c2c2d
1 changed files with 52 additions and 46 deletions

View File

@ -1254,6 +1254,24 @@ void read_script_from_file(std::istream &infile,
} }
} // namespace } // namespace
namespace {
std::unique_ptr<Worker> create_worker(uint32_t id, SSL_CTX *ssl_ctx,
size_t nreqs, size_t nclients,
size_t rate) {
std::stringstream rate_report;
if (config.is_rate_mode() && nclients > rate) {
rate_report << "Up to " << rate << " client(s) will be created every "
<< std::setprecision(3) << config.rate_period << " seconds. ";
}
std::cout << "spawning thread #" << id << ": " << nclients
<< " total client(s). " << rate_report.str() << nreqs
<< " total requests" << std::endl;
return make_unique<Worker>(id, ssl_ctx, nreqs, nclients, rate, &config);
}
} // namespace
namespace { namespace {
void print_version(std::ostream &out) { void print_version(std::ostream &out) {
out << "h2load nghttp2/" NGHTTP2_VERSION << std::endl; out << "h2load nghttp2/" NGHTTP2_VERSION << std::endl;
@ -1905,6 +1923,12 @@ int main(int argc, char **argv) {
resolve_host(); resolve_host();
std::cout << "starting benchmark..." << std::endl;
std::vector<std::unique_ptr<Worker>> workers;
workers.reserve(config.nthreads);
#ifndef NOTHREADS
size_t nreqs_per_thread = 0; size_t nreqs_per_thread = 0;
ssize_t nreqs_rem = 0; ssize_t nreqs_rem = 0;
@ -1919,16 +1943,12 @@ int main(int argc, char **argv) {
size_t rate_per_thread = config.rate / config.nthreads; size_t rate_per_thread = config.rate / config.nthreads;
ssize_t rate_per_thread_rem = config.rate % config.nthreads; ssize_t rate_per_thread_rem = config.rate % config.nthreads;
std::cout << "starting benchmark..." << std::endl; std::mutex mu;
std::condition_variable cv;
auto ready = false;
auto start = std::chrono::steady_clock::now();
std::vector<std::unique_ptr<Worker>> workers;
workers.reserve(config.nthreads);
#ifndef NOTHREADS
std::vector<std::future<void>> futures; std::vector<std::future<void>> futures;
for (size_t i = 0; i < config.nthreads - 1; ++i) { for (size_t i = 0; i < config.nthreads; ++i) {
auto rate = rate_per_thread; auto rate = rate_per_thread;
if (rate_per_thread_rem > 0) { if (rate_per_thread_rem > 0) {
--rate_per_thread_rem; --rate_per_thread_rem;
@ -1955,55 +1975,41 @@ int main(int argc, char **argv) {
} }
} }
std::stringstream rate_report; workers.push_back(create_worker(i, ssl_ctx, nreqs, nclients, rate));
if (config.is_rate_mode() && nclients > rate) {
rate_report << "Up to " << rate << " client(s) will be created every "
<< std::setprecision(3) << config.rate_period << " seconds. ";
}
std::cout << "spawning thread #" << i << ": " << nclients
<< " total client(s). " << rate_report.str() << nreqs
<< " total requests" << std::endl;
workers.push_back(
make_unique<Worker>(i, ssl_ctx, nreqs, nclients, rate, &config));
auto &worker = workers.back(); auto &worker = workers.back();
futures.push_back( futures.push_back(
std::async(std::launch::async, [&worker]() { worker->run(); })); std::async(std::launch::async, [&worker, &mu, &cv, &ready]() {
{
std::unique_lock<std::mutex> ulk(mu);
cv.wait(ulk, [&ready] { return ready; });
}
worker->run();
}));
} }
#endif // NOTHREADS
assert(rate_per_thread_rem == 0);
assert(nclients_rem == 0);
assert(nreqs_rem == 0);
{ {
auto rate_last = rate_per_thread; std::lock_guard<std::mutex> lg(mu);
auto nclients_last = nclients_per_thread; ready = true;
auto nreqs_last = cv.notify_all();
config.timing_script ? config.nreqs * nclients_last : nreqs_per_thread;
std::stringstream rate_report;
if (config.is_rate_mode() && nclients_last > rate_last) {
rate_report << "Up to " << rate_last
<< " client(s) will be created every " << std::setprecision(3)
<< config.rate_period << " seconds. ";
} }
std::cout << "spawning thread #" << (config.nthreads - 1) << ": " auto start = std::chrono::steady_clock::now();
<< nclients_last << " total client(s). " << rate_report.str()
<< nreqs_last << " total requests" << std::endl;
workers.push_back(make_unique<Worker>(config.nthreads - 1, ssl_ctx,
nreqs_last, nclients_last, rate_last,
&config));
}
workers.back()->run();
#ifndef NOTHREADS
for (auto &fut : futures) { for (auto &fut : futures) {
fut.get(); fut.get();
} }
#else // NOTHREADS
auto rate = config.rate;
auto nclients = config.nclients;
auto nreqs =
config.timing_script ? config.nreqs * config.nclients : config.nreqs;
workers.push_back(create_worker(0, ssl_ctx, nreqs, nclients, rate));
auto start = std::chrono::steady_clock::now();
workers.back()->run();
#endif // NOTHREADS #endif // NOTHREADS
auto end = std::chrono::steady_clock::now(); auto end = std::chrono::steady_clock::now();