nghttpx: Refactor client handler write
Move write buffer to Upstream objects
This commit is contained in:
parent
fe79b6d118
commit
da89f9c150
|
@ -135,68 +135,30 @@ int ClientHandler::read_clear() {
|
|||
}
|
||||
|
||||
int ClientHandler::write_clear() {
|
||||
std::array<iovec, 2> iov;
|
||||
|
||||
ev_timer_again(conn_.loop, &conn_.rt);
|
||||
|
||||
for (;;) {
|
||||
if (wb_.rleft() > 0) {
|
||||
auto nwrite = conn_.write_clear(wb_.pos, wb_.rleft());
|
||||
if (nwrite == 0) {
|
||||
return 0;
|
||||
}
|
||||
if (nwrite < 0) {
|
||||
return -1;
|
||||
}
|
||||
wb_.drain(nwrite);
|
||||
continue;
|
||||
}
|
||||
wb_.reset();
|
||||
if (on_write() != 0) {
|
||||
return -1;
|
||||
}
|
||||
if (wb_.rleft() == 0) {
|
||||
|
||||
auto iovcnt = upstream_->response_riovec(iov.data(), iov.size());
|
||||
if (iovcnt == 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
conn_.wlimit.stopw();
|
||||
ev_timer_stop(conn_.loop, &conn_.wt);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int ClientHandler::writev_clear() {
|
||||
ev_timer_again(conn_.loop, &conn_.rt);
|
||||
|
||||
auto buf = upstream_->get_response_buf();
|
||||
if (!buf) {
|
||||
conn_.wlimit.stopw();
|
||||
ev_timer_stop(conn_.loop, &conn_.wt);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
for (;;) {
|
||||
if (buf->rleft() > 0) {
|
||||
std::array<iovec, 2> iov;
|
||||
auto iovcnt = buf->riovec(iov.data(), iov.size());
|
||||
auto nwrite = conn_.writev_clear(iov.data(), iovcnt);
|
||||
if (nwrite == 0) {
|
||||
return 0;
|
||||
}
|
||||
if (nwrite < 0) {
|
||||
return -1;
|
||||
}
|
||||
buf->drain(nwrite);
|
||||
continue;
|
||||
}
|
||||
if (on_write() != 0) {
|
||||
auto nwrite = conn_.writev_clear(iov.data(), iovcnt);
|
||||
if (nwrite < 0) {
|
||||
return -1;
|
||||
}
|
||||
// buf may be destroyed inside on_write()
|
||||
buf = upstream_->get_response_buf();
|
||||
if (!buf || buf->rleft() == 0) {
|
||||
break;
|
||||
|
||||
if (nwrite == 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
upstream_->response_drain(nwrite);
|
||||
}
|
||||
|
||||
conn_.wlimit.stopw();
|
||||
|
@ -229,12 +191,7 @@ int ClientHandler::tls_handshake() {
|
|||
}
|
||||
|
||||
read_ = &ClientHandler::read_tls;
|
||||
|
||||
if (alpn_ == "http/1.1") {
|
||||
write_ = &ClientHandler::writev_tls;
|
||||
} else {
|
||||
write_ = &ClientHandler::write_tls;
|
||||
}
|
||||
write_ = &ClientHandler::write_tls;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -271,85 +228,33 @@ int ClientHandler::read_tls() {
|
|||
}
|
||||
|
||||
int ClientHandler::write_tls() {
|
||||
struct iovec iov;
|
||||
|
||||
ev_timer_again(conn_.loop, &conn_.rt);
|
||||
|
||||
ERR_clear_error();
|
||||
|
||||
for (;;) {
|
||||
if (wb_.rleft() > 0) {
|
||||
auto nwrite = conn_.write_tls(wb_.pos, wb_.rleft());
|
||||
|
||||
if (nwrite == 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (nwrite < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
wb_.drain(nwrite);
|
||||
|
||||
continue;
|
||||
}
|
||||
wb_.reset();
|
||||
if (on_write() != 0) {
|
||||
return -1;
|
||||
}
|
||||
if (wb_.rleft() == 0) {
|
||||
|
||||
auto iovcnt = upstream_->response_riovec(&iov, 1);
|
||||
if (iovcnt == 0) {
|
||||
conn_.start_tls_write_idle();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
conn_.wlimit.stopw();
|
||||
ev_timer_stop(conn_.loop, &conn_.wt);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int ClientHandler::writev_tls() {
|
||||
ev_timer_again(conn_.loop, &conn_.rt);
|
||||
|
||||
auto buf = upstream_->get_response_buf();
|
||||
if (!buf) {
|
||||
conn_.wlimit.stopw();
|
||||
ev_timer_stop(conn_.loop, &conn_.wt);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
ERR_clear_error();
|
||||
|
||||
for (;;) {
|
||||
if (buf->rleft() > 0) {
|
||||
iovec iov;
|
||||
auto iovcnt = buf->riovec(&iov, 1);
|
||||
if (iovcnt == 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
auto nwrite = conn_.write_tls(iov.iov_base, iov.iov_len);
|
||||
|
||||
if (nwrite == 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (nwrite < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
buf->drain(nwrite);
|
||||
|
||||
continue;
|
||||
}
|
||||
if (on_write() != 0) {
|
||||
auto nwrite = conn_.write_tls(iov.iov_base, iov.iov_len);
|
||||
if (nwrite < 0) {
|
||||
return -1;
|
||||
}
|
||||
buf = upstream_->get_response_buf();
|
||||
if (!buf || buf->rleft() == 0) {
|
||||
conn_.start_tls_write_idle();
|
||||
break;
|
||||
|
||||
if (nwrite == 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
upstream_->response_drain(nwrite);
|
||||
}
|
||||
|
||||
conn_.wlimit.stopw();
|
||||
|
@ -374,9 +279,7 @@ int ClientHandler::upstream_write() {
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (get_should_close_after_write() && wb_.rleft() == 0 &&
|
||||
(!upstream_->get_response_buf() ||
|
||||
upstream_->get_response_buf()->rleft() == 0)) {
|
||||
if (get_should_close_after_write() && upstream_->response_empty()) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -506,7 +409,7 @@ void ClientHandler::setup_upstream_io_callback() {
|
|||
upstream_ = make_unique<HttpsUpstream>(this);
|
||||
alpn_ = "http/1.1";
|
||||
read_ = &ClientHandler::read_clear;
|
||||
write_ = &ClientHandler::writev_clear;
|
||||
write_ = &ClientHandler::write_clear;
|
||||
on_read_ = &ClientHandler::upstream_http1_connhd_read;
|
||||
on_write_ = &ClientHandler::upstream_noop;
|
||||
}
|
||||
|
@ -818,7 +721,6 @@ int ClientHandler::perform_http2_upgrade(HttpsUpstream *http) {
|
|||
}
|
||||
// http pointer is now owned by upstream.
|
||||
upstream_.release();
|
||||
upstream_ = std::move(upstream);
|
||||
// TODO We might get other version id in HTTP2-settings, if we
|
||||
// support aliasing for h2, but we just use library default for now.
|
||||
alpn_ = NGHTTP2_CLEARTEXT_PROTO_VERSION_ID;
|
||||
|
@ -829,7 +731,9 @@ int ClientHandler::perform_http2_upgrade(HttpsUpstream *http) {
|
|||
"Connection: Upgrade\r\n"
|
||||
"Upgrade: " NGHTTP2_CLEARTEXT_PROTO_VERSION_ID "\r\n"
|
||||
"\r\n";
|
||||
wb_.write(res, sizeof(res) - 1);
|
||||
upstream->response_write(res, sizeof(res) - 1);
|
||||
upstream_ = std::move(upstream);
|
||||
|
||||
signal_write();
|
||||
return 0;
|
||||
}
|
||||
|
@ -932,8 +836,6 @@ void ClientHandler::write_accesslog(int major, int minor, unsigned int status,
|
|||
});
|
||||
}
|
||||
|
||||
ClientHandler::WriteBuf *ClientHandler::get_wb() { return &wb_; }
|
||||
|
||||
ClientHandler::ReadBuf *ClientHandler::get_rb() { return &rb_; }
|
||||
|
||||
void ClientHandler::signal_write() { conn_.wlimit.startw(); }
|
||||
|
|
|
@ -60,13 +60,11 @@ public:
|
|||
// Performs clear text I/O
|
||||
int read_clear();
|
||||
int write_clear();
|
||||
int writev_clear();
|
||||
// Performs TLS handshake
|
||||
int tls_handshake();
|
||||
// Performs TLS I/O
|
||||
int read_tls();
|
||||
int write_tls();
|
||||
int writev_tls();
|
||||
|
||||
int upstream_noop();
|
||||
int upstream_read();
|
||||
|
@ -124,10 +122,8 @@ public:
|
|||
int64_t body_bytes_sent);
|
||||
Worker *get_worker() const;
|
||||
|
||||
using WriteBuf = Buffer<32768>;
|
||||
using ReadBuf = Buffer<8_k>;
|
||||
|
||||
WriteBuf *get_wb();
|
||||
ReadBuf *get_rb();
|
||||
|
||||
RateLimit *get_rlimit();
|
||||
|
@ -153,7 +149,6 @@ private:
|
|||
// The number of bytes of HTTP/2 client connection header to read
|
||||
size_t left_connhd_len_;
|
||||
bool should_close_after_write_;
|
||||
WriteBuf wb_;
|
||||
ReadBuf rb_;
|
||||
};
|
||||
|
||||
|
|
|
@ -852,9 +852,8 @@ int Http2Upstream::on_read() {
|
|||
rlimit->startw();
|
||||
}
|
||||
|
||||
auto wb = handler_->get_wb();
|
||||
if (nghttp2_session_want_read(session_) == 0 &&
|
||||
nghttp2_session_want_write(session_) == 0 && wb->rleft() == 0) {
|
||||
nghttp2_session_want_write(session_) == 0 && wb_.rleft() == 0) {
|
||||
if (LOG_ENABLED(INFO)) {
|
||||
ULOG(INFO, this) << "No more read/write for this HTTP2 session";
|
||||
}
|
||||
|
@ -867,11 +866,13 @@ int Http2Upstream::on_read() {
|
|||
|
||||
// After this function call, downstream may be deleted.
|
||||
int Http2Upstream::on_write() {
|
||||
auto wb = handler_->get_wb();
|
||||
if (wb_.rleft() == 0) {
|
||||
wb_.reset();
|
||||
}
|
||||
|
||||
if (data_pending_) {
|
||||
auto n = std::min(wb->wleft(), data_pendinglen_);
|
||||
wb->write(data_pending_, n);
|
||||
auto n = std::min(wb_.wleft(), data_pendinglen_);
|
||||
wb_.write(data_pending_, n);
|
||||
if (n < data_pendinglen_) {
|
||||
data_pending_ += n;
|
||||
data_pendinglen_ -= n;
|
||||
|
@ -894,7 +895,7 @@ int Http2Upstream::on_write() {
|
|||
if (datalen == 0) {
|
||||
break;
|
||||
}
|
||||
auto n = wb->write(data, datalen);
|
||||
auto n = wb_.write(data, datalen);
|
||||
if (n < static_cast<decltype(n)>(datalen)) {
|
||||
data_pending_ = data + n;
|
||||
data_pendinglen_ = datalen - n;
|
||||
|
@ -903,7 +904,7 @@ int Http2Upstream::on_write() {
|
|||
}
|
||||
|
||||
if (nghttp2_session_want_read(session_) == 0 &&
|
||||
nghttp2_session_want_write(session_) == 0 && wb->rleft() == 0) {
|
||||
nghttp2_session_want_write(session_) == 0 && wb_.rleft() == 0) {
|
||||
if (LOG_ENABLED(INFO)) {
|
||||
ULOG(INFO, this) << "No more read/write for this HTTP2 session";
|
||||
}
|
||||
|
@ -1755,4 +1756,23 @@ int Http2Upstream::initiate_push(Downstream *downstream, const char *uri,
|
|||
return 0;
|
||||
}
|
||||
|
||||
int Http2Upstream::response_riovec(struct iovec *iov, int iovcnt) const {
|
||||
if (iovcnt == 0 || wb_.rleft() == 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
iov->iov_base = wb_.pos;
|
||||
iov->iov_len = wb_.rleft();
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
void Http2Upstream::response_drain(size_t n) { wb_.drain(n); }
|
||||
|
||||
bool Http2Upstream::response_empty() const { return wb_.rleft() == 0; }
|
||||
|
||||
void Http2Upstream::response_write(void *data, size_t len) {
|
||||
wb_.write(data, len);
|
||||
}
|
||||
|
||||
} // namespace shrpx
|
||||
|
|
|
@ -36,6 +36,7 @@
|
|||
#include "shrpx_upstream.h"
|
||||
#include "shrpx_downstream_queue.h"
|
||||
#include "memchunk.h"
|
||||
#include "buffer.h"
|
||||
|
||||
using namespace nghttp2;
|
||||
|
||||
|
@ -82,6 +83,11 @@ public:
|
|||
size_t bodylen);
|
||||
virtual int initiate_push(Downstream *downstream, const char *uri,
|
||||
size_t len);
|
||||
virtual int response_riovec(struct iovec *iov, int iovcnt) const;
|
||||
virtual void response_drain(size_t n);
|
||||
virtual bool response_empty() const;
|
||||
|
||||
void response_write(void *data, size_t len);
|
||||
|
||||
bool get_flow_control() const;
|
||||
// Perform HTTP/2 upgrade from |upstream|. On success, this object
|
||||
|
@ -106,7 +112,10 @@ public:
|
|||
|
||||
int on_request_headers(Downstream *downstream, const nghttp2_frame *frame);
|
||||
|
||||
using WriteBuffer = Buffer<32_k>;
|
||||
|
||||
private:
|
||||
WriteBuffer wb_;
|
||||
std::unique_ptr<HttpsUpstream> pre_upstream_;
|
||||
DownstreamQueue downstream_queue_;
|
||||
ev_timer settings_timer_;
|
||||
|
|
|
@ -1160,12 +1160,34 @@ int HttpsUpstream::initiate_push(Downstream *downstream, const char *uri,
|
|||
return 0;
|
||||
}
|
||||
|
||||
DefaultMemchunks *HttpsUpstream::get_response_buf() const {
|
||||
int HttpsUpstream::response_riovec(struct iovec *iov, int iovcnt) const {
|
||||
if (!downstream_) {
|
||||
return nullptr;
|
||||
return 0;
|
||||
}
|
||||
|
||||
return downstream_->get_response_buf();
|
||||
auto buf = downstream_->get_response_buf();
|
||||
|
||||
return buf->riovec(iov, iovcnt);
|
||||
}
|
||||
|
||||
void HttpsUpstream::response_drain(size_t n) {
|
||||
if (!downstream_) {
|
||||
return;
|
||||
}
|
||||
|
||||
auto buf = downstream_->get_response_buf();
|
||||
|
||||
buf->drain(n);
|
||||
}
|
||||
|
||||
bool HttpsUpstream::response_empty() const {
|
||||
if (!downstream_) {
|
||||
return true;
|
||||
}
|
||||
|
||||
auto buf = downstream_->get_response_buf();
|
||||
|
||||
return buf->rleft() == 0;
|
||||
}
|
||||
|
||||
} // namespace shrpx
|
||||
|
|
|
@ -78,7 +78,9 @@ public:
|
|||
size_t bodylen);
|
||||
virtual int initiate_push(Downstream *downstream, const char *uri,
|
||||
size_t len);
|
||||
virtual DefaultMemchunks *get_response_buf() const;
|
||||
virtual int response_riovec(struct iovec *iov, int iovcnt) const;
|
||||
virtual void response_drain(size_t n);
|
||||
virtual bool response_empty() const;
|
||||
|
||||
void reset_current_header_length();
|
||||
void log_response_headers(DefaultMemchunks *buf) const;
|
||||
|
|
|
@ -53,8 +53,7 @@ namespace {
|
|||
ssize_t send_callback(spdylay_session *session, const uint8_t *data, size_t len,
|
||||
int flags, void *user_data) {
|
||||
auto upstream = static_cast<SpdyUpstream *>(user_data);
|
||||
auto handler = upstream->get_client_handler();
|
||||
auto wb = handler->get_wb();
|
||||
auto wb = upstream->get_response_buf();
|
||||
|
||||
if (wb->wleft() == 0) {
|
||||
return SPDYLAY_ERR_WOULDBLOCK;
|
||||
|
@ -555,6 +554,10 @@ int SpdyUpstream::on_read() {
|
|||
int SpdyUpstream::on_write() {
|
||||
int rv = 0;
|
||||
|
||||
if (wb_.rleft() == 0) {
|
||||
wb_.reset();
|
||||
}
|
||||
|
||||
rv = spdylay_session_send(session_);
|
||||
if (rv != 0) {
|
||||
ULOG(ERROR, this) << "spdylay_session_send() returned error: "
|
||||
|
@ -563,8 +566,7 @@ int SpdyUpstream::on_write() {
|
|||
}
|
||||
|
||||
if (spdylay_session_want_read(session_) == 0 &&
|
||||
spdylay_session_want_write(session_) == 0 &&
|
||||
handler_->get_wb()->rleft() == 0) {
|
||||
spdylay_session_want_write(session_) == 0 && wb_.rleft() == 0) {
|
||||
if (LOG_ENABLED(INFO)) {
|
||||
ULOG(INFO, this) << "No more read/write for this SPDY session";
|
||||
}
|
||||
|
@ -1213,4 +1215,21 @@ int SpdyUpstream::initiate_push(Downstream *downstream, const char *uri,
|
|||
return 0;
|
||||
}
|
||||
|
||||
int SpdyUpstream::response_riovec(struct iovec *iov, int iovcnt) const {
|
||||
if (iovcnt == 0 || wb_.rleft() == 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
iov->iov_base = wb_.pos;
|
||||
iov->iov_len = wb_.rleft();
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
void SpdyUpstream::response_drain(size_t n) { wb_.drain(n); }
|
||||
|
||||
bool SpdyUpstream::response_empty() const { return wb_.rleft() == 0; }
|
||||
|
||||
SpdyUpstream::WriteBuffer *SpdyUpstream::get_response_buf() { return &wb_; }
|
||||
|
||||
} // namespace shrpx
|
||||
|
|
|
@ -36,7 +36,7 @@
|
|||
#include "shrpx_upstream.h"
|
||||
#include "shrpx_downstream_queue.h"
|
||||
#include "memchunk.h"
|
||||
#include "util.h"
|
||||
#include "buffer.h"
|
||||
|
||||
namespace shrpx {
|
||||
|
||||
|
@ -78,6 +78,9 @@ public:
|
|||
size_t bodylen);
|
||||
virtual int initiate_push(Downstream *downstream, const char *uri,
|
||||
size_t len);
|
||||
virtual int response_riovec(struct iovec *iov, int iovcnt) const;
|
||||
virtual void response_drain(size_t n);
|
||||
virtual bool response_empty() const;
|
||||
|
||||
bool get_flow_control() const;
|
||||
|
||||
|
@ -86,7 +89,12 @@ public:
|
|||
void start_downstream(Downstream *downstream);
|
||||
void initiate_downstream(Downstream *downstream);
|
||||
|
||||
using WriteBuffer = Buffer<32_k>;
|
||||
|
||||
WriteBuffer *get_response_buf();
|
||||
|
||||
private:
|
||||
WriteBuffer wb_;
|
||||
DownstreamQueue downstream_queue_;
|
||||
ClientHandler *handler_;
|
||||
spdylay_session *session_;
|
||||
|
|
|
@ -71,9 +71,11 @@ public:
|
|||
virtual int initiate_push(Downstream *downstream, const char *uri,
|
||||
size_t len) = 0;
|
||||
|
||||
// Returns response buffer of Downstream directly. This exists for
|
||||
// optimization purpose for cleartext HttpsUpstream.
|
||||
virtual DefaultMemchunks *get_response_buf() const { return nullptr; }
|
||||
// Fills response data in |iov| whose capacity is |iovcnt|. Returns
|
||||
// the number of iovs filled.
|
||||
virtual int response_riovec(struct iovec *iov, int iovcnt) const = 0;
|
||||
virtual void response_drain(size_t n) = 0;
|
||||
virtual bool response_empty() const = 0;
|
||||
};
|
||||
|
||||
} // namespace shrpx
|
||||
|
|
Loading…
Reference in New Issue