From 252df2d22c845ff0808fe4fdd2122981d7f33ea3 Mon Sep 17 00:00:00 2001 From: Tatsuhiro Tsujikawa Date: Mon, 28 Sep 2015 23:50:44 +0900 Subject: [PATCH] h2load: Reconnect server on connection: close --- src/h2load.cc | 110 +++++++++++++++++++++++------------- src/h2load.h | 12 +++- src/h2load_http1_session.cc | 20 ++++++- 3 files changed, 101 insertions(+), 41 deletions(-) diff --git a/src/h2load.cc b/src/h2load.cc index 96a8b586..ebb0cdb3 100644 --- a/src/h2load.cc +++ b/src/h2load.cc @@ -108,6 +108,8 @@ void writecb(struct ev_loop *loop, ev_io *w, int revents) { auto rv = client->do_write(); if (rv == Client::ERR_CONNECT_FAIL) { client->disconnect(); + // Try next address + client->current_addr = nullptr; rv = client->connect(); if (rv != 0) { client->fail(); @@ -220,9 +222,10 @@ void client_request_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) { } // namespace Client::Client(Worker *worker, size_t req_todo) - : worker(worker), ssl(nullptr), next_addr(config.addrs), reqidx(0), - state(CLIENT_IDLE), first_byte_received(false), req_todo(req_todo), - req_started(0), req_done(0), fd(-1) { + : worker(worker), ssl(nullptr), next_addr(config.addrs), + current_addr(nullptr), reqidx(0), state(CLIENT_IDLE), + first_byte_received(false), req_todo(req_todo), req_started(0), + req_done(0), fd(-1), new_connection_requested(false) { ev_io_init(&wev, writecb, 0, EV_WRITE); ev_io_init(&rev, readcb, 0, EV_READ); @@ -246,48 +249,67 @@ Client::~Client() { disconnect(); } int Client::do_read() { return readfn(*this); } int Client::do_write() { return writefn(*this); } +int Client::make_socket(addrinfo *addr) { + fd = util::create_nonblock_socket(addr->ai_family); + if (fd == -1) { + return -1; + } + if (config.scheme == "https") { + ssl = SSL_new(worker->ssl_ctx); + + auto config = worker->config; + + if (!util::numeric_host(config->host.c_str())) { + SSL_set_tlsext_host_name(ssl, config->host.c_str()); + } + + SSL_set_fd(ssl, fd); + SSL_set_connect_state(ssl); + } + + auto rv = ::connect(fd, addr->ai_addr, addr->ai_addrlen); + if (rv != 0 && errno != EINPROGRESS) { + if (ssl) { + SSL_free(ssl); + ssl = nullptr; + } + close(fd); + fd = -1; + return -1; + } + return 0; +} + int Client::connect() { + int rv; + record_start_time(&worker->stats); if (worker->config->conn_inactivity_timeout > 0) { ev_timer_again(worker->loop, &conn_inactivity_watcher); } - while (next_addr) { - auto addr = next_addr; - next_addr = next_addr->ai_next; - fd = util::create_nonblock_socket(addr->ai_family); + if (current_addr) { + rv = make_socket(current_addr); + if (rv == -1) { + return -1; + } + } else { + addrinfo *addr; + while (next_addr) { + addr = next_addr; + next_addr = next_addr->ai_next; + rv = make_socket(addr); + if (rv == 0) { + break; + } + } + if (fd == -1) { - continue; - } - if (config.scheme == "https") { - ssl = SSL_new(worker->ssl_ctx); - - auto config = worker->config; - - if (!util::numeric_host(config->host.c_str())) { - SSL_set_tlsext_host_name(ssl, config->host.c_str()); - } - - SSL_set_fd(ssl, fd); - SSL_set_connect_state(ssl); + return -1; } - auto rv = ::connect(fd, addr->ai_addr, addr->ai_addrlen); - if (rv != 0 && errno != EINPROGRESS) { - if (ssl) { - SSL_free(ssl); - ssl = nullptr; - } - close(fd); - fd = -1; - continue; - } - break; - } - - if (fd == -1) { - return -1; + current_addr = addr; } writefn = &Client::connected; @@ -313,9 +335,19 @@ void Client::restart_timeout() { } void Client::fail() { - process_abandoned_streams(); - disconnect(); + + if (new_connection_requested) { + new_connection_requested = false; + + // Keep using current address + if (connect() == 0) { + return; + } + std::cerr << "client could not connect to host" << std::endl; + } + + process_abandoned_streams(); } void Client::disconnect() { @@ -505,7 +537,7 @@ void Client::on_status_code(int32_t stream_id, uint16_t status) { } void Client::on_stream_close(int32_t stream_id, bool success, - RequestStat *req_stat) { + RequestStat *req_stat, bool final) { req_stat->stream_close_time = std::chrono::steady_clock::now(); if (success) { req_stat->completed = true; @@ -525,7 +557,7 @@ void Client::on_stream_close(int32_t stream_id, bool success, return; } - if (!config.timing_script) { + if (!config.timing_script && !final) { if (req_started < req_todo) { submit_request(); return; diff --git a/src/h2load.h b/src/h2load.h index e9225c37..a4a5e5a4 100644 --- a/src/h2load.h +++ b/src/h2load.h @@ -225,6 +225,11 @@ struct Client { SSL *ssl; ev_timer request_timeout_watcher; addrinfo *next_addr; + // Address for the current address. When try_new_connection() is + // used and current_addr is not nullptr, it is used instead of + // trying next address though next_addr. To try new address, set + // nullptr to current_addr before calling connect(). + addrinfo *current_addr; size_t reqidx; ClientState state; bool first_byte_received; @@ -239,11 +244,13 @@ struct Client { ev_timer conn_active_watcher; ev_timer conn_inactivity_watcher; std::string selected_proto; + bool new_connection_requested; enum { ERR_CONNECT_FAIL = -100 }; Client(Worker *worker, size_t req_todo); ~Client(); + int make_socket(addrinfo *addr); int connect(); void disconnect(); void fail(); @@ -256,6 +263,8 @@ struct Client { void report_tls_info(); void report_app_info(); void terminate_session(); + // Asks client to create new connection, instead of just fail. + void try_new_connection(); int do_read(); int do_write(); @@ -277,7 +286,8 @@ struct Client { void on_header(int32_t stream_id, const uint8_t *name, size_t namelen, const uint8_t *value, size_t valuelen); void on_status_code(int32_t stream_id, uint16_t status); - void on_stream_close(int32_t stream_id, bool success, RequestStat *req_stat); + void on_stream_close(int32_t stream_id, bool success, RequestStat *req_stat, + bool final = false); void record_request_time(RequestStat *req_stat); void record_start_time(Stats *stat); diff --git a/src/h2load_http1_session.cc b/src/h2load_http1_session.cc index 965d3ca3..b354db07 100644 --- a/src/h2load_http1_session.cc +++ b/src/h2load_http1_session.cc @@ -71,11 +71,24 @@ int htp_msg_completecb(http_parser *htp) { auto session = static_cast(htp->data); auto client = session->get_client(); + auto final = http_should_keep_alive(htp) == 0; client->on_stream_close(session->stream_resp_counter_, true, - session->req_stats_[session->stream_resp_counter_]); + session->req_stats_[session->stream_resp_counter_], + final); session->stream_resp_counter_ += 2; + if (final) { + http_parser_pause(htp, 1); + // Connection is going down. If we have still request to do, + // create new connection and keep on doing the job. + if (client->req_started < client->req_todo) { + client->try_new_connection(); + } + + return 0; + } + return 0; } } // namespace @@ -157,6 +170,11 @@ int Http1Session::on_read(const uint8_t *data, size_t len) { auto htperr = HTTP_PARSER_ERRNO(&htp_); + if (htperr == HPE_PAUSED) { + // pause is done only when connection: close is requested + return -1; + } + if (htperr != HPE_OK) { std::cerr << "[ERROR] HTTP parse error: " << "(" << http_errno_name(htperr) << ") "