nghttpx: Improve priority handling in http2 upstream

This commit is contained in:
Tatsuhiro Tsujikawa 2014-12-23 17:38:03 +09:00
parent a8a66843db
commit 89291e4010
2 changed files with 42 additions and 5 deletions

View File

@ -46,7 +46,6 @@ using namespace nghttp2;
namespace shrpx { namespace shrpx {
namespace { namespace {
const size_t OUTBUF_MAX_THRES = 16 * 1024;
const size_t INBUF_MAX_THRES = 16 * 1024; const size_t INBUF_MAX_THRES = 16 * 1024;
} // namespace } // namespace
@ -598,11 +597,19 @@ uint32_t infer_upstream_rst_stream_error_code(uint32_t downstream_error_code) {
} }
} // namespace } // namespace
namespace {
void write_notify_cb(evutil_socket_t fd, short what, void *arg) {
auto upstream = static_cast<Http2Upstream *>(arg);
upstream->perform_send();
}
} // namespace
Http2Upstream::Http2Upstream(ClientHandler *handler) Http2Upstream::Http2Upstream(ClientHandler *handler)
: downstream_queue_(get_config()->http2_proxy : downstream_queue_(get_config()->http2_proxy
? get_config()->downstream_connections_per_host ? get_config()->downstream_connections_per_host
: 0), : 0),
handler_(handler), session_(nullptr), settings_timerev_(nullptr) { handler_(handler), session_(nullptr), settings_timerev_(nullptr),
write_notifyev_(nullptr), deferred_(false) {
reset_timeouts(); reset_timeouts();
int rv; int rv;
@ -691,6 +698,8 @@ Http2Upstream::Http2Upstream(ClientHandler *handler)
} }
} }
} }
write_notifyev_ = evtimer_new(handler_->get_evbase(), write_notify_cb, this);
} }
Http2Upstream::~Http2Upstream() { Http2Upstream::~Http2Upstream() {
@ -698,6 +707,9 @@ Http2Upstream::~Http2Upstream() {
if (settings_timerev_) { if (settings_timerev_) {
event_free(settings_timerev_); event_free(settings_timerev_);
} }
if (write_notifyev_) {
event_free(write_notifyev_);
}
} }
int Http2Upstream::on_read() { int Http2Upstream::on_read() {
@ -732,8 +744,17 @@ int Http2Upstream::on_read() {
int Http2Upstream::on_write() { return send(); } int Http2Upstream::on_write() { return send(); }
// After this function call, downstream may be deleted.
int Http2Upstream::send() { int Http2Upstream::send() {
if (write_notifyev_ == nullptr) {
return -1;
}
event_active(write_notifyev_, 0, 0);
return 0;
}
// After this function call, downstream may be deleted.
int Http2Upstream::perform_send() {
int rv; int rv;
uint8_t buf[16384]; uint8_t buf[16384];
auto bev = handler_->get_bev(); auto bev = handler_->get_bev();
@ -742,8 +763,7 @@ int Http2Upstream::send() {
sendbuf.reset(output, buf, sizeof(buf), handler_->get_write_limit()); sendbuf.reset(output, buf, sizeof(buf), handler_->get_write_limit());
for (;;) { for (;;) {
// Check buffer length and break if it is large enough. // Check buffer length and break if it is large enough.
if (handler_->get_outbuf_length() + sendbuf.get_buflen() >= if (handler_->get_outbuf_length() > 0) {
OUTBUF_MAX_THRES) {
break; break;
} }
@ -763,8 +783,13 @@ int Http2Upstream::send() {
ULOG(FATAL, this) << "evbuffer_add() failed"; ULOG(FATAL, this) << "evbuffer_add() failed";
return -1; return -1;
} }
if (deferred_) {
break;
}
} }
deferred_ = false;
rv = sendbuf.flush(); rv = sendbuf.flush();
if (rv != 0) { if (rv != 0) {
ULOG(FATAL, this) << "evbuffer_add() failed"; ULOG(FATAL, this) << "evbuffer_add() failed";
@ -1076,6 +1101,11 @@ ssize_t downstream_data_read_callback(nghttp2_session *session,
} }
if (nread == 0 && ((*data_flags) & NGHTTP2_DATA_FLAG_EOF) == 0) { if (nread == 0 && ((*data_flags) & NGHTTP2_DATA_FLAG_EOF) == 0) {
// Higher priority stream is likely to be handled first and if it
// has no data to send, we'd better to break here, so that we have
// a chance to read another incoming data from backend to this
// stream.
upstream->set_deferred(true);
return NGHTTP2_ERR_DEFERRED; return NGHTTP2_ERR_DEFERRED;
} }
@ -1431,4 +1461,6 @@ int Http2Upstream::on_downstream_reset() {
return 0; return 0;
} }
void Http2Upstream::set_deferred(bool f) { deferred_ = f; }
} // namespace shrpx } // namespace shrpx

View File

@ -51,6 +51,7 @@ public:
virtual int on_downstream_abort_request(Downstream *downstream, virtual int on_downstream_abort_request(Downstream *downstream,
unsigned int status_code); unsigned int status_code);
int send(); int send();
int perform_send();
virtual ClientHandler *get_client_handler() const; virtual ClientHandler *get_client_handler() const;
virtual bufferevent_data_cb get_downstream_readcb(); virtual bufferevent_data_cb get_downstream_readcb();
virtual bufferevent_data_cb get_downstream_writecb(); virtual bufferevent_data_cb get_downstream_writecb();
@ -92,6 +93,8 @@ public:
void start_downstream(Downstream *downstream); void start_downstream(Downstream *downstream);
void initiate_downstream(std::unique_ptr<Downstream> downstream); void initiate_downstream(std::unique_ptr<Downstream> downstream);
void set_deferred(bool f);
nghttp2::util::EvbufferBuffer sendbuf; nghttp2::util::EvbufferBuffer sendbuf;
private: private:
@ -100,7 +103,9 @@ private:
ClientHandler *handler_; ClientHandler *handler_;
nghttp2_session *session_; nghttp2_session *session_;
event *settings_timerev_; event *settings_timerev_;
event *write_notifyev_;
bool flow_control_; bool flow_control_;
bool deferred_;
}; };
} // namespace shrpx } // namespace shrpx