From b2fb888363c08e98aae0638db62cdf7d164ea1d1 Mon Sep 17 00:00:00 2001 From: Tatsuhiro Tsujikawa Date: Wed, 4 Feb 2015 21:15:58 +0900 Subject: [PATCH] Share I/O code with all upstreams/downstream objects --- src/Makefile.am | 1 + src/memchunk.h | 23 ++ src/ringbuf.h | 15 -- src/shrpx_client_handler.cc | 340 +++++------------------- src/shrpx_client_handler.h | 32 +-- src/shrpx_connection.cc | 321 ++++++++++++++++++++++ src/shrpx_connection.h | 107 ++++++++ src/shrpx_downstream_connection.h | 2 - src/shrpx_error.h | 8 +- src/shrpx_http2_session.cc | 340 ++++++++---------------- src/shrpx_http2_session.h | 19 +- src/shrpx_http2_upstream.cc | 6 +- src/shrpx_http_downstream_connection.cc | 172 +++++------- src/shrpx_http_downstream_connection.h | 9 +- src/shrpx_https_upstream.cc | 4 +- src/shrpx_spdy_upstream.cc | 8 +- src/shrpx_ssl.cc | 8 +- 17 files changed, 733 insertions(+), 682 deletions(-) create mode 100644 src/shrpx_connection.cc create mode 100644 src/shrpx_connection.h diff --git a/src/Makefile.am b/src/Makefile.am index da7ebacb..5b2f7ac1 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -119,6 +119,7 @@ NGHTTPX_SRCS = \ shrpx_connect_blocker.cc shrpx_connect_blocker.h \ shrpx_downstream_connection_pool.cc shrpx_downstream_connection_pool.h \ shrpx_rate_limit.cc shrpx_rate_limit.h \ + shrpx_connection.cc shrpx_connection.h \ buffer.h memchunk.h if HAVE_SPDYLAY diff --git a/src/memchunk.h b/src/memchunk.h index 9116d477..58600bb7 100644 --- a/src/memchunk.h +++ b/src/memchunk.h @@ -231,6 +231,29 @@ using Memchunk16K = Memchunk<16384>; using MemchunkPool = Pool; using DefaultMemchunks = Memchunks; +#define DEFAULT_WR_IOVCNT 16 + +#if defined(IOV_MAX) && IOV_MAX < DEFAULT_WR_IOVCNT +#define MAX_WR_IOVCNT IOV_MAX +#else // !defined(IOV_MAX) || IOV_MAX >= DEFAULT_WR_IOVCNT +#define MAX_WR_IOVCNT DEFAULT_WR_IOVCNT +#endif // !defined(IOV_MAX) || IOV_MAX >= DEFAULT_WR_IOVCNT + +inline int limit_iovec(struct iovec *iov, int iovcnt, size_t max) { + if (max == 0) { + return 0; + } + for (int i = 0; i < iovcnt; ++i) { + auto d = std::min(max, iov[i].iov_len); + iov[i].iov_len = d; + max -= d; + if (max == 0) { + return i + 1; + } + } + return iovcnt; +} + } // namespace nghttp2 #endif // MEMCHUNK_H diff --git a/src/ringbuf.h b/src/ringbuf.h index 10b62b40..867293c5 100644 --- a/src/ringbuf.h +++ b/src/ringbuf.h @@ -123,21 +123,6 @@ template struct RingBuf { uint8_t begin[N]; }; -inline int limit_iovec(struct iovec *iov, int iovcnt, size_t max) { - if (max == 0) { - return 0; - } - for (int i = 0; i < iovcnt; ++i) { - auto d = std::min(max, iov[i].iov_len); - iov[i].iov_len = d; - max -= d; - if (max == 0) { - return i + 1; - } - } - return iovcnt; -} - } // namespace nghttp2 #endif // RINGBUF_H diff --git a/src/shrpx_client_handler.cc b/src/shrpx_client_handler.cc index 544fcc00..e098cf4b 100644 --- a/src/shrpx_client_handler.cc +++ b/src/shrpx_client_handler.cc @@ -49,7 +49,8 @@ namespace shrpx { namespace { void timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) { - auto handler = static_cast(w->data); + auto conn = static_cast(w->data); + auto handler = static_cast(conn->data); if (LOG_ENABLED(INFO)) { CLOG(INFO, handler) << "Time out"; @@ -73,7 +74,8 @@ void shutdowncb(struct ev_loop *loop, ev_timer *w, int revents) { namespace { void readcb(struct ev_loop *loop, ev_io *w, int revents) { - auto handler = static_cast(w->data); + auto conn = static_cast(w->data); + auto handler = static_cast(conn->data); if (handler->do_read() != 0) { delete handler; @@ -84,7 +86,8 @@ void readcb(struct ev_loop *loop, ev_io *w, int revents) { namespace { void writecb(struct ev_loop *loop, ev_io *w, int revents) { - auto handler = static_cast(w->data); + auto conn = static_cast(w->data); + auto handler = static_cast(conn->data); if (handler->do_write() != 0) { delete handler; @@ -94,7 +97,7 @@ void writecb(struct ev_loop *loop, ev_io *w, int revents) { } // namespace int ClientHandler::read_clear() { - ev_timer_again(loop_, &rt_); + ev_timer_again(conn_.loop, &conn_.rt); for (;;) { // we should process buffered data first before we read EOF. @@ -106,53 +109,33 @@ int ClientHandler::read_clear() { } rb_.reset(); - ssize_t nread = std::min(rb_.wleft(), rlimit_.avail()); - if (nread == 0) { - break; - } - - while ((nread = read(fd_, rb_.last, nread)) == -1 && errno == EINTR) - ; - if (nread == -1) { - if (errno == EAGAIN || errno == EWOULDBLOCK) { - break; - } - return -1; - } + auto nread = conn_.read_clear(rb_.last, rb_.wleft()); if (nread == 0) { + return 0; + } + + if (nread < 0) { return -1; } rb_.write(nread); - rlimit_.drain(nread); } - - return 0; } int ClientHandler::write_clear() { - ev_timer_again(loop_, &rt_); + ev_timer_again(conn_.loop, &conn_.rt); for (;;) { if (wb_.rleft() > 0) { - ssize_t nwrite = std::min(wb_.rleft(), wlimit_.avail()); + auto nwrite = conn_.write_clear(wb_.pos, wb_.rleft()); if (nwrite == 0) { return 0; } - - while ((nwrite = write(fd_, wb_.pos, nwrite)) == -1 && errno == EINTR) - ; - if (nwrite == -1) { - if (errno == EAGAIN || errno == EWOULDBLOCK) { - wlimit_.startw(); - ev_timer_again(loop_, &wt_); - return 0; - } + if (nwrite < 0) { return -1; } wb_.drain(nwrite); - wlimit_.drain(nwrite); continue; } wb_.reset(); @@ -164,54 +147,34 @@ int ClientHandler::write_clear() { } } - wlimit_.stopw(); - ev_timer_stop(loop_, &wt_); + conn_.wlimit.stopw(); + ev_timer_stop(conn_.loop, &conn_.wt); return 0; } int ClientHandler::tls_handshake() { - ev_timer_again(loop_, &rt_); + ev_timer_again(conn_.loop, &conn_.rt); ERR_clear_error(); - auto rv = SSL_do_handshake(ssl_); + auto rv = conn_.tls_handshake(); - if (rv == 0) { - return -1; + if (rv == SHRPX_ERR_INPROGRESS) { + return 0; } if (rv < 0) { - auto err = SSL_get_error(ssl_, rv); - switch (err) { - case SSL_ERROR_WANT_READ: - wlimit_.stopw(); - ev_timer_stop(loop_, &wt_); - return 0; - case SSL_ERROR_WANT_WRITE: - wlimit_.startw(); - ev_timer_again(loop_, &wt_); - return 0; - default: - return -1; - } + return -1; } - wlimit_.stopw(); - ev_timer_stop(loop_, &wt_); - - set_tls_handshake(true); if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "SSL/TLS handshake completed"; } + if (validate_next_proto() != 0) { return -1; } - if (LOG_ENABLED(INFO)) { - if (SSL_session_reused(ssl_)) { - CLOG(INFO, this) << "SSL/TLS session reused"; - } - } read_ = &ClientHandler::read_tls; write_ = &ClientHandler::write_tls; @@ -220,7 +183,7 @@ int ClientHandler::tls_handshake() { } int ClientHandler::read_tls() { - ev_timer_again(loop_, &rt_); + ev_timer_again(conn_.loop, &conn_.rt); ERR_clear_error(); @@ -234,116 +197,38 @@ int ClientHandler::read_tls() { } rb_.reset(); - ssize_t nread; - // SSL_read requires the same arguments (buf pointer and its - // length) on SSL_ERROR_WANT_READ or SSL_ERROR_WANT_WRITE. - // rlimit_.avail() or rlimit_.avail() may return different length - // than the length previously passed to SSL_read, which violates - // OpenSSL assumption. To avoid this, we keep last legnth passed - // to SSL_read to tls_last_readlen_ if SSL_read indicated I/O - // blocking. - if (tls_last_readlen_ == 0) { - nread = std::min(rb_.wleft(), rlimit_.avail()); - if (nread == 0) { - return 0; - } - } else { - nread = tls_last_readlen_; - tls_last_readlen_ = 0; + auto nread = conn_.read_tls(rb_.last, rb_.wleft()); + + if (nread == 0) { + return 0; } - auto rv = SSL_read(ssl_, rb_.last, nread); - - if (rv == 0) { + if (nread < 0) { return -1; } - if (rv < 0) { - auto err = SSL_get_error(ssl_, rv); - switch (err) { - case SSL_ERROR_WANT_READ: - tls_last_readlen_ = nread; - return 0; - case SSL_ERROR_WANT_WRITE: - if (LOG_ENABLED(INFO)) { - CLOG(INFO, this) << "Close connection due to TLS renegotiation"; - } - return -1; - default: - if (LOG_ENABLED(INFO)) { - CLOG(INFO, this) << "SSL_read: SSL_get_error returned " << err; - } - return -1; - } - } - - rb_.write(rv); - rlimit_.drain(rv); + rb_.write(nread); } } int ClientHandler::write_tls() { - ev_timer_again(loop_, &rt_); + ev_timer_again(conn_.loop, &conn_.rt); ERR_clear_error(); for (;;) { if (wb_.rleft() > 0) { - ssize_t nwrite; - // SSL_write requires the same arguments (buf pointer and its - // length) on SSL_ERROR_WANT_READ or SSL_ERROR_WANT_WRITE. - // get_write_limit() may return smaller length than previously - // passed to SSL_write, which violates OpenSSL assumption. To - // avoid this, we keep last legnth passed to SSL_write to - // tls_last_writelen_ if SSL_write indicated I/O blocking. - if (tls_last_writelen_ == 0) { - nwrite = std::min(wb_.rleft(), wlimit_.avail()); - if (nwrite == 0) { - return 0; - } + auto nwrite = conn_.write_tls(wb_.pos, wb_.rleft()); - auto limit = get_write_limit(); - if (limit != -1) { - nwrite = std::min(nwrite, limit); - } - } else { - nwrite = tls_last_writelen_; - tls_last_writelen_ = 0; + if (nwrite == 0) { + return 0; } - auto rv = SSL_write(ssl_, wb_.pos, nwrite); - - if (rv == 0) { + if (nwrite < 0) { return -1; } - update_last_write_time(); - - if (rv < 0) { - auto err = SSL_get_error(ssl_, rv); - switch (err) { - case SSL_ERROR_WANT_READ: - if (LOG_ENABLED(INFO)) { - CLOG(INFO, this) << "Close connection due to TLS renegotiation"; - } - return -1; - case SSL_ERROR_WANT_WRITE: - tls_last_writelen_ = nwrite; - wlimit_.startw(); - ev_timer_again(loop_, &wt_); - return 0; - default: - if (LOG_ENABLED(INFO)) { - CLOG(INFO, this) << "SSL_write: SSL_get_error returned " << err; - } - return -1; - } - } - - wb_.drain(rv); - wlimit_.drain(rv); - - update_warmup_writelen(rv); + wb_.drain(nwrite); continue; } @@ -356,8 +241,8 @@ int ClientHandler::write_tls() { } } - wlimit_.stopw(); - ev_timer_stop(loop_, &wt_); + conn_.wlimit.stopw(); + ev_timer_stop(conn_.loop, &conn_.wt); return 0; } @@ -464,40 +349,27 @@ ClientHandler::ClientHandler(struct ev_loop *loop, int fd, SSL *ssl, const char *ipaddr, const char *port, WorkerStat *worker_stat, DownstreamConnectionPool *dconn_pool) - : ipaddr_(ipaddr), port_(port), - wlimit_(loop, &wev_, get_config()->write_rate, get_config()->write_burst), - rlimit_(loop, &rev_, get_config()->read_rate, get_config()->read_burst), - loop_(loop), dconn_pool_(dconn_pool), http2session_(nullptr), - http1_connect_blocker_(nullptr), ssl_(ssl), worker_stat_(worker_stat), - last_write_time_(0.), warmup_writelen_(0), + : conn_(loop, fd, ssl, get_config()->upstream_write_timeout, + get_config()->upstream_read_timeout, get_config()->write_rate, + get_config()->write_burst, get_config()->read_rate, + get_config()->read_burst, writecb, readcb, timeoutcb, this), + ipaddr_(ipaddr), port_(port), dconn_pool_(dconn_pool), + http2session_(nullptr), http1_connect_blocker_(nullptr), + worker_stat_(worker_stat), left_connhd_len_(NGHTTP2_CLIENT_CONNECTION_PREFACE_LEN), - tls_last_writelen_(0), tls_last_readlen_(0), fd_(fd), - should_close_after_write_(false), tls_handshake_(false), - tls_renegotiation_(false) { + should_close_after_write_(false) { ++worker_stat->num_connections; - ev_io_init(&wev_, writecb, fd_, EV_WRITE); - ev_io_init(&rev_, readcb, fd_, EV_READ); - - wev_.data = this; - rev_.data = this; - - ev_timer_init(&wt_, timeoutcb, 0., get_config()->upstream_write_timeout); - ev_timer_init(&rt_, timeoutcb, 0., get_config()->upstream_read_timeout); - - wt_.data = this; - rt_.data = this; - ev_timer_init(&reneg_shutdown_timer_, shutdowncb, 0., 0.); reneg_shutdown_timer_.data = this; - rlimit_.startw(); - ev_timer_again(loop_, &rt_); + conn_.rlimit.startw(); + ev_timer_again(conn_.loop, &conn_.rt); - if (ssl_) { - SSL_set_app_data(ssl_, reinterpret_cast(this)); + if (conn_.tls.ssl) { + SSL_set_app_data(conn_.tls.ssl, &conn_); read_ = write_ = &ClientHandler::tls_handshake; on_read_ = &ClientHandler::upstream_noop; on_write_ = &ClientHandler::upstream_write; @@ -525,33 +397,14 @@ ClientHandler::~ClientHandler() { --worker_stat_->num_connections; - ev_timer_stop(loop_, &reneg_shutdown_timer_); - - ev_timer_stop(loop_, &rt_); - ev_timer_stop(loop_, &wt_); - - ev_io_stop(loop_, &rev_); - ev_io_stop(loop_, &wev_); + ev_timer_stop(conn_.loop, &reneg_shutdown_timer_); // TODO If backend is http/2, and it is in CONNECTED state, signal // it and make it loopbreak when output is zero. if (worker_config->graceful_shutdown && worker_stat_->num_connections == 0) { - ev_break(loop_); + ev_break(conn_.loop); } - if (ssl_) { - SSL_set_app_data(ssl_, nullptr); - SSL_set_shutdown(ssl_, SSL_RECEIVED_SHUTDOWN); - ERR_clear_error(); - SSL_shutdown(ssl_); - } - - if (ssl_) { - SSL_free(ssl_); - } - - shutdown(fd_, SHUT_WR); - close(fd_); if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "Deleted"; } @@ -560,20 +413,20 @@ ClientHandler::~ClientHandler() { Upstream *ClientHandler::get_upstream() { return upstream_.get(); } struct ev_loop *ClientHandler::get_loop() const { - return loop_; + return conn_.loop; } void ClientHandler::reset_upstream_read_timeout(ev_tstamp t) { - rt_.repeat = t; - if (ev_is_active(&rt_)) { - ev_timer_again(loop_, &rt_); + conn_.rt.repeat = t; + if (ev_is_active(&conn_.rt)) { + ev_timer_again(conn_.loop, &conn_.rt); } } void ClientHandler::reset_upstream_write_timeout(ev_tstamp t) { - wt_.repeat = t; - if (ev_is_active(&wt_)) { - ev_timer_again(loop_, &wt_); + conn_.wt.repeat = t; + if (ev_is_active(&conn_.wt)) { + ev_timer_again(conn_.loop, &conn_.wt); } } @@ -585,7 +438,7 @@ int ClientHandler::validate_next_proto() { // First set callback for catch all cases on_read_ = &ClientHandler::upstream_read; - SSL_get0_next_proto_negotiated(ssl_, &next_proto, &next_proto_len); + SSL_get0_next_proto_negotiated(conn_.tls.ssl, &next_proto, &next_proto_len); for (int i = 0; i < 2; ++i) { if (next_proto) { if (LOG_ENABLED(INFO)) { @@ -604,7 +457,7 @@ int ClientHandler::validate_next_proto() { auto http2_upstream = util::make_unique(this); - if (!ssl::check_http2_requirement(ssl_)) { + if (!ssl::check_http2_requirement(conn_.tls.ssl)) { rv = http2_upstream->terminate_session(NGHTTP2_INADEQUATE_SECURITY); if (rv != 0) { @@ -744,7 +597,8 @@ ClientHandler::get_downstream_connection() { dconn = util::make_unique(dconn_pool_, http2session_); } else { - dconn = util::make_unique(dconn_pool_, loop_); + dconn = + util::make_unique(dconn_pool_, conn_.loop); } dconn->set_client_handler(this); return dconn; @@ -760,7 +614,7 @@ ClientHandler::get_downstream_connection() { return dconn; } -SSL *ClientHandler::get_ssl() const { return ssl_; } +SSL *ClientHandler::get_ssl() const { return conn_.tls.ssl; } void ClientHandler::set_http2_session(Http2Session *http2session) { http2session_ = http2session; @@ -806,70 +660,18 @@ int ClientHandler::perform_http2_upgrade(HttpsUpstream *http) { return 0; } -bool ClientHandler::get_http2_upgrade_allowed() const { return !ssl_; } +bool ClientHandler::get_http2_upgrade_allowed() const { return !conn_.tls.ssl; } std::string ClientHandler::get_upstream_scheme() const { - if (ssl_) { + if (conn_.tls.ssl) { return "https"; } else { return "http"; } } -void ClientHandler::set_tls_handshake(bool f) { tls_handshake_ = f; } - -bool ClientHandler::get_tls_handshake() const { return tls_handshake_; } - -void ClientHandler::set_tls_renegotiation(bool f) { - if (tls_renegotiation_ == false) { - if (LOG_ENABLED(INFO)) { - CLOG(INFO, this) << "TLS renegotiation detected. " - << "Start shutdown timer now."; - } - - ev_timer_start(loop_, &reneg_shutdown_timer_); - } - tls_renegotiation_ = f; -} - -bool ClientHandler::get_tls_renegotiation() const { return tls_renegotiation_; } - -namespace { -const size_t SHRPX_SMALL_WRITE_LIMIT = 1300; -const size_t SHRPX_WARMUP_THRESHOLD = 1 << 20; -} // namespace - -ssize_t ClientHandler::get_write_limit() { - if (!ssl_) { - return -1; - } - - auto t = ev_now(loop_); - - if (t - last_write_time_ > 1.0) { - // Time out, use small record size - warmup_writelen_ = 0; - return SHRPX_SMALL_WRITE_LIMIT; - } - - // If event_base_gettimeofday_cached() failed, we just skip timer - // checking. Don't know how to treat this. - - if (warmup_writelen_ >= SHRPX_WARMUP_THRESHOLD) { - return -1; - } - - return SHRPX_SMALL_WRITE_LIMIT; -} - -void ClientHandler::update_warmup_writelen(size_t n) { - if (warmup_writelen_ < SHRPX_WARMUP_THRESHOLD) { - warmup_writelen_ += n; - } -} - -void ClientHandler::update_last_write_time() { - last_write_time_ = ev_now(loop_); +void ClientHandler::start_immediate_shutdown() { + ev_timer_start(conn_.loop, &reneg_shutdown_timer_); } void ClientHandler::write_accesslog(Downstream *downstream) { @@ -922,9 +724,9 @@ ClientHandler::WriteBuf *ClientHandler::get_wb() { return &wb_; } ClientHandler::ReadBuf *ClientHandler::get_rb() { return &rb_; } -void ClientHandler::signal_write() { wlimit_.startw(); } +void ClientHandler::signal_write() { conn_.wlimit.startw(); } -RateLimit *ClientHandler::get_rlimit() { return &rlimit_; } -RateLimit *ClientHandler::get_wlimit() { return &wlimit_; } +RateLimit *ClientHandler::get_rlimit() { return &conn_.rlimit; } +RateLimit *ClientHandler::get_wlimit() { return &conn_.wlimit; } } // namespace shrpx diff --git a/src/shrpx_client_handler.h b/src/shrpx_client_handler.h index e2ebda34..7a6b8f65 100644 --- a/src/shrpx_client_handler.h +++ b/src/shrpx_client_handler.h @@ -34,6 +34,7 @@ #include #include "shrpx_rate_limit.h" +#include "shrpx_connection.h" #include "buffer.h" using namespace nghttp2; @@ -106,20 +107,7 @@ public: bool get_http2_upgrade_allowed() const; // Returns upstream scheme, either "http" or "https" std::string get_upstream_scheme() const; - void set_tls_handshake(bool f); - bool get_tls_handshake() const; - void set_tls_renegotiation(bool f); - bool get_tls_renegotiation() const; - // Returns maximum write size. The intention of this chunk size is - // control the TLS record size. - // - // This function returns -1, if TLS is not enabled or no limitation - // is required. - ssize_t get_write_limit(); - // Updates the number of bytes written in warm up period. - void update_warmup_writelen(size_t n); - // Updates the time when last write was done. - void update_last_write_time(); + void start_immediate_shutdown(); // Writes upstream accesslog using |downstream|. The |downstream| // must not be nullptr. @@ -143,10 +131,7 @@ public: void signal_write(); private: - ev_io wev_; - ev_io rev_; - ev_timer wt_; - ev_timer rt_; + Connection conn_; ev_timer reneg_shutdown_timer_; std::unique_ptr upstream_; std::string ipaddr_; @@ -155,26 +140,15 @@ private: std::string alpn_; std::function read_, write_; std::function on_read_, on_write_; - RateLimit wlimit_; - RateLimit rlimit_; - struct ev_loop *loop_; DownstreamConnectionPool *dconn_pool_; // Shared HTTP2 session for each thread. NULL if backend is not // HTTP2. Not deleted by this object. Http2Session *http2session_; ConnectBlocker *http1_connect_blocker_; - SSL *ssl_; WorkerStat *worker_stat_; - double last_write_time_; - size_t warmup_writelen_; // The number of bytes of HTTP/2 client connection header to read size_t left_connhd_len_; - size_t tls_last_writelen_; - size_t tls_last_readlen_; - int fd_; bool should_close_after_write_; - bool tls_handshake_; - bool tls_renegotiation_; WriteBuf wb_; ReadBuf rb_; }; diff --git a/src/shrpx_connection.cc b/src/shrpx_connection.cc new file mode 100644 index 00000000..2dc4b177 --- /dev/null +++ b/src/shrpx_connection.cc @@ -0,0 +1,321 @@ +/* + * nghttp2 - HTTP/2 C Library + * + * Copyright (c) 2015 Tatsuhiro Tsujikawa + * + * Permission is hereby granted, free of charge, to any person obtaining + * a copy of this software and associated documentation files (the + * "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, + * distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to + * the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +#include "shrpx_connection.h" + +#include + +#include + +#include + +#include "memchunk.h" + +using namespace nghttp2; + +namespace shrpx { +Connection::Connection(struct ev_loop *loop, int fd, SSL *ssl, + ev_tstamp write_timeout, ev_tstamp read_timeout, + size_t write_rate, size_t write_burst, size_t read_rate, + size_t read_burst, IOCb writecb, IOCb readcb, + TimerCb timeoutcb, void *data) + : tls{ssl}, wlimit(loop, &wev, write_rate, write_burst), + rlimit(loop, &rev, read_rate, read_burst), writecb(writecb), + readcb(readcb), timeoutcb(timeoutcb), loop(loop), data(data), fd(fd) { + + ev_io_init(&wev, writecb, fd, EV_WRITE); + ev_io_init(&rev, readcb, fd, EV_READ); + + wev.data = this; + rev.data = this; + + ev_timer_init(&wt, timeoutcb, 0., write_timeout); + ev_timer_init(&rt, timeoutcb, 0., read_timeout); + + wt.data = this; + rt.data = this; +} + +Connection::~Connection() { disconnect(); } + +void Connection::disconnect() { + ev_timer_stop(loop, &rt); + ev_timer_stop(loop, &wt); + + rlimit.stopw(); + wlimit.stopw(); + + if (tls.ssl) { + SSL_set_app_data(tls.ssl, nullptr); + SSL_set_shutdown(tls.ssl, SSL_RECEIVED_SHUTDOWN); + ERR_clear_error(); + SSL_shutdown(tls.ssl); + SSL_free(tls.ssl); + tls.ssl = nullptr; + } + + if (fd != -1) { + shutdown(fd, SHUT_WR); + close(fd); + fd = -1; + } +} + +int Connection::tls_handshake() { + auto rv = SSL_do_handshake(tls.ssl); + + if (rv == 0) { + return SHRPX_ERR_NETWORK; + } + + if (rv < 0) { + auto err = SSL_get_error(tls.ssl, rv); + switch (err) { + case SSL_ERROR_WANT_READ: + wlimit.stopw(); + ev_timer_stop(loop, &wt); + return SHRPX_ERR_INPROGRESS; + case SSL_ERROR_WANT_WRITE: + wlimit.startw(); + ev_timer_again(loop, &wt); + return SHRPX_ERR_INPROGRESS; + default: + return SHRPX_ERR_NETWORK; + } + } + + wlimit.stopw(); + ev_timer_stop(loop, &wt); + + tls.initial_handshake_done = true; + + if (LOG_ENABLED(INFO)) { + LOG(INFO) << "SSL/TLS handshake completed"; + if (SSL_session_reused(tls.ssl)) { + LOG(INFO) << "SSL/TLS session reused"; + } + } + + return 0; +} + +namespace { +const size_t SHRPX_SMALL_WRITE_LIMIT = 1300; +const size_t SHRPX_WARMUP_THRESHOLD = 1 << 20; +} // namespace + +ssize_t Connection::get_tls_write_limit() { + auto t = ev_now(loop); + + if (t - tls.last_write_time > 1.) { + // Time out, use small record size + tls.warmup_writelen = 0; + return SHRPX_SMALL_WRITE_LIMIT; + } + + if (tls.warmup_writelen >= SHRPX_WARMUP_THRESHOLD) { + return std::numeric_limits::max(); + } + + return SHRPX_SMALL_WRITE_LIMIT; +} + +void Connection::update_tls_warmup_writelen(size_t n) { + if (tls.warmup_writelen < SHRPX_WARMUP_THRESHOLD) { + tls.warmup_writelen += n; + } +} + +ssize_t Connection::write_tls(const void *data, size_t len) { + ssize_t nwrite; + // SSL_write requires the same arguments (buf pointer and its + // length) on SSL_ERROR_WANT_READ or SSL_ERROR_WANT_WRITE. + // get_write_limit() may return smaller length than previously + // passed to SSL_write, which violates OpenSSL assumption. To avoid + // this, we keep last legnth passed to SSL_write to + // tls.last_writelen if SSL_write indicated I/O blocking. + if (tls.last_writelen == 0) { + nwrite = std::min(len, wlimit.avail()); + nwrite = std::min(nwrite, get_tls_write_limit()); + if (nwrite == 0) { + return 0; + } + } else { + nwrite = tls.last_writelen; + tls.last_writelen = 0; + } + + auto rv = SSL_write(tls.ssl, data, nwrite); + + if (rv == 0) { + return SHRPX_ERR_NETWORK; + } + + tls.last_write_time = ev_now(loop); + + if (rv < 0) { + auto err = SSL_get_error(tls.ssl, rv); + switch (err) { + case SSL_ERROR_WANT_READ: + if (LOG_ENABLED(INFO)) { + LOG(INFO) << "Close connection due to TLS renegotiation"; + } + return SHRPX_ERR_NETWORK; + case SSL_ERROR_WANT_WRITE: + tls.last_writelen = nwrite; + wlimit.startw(); + ev_timer_again(loop, &wt); + return 0; + default: + if (LOG_ENABLED(INFO)) { + LOG(INFO) << "SSL_write: SSL_get_error returned " << err; + } + return SHRPX_ERR_NETWORK; + } + } + + wlimit.drain(rv); + + update_tls_warmup_writelen(rv); + + return rv; +} + +ssize_t Connection::read_tls(void *data, size_t len) { + ssize_t nread; + // SSL_read requires the same arguments (buf pointer and its + // length) on SSL_ERROR_WANT_READ or SSL_ERROR_WANT_WRITE. + // rlimit_.avail() or rlimit_.avail() may return different length + // than the length previously passed to SSL_read, which violates + // OpenSSL assumption. To avoid this, we keep last legnth passed + // to SSL_read to tls_last_readlen_ if SSL_read indicated I/O + // blocking. + if (tls.last_readlen == 0) { + nread = std::min(len, rlimit.avail()); + if (nread == 0) { + return 0; + } + } else { + nread = tls.last_readlen; + tls.last_readlen = 0; + } + + auto rv = SSL_read(tls.ssl, data, nread); + + if (rv <= 0) { + auto err = SSL_get_error(tls.ssl, rv); + switch (err) { + case SSL_ERROR_WANT_READ: + tls.last_readlen = nread; + return 0; + case SSL_ERROR_WANT_WRITE: + if (LOG_ENABLED(INFO)) { + LOG(INFO) << "Close connection due to TLS renegotiation"; + } + return SHRPX_ERR_NETWORK; + case SSL_ERROR_ZERO_RETURN: + return SHRPX_ERR_EOF; + default: + if (LOG_ENABLED(INFO)) { + LOG(INFO) << "SSL_read: SSL_get_error returned " << err; + } + return SHRPX_ERR_NETWORK; + } + } + + rlimit.drain(rv); + + return rv; +} + +ssize_t Connection::write_clear(const void *data, size_t len) { + ssize_t nwrite = std::min(len, wlimit.avail()); + if (nwrite == 0) { + return 0; + } + + while ((nwrite = write(fd, data, nwrite)) == -1 && errno == EINTR) + ; + if (nwrite == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + wlimit.startw(); + ev_timer_again(loop, &wt); + return 0; + } + return SHRPX_ERR_NETWORK; + } + + wlimit.drain(nwrite); + + return nwrite; +} + +ssize_t Connection::writev_clear(struct iovec *iov, int iovcnt) { + iovcnt = limit_iovec(iov, iovcnt, wlimit.avail()); + if (iovcnt == 0) { + return 0; + } + + ssize_t nwrite; + while ((nwrite = writev(fd, iov, iovcnt)) == -1 && errno == EINTR) + ; + if (nwrite == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + wlimit.startw(); + ev_timer_again(loop, &wt); + return 0; + } + return SHRPX_ERR_NETWORK; + } + + wlimit.drain(nwrite); + + return nwrite; +} + +ssize_t Connection::read_clear(void *data, size_t len) { + ssize_t nread = std::min(len, rlimit.avail()); + if (nread == 0) { + return 0; + } + + while ((nread = read(fd, data, nread)) == -1 && errno == EINTR) + ; + if (nread == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return 0; + } + return SHRPX_ERR_NETWORK; + } + + if (nread == 0) { + return SHRPX_ERR_EOF; + } + + rlimit.drain(nread); + + return nread; +} + +} // namespace shrpx diff --git a/src/shrpx_connection.h b/src/shrpx_connection.h new file mode 100644 index 00000000..8a7e7be2 --- /dev/null +++ b/src/shrpx_connection.h @@ -0,0 +1,107 @@ +/* + * nghttp2 - HTTP/2 C Library + * + * Copyright (c) 2015 Tatsuhiro Tsujikawa + * + * Permission is hereby granted, free of charge, to any person obtaining + * a copy of this software and associated documentation files (the + * "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, + * distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to + * the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +#ifndef SHRPX_CONNECTION_H +#define SHRPX_CONNECTION_H + +#include "shrpx_config.h" + +#include + +#include + +#include + +#include "shrpx_rate_limit.h" +#include "shrpx_error.h" + +namespace shrpx { + +struct TLSConnection { + SSL *ssl; + ev_tstamp last_write_time; + size_t warmup_writelen; + // length passed to SSL_write and SSL_read last time. This is + // required since these functions require the exact same parameters + // on non-blocking I/O. + size_t last_writelen, last_readlen; + bool initial_handshake_done; + bool reneg_started; +}; + +template using EVCb = void (*)(struct ev_loop *, T *, int); + +using IOCb = EVCb; +using TimerCb = EVCb; + +struct Connection { + Connection(struct ev_loop *loop, int fd, SSL *ssl, ev_tstamp write_timeout, + ev_tstamp read_timeout, size_t write_rate, size_t write_burst, + size_t read_rate, size_t read_burst, IOCb writecb, IOCb readcb, + TimerCb timeoutcb, void *data); + ~Connection(); + + void disconnect(); + + int tls_handshake(); + + // All write_* and writev_clear functions return number of bytes + // written. If nothing cannot be written (e.g., there is no + // allowance in RateLimit or underlying connection blocks), return + // 0. SHRPX_ERR_NETWORK is returned in case of error. + // + // All read_* functions return number of bytes read. If nothing + // cannot be read (e.g., there is no allowance in Ratelimit or + // underlying connection blocks), return 0. SHRPX_ERR_EOF is + // returned in case of EOF and no data was read. Otherwise + // SHRPX_ERR_NETWORK is return in case of error. + ssize_t write_tls(const void *data, size_t len); + ssize_t read_tls(void *data, size_t len); + + ssize_t get_tls_write_limit(); + // Updates the number of bytes written in warm up period. + void update_tls_warmup_writelen(size_t n); + + ssize_t write_clear(const void *data, size_t len); + ssize_t writev_clear(struct iovec *iov, int iovcnt); + ssize_t read_clear(void *data, size_t len); + + TLSConnection tls; + ev_io wev; + ev_io rev; + ev_timer wt; + ev_timer rt; + RateLimit wlimit; + RateLimit rlimit; + IOCb writecb; + IOCb readcb; + TimerCb timeoutcb; + struct ev_loop *loop; + void *data; + int fd; +}; + +} // namespace shrpx + +#endif // SHRPX_CONNECTION_H diff --git a/src/shrpx_downstream_connection.h b/src/shrpx_downstream_connection.h index fad0b491..5594ccf7 100644 --- a/src/shrpx_downstream_connection.h +++ b/src/shrpx_downstream_connection.h @@ -51,8 +51,6 @@ public: virtual int resume_read(IOCtrlReason reason, size_t consumed) = 0; virtual void force_resume_read() = 0; - enum { ERR_EOF = -100, ERR_NET = -101 }; - virtual int on_read() = 0; virtual int on_write() = 0; virtual int on_timeout() { return 0; } diff --git a/src/shrpx_error.h b/src/shrpx_error.h index a519f67d..f1d1bffa 100644 --- a/src/shrpx_error.h +++ b/src/shrpx_error.h @@ -29,11 +29,13 @@ namespace shrpx { +// Deprecated, do not use. enum ErrorCode { SHRPX_ERR_SUCCESS = 0, - SHRPX_ERR_UNKNOWN = -1, - SHRPX_ERR_HTTP_PARSE = -2, - SHRPX_ERR_NETWORK = -3 + SHRPX_ERR_ERROR = -1, + SHRPX_ERR_NETWORK = -100, + SHRPX_ERR_EOF = -101, + SHRPX_ERR_INPROGRESS = -102, }; } // namespace shrpx diff --git a/src/shrpx_http2_session.cc b/src/shrpx_http2_session.cc index f421fc00..5fc4018d 100644 --- a/src/shrpx_http2_session.cc +++ b/src/shrpx_http2_session.cc @@ -73,7 +73,8 @@ void settings_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) { namespace { void timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) { - auto http2session = static_cast(w->data); + auto conn = static_cast(w->data); + auto http2session = static_cast(conn->data); if (LOG_ENABLED(INFO)) { SSLOG(INFO, http2session) << "Timeout"; @@ -87,7 +88,8 @@ void timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) { namespace { void readcb(struct ev_loop *loop, ev_io *w, int revents) { int rv; - auto http2session = static_cast(w->data); + auto conn = static_cast(w->data); + auto http2session = static_cast(conn->data); http2session->connection_alive(); rv = http2session->do_read(); if (rv != 0) { @@ -99,7 +101,8 @@ void readcb(struct ev_loop *loop, ev_io *w, int revents) { namespace { void writecb(struct ev_loop *loop, ev_io *w, int revents) { int rv; - auto http2session = static_cast(w->data); + auto conn = static_cast(w->data); + auto http2session = static_cast(conn->data); http2session->clear_write_request(); http2session->connection_alive(); rv = http2session->do_write(); @@ -132,26 +135,17 @@ void wrschedcb(struct ev_loop *loop, ev_prepare *w, int revents) { } // namespace Http2Session::Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx) - : loop_(loop), ssl_ctx_(ssl_ctx), ssl_(nullptr), session_(nullptr), - data_pending_(nullptr), data_pendinglen_(0), fd_(-1), - state_(DISCONNECTED), connection_check_state_(CONNECTION_CHECK_NONE), - flow_control_(false), write_requested_(false) { - // We do not know fd yet, so just set dummy fd 0 - ev_io_init(&wev_, writecb, 0, EV_WRITE); - ev_io_init(&rev_, readcb, 0, EV_READ); - - wev_.data = this; - rev_.data = this; + : conn_(loop, -1, nullptr, get_config()->downstream_write_timeout, + get_config()->downstream_read_timeout, 0, 0, 0, 0, writecb, readcb, + timeoutcb, this), + ssl_ctx_(ssl_ctx), session_(nullptr), data_pending_(nullptr), + data_pendinglen_(0), state_(DISCONNECTED), + connection_check_state_(CONNECTION_CHECK_NONE), flow_control_(false), + write_requested_(false) { read_ = write_ = &Http2Session::noop; on_read_ = on_write_ = &Http2Session::noop; - ev_timer_init(&wt_, timeoutcb, 0., get_config()->downstream_write_timeout); - ev_timer_init(&rt_, timeoutcb, 0., get_config()->downstream_read_timeout); - - wt_.data = this; - rt_.data = this; - // We will resuse this many times, so use repeat timeout value. ev_timer_init(&connchk_timer_, connchk_timeout_cb, 0., 5.); @@ -166,7 +160,7 @@ Http2Session::Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx) ev_prepare_init(&wrsched_prep_, &wrschedcb); wrsched_prep_.data = this; - ev_prepare_start(loop_, &wrsched_prep_); + ev_prepare_start(conn_.loop, &wrsched_prep_); } Http2Session::~Http2Session() { disconnect(); } @@ -181,34 +175,13 @@ int Http2Session::disconnect(bool hard) { rb_.reset(); wb_.reset(); - ev_timer_stop(loop_, &settings_timer_); - ev_timer_stop(loop_, &connchk_timer_); - - ev_timer_stop(loop_, &rt_); - ev_timer_stop(loop_, &wt_); + ev_timer_stop(conn_.loop, &settings_timer_); + ev_timer_stop(conn_.loop, &connchk_timer_); read_ = write_ = &Http2Session::noop; on_read_ = on_write_ = &Http2Session::noop; - ev_io_stop(loop_, &rev_); - ev_io_stop(loop_, &wev_); - - if (ssl_) { - SSL_set_shutdown(ssl_, SSL_RECEIVED_SHUTDOWN); - ERR_clear_error(); - SSL_shutdown(ssl_); - SSL_free(ssl_); - ssl_ = nullptr; - } - - if (fd_ != -1) { - if (LOG_ENABLED(INFO)) { - SSLOG(INFO, this) << "Closing fd=" << fd_; - } - shutdown(fd_, SHUT_WR); - close(fd_); - fd_ = -1; - } + conn_.disconnect(); if (proxy_htp_) { proxy_htp_.reset(); @@ -251,7 +224,7 @@ int Http2Session::disconnect(bool hard) { return 0; } -int Http2Session::check_cert() { return ssl::check_cert(ssl_); } +int Http2Session::check_cert() { return ssl::check_cert(conn_.tls.ssl); } int Http2Session::initiate_connection() { int rv = 0; @@ -262,15 +235,15 @@ int Http2Session::initiate_connection() { << get_config()->downstream_http_proxy_port; } - fd_ = util::create_nonblock_socket( + conn_.fd = util::create_nonblock_socket( get_config()->downstream_http_proxy_addr.storage.ss_family); - if (fd_ == -1) { + if (conn_.fd == -1) { return -1; } - rv = connect(fd_, const_cast( - &get_config()->downstream_http_proxy_addr.sa), + rv = connect(conn_.fd, const_cast( + &get_config()->downstream_http_proxy_addr.sa), get_config()->downstream_http_proxy_addrlen); if (rv != 0 && errno != EINPROGRESS) { SSLOG(ERROR, this) << "Failed to connect to the proxy " @@ -279,13 +252,13 @@ int Http2Session::initiate_connection() { return -1; } - ev_io_set(&rev_, fd_, EV_READ); - ev_io_set(&wev_, fd_, EV_WRITE); + ev_io_set(&conn_.rev, conn_.fd, EV_READ); + ev_io_set(&conn_.wev, conn_.fd, EV_WRITE); - ev_io_start(loop_, &wev_); + conn_.wlimit.startw(); // TODO we should have timeout for connection establishment - ev_timer_again(loop_, &wt_); + ev_timer_again(conn_.loop, &conn_.wt); write_ = &Http2Session::connected; @@ -307,8 +280,8 @@ int Http2Session::initiate_connection() { } if (ssl_ctx_) { // We are establishing TLS connection. - ssl_ = SSL_new(ssl_ctx_); - if (!ssl_) { + conn_.tls.ssl = SSL_new(ssl_ctx_); + if (!conn_.tls.ssl) { SSLOG(ERROR, this) << "SSL_new() failed: " << ERR_error_string(ERR_get_error(), NULL); return -1; @@ -325,21 +298,21 @@ int Http2Session::initiate_connection() { // TLS extensions: SNI. There is no documentation about the return // code for this function (actually this is macro wrapping SSL_ctrl // at the time of this writing). - SSL_set_tlsext_host_name(ssl_, sni_name); + SSL_set_tlsext_host_name(conn_.tls.ssl, sni_name); } // If state_ == PROXY_CONNECTED, we has connected to the proxy - // using fd_ and tunnel has been established. + // using conn_.fd and tunnel has been established. if (state_ == DISCONNECTED) { - assert(fd_ == -1); + assert(conn_.fd == -1); - fd_ = util::create_nonblock_socket( + conn_.fd = util::create_nonblock_socket( get_config()->downstream_addrs[0].addr.storage.ss_family); - if (fd_ == -1) { + if (conn_.fd == -1) { return -1; } rv = connect( - fd_, + conn_.fd, // TODO maybe not thread-safe? const_cast(&get_config()->downstream_addrs[0].addr.sa), get_config()->downstream_addrs[0].addrlen); @@ -348,25 +321,25 @@ int Http2Session::initiate_connection() { } } - if (SSL_set_fd(ssl_, fd_) == 0) { + if (SSL_set_fd(conn_.tls.ssl, conn_.fd) == 0) { return -1; } - SSL_set_connect_state(ssl_); + SSL_set_connect_state(conn_.tls.ssl); } else { if (state_ == DISCONNECTED) { // Without TLS and proxy. - assert(fd_ == -1); + assert(conn_.fd == -1); - fd_ = util::create_nonblock_socket( + conn_.fd = util::create_nonblock_socket( get_config()->downstream_addrs[0].addr.storage.ss_family); - if (fd_ == -1) { + if (conn_.fd == -1) { return -1; } - rv = connect(fd_, const_cast( - &get_config()->downstream_addrs[0].addr.sa), + rv = connect(conn_.fd, const_cast( + &get_config()->downstream_addrs[0].addr.sa), get_config()->downstream_addrs[0].addrlen); if (rv != 0 && errno != EINPROGRESS) { return -1; @@ -384,13 +357,11 @@ int Http2Session::initiate_connection() { // rev_ and wev_ could possibly be active here. Since calling // ev_io_set is not allowed while watcher is active, we have to // stop them just in case. - ev_io_stop(loop_, &rev_); - ev_io_stop(loop_, &wev_); + conn_.rlimit.stopw(); + conn_.wlimit.stopw(); - ev_io_set(&rev_, fd_, EV_READ); - ev_io_set(&wev_, fd_, EV_WRITE); - - ev_io_start(loop_, &wev_); + ev_io_set(&conn_.rev, conn_.fd, EV_READ); + ev_io_set(&conn_.wev, conn_.fd, EV_WRITE); write_ = &Http2Session::connected; @@ -400,11 +371,11 @@ int Http2Session::initiate_connection() { // We have been already connected when no TLS and proxy is used. if (state_ != CONNECTED) { state_ = CONNECTING; - ev_io_start(loop_, &wev_); + conn_.wlimit.startw(); // TODO we should have timeout for connection establishment - ev_timer_again(loop_, &wt_); + ev_timer_again(conn_.loop, &conn_.wt); } else { - ev_timer_again(loop_, &rt_); + ev_timer_again(conn_.loop, &conn_.rt); } return 0; @@ -450,17 +421,13 @@ http_parser_settings htp_hooks = { int Http2Session::downstream_read_proxy() { for (;;) { - const void *data; - size_t datalen; - std::tie(data, datalen) = rb_.get(); - - if (datalen == 0) { - break; + if (rb_.rleft() == 0) { + return 0; } - size_t nread = - http_parser_execute(proxy_htp_.get(), &htp_hooks, - reinterpret_cast(data), datalen); + size_t nread = http_parser_execute(proxy_htp_.get(), &htp_hooks, + reinterpret_cast(rb_.pos), + rb_.rleft()); rb_.drain(nread); @@ -476,12 +443,11 @@ int Http2Session::downstream_read_proxy() { if (initiate_connection() != 0) { return -1; } - break; + return 0; case Http2Session::PROXY_FAILED: return -1; } } - return 0; } int Http2Session::downstream_connect_proxy() { @@ -679,11 +645,11 @@ int on_stream_close_callback(nghttp2_session *session, int32_t stream_id, } // namespace void Http2Session::start_settings_timer() { - ev_timer_again(loop_, &settings_timer_); + ev_timer_again(conn_.loop, &settings_timer_); } void Http2Session::stop_settings_timer() { - ev_timer_stop(loop_, &settings_timer_); + ev_timer_stop(conn_.loop, &settings_timer_); } namespace { @@ -1176,7 +1142,7 @@ int Http2Session::on_connect() { if (ssl_ctx_) { const unsigned char *next_proto = nullptr; unsigned int next_proto_len; - SSL_get0_next_proto_negotiated(ssl_, &next_proto, &next_proto_len); + SSL_get0_next_proto_negotiated(conn_.tls.ssl, &next_proto, &next_proto_len); for (int i = 0; i < 2; ++i) { if (next_proto) { if (LOG_ENABLED(INFO)) { @@ -1276,8 +1242,8 @@ int Http2Session::on_connect() { return -1; } - auto must_terminate = - !get_config()->downstream_no_tls && !ssl::check_http2_requirement(ssl_); + auto must_terminate = !get_config()->downstream_no_tls && + !ssl::check_http2_requirement(conn_.tls.ssl); if (must_terminate) { rv = terminate_session(NGHTTP2_INADEQUATE_SECURITY); @@ -1329,16 +1295,9 @@ int Http2Session::on_write() { return on_write_(*this); } int Http2Session::downstream_read() { ssize_t rv = 0; - for (;;) { - const void *data; - size_t nread; - std::tie(data, nread) = rb_.get(); - if (nread == 0) { - break; - } - + if (rb_.rleft() > 0) { rv = nghttp2_session_mem_recv( - session_, reinterpret_cast(data), nread); + session_, reinterpret_cast(rb_.pos), rb_.rleft()); if (rv < 0) { SSLOG(ERROR, this) << "nghttp2_session_recv() returned error: " @@ -1346,7 +1305,9 @@ int Http2Session::downstream_read() { return -1; } - rb_.drain(nread); + // nghttp2_session_mem_recv() should consume all input data in + // case of success. + rb_.reset(); } if (nghttp2_session_want_read(session_) == 0 && @@ -1413,10 +1374,10 @@ void Http2Session::clear_write_request() { write_requested_ = false; } bool Http2Session::write_requested() const { return write_requested_; } struct ev_loop *Http2Session::get_loop() const { - return loop_; + return conn_.loop; } -ev_io *Http2Session::get_wev() { return &wev_; } +ev_io *Http2Session::get_wev() { return &conn_.wev; } int Http2Session::get_state() const { return state_; } @@ -1431,7 +1392,7 @@ int Http2Session::terminate_session(uint32_t error_code) { return 0; } -SSL *Http2Session::get_ssl() const { return ssl_; } +SSL *Http2Session::get_ssl() const { return conn_.tls.ssl; } int Http2Session::consume(int32_t stream_id, size_t len) { int rv; @@ -1473,7 +1434,7 @@ void Http2Session::start_checking_connection() { } void Http2Session::reset_connection_check_timer() { - ev_timer_again(loop_, &connchk_timer_); + ev_timer_again(conn_.loop, &connchk_timer_); } void Http2Session::connection_alive() { @@ -1518,7 +1479,7 @@ void Http2Session::set_connection_check_state(int state) { int Http2Session::noop() { return 0; } int Http2Session::connected() { - if (!util::check_socket_connected(fd_)) { + if (!util::check_socket_connected(conn_.fd)) { return -1; } @@ -1526,9 +1487,9 @@ int Http2Session::connected() { SSLOG(INFO, this) << "Connection established"; } - ev_io_start(loop_, &rev_); + conn_.rlimit.startw(); - if (ssl_) { + if (conn_.tls.ssl) { read_ = &Http2Session::tls_handshake; write_ = &Http2Session::tls_handshake; @@ -1551,7 +1512,7 @@ int Http2Session::connected() { } int Http2Session::read_clear() { - ev_timer_again(loop_, &rt_); + ev_timer_again(conn_.loop, &conn_.rt); for (;;) { // we should process buffered data first before we read EOF. @@ -1562,50 +1523,36 @@ int Http2Session::read_clear() { return 0; } rb_.reset(); - struct iovec iov[2]; - auto iovcnt = rb_.wiovec(iov); - if (iovcnt > 0) { - ssize_t nread; - while ((nread = readv(fd_, iov, iovcnt)) == -1 && errno == EINTR) - ; - if (nread == -1) { - if (errno == EAGAIN || errno == EWOULDBLOCK) { - break; - } - return -1; - } + auto nread = conn_.read_clear(rb_.last, rb_.wleft()); - if (nread == 0) { - return -1; - } - - rb_.write(nread); + if (nread == 0) { + return 0; } - } - return 0; + if (nread < 0) { + return nread; + } + + rb_.write(nread); + } } int Http2Session::write_clear() { - ev_timer_again(loop_, &rt_); + ev_timer_again(conn_.loop, &conn_.rt); for (;;) { if (wb_.rleft() > 0) { - struct iovec iov[2]; - auto iovcnt = wb_.riovec(iov); + auto nwrite = conn_.write_clear(wb_.pos, wb_.rleft()); - ssize_t nwrite; - while ((nwrite = writev(fd_, iov, iovcnt)) == -1 && errno == EINTR) - ; - if (nwrite == -1) { - if (errno == EAGAIN || errno == EWOULDBLOCK) { - ev_io_start(loop_, &wev_); - ev_timer_again(loop_, &wt_); - return 0; - } - return -1; + if (nwrite == 0) { + return 0; } + + if (nwrite < 0) { + return nwrite; + } + wb_.drain(nwrite); continue; } @@ -1619,50 +1566,30 @@ int Http2Session::write_clear() { } } - ev_io_stop(loop_, &wev_); - ev_timer_stop(loop_, &wt_); + conn_.wlimit.stopw(); + ev_timer_stop(conn_.loop, &conn_.wt); return 0; } int Http2Session::tls_handshake() { - ev_timer_again(loop_, &rt_); + ev_timer_again(conn_.loop, &conn_.rt); ERR_clear_error(); - auto rv = SSL_do_handshake(ssl_); + auto rv = conn_.tls_handshake(); - if (rv == 0) { - return -1; + if (rv == SHRPX_ERR_INPROGRESS) { + return 0; } if (rv < 0) { - auto err = SSL_get_error(ssl_, rv); - switch (err) { - case SSL_ERROR_WANT_READ: - ev_io_stop(loop_, &wev_); - ev_timer_stop(loop_, &wt_); - return 0; - case SSL_ERROR_WANT_WRITE: - ev_io_start(loop_, &wev_); - ev_timer_again(loop_, &wt_); - return 0; - default: - return -1; - } + return rv; } - ev_io_stop(loop_, &wev_); - ev_timer_stop(loop_, &wt_); - if (LOG_ENABLED(INFO)) { SSLOG(INFO, this) << "SSL/TLS handshake completed"; } - if (LOG_ENABLED(INFO)) { - if (SSL_session_reused(ssl_)) { - CLOG(INFO, this) << "SSL/TLS session reused"; - } - } if (!get_config()->downstream_no_tls && !get_config()->insecure && check_cert() != 0) { @@ -1681,7 +1608,7 @@ int Http2Session::tls_handshake() { } int Http2Session::read_tls() { - ev_timer_again(loop_, &rt_); + ev_timer_again(conn_.loop, &conn_.rt); ERR_clear_error(); @@ -1694,78 +1621,39 @@ int Http2Session::read_tls() { return 0; } rb_.reset(); - struct iovec iov[2]; - auto iovcnt = rb_.wiovec(iov); - if (iovcnt == 0) { + + auto nread = conn_.read_tls(rb_.last, rb_.wleft()); + + if (nread == 0) { return 0; } - auto rv = SSL_read(ssl_, iov[0].iov_base, iov[0].iov_len); - - if (rv == 0) { - return -1; + if (nread < 0) { + return nread; } - if (rv < 0) { - auto err = SSL_get_error(ssl_, rv); - switch (err) { - case SSL_ERROR_WANT_READ: - return 0; - case SSL_ERROR_WANT_WRITE: - if (LOG_ENABLED(INFO)) { - SSLOG(INFO, this) << "Close connection due to TLS renegotiation"; - } - return -1; - default: - if (LOG_ENABLED(INFO)) { - SSLOG(INFO, this) << "SSL_read: SSL_get_error returned " << err; - } - return -1; - } - } - - rb_.write(rv); + rb_.write(nread); } } int Http2Session::write_tls() { - ev_timer_again(loop_, &rt_); + ev_timer_again(conn_.loop, &conn_.rt); ERR_clear_error(); for (;;) { if (wb_.rleft() > 0) { - const void *p; - size_t len; - std::tie(p, len) = wb_.get(); + auto nwrite = conn_.write_tls(wb_.pos, wb_.rleft()); - auto rv = SSL_write(ssl_, p, len); - - if (rv == 0) { - return -1; + if (nwrite == 0) { + return 0; } - if (rv < 0) { - auto err = SSL_get_error(ssl_, rv); - switch (err) { - case SSL_ERROR_WANT_READ: - if (LOG_ENABLED(INFO)) { - SSLOG(INFO, this) << "Close connection due to TLS renegotiation"; - } - return -1; - case SSL_ERROR_WANT_WRITE: - ev_io_start(loop_, &wev_); - ev_timer_again(loop_, &wt_); - return 0; - default: - if (LOG_ENABLED(INFO)) { - SSLOG(INFO, this) << "SSL_write: SSL_get_error returned " << err; - } - return -1; - } + if (nwrite < 0) { + return nwrite; } - wb_.drain(rv); + wb_.drain(nwrite); continue; } @@ -1778,8 +1666,8 @@ int Http2Session::write_tls() { } } - ev_io_stop(loop_, &wev_); - ev_timer_stop(loop_, &wt_); + conn_.wlimit.stopw(); + ev_timer_stop(conn_.loop, &conn_.wt); return 0; } diff --git a/src/shrpx_http2_session.h b/src/shrpx_http2_session.h index 422f876d..1fc48524 100644 --- a/src/shrpx_http2_session.h +++ b/src/shrpx_http2_session.h @@ -38,7 +38,8 @@ #include "http-parser/http_parser.h" -#include "ringbuf.h" +#include "shrpx_connection.h" +#include "buffer.h" using namespace nghttp2; @@ -168,14 +169,11 @@ public: CONNECTION_CHECK_STARTED }; - using ReadBuf = RingBuf<8192>; - using WriteBuf = RingBuf<65536>; + using ReadBuf = Buffer<8192>; + using WriteBuf = Buffer<32768>; private: - ev_io wev_; - ev_io rev_; - ev_timer wt_; - ev_timer rt_; + Connection conn_; ev_timer settings_timer_; ev_timer connchk_timer_; ev_prepare wrsched_prep_; @@ -185,18 +183,11 @@ private: std::function on_read_, on_write_; // Used to parse the response from HTTP proxy std::unique_ptr proxy_htp_; - struct ev_loop *loop_; // NULL if no TLS is configured SSL_CTX *ssl_ctx_; - SSL *ssl_; nghttp2_session *session_; const uint8_t *data_pending_; size_t data_pendinglen_; - // fd_ is used for proxy connection and no TLS connection. For - // direct or TLS connection, it may be -1 even after connection is - // established. Use bufferevent_getfd(bev_) to get file descriptor - // in these cases. - int fd_; int state_; int connection_check_state_; bool flow_control_; diff --git a/src/shrpx_http2_upstream.cc b/src/shrpx_http2_upstream.cc index f7c8ee7f..c22b57d1 100644 --- a/src/shrpx_http2_upstream.cc +++ b/src/shrpx_http2_upstream.cc @@ -889,11 +889,11 @@ int Http2Upstream::downstream_read(DownstreamConnection *dconn) { dconn = nullptr; } else { auto rv = downstream->on_read(); - if (rv == DownstreamConnection::ERR_EOF) { + if (rv == SHRPX_ERR_EOF) { return downstream_eof(dconn); } if (rv != 0) { - if (rv != DownstreamConnection::ERR_NET) { + if (rv != SHRPX_ERR_NETWORK) { if (LOG_ENABLED(INFO)) { DCLOG(INFO, dconn) << "HTTP parser failure"; } @@ -919,7 +919,7 @@ int Http2Upstream::downstream_read(DownstreamConnection *dconn) { int Http2Upstream::downstream_write(DownstreamConnection *dconn) { int rv; rv = dconn->on_write(); - if (rv == DownstreamConnection::ERR_NET) { + if (rv == SHRPX_ERR_NETWORK) { return downstream_error(dconn, Downstream::EVENT_ERROR); } if (rv != 0) { diff --git a/src/shrpx_http_downstream_connection.cc b/src/shrpx_http_downstream_connection.cc index 559e898f..0e6a6d2b 100644 --- a/src/shrpx_http_downstream_connection.cc +++ b/src/shrpx_http_downstream_connection.cc @@ -43,7 +43,8 @@ namespace shrpx { namespace { void timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) { - auto dconn = static_cast(w->data); + auto conn = static_cast(w->data); + auto dconn = static_cast(conn->data); if (LOG_ENABLED(INFO)) { DCLOG(INFO, dconn) << "Time out"; @@ -64,7 +65,8 @@ void timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) { namespace { void readcb(struct ev_loop *loop, ev_io *w, int revents) { - auto dconn = static_cast(w->data); + auto conn = static_cast(w->data); + auto dconn = static_cast(conn->data); auto downstream = dconn->get_downstream(); auto upstream = downstream->get_upstream(); auto handler = upstream->get_client_handler(); @@ -77,7 +79,8 @@ void readcb(struct ev_loop *loop, ev_io *w, int revents) { namespace { void writecb(struct ev_loop *loop, ev_io *w, int revents) { - auto dconn = static_cast(w->data); + auto conn = static_cast(w->data); + auto dconn = static_cast(conn->data); auto downstream = dconn->get_downstream(); auto upstream = downstream->get_upstream(); auto handler = upstream->get_client_handler(); @@ -90,7 +93,8 @@ void writecb(struct ev_loop *loop, ev_io *w, int revents) { namespace { void connectcb(struct ev_loop *loop, ev_io *w, int revents) { - auto dconn = static_cast(w->data); + auto conn = static_cast(w->data); + auto dconn = static_cast(conn->data); auto downstream = dconn->get_downstream(); auto upstream = downstream->get_upstream(); auto handler = upstream->get_client_handler(); @@ -106,32 +110,13 @@ void connectcb(struct ev_loop *loop, ev_io *w, int revents) { HttpDownstreamConnection::HttpDownstreamConnection( DownstreamConnectionPool *dconn_pool, struct ev_loop *loop) - : DownstreamConnection(dconn_pool), rlimit_(loop, &rev_, 0, 0), - ioctrl_(&rlimit_), response_htp_{0}, loop_(loop), addr_idx_(0), fd_(-1) { - // We do not know fd yet, so just set dummy fd 0 - ev_io_init(&wev_, connectcb, 0, EV_WRITE); - ev_io_init(&rev_, readcb, 0, EV_READ); - - wev_.data = this; - rev_.data = this; - - ev_timer_init(&wt_, timeoutcb, 0., get_config()->downstream_write_timeout); - ev_timer_init(&rt_, timeoutcb, 0., get_config()->downstream_read_timeout); - - wt_.data = this; - rt_.data = this; -} + : DownstreamConnection(dconn_pool), + conn_(loop, -1, nullptr, get_config()->downstream_write_timeout, + get_config()->downstream_read_timeout, 0, 0, 0, 0, connectcb, + readcb, timeoutcb, this), + ioctrl_(&conn_.rlimit), response_htp_{0}, addr_idx_(0) {} HttpDownstreamConnection::~HttpDownstreamConnection() { - ev_timer_stop(loop_, &rt_); - ev_timer_stop(loop_, &wt_); - ev_io_stop(loop_, &rev_); - ev_io_stop(loop_, &wev_); - - if (fd_ != -1) { - shutdown(fd_, SHUT_WR); - close(fd_); - } // Downstream and DownstreamConnection may be deleted // asynchronously. if (downstream_) { @@ -144,7 +129,7 @@ int HttpDownstreamConnection::attach_downstream(Downstream *downstream) { DCLOG(INFO, this) << "Attaching to DOWNSTREAM:" << downstream; } - if (fd_ == -1) { + if (conn_.fd == -1) { auto connect_blocker = client_handler_->get_http1_connect_blocker(); if (connect_blocker->blocked()) { @@ -162,10 +147,10 @@ int HttpDownstreamConnection::attach_downstream(Downstream *downstream) { ++worker_stat->next_downstream; worker_stat->next_downstream %= get_config()->downstream_addrs.size(); - fd_ = util::create_nonblock_socket( + conn_.fd = util::create_nonblock_socket( get_config()->downstream_addrs[i].addr.storage.ss_family); - if (fd_ == -1) { + if (conn_.fd == -1) { auto error = errno; DCLOG(WARN, this) << "socket() failed; errno=" << error; @@ -175,16 +160,16 @@ int HttpDownstreamConnection::attach_downstream(Downstream *downstream) { } int rv; - rv = connect(fd_, const_cast( - &get_config()->downstream_addrs[i].addr.sa), + rv = connect(conn_.fd, const_cast( + &get_config()->downstream_addrs[i].addr.sa), get_config()->downstream_addrs[i].addrlen); if (rv != 0 && errno != EINPROGRESS) { auto error = errno; DCLOG(WARN, this) << "connect() failed; errno=" << error; connect_blocker->on_failure(); - close(fd_); - fd_ = -1; + close(conn_.fd); + conn_.fd = -1; if (end == worker_stat->next_downstream) { return SHRPX_ERR_NETWORK; @@ -200,10 +185,10 @@ int HttpDownstreamConnection::attach_downstream(Downstream *downstream) { addr_idx_ = i; - ev_io_set(&wev_, fd_, EV_WRITE); - ev_io_set(&rev_, fd_, EV_READ); + ev_io_set(&conn_.wev, conn_.fd, EV_WRITE); + ev_io_set(&conn_.rev, conn_.fd, EV_READ); - ev_io_start(loop_, &wev_); + conn_.wlimit.startw(); break; } @@ -214,12 +199,12 @@ int HttpDownstreamConnection::attach_downstream(Downstream *downstream) { http_parser_init(&response_htp_, HTTP_RESPONSE); response_htp_.data = downstream_; - ev_set_cb(&rev_, readcb); + ev_set_cb(&conn_.rev, readcb); - rt_.repeat = get_config()->downstream_read_timeout; - ev_timer_again(loop_, &rt_); + conn_.rt.repeat = get_config()->downstream_read_timeout; + ev_timer_again(conn_.loop, &conn_.rt); // TODO we should have timeout for connection establishment - ev_timer_again(loop_, &wt_); + ev_timer_again(conn_.loop, &conn_.wt); return 0; } @@ -421,7 +406,8 @@ int HttpDownstreamConnection::end_upload_data() { namespace { void idle_readcb(struct ev_loop *loop, ev_io *w, int revents) { - auto dconn = static_cast(w->data); + auto conn = static_cast(w->data); + auto dconn = static_cast(conn->data); if (LOG_ENABLED(INFO)) { DCLOG(INFO, dconn) << "Idle connection EOF"; } @@ -433,7 +419,8 @@ void idle_readcb(struct ev_loop *loop, ev_io *w, int revents) { namespace { void idle_timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) { - auto dconn = static_cast(w->data); + auto conn = static_cast(w->data); + auto dconn = static_cast(conn->data); if (LOG_ENABLED(INFO)) { DCLOG(INFO, dconn) << "Idle connection timeout"; } @@ -450,16 +437,16 @@ void HttpDownstreamConnection::detach_downstream(Downstream *downstream) { downstream_ = nullptr; ioctrl_.force_resume_read(); - ev_io_start(loop_, &rev_); - ev_io_stop(loop_, &wev_); + conn_.rlimit.startw(); + conn_.wlimit.stopw(); - ev_set_cb(&rev_, idle_readcb); + ev_set_cb(&conn_.rev, idle_readcb); - ev_timer_stop(loop_, &wt_); + ev_timer_stop(conn_.loop, &conn_.wt); - rt_.repeat = get_config()->downstream_idle_read_timeout; - ev_set_cb(&rt_, idle_timeoutcb); - ev_timer_again(loop_, &rt_); + conn_.rt.repeat = get_config()->downstream_idle_read_timeout; + ev_set_cb(&conn_.rt, idle_timeoutcb); + ev_timer_again(conn_.loop, &conn_.rt); } void HttpDownstreamConnection::pause_read(IOCtrlReason reason) { @@ -649,25 +636,21 @@ http_parser_settings htp_hooks = { } // namespace int HttpDownstreamConnection::on_read() { - ev_timer_again(loop_, &rt_); + ev_timer_again(conn_.loop, &conn_.rt); uint8_t buf[8192]; int rv; if (downstream_->get_upgraded()) { // For upgraded connection, just pass data to the upstream. for (;;) { - ssize_t nread; - while ((nread = read(fd_, buf, sizeof(buf))) == -1 && errno == EINTR) - ; - if (nread == -1) { - if (errno == EAGAIN || errno == EWOULDBLOCK) { - return 0; - } - return DownstreamConnection::ERR_NET; - } + auto nread = conn_.read_clear(buf, sizeof(buf)); if (nread == 0) { - return DownstreamConnection::ERR_EOF; + return 0; + } + + if (nread < 0) { + return nread; } rv = downstream_->get_upstream()->on_downstream_body(downstream_, buf, @@ -684,18 +667,14 @@ int HttpDownstreamConnection::on_read() { } for (;;) { - ssize_t nread; - while ((nread = read(fd_, buf, sizeof(buf))) == -1 && errno == EINTR) - ; - if (nread == -1) { - if (errno == EAGAIN || errno == EWOULDBLOCK) { - return 0; - } - return DownstreamConnection::ERR_NET; - } + auto nread = conn_.read_clear(buf, sizeof(buf)); if (nread == 0) { - return DownstreamConnection::ERR_EOF; + return 0; + } + + if (nread < 0) { + return nread; } auto nproc = http_parser_execute(&response_htp_, &htp_hooks, @@ -727,16 +706,8 @@ int HttpDownstreamConnection::on_read() { } } -#define DEFAULT_WR_IOVCNT 16 - -#if defined(IOV_MAX) && IOV_MAX < DEFAULT_WR_IOVCNT -#define MAX_WR_IOVCNT IOV_MAX -#else // !defined(IOV_MAX) || IOV_MAX >= DEFAULT_WR_IOVCNT -#define MAX_WR_IOVCNT DEFAULT_WR_IOVCNT -#endif // !defined(IOV_MAX) || IOV_MAX >= DEFAULT_WR_IOVCNT - int HttpDownstreamConnection::on_write() { - ev_timer_again(loop_, &rt_); + ev_timer_again(conn_.loop, &conn_.rt); auto upstream = downstream_->get_upstream(); auto input = downstream_->get_request_buf(); @@ -746,29 +717,22 @@ int HttpDownstreamConnection::on_write() { while (input->rleft() > 0) { auto iovcnt = input->riovec(iov, util::array_size(iov)); - ssize_t nwrite; - while ((nwrite = writev(fd_, iov, iovcnt)) == -1 && errno == EINTR) - ; - if (nwrite == -1) { - if (errno == EAGAIN || errno == EWOULDBLOCK) { - ev_io_start(loop_, &wev_); - ev_timer_again(loop_, &wt_); - goto end; - } - return DownstreamConnection::ERR_NET; + auto nwrite = conn_.writev_clear(iov, iovcnt); + + if (nwrite == 0) { + return 0; } + + if (nwrite < 0) { + return nwrite; + } + input->drain(nwrite); } - if (input->rleft() == 0) { - ev_io_stop(loop_, &wev_); - ev_timer_stop(loop_, &wt_); - } else { - ev_io_start(loop_, &wev_); - ev_timer_again(loop_, &wt_); - } + conn_.wlimit.stopw(); + ev_timer_stop(conn_.loop, &conn_.wt); -end: if (input->rleft() == 0) { upstream->resume_read(SHRPX_NO_BUFFER, downstream_, downstream_->get_request_datalen()); @@ -780,8 +744,8 @@ end: int HttpDownstreamConnection::on_connect() { auto connect_blocker = client_handler_->get_http1_connect_blocker(); - if (!util::check_socket_connected(fd_)) { - ev_io_stop(loop_, &wev_); + if (!util::check_socket_connected(conn_.fd)) { + conn_.wlimit.stopw(); if (LOG_ENABLED(INFO)) { DLOG(INFO, this) << "downstream connect failed"; @@ -792,14 +756,14 @@ int HttpDownstreamConnection::on_connect() { connect_blocker->on_success(); - ev_io_start(loop_, &rev_); - ev_set_cb(&wev_, writecb); + conn_.rlimit.startw(); + ev_set_cb(&conn_.wev, writecb); return 0; } void HttpDownstreamConnection::on_upstream_change(Upstream *upstream) {} -void HttpDownstreamConnection::signal_write() { ev_io_start(loop_, &wev_); } +void HttpDownstreamConnection::signal_write() { conn_.wlimit.startw(); } } // namespace shrpx diff --git a/src/shrpx_http_downstream_connection.h b/src/shrpx_http_downstream_connection.h index fb95848b..2eec5def 100644 --- a/src/shrpx_http_downstream_connection.h +++ b/src/shrpx_http_downstream_connection.h @@ -31,6 +31,7 @@ #include "shrpx_downstream_connection.h" #include "shrpx_io_control.h" +#include "shrpx_connection.h" namespace shrpx { @@ -62,17 +63,11 @@ public: void signal_write(); private: - ev_io wev_; - ev_io rev_; - ev_timer wt_; - ev_timer rt_; - RateLimit rlimit_; + Connection conn_; IOControl ioctrl_; http_parser response_htp_; - struct ev_loop *loop_; // index of get_config()->downstream_addrs this object is using size_t addr_idx_; - int fd_; }; } // namespace shrpx diff --git a/src/shrpx_https_upstream.cc b/src/shrpx_https_upstream.cc index 34b04479..c8703987 100644 --- a/src/shrpx_https_upstream.cc +++ b/src/shrpx_https_upstream.cc @@ -477,7 +477,7 @@ int HttpsUpstream::downstream_read(DownstreamConnection *dconn) { downstream->detach_downstream_connection(); } - if (rv == DownstreamConnection::ERR_EOF) { + if (rv == SHRPX_ERR_EOF) { return downstream_eof(dconn); } @@ -494,7 +494,7 @@ end: int HttpsUpstream::downstream_write(DownstreamConnection *dconn) { int rv; rv = dconn->on_write(); - if (rv == DownstreamConnection::ERR_NET) { + if (rv == SHRPX_ERR_NETWORK) { return downstream_error(dconn, Downstream::EVENT_ERROR); } diff --git a/src/shrpx_spdy_upstream.cc b/src/shrpx_spdy_upstream.cc index 3a202756..d3b1c80c 100644 --- a/src/shrpx_spdy_upstream.cc +++ b/src/shrpx_spdy_upstream.cc @@ -57,8 +57,6 @@ ssize_t send_callback(spdylay_session *session, const uint8_t *data, size_t len, auto nread = wb->write(data, len); - handler->update_warmup_writelen(nread); - return nread; } } // namespace @@ -541,11 +539,11 @@ int SpdyUpstream::downstream_read(DownstreamConnection *dconn) { dconn = nullptr; } else { auto rv = downstream->on_read(); - if (rv == DownstreamConnection::ERR_EOF) { + if (rv == SHRPX_ERR_EOF) { return downstream_eof(dconn); } if (rv != 0) { - if (rv != DownstreamConnection::ERR_NET) { + if (rv != SHRPX_ERR_NETWORK) { if (LOG_ENABLED(INFO)) { DCLOG(INFO, dconn) << "HTTP parser failure"; } @@ -570,7 +568,7 @@ int SpdyUpstream::downstream_read(DownstreamConnection *dconn) { int SpdyUpstream::downstream_write(DownstreamConnection *dconn) { int rv; rv = dconn->on_write(); - if (rv == DownstreamConnection::ERR_NET) { + if (rv == SHRPX_ERR_NETWORK) { return downstream_error(dconn, Downstream::EVENT_ERROR); } if (rv != 0) { diff --git a/src/shrpx_ssl.cc b/src/shrpx_ssl.cc index b05deffe..e6e1aeb8 100644 --- a/src/shrpx_ssl.cc +++ b/src/shrpx_ssl.cc @@ -218,12 +218,14 @@ void info_callback(const SSL *ssl, int where, int ret) { // to disable it, we check that renegotiation is started in this // callback. if (where & SSL_CB_HANDSHAKE_START) { - auto handler = static_cast(SSL_get_app_data(ssl)); - if (handler && handler->get_tls_handshake()) { - handler->set_tls_renegotiation(true); + auto conn = static_cast(SSL_get_app_data(ssl)); + if (conn && conn->tls.initial_handshake_done) { + // We only set SSL_get_app_data for ClientHandler for now. + auto handler = static_cast(conn->data); if (LOG_ENABLED(INFO)) { CLOG(INFO, handler) << "TLS renegotiation started"; } + handler->start_immediate_shutdown(); } } }