nghttpx: Slightly faster version of HTTP/1 backend

This commit is contained in:
Tatsuhiro Tsujikawa 2016-02-07 01:07:44 +09:00
parent e763770f3e
commit cde79052dd
2 changed files with 181 additions and 162 deletions

View File

@ -127,8 +127,7 @@ HttpDownstreamConnection::HttpDownstreamConnection(
ioctrl_(&conn_.rlimit),
response_htp_{0},
group_(group),
addr_idx_(0),
connected_(false) {}
addr_idx_(0) {}
HttpDownstreamConnection::~HttpDownstreamConnection() {
if (conn_.tls.ssl) {
@ -789,78 +788,13 @@ http_parser_settings htp_hooks = {
};
} // namespace
ssize_t HttpDownstreamConnection::read_clear(uint8_t *buf, size_t len) {
return conn_.read_clear(buf, len);
}
ssize_t HttpDownstreamConnection::write_clear(struct iovec *iov,
size_t iovlen) {
return conn_.writev_clear(iov, iovlen);
}
int HttpDownstreamConnection::tls_handshake() {
ev_timer_again(conn_.loop, &conn_.rt);
ERR_clear_error();
auto rv = conn_.tls_handshake();
if (rv == SHRPX_ERR_INPROGRESS) {
return 0;
}
if (rv < 0) {
return rv;
}
if (LOG_ENABLED(INFO)) {
DCLOG(INFO, this) << "SSL/TLS handshake completed";
}
if (!get_config()->tls.insecure &&
ssl::check_cert(conn_.tls.ssl, &get_config()
->conn.downstream.addr_groups[group_]
.addrs[addr_idx_]) != 0) {
return -1;
}
read_ = &HttpDownstreamConnection::read_tls;
write_ = &HttpDownstreamConnection::write_tls;
do_read_ = &HttpDownstreamConnection::do_read;
do_write_ = &HttpDownstreamConnection::do_write;
// TODO Check negotiated ALPN
return on_write();
}
ssize_t HttpDownstreamConnection::read_tls(uint8_t *buf, size_t buflen) {
return conn_.read_tls(buf, buflen);
}
ssize_t HttpDownstreamConnection::write_tls(struct iovec *iov, size_t iovlen) {
assert(iovlen > 0);
return conn_.write_tls(iov[0].iov_base, iov[0].iov_len);
}
int HttpDownstreamConnection::do_read() {
if (!connected_) {
return 0;
}
if (conn_.tls.ssl) {
ERR_clear_error();
}
int HttpDownstreamConnection::read_clear() {
ev_timer_again(conn_.loop, &conn_.rt);
std::array<uint8_t, 8_k> buf;
int rv;
if (downstream_->get_upgraded()) {
// For upgraded connection, just pass data to the upstream.
for (;;) {
auto nread = read_(*this, buf.data(), buf.size());
auto nread = conn_.read_clear(buf.data(), buf.size());
if (nread == 0) {
return 0;
}
@ -869,87 +803,18 @@ int HttpDownstreamConnection::do_read() {
return nread;
}
rv = downstream_->get_upstream()->on_downstream_body(
downstream_, buf.data(), nread, true);
rv = process_input(buf.data(), nread);
if (rv != 0) {
return rv;
}
if (downstream_->response_buf_full()) {
downstream_->pause_read(SHRPX_NO_BUFFER);
return 0;
}
}
}
for (;;) {
auto nread = read_(*this, buf.data(), buf.size());
if (nread == 0) {
return 0;
}
if (nread < 0) {
return nread;
}
auto nproc =
http_parser_execute(&response_htp_, &htp_hooks,
reinterpret_cast<char *>(buf.data()), nread);
auto htperr = HTTP_PARSER_ERRNO(&response_htp_);
if (htperr != HPE_OK) {
// Handling early return (in other words, response was hijacked
// by mruby scripting).
if (downstream_->get_response_state() == Downstream::MSG_COMPLETE) {
return SHRPX_ERR_DCONN_CANCELED;
}
if (LOG_ENABLED(INFO)) {
DCLOG(INFO, this) << "HTTP parser failure: "
<< "(" << http_errno_name(htperr) << ") "
<< http_errno_description(htperr);
}
return -1;
}
if (downstream_->get_upgraded()) {
if (nproc < static_cast<size_t>(nread)) {
// Data from buf.data() + nproc are for upgraded protocol.
rv = downstream_->get_upstream()->on_downstream_body(
downstream_, buf.data() + nproc, nread - nproc, true);
if (rv != 0) {
return rv;
}
if (downstream_->response_buf_full()) {
downstream_->pause_read(SHRPX_NO_BUFFER);
return 0;
}
}
// call on_read(), so that we can process data left in buffer as
// upgrade.
return on_read();
}
if (downstream_->response_buf_full()) {
downstream_->pause_read(SHRPX_NO_BUFFER);
if (!ev_is_active(&conn_.rev)) {
return 0;
}
}
}
int HttpDownstreamConnection::do_write() {
if (!connected_) {
return 0;
}
if (conn_.tls.ssl) {
ERR_clear_error();
}
int HttpDownstreamConnection::write_clear() {
ev_timer_again(conn_.loop, &conn_.rt);
auto upstream = downstream_->get_upstream();
@ -960,7 +825,7 @@ int HttpDownstreamConnection::do_write() {
while (input->rleft() > 0) {
auto iovcnt = input->riovec(iov.data(), iov.size());
auto nwrite = write_(*this, iov.data(), iovcnt);
auto nwrite = conn_.writev_clear(iov.data(), iovcnt);
if (nwrite == 0) {
return 0;
@ -986,6 +851,171 @@ int HttpDownstreamConnection::do_write() {
return 0;
}
int HttpDownstreamConnection::tls_handshake() {
ERR_clear_error();
ev_timer_again(conn_.loop, &conn_.rt);
auto rv = conn_.tls_handshake();
if (rv == SHRPX_ERR_INPROGRESS) {
return 0;
}
if (rv < 0) {
return rv;
}
if (LOG_ENABLED(INFO)) {
DCLOG(INFO, this) << "SSL/TLS handshake completed";
}
if (!get_config()->tls.insecure &&
ssl::check_cert(conn_.tls.ssl, &get_config()
->conn.downstream.addr_groups[group_]
.addrs[addr_idx_]) != 0) {
return -1;
}
do_read_ = &HttpDownstreamConnection::read_tls;
do_write_ = &HttpDownstreamConnection::write_tls;
// TODO Check negotiated ALPN
return on_write();
}
int HttpDownstreamConnection::read_tls() {
ERR_clear_error();
ev_timer_again(conn_.loop, &conn_.rt);
std::array<uint8_t, 8_k> buf;
int rv;
for (;;) {
auto nread = conn_.read_tls(buf.data(), buf.size());
if (nread == 0) {
return 0;
}
if (nread < 0) {
return nread;
}
rv = process_input(buf.data(), nread);
if (rv != 0) {
return rv;
}
if (!ev_is_active(&conn_.rev)) {
return 0;
}
}
}
int HttpDownstreamConnection::write_tls() {
ERR_clear_error();
ev_timer_again(conn_.loop, &conn_.rt);
auto upstream = downstream_->get_upstream();
auto input = downstream_->get_request_buf();
std::array<struct iovec, 1> iov;
while (input->rleft() > 0) {
auto iovcnt = input->riovec(iov.data(), iov.size());
assert(iovcnt == 1);
auto nwrite = conn_.write_tls(iov[0].iov_base, iov[0].iov_len);
if (nwrite == 0) {
return 0;
}
if (nwrite < 0) {
return nwrite;
}
input->drain(nwrite);
}
conn_.wlimit.stopw();
ev_timer_stop(conn_.loop, &conn_.wt);
if (input->rleft() == 0) {
auto &req = downstream_->request();
upstream->resume_read(SHRPX_NO_BUFFER, downstream_,
req.unconsumed_body_length);
}
return 0;
}
int HttpDownstreamConnection::process_input(const uint8_t *data,
size_t datalen) {
int rv;
if (downstream_->get_upgraded()) {
// For upgraded connection, just pass data to the upstream.
rv = downstream_->get_upstream()->on_downstream_body(downstream_, data,
datalen, true);
if (rv != 0) {
return rv;
}
if (downstream_->response_buf_full()) {
downstream_->pause_read(SHRPX_NO_BUFFER);
return 0;
}
}
auto nproc =
http_parser_execute(&response_htp_, &htp_hooks,
reinterpret_cast<const char *>(data), datalen);
auto htperr = HTTP_PARSER_ERRNO(&response_htp_);
if (htperr != HPE_OK) {
// Handling early return (in other words, response was hijacked
// by mruby scripting).
if (downstream_->get_response_state() == Downstream::MSG_COMPLETE) {
return SHRPX_ERR_DCONN_CANCELED;
}
if (LOG_ENABLED(INFO)) {
DCLOG(INFO, this) << "HTTP parser failure: "
<< "(" << http_errno_name(htperr) << ") "
<< http_errno_description(htperr);
}
return -1;
}
if (downstream_->get_upgraded()) {
if (nproc < datalen) {
// Data from data + nproc are for upgraded protocol.
rv = downstream_->get_upstream()->on_downstream_body(
downstream_, data + nproc, datalen - nproc, true);
if (rv != 0) {
return rv;
}
if (downstream_->response_buf_full()) {
downstream_->pause_read(SHRPX_NO_BUFFER);
return 0;
}
}
return 0;
}
if (downstream_->response_buf_full()) {
downstream_->pause_read(SHRPX_NO_BUFFER);
return 0;
}
return 0;
}
int HttpDownstreamConnection::connected() {
auto connect_blocker = client_handler_->get_connect_blocker();
@ -1005,8 +1035,6 @@ int HttpDownstreamConnection::connected() {
DLOG(INFO, this) << "Connected to downstream host";
}
connected_ = true;
connect_blocker->on_success();
conn_.rlimit.startw();
@ -1020,11 +1048,8 @@ int HttpDownstreamConnection::connected() {
return 0;
}
do_read_ = &HttpDownstreamConnection::do_read;
do_write_ = &HttpDownstreamConnection::do_write;
read_ = &HttpDownstreamConnection::read_clear;
write_ = &HttpDownstreamConnection::write_clear;
do_read_ = &HttpDownstreamConnection::read_clear;
do_write_ = &HttpDownstreamConnection::write_clear;
return 0;
}

View File

@ -62,13 +62,12 @@ public:
virtual bool poolable() const { return true; }
ssize_t read_clear(uint8_t *buf, size_t buflen);
ssize_t write_clear(struct iovec *iov, size_t iovlen);
ssize_t read_tls(uint8_t *buf, size_t buflen);
ssize_t write_tls(struct iovec *iov, size_t iovlen);
int read_clear();
int write_clear();
int read_tls();
int write_tls();
int do_read();
int do_write();
int process_input(const uint8_t *data, size_t datalen);
int tls_handshake();
int connected();
@ -79,10 +78,6 @@ public:
private:
Connection conn_;
std::function<int(HttpDownstreamConnection &)> do_read_, do_write_;
std::function<ssize_t(HttpDownstreamConnection &, uint8_t *buf,
size_t buflen)> read_;
std::function<ssize_t(HttpDownstreamConnection &, struct iovec *iov,
size_t iovlen)> write_;
Worker *worker_;
// nullptr if TLS is not used.
SSL_CTX *ssl_ctx_;
@ -91,7 +86,6 @@ private:
size_t group_;
// index of get_config()->downstream_addrs this object is using
size_t addr_idx_;
bool connected_;
};
} // namespace shrpx