Merge branch 'h2load-http1-upload'
This commit is contained in:
commit
9bdf214f48
|
@ -269,7 +269,7 @@ bool check_stop_client_request_timeout(Client *client, ev_timer *w) {
|
|||
auto nreq = client->req_todo - client->req_started;
|
||||
|
||||
if (nreq == 0 ||
|
||||
client->streams.size() >= (size_t)config.max_concurrent_streams) {
|
||||
client->streams.size() >= client->session->max_concurrent_streams()) {
|
||||
// no more requests to make, stop timer
|
||||
ev_timer_stop(client->worker->loop, w);
|
||||
return true;
|
||||
|
@ -330,7 +330,8 @@ Client::Client(uint32_t id, Worker *worker, size_t req_todo)
|
|||
req_done(0),
|
||||
id(id),
|
||||
fd(-1),
|
||||
new_connection_requested(false) {
|
||||
new_connection_requested(false),
|
||||
final(false) {
|
||||
ev_io_init(&wev, writecb, 0, EV_WRITE);
|
||||
ev_io_init(&rev, readcb, 0, EV_READ);
|
||||
|
||||
|
@ -518,6 +519,8 @@ void Client::disconnect() {
|
|||
close(fd);
|
||||
fd = -1;
|
||||
}
|
||||
|
||||
final = false;
|
||||
}
|
||||
|
||||
int Client::submit_request() {
|
||||
|
@ -860,7 +863,7 @@ int Client::connection_made() {
|
|||
|
||||
if (!config.timing_script) {
|
||||
auto nreq =
|
||||
std::min(req_todo - req_started, (size_t)config.max_concurrent_streams);
|
||||
std::min(req_todo - req_started, session->max_concurrent_streams());
|
||||
for (; nreq > 0; --nreq) {
|
||||
if (submit_request() != 0) {
|
||||
process_request_failure();
|
||||
|
@ -2244,6 +2247,12 @@ int main(int argc, char **argv) {
|
|||
h1req += nv.value;
|
||||
h1req += "\r\n";
|
||||
}
|
||||
// TODO do this for h2 and spdy too.
|
||||
if (config.data_fd != -1) {
|
||||
h1req += "Content-Length: ";
|
||||
h1req += util::utos(config.data_length);
|
||||
h1req += "\r\n";
|
||||
}
|
||||
h1req += "\r\n";
|
||||
|
||||
config.h1reqs.push_back(std::move(h1req));
|
||||
|
|
|
@ -298,6 +298,9 @@ struct Client {
|
|||
ev_timer conn_inactivity_watcher;
|
||||
std::string selected_proto;
|
||||
bool new_connection_requested;
|
||||
// true if the current connection will be closed, and no more new
|
||||
// request cannot be processed.
|
||||
bool final;
|
||||
|
||||
enum { ERR_CONNECT_FAIL = -100 };
|
||||
|
||||
|
|
|
@ -57,7 +57,7 @@ namespace {
|
|||
int htp_msg_begincb(http_parser *htp) {
|
||||
auto session = static_cast<Http1Session *>(htp->data);
|
||||
|
||||
if (session->stream_resp_counter_ >= session->stream_req_counter_) {
|
||||
if (session->stream_resp_counter_ > session->stream_req_counter_) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -82,16 +82,21 @@ int htp_msg_completecb(http_parser *htp) {
|
|||
auto session = static_cast<Http1Session *>(htp->data);
|
||||
auto client = session->get_client();
|
||||
|
||||
auto final = http_should_keep_alive(htp) == 0;
|
||||
client->final = http_should_keep_alive(htp) == 0;
|
||||
auto req_stat = client->get_req_stat(session->stream_resp_counter_);
|
||||
|
||||
assert(req_stat);
|
||||
|
||||
client->on_stream_close(session->stream_resp_counter_, true, final);
|
||||
auto config = client->worker->config;
|
||||
if (req_stat->data_offset >= config->data_length) {
|
||||
client->on_stream_close(session->stream_resp_counter_, true, client->final);
|
||||
}
|
||||
|
||||
session->stream_resp_counter_ += 2;
|
||||
|
||||
if (final) {
|
||||
if (client->final) {
|
||||
session->stream_req_counter_ = session->stream_resp_counter_;
|
||||
|
||||
http_parser_pause(htp, 1);
|
||||
// Connection is going down. If we have still request to do,
|
||||
// create new connection and keep on doing the job.
|
||||
|
@ -171,8 +176,12 @@ int Http1Session::submit_request() {
|
|||
client_->record_request_time(req_stat);
|
||||
client_->wb.write(req.c_str(), req.size());
|
||||
|
||||
// TODO try read some data here
|
||||
|
||||
if (config->data_fd == -1 || config->data_length == 0) {
|
||||
// increment for next request
|
||||
stream_req_counter_ += 2;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -206,6 +215,47 @@ int Http1Session::on_write() {
|
|||
if (complete_) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
auto config = client_->worker->config;
|
||||
auto req_stat = client_->get_req_stat(stream_req_counter_);
|
||||
if (!req_stat) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (req_stat->data_offset < config->data_length) {
|
||||
auto req_stat = client_->get_req_stat(stream_req_counter_);
|
||||
auto &wb = client_->wb;
|
||||
|
||||
ssize_t nread;
|
||||
while ((nread = pread(config->data_fd, wb.last, wb.wleft(),
|
||||
req_stat->data_offset)) == -1 &&
|
||||
errno == EINTR)
|
||||
;
|
||||
|
||||
if (nread == -1) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
req_stat->data_offset += nread;
|
||||
|
||||
wb.write(nread);
|
||||
|
||||
if (client_->worker->config->verbose) {
|
||||
std::cout << "[send " << nread << " byte(s)]" << std::endl;
|
||||
}
|
||||
|
||||
if (req_stat->data_offset == config->data_length) {
|
||||
// increment for next request
|
||||
stream_req_counter_ += 2;
|
||||
|
||||
if (stream_resp_counter_ == stream_req_counter_) {
|
||||
// Response has already been received
|
||||
client_->on_stream_close(stream_resp_counter_ - 2, true,
|
||||
client_->final);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -213,4 +263,10 @@ void Http1Session::terminate() { complete_ = true; }
|
|||
|
||||
Client *Http1Session::get_client() { return client_; }
|
||||
|
||||
size_t Http1Session::max_concurrent_streams() {
|
||||
auto config = client_->worker->config;
|
||||
|
||||
return config->data_fd == -1 ? config->max_concurrent_streams : 1;
|
||||
}
|
||||
|
||||
} // namespace h2load
|
||||
|
|
|
@ -42,6 +42,7 @@ public:
|
|||
virtual int on_read(const uint8_t *data, size_t len);
|
||||
virtual int on_write();
|
||||
virtual void terminate();
|
||||
virtual size_t max_concurrent_streams();
|
||||
Client *get_client();
|
||||
int32_t stream_req_counter_;
|
||||
int32_t stream_resp_counter_;
|
||||
|
|
|
@ -289,4 +289,8 @@ void Http2Session::terminate() {
|
|||
nghttp2_session_terminate_session(session_, NGHTTP2_NO_ERROR);
|
||||
}
|
||||
|
||||
size_t Http2Session::max_concurrent_streams() {
|
||||
return (size_t)client_->worker->config->max_concurrent_streams;
|
||||
}
|
||||
|
||||
} // namespace h2load
|
||||
|
|
|
@ -42,6 +42,7 @@ public:
|
|||
virtual int on_read(const uint8_t *data, size_t len);
|
||||
virtual int on_write();
|
||||
virtual void terminate();
|
||||
virtual size_t max_concurrent_streams();
|
||||
|
||||
private:
|
||||
Client *client_;
|
||||
|
|
|
@ -50,6 +50,8 @@ public:
|
|||
virtual int on_write() = 0;
|
||||
// Called when the underlying session must be terminated.
|
||||
virtual void terminate() = 0;
|
||||
// Return the maximum concurrency per connection
|
||||
virtual size_t max_concurrent_streams() = 0;
|
||||
};
|
||||
|
||||
} // namespace h2load
|
||||
|
|
|
@ -282,4 +282,8 @@ void SpdySession::handle_window_update(int32_t stream_id, size_t recvlen) {
|
|||
}
|
||||
}
|
||||
|
||||
size_t SpdySession::max_concurrent_streams() {
|
||||
return (size_t)client_->worker->config->max_concurrent_streams;
|
||||
}
|
||||
|
||||
} // namespace h2load
|
||||
|
|
|
@ -44,6 +44,7 @@ public:
|
|||
virtual int on_read(const uint8_t *data, size_t len);
|
||||
virtual int on_write();
|
||||
virtual void terminate();
|
||||
virtual size_t max_concurrent_streams();
|
||||
void handle_window_update(int32_t stream_id, size_t recvlen);
|
||||
|
||||
private:
|
||||
|
|
Loading…
Reference in New Issue