From 7469139dda6efbb39289331791659599854a60a5 Mon Sep 17 00:00:00 2001 From: Tatsuhiro Tsujikawa Date: Sun, 12 Jun 2016 17:42:12 +0900 Subject: [PATCH] h2load: Implement HTTP/1 upload h2load has supported uploading a file quite a while, but it turns out that it worked with HTTP/2 and SPDY only. HTTP/1 with upload did not work. This commit fixes this bug, and implement HTTP/1 upload. Due to architectural limitation of h2load, when -d option is used, the number of in-flight pipe-lined requests is set to 1. --- src/h2load.cc | 15 ++++++-- src/h2load.h | 3 ++ src/h2load_http1_session.cc | 68 +++++++++++++++++++++++++++++++++---- src/h2load_http1_session.h | 1 + src/h2load_http2_session.cc | 4 +++ src/h2load_http2_session.h | 1 + src/h2load_session.h | 2 ++ src/h2load_spdy_session.cc | 4 +++ src/h2load_spdy_session.h | 1 + 9 files changed, 90 insertions(+), 9 deletions(-) diff --git a/src/h2load.cc b/src/h2load.cc index b70a0084..b810529f 100644 --- a/src/h2load.cc +++ b/src/h2load.cc @@ -269,7 +269,7 @@ bool check_stop_client_request_timeout(Client *client, ev_timer *w) { auto nreq = client->req_todo - client->req_started; if (nreq == 0 || - client->streams.size() >= (size_t)config.max_concurrent_streams) { + client->streams.size() >= client->session->max_concurrent_streams()) { // no more requests to make, stop timer ev_timer_stop(client->worker->loop, w); return true; @@ -330,7 +330,8 @@ Client::Client(uint32_t id, Worker *worker, size_t req_todo) req_done(0), id(id), fd(-1), - new_connection_requested(false) { + new_connection_requested(false), + final(false) { ev_io_init(&wev, writecb, 0, EV_WRITE); ev_io_init(&rev, readcb, 0, EV_READ); @@ -518,6 +519,8 @@ void Client::disconnect() { close(fd); fd = -1; } + + final = false; } int Client::submit_request() { @@ -860,7 +863,7 @@ int Client::connection_made() { if (!config.timing_script) { auto nreq = - std::min(req_todo - req_started, (size_t)config.max_concurrent_streams); + std::min(req_todo - req_started, session->max_concurrent_streams()); for (; nreq > 0; --nreq) { if (submit_request() != 0) { process_request_failure(); @@ -2244,6 +2247,12 @@ int main(int argc, char **argv) { h1req += nv.value; h1req += "\r\n"; } + // TODO do this for h2 and spdy too. + if (config.data_fd != -1) { + h1req += "Content-Length: "; + h1req += util::utos(config.data_length); + h1req += "\r\n"; + } h1req += "\r\n"; config.h1reqs.push_back(std::move(h1req)); diff --git a/src/h2load.h b/src/h2load.h index de11b4b4..9ba4e484 100644 --- a/src/h2load.h +++ b/src/h2load.h @@ -298,6 +298,9 @@ struct Client { ev_timer conn_inactivity_watcher; std::string selected_proto; bool new_connection_requested; + // true if the current connection will be closed, and no more new + // request cannot be processed. + bool final; enum { ERR_CONNECT_FAIL = -100 }; diff --git a/src/h2load_http1_session.cc b/src/h2load_http1_session.cc index 4f4b6c59..968e1e32 100644 --- a/src/h2load_http1_session.cc +++ b/src/h2load_http1_session.cc @@ -57,7 +57,7 @@ namespace { int htp_msg_begincb(http_parser *htp) { auto session = static_cast(htp->data); - if (session->stream_resp_counter_ >= session->stream_req_counter_) { + if (session->stream_resp_counter_ > session->stream_req_counter_) { return -1; } @@ -82,16 +82,21 @@ int htp_msg_completecb(http_parser *htp) { auto session = static_cast(htp->data); auto client = session->get_client(); - auto final = http_should_keep_alive(htp) == 0; + client->final = http_should_keep_alive(htp) == 0; auto req_stat = client->get_req_stat(session->stream_resp_counter_); assert(req_stat); - client->on_stream_close(session->stream_resp_counter_, true, final); + auto config = client->worker->config; + if (req_stat->data_offset >= config->data_length) { + client->on_stream_close(session->stream_resp_counter_, true, client->final); + } session->stream_resp_counter_ += 2; - if (final) { + if (client->final) { + session->stream_req_counter_ = session->stream_resp_counter_; + http_parser_pause(htp, 1); // Connection is going down. If we have still request to do, // create new connection and keep on doing the job. @@ -171,8 +176,12 @@ int Http1Session::submit_request() { client_->record_request_time(req_stat); client_->wb.write(req.c_str(), req.size()); - // increment for next request - stream_req_counter_ += 2; + // TODO try read some data here + + if (config->data_fd == -1 || config->data_length == 0) { + // increment for next request + stream_req_counter_ += 2; + } return 0; } @@ -206,6 +215,47 @@ int Http1Session::on_write() { if (complete_) { return -1; } + + auto config = client_->worker->config; + auto req_stat = client_->get_req_stat(stream_req_counter_); + if (!req_stat) { + return 0; + } + + if (req_stat->data_offset < config->data_length) { + auto req_stat = client_->get_req_stat(stream_req_counter_); + auto &wb = client_->wb; + + ssize_t nread; + while ((nread = pread(config->data_fd, wb.last, wb.wleft(), + req_stat->data_offset)) == -1 && + errno == EINTR) + ; + + if (nread == -1) { + return -1; + } + + req_stat->data_offset += nread; + + wb.write(nread); + + if (client_->worker->config->verbose) { + std::cout << "[send " << nread << " byte(s)]" << std::endl; + } + + if (req_stat->data_offset == config->data_length) { + // increment for next request + stream_req_counter_ += 2; + + if (stream_resp_counter_ == stream_req_counter_) { + // Response has already been received + client_->on_stream_close(stream_resp_counter_ - 2, true, + client_->final); + } + } + } + return 0; } @@ -213,4 +263,10 @@ void Http1Session::terminate() { complete_ = true; } Client *Http1Session::get_client() { return client_; } +size_t Http1Session::max_concurrent_streams() { + auto config = client_->worker->config; + + return config->data_fd == -1 ? config->max_concurrent_streams : 1; +} + } // namespace h2load diff --git a/src/h2load_http1_session.h b/src/h2load_http1_session.h index 1b48c9c4..3a0b5db5 100644 --- a/src/h2load_http1_session.h +++ b/src/h2load_http1_session.h @@ -42,6 +42,7 @@ public: virtual int on_read(const uint8_t *data, size_t len); virtual int on_write(); virtual void terminate(); + virtual size_t max_concurrent_streams(); Client *get_client(); int32_t stream_req_counter_; int32_t stream_resp_counter_; diff --git a/src/h2load_http2_session.cc b/src/h2load_http2_session.cc index 5e333964..71f1d75c 100644 --- a/src/h2load_http2_session.cc +++ b/src/h2load_http2_session.cc @@ -289,4 +289,8 @@ void Http2Session::terminate() { nghttp2_session_terminate_session(session_, NGHTTP2_NO_ERROR); } +size_t Http2Session::max_concurrent_streams() { + return (size_t)client_->worker->config->max_concurrent_streams; +} + } // namespace h2load diff --git a/src/h2load_http2_session.h b/src/h2load_http2_session.h index 2d8b1fa8..1b9b9f77 100644 --- a/src/h2load_http2_session.h +++ b/src/h2load_http2_session.h @@ -42,6 +42,7 @@ public: virtual int on_read(const uint8_t *data, size_t len); virtual int on_write(); virtual void terminate(); + virtual size_t max_concurrent_streams(); private: Client *client_; diff --git a/src/h2load_session.h b/src/h2load_session.h index 17103aa9..ab3b8ec8 100644 --- a/src/h2load_session.h +++ b/src/h2load_session.h @@ -50,6 +50,8 @@ public: virtual int on_write() = 0; // Called when the underlying session must be terminated. virtual void terminate() = 0; + // Return the maximum concurrency per connection + virtual size_t max_concurrent_streams() = 0; }; } // namespace h2load diff --git a/src/h2load_spdy_session.cc b/src/h2load_spdy_session.cc index 4afa13e1..40c29d55 100644 --- a/src/h2load_spdy_session.cc +++ b/src/h2load_spdy_session.cc @@ -282,4 +282,8 @@ void SpdySession::handle_window_update(int32_t stream_id, size_t recvlen) { } } +size_t SpdySession::max_concurrent_streams() { + return (size_t)client_->worker->config->max_concurrent_streams; +} + } // namespace h2load diff --git a/src/h2load_spdy_session.h b/src/h2load_spdy_session.h index 44fd4d55..c7721fac 100644 --- a/src/h2load_spdy_session.h +++ b/src/h2load_spdy_session.h @@ -44,6 +44,7 @@ public: virtual int on_read(const uint8_t *data, size_t len); virtual int on_write(); virtual void terminate(); + virtual size_t max_concurrent_streams(); void handle_window_update(int32_t stream_id, size_t recvlen); private: