nghttpx: Use Memchunk based read buffer for frontend connection
Previously, we have dedicated read buffer for each frontend connection. With this commit, the buffer spaces are only used when needed, and pooled if they are not used. This reduces memory usage for idle client connections.
This commit is contained in:
parent
e8b2508036
commit
4fa150c494
|
@ -434,6 +434,102 @@ inline int limit_iovec(struct iovec *iov, int iovcnt, size_t max) {
|
|||
return iovcnt;
|
||||
}
|
||||
|
||||
// MemchunkBuffer is similar to Buffer, but it uses pooled Memchunk
|
||||
// for its underlying buffer.
|
||||
template <typename Memchunk> struct MemchunkBuffer {
|
||||
MemchunkBuffer(Pool<Memchunk> *pool) : pool(pool), chunk(nullptr) {}
|
||||
MemchunkBuffer(const MemchunkBuffer &) = delete;
|
||||
MemchunkBuffer(MemchunkBuffer &&other) noexcept
|
||||
: pool(other.pool), chunk(other.chunk) {
|
||||
other.chunk = nullptr;
|
||||
}
|
||||
MemchunkBuffer &operator=(const MemchunkBuffer &) = delete;
|
||||
MemchunkBuffer &operator=(MemchunkBuffer &&other) noexcept {
|
||||
if (this == &other) {
|
||||
return *this;
|
||||
}
|
||||
|
||||
pool = other.pool;
|
||||
chunk = other.chunk;
|
||||
|
||||
other.chunk = nullptr;
|
||||
|
||||
return *this;
|
||||
}
|
||||
|
||||
~MemchunkBuffer() {
|
||||
if (!pool || !chunk) {
|
||||
return;
|
||||
}
|
||||
pool->recycle(chunk);
|
||||
}
|
||||
|
||||
// Ensures that the underlying buffer is allocated.
|
||||
void ensure_chunk() {
|
||||
if (chunk) {
|
||||
return;
|
||||
}
|
||||
chunk = pool->get();
|
||||
}
|
||||
|
||||
// Releases the underlying buffer.
|
||||
void release_chunk() {
|
||||
if (!chunk) {
|
||||
return;
|
||||
}
|
||||
pool->recycle(chunk);
|
||||
chunk = nullptr;
|
||||
}
|
||||
|
||||
// Returns true if the underlying buffer is allocated.
|
||||
bool chunk_avail() const { return chunk != nullptr; }
|
||||
|
||||
// The functions below must be called after the underlying buffer is
|
||||
// allocated (use ensure_chunk).
|
||||
|
||||
// MemchunkBuffer provides the same interface functions with Buffer.
|
||||
// Since we has chunk as a member variable, pos and last are
|
||||
// implemented as wrapper functions.
|
||||
|
||||
uint8_t *pos() const { return chunk->pos; }
|
||||
uint8_t *last() const { return chunk->last; }
|
||||
|
||||
size_t rleft() const { return chunk->len(); }
|
||||
size_t wleft() const { return chunk->left(); }
|
||||
size_t write(const void *src, size_t count) {
|
||||
count = std::min(count, wleft());
|
||||
auto p = static_cast<const uint8_t *>(src);
|
||||
chunk->last = std::copy_n(p, count, chunk->last);
|
||||
return count;
|
||||
}
|
||||
size_t write(size_t count) {
|
||||
count = std::min(count, wleft());
|
||||
chunk->last += count;
|
||||
return count;
|
||||
}
|
||||
size_t drain(size_t count) {
|
||||
count = std::min(count, rleft());
|
||||
chunk->pos += count;
|
||||
return count;
|
||||
}
|
||||
size_t drain_reset(size_t count) {
|
||||
count = std::min(count, rleft());
|
||||
std::copy(chunk->pos + count, chunk->last, std::begin(chunk->buf));
|
||||
chunk->last = std::begin(chunk->buf) + (chunk->last - (chunk->pos + count));
|
||||
chunk->pos = std::begin(chunk->buf);
|
||||
return count;
|
||||
}
|
||||
void reset() { chunk->reset(); }
|
||||
uint8_t *begin() { return std::begin(chunk->buf); }
|
||||
uint8_t &operator[](size_t n) { return chunk->buf[n]; }
|
||||
const uint8_t &operator[](size_t n) const { return chunk->buf[n]; }
|
||||
|
||||
Pool<Memchunk> *pool;
|
||||
Memchunk *chunk;
|
||||
};
|
||||
|
||||
using DefaultMemchunkBuffer = MemchunkBuffer<Memchunk16K>;
|
||||
|
||||
} // namespace nghttp2
|
||||
|
||||
#endif // MEMCHUNK_H
|
||||
|
|
|
@ -117,6 +117,7 @@ void writecb(struct ev_loop *loop, ev_io *w, int revents) {
|
|||
int ClientHandler::noop() { return 0; }
|
||||
|
||||
int ClientHandler::read_clear() {
|
||||
rb_.ensure_chunk();
|
||||
for (;;) {
|
||||
if (rb_.rleft() && on_read() != 0) {
|
||||
return -1;
|
||||
|
@ -132,9 +133,12 @@ int ClientHandler::read_clear() {
|
|||
return 0;
|
||||
}
|
||||
|
||||
auto nread = conn_.read_clear(rb_.last, rb_.wleft());
|
||||
auto nread = conn_.read_clear(rb_.last(), rb_.wleft());
|
||||
|
||||
if (nread == 0) {
|
||||
if (rb_.rleft() == 0) {
|
||||
rb_.release_chunk();
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -209,6 +213,8 @@ int ClientHandler::tls_handshake() {
|
|||
int ClientHandler::read_tls() {
|
||||
ERR_clear_error();
|
||||
|
||||
rb_.ensure_chunk();
|
||||
|
||||
for (;;) {
|
||||
// we should process buffered data first before we read EOF.
|
||||
if (rb_.rleft() && on_read() != 0) {
|
||||
|
@ -225,9 +231,12 @@ int ClientHandler::read_tls() {
|
|||
return 0;
|
||||
}
|
||||
|
||||
auto nread = conn_.read_tls(rb_.last, rb_.wleft());
|
||||
auto nread = conn_.read_tls(rb_.last(), rb_.wleft());
|
||||
|
||||
if (nread == 0) {
|
||||
if (rb_.rleft() == 0) {
|
||||
rb_.release_chunk();
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -303,7 +312,7 @@ int ClientHandler::upstream_write() {
|
|||
int ClientHandler::upstream_http2_connhd_read() {
|
||||
auto nread = std::min(left_connhd_len_, rb_.rleft());
|
||||
if (memcmp(NGHTTP2_CLIENT_MAGIC + NGHTTP2_CLIENT_MAGIC_LEN - left_connhd_len_,
|
||||
rb_.pos, nread) != 0) {
|
||||
rb_.pos(), nread) != 0) {
|
||||
// There is no downgrade path here. Just drop the connection.
|
||||
if (LOG_ENABLED(INFO)) {
|
||||
CLOG(INFO, this) << "invalid client connection header";
|
||||
|
@ -332,7 +341,7 @@ int ClientHandler::upstream_http2_connhd_read() {
|
|||
int ClientHandler::upstream_http1_connhd_read() {
|
||||
auto nread = std::min(left_connhd_len_, rb_.rleft());
|
||||
if (memcmp(NGHTTP2_CLIENT_MAGIC + NGHTTP2_CLIENT_MAGIC_LEN - left_connhd_len_,
|
||||
rb_.pos, nread) != 0) {
|
||||
rb_.pos(), nread) != 0) {
|
||||
if (LOG_ENABLED(INFO)) {
|
||||
CLOG(INFO, this) << "This is HTTP/1.1 connection, "
|
||||
<< "but may be upgraded to HTTP/2 later.";
|
||||
|
@ -386,6 +395,7 @@ ClientHandler::ClientHandler(Worker *worker, int fd, SSL *ssl,
|
|||
// so the required space is 64 + 48 + 16 + 48 + 16 + 16 + 16 +
|
||||
// 32 + 8 + 8 * 8 = 328.
|
||||
balloc_(512, 512),
|
||||
rb_(worker->get_mcpool()),
|
||||
conn_(worker->get_loop(), fd, ssl, worker->get_mcpool(),
|
||||
get_config()->conn.upstream.timeout.write,
|
||||
get_config()->conn.upstream.timeout.read,
|
||||
|
@ -647,10 +657,12 @@ int ClientHandler::do_read() { return read_(*this); }
|
|||
int ClientHandler::do_write() { return write_(*this); }
|
||||
|
||||
int ClientHandler::on_read() {
|
||||
if (rb_.chunk_avail()) {
|
||||
auto rv = on_read_(*this);
|
||||
if (rv != 0) {
|
||||
return rv;
|
||||
}
|
||||
}
|
||||
conn_.handle_tls_pending_read();
|
||||
return 0;
|
||||
}
|
||||
|
@ -1325,7 +1337,7 @@ ssize_t parse_proxy_line_port(const uint8_t *first, const uint8_t *last) {
|
|||
|
||||
int ClientHandler::on_proxy_protocol_finish() {
|
||||
if (conn_.tls.ssl) {
|
||||
conn_.tls.rbuf.append(rb_.pos, rb_.rleft());
|
||||
conn_.tls.rbuf.append(rb_.pos(), rb_.rleft());
|
||||
rb_.reset();
|
||||
}
|
||||
|
||||
|
@ -1346,7 +1358,7 @@ int ClientHandler::proxy_protocol_read() {
|
|||
CLOG(INFO, this) << "PROXY-protocol: Started";
|
||||
}
|
||||
|
||||
auto first = rb_.pos;
|
||||
auto first = rb_.pos();
|
||||
|
||||
// NULL character really destroys functions which expects NULL
|
||||
// terminated string. We won't expect it in PROXY protocol line, so
|
||||
|
@ -1355,12 +1367,12 @@ int ClientHandler::proxy_protocol_read() {
|
|||
|
||||
constexpr size_t MAX_PROXY_LINELEN = 107;
|
||||
|
||||
auto bufend = rb_.pos + std::min(MAX_PROXY_LINELEN, rb_.rleft());
|
||||
auto bufend = rb_.pos() + std::min(MAX_PROXY_LINELEN, rb_.rleft());
|
||||
|
||||
auto end =
|
||||
std::find_first_of(rb_.pos, bufend, std::begin(chrs), std::end(chrs));
|
||||
std::find_first_of(rb_.pos(), bufend, std::begin(chrs), std::end(chrs));
|
||||
|
||||
if (end == bufend || *end == '\0' || end == rb_.pos || *(end - 1) != '\r') {
|
||||
if (end == bufend || *end == '\0' || end == rb_.pos() || *(end - 1) != '\r') {
|
||||
if (LOG_ENABLED(INFO)) {
|
||||
CLOG(INFO, this) << "PROXY-protocol-v1: No ending CR LF sequence found";
|
||||
}
|
||||
|
@ -1371,14 +1383,14 @@ int ClientHandler::proxy_protocol_read() {
|
|||
|
||||
constexpr auto HEADER = StringRef::from_lit("PROXY ");
|
||||
|
||||
if (static_cast<size_t>(end - rb_.pos) < HEADER.size()) {
|
||||
if (static_cast<size_t>(end - rb_.pos()) < HEADER.size()) {
|
||||
if (LOG_ENABLED(INFO)) {
|
||||
CLOG(INFO, this) << "PROXY-protocol-v1: PROXY version 1 ID not found";
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (!util::streq(HEADER, StringRef{rb_.pos, HEADER.size()})) {
|
||||
if (!util::streq(HEADER, StringRef{rb_.pos(), HEADER.size()})) {
|
||||
if (LOG_ENABLED(INFO)) {
|
||||
CLOG(INFO, this) << "PROXY-protocol-v1: Bad PROXY protocol version 1 ID";
|
||||
}
|
||||
|
@ -1389,22 +1401,22 @@ int ClientHandler::proxy_protocol_read() {
|
|||
|
||||
int family;
|
||||
|
||||
if (rb_.pos[0] == 'T') {
|
||||
if (end - rb_.pos < 5) {
|
||||
if (rb_.pos()[0] == 'T') {
|
||||
if (end - rb_.pos() < 5) {
|
||||
if (LOG_ENABLED(INFO)) {
|
||||
CLOG(INFO, this) << "PROXY-protocol-v1: INET protocol family not found";
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (rb_.pos[1] != 'C' || rb_.pos[2] != 'P') {
|
||||
if (rb_.pos()[1] != 'C' || rb_.pos()[2] != 'P') {
|
||||
if (LOG_ENABLED(INFO)) {
|
||||
CLOG(INFO, this) << "PROXY-protocol-v1: Unknown INET protocol family";
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
switch (rb_.pos[3]) {
|
||||
switch (rb_.pos()[3]) {
|
||||
case '4':
|
||||
family = AF_INET;
|
||||
break;
|
||||
|
@ -1420,26 +1432,26 @@ int ClientHandler::proxy_protocol_read() {
|
|||
|
||||
rb_.drain(5);
|
||||
} else {
|
||||
if (end - rb_.pos < 7) {
|
||||
if (end - rb_.pos() < 7) {
|
||||
if (LOG_ENABLED(INFO)) {
|
||||
CLOG(INFO, this) << "PROXY-protocol-v1: INET protocol family not found";
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
if (!util::streq_l("UNKNOWN", rb_.pos, 7)) {
|
||||
if (!util::streq_l("UNKNOWN", rb_.pos(), 7)) {
|
||||
if (LOG_ENABLED(INFO)) {
|
||||
CLOG(INFO, this) << "PROXY-protocol-v1: Unknown INET protocol family";
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
rb_.drain(end + 2 - rb_.pos);
|
||||
rb_.drain(end + 2 - rb_.pos());
|
||||
|
||||
return on_proxy_protocol_finish();
|
||||
}
|
||||
|
||||
// source address
|
||||
auto token_end = std::find(rb_.pos, end, ' ');
|
||||
auto token_end = std::find(rb_.pos(), end, ' ');
|
||||
if (token_end == end) {
|
||||
if (LOG_ENABLED(INFO)) {
|
||||
CLOG(INFO, this) << "PROXY-protocol-v1: Source address not found";
|
||||
|
@ -1448,20 +1460,20 @@ int ClientHandler::proxy_protocol_read() {
|
|||
}
|
||||
|
||||
*token_end = '\0';
|
||||
if (!util::numeric_host(reinterpret_cast<const char *>(rb_.pos), family)) {
|
||||
if (!util::numeric_host(reinterpret_cast<const char *>(rb_.pos()), family)) {
|
||||
if (LOG_ENABLED(INFO)) {
|
||||
CLOG(INFO, this) << "PROXY-protocol-v1: Invalid source address";
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
auto src_addr = rb_.pos;
|
||||
auto src_addrlen = token_end - rb_.pos;
|
||||
auto src_addr = rb_.pos();
|
||||
auto src_addrlen = token_end - rb_.pos();
|
||||
|
||||
rb_.drain(token_end - rb_.pos + 1);
|
||||
rb_.drain(token_end - rb_.pos() + 1);
|
||||
|
||||
// destination address
|
||||
token_end = std::find(rb_.pos, end, ' ');
|
||||
token_end = std::find(rb_.pos(), end, ' ');
|
||||
if (token_end == end) {
|
||||
if (LOG_ENABLED(INFO)) {
|
||||
CLOG(INFO, this) << "PROXY-protocol-v1: Destination address not found";
|
||||
|
@ -1470,7 +1482,7 @@ int ClientHandler::proxy_protocol_read() {
|
|||
}
|
||||
|
||||
*token_end = '\0';
|
||||
if (!util::numeric_host(reinterpret_cast<const char *>(rb_.pos), family)) {
|
||||
if (!util::numeric_host(reinterpret_cast<const char *>(rb_.pos()), family)) {
|
||||
if (LOG_ENABLED(INFO)) {
|
||||
CLOG(INFO, this) << "PROXY-protocol-v1: Invalid destination address";
|
||||
}
|
||||
|
@ -1479,26 +1491,26 @@ int ClientHandler::proxy_protocol_read() {
|
|||
|
||||
// Currently we don't use destination address
|
||||
|
||||
rb_.drain(token_end - rb_.pos + 1);
|
||||
rb_.drain(token_end - rb_.pos() + 1);
|
||||
|
||||
// source port
|
||||
auto n = parse_proxy_line_port(rb_.pos, end);
|
||||
if (n <= 0 || *(rb_.pos + n) != ' ') {
|
||||
auto n = parse_proxy_line_port(rb_.pos(), end);
|
||||
if (n <= 0 || *(rb_.pos() + n) != ' ') {
|
||||
if (LOG_ENABLED(INFO)) {
|
||||
CLOG(INFO, this) << "PROXY-protocol-v1: Invalid source port";
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
rb_.pos[n] = '\0';
|
||||
auto src_port = rb_.pos;
|
||||
rb_.pos()[n] = '\0';
|
||||
auto src_port = rb_.pos();
|
||||
auto src_portlen = n;
|
||||
|
||||
rb_.drain(n + 1);
|
||||
|
||||
// destination port
|
||||
n = parse_proxy_line_port(rb_.pos, end);
|
||||
if (n <= 0 || rb_.pos + n != end) {
|
||||
n = parse_proxy_line_port(rb_.pos(), end);
|
||||
if (n <= 0 || rb_.pos() + n != end) {
|
||||
if (LOG_ENABLED(INFO)) {
|
||||
CLOG(INFO, this) << "PROXY-protocol-v1: Invalid destination port";
|
||||
}
|
||||
|
@ -1507,14 +1519,14 @@ int ClientHandler::proxy_protocol_read() {
|
|||
|
||||
// Currently we don't use destination port
|
||||
|
||||
rb_.drain(end + 2 - rb_.pos);
|
||||
rb_.drain(end + 2 - rb_.pos());
|
||||
|
||||
ipaddr_ =
|
||||
make_string_ref(balloc_, StringRef{src_addr, src_addr + src_addrlen});
|
||||
port_ = make_string_ref(balloc_, StringRef{src_port, src_port + src_portlen});
|
||||
|
||||
if (LOG_ENABLED(INFO)) {
|
||||
CLOG(INFO, this) << "PROXY-protocol-v1: Finished, " << (rb_.pos - first)
|
||||
CLOG(INFO, this) << "PROXY-protocol-v1: Finished, " << (rb_.pos() - first)
|
||||
<< " bytes read";
|
||||
}
|
||||
|
||||
|
|
|
@ -124,7 +124,7 @@ public:
|
|||
int64_t body_bytes_sent);
|
||||
Worker *get_worker() const;
|
||||
|
||||
using ReadBuf = Buffer<16_k>;
|
||||
using ReadBuf = DefaultMemchunkBuffer;
|
||||
|
||||
ReadBuf *get_rb();
|
||||
|
||||
|
@ -171,6 +171,7 @@ private:
|
|||
// sure that the allocations must be bounded, and not proportional
|
||||
// to the number of requests.
|
||||
BlockAllocator balloc_;
|
||||
DefaultMemchunkBuffer rb_;
|
||||
Connection conn_;
|
||||
ev_timer reneg_shutdown_timer_;
|
||||
std::unique_ptr<Upstream> upstream_;
|
||||
|
@ -197,7 +198,6 @@ private:
|
|||
bool should_close_after_write_;
|
||||
// true if affinity_hash_ is computed
|
||||
bool affinity_hash_computed_;
|
||||
ReadBuf rb_;
|
||||
};
|
||||
|
||||
} // namespace shrpx
|
||||
|
|
|
@ -1055,7 +1055,7 @@ int Http2Upstream::on_read() {
|
|||
auto rlimit = handler_->get_rlimit();
|
||||
|
||||
if (rb->rleft()) {
|
||||
rv = nghttp2_session_mem_recv(session_, rb->pos, rb->rleft());
|
||||
rv = nghttp2_session_mem_recv(session_, rb->pos(), rb->rleft());
|
||||
if (rv < 0) {
|
||||
if (rv != NGHTTP2_ERR_BAD_CLIENT_MAGIC) {
|
||||
ULOG(ERROR, this) << "nghttp2_session_mem_recv() returned error: "
|
||||
|
|
|
@ -532,7 +532,7 @@ int HttpsUpstream::on_read() {
|
|||
// callback chain called by http_parser_execute()
|
||||
if (downstream && downstream->get_upgraded()) {
|
||||
|
||||
auto rv = downstream->push_upload_data_chunk(rb->pos, rb->rleft());
|
||||
auto rv = downstream->push_upload_data_chunk(rb->pos(), rb->rleft());
|
||||
|
||||
if (rv != 0) {
|
||||
return -1;
|
||||
|
@ -565,8 +565,9 @@ int HttpsUpstream::on_read() {
|
|||
}
|
||||
|
||||
// http_parser_execute() does nothing once it entered error state.
|
||||
auto nread = http_parser_execute(
|
||||
&htp_, &htp_hooks, reinterpret_cast<const char *>(rb->pos), rb->rleft());
|
||||
auto nread = http_parser_execute(&htp_, &htp_hooks,
|
||||
reinterpret_cast<const char *>(rb->pos()),
|
||||
rb->rleft());
|
||||
|
||||
rb->drain(nread);
|
||||
rlimit->startw();
|
||||
|
|
|
@ -105,7 +105,7 @@ ssize_t recv_callback(spdylay_session *session, uint8_t *buf, size_t len,
|
|||
|
||||
auto nread = std::min(rb->rleft(), len);
|
||||
|
||||
memcpy(buf, rb->pos, nread);
|
||||
memcpy(buf, rb->pos(), nread);
|
||||
rb->drain(nread);
|
||||
rlimit->startw();
|
||||
|
||||
|
|
Loading…
Reference in New Issue