diff --git a/src/libevent_util.cc b/src/libevent_util.cc index c3c44126..eb8e9cfb 100644 --- a/src/libevent_util.cc +++ b/src/libevent_util.cc @@ -25,6 +25,7 @@ #include "libevent_util.h" #include +#include namespace nghttp2 { @@ -32,34 +33,78 @@ namespace util { EvbufferBuffer::EvbufferBuffer() : evbuffer_(nullptr), + bucket_(nullptr), buf_(nullptr), bufmax_(0), - buflen_(0) + buflen_(0), + limit_(0), + writelen_(0) {} -EvbufferBuffer::EvbufferBuffer(evbuffer *evbuffer, uint8_t *buf, size_t bufmax) +EvbufferBuffer::EvbufferBuffer(evbuffer *evbuffer, uint8_t *buf, size_t bufmax, + ssize_t limit) : evbuffer_(evbuffer), + bucket_(limit == -1 ? nullptr : evbuffer_new()), buf_(buf), bufmax_(bufmax), - buflen_(0) + buflen_(0), + limit_(limit), + writelen_(0) {} -void EvbufferBuffer::reset(evbuffer *evbuffer, uint8_t *buf, size_t bufmax) +void EvbufferBuffer::reset(evbuffer *evbuffer, uint8_t *buf, size_t bufmax, + ssize_t limit) { evbuffer_ = evbuffer; buf_ = buf; + if(limit != -1 && !bucket_) { + bucket_ = evbuffer_new(); + } bufmax_ = bufmax; buflen_ = 0; + limit_ = limit; + writelen_ = 0; +} + +EvbufferBuffer::~EvbufferBuffer() +{ + if(bucket_) { + evbuffer_free(bucket_); + } +} + +int EvbufferBuffer::write_buffer() +{ + for(auto pos = buf_, end = buf_ + buflen_; pos < end;) { + // To avoid merging chunks in evbuffer, we first add to temporal + // buffer bucket_ and then move its chain to evbuffer_. + auto nwrite = std::min(end - pos, limit_); + auto rv = evbuffer_add(bucket_, pos, nwrite); + if(rv == -1) { + return -1; + } + rv = evbuffer_add_buffer(evbuffer_, bucket_); + if(rv == -1) { + return -1; + } + pos += nwrite; + } + return 0; } int EvbufferBuffer::flush() { int rv; if(buflen_ > 0) { - rv = evbuffer_add(evbuffer_, buf_, buflen_); + if(limit_ == -1) { + rv = evbuffer_add(evbuffer_, buf_, buflen_); + } else { + rv = write_buffer(); + } if(rv == -1) { return -1; } + writelen_ += buflen_; buflen_ = 0; } return 0; @@ -70,17 +115,27 @@ int EvbufferBuffer::add(const uint8_t *data, size_t datalen) int rv; if(buflen_ + datalen > bufmax_) { if(buflen_ > 0) { - rv = evbuffer_add(evbuffer_, buf_, buflen_); + if(limit_ == -1) { + rv = evbuffer_add(evbuffer_, buf_, buflen_); + } else { + rv = write_buffer(); + } if(rv == -1) { return -1; } + writelen_ += buflen_; buflen_ = 0; } if(datalen > bufmax_) { - rv = evbuffer_add(evbuffer_, data, datalen); + if(limit_ == -1) { + rv = evbuffer_add(evbuffer_, data, datalen); + } else { + rv = write_buffer(); + } if(rv == -1) { return -1; } + writelen_ += buflen_; return 0; } } @@ -94,6 +149,11 @@ size_t EvbufferBuffer::get_buflen() const return buflen_; } +size_t EvbufferBuffer::get_writelen() const +{ + return writelen_; +} + void bev_enable_unless(bufferevent *bev, int events) { if((bufferevent_get_enabled(bev) & events) == events) { diff --git a/src/libevent_util.h b/src/libevent_util.h index 2a020d36..2a79f65e 100644 --- a/src/libevent_util.h +++ b/src/libevent_util.h @@ -37,16 +37,28 @@ namespace util { class EvbufferBuffer { public: EvbufferBuffer(); - EvbufferBuffer(evbuffer *evbuffer, uint8_t *buf, size_t bufmax); - void reset(evbuffer *evbuffer, uint8_t *buf, size_t bufmax); + // If |limit| is not -1, at most min(limit, bufmax) size bytes are + // added to evbuffer_. + EvbufferBuffer(evbuffer *evbuffer, uint8_t *buf, size_t bufmax, + ssize_t limit = -1); + ~EvbufferBuffer(); + void reset(evbuffer *evbuffer, uint8_t *buf, size_t bufmax, + ssize_t limit = -1); int flush(); int add(const uint8_t *data, size_t datalen); size_t get_buflen() const; + int write_buffer(); + // Returns the number of written bytes to evbuffer_ so far. reset() + // resets this value to 0. + size_t get_writelen() const; private: evbuffer *evbuffer_; + evbuffer *bucket_; uint8_t *buf_; size_t bufmax_; size_t buflen_; + ssize_t limit_; + size_t writelen_; }; // These functions are provided to reduce epoll_ctl syscall. Avoid diff --git a/src/shrpx_client_handler.cc b/src/shrpx_client_handler.cc index f135a268..e6b1b9a5 100644 --- a/src/shrpx_client_handler.cc +++ b/src/shrpx_client_handler.cc @@ -71,6 +71,8 @@ void upstream_writecb(bufferevent *bev, void *arg) upstream->reset_timeouts(); } + handler->update_last_write_time(); + // We actually depend on write low-water mark == 0. if(handler->get_outbuf_length() > 0) { // Possibly because of deferred callback, we may get this callback @@ -174,6 +176,8 @@ ClientHandler::ClientHandler(bufferevent *bev, ssl_(ssl), reneg_shutdown_timerev_(nullptr), worker_stat_(worker_stat), + last_write_time_(0), + warmup_writelen_(0), left_connhd_len_(NGHTTP2_CLIENT_CONNECTION_PREFACE_LEN), fd_(fd), should_close_after_write_(false), @@ -672,4 +676,50 @@ bool ClientHandler::get_tls_renegotiation() const return tls_renegotiation_; } +namespace { +const size_t SHRPX_SMALL_WRITE_LIMIT = 1300; +const size_t SHRPX_WARMUP_THRESHOLD = 1 << 20; +} // namespace + +ssize_t ClientHandler::get_write_limit() +{ + if(!ssl_) { + return -1; + } + + timeval tv; + if(event_base_gettimeofday_cached(get_evbase(), &tv) == 0) { + auto now = util::to_time64(tv); + if(now - last_write_time_ > 1000000) { + // Time out, use small record size + warmup_writelen_ = 0; + return SHRPX_SMALL_WRITE_LIMIT; + } + } + + // If event_base_gettimeofday_cached() failed, we just skip timer + // checking. Don't know how to treat this. + + if(warmup_writelen_ >= SHRPX_WARMUP_THRESHOLD) { + return -1; + } + + return SHRPX_SMALL_WRITE_LIMIT; +} + +void ClientHandler::update_warmup_writelen(size_t n) +{ + if(warmup_writelen_ < SHRPX_WARMUP_THRESHOLD) { + warmup_writelen_ += n; + } +} + +void ClientHandler::update_last_write_time() +{ + timeval tv; + if(event_base_gettimeofday_cached(get_evbase(), &tv) == 0) { + last_write_time_ = util::to_time64(tv); + } +} + } // namespace shrpx diff --git a/src/shrpx_client_handler.h b/src/shrpx_client_handler.h index 54e8a026..7f027abb 100644 --- a/src/shrpx_client_handler.h +++ b/src/shrpx_client_handler.h @@ -91,6 +91,20 @@ public: bool get_tls_renegotiation() const; int on_http2_connhd_read(); int on_http1_connhd_read(); + // Returns maximum chunk size for one evbuffer_add(). The intention + // of this chunk size is control the TLS record size. The actual + // SSL_write() call is done under libevent control. In + // libevent-2.0.21, libevent calls SSL_write() for each chunk inside + // evbuffer. This means that we can control TLS record size by + // adjusting the chunk size to evbuffer_add(). + // + // This function returns -1, if TLS is not enabled or no limitation + // is required. + ssize_t get_write_limit(); + // Updates the number of bytes written in warm up period. + void update_warmup_writelen(size_t n); + // Updates the time when last write was done. + void update_last_write_time(); private: std::unique_ptr upstream_; std::string ipaddr_; @@ -103,6 +117,8 @@ private: SSL *ssl_; event *reneg_shutdown_timerev_; WorkerStat *worker_stat_; + int64_t last_write_time_; + size_t warmup_writelen_; // The number of bytes of HTTP/2 client connection header to read size_t left_connhd_len_; int fd_; diff --git a/src/shrpx_http2_upstream.cc b/src/shrpx_http2_upstream.cc index 3d4a52ea..a90914de 100644 --- a/src/shrpx_http2_upstream.cc +++ b/src/shrpx_http2_upstream.cc @@ -38,7 +38,6 @@ #include "shrpx_worker_config.h" #include "http2.h" #include "util.h" -#include "libevent_util.h" #include "base64.h" #include "app_helper.h" @@ -785,10 +784,11 @@ int Http2Upstream::send() uint8_t buf[16384]; auto bev = handler_->get_bev(); auto output = bufferevent_get_output(bev); - util::EvbufferBuffer evbbuf(output, buf, sizeof(buf)); + + sendbuf.reset(output, buf, sizeof(buf), handler_->get_write_limit()); for(;;) { // Check buffer length and break if it is large enough. - if(handler_->get_outbuf_length() + evbbuf.get_buflen() >= + if(handler_->get_outbuf_length() + sendbuf.get_buflen() >= OUTBUF_MAX_THRES) { break; } @@ -804,19 +804,21 @@ int Http2Upstream::send() if(datalen == 0) { break; } - rv = evbbuf.add(data, datalen); + rv = sendbuf.add(data, datalen); if(rv != 0) { ULOG(FATAL, this) << "evbuffer_add() failed"; return -1; } } - rv = evbbuf.flush(); + rv = sendbuf.flush(); if(rv != 0) { ULOG(FATAL, this) << "evbuffer_add() failed"; return -1; } + handler_->update_warmup_writelen(sendbuf.get_writelen()); + if(nghttp2_session_want_read(session_) == 0 && nghttp2_session_want_write(session_) == 0 && handler_->get_outbuf_length() == 0) { diff --git a/src/shrpx_http2_upstream.h b/src/shrpx_http2_upstream.h index 4167ad99..0b846cf3 100644 --- a/src/shrpx_http2_upstream.h +++ b/src/shrpx_http2_upstream.h @@ -33,6 +33,7 @@ #include "shrpx_upstream.h" #include "shrpx_downstream_queue.h" +#include "libevent_util.h" namespace shrpx { @@ -87,6 +88,8 @@ public: const std::vector& nva) const; void maintain_downstream_concurrency(); void initiate_downstream(std::unique_ptr downstream); + + nghttp2::util::EvbufferBuffer sendbuf; private: DownstreamQueue downstream_queue_; std::unique_ptr pre_upstream_; diff --git a/src/shrpx_spdy_upstream.cc b/src/shrpx_spdy_upstream.cc index 51b1e2b3..e91e4e94 100644 --- a/src/shrpx_spdy_upstream.cc +++ b/src/shrpx_spdy_upstream.cc @@ -537,7 +537,8 @@ int SpdyUpstream::send() int rv = 0; uint8_t buf[16384]; - sendbuf.reset(bufferevent_get_output(handler_->get_bev()), buf, sizeof(buf)); + sendbuf.reset(bufferevent_get_output(handler_->get_bev()), buf, sizeof(buf), + handler_->get_write_limit()); rv = spdylay_session_send(session_); if(rv != 0) { @@ -552,6 +553,8 @@ int SpdyUpstream::send() return -1; } + handler_->update_warmup_writelen(sendbuf.get_writelen()); + if(spdylay_session_want_read(session_) == 0 && spdylay_session_want_write(session_) == 0 && handler_->get_outbuf_length() == 0) { diff --git a/src/util.cc b/src/util.cc index 99b096df..8be0a949 100644 --- a/src/util.cc +++ b/src/util.cc @@ -636,6 +636,11 @@ bool check_path(const std::string& path) !util::endsWith(path, "/..") && !util::endsWith(path, "/."); } +int64_t to_time64(const timeval& tv) +{ + return tv.tv_sec * 1000000 + tv.tv_usec; +} + } // namespace util } // namespace nghttp2 diff --git a/src/util.h b/src/util.h index f914a9cc..ee3c789f 100644 --- a/src/util.h +++ b/src/util.h @@ -474,6 +474,10 @@ char* get_exec_path(int argc, char **const argv, const char *cwd); // percent-decode was performed. bool check_path(const std::string& path); +// Returns the |tv| value as 64 bit integer using a microsecond as an +// unit. +int64_t to_time64(const timeval& tv); + } // namespace util } // namespace nghttp2