diff --git a/src/h2load.cc b/src/h2load.cc index 64609843..4dc68f97 100644 --- a/src/h2load.cc +++ b/src/h2load.cc @@ -318,7 +318,8 @@ void client_request_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) { } // namespace Client::Client(uint32_t id, Worker *worker, size_t req_todo) - : cstat{}, + : wb(&worker->mcpool), + cstat{}, worker(worker), ssl(nullptr), next_addr(config.addrs), @@ -910,6 +911,10 @@ int Client::on_read(const uint8_t *data, size_t len) { } int Client::on_write() { + if (wb.rleft() >= BACKOFF_WRITE_BUFFER_THRES) { + return 0; + } + if (session->on_write() != 0) { return -1; } @@ -943,28 +948,32 @@ int Client::read_clear() { } int Client::write_clear() { + std::array iov; + for (;;) { - if (wb.rleft() > 0) { - ssize_t nwrite; - while ((nwrite = write(fd, wb.pos, wb.rleft())) == -1 && errno == EINTR) - ; - if (nwrite == -1) { - if (errno == EAGAIN || errno == EWOULDBLOCK) { - ev_io_start(worker->loop, &wev); - return 0; - } - return -1; - } - wb.drain(nwrite); - continue; - } - wb.reset(); if (on_write() != 0) { return -1; } - if (wb.rleft() == 0) { + + auto iovcnt = wb.riovec(iov.data(), iov.size()); + + if (iovcnt == 0) { break; } + + ssize_t nwrite; + while ((nwrite = writev(fd, iov.data(), iovcnt)) == -1 && errno == EINTR) + ; + + if (nwrite == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + ev_io_start(worker->loop, &wev); + return 0; + } + return -1; + } + + wb.drain(nwrite); } ev_io_stop(worker->loop, &wev); @@ -1057,35 +1066,36 @@ int Client::read_tls() { int Client::write_tls() { ERR_clear_error(); + struct iovec iov; + for (;;) { - if (wb.rleft() > 0) { - auto rv = SSL_write(ssl, wb.pos, wb.rleft()); - - if (rv <= 0) { - auto err = SSL_get_error(ssl, rv); - switch (err) { - case SSL_ERROR_WANT_READ: - // renegotiation started - return -1; - case SSL_ERROR_WANT_WRITE: - ev_io_start(worker->loop, &wev); - return 0; - default: - return -1; - } - } - - wb.drain(rv); - - continue; - } - wb.reset(); if (on_write() != 0) { return -1; } - if (wb.rleft() == 0) { + + auto iovcnt = wb.riovec(&iov, 1); + + if (iovcnt == 0) { break; } + + auto rv = SSL_write(ssl, iov.iov_base, iov.iov_len); + + if (rv <= 0) { + auto err = SSL_get_error(ssl, rv); + switch (err) { + case SSL_ERROR_WANT_READ: + // renegotiation started + return -1; + case SSL_ERROR_WANT_WRITE: + ev_io_start(worker->loop, &wev); + return 0; + default: + return -1; + } + } + + wb.drain(rv); } ev_io_stop(worker->loop, &wev); diff --git a/src/h2load.h b/src/h2load.h index 9ba4e484..39737805 100644 --- a/src/h2load.h +++ b/src/h2load.h @@ -50,13 +50,15 @@ #include #include "http2.h" -#include "buffer.h" +#include "memchunk.h" #include "template.h" using namespace nghttp2; namespace h2load { +constexpr auto BACKOFF_WRITE_BUFFER_THRES = 16_k; + class Session; struct Worker; @@ -225,6 +227,7 @@ struct Sampling { }; struct Worker { + MemchunkPool mcpool; Stats stats; Sampling request_times_smp; Sampling client_smp; @@ -267,6 +270,7 @@ struct Stream { }; struct Client { + DefaultMemchunks wb; std::unordered_map streams; ClientStat cstat; std::unique_ptr session; @@ -293,7 +297,6 @@ struct Client { // The client id per worker uint32_t id; int fd; - Buffer<64_k> wb; ev_timer conn_active_watcher; ev_timer conn_inactivity_watcher; std::string selected_proto; diff --git a/src/h2load_http1_session.cc b/src/h2load_http1_session.cc index 968e1e32..09446092 100644 --- a/src/h2load_http1_session.cc +++ b/src/h2load_http1_session.cc @@ -174,7 +174,7 @@ int Http1Session::submit_request() { auto req_stat = client_->get_req_stat(stream_req_counter_); client_->record_request_time(req_stat); - client_->wb.write(req.c_str(), req.size()); + client_->wb.append(req); // TODO try read some data here @@ -226,8 +226,12 @@ int Http1Session::on_write() { auto req_stat = client_->get_req_stat(stream_req_counter_); auto &wb = client_->wb; + // TODO unfortunately, wb has no interface to use with read(2) + // family functions. + std::array buf; + ssize_t nread; - while ((nread = pread(config->data_fd, wb.last, wb.wleft(), + while ((nread = pread(config->data_fd, buf.data(), buf.size(), req_stat->data_offset)) == -1 && errno == EINTR) ; @@ -238,7 +242,7 @@ int Http1Session::on_write() { req_stat->data_offset += nread; - wb.write(nread); + wb.append(buf.data(), nread); if (client_->worker->config->verbose) { std::cout << "[send " << nread << " byte(s)]" << std::endl; diff --git a/src/h2load_http2_session.cc b/src/h2load_http2_session.cc index 71f1d75c..0b780128 100644 --- a/src/h2load_http2_session.cc +++ b/src/h2load_http2_session.cc @@ -169,11 +169,11 @@ ssize_t send_callback(nghttp2_session *session, const uint8_t *data, auto client = static_cast(user_data); auto &wb = client->wb; - if (wb.wleft() == 0) { + if (wb.rleft() >= BACKOFF_WRITE_BUFFER_THRES) { return NGHTTP2_ERR_WOULDBLOCK; } - return wb.write(data, length); + return wb.append(data, length); } } // namespace diff --git a/src/h2load_spdy_session.cc b/src/h2load_spdy_session.cc index 40c29d55..54083809 100644 --- a/src/h2load_spdy_session.cc +++ b/src/h2load_spdy_session.cc @@ -112,11 +112,11 @@ ssize_t send_callback(spdylay_session *session, const uint8_t *data, auto client = static_cast(user_data); auto &wb = client->wb; - if (wb.wleft() == 0) { + if (wb.rleft() >= BACKOFF_WRITE_BUFFER_THRES) { return SPDYLAY_ERR_DEFERRED; } - return wb.write(data, length); + return wb.append(data, length); } } // namespace