nghttpx: Remove rb_ from HTTP/2 backend session
This commit is contained in:
parent
88eaeb5d1c
commit
62c43ce2be
|
@ -167,7 +167,9 @@ Http2Session::Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx,
|
||||||
flow_control_(false) {
|
flow_control_(false) {
|
||||||
|
|
||||||
read_ = write_ = &Http2Session::noop;
|
read_ = write_ = &Http2Session::noop;
|
||||||
on_read_ = on_write_ = &Http2Session::noop;
|
|
||||||
|
on_read_ = &Http2Session::read_noop;
|
||||||
|
on_write_ = &Http2Session::write_noop;
|
||||||
|
|
||||||
// We will resuse this many times, so use repeat timeout value. The
|
// We will resuse this many times, so use repeat timeout value. The
|
||||||
// timeout value is set later.
|
// timeout value is set later.
|
||||||
|
@ -191,7 +193,6 @@ int Http2Session::disconnect(bool hard) {
|
||||||
nghttp2_session_del(session_);
|
nghttp2_session_del(session_);
|
||||||
session_ = nullptr;
|
session_ = nullptr;
|
||||||
|
|
||||||
rb_.reset();
|
|
||||||
wb_.reset();
|
wb_.reset();
|
||||||
|
|
||||||
conn_.rlimit.stopw();
|
conn_.rlimit.stopw();
|
||||||
|
@ -201,7 +202,9 @@ int Http2Session::disconnect(bool hard) {
|
||||||
ev_timer_stop(conn_.loop, &connchk_timer_);
|
ev_timer_stop(conn_.loop, &connchk_timer_);
|
||||||
|
|
||||||
read_ = write_ = &Http2Session::noop;
|
read_ = write_ = &Http2Session::noop;
|
||||||
on_read_ = on_write_ = &Http2Session::noop;
|
|
||||||
|
on_read_ = &Http2Session::read_noop;
|
||||||
|
on_write_ = &Http2Session::write_noop;
|
||||||
|
|
||||||
conn_.disconnect();
|
conn_.disconnect();
|
||||||
|
|
||||||
|
@ -465,29 +468,17 @@ http_parser_settings htp_hooks = {
|
||||||
};
|
};
|
||||||
} // namespace
|
} // namespace
|
||||||
|
|
||||||
int Http2Session::downstream_read_proxy() {
|
int Http2Session::downstream_read_proxy(const uint8_t *data, size_t datalen) {
|
||||||
if (rb_.rleft() == 0) {
|
auto nread =
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
size_t nread =
|
|
||||||
http_parser_execute(proxy_htp_.get(), &htp_hooks,
|
http_parser_execute(proxy_htp_.get(), &htp_hooks,
|
||||||
reinterpret_cast<const char *>(rb_.pos), rb_.rleft());
|
reinterpret_cast<const char *>(data), datalen);
|
||||||
|
(void)nread;
|
||||||
rb_.drain(nread);
|
|
||||||
|
|
||||||
auto htperr = HTTP_PARSER_ERRNO(proxy_htp_.get());
|
auto htperr = HTTP_PARSER_ERRNO(proxy_htp_.get());
|
||||||
|
|
||||||
if (htperr == HPE_PAUSED) {
|
if (htperr == HPE_PAUSED) {
|
||||||
switch (state_) {
|
switch (state_) {
|
||||||
case Http2Session::PROXY_CONNECTED:
|
case Http2Session::PROXY_CONNECTED:
|
||||||
// we need to increment nread by 1 since http_parser_execute()
|
|
||||||
// returns 1 less value we expect. This means taht
|
|
||||||
// rb_.pos[nread] points to \x0a (LF), which is last byte of
|
|
||||||
// empty line to terminate headers. We want to eat that byte
|
|
||||||
// here.
|
|
||||||
rb_.drain(1);
|
|
||||||
|
|
||||||
// Initiate SSL/TLS handshake through established tunnel.
|
// Initiate SSL/TLS handshake through established tunnel.
|
||||||
if (initiate_connection() != 0) {
|
if (initiate_connection() != 0) {
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -533,7 +524,7 @@ int Http2Session::downstream_connect_proxy() {
|
||||||
}
|
}
|
||||||
wb_.append(req);
|
wb_.append(req);
|
||||||
|
|
||||||
on_write_ = &Http2Session::noop;
|
on_write_ = &Http2Session::write_noop;
|
||||||
|
|
||||||
signal_write();
|
signal_write();
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -1412,27 +1403,22 @@ int Http2Session::connection_made() {
|
||||||
int Http2Session::do_read() { return read_(*this); }
|
int Http2Session::do_read() { return read_(*this); }
|
||||||
int Http2Session::do_write() { return write_(*this); }
|
int Http2Session::do_write() { return write_(*this); }
|
||||||
|
|
||||||
int Http2Session::on_read() { return on_read_(*this); }
|
int Http2Session::on_read(const uint8_t *data, size_t datalen) {
|
||||||
|
return on_read_(*this, data, datalen);
|
||||||
|
}
|
||||||
|
|
||||||
int Http2Session::on_write() { return on_write_(*this); }
|
int Http2Session::on_write() { return on_write_(*this); }
|
||||||
|
|
||||||
int Http2Session::downstream_read() {
|
int Http2Session::downstream_read(const uint8_t *data, size_t datalen) {
|
||||||
ssize_t rv = 0;
|
ssize_t rv;
|
||||||
|
|
||||||
if (rb_.rleft() > 0) {
|
|
||||||
rv = nghttp2_session_mem_recv(
|
|
||||||
session_, reinterpret_cast<const uint8_t *>(rb_.pos), rb_.rleft());
|
|
||||||
|
|
||||||
|
rv = nghttp2_session_mem_recv(session_, data, datalen);
|
||||||
if (rv < 0) {
|
if (rv < 0) {
|
||||||
SSLOG(ERROR, this) << "nghttp2_session_recv() returned error: "
|
SSLOG(ERROR, this) << "nghttp2_session_recv() returned error: "
|
||||||
<< nghttp2_strerror(rv);
|
<< nghttp2_strerror(rv);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// nghttp2_session_mem_recv() should consume all input data in
|
|
||||||
// case of success.
|
|
||||||
rb_.reset();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (nghttp2_session_want_read(session_) == 0 &&
|
if (nghttp2_session_want_read(session_) == 0 &&
|
||||||
nghttp2_session_want_write(session_) == 0 && wb_.rleft() == 0) {
|
nghttp2_session_want_write(session_) == 0 && wb_.rleft() == 0) {
|
||||||
if (LOG_ENABLED(INFO)) {
|
if (LOG_ENABLED(INFO)) {
|
||||||
|
@ -1621,6 +1607,10 @@ int Http2Session::get_connection_check_state() const {
|
||||||
|
|
||||||
int Http2Session::noop() { return 0; }
|
int Http2Session::noop() { return 0; }
|
||||||
|
|
||||||
|
int Http2Session::read_noop(const uint8_t *data, size_t datalen) { return 0; }
|
||||||
|
|
||||||
|
int Http2Session::write_noop() { return 0; }
|
||||||
|
|
||||||
int Http2Session::connected() {
|
int Http2Session::connected() {
|
||||||
if (!util::check_socket_connected(conn_.fd)) {
|
if (!util::check_socket_connected(conn_.fd)) {
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -1659,17 +1649,10 @@ int Http2Session::connected() {
|
||||||
int Http2Session::read_clear() {
|
int Http2Session::read_clear() {
|
||||||
ev_timer_again(conn_.loop, &conn_.rt);
|
ev_timer_again(conn_.loop, &conn_.rt);
|
||||||
|
|
||||||
for (;;) {
|
std::array<uint8_t, 16_k> buf;
|
||||||
// we should process buffered data first before we read EOF.
|
|
||||||
if (rb_.rleft() && on_read() != 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
if (rb_.rleft()) {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
rb_.reset();
|
|
||||||
|
|
||||||
auto nread = conn_.read_clear(rb_.last, rb_.wleft());
|
for (;;) {
|
||||||
|
auto nread = conn_.read_clear(buf.data(), buf.size());
|
||||||
|
|
||||||
if (nread == 0) {
|
if (nread == 0) {
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -1679,7 +1662,9 @@ int Http2Session::read_clear() {
|
||||||
return nread;
|
return nread;
|
||||||
}
|
}
|
||||||
|
|
||||||
rb_.write(nread);
|
if (on_read(buf.data(), nread) != 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1757,19 +1742,12 @@ int Http2Session::tls_handshake() {
|
||||||
int Http2Session::read_tls() {
|
int Http2Session::read_tls() {
|
||||||
ev_timer_again(conn_.loop, &conn_.rt);
|
ev_timer_again(conn_.loop, &conn_.rt);
|
||||||
|
|
||||||
|
std::array<uint8_t, 16_k> buf;
|
||||||
|
|
||||||
ERR_clear_error();
|
ERR_clear_error();
|
||||||
|
|
||||||
for (;;) {
|
for (;;) {
|
||||||
// we should process buffered data first before we read EOF.
|
auto nread = conn_.read_tls(buf.data(), buf.size());
|
||||||
if (rb_.rleft() && on_read() != 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
if (rb_.rleft()) {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
rb_.reset();
|
|
||||||
|
|
||||||
auto nread = conn_.read_tls(rb_.last, rb_.wleft());
|
|
||||||
|
|
||||||
if (nread == 0) {
|
if (nread == 0) {
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -1779,7 +1757,9 @@ int Http2Session::read_tls() {
|
||||||
return nread;
|
return nread;
|
||||||
}
|
}
|
||||||
|
|
||||||
rb_.write(nread);
|
if (on_read(buf.data(), nread) != 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -90,7 +90,7 @@ public:
|
||||||
int do_read();
|
int do_read();
|
||||||
int do_write();
|
int do_write();
|
||||||
|
|
||||||
int on_read();
|
int on_read(const uint8_t *data, size_t datalen);
|
||||||
int on_write();
|
int on_write();
|
||||||
|
|
||||||
int connected();
|
int connected();
|
||||||
|
@ -100,13 +100,15 @@ public:
|
||||||
int read_tls();
|
int read_tls();
|
||||||
int write_tls();
|
int write_tls();
|
||||||
|
|
||||||
int downstream_read_proxy();
|
int downstream_read_proxy(const uint8_t *data, size_t datalen);
|
||||||
int downstream_connect_proxy();
|
int downstream_connect_proxy();
|
||||||
|
|
||||||
int downstream_read();
|
int downstream_read(const uint8_t *data, size_t datalen);
|
||||||
int downstream_write();
|
int downstream_write();
|
||||||
|
|
||||||
int noop();
|
int noop();
|
||||||
|
int read_noop(const uint8_t *data, size_t datalen);
|
||||||
|
int write_noop();
|
||||||
|
|
||||||
void signal_write();
|
void signal_write();
|
||||||
|
|
||||||
|
@ -196,7 +198,8 @@ private:
|
||||||
DList<Http2DownstreamConnection> dconns_;
|
DList<Http2DownstreamConnection> dconns_;
|
||||||
DList<StreamData> streams_;
|
DList<StreamData> streams_;
|
||||||
std::function<int(Http2Session &)> read_, write_;
|
std::function<int(Http2Session &)> read_, write_;
|
||||||
std::function<int(Http2Session &)> on_read_, on_write_;
|
std::function<int(Http2Session &, const uint8_t *, size_t)> on_read_;
|
||||||
|
std::function<int(Http2Session &)> on_write_;
|
||||||
// Used to parse the response from HTTP proxy
|
// Used to parse the response from HTTP proxy
|
||||||
std::unique_ptr<http_parser> proxy_htp_;
|
std::unique_ptr<http_parser> proxy_htp_;
|
||||||
Worker *worker_;
|
Worker *worker_;
|
||||||
|
@ -213,7 +216,6 @@ private:
|
||||||
int state_;
|
int state_;
|
||||||
int connection_check_state_;
|
int connection_check_state_;
|
||||||
bool flow_control_;
|
bool flow_control_;
|
||||||
ReadBuf rb_;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
nghttp2_session_callbacks *create_http2_downstream_callbacks();
|
nghttp2_session_callbacks *create_http2_downstream_callbacks();
|
||||||
|
|
Loading…
Reference in New Issue