diff --git a/src/shrpx_client_handler.cc b/src/shrpx_client_handler.cc index 886b5516..d2947bcd 100644 --- a/src/shrpx_client_handler.cc +++ b/src/shrpx_client_handler.cc @@ -135,68 +135,30 @@ int ClientHandler::read_clear() { } int ClientHandler::write_clear() { + std::array iov; + ev_timer_again(conn_.loop, &conn_.rt); for (;;) { - if (wb_.rleft() > 0) { - auto nwrite = conn_.write_clear(wb_.pos, wb_.rleft()); - if (nwrite == 0) { - return 0; - } - if (nwrite < 0) { - return -1; - } - wb_.drain(nwrite); - continue; - } - wb_.reset(); if (on_write() != 0) { return -1; } - if (wb_.rleft() == 0) { + + auto iovcnt = upstream_->response_riovec(iov.data(), iov.size()); + if (iovcnt == 0) { break; } - } - conn_.wlimit.stopw(); - ev_timer_stop(conn_.loop, &conn_.wt); - - return 0; -} - -int ClientHandler::writev_clear() { - ev_timer_again(conn_.loop, &conn_.rt); - - auto buf = upstream_->get_response_buf(); - if (!buf) { - conn_.wlimit.stopw(); - ev_timer_stop(conn_.loop, &conn_.wt); - - return 0; - } - - for (;;) { - if (buf->rleft() > 0) { - std::array iov; - auto iovcnt = buf->riovec(iov.data(), iov.size()); - auto nwrite = conn_.writev_clear(iov.data(), iovcnt); - if (nwrite == 0) { - return 0; - } - if (nwrite < 0) { - return -1; - } - buf->drain(nwrite); - continue; - } - if (on_write() != 0) { + auto nwrite = conn_.writev_clear(iov.data(), iovcnt); + if (nwrite < 0) { return -1; } - // buf may be destroyed inside on_write() - buf = upstream_->get_response_buf(); - if (!buf || buf->rleft() == 0) { - break; + + if (nwrite == 0) { + return 0; } + + upstream_->response_drain(nwrite); } conn_.wlimit.stopw(); @@ -229,12 +191,7 @@ int ClientHandler::tls_handshake() { } read_ = &ClientHandler::read_tls; - - if (alpn_ == "http/1.1") { - write_ = &ClientHandler::writev_tls; - } else { - write_ = &ClientHandler::write_tls; - } + write_ = &ClientHandler::write_tls; return 0; } @@ -271,85 +228,33 @@ int ClientHandler::read_tls() { } int ClientHandler::write_tls() { + struct iovec iov; + ev_timer_again(conn_.loop, &conn_.rt); ERR_clear_error(); for (;;) { - if (wb_.rleft() > 0) { - auto nwrite = conn_.write_tls(wb_.pos, wb_.rleft()); - - if (nwrite == 0) { - return 0; - } - - if (nwrite < 0) { - return -1; - } - - wb_.drain(nwrite); - - continue; - } - wb_.reset(); if (on_write() != 0) { return -1; } - if (wb_.rleft() == 0) { + + auto iovcnt = upstream_->response_riovec(&iov, 1); + if (iovcnt == 0) { conn_.start_tls_write_idle(); break; } - } - conn_.wlimit.stopw(); - ev_timer_stop(conn_.loop, &conn_.wt); - - return 0; -} - -int ClientHandler::writev_tls() { - ev_timer_again(conn_.loop, &conn_.rt); - - auto buf = upstream_->get_response_buf(); - if (!buf) { - conn_.wlimit.stopw(); - ev_timer_stop(conn_.loop, &conn_.wt); - - return 0; - } - - ERR_clear_error(); - - for (;;) { - if (buf->rleft() > 0) { - iovec iov; - auto iovcnt = buf->riovec(&iov, 1); - if (iovcnt == 0) { - return 0; - } - - auto nwrite = conn_.write_tls(iov.iov_base, iov.iov_len); - - if (nwrite == 0) { - return 0; - } - - if (nwrite < 0) { - return -1; - } - - buf->drain(nwrite); - - continue; - } - if (on_write() != 0) { + auto nwrite = conn_.write_tls(iov.iov_base, iov.iov_len); + if (nwrite < 0) { return -1; } - buf = upstream_->get_response_buf(); - if (!buf || buf->rleft() == 0) { - conn_.start_tls_write_idle(); - break; + + if (nwrite == 0) { + return 0; } + + upstream_->response_drain(nwrite); } conn_.wlimit.stopw(); @@ -374,9 +279,7 @@ int ClientHandler::upstream_write() { return -1; } - if (get_should_close_after_write() && wb_.rleft() == 0 && - (!upstream_->get_response_buf() || - upstream_->get_response_buf()->rleft() == 0)) { + if (get_should_close_after_write() && upstream_->response_empty()) { return -1; } @@ -506,7 +409,7 @@ void ClientHandler::setup_upstream_io_callback() { upstream_ = make_unique(this); alpn_ = "http/1.1"; read_ = &ClientHandler::read_clear; - write_ = &ClientHandler::writev_clear; + write_ = &ClientHandler::write_clear; on_read_ = &ClientHandler::upstream_http1_connhd_read; on_write_ = &ClientHandler::upstream_noop; } @@ -818,7 +721,6 @@ int ClientHandler::perform_http2_upgrade(HttpsUpstream *http) { } // http pointer is now owned by upstream. upstream_.release(); - upstream_ = std::move(upstream); // TODO We might get other version id in HTTP2-settings, if we // support aliasing for h2, but we just use library default for now. alpn_ = NGHTTP2_CLEARTEXT_PROTO_VERSION_ID; @@ -829,7 +731,9 @@ int ClientHandler::perform_http2_upgrade(HttpsUpstream *http) { "Connection: Upgrade\r\n" "Upgrade: " NGHTTP2_CLEARTEXT_PROTO_VERSION_ID "\r\n" "\r\n"; - wb_.write(res, sizeof(res) - 1); + upstream->response_write(res, sizeof(res) - 1); + upstream_ = std::move(upstream); + signal_write(); return 0; } @@ -932,8 +836,6 @@ void ClientHandler::write_accesslog(int major, int minor, unsigned int status, }); } -ClientHandler::WriteBuf *ClientHandler::get_wb() { return &wb_; } - ClientHandler::ReadBuf *ClientHandler::get_rb() { return &rb_; } void ClientHandler::signal_write() { conn_.wlimit.startw(); } diff --git a/src/shrpx_client_handler.h b/src/shrpx_client_handler.h index 1a243479..b05013c8 100644 --- a/src/shrpx_client_handler.h +++ b/src/shrpx_client_handler.h @@ -60,13 +60,11 @@ public: // Performs clear text I/O int read_clear(); int write_clear(); - int writev_clear(); // Performs TLS handshake int tls_handshake(); // Performs TLS I/O int read_tls(); int write_tls(); - int writev_tls(); int upstream_noop(); int upstream_read(); @@ -124,10 +122,8 @@ public: int64_t body_bytes_sent); Worker *get_worker() const; - using WriteBuf = Buffer<32768>; using ReadBuf = Buffer<8_k>; - WriteBuf *get_wb(); ReadBuf *get_rb(); RateLimit *get_rlimit(); @@ -153,7 +149,6 @@ private: // The number of bytes of HTTP/2 client connection header to read size_t left_connhd_len_; bool should_close_after_write_; - WriteBuf wb_; ReadBuf rb_; }; diff --git a/src/shrpx_http2_upstream.cc b/src/shrpx_http2_upstream.cc index bcffba14..c885f2e7 100644 --- a/src/shrpx_http2_upstream.cc +++ b/src/shrpx_http2_upstream.cc @@ -852,9 +852,8 @@ int Http2Upstream::on_read() { rlimit->startw(); } - auto wb = handler_->get_wb(); if (nghttp2_session_want_read(session_) == 0 && - nghttp2_session_want_write(session_) == 0 && wb->rleft() == 0) { + nghttp2_session_want_write(session_) == 0 && wb_.rleft() == 0) { if (LOG_ENABLED(INFO)) { ULOG(INFO, this) << "No more read/write for this HTTP2 session"; } @@ -867,11 +866,13 @@ int Http2Upstream::on_read() { // After this function call, downstream may be deleted. int Http2Upstream::on_write() { - auto wb = handler_->get_wb(); + if (wb_.rleft() == 0) { + wb_.reset(); + } if (data_pending_) { - auto n = std::min(wb->wleft(), data_pendinglen_); - wb->write(data_pending_, n); + auto n = std::min(wb_.wleft(), data_pendinglen_); + wb_.write(data_pending_, n); if (n < data_pendinglen_) { data_pending_ += n; data_pendinglen_ -= n; @@ -894,7 +895,7 @@ int Http2Upstream::on_write() { if (datalen == 0) { break; } - auto n = wb->write(data, datalen); + auto n = wb_.write(data, datalen); if (n < static_cast(datalen)) { data_pending_ = data + n; data_pendinglen_ = datalen - n; @@ -903,7 +904,7 @@ int Http2Upstream::on_write() { } if (nghttp2_session_want_read(session_) == 0 && - nghttp2_session_want_write(session_) == 0 && wb->rleft() == 0) { + nghttp2_session_want_write(session_) == 0 && wb_.rleft() == 0) { if (LOG_ENABLED(INFO)) { ULOG(INFO, this) << "No more read/write for this HTTP2 session"; } @@ -1755,4 +1756,23 @@ int Http2Upstream::initiate_push(Downstream *downstream, const char *uri, return 0; } +int Http2Upstream::response_riovec(struct iovec *iov, int iovcnt) const { + if (iovcnt == 0 || wb_.rleft() == 0) { + return 0; + } + + iov->iov_base = wb_.pos; + iov->iov_len = wb_.rleft(); + + return 1; +} + +void Http2Upstream::response_drain(size_t n) { wb_.drain(n); } + +bool Http2Upstream::response_empty() const { return wb_.rleft() == 0; } + +void Http2Upstream::response_write(void *data, size_t len) { + wb_.write(data, len); +} + } // namespace shrpx diff --git a/src/shrpx_http2_upstream.h b/src/shrpx_http2_upstream.h index cc877f5c..11be767c 100644 --- a/src/shrpx_http2_upstream.h +++ b/src/shrpx_http2_upstream.h @@ -36,6 +36,7 @@ #include "shrpx_upstream.h" #include "shrpx_downstream_queue.h" #include "memchunk.h" +#include "buffer.h" using namespace nghttp2; @@ -82,6 +83,11 @@ public: size_t bodylen); virtual int initiate_push(Downstream *downstream, const char *uri, size_t len); + virtual int response_riovec(struct iovec *iov, int iovcnt) const; + virtual void response_drain(size_t n); + virtual bool response_empty() const; + + void response_write(void *data, size_t len); bool get_flow_control() const; // Perform HTTP/2 upgrade from |upstream|. On success, this object @@ -106,7 +112,10 @@ public: int on_request_headers(Downstream *downstream, const nghttp2_frame *frame); + using WriteBuffer = Buffer<32_k>; + private: + WriteBuffer wb_; std::unique_ptr pre_upstream_; DownstreamQueue downstream_queue_; ev_timer settings_timer_; diff --git a/src/shrpx_https_upstream.cc b/src/shrpx_https_upstream.cc index 3e11c7ea..7e5398ef 100644 --- a/src/shrpx_https_upstream.cc +++ b/src/shrpx_https_upstream.cc @@ -1160,12 +1160,34 @@ int HttpsUpstream::initiate_push(Downstream *downstream, const char *uri, return 0; } -DefaultMemchunks *HttpsUpstream::get_response_buf() const { +int HttpsUpstream::response_riovec(struct iovec *iov, int iovcnt) const { if (!downstream_) { - return nullptr; + return 0; } - return downstream_->get_response_buf(); + auto buf = downstream_->get_response_buf(); + + return buf->riovec(iov, iovcnt); +} + +void HttpsUpstream::response_drain(size_t n) { + if (!downstream_) { + return; + } + + auto buf = downstream_->get_response_buf(); + + buf->drain(n); +} + +bool HttpsUpstream::response_empty() const { + if (!downstream_) { + return true; + } + + auto buf = downstream_->get_response_buf(); + + return buf->rleft() == 0; } } // namespace shrpx diff --git a/src/shrpx_https_upstream.h b/src/shrpx_https_upstream.h index df390e3f..6f3b5742 100644 --- a/src/shrpx_https_upstream.h +++ b/src/shrpx_https_upstream.h @@ -78,7 +78,9 @@ public: size_t bodylen); virtual int initiate_push(Downstream *downstream, const char *uri, size_t len); - virtual DefaultMemchunks *get_response_buf() const; + virtual int response_riovec(struct iovec *iov, int iovcnt) const; + virtual void response_drain(size_t n); + virtual bool response_empty() const; void reset_current_header_length(); void log_response_headers(DefaultMemchunks *buf) const; diff --git a/src/shrpx_spdy_upstream.cc b/src/shrpx_spdy_upstream.cc index 0be56597..6b45a2e6 100644 --- a/src/shrpx_spdy_upstream.cc +++ b/src/shrpx_spdy_upstream.cc @@ -53,8 +53,7 @@ namespace { ssize_t send_callback(spdylay_session *session, const uint8_t *data, size_t len, int flags, void *user_data) { auto upstream = static_cast(user_data); - auto handler = upstream->get_client_handler(); - auto wb = handler->get_wb(); + auto wb = upstream->get_response_buf(); if (wb->wleft() == 0) { return SPDYLAY_ERR_WOULDBLOCK; @@ -555,6 +554,10 @@ int SpdyUpstream::on_read() { int SpdyUpstream::on_write() { int rv = 0; + if (wb_.rleft() == 0) { + wb_.reset(); + } + rv = spdylay_session_send(session_); if (rv != 0) { ULOG(ERROR, this) << "spdylay_session_send() returned error: " @@ -563,8 +566,7 @@ int SpdyUpstream::on_write() { } if (spdylay_session_want_read(session_) == 0 && - spdylay_session_want_write(session_) == 0 && - handler_->get_wb()->rleft() == 0) { + spdylay_session_want_write(session_) == 0 && wb_.rleft() == 0) { if (LOG_ENABLED(INFO)) { ULOG(INFO, this) << "No more read/write for this SPDY session"; } @@ -1213,4 +1215,21 @@ int SpdyUpstream::initiate_push(Downstream *downstream, const char *uri, return 0; } +int SpdyUpstream::response_riovec(struct iovec *iov, int iovcnt) const { + if (iovcnt == 0 || wb_.rleft() == 0) { + return 0; + } + + iov->iov_base = wb_.pos; + iov->iov_len = wb_.rleft(); + + return 1; +} + +void SpdyUpstream::response_drain(size_t n) { wb_.drain(n); } + +bool SpdyUpstream::response_empty() const { return wb_.rleft() == 0; } + +SpdyUpstream::WriteBuffer *SpdyUpstream::get_response_buf() { return &wb_; } + } // namespace shrpx diff --git a/src/shrpx_spdy_upstream.h b/src/shrpx_spdy_upstream.h index b39f885f..7d509340 100644 --- a/src/shrpx_spdy_upstream.h +++ b/src/shrpx_spdy_upstream.h @@ -36,7 +36,7 @@ #include "shrpx_upstream.h" #include "shrpx_downstream_queue.h" #include "memchunk.h" -#include "util.h" +#include "buffer.h" namespace shrpx { @@ -78,6 +78,9 @@ public: size_t bodylen); virtual int initiate_push(Downstream *downstream, const char *uri, size_t len); + virtual int response_riovec(struct iovec *iov, int iovcnt) const; + virtual void response_drain(size_t n); + virtual bool response_empty() const; bool get_flow_control() const; @@ -86,7 +89,12 @@ public: void start_downstream(Downstream *downstream); void initiate_downstream(Downstream *downstream); + using WriteBuffer = Buffer<32_k>; + + WriteBuffer *get_response_buf(); + private: + WriteBuffer wb_; DownstreamQueue downstream_queue_; ClientHandler *handler_; spdylay_session *session_; diff --git a/src/shrpx_upstream.h b/src/shrpx_upstream.h index 8d80097f..92fe5190 100644 --- a/src/shrpx_upstream.h +++ b/src/shrpx_upstream.h @@ -71,9 +71,11 @@ public: virtual int initiate_push(Downstream *downstream, const char *uri, size_t len) = 0; - // Returns response buffer of Downstream directly. This exists for - // optimization purpose for cleartext HttpsUpstream. - virtual DefaultMemchunks *get_response_buf() const { return nullptr; } + // Fills response data in |iov| whose capacity is |iovcnt|. Returns + // the number of iovs filled. + virtual int response_riovec(struct iovec *iov, int iovcnt) const = 0; + virtual void response_drain(size_t n) = 0; + virtual bool response_empty() const = 0; }; } // namespace shrpx