nghttpx: Rewrite read timer handling
For HTTP/2, read timer starts when there is no downstream, and timer stops when there is at least one downstream. For HTTP/1, read timer starts when request handling finished, and timer stops when request handling starts.
This commit is contained in:
parent
66ca8272ca
commit
cbced219ec
|
@ -125,10 +125,6 @@ int ClientHandler::read_clear() {
|
||||||
rb_.reset();
|
rb_.reset();
|
||||||
} else if (rb_.wleft() == 0) {
|
} else if (rb_.wleft() == 0) {
|
||||||
conn_.rlimit.stopw();
|
conn_.rlimit.stopw();
|
||||||
if (reset_conn_rtimer_required_) {
|
|
||||||
reset_conn_rtimer_required_ = false;
|
|
||||||
ev_timer_again(conn_.loop, &conn_.rt);
|
|
||||||
}
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -139,10 +135,6 @@ int ClientHandler::read_clear() {
|
||||||
auto nread = conn_.read_clear(rb_.last, rb_.wleft());
|
auto nread = conn_.read_clear(rb_.last, rb_.wleft());
|
||||||
|
|
||||||
if (nread == 0) {
|
if (nread == 0) {
|
||||||
if (reset_conn_rtimer_required_) {
|
|
||||||
reset_conn_rtimer_required_ = false;
|
|
||||||
ev_timer_again(conn_.loop, &conn_.rt);
|
|
||||||
}
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -157,8 +149,6 @@ int ClientHandler::read_clear() {
|
||||||
int ClientHandler::write_clear() {
|
int ClientHandler::write_clear() {
|
||||||
std::array<iovec, 2> iov;
|
std::array<iovec, 2> iov;
|
||||||
|
|
||||||
ev_timer_again(conn_.loop, &conn_.rt);
|
|
||||||
|
|
||||||
for (;;) {
|
for (;;) {
|
||||||
if (on_write() != 0) {
|
if (on_write() != 0) {
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -228,11 +218,6 @@ int ClientHandler::read_tls() {
|
||||||
rb_.reset();
|
rb_.reset();
|
||||||
} else if (rb_.wleft() == 0) {
|
} else if (rb_.wleft() == 0) {
|
||||||
conn_.rlimit.stopw();
|
conn_.rlimit.stopw();
|
||||||
if (reset_conn_rtimer_required_) {
|
|
||||||
reset_conn_rtimer_required_ = false;
|
|
||||||
ev_timer_again(conn_.loop, &conn_.rt);
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -243,11 +228,6 @@ int ClientHandler::read_tls() {
|
||||||
auto nread = conn_.read_tls(rb_.last, rb_.wleft());
|
auto nread = conn_.read_tls(rb_.last, rb_.wleft());
|
||||||
|
|
||||||
if (nread == 0) {
|
if (nread == 0) {
|
||||||
if (reset_conn_rtimer_required_) {
|
|
||||||
reset_conn_rtimer_required_ = false;
|
|
||||||
ev_timer_again(conn_.loop, &conn_.rt);
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -262,8 +242,6 @@ int ClientHandler::read_tls() {
|
||||||
int ClientHandler::write_tls() {
|
int ClientHandler::write_tls() {
|
||||||
struct iovec iov;
|
struct iovec iov;
|
||||||
|
|
||||||
ev_timer_again(conn_.loop, &conn_.rt);
|
|
||||||
|
|
||||||
ERR_clear_error();
|
ERR_clear_error();
|
||||||
|
|
||||||
for (;;) {
|
for (;;) {
|
||||||
|
@ -409,8 +387,7 @@ ClientHandler::ClientHandler(Worker *worker, int fd, SSL *ssl,
|
||||||
worker_(worker),
|
worker_(worker),
|
||||||
left_connhd_len_(NGHTTP2_CLIENT_MAGIC_LEN),
|
left_connhd_len_(NGHTTP2_CLIENT_MAGIC_LEN),
|
||||||
affinity_hash_(-1),
|
affinity_hash_(-1),
|
||||||
should_close_after_write_(false),
|
should_close_after_write_(false) {
|
||||||
reset_conn_rtimer_required_(false) {
|
|
||||||
|
|
||||||
++worker_->get_worker_stat()->num_connections;
|
++worker_->get_worker_stat()->num_connections;
|
||||||
|
|
||||||
|
@ -516,10 +493,12 @@ void ClientHandler::reset_upstream_write_timeout(ev_tstamp t) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void ClientHandler::signal_reset_upstream_conn_rtimer() {
|
void ClientHandler::repeat_read_timer() {
|
||||||
reset_conn_rtimer_required_ = true;
|
ev_timer_again(conn_.loop, &conn_.rt);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void ClientHandler::stop_read_timer() { ev_timer_stop(conn_.loop, &conn_.rt); }
|
||||||
|
|
||||||
int ClientHandler::validate_next_proto() {
|
int ClientHandler::validate_next_proto() {
|
||||||
const unsigned char *next_proto = nullptr;
|
const unsigned char *next_proto = nullptr;
|
||||||
unsigned int next_proto_len = 0;
|
unsigned int next_proto_len = 0;
|
||||||
|
|
|
@ -88,7 +88,7 @@ public:
|
||||||
struct ev_loop *get_loop() const;
|
struct ev_loop *get_loop() const;
|
||||||
void reset_upstream_read_timeout(ev_tstamp t);
|
void reset_upstream_read_timeout(ev_tstamp t);
|
||||||
void reset_upstream_write_timeout(ev_tstamp t);
|
void reset_upstream_write_timeout(ev_tstamp t);
|
||||||
void signal_reset_upstream_conn_rtimer();
|
|
||||||
int validate_next_proto();
|
int validate_next_proto();
|
||||||
const std::string &get_ipaddr() const;
|
const std::string &get_ipaddr() const;
|
||||||
const std::string &get_port() const;
|
const std::string &get_port() const;
|
||||||
|
@ -151,6 +151,9 @@ public:
|
||||||
|
|
||||||
const UpstreamAddr *get_upstream_addr() const;
|
const UpstreamAddr *get_upstream_addr() const;
|
||||||
|
|
||||||
|
void repeat_read_timer();
|
||||||
|
void stop_read_timer();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
Connection conn_;
|
Connection conn_;
|
||||||
ev_timer reneg_shutdown_timer_;
|
ev_timer reneg_shutdown_timer_;
|
||||||
|
@ -173,7 +176,6 @@ private:
|
||||||
size_t left_connhd_len_;
|
size_t left_connhd_len_;
|
||||||
int32_t affinity_hash_;
|
int32_t affinity_hash_;
|
||||||
bool should_close_after_write_;
|
bool should_close_after_write_;
|
||||||
bool reset_conn_rtimer_required_;
|
|
||||||
ReadBuf rb_;
|
ReadBuf rb_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -482,8 +482,11 @@ int Http2Session::initiate_connection() {
|
||||||
ev_timer_again(conn_.loop, &conn_.wt);
|
ev_timer_again(conn_.loop, &conn_.wt);
|
||||||
} else {
|
} else {
|
||||||
conn_.rlimit.startw();
|
conn_.rlimit.startw();
|
||||||
|
|
||||||
|
if (addr_->num_dconn == 0) {
|
||||||
ev_timer_again(conn_.loop, &conn_.rt);
|
ev_timer_again(conn_.loop, &conn_.rt);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -603,6 +606,8 @@ int Http2Session::downstream_connect_proxy() {
|
||||||
void Http2Session::add_downstream_connection(Http2DownstreamConnection *dconn) {
|
void Http2Session::add_downstream_connection(Http2DownstreamConnection *dconn) {
|
||||||
dconns_.append(dconn);
|
dconns_.append(dconn);
|
||||||
++addr_->num_dconn;
|
++addr_->num_dconn;
|
||||||
|
|
||||||
|
stop_read_timer();
|
||||||
}
|
}
|
||||||
|
|
||||||
void Http2Session::remove_downstream_connection(
|
void Http2Session::remove_downstream_connection(
|
||||||
|
@ -611,6 +616,10 @@ void Http2Session::remove_downstream_connection(
|
||||||
dconns_.remove(dconn);
|
dconns_.remove(dconn);
|
||||||
dconn->detach_stream_data();
|
dconn->detach_stream_data();
|
||||||
|
|
||||||
|
if (addr_->num_dconn == 0) {
|
||||||
|
repeat_read_timer();
|
||||||
|
}
|
||||||
|
|
||||||
if (LOG_ENABLED(INFO)) {
|
if (LOG_ENABLED(INFO)) {
|
||||||
SSLOG(INFO, this) << "Remove downstream";
|
SSLOG(INFO, this) << "Remove downstream";
|
||||||
}
|
}
|
||||||
|
@ -1830,8 +1839,6 @@ int Http2Session::connected() {
|
||||||
}
|
}
|
||||||
|
|
||||||
int Http2Session::read_clear() {
|
int Http2Session::read_clear() {
|
||||||
ev_timer_again(conn_.loop, &conn_.rt);
|
|
||||||
|
|
||||||
std::array<uint8_t, 16_k> buf;
|
std::array<uint8_t, 16_k> buf;
|
||||||
|
|
||||||
for (;;) {
|
for (;;) {
|
||||||
|
@ -1852,8 +1859,6 @@ int Http2Session::read_clear() {
|
||||||
}
|
}
|
||||||
|
|
||||||
int Http2Session::write_clear() {
|
int Http2Session::write_clear() {
|
||||||
ev_timer_again(conn_.loop, &conn_.rt);
|
|
||||||
|
|
||||||
std::array<struct iovec, MAX_WR_IOVCNT> iov;
|
std::array<struct iovec, MAX_WR_IOVCNT> iov;
|
||||||
|
|
||||||
for (;;) {
|
for (;;) {
|
||||||
|
@ -1935,8 +1940,6 @@ int Http2Session::tls_handshake() {
|
||||||
}
|
}
|
||||||
|
|
||||||
int Http2Session::read_tls() {
|
int Http2Session::read_tls() {
|
||||||
ev_timer_again(conn_.loop, &conn_.rt);
|
|
||||||
|
|
||||||
std::array<uint8_t, 16_k> buf;
|
std::array<uint8_t, 16_k> buf;
|
||||||
|
|
||||||
ERR_clear_error();
|
ERR_clear_error();
|
||||||
|
@ -1959,8 +1962,6 @@ int Http2Session::read_tls() {
|
||||||
}
|
}
|
||||||
|
|
||||||
int Http2Session::write_tls() {
|
int Http2Session::write_tls() {
|
||||||
ev_timer_again(conn_.loop, &conn_.rt);
|
|
||||||
|
|
||||||
ERR_clear_error();
|
ERR_clear_error();
|
||||||
|
|
||||||
struct iovec iov;
|
struct iovec iov;
|
||||||
|
@ -2223,4 +2224,10 @@ void Http2Session::check_retire() {
|
||||||
signal_write();
|
signal_write();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void Http2Session::repeat_read_timer() {
|
||||||
|
ev_timer_again(conn_.loop, &conn_.rt);
|
||||||
|
}
|
||||||
|
|
||||||
|
void Http2Session::stop_read_timer() { ev_timer_stop(conn_.loop, &conn_.rt); }
|
||||||
|
|
||||||
} // namespace shrpx
|
} // namespace shrpx
|
||||||
|
|
|
@ -203,6 +203,9 @@ public:
|
||||||
// shutdown the connection.
|
// shutdown the connection.
|
||||||
void check_retire();
|
void check_retire();
|
||||||
|
|
||||||
|
void repeat_read_timer();
|
||||||
|
void stop_read_timer();
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
// Disconnected
|
// Disconnected
|
||||||
DISCONNECTED,
|
DISCONNECTED,
|
||||||
|
|
|
@ -138,6 +138,9 @@ int Http2Upstream::upgrade_upstream(HttpsUpstream *http) {
|
||||||
downstream_queue_.add_pending(std::move(downstream));
|
downstream_queue_.add_pending(std::move(downstream));
|
||||||
downstream_queue_.mark_active(ptr);
|
downstream_queue_.mark_active(ptr);
|
||||||
|
|
||||||
|
// TODO This might not be necessary
|
||||||
|
handler_->stop_read_timer();
|
||||||
|
|
||||||
if (LOG_ENABLED(INFO)) {
|
if (LOG_ENABLED(INFO)) {
|
||||||
ULOG(INFO, this) << "Connection upgraded to HTTP/2";
|
ULOG(INFO, this) << "Connection upgraded to HTTP/2";
|
||||||
}
|
}
|
||||||
|
@ -431,9 +434,6 @@ int on_frame_recv_callback(nghttp2_session *session, const nghttp2_frame *frame,
|
||||||
verbose_on_frame_recv_callback(session, frame, user_data);
|
verbose_on_frame_recv_callback(session, frame, user_data);
|
||||||
}
|
}
|
||||||
auto upstream = static_cast<Http2Upstream *>(user_data);
|
auto upstream = static_cast<Http2Upstream *>(user_data);
|
||||||
auto handler = upstream->get_client_handler();
|
|
||||||
|
|
||||||
handler->signal_reset_upstream_conn_rtimer();
|
|
||||||
|
|
||||||
switch (frame->hd.type) {
|
switch (frame->hd.type) {
|
||||||
case NGHTTP2_DATA: {
|
case NGHTTP2_DATA: {
|
||||||
|
@ -1374,6 +1374,8 @@ int Http2Upstream::error_reply(Downstream *downstream,
|
||||||
void Http2Upstream::add_pending_downstream(
|
void Http2Upstream::add_pending_downstream(
|
||||||
std::unique_ptr<Downstream> downstream) {
|
std::unique_ptr<Downstream> downstream) {
|
||||||
downstream_queue_.add_pending(std::move(downstream));
|
downstream_queue_.add_pending(std::move(downstream));
|
||||||
|
|
||||||
|
handler_->stop_read_timer();
|
||||||
}
|
}
|
||||||
|
|
||||||
void Http2Upstream::remove_downstream(Downstream *downstream) {
|
void Http2Upstream::remove_downstream(Downstream *downstream) {
|
||||||
|
@ -1389,6 +1391,11 @@ void Http2Upstream::remove_downstream(Downstream *downstream) {
|
||||||
if (next_downstream) {
|
if (next_downstream) {
|
||||||
initiate_downstream(next_downstream);
|
initiate_downstream(next_downstream);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (downstream_queue_.get_downstreams() == nullptr) {
|
||||||
|
// There is no downstream at the moment. Start idle timer now.
|
||||||
|
handler_->repeat_read_timer();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WARNING: Never call directly or indirectly nghttp2_session_send or
|
// WARNING: Never call directly or indirectly nghttp2_session_send or
|
||||||
|
|
|
@ -314,7 +314,8 @@ int HttpDownstreamConnection::attach_downstream(Downstream *downstream) {
|
||||||
// we may set read timer cb to idle_timeoutcb. Reset again.
|
// we may set read timer cb to idle_timeoutcb. Reset again.
|
||||||
conn_.rt.repeat = downstreamconf.timeout.read;
|
conn_.rt.repeat = downstreamconf.timeout.read;
|
||||||
ev_set_cb(&conn_.rt, timeoutcb);
|
ev_set_cb(&conn_.rt, timeoutcb);
|
||||||
ev_timer_again(conn_.loop, &conn_.rt);
|
ev_timer_stop(conn_.loop, &conn_.rt);
|
||||||
|
|
||||||
ev_set_cb(&conn_.rev, readcb);
|
ev_set_cb(&conn_.rev, readcb);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -892,7 +893,6 @@ http_parser_settings htp_hooks = {
|
||||||
} // namespace
|
} // namespace
|
||||||
|
|
||||||
int HttpDownstreamConnection::read_clear() {
|
int HttpDownstreamConnection::read_clear() {
|
||||||
ev_timer_again(conn_.loop, &conn_.rt);
|
|
||||||
std::array<uint8_t, 16_k> buf;
|
std::array<uint8_t, 16_k> buf;
|
||||||
int rv;
|
int rv;
|
||||||
|
|
||||||
|
@ -918,8 +918,6 @@ int HttpDownstreamConnection::read_clear() {
|
||||||
}
|
}
|
||||||
|
|
||||||
int HttpDownstreamConnection::write_clear() {
|
int HttpDownstreamConnection::write_clear() {
|
||||||
ev_timer_again(conn_.loop, &conn_.rt);
|
|
||||||
|
|
||||||
auto upstream = downstream_->get_upstream();
|
auto upstream = downstream_->get_upstream();
|
||||||
auto input = downstream_->get_request_buf();
|
auto input = downstream_->get_request_buf();
|
||||||
|
|
||||||
|
@ -1007,7 +1005,6 @@ int HttpDownstreamConnection::tls_handshake() {
|
||||||
int HttpDownstreamConnection::read_tls() {
|
int HttpDownstreamConnection::read_tls() {
|
||||||
ERR_clear_error();
|
ERR_clear_error();
|
||||||
|
|
||||||
ev_timer_again(conn_.loop, &conn_.rt);
|
|
||||||
std::array<uint8_t, 16_k> buf;
|
std::array<uint8_t, 16_k> buf;
|
||||||
int rv;
|
int rv;
|
||||||
|
|
||||||
|
@ -1035,8 +1032,6 @@ int HttpDownstreamConnection::read_tls() {
|
||||||
int HttpDownstreamConnection::write_tls() {
|
int HttpDownstreamConnection::write_tls() {
|
||||||
ERR_clear_error();
|
ERR_clear_error();
|
||||||
|
|
||||||
ev_timer_again(conn_.loop, &conn_.rt);
|
|
||||||
|
|
||||||
auto upstream = downstream_->get_upstream();
|
auto upstream = downstream_->get_upstream();
|
||||||
auto input = downstream_->get_request_buf();
|
auto input = downstream_->get_request_buf();
|
||||||
|
|
||||||
|
|
|
@ -76,6 +76,8 @@ int htp_msg_begin(http_parser *htp) {
|
||||||
|
|
||||||
upstream->attach_downstream(std::move(downstream));
|
upstream->attach_downstream(std::move(downstream));
|
||||||
|
|
||||||
|
handler->stop_read_timer();
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
} // namespace
|
} // namespace
|
||||||
|
@ -287,8 +289,6 @@ int htp_hdrs_completecb(http_parser *htp) {
|
||||||
|
|
||||||
auto handler = upstream->get_client_handler();
|
auto handler = upstream->get_client_handler();
|
||||||
|
|
||||||
handler->signal_reset_upstream_conn_rtimer();
|
|
||||||
|
|
||||||
auto downstream = upstream->get_downstream();
|
auto downstream = upstream->get_downstream();
|
||||||
auto &req = downstream->request();
|
auto &req = downstream->request();
|
||||||
|
|
||||||
|
@ -436,10 +436,6 @@ namespace {
|
||||||
int htp_bodycb(http_parser *htp, const char *data, size_t len) {
|
int htp_bodycb(http_parser *htp, const char *data, size_t len) {
|
||||||
int rv;
|
int rv;
|
||||||
auto upstream = static_cast<HttpsUpstream *>(htp->data);
|
auto upstream = static_cast<HttpsUpstream *>(htp->data);
|
||||||
auto handler = upstream->get_client_handler();
|
|
||||||
|
|
||||||
handler->signal_reset_upstream_conn_rtimer();
|
|
||||||
|
|
||||||
auto downstream = upstream->get_downstream();
|
auto downstream = upstream->get_downstream();
|
||||||
rv = downstream->push_upload_data_chunk(
|
rv = downstream->push_upload_data_chunk(
|
||||||
reinterpret_cast<const uint8_t *>(data), len);
|
reinterpret_cast<const uint8_t *>(data), len);
|
||||||
|
@ -669,6 +665,8 @@ int HttpsUpstream::on_write() {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
handler_->repeat_read_timer();
|
||||||
|
|
||||||
return resume_read(SHRPX_NO_BUFFER, nullptr, 0);
|
return resume_read(SHRPX_NO_BUFFER, nullptr, 0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -145,8 +145,6 @@ void on_ctrl_recv_callback(spdylay_session *session, spdylay_frame_type type,
|
||||||
auto upstream = static_cast<SpdyUpstream *>(user_data);
|
auto upstream = static_cast<SpdyUpstream *>(user_data);
|
||||||
auto handler = upstream->get_client_handler();
|
auto handler = upstream->get_client_handler();
|
||||||
|
|
||||||
handler->signal_reset_upstream_conn_rtimer();
|
|
||||||
|
|
||||||
switch (type) {
|
switch (type) {
|
||||||
case SPDYLAY_SYN_STREAM: {
|
case SPDYLAY_SYN_STREAM: {
|
||||||
if (LOG_ENABLED(INFO)) {
|
if (LOG_ENABLED(INFO)) {
|
||||||
|
@ -440,9 +438,6 @@ void on_data_recv_callback(spdylay_session *session, uint8_t flags,
|
||||||
auto upstream = static_cast<SpdyUpstream *>(user_data);
|
auto upstream = static_cast<SpdyUpstream *>(user_data);
|
||||||
auto downstream = static_cast<Downstream *>(
|
auto downstream = static_cast<Downstream *>(
|
||||||
spdylay_session_get_stream_user_data(session, stream_id));
|
spdylay_session_get_stream_user_data(session, stream_id));
|
||||||
auto handler = upstream->get_client_handler();
|
|
||||||
|
|
||||||
handler->signal_reset_upstream_conn_rtimer();
|
|
||||||
|
|
||||||
if (downstream && (flags & SPDYLAY_DATA_FLAG_FIN)) {
|
if (downstream && (flags & SPDYLAY_DATA_FLAG_FIN)) {
|
||||||
if (!downstream->validate_request_recv_body_length()) {
|
if (!downstream->validate_request_recv_body_length()) {
|
||||||
|
@ -1003,6 +998,8 @@ Downstream *SpdyUpstream::add_pending_downstream(int32_t stream_id) {
|
||||||
|
|
||||||
downstream_queue_.add_pending(std::move(downstream));
|
downstream_queue_.add_pending(std::move(downstream));
|
||||||
|
|
||||||
|
handler_->stop_read_timer();
|
||||||
|
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1019,6 +1016,10 @@ void SpdyUpstream::remove_downstream(Downstream *downstream) {
|
||||||
if (next_downstream) {
|
if (next_downstream) {
|
||||||
initiate_downstream(next_downstream);
|
initiate_downstream(next_downstream);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (downstream_queue_.get_downstreams() == nullptr) {
|
||||||
|
handler_->repeat_read_timer();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WARNING: Never call directly or indirectly spdylay_session_send or
|
// WARNING: Never call directly or indirectly spdylay_session_send or
|
||||||
|
|
Loading…
Reference in New Issue