src: Avoid copy in evbuffer_pullup()

Previously, we use evbuffer_pullup(buf, -1) to linearize the memory
region and it may cause buffer copy.  To avoid this, we use the return
value of evbuffer_get_contiguous_space() as 2nd parameter.  According
to the libevent manual, by doing so evbuffer_pullup() will not copy or
modify any data in evbuffer.
This commit is contained in:
Tatsuhiro Tsujikawa 2014-06-01 21:01:01 +09:00
parent cc250386df
commit 8c67bbe3a8
7 changed files with 350 additions and 177 deletions

View File

@ -153,16 +153,33 @@ void Http2Session::submit_request()
ssize_t Http2Session::on_read() ssize_t Http2Session::on_read()
{ {
int rv; int rv;
size_t nread = 0;
auto input = bufferevent_get_input(client_->bev); auto input = bufferevent_get_input(client_->bev);
auto inputlen = evbuffer_get_length(input);
auto mem = evbuffer_pullup(input, -1); for(;;) {
auto inputlen = evbuffer_get_contiguous_space(input);
if(inputlen == 0) {
assert(evbuffer_get_length(input) == 0);
return nread;
}
auto mem = evbuffer_pullup(input, inputlen);
rv = nghttp2_session_mem_recv(session_, mem, inputlen); rv = nghttp2_session_mem_recv(session_, mem, inputlen);
if(rv < 0) { if(rv < 0) {
return -1; return -1;
} }
evbuffer_drain(input, rv);
return rv; nread += rv;
if(evbuffer_drain(input, rv) != 0) {
return -1;
}
}
} }
int Http2Session::on_write() int Http2Session::on_write()

View File

@ -24,6 +24,8 @@
*/ */
#include "h2load_spdy_session.h" #include "h2load_spdy_session.h"
#include <cassert>
#include "h2load.h" #include "h2load.h"
namespace h2load { namespace h2load {
@ -163,16 +165,32 @@ void SpdySession::submit_request()
ssize_t SpdySession::on_read() ssize_t SpdySession::on_read()
{ {
int rv; int rv;
size_t nread = 0;
auto input = bufferevent_get_input(client_->bev); auto input = bufferevent_get_input(client_->bev);
auto inputlen = evbuffer_get_length(input);
auto mem = evbuffer_pullup(input, -1); for(;;) {
auto inputlen = evbuffer_get_contiguous_space(input);
if(inputlen == 0) {
assert(evbuffer_get_length(input) == 0);
return nread;
}
auto mem = evbuffer_pullup(input, inputlen);
rv = spdylay_session_mem_recv(session_, mem, inputlen); rv = spdylay_session_mem_recv(session_, mem, inputlen);
if(rv < 0) { if(rv < 0) {
return -1; return -1;
} }
evbuffer_drain(input, rv);
return rv; nread += rv;
if(evbuffer_drain(input, rv) != 0) {
return -1;
}
}
} }
int SpdySession::on_write() int SpdySession::on_write()

View File

@ -651,55 +651,77 @@ struct HttpClient {
{ {
int rv; int rv;
auto input = bufferevent_get_input(bev); auto input = bufferevent_get_input(bev);
auto inputlen = evbuffer_get_length(input);
for(;;) {
auto inputlen = evbuffer_get_contiguous_space(input);
if(inputlen == 0) { if(inputlen == 0) {
assert(evbuffer_get_length(input) == 0);
return 0; return 0;
} }
auto mem = evbuffer_pullup(input, -1);
auto mem = evbuffer_pullup(input, inputlen);
auto nread = http_parser_execute(htp.get(), &htp_hooks, auto nread = http_parser_execute(htp.get(), &htp_hooks,
reinterpret_cast<const char*>(mem), reinterpret_cast<const char*>(mem),
inputlen); inputlen);
if(config.verbose) { if(config.verbose) {
std::cout.write(reinterpret_cast<const char*>(mem), nread); std::cout.write(reinterpret_cast<const char*>(mem), nread);
} }
evbuffer_drain(input, nread);
auto htperr = HTTP_PARSER_ERRNO(htp.get()); if(evbuffer_drain(input, nread) != 0) {
if(htperr == HPE_OK) {
if(upgrade_response_complete) {
if(config.verbose) {
std::cout << std::endl;
}
if(upgrade_response_status_code == 101) {
if(config.verbose) {
print_timer();
std::cout << " HTTP Upgrade success" << std::endl;
}
bufferevent_setcb(bev, readcb, writecb, eventcb, this);
rv = on_connect();
if(rv != 0) {
return rv;
}
// Read remaining data in the buffer because it is not
// notified callback anymore.
rv = on_read();
if(rv != 0) {
return rv;
}
} else {
std::cerr << "HTTP Upgrade failed" << std::endl;
return -1; return -1;
} }
}
} else { auto htperr = HTTP_PARSER_ERRNO(htp.get());
if(htperr != HPE_OK) {
std::cerr << "Failed to parse HTTP Upgrade response header: " std::cerr << "Failed to parse HTTP Upgrade response header: "
<< "(" << http_errno_name(htperr) << ") " << "(" << http_errno_name(htperr) << ") "
<< http_errno_description(htperr) << std::endl; << http_errno_description(htperr) << std::endl;
return -1; return -1;
} }
if(upgrade_response_complete) {
if(config.verbose) {
std::cout << std::endl;
}
if(upgrade_response_status_code == 101) {
if(config.verbose) {
print_timer();
std::cout << " HTTP Upgrade success" << std::endl;
}
bufferevent_setcb(bev, readcb, writecb, eventcb, this);
rv = on_connect();
if(rv != 0) {
return rv;
}
// Read remaining data in the buffer because it is not
// notified callback anymore.
rv = on_read();
if(rv != 0) {
return rv;
}
return 0; return 0;
} }
std::cerr << "HTTP Upgrade failed" << std::endl;
return -1;
}
}
}
int on_connect() int on_connect()
{ {
int rv; int rv;
@ -776,18 +798,30 @@ struct HttpClient {
{ {
int rv; int rv;
auto input = bufferevent_get_input(bev); auto input = bufferevent_get_input(bev);
auto inputlen = evbuffer_get_length(input);
auto mem = evbuffer_pullup(input, -1); for(;;) {
auto inputlen = evbuffer_get_contiguous_space(input);
if(inputlen == 0) {
assert(evbuffer_get_length(input) == 0);
return on_write();
}
auto mem = evbuffer_pullup(input, inputlen);
rv = nghttp2_session_mem_recv(session, mem, inputlen); rv = nghttp2_session_mem_recv(session, mem, inputlen);
if(rv < 0) { if(rv < 0) {
std::cerr << "nghttp2_session_mem_recv() returned error: " std::cerr << "nghttp2_session_mem_recv() returned error: "
<< nghttp2_strerror(rv) << std::endl; << nghttp2_strerror(rv) << std::endl;
return -1; return -1;
} }
evbuffer_drain(input, rv);
return on_write(); if(evbuffer_drain(input, rv) != 0) {
return -1;
}
}
} }
int on_write() int on_write()

View File

@ -541,23 +541,34 @@ http_parser_settings htp_hooks = {
int Http2Session::on_read_proxy() int Http2Session::on_read_proxy()
{ {
auto input = bufferevent_get_input(bev_); auto input = bufferevent_get_input(bev_);
auto mem = evbuffer_pullup(input, -1);
for(;;) {
auto inputlen = evbuffer_get_contiguous_space(input);
if(inputlen == 0) {
assert(evbuffer_get_length(input) == 0);
return 0;
}
auto mem = evbuffer_pullup(input, inputlen);
size_t nread = http_parser_execute(proxy_htp_.get(), &htp_hooks, size_t nread = http_parser_execute(proxy_htp_.get(), &htp_hooks,
reinterpret_cast<const char*>(mem), reinterpret_cast<const char*>(mem),
evbuffer_get_length(input)); inputlen);
if(evbuffer_drain(input, nread) != 0) { if(evbuffer_drain(input, nread) != 0) {
SSLOG(FATAL, this) << "evbuffer_drain() failed"; SSLOG(FATAL, this) << "evbuffer_drain() failed";
return -1; return -1;
} }
auto htperr = HTTP_PARSER_ERRNO(proxy_htp_.get()); auto htperr = HTTP_PARSER_ERRNO(proxy_htp_.get());
if(htperr == HPE_OK) {
return 0; if(htperr != HPE_OK) {
} else {
return -1; return -1;
} }
} }
}
void Http2Session::add_downstream_connection(Http2DownstreamConnection *dconn) void Http2Session::add_downstream_connection(Http2DownstreamConnection *dconn)
{ {
@ -1250,17 +1261,31 @@ int Http2Session::on_read()
{ {
ssize_t rv = 0; ssize_t rv = 0;
auto input = bufferevent_get_input(bev_); auto input = bufferevent_get_input(bev_);
auto inputlen = evbuffer_get_length(input);
auto mem = evbuffer_pullup(input, -1); for(;;) {
auto inputlen = evbuffer_get_contiguous_space(input);
if(inputlen == 0) {
assert(evbuffer_get_length(input) == 0);
return send();
}
auto mem = evbuffer_pullup(input, inputlen);
rv = nghttp2_session_mem_recv(session_, mem, inputlen); rv = nghttp2_session_mem_recv(session_, mem, inputlen);
if(rv < 0) { if(rv < 0) {
SSLOG(ERROR, this) << "nghttp2_session_recv() returned error: " SSLOG(ERROR, this) << "nghttp2_session_recv() returned error: "
<< nghttp2_strerror(rv); << nghttp2_strerror(rv);
return -1; return -1;
} }
evbuffer_drain(input, rv);
return send(); if(evbuffer_drain(input, rv) != 0) {
SSLOG(FATAL, this) << "evbuffer_drain() faild";
return -1;
}
}
} }
int Http2Session::on_write() int Http2Session::on_write()

View File

@ -594,8 +594,17 @@ int Http2Upstream::on_read()
ssize_t rv = 0; ssize_t rv = 0;
auto bev = handler_->get_bev(); auto bev = handler_->get_bev();
auto input = bufferevent_get_input(bev); auto input = bufferevent_get_input(bev);
auto inputlen = evbuffer_get_length(input);
auto mem = evbuffer_pullup(input, -1); for(;;) {
auto inputlen = evbuffer_get_contiguous_space(input);
if(inputlen == 0) {
assert(evbuffer_get_length(input) == 0);
return send();
}
auto mem = evbuffer_pullup(input, inputlen);
rv = nghttp2_session_mem_recv(session_, mem, inputlen); rv = nghttp2_session_mem_recv(session_, mem, inputlen);
if(rv < 0) { if(rv < 0) {
@ -603,8 +612,12 @@ int Http2Upstream::on_read()
<< nghttp2_strerror(rv); << nghttp2_strerror(rv);
return -1; return -1;
} }
evbuffer_drain(input, rv);
return send(); if(evbuffer_drain(input, rv) != 0) {
DCLOG(FATAL, this) << "evbuffer_drain() failed";
return -1;
}
}
} }
int Http2Upstream::on_write() int Http2Upstream::on_write()

View File

@ -502,10 +502,20 @@ http_parser_settings htp_hooks = {
int HttpDownstreamConnection::on_read() int HttpDownstreamConnection::on_read()
{ {
auto input = bufferevent_get_input(bev_); auto input = bufferevent_get_input(bev_);
size_t inputlen = evbuffer_get_length(input);
auto mem = evbuffer_pullup(input, -1);
if(downstream_->get_upgraded()) { if(downstream_->get_upgraded()) {
// For upgraded connection, just pass data to the upstream. // For upgraded connection, just pass data to the upstream.
for(;;) {
auto inputlen = evbuffer_get_contiguous_space(input);
if(inputlen == 0) {
assert(evbuffer_get_length(input) == 0);
return 0;
}
auto mem = evbuffer_pullup(input, inputlen);
int rv; int rv;
rv = downstream_->get_upstream()->on_downstream_body rv = downstream_->get_upstream()->on_downstream_body
(downstream_, reinterpret_cast<const uint8_t*>(mem), inputlen, true); (downstream_, reinterpret_cast<const uint8_t*>(mem), inputlen, true);
@ -516,9 +526,21 @@ int HttpDownstreamConnection::on_read()
DCLOG(FATAL, this) << "evbuffer_drain() failed"; DCLOG(FATAL, this) << "evbuffer_drain() failed";
return -1; return -1;
} }
}
}
for(;;) {
auto inputlen = evbuffer_get_contiguous_space(input);
if(inputlen == 0) {
assert(evbuffer_get_length(input) == 0);
return 0; return 0;
} }
size_t nread = http_parser_execute(&response_htp_, &htp_hooks,
auto mem = evbuffer_pullup(input, inputlen);
auto nread = http_parser_execute(&response_htp_, &htp_hooks,
reinterpret_cast<const char*>(mem), reinterpret_cast<const char*>(mem),
inputlen); inputlen);
@ -526,18 +548,20 @@ int HttpDownstreamConnection::on_read()
DCLOG(FATAL, this) << "evbuffer_drain() failed"; DCLOG(FATAL, this) << "evbuffer_drain() failed";
return -1; return -1;
} }
auto htperr = HTTP_PARSER_ERRNO(&response_htp_); auto htperr = HTTP_PARSER_ERRNO(&response_htp_);
if(htperr == HPE_OK) {
return 0; if(htperr != HPE_OK) {
} else {
if(LOG_ENABLED(INFO)) { if(LOG_ENABLED(INFO)) {
DCLOG(INFO, this) << "HTTP parser failure: " DCLOG(INFO, this) << "HTTP parser failure: "
<< "(" << http_errno_name(htperr) << ") " << "(" << http_errno_name(htperr) << ") "
<< http_errno_description(htperr); << http_errno_description(htperr);
} }
return SHRPX_ERR_HTTP_PARSE; return SHRPX_ERR_HTTP_PARSE;
} }
} }
}
int HttpDownstreamConnection::on_write() int HttpDownstreamConnection::on_write()
{ {

View File

@ -264,49 +264,75 @@ int HttpsUpstream::on_read()
{ {
auto bev = handler_->get_bev(); auto bev = handler_->get_bev();
auto input = bufferevent_get_input(bev); auto input = bufferevent_get_input(bev);
size_t inputlen = evbuffer_get_length(input); auto downstream = get_downstream();
auto mem = evbuffer_pullup(input, -1);
// downstream can be nullptr here, because it is initialized in the
// callback chain called by http_parser_execute()
if(downstream && downstream->get_upgraded()) {
for(;;) {
auto inputlen = evbuffer_get_contiguous_space(input);
if(inputlen == 0) { if(inputlen == 0) {
return 0; return 0;
} }
auto downstream = get_downstream();
// downstream can be nullptr here, because it is initialized in the auto mem = evbuffer_pullup(input, inputlen);
// callback chain called by http_parser_execute()
if(downstream && downstream->get_upgraded()) { auto rv = downstream->push_upload_data_chunk
int rv = downstream->push_upload_data_chunk
(reinterpret_cast<const uint8_t*>(mem), inputlen); (reinterpret_cast<const uint8_t*>(mem), inputlen);
if(rv != 0) { if(rv != 0) {
return -1; return -1;
} }
if(evbuffer_drain(input, inputlen) != 0) { if(evbuffer_drain(input, inputlen) != 0) {
ULOG(FATAL, this) << "evbuffer_drain() failed"; ULOG(FATAL, this) << "evbuffer_drain() failed";
return -1; return -1;
} }
if(downstream->get_output_buffer_full()) { if(downstream->get_output_buffer_full()) {
if(LOG_ENABLED(INFO)) { if(LOG_ENABLED(INFO)) {
ULOG(INFO, this) << "Downstream output buffer is full"; ULOG(INFO, this) << "Downstream output buffer is full";
} }
pause_read(SHRPX_NO_BUFFER); pause_read(SHRPX_NO_BUFFER);
return 0;
} }
}
}
for(;;) {
auto inputlen = evbuffer_get_contiguous_space(input);
if(inputlen == 0) {
return 0; return 0;
} }
size_t nread = http_parser_execute(&htp_, &htp_hooks, auto mem = evbuffer_pullup(input, inputlen);
auto nread = http_parser_execute(&htp_, &htp_hooks,
reinterpret_cast<const char*>(mem), reinterpret_cast<const char*>(mem),
inputlen); inputlen);
if(evbuffer_drain(input, nread) != 0) { if(evbuffer_drain(input, nread) != 0) {
ULOG(FATAL, this) << "evbuffer_drain() failed"; ULOG(FATAL, this) << "evbuffer_drain() failed";
return -1; return -1;
} }
// Well, actually header length + some body bytes // Well, actually header length + some body bytes
current_header_length_ += nread; current_header_length_ += nread;
// Get downstream again because it may be initialized in http parser // Get downstream again because it may be initialized in http parser
// execution // execution
downstream = get_downstream(); downstream = get_downstream();
auto handler = get_client_handler(); auto handler = get_client_handler();
auto htperr = HTTP_PARSER_ERRNO(&htp_); auto htperr = HTTP_PARSER_ERRNO(&htp_);
if(htperr == HPE_PAUSED) { if(htperr == HPE_PAUSED) {
assert(downstream);
if(downstream->get_request_state() == Downstream::CONNECT_FAIL) { if(downstream->get_request_state() == Downstream::CONNECT_FAIL) {
handler->set_should_close_after_write(true); handler->set_should_close_after_write(true);
// Following paues_read is needed to avoid reading next data. // Following paues_read is needed to avoid reading next data.
@ -315,48 +341,64 @@ int HttpsUpstream::on_read()
return -1; return -1;
} }
// Downstream gets deleted after response body is read. // Downstream gets deleted after response body is read.
} else { return 0;
}
assert(downstream->get_request_state() == Downstream::MSG_COMPLETE); assert(downstream->get_request_state() == Downstream::MSG_COMPLETE);
if(downstream->get_downstream_connection() == 0) {
if(downstream->get_downstream_connection() == nullptr) {
// Error response has already be sent // Error response has already be sent
assert(downstream->get_response_state() == Downstream::MSG_COMPLETE); assert(downstream->get_response_state() == Downstream::MSG_COMPLETE);
delete_downstream(); delete_downstream();
} else {
return 0;
}
if(handler->get_http2_upgrade_allowed() && if(handler->get_http2_upgrade_allowed() &&
downstream->http2_upgrade_request()) { downstream->http2_upgrade_request()) {
if(handler->perform_http2_upgrade(this) != 0) { if(handler->perform_http2_upgrade(this) != 0) {
return -1; return -1;
} }
return 0; return 0;
} }
pause_read(SHRPX_MSG_BLOCK); pause_read(SHRPX_MSG_BLOCK);
return 0;
} }
}
} else if(htperr == HPE_OK) { if(htperr != HPE_OK) {
// downstream can be NULL here.
if(downstream) {
if(downstream->get_output_buffer_full()) {
if(LOG_ENABLED(INFO)) {
ULOG(INFO, this) << "Downstream output buffer is full";
}
pause_read(SHRPX_NO_BUFFER);
}
}
} else {
if(LOG_ENABLED(INFO)) { if(LOG_ENABLED(INFO)) {
ULOG(INFO, this) << "HTTP parse failure: " ULOG(INFO, this) << "HTTP parse failure: "
<< "(" << http_errno_name(htperr) << ") " << "(" << http_errno_name(htperr) << ") "
<< http_errno_description(htperr); << http_errno_description(htperr);
} }
handler->set_should_close_after_write(true); handler->set_should_close_after_write(true);
pause_read(SHRPX_MSG_BLOCK); pause_read(SHRPX_MSG_BLOCK);
if(error_reply(400) != 0) { if(error_reply(400) != 0) {
return -1; return -1;
} }
}
return 0; return 0;
} }
// downstream can be NULL here.
if(downstream && downstream->get_output_buffer_full()) {
if(LOG_ENABLED(INFO)) {
ULOG(INFO, this) << "Downstream output buffer is full";
}
pause_read(SHRPX_NO_BUFFER);
return 0;
}
}
}
int HttpsUpstream::on_write() int HttpsUpstream::on_write()
{ {
int rv = 0; int rv = 0;