diff --git a/lib/includes/nghttp2/nghttp2.h b/lib/includes/nghttp2/nghttp2.h index 6cedb5df..4b506157 100644 --- a/lib/includes/nghttp2/nghttp2.h +++ b/lib/includes/nghttp2/nghttp2.h @@ -1199,18 +1199,22 @@ typedef ssize_t (*nghttp2_send_callback)(nghttp2_session *session, * The application has to send complete DATA frame in this callback. * If all data were written successfully, return 0. * - * If it cannot send it all, just return + * If it cannot send any data at all, just return * :enum:`NGHTTP2_ERR_WOULDBLOCK`; the library will call this callback * with the same parameters later (It is recommended to send complete * DATA frame at once in this function to deal with error; if partial * frame data has already sent, it is impossible to send another data - * in that state, and all we can do is tear down connection). If - * application decided to reset this stream, return - * :enum:`NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE`, then the library - * will send RST_STREAM with INTERNAL_ERROR as error code. The - * application can also return :enum:`NGHTTP2_ERR_CALLBACK_FAILURE`, - * which will result in connection closure. Returning any other value - * is treated as :enum:`NGHTTP2_ERR_CALLBACK_FAILURE` is returned. + * in that state, and all we can do is tear down connection). When + * data is fully processed, but application wants to make + * `nghttp2_session_mem_send()` or `nghttp2_session_send()` return + * immediately without processing next frames, return + * :enum:`NGHTTP2_ERR_PAUSE`. If application decided to reset this + * stream, return :enum:`NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE`, then + * the library will send RST_STREAM with INTERNAL_ERROR as error code. + * The application can also return + * :enum:`NGHTTP2_ERR_CALLBACK_FAILURE`, which will result in + * connection closure. Returning any other value is treated as + * :enum:`NGHTTP2_ERR_CALLBACK_FAILURE` is returned. */ typedef int (*nghttp2_send_data_callback)(nghttp2_session *session, nghttp2_frame *frame, diff --git a/lib/nghttp2_session.c b/lib/nghttp2_session.c index 4b39ef4c..4ddb06f7 100644 --- a/lib/nghttp2_session.c +++ b/lib/nghttp2_session.c @@ -2610,12 +2610,15 @@ static int session_call_send_data(nghttp2_session *session, &aux_data->data_prd.source, session->user_data); - if (rv == 0 || rv == NGHTTP2_ERR_WOULDBLOCK || - rv == NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE) { + switch (rv) { + case 0: + case NGHTTP2_ERR_WOULDBLOCK: + case NGHTTP2_ERR_PAUSE: + case NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE: return rv; + default: + return NGHTTP2_ERR_CALLBACK_FAILURE; } - - return NGHTTP2_ERR_CALLBACK_FAILURE; } static ssize_t nghttp2_session_mem_send_internal(nghttp2_session *session, @@ -2790,6 +2793,7 @@ static ssize_t nghttp2_session_mem_send_internal(nghttp2_session *session, case NGHTTP2_OB_SEND_NO_COPY: { nghttp2_stream *stream; nghttp2_frame *frame; + int pause; DEBUGF(fprintf(stderr, "send: no copy DATA\n")); @@ -2833,7 +2837,7 @@ static ssize_t nghttp2_session_mem_send_internal(nghttp2_session *session, return 0; } - assert(rv == 0); + pause = (rv == NGHTTP2_ERR_PAUSE); rv = session_after_frame_sent1(session); if (rv < 0) { @@ -2848,6 +2852,10 @@ static ssize_t nghttp2_session_mem_send_internal(nghttp2_session *session, /* We have already adjusted the next state */ + if (pause) { + return 0; + } + break; } case NGHTTP2_OB_SEND_CLIENT_MAGIC: { diff --git a/src/memchunk.h b/src/memchunk.h index 24f0f27b..e7c22d72 100644 --- a/src/memchunk.h +++ b/src/memchunk.h @@ -222,7 +222,7 @@ template struct Memchunks { } return ndata - count; } - int riovec(struct iovec *iov, int iovcnt) { + int riovec(struct iovec *iov, int iovcnt) const { if (!head) { return 0; } diff --git a/src/shrpx_client_handler.cc b/src/shrpx_client_handler.cc index 886b5516..86ab02b3 100644 --- a/src/shrpx_client_handler.cc +++ b/src/shrpx_client_handler.cc @@ -135,68 +135,30 @@ int ClientHandler::read_clear() { } int ClientHandler::write_clear() { + std::array iov; + ev_timer_again(conn_.loop, &conn_.rt); for (;;) { - if (wb_.rleft() > 0) { - auto nwrite = conn_.write_clear(wb_.pos, wb_.rleft()); - if (nwrite == 0) { - return 0; - } - if (nwrite < 0) { - return -1; - } - wb_.drain(nwrite); - continue; - } - wb_.reset(); if (on_write() != 0) { return -1; } - if (wb_.rleft() == 0) { + + auto iovcnt = upstream_->response_riovec(iov.data(), iov.size()); + if (iovcnt == 0) { break; } - } - conn_.wlimit.stopw(); - ev_timer_stop(conn_.loop, &conn_.wt); - - return 0; -} - -int ClientHandler::writev_clear() { - ev_timer_again(conn_.loop, &conn_.rt); - - auto buf = upstream_->get_response_buf(); - if (!buf) { - conn_.wlimit.stopw(); - ev_timer_stop(conn_.loop, &conn_.wt); - - return 0; - } - - for (;;) { - if (buf->rleft() > 0) { - std::array iov; - auto iovcnt = buf->riovec(iov.data(), iov.size()); - auto nwrite = conn_.writev_clear(iov.data(), iovcnt); - if (nwrite == 0) { - return 0; - } - if (nwrite < 0) { - return -1; - } - buf->drain(nwrite); - continue; - } - if (on_write() != 0) { + auto nwrite = conn_.writev_clear(iov.data(), iovcnt); + if (nwrite < 0) { return -1; } - // buf may be destroyed inside on_write() - buf = upstream_->get_response_buf(); - if (!buf || buf->rleft() == 0) { - break; + + if (nwrite == 0) { + return 0; } + + upstream_->response_drain(nwrite); } conn_.wlimit.stopw(); @@ -229,12 +191,7 @@ int ClientHandler::tls_handshake() { } read_ = &ClientHandler::read_tls; - - if (alpn_ == "http/1.1") { - write_ = &ClientHandler::writev_tls; - } else { - write_ = &ClientHandler::write_tls; - } + write_ = &ClientHandler::write_tls; return 0; } @@ -271,85 +228,33 @@ int ClientHandler::read_tls() { } int ClientHandler::write_tls() { + struct iovec iov; + ev_timer_again(conn_.loop, &conn_.rt); ERR_clear_error(); for (;;) { - if (wb_.rleft() > 0) { - auto nwrite = conn_.write_tls(wb_.pos, wb_.rleft()); - - if (nwrite == 0) { - return 0; - } - - if (nwrite < 0) { - return -1; - } - - wb_.drain(nwrite); - - continue; - } - wb_.reset(); if (on_write() != 0) { return -1; } - if (wb_.rleft() == 0) { + + auto iovcnt = upstream_->response_riovec(&iov, 1); + if (iovcnt == 0) { conn_.start_tls_write_idle(); break; } - } - conn_.wlimit.stopw(); - ev_timer_stop(conn_.loop, &conn_.wt); - - return 0; -} - -int ClientHandler::writev_tls() { - ev_timer_again(conn_.loop, &conn_.rt); - - auto buf = upstream_->get_response_buf(); - if (!buf) { - conn_.wlimit.stopw(); - ev_timer_stop(conn_.loop, &conn_.wt); - - return 0; - } - - ERR_clear_error(); - - for (;;) { - if (buf->rleft() > 0) { - iovec iov; - auto iovcnt = buf->riovec(&iov, 1); - if (iovcnt == 0) { - return 0; - } - - auto nwrite = conn_.write_tls(iov.iov_base, iov.iov_len); - - if (nwrite == 0) { - return 0; - } - - if (nwrite < 0) { - return -1; - } - - buf->drain(nwrite); - - continue; - } - if (on_write() != 0) { + auto nwrite = conn_.write_tls(iov.iov_base, iov.iov_len); + if (nwrite < 0) { return -1; } - buf = upstream_->get_response_buf(); - if (!buf || buf->rleft() == 0) { - conn_.start_tls_write_idle(); - break; + + if (nwrite == 0) { + return 0; } + + upstream_->response_drain(nwrite); } conn_.wlimit.stopw(); @@ -374,9 +279,7 @@ int ClientHandler::upstream_write() { return -1; } - if (get_should_close_after_write() && wb_.rleft() == 0 && - (!upstream_->get_response_buf() || - upstream_->get_response_buf()->rleft() == 0)) { + if (get_should_close_after_write() && upstream_->response_empty()) { return -1; } @@ -506,7 +409,7 @@ void ClientHandler::setup_upstream_io_callback() { upstream_ = make_unique(this); alpn_ = "http/1.1"; read_ = &ClientHandler::read_clear; - write_ = &ClientHandler::writev_clear; + write_ = &ClientHandler::write_clear; on_read_ = &ClientHandler::upstream_http1_connhd_read; on_write_ = &ClientHandler::upstream_noop; } @@ -818,7 +721,6 @@ int ClientHandler::perform_http2_upgrade(HttpsUpstream *http) { } // http pointer is now owned by upstream. upstream_.release(); - upstream_ = std::move(upstream); // TODO We might get other version id in HTTP2-settings, if we // support aliasing for h2, but we just use library default for now. alpn_ = NGHTTP2_CLEARTEXT_PROTO_VERSION_ID; @@ -829,7 +731,9 @@ int ClientHandler::perform_http2_upgrade(HttpsUpstream *http) { "Connection: Upgrade\r\n" "Upgrade: " NGHTTP2_CLEARTEXT_PROTO_VERSION_ID "\r\n" "\r\n"; - wb_.write(res, sizeof(res) - 1); + upstream->get_response_buf()->write(res, sizeof(res) - 1); + upstream_ = std::move(upstream); + signal_write(); return 0; } @@ -932,8 +836,6 @@ void ClientHandler::write_accesslog(int major, int minor, unsigned int status, }); } -ClientHandler::WriteBuf *ClientHandler::get_wb() { return &wb_; } - ClientHandler::ReadBuf *ClientHandler::get_rb() { return &rb_; } void ClientHandler::signal_write() { conn_.wlimit.startw(); } diff --git a/src/shrpx_client_handler.h b/src/shrpx_client_handler.h index 1a243479..b05013c8 100644 --- a/src/shrpx_client_handler.h +++ b/src/shrpx_client_handler.h @@ -60,13 +60,11 @@ public: // Performs clear text I/O int read_clear(); int write_clear(); - int writev_clear(); // Performs TLS handshake int tls_handshake(); // Performs TLS I/O int read_tls(); int write_tls(); - int writev_tls(); int upstream_noop(); int upstream_read(); @@ -124,10 +122,8 @@ public: int64_t body_bytes_sent); Worker *get_worker() const; - using WriteBuf = Buffer<32768>; using ReadBuf = Buffer<8_k>; - WriteBuf *get_wb(); ReadBuf *get_rb(); RateLimit *get_rlimit(); @@ -153,7 +149,6 @@ private: // The number of bytes of HTTP/2 client connection header to read size_t left_connhd_len_; bool should_close_after_write_; - WriteBuf wb_; ReadBuf rb_; }; diff --git a/src/shrpx_downstream.cc b/src/shrpx_downstream.cc index eefa99f2..a1f73030 100644 --- a/src/shrpx_downstream.cc +++ b/src/shrpx_downstream.cc @@ -1241,4 +1241,8 @@ bool Downstream::can_detach_downstream_connection() const { !response_connection_close_; } +DefaultMemchunks Downstream::pop_response_buf() { + return std::move(response_buf_); +} + } // namespace shrpx diff --git a/src/shrpx_downstream.h b/src/shrpx_downstream.h index f90e80ce..6960a236 100644 --- a/src/shrpx_downstream.h +++ b/src/shrpx_downstream.h @@ -342,6 +342,8 @@ public: // Returns true if downstream_connection can be detached and reused. bool can_detach_downstream_connection() const; + DefaultMemchunks pop_response_buf(); + enum { EVENT_ERROR = 0x1, EVENT_TIMEOUT = 0x2, diff --git a/src/shrpx_http2_upstream.cc b/src/shrpx_http2_upstream.cc index bcffba14..82858fab 100644 --- a/src/shrpx_http2_upstream.cc +++ b/src/shrpx_http2_upstream.cc @@ -647,6 +647,57 @@ int on_frame_not_send_callback(nghttp2_session *session, } } // namespace +void Http2Upstream::set_pending_data_downstream(Downstream *downstream, + size_t n) { + pending_data_downstream_ = downstream; + data_pendinglen_ = n; +} + +namespace { +int send_data_callback(nghttp2_session *session, nghttp2_frame *frame, + const uint8_t *framehd, size_t length, + nghttp2_data_source *source, void *user_data) { + auto downstream = static_cast(source->ptr); + auto upstream = static_cast(downstream->get_upstream()); + auto body = downstream->get_response_buf(); + + auto wb = upstream->get_response_buf(); + + if (wb->wleft() < 9) { + return NGHTTP2_ERR_WOULDBLOCK; + } + + wb->write(framehd, 9); + + auto nwrite = std::min(length, wb->wleft()); + body->remove(wb->last, nwrite); + wb->write(nwrite); + if (nwrite < length) { + // We must store unsent amount of data to somewhere. We just tell + // libnghttp2 that we wrote everything, so downstream could be + // deleted. We handle this situation in + // Http2Upstream::remove_downstream(). + upstream->set_pending_data_downstream(downstream, length - nwrite); + } + + if (wb->rleft() == 0) { + downstream->disable_upstream_wtimer(); + } else { + downstream->reset_upstream_wtimer(); + } + + if (length > 0 && downstream->resume_read(SHRPX_NO_BUFFER, length) != 0) { + return NGHTTP2_ERR_CALLBACK_FAILURE; + } + + if (length > 0) { + downstream->add_response_sent_bodylen(length); + } + + return nwrite < length ? NGHTTP2_ERR_PAUSE : 0; +} +} // namespace + namespace { uint32_t infer_upstream_rst_stream_error_code(uint32_t downstream_error_code) { // NGHTTP2_REFUSED_STREAM is important because it tells upstream @@ -748,6 +799,9 @@ nghttp2_session_callbacks *create_http2_upstream_callbacks() { nghttp2_session_callbacks_set_on_begin_headers_callback( callbacks, on_begin_headers_callback); + nghttp2_session_callbacks_set_send_data_callback(callbacks, + send_data_callback); + if (get_config()->padding) { nghttp2_session_callbacks_set_select_padding_callback( callbacks, http::select_padding_callback); @@ -764,8 +818,9 @@ Http2Upstream::Http2Upstream(ClientHandler *handler) ? get_config()->downstream_connections_per_frontend : 0, !get_config()->http2_proxy), - handler_(handler), session_(nullptr), data_pending_(nullptr), - data_pendinglen_(0), shutdown_handled_(false) { + pending_response_buf_(handler->get_worker()->get_mcpool()), + pending_data_downstream_(nullptr), handler_(handler), session_(nullptr), + data_pending_(nullptr), data_pendinglen_(0), shutdown_handled_(false) { int rv; @@ -852,9 +907,8 @@ int Http2Upstream::on_read() { rlimit->startw(); } - auto wb = handler_->get_wb(); if (nghttp2_session_want_read(session_) == 0 && - nghttp2_session_want_write(session_) == 0 && wb->rleft() == 0) { + nghttp2_session_want_write(session_) == 0 && wb_.rleft() == 0) { if (LOG_ENABLED(INFO)) { ULOG(INFO, this) << "No more read/write for this HTTP2 session"; } @@ -867,19 +921,46 @@ int Http2Upstream::on_read() { // After this function call, downstream may be deleted. int Http2Upstream::on_write() { - auto wb = handler_->get_wb(); + if (wb_.rleft() == 0) { + wb_.reset(); + } - if (data_pending_) { - auto n = std::min(wb->wleft(), data_pendinglen_); - wb->write(data_pending_, n); - if (n < data_pendinglen_) { + if (data_pendinglen_ > 0) { + if (data_pending_) { + auto n = std::min(wb_.wleft(), data_pendinglen_); + wb_.write(data_pending_, n); data_pending_ += n; data_pendinglen_ -= n; - return 0; - } - data_pending_ = nullptr; - data_pendinglen_ = 0; + if (data_pendinglen_ > 0) { + return 0; + } + + data_pending_ = nullptr; + } else { + auto n = std::min(wb_.wleft(), data_pendinglen_); + DefaultMemchunks *body; + if (pending_data_downstream_) { + body = pending_data_downstream_->get_response_buf(); + } else { + body = &pending_response_buf_; + } + body->remove(wb_.last, n); + wb_.write(n); + data_pendinglen_ -= n; + + if (data_pendinglen_ > 0) { + return 0; + } + + if (pending_data_downstream_) { + pending_data_downstream_ = nullptr; + } else { + // Downstream was already deleted, and we don't need its + // response data. + body->reset(); + } + } } for (;;) { @@ -894,7 +975,7 @@ int Http2Upstream::on_write() { if (datalen == 0) { break; } - auto n = wb->write(data, datalen); + auto n = wb_.write(data, datalen); if (n < static_cast(datalen)) { data_pending_ = data + n; data_pendinglen_ = datalen - n; @@ -903,7 +984,7 @@ int Http2Upstream::on_write() { } if (nghttp2_session_want_read(session_) == 0 && - nghttp2_session_want_write(session_) == 0 && wb->rleft() == 0) { + nghttp2_session_want_write(session_) == 0 && wb_.rleft() == 0) { if (LOG_ENABLED(INFO)) { ULOG(INFO, this) << "No more read/write for this HTTP2 session"; } @@ -1117,8 +1198,10 @@ ssize_t downstream_data_read_callback(nghttp2_session *session, } } - auto nread = body->remove(buf, length); - auto body_empty = body->rleft() == 0; + auto nread = std::min(body->rleft(), length); + auto body_empty = body->rleft() == nread; + + *data_flags |= NGHTTP2_DATA_FLAG_NO_COPY; if (body_empty && downstream->get_response_state() == Downstream::MSG_COMPLETE) { @@ -1146,24 +1229,10 @@ ssize_t downstream_data_read_callback(nghttp2_session *session, } } - if (body_empty) { - downstream->disable_upstream_wtimer(); - } else { - downstream->reset_upstream_wtimer(); - } - - if (nread > 0 && downstream->resume_read(SHRPX_NO_BUFFER, nread) != 0) { - return NGHTTP2_ERR_CALLBACK_FAILURE; - } - if (nread == 0 && ((*data_flags) & NGHTTP2_DATA_FLAG_EOF) == 0) { return NGHTTP2_ERR_DEFERRED; } - if (nread > 0) { - downstream->add_response_sent_bodylen(nread); - } - return nread; } } // namespace @@ -1274,6 +1343,11 @@ void Http2Upstream::remove_downstream(Downstream *downstream) { nghttp2_session_set_stream_user_data(session_, downstream->get_stream_id(), nullptr); + if (downstream == pending_data_downstream_) { + pending_data_downstream_ = nullptr; + pending_response_buf_ = downstream->pop_response_buf(); + } + auto next_downstream = downstream_queue_.remove_and_get_blocked(downstream); if (next_downstream) { @@ -1755,4 +1829,21 @@ int Http2Upstream::initiate_push(Downstream *downstream, const char *uri, return 0; } +int Http2Upstream::response_riovec(struct iovec *iov, int iovcnt) const { + if (iovcnt == 0 || wb_.rleft() == 0) { + return 0; + } + + iov->iov_base = wb_.pos; + iov->iov_len = wb_.rleft(); + + return 1; +} + +void Http2Upstream::response_drain(size_t n) { wb_.drain(n); } + +bool Http2Upstream::response_empty() const { return wb_.rleft() == 0; } + +Http2Upstream::WriteBuffer *Http2Upstream::get_response_buf() { return &wb_; } + } // namespace shrpx diff --git a/src/shrpx_http2_upstream.h b/src/shrpx_http2_upstream.h index cc877f5c..6dac8c9a 100644 --- a/src/shrpx_http2_upstream.h +++ b/src/shrpx_http2_upstream.h @@ -36,6 +36,7 @@ #include "shrpx_upstream.h" #include "shrpx_downstream_queue.h" #include "memchunk.h" +#include "buffer.h" using namespace nghttp2; @@ -82,6 +83,9 @@ public: size_t bodylen); virtual int initiate_push(Downstream *downstream, const char *uri, size_t len); + virtual int response_riovec(struct iovec *iov, int iovcnt) const; + virtual void response_drain(size_t n); + virtual bool response_empty() const; bool get_flow_control() const; // Perform HTTP/2 upgrade from |upstream|. On success, this object @@ -106,15 +110,39 @@ public: int on_request_headers(Downstream *downstream, const nghttp2_frame *frame); + using WriteBuffer = Buffer<32_k>; + + WriteBuffer *get_response_buf(); + + void set_pending_data_downstream(Downstream *downstream, size_t n); + private: + WriteBuffer wb_; std::unique_ptr pre_upstream_; DownstreamQueue downstream_queue_; ev_timer settings_timer_; ev_timer shutdown_timer_; ev_prepare prep_; + // A response buffer used to belong to Downstream object. This is + // moved here when response is partially written to wb_ in + // send_data_callback, but before writing them all, Downstream + // object was destroyed. On destruction of Downstream, + // pending_data_downstream_ becomes nullptr. + DefaultMemchunks pending_response_buf_; + // Downstream object whose DATA frame payload is partillay written + // to wb_ in send_data_callback. This field exists to keep track of + // its lifetime. When it is destroyed, its response buffer is + // transferred to pending_response_buf_, and this field becomes + // nullptr. + Downstream *pending_data_downstream_; ClientHandler *handler_; nghttp2_session *session_; const uint8_t *data_pending_; + // The length of lending data to be written into wb_. If + // data_pending_ is not nullptr, data_pending_ points to the data to + // write. Otherwise, pending_data_downstream_->get_response_buf() + // if pending_data_downstream_ is not nullptr, or + // pending_response_buf_ holds data to write. size_t data_pendinglen_; bool flow_control_; bool shutdown_handled_; diff --git a/src/shrpx_http_downstream_connection.cc b/src/shrpx_http_downstream_connection.cc index 853dc087..31c444f1 100644 --- a/src/shrpx_http_downstream_connection.cc +++ b/src/shrpx_http_downstream_connection.cc @@ -455,7 +455,7 @@ void HttpDownstreamConnection::pause_read(IOCtrlReason reason) { int HttpDownstreamConnection::resume_read(IOCtrlReason reason, size_t consumed) { - if (!downstream_->response_buf_full()) { + if (downstream_->get_response_buf()->rleft() == 0) { ioctrl_.resume_read(reason); } diff --git a/src/shrpx_https_upstream.cc b/src/shrpx_https_upstream.cc index 3e11c7ea..7e5398ef 100644 --- a/src/shrpx_https_upstream.cc +++ b/src/shrpx_https_upstream.cc @@ -1160,12 +1160,34 @@ int HttpsUpstream::initiate_push(Downstream *downstream, const char *uri, return 0; } -DefaultMemchunks *HttpsUpstream::get_response_buf() const { +int HttpsUpstream::response_riovec(struct iovec *iov, int iovcnt) const { if (!downstream_) { - return nullptr; + return 0; } - return downstream_->get_response_buf(); + auto buf = downstream_->get_response_buf(); + + return buf->riovec(iov, iovcnt); +} + +void HttpsUpstream::response_drain(size_t n) { + if (!downstream_) { + return; + } + + auto buf = downstream_->get_response_buf(); + + buf->drain(n); +} + +bool HttpsUpstream::response_empty() const { + if (!downstream_) { + return true; + } + + auto buf = downstream_->get_response_buf(); + + return buf->rleft() == 0; } } // namespace shrpx diff --git a/src/shrpx_https_upstream.h b/src/shrpx_https_upstream.h index df390e3f..6f3b5742 100644 --- a/src/shrpx_https_upstream.h +++ b/src/shrpx_https_upstream.h @@ -78,7 +78,9 @@ public: size_t bodylen); virtual int initiate_push(Downstream *downstream, const char *uri, size_t len); - virtual DefaultMemchunks *get_response_buf() const; + virtual int response_riovec(struct iovec *iov, int iovcnt) const; + virtual void response_drain(size_t n); + virtual bool response_empty() const; void reset_current_header_length(); void log_response_headers(DefaultMemchunks *buf) const; diff --git a/src/shrpx_spdy_upstream.cc b/src/shrpx_spdy_upstream.cc index 0be56597..6b45a2e6 100644 --- a/src/shrpx_spdy_upstream.cc +++ b/src/shrpx_spdy_upstream.cc @@ -53,8 +53,7 @@ namespace { ssize_t send_callback(spdylay_session *session, const uint8_t *data, size_t len, int flags, void *user_data) { auto upstream = static_cast(user_data); - auto handler = upstream->get_client_handler(); - auto wb = handler->get_wb(); + auto wb = upstream->get_response_buf(); if (wb->wleft() == 0) { return SPDYLAY_ERR_WOULDBLOCK; @@ -555,6 +554,10 @@ int SpdyUpstream::on_read() { int SpdyUpstream::on_write() { int rv = 0; + if (wb_.rleft() == 0) { + wb_.reset(); + } + rv = spdylay_session_send(session_); if (rv != 0) { ULOG(ERROR, this) << "spdylay_session_send() returned error: " @@ -563,8 +566,7 @@ int SpdyUpstream::on_write() { } if (spdylay_session_want_read(session_) == 0 && - spdylay_session_want_write(session_) == 0 && - handler_->get_wb()->rleft() == 0) { + spdylay_session_want_write(session_) == 0 && wb_.rleft() == 0) { if (LOG_ENABLED(INFO)) { ULOG(INFO, this) << "No more read/write for this SPDY session"; } @@ -1213,4 +1215,21 @@ int SpdyUpstream::initiate_push(Downstream *downstream, const char *uri, return 0; } +int SpdyUpstream::response_riovec(struct iovec *iov, int iovcnt) const { + if (iovcnt == 0 || wb_.rleft() == 0) { + return 0; + } + + iov->iov_base = wb_.pos; + iov->iov_len = wb_.rleft(); + + return 1; +} + +void SpdyUpstream::response_drain(size_t n) { wb_.drain(n); } + +bool SpdyUpstream::response_empty() const { return wb_.rleft() == 0; } + +SpdyUpstream::WriteBuffer *SpdyUpstream::get_response_buf() { return &wb_; } + } // namespace shrpx diff --git a/src/shrpx_spdy_upstream.h b/src/shrpx_spdy_upstream.h index b39f885f..7d509340 100644 --- a/src/shrpx_spdy_upstream.h +++ b/src/shrpx_spdy_upstream.h @@ -36,7 +36,7 @@ #include "shrpx_upstream.h" #include "shrpx_downstream_queue.h" #include "memchunk.h" -#include "util.h" +#include "buffer.h" namespace shrpx { @@ -78,6 +78,9 @@ public: size_t bodylen); virtual int initiate_push(Downstream *downstream, const char *uri, size_t len); + virtual int response_riovec(struct iovec *iov, int iovcnt) const; + virtual void response_drain(size_t n); + virtual bool response_empty() const; bool get_flow_control() const; @@ -86,7 +89,12 @@ public: void start_downstream(Downstream *downstream); void initiate_downstream(Downstream *downstream); + using WriteBuffer = Buffer<32_k>; + + WriteBuffer *get_response_buf(); + private: + WriteBuffer wb_; DownstreamQueue downstream_queue_; ClientHandler *handler_; spdylay_session *session_; diff --git a/src/shrpx_upstream.h b/src/shrpx_upstream.h index 8d80097f..92fe5190 100644 --- a/src/shrpx_upstream.h +++ b/src/shrpx_upstream.h @@ -71,9 +71,11 @@ public: virtual int initiate_push(Downstream *downstream, const char *uri, size_t len) = 0; - // Returns response buffer of Downstream directly. This exists for - // optimization purpose for cleartext HttpsUpstream. - virtual DefaultMemchunks *get_response_buf() const { return nullptr; } + // Fills response data in |iov| whose capacity is |iovcnt|. Returns + // the number of iovs filled. + virtual int response_riovec(struct iovec *iov, int iovcnt) const = 0; + virtual void response_drain(size_t n) = 0; + virtual bool response_empty() const = 0; }; } // namespace shrpx