h2load: Use memchunks
This commit is contained in:
parent
1a2dc1e822
commit
9f6c947a87
|
@ -318,7 +318,8 @@ void client_request_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) {
|
|||
} // namespace
|
||||
|
||||
Client::Client(uint32_t id, Worker *worker, size_t req_todo)
|
||||
: cstat{},
|
||||
: wb(&worker->mcpool),
|
||||
cstat{},
|
||||
worker(worker),
|
||||
ssl(nullptr),
|
||||
next_addr(config.addrs),
|
||||
|
@ -910,6 +911,10 @@ int Client::on_read(const uint8_t *data, size_t len) {
|
|||
}
|
||||
|
||||
int Client::on_write() {
|
||||
if (wb.rleft() >= BACKOFF_WRITE_BUFFER_THRES) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (session->on_write() != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -943,28 +948,32 @@ int Client::read_clear() {
|
|||
}
|
||||
|
||||
int Client::write_clear() {
|
||||
std::array<struct iovec, 2> iov;
|
||||
|
||||
for (;;) {
|
||||
if (wb.rleft() > 0) {
|
||||
ssize_t nwrite;
|
||||
while ((nwrite = write(fd, wb.pos, wb.rleft())) == -1 && errno == EINTR)
|
||||
;
|
||||
if (nwrite == -1) {
|
||||
if (errno == EAGAIN || errno == EWOULDBLOCK) {
|
||||
ev_io_start(worker->loop, &wev);
|
||||
return 0;
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
wb.drain(nwrite);
|
||||
continue;
|
||||
}
|
||||
wb.reset();
|
||||
if (on_write() != 0) {
|
||||
return -1;
|
||||
}
|
||||
if (wb.rleft() == 0) {
|
||||
|
||||
auto iovcnt = wb.riovec(iov.data(), iov.size());
|
||||
|
||||
if (iovcnt == 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
ssize_t nwrite;
|
||||
while ((nwrite = writev(fd, iov.data(), iovcnt)) == -1 && errno == EINTR)
|
||||
;
|
||||
|
||||
if (nwrite == -1) {
|
||||
if (errno == EAGAIN || errno == EWOULDBLOCK) {
|
||||
ev_io_start(worker->loop, &wev);
|
||||
return 0;
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
wb.drain(nwrite);
|
||||
}
|
||||
|
||||
ev_io_stop(worker->loop, &wev);
|
||||
|
@ -1057,35 +1066,36 @@ int Client::read_tls() {
|
|||
int Client::write_tls() {
|
||||
ERR_clear_error();
|
||||
|
||||
struct iovec iov;
|
||||
|
||||
for (;;) {
|
||||
if (wb.rleft() > 0) {
|
||||
auto rv = SSL_write(ssl, wb.pos, wb.rleft());
|
||||
|
||||
if (rv <= 0) {
|
||||
auto err = SSL_get_error(ssl, rv);
|
||||
switch (err) {
|
||||
case SSL_ERROR_WANT_READ:
|
||||
// renegotiation started
|
||||
return -1;
|
||||
case SSL_ERROR_WANT_WRITE:
|
||||
ev_io_start(worker->loop, &wev);
|
||||
return 0;
|
||||
default:
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
wb.drain(rv);
|
||||
|
||||
continue;
|
||||
}
|
||||
wb.reset();
|
||||
if (on_write() != 0) {
|
||||
return -1;
|
||||
}
|
||||
if (wb.rleft() == 0) {
|
||||
|
||||
auto iovcnt = wb.riovec(&iov, 1);
|
||||
|
||||
if (iovcnt == 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
auto rv = SSL_write(ssl, iov.iov_base, iov.iov_len);
|
||||
|
||||
if (rv <= 0) {
|
||||
auto err = SSL_get_error(ssl, rv);
|
||||
switch (err) {
|
||||
case SSL_ERROR_WANT_READ:
|
||||
// renegotiation started
|
||||
return -1;
|
||||
case SSL_ERROR_WANT_WRITE:
|
||||
ev_io_start(worker->loop, &wev);
|
||||
return 0;
|
||||
default:
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
wb.drain(rv);
|
||||
}
|
||||
|
||||
ev_io_stop(worker->loop, &wev);
|
||||
|
|
|
@ -50,13 +50,15 @@
|
|||
#include <openssl/ssl.h>
|
||||
|
||||
#include "http2.h"
|
||||
#include "buffer.h"
|
||||
#include "memchunk.h"
|
||||
#include "template.h"
|
||||
|
||||
using namespace nghttp2;
|
||||
|
||||
namespace h2load {
|
||||
|
||||
constexpr auto BACKOFF_WRITE_BUFFER_THRES = 16_k;
|
||||
|
||||
class Session;
|
||||
struct Worker;
|
||||
|
||||
|
@ -225,6 +227,7 @@ struct Sampling {
|
|||
};
|
||||
|
||||
struct Worker {
|
||||
MemchunkPool mcpool;
|
||||
Stats stats;
|
||||
Sampling request_times_smp;
|
||||
Sampling client_smp;
|
||||
|
@ -267,6 +270,7 @@ struct Stream {
|
|||
};
|
||||
|
||||
struct Client {
|
||||
DefaultMemchunks wb;
|
||||
std::unordered_map<int32_t, Stream> streams;
|
||||
ClientStat cstat;
|
||||
std::unique_ptr<Session> session;
|
||||
|
@ -293,7 +297,6 @@ struct Client {
|
|||
// The client id per worker
|
||||
uint32_t id;
|
||||
int fd;
|
||||
Buffer<64_k> wb;
|
||||
ev_timer conn_active_watcher;
|
||||
ev_timer conn_inactivity_watcher;
|
||||
std::string selected_proto;
|
||||
|
|
|
@ -174,7 +174,7 @@ int Http1Session::submit_request() {
|
|||
auto req_stat = client_->get_req_stat(stream_req_counter_);
|
||||
|
||||
client_->record_request_time(req_stat);
|
||||
client_->wb.write(req.c_str(), req.size());
|
||||
client_->wb.append(req);
|
||||
|
||||
// TODO try read some data here
|
||||
|
||||
|
@ -226,8 +226,12 @@ int Http1Session::on_write() {
|
|||
auto req_stat = client_->get_req_stat(stream_req_counter_);
|
||||
auto &wb = client_->wb;
|
||||
|
||||
// TODO unfortunately, wb has no interface to use with read(2)
|
||||
// family functions.
|
||||
std::array<uint8_t, 16_k> buf;
|
||||
|
||||
ssize_t nread;
|
||||
while ((nread = pread(config->data_fd, wb.last, wb.wleft(),
|
||||
while ((nread = pread(config->data_fd, buf.data(), buf.size(),
|
||||
req_stat->data_offset)) == -1 &&
|
||||
errno == EINTR)
|
||||
;
|
||||
|
@ -238,7 +242,7 @@ int Http1Session::on_write() {
|
|||
|
||||
req_stat->data_offset += nread;
|
||||
|
||||
wb.write(nread);
|
||||
wb.append(buf.data(), nread);
|
||||
|
||||
if (client_->worker->config->verbose) {
|
||||
std::cout << "[send " << nread << " byte(s)]" << std::endl;
|
||||
|
|
|
@ -169,11 +169,11 @@ ssize_t send_callback(nghttp2_session *session, const uint8_t *data,
|
|||
auto client = static_cast<Client *>(user_data);
|
||||
auto &wb = client->wb;
|
||||
|
||||
if (wb.wleft() == 0) {
|
||||
if (wb.rleft() >= BACKOFF_WRITE_BUFFER_THRES) {
|
||||
return NGHTTP2_ERR_WOULDBLOCK;
|
||||
}
|
||||
|
||||
return wb.write(data, length);
|
||||
return wb.append(data, length);
|
||||
}
|
||||
} // namespace
|
||||
|
||||
|
|
|
@ -112,11 +112,11 @@ ssize_t send_callback(spdylay_session *session, const uint8_t *data,
|
|||
auto client = static_cast<Client *>(user_data);
|
||||
auto &wb = client->wb;
|
||||
|
||||
if (wb.wleft() == 0) {
|
||||
if (wb.rleft() >= BACKOFF_WRITE_BUFFER_THRES) {
|
||||
return SPDYLAY_ERR_DEFERRED;
|
||||
}
|
||||
|
||||
return wb.write(data, length);
|
||||
return wb.append(data, length);
|
||||
}
|
||||
} // namespace
|
||||
|
||||
|
|
Loading…
Reference in New Issue