From 9d19e2bfe9387143ae369878a152c7995d991292 Mon Sep 17 00:00:00 2001 From: Tatsuhiro Tsujikawa Date: Sat, 9 Jun 2012 23:14:00 +0900 Subject: [PATCH] Rewrite connection pooling --- examples/Makefile.am | 1 + examples/shrpx.h | 2 + examples/shrpx_client_handler.cc | 41 +++++++ examples/shrpx_client_handler.h | 9 ++ examples/shrpx_downstream.cc | 102 +++++---------- examples/shrpx_downstream.h | 10 +- examples/shrpx_downstream_connection.cc | 157 ++++++++++++++++++++++++ examples/shrpx_downstream_connection.h | 57 +++++++++ examples/shrpx_downstream_queue.cc | 35 +----- examples/shrpx_downstream_queue.h | 5 - examples/shrpx_https_upstream.cc | 139 +++++++++++---------- examples/shrpx_io_control.cc | 12 +- examples/shrpx_spdy_upstream.cc | 115 ++++++++--------- examples/shrpx_spdy_upstream.h | 3 - 14 files changed, 435 insertions(+), 253 deletions(-) create mode 100644 examples/shrpx_downstream_connection.cc create mode 100644 examples/shrpx_downstream_connection.h diff --git a/examples/Makefile.am b/examples/Makefile.am index 0007b190..4de78d28 100644 --- a/examples/Makefile.am +++ b/examples/Makefile.am @@ -80,6 +80,7 @@ shrpx_SOURCES = ${HELPER_OBJECTS} ${HELPER_HFILES} \ shrpx_https_upstream.cc shrpx_https_upstream.h \ shrpx_downstream_queue.cc shrpx_downstream_queue.h \ shrpx_downstream.cc shrpx_downstream.h \ + shrpx_downstream_connection.cc shrpx_downstream_connection.h \ shrpx_log.cc shrpx_log.h \ shrpx_http.cc shrpx_http.h \ shrpx_io_control.cc shrpx_io_control.h \ diff --git a/examples/shrpx.h b/examples/shrpx.h index 50e458e0..7fefc4b5 100644 --- a/examples/shrpx.h +++ b/examples/shrpx.h @@ -36,4 +36,6 @@ #define DIE() \ assert(0); +#define SHRPX_READ_WARTER_MARK (64*1024) + #endif // SHRPX_H diff --git a/examples/shrpx_client_handler.cc b/examples/shrpx_client_handler.cc index 88897dad..535cc469 100644 --- a/examples/shrpx_client_handler.cc +++ b/examples/shrpx_client_handler.cc @@ -28,6 +28,7 @@ #include "shrpx_spdy_upstream.h" #include "shrpx_https_upstream.h" #include "shrpx_config.h" +#include "shrpx_downstream_connection.h" namespace shrpx { @@ -108,6 +109,7 @@ ClientHandler::ClientHandler(bufferevent *bev, SSL *ssl, const char *ipaddr) should_close_after_write_(false) { bufferevent_enable(bev_, EV_READ | EV_WRITE); + bufferevent_setwatermark(bev_, EV_READ, 0, SHRPX_READ_WARTER_MARK); set_upstream_timeouts(&get_config()->upstream_read_timeout, &get_config()->upstream_write_timeout); set_bev_cb(0, upstream_writecb, upstream_eventcb); @@ -126,6 +128,10 @@ ClientHandler::~ClientHandler() shutdown(fd, SHUT_WR); close(fd); delete upstream_; + for(std::set::iterator i = dconn_pool_.begin(); + i != dconn_pool_.end(); ++i) { + delete *i; + } if(ENABLE_LOG) { LOG(INFO) << "Deleted"; } @@ -213,4 +219,39 @@ void ClientHandler::set_should_close_after_write(bool f) should_close_after_write_ = f; } +void ClientHandler::pool_downstream_connection(DownstreamConnection *dconn) +{ + if(ENABLE_LOG) { + LOG(INFO) << "Pooling downstream connection " << dconn; + } + dconn_pool_.insert(dconn); +} + +void ClientHandler::remove_downstream_connection(DownstreamConnection *dconn) +{ + if(ENABLE_LOG) { + LOG(INFO) << "Removing downstream connection " << dconn + << " from pool"; + } + dconn_pool_.erase(dconn); +} + +DownstreamConnection* ClientHandler::get_downstream_connection() +{ + if(dconn_pool_.empty()) { + if(ENABLE_LOG) { + LOG(INFO) << "Downstream connection pool is empty. Create new one"; + } + return new DownstreamConnection(this); + } else { + DownstreamConnection *dconn = *dconn_pool_.begin(); + dconn_pool_.erase(dconn); + if(ENABLE_LOG) { + LOG(INFO) << "Reuse downstream connection " << dconn + << " from pool"; + } + return dconn; + } +} + } // namespace shrpx diff --git a/examples/shrpx_client_handler.h b/examples/shrpx_client_handler.h index 0392f764..424d3b97 100644 --- a/examples/shrpx_client_handler.h +++ b/examples/shrpx_client_handler.h @@ -27,12 +27,15 @@ #include "shrpx.h" +#include + #include #include namespace shrpx { class Upstream; +class DownstreamConnection; class ClientHandler { public: @@ -51,12 +54,18 @@ public: bool get_should_close_after_write() const; void set_should_close_after_write(bool f); Upstream* get_upstream(); + + void pool_downstream_connection(DownstreamConnection *dconn); + void remove_downstream_connection(DownstreamConnection *dconn); + DownstreamConnection* get_downstream_connection(); private: bufferevent *bev_; SSL *ssl_; Upstream *upstream_; std::string ipaddr_; bool should_close_after_write_; + + std::set dconn_pool_; }; } // namespace shrpx diff --git a/examples/shrpx_downstream.cc b/examples/shrpx_downstream.cc index 97254da0..2ca90064 100644 --- a/examples/shrpx_downstream.cc +++ b/examples/shrpx_downstream.cc @@ -31,6 +31,7 @@ #include "shrpx_config.h" #include "shrpx_error.h" #include "shrpx_http.h" +#include "shrpx_downstream_connection.h" #include "util.h" using namespace spdylay; @@ -39,10 +40,9 @@ namespace shrpx { Downstream::Downstream(Upstream *upstream, int stream_id, int priority) : upstream_(upstream), - bev_(0), + dconn_(0), stream_id_(stream_id), priority_(priority), - counter_(1), ioctrl_(0), request_state_(INITIAL), request_major_(1), @@ -60,11 +60,6 @@ Downstream::Downstream(Upstream *upstream, int stream_id, int priority) { htparser_init(response_htp_, htp_type_response); htparser_set_userdata(response_htp_, this); - event_base *evbase = upstream_->get_client_handler()->get_evbase(); - bev_ = bufferevent_socket_new - (evbase, -1, - BEV_OPT_CLOSE_ON_FREE | BEV_OPT_DEFER_CALLBACKS); - ioctrl_.set_bev(bev_); } Downstream::~Downstream() @@ -76,53 +71,30 @@ Downstream::~Downstream() // Passing NULL to evbuffer_free() causes segmentation fault. evbuffer_free(response_body_buf_); } - bufferevent_disable(bev_, EV_READ | EV_WRITE); - bufferevent_free(bev_); + if(dconn_) { + delete dconn_; + } free(response_htp_); if(ENABLE_LOG) { LOG(INFO) << "Deleted"; } } -int Downstream::get_counter() const +void Downstream::set_downstream_connection(DownstreamConnection *dconn) { - return counter_; -} - -void Downstream::reuse(int stream_id) -{ - stream_id_ = stream_id; - ++counter_; - request_state_ = INITIAL; -} - -void Downstream::idle() -{ - stream_id_ = -1; - priority_ = 0; - ioctrl_.force_resume_read(); - request_state_ = IDLE; - request_method_.clear(); - request_path_.clear(); - request_major_ = 1; - request_minor_ = 1; - chunked_request_ = false; - request_connection_close_ = false; - request_headers_.clear(); - - response_state_ = INITIAL; - response_http_status_ = 0; - response_major_ = 1; - response_minor_ = 1; - chunked_response_ = false; - response_connection_close_ = false; - response_headers_.clear(); - if(response_body_buf_) { - size_t len = evbuffer_get_length(response_body_buf_); - evbuffer_drain(response_body_buf_, len); + dconn_ = dconn; + if(dconn_) { + ioctrl_.set_bev(dconn_->get_bev()); + } else { + ioctrl_.set_bev(0); } } +DownstreamConnection* Downstream::get_downstream_connection() +{ + return dconn_; +} + void Downstream::pause_read(IOCtrlReason reason) { ioctrl_.pause_read(reason); @@ -218,28 +190,6 @@ int32_t Downstream::get_stream_id() const return stream_id_; } -int Downstream::start_connection() -{ - bufferevent_setcb(bev_, - upstream_->get_downstream_readcb(), - upstream_->get_downstream_writecb(), - upstream_->get_downstream_eventcb(), this); - bufferevent_enable(bev_, EV_READ | EV_WRITE); - bufferevent_set_timeouts(bev_, - &get_config()->downstream_read_timeout, - &get_config()->downstream_write_timeout); - int rv = bufferevent_socket_connect - (bev_, - // TODO maybe not thread-safe? - const_cast(&get_config()->downstream_addr.sa), - get_config()->downstream_addrlen); - if(rv == 0) { - return 0; - } else { - return SHRPX_ERR_NETWORK; - } -} - void Downstream::set_request_state(int state) { request_state_ = state; @@ -286,6 +236,9 @@ int Downstream::push_request_headers() via_value = (*i).second; continue; } + if(util::strieq((*i).first.c_str(), "host")) { + continue; + } hdrs += (*i).first; hdrs += ": "; hdrs += (*i).second; @@ -318,7 +271,8 @@ int Downstream::push_request_headers() if(ENABLE_LOG) { LOG(INFO) << "Downstream request headers\n" << hdrs; } - evbuffer *output = bufferevent_get_output(bev_); + bufferevent *bev = dconn_->get_bev(); + evbuffer *output = bufferevent_get_output(bev); evbuffer_add(output, hdrs.c_str(), hdrs.size()); return 0; } @@ -329,13 +283,17 @@ int Downstream::push_upload_data_chunk(const uint8_t *data, size_t datalen) // buffer using push_request_headers(). ssize_t res = 0; int rv; - evbuffer *output = bufferevent_get_output(bev_); + bufferevent *bev = dconn_->get_bev(); + evbuffer *output = bufferevent_get_output(bev); if(chunked_request_) { char chunk_size_hex[16]; rv = snprintf(chunk_size_hex, sizeof(chunk_size_hex), "%X\r\n", static_cast(datalen)); - evbuffer_add(output, chunk_size_hex, rv); res += rv; + rv = evbuffer_add(output, chunk_size_hex, rv); + if(rv == -1) { + return -1; + } } rv = evbuffer_add(output, data, datalen); if(rv == -1) { @@ -348,7 +306,8 @@ int Downstream::push_upload_data_chunk(const uint8_t *data, size_t datalen) int Downstream::end_upload_data() { if(chunked_request_) { - evbuffer *output = bufferevent_get_output(bev_); + bufferevent *bev = dconn_->get_bev(); + evbuffer *output = bufferevent_get_output(bev); evbuffer_add(output, "0\r\n\r\n", 5); } return 0; @@ -493,7 +452,8 @@ htparse_hooks htp_hooks = { int Downstream::parse_http_response() { - evbuffer *input = bufferevent_get_input(bev_); + bufferevent *bev = dconn_->get_bev(); + evbuffer *input = bufferevent_get_input(bev); unsigned char *mem = evbuffer_pullup(input, -1); size_t nread = htparser_run(response_htp_, &htp_hooks, reinterpret_cast(mem), diff --git a/examples/shrpx_downstream.h b/examples/shrpx_downstream.h index 46479af8..80f3cdba 100644 --- a/examples/shrpx_downstream.h +++ b/examples/shrpx_downstream.h @@ -44,6 +44,7 @@ extern "C" { namespace shrpx { class Upstream; +class DownstreamConnection; typedef std::vector > Headers; @@ -51,16 +52,14 @@ class Downstream { public: Downstream(Upstream *upstream, int stream_id, int priority); ~Downstream(); - int start_connection(); Upstream* get_upstream() const; int32_t get_stream_id() const; void set_priority(int pri); void pause_read(IOCtrlReason reason); bool resume_read(IOCtrlReason reason); void force_resume_read(); - void idle(); - void reuse(int stream_id); - int get_counter() const; + void set_downstream_connection(DownstreamConnection *dconn); + DownstreamConnection* get_downstream_connection(); // downstream request API const Headers& get_request_headers() const; void add_request_header(const std::string& name, const std::string& value); @@ -106,10 +105,9 @@ public: evbuffer* get_response_body_buf(); private: Upstream *upstream_; - bufferevent *bev_; + DownstreamConnection *dconn_; int32_t stream_id_; int priority_; - int counter_; IOControl ioctrl_; int request_state_; std::string request_method_; diff --git a/examples/shrpx_downstream_connection.cc b/examples/shrpx_downstream_connection.cc new file mode 100644 index 00000000..613cd7bb --- /dev/null +++ b/examples/shrpx_downstream_connection.cc @@ -0,0 +1,157 @@ +/* + * Spdylay - SPDY Library + * + * Copyright (c) 2012 Tatsuhiro Tsujikawa + * + * Permission is hereby granted, free of charge, to any person obtaining + * a copy of this software and associated documentation files (the + * "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, + * distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to + * the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +#include "shrpx_downstream_connection.h" + +#include "shrpx_client_handler.h" +#include "shrpx_upstream.h" +#include "shrpx_downstream.h" +#include "shrpx_config.h" +#include "shrpx_error.h" + +namespace shrpx { + +DownstreamConnection::DownstreamConnection(ClientHandler *client_handler) + : client_handler_(client_handler), + bev_(0), + downstream_(0) +{ + +} + +DownstreamConnection::~DownstreamConnection() +{ + if(bev_) { + bufferevent_disable(bev_, EV_READ | EV_WRITE); + bufferevent_free(bev_); + } + // Downstream and DownstreamConnection may be deleted + // asynchronously. + if(downstream_) { + downstream_->set_downstream_connection(0); + } +} + +int DownstreamConnection::attach_downstream(Downstream *downstream) +{ + if(ENABLE_LOG) { + LOG(INFO) << "Attaching downstream connection " << this << " to " + << "downstream " << downstream; + } + Upstream *upstream = downstream->get_upstream(); + if(!bev_) { + event_base *evbase = client_handler_->get_evbase(); + bev_ = bufferevent_socket_new + (evbase, -1, + BEV_OPT_CLOSE_ON_FREE | BEV_OPT_DEFER_CALLBACKS); + int rv = bufferevent_socket_connect + (bev_, + // TODO maybe not thread-safe? + const_cast(&get_config()->downstream_addr.sa), + get_config()->downstream_addrlen); + if(rv != 0) { + bufferevent_free(bev_); + bev_ = 0; + return SHRPX_ERR_NETWORK; + } + if(ENABLE_LOG) { + LOG(INFO) << "Connecting to downstream server " << this; + } + } + downstream->set_downstream_connection(this); + downstream_ = downstream; + bufferevent_setwatermark(bev_, EV_READ, 0, SHRPX_READ_WARTER_MARK); + bufferevent_enable(bev_, EV_READ | EV_WRITE); + bufferevent_setcb(bev_, + upstream->get_downstream_readcb(), + upstream->get_downstream_writecb(), + upstream->get_downstream_eventcb(), this); + + bufferevent_set_timeouts(bev_, + &get_config()->downstream_read_timeout, + &get_config()->downstream_write_timeout); + return 0; +} + +namespace { +// Gets called when DownstreamConnection is pooled in ClientHandler. +void idle_eventcb(bufferevent *bev, short events, void *arg) +{ + DownstreamConnection *dconn = reinterpret_cast(arg); + if(events & BEV_EVENT_CONNECTED) { + // Downstream was detached before connection established? + // This may be safe to be left. + if(ENABLE_LOG) { + LOG(INFO) << "Idle downstream connected?" << dconn; + } + return; + } + if(events & BEV_EVENT_EOF) { + if(ENABLE_LOG) { + LOG(INFO) << "Idle downstream connection EOF " << dconn; + } + } else if(events & BEV_EVENT_TIMEOUT) { + if(ENABLE_LOG) { + LOG(INFO) << "Idle downstream connection timeout " << dconn; + } + } else if(events & BEV_EVENT_ERROR) { + if(ENABLE_LOG) { + LOG(INFO) << "Idle downstream connection error " << dconn; + } + } + ClientHandler *client_handler = dconn->get_client_handler(); + client_handler->remove_downstream_connection(dconn); + delete dconn; +} +} // namespace + +void DownstreamConnection::detach_downstream(Downstream *downstream) +{ + if(ENABLE_LOG) { + LOG(INFO) << "Detaching downstream connection " << this << " from " + << "downstream " << downstream; + } + downstream->set_downstream_connection(0); + downstream_ = 0; + bufferevent_enable(bev_, EV_READ | EV_WRITE); + bufferevent_setcb(bev_, 0, 0, idle_eventcb, this); + client_handler_->pool_downstream_connection(this); +} + +ClientHandler* DownstreamConnection::get_client_handler() +{ + return client_handler_; +} + +Downstream* DownstreamConnection::get_downstream() +{ + return downstream_; +} + +bufferevent* DownstreamConnection::get_bev() +{ + return bev_; +} + +} // namespace shrpx diff --git a/examples/shrpx_downstream_connection.h b/examples/shrpx_downstream_connection.h new file mode 100644 index 00000000..f866ba08 --- /dev/null +++ b/examples/shrpx_downstream_connection.h @@ -0,0 +1,57 @@ +/* + * Spdylay - SPDY Library + * + * Copyright (c) 2012 Tatsuhiro Tsujikawa + * + * Permission is hereby granted, free of charge, to any person obtaining + * a copy of this software and associated documentation files (the + * "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, + * distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to + * the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +#ifndef SHRPX_DOWNSTREAM_CONNECTION_H +#define SHRPX_DOWNSTREAM_CONNECTION_H + +#include "shrpx.h" + +#include +#include + +namespace shrpx { + +class ClientHandler; +class Downstream; + +class DownstreamConnection { +public: + DownstreamConnection(ClientHandler *client_handler); + ~DownstreamConnection(); + int attach_downstream(Downstream *downstream); + void detach_downstream(Downstream *downstream); + bufferevent* get_bev(); + int push_data(const void *data, size_t len); + + ClientHandler* get_client_handler(); + Downstream* get_downstream(); +private: + ClientHandler *client_handler_; + bufferevent *bev_; + Downstream *downstream_; +}; + +} // namespace shrpx + +#endif // SHRPX_DOWNSTREAM_CONNECTION_H diff --git a/examples/shrpx_downstream_queue.cc b/examples/shrpx_downstream_queue.cc index 4214ab7c..2d217127 100644 --- a/examples/shrpx_downstream_queue.cc +++ b/examples/shrpx_downstream_queue.cc @@ -39,10 +39,6 @@ DownstreamQueue::~DownstreamQueue() i != downstreams_.end(); ++i) { delete (*i).second; } - for(std::set::iterator i = idle_downstreams_.begin(); - i != idle_downstreams_.end(); ++i) { - delete *i; - } } void DownstreamQueue::add(Downstream *downstream) @@ -50,18 +46,9 @@ void DownstreamQueue::add(Downstream *downstream) downstreams_[downstream->get_stream_id()] = downstream; } -int DownstreamQueue::start(Downstream *downstream) -{ - return downstream->start_connection(); -} - void DownstreamQueue::remove(Downstream *downstream) { - if(downstream->get_request_state() == Downstream::IDLE) { - idle_downstreams_.erase(downstream); - } else { - downstreams_.erase(downstream->get_stream_id()); - } + downstreams_.erase(downstream->get_stream_id()); } Downstream* DownstreamQueue::find(int32_t stream_id) @@ -74,24 +61,4 @@ Downstream* DownstreamQueue::find(int32_t stream_id) } } -Downstream* DownstreamQueue::reuse(int32_t stream_id) -{ - if(idle_downstreams_.empty()) { - return 0; - } - Downstream* downstream = *idle_downstreams_.begin(); - idle_downstreams_.erase(downstream); - downstream->reuse(stream_id); - add(downstream); - return downstream; -} - -void DownstreamQueue::idle(Downstream *downstream) -{ - assert(downstream->get_request_state() != Downstream::IDLE); - remove(downstream); - downstream->idle(); - idle_downstreams_.insert(downstream); -} - } // namespace shrpx diff --git a/examples/shrpx_downstream_queue.h b/examples/shrpx_downstream_queue.h index 1ee0078e..cf04f175 100644 --- a/examples/shrpx_downstream_queue.h +++ b/examples/shrpx_downstream_queue.h @@ -30,7 +30,6 @@ #include #include -#include namespace shrpx { @@ -41,14 +40,10 @@ public: DownstreamQueue(); ~DownstreamQueue(); void add(Downstream *downstream); - int start(Downstream *downstream); void remove(Downstream *downstream); Downstream* find(int32_t stream_id); - Downstream* reuse(int32_t stream_id); - void idle(Downstream *downstream); private: std::map downstreams_; - std::set idle_downstreams_; }; } // namespace shrpx diff --git a/examples/shrpx_https_upstream.cc b/examples/shrpx_https_upstream.cc index aae27258..09d0f734 100644 --- a/examples/shrpx_https_upstream.cc +++ b/examples/shrpx_https_upstream.cc @@ -29,6 +29,7 @@ #include "shrpx_client_handler.h" #include "shrpx_downstream.h" +#include "shrpx_downstream_connection.h" #include "shrpx_http.h" #include "shrpx_config.h" #include "shrpx_error.h" @@ -39,7 +40,7 @@ using namespace spdylay; namespace shrpx { namespace { -const size_t SHRPX_HTTPS_UPSTREAM_OUTPUT_UPPER_THRES = 512*1024; +const size_t SHRPX_HTTPS_UPSTREAM_OUTPUT_UPPER_THRES = 64*1024; const size_t SHRPX_HTTPS_MAX_HEADER_LENGTH = 64*1024; } // namespace @@ -76,20 +77,8 @@ int htp_msg_begin(htparser *htp) HttpsUpstream *upstream; upstream = reinterpret_cast(htparser_get_userdata(htp)); upstream->reset_current_header_length(); - Downstream *downstream = upstream->get_top_downstream(); - if(downstream) { - // Keep-Alived connection - if(ENABLE_LOG) { - LOG(INFO) << "Reusing downstream"; - } - downstream->reuse(0); - } else { - if(ENABLE_LOG) { - LOG(INFO) << "Creating new downstream"; - } - downstream = new Downstream(upstream, 0, 0); - upstream->add_downstream(downstream); - } + Downstream *downstream = new Downstream(upstream, 0, 0); + upstream->add_downstream(downstream); return 0; } } // namespace @@ -169,18 +158,20 @@ int htp_hdrs_completecb(htparser *htp) downstream->set_request_major(htparser_get_major(htp)); downstream->set_request_minor(htparser_get_minor(htp)); - downstream->push_request_headers(); - downstream->set_request_state(Downstream::HEADER_COMPLETE); + DownstreamConnection *dconn; + dconn = upstream->get_client_handler()->get_downstream_connection(); - if(downstream->get_counter() == 1) { - int rv = downstream->start_connection(); - if(rv != 0) { - LOG(ERROR) << "Upstream connection failed"; - downstream->set_request_state(Downstream::CONNECT_FAIL); - return 1; - } + int rv = dconn->attach_downstream(downstream); + if(rv != 0) { + downstream->set_request_state(Downstream::CONNECT_FAIL); + downstream->set_downstream_connection(0); + delete dconn; + return 1; + } else { + downstream->push_request_headers(); + downstream->set_request_state(Downstream::HEADER_COMPLETE); + return 0; } - return 0; } } // namespace @@ -250,19 +241,29 @@ int HttpsUpstream::on_read() htpparse_error htperr = htparser_get_error(htp_); if(htperr == htparse_error_user) { Downstream *downstream = get_top_downstream(); - if(downstream && - downstream->get_request_state() == Downstream::CONNECT_FAIL) { + if(downstream->get_request_state() == Downstream::CONNECT_FAIL) { get_client_handler()->set_should_close_after_write(true); error_reply(503); - } else if(current_header_length_ > SHRPX_HTTPS_MAX_HEADER_LENGTH) { + // Downstream gets deleted after response body is read. + } else { + assert(downstream->get_request_state() == Downstream::MSG_COMPLETE); + if(downstream->get_downstream_connection() == 0) { + // Error response already be sent + assert(downstream->get_response_state() == Downstream::MSG_COMPLETE); + pop_downstream(); + delete downstream; + } else { + pause_read(SHRPX_MSG_BLOCK); + } + } + } else if(htperr == htparse_error_none) { + if(current_header_length_ > SHRPX_HTTPS_MAX_HEADER_LENGTH) { LOG(WARNING) << "Request Header too long:" << current_header_length_ << " bytes"; get_client_handler()->set_should_close_after_write(true); error_reply(400); - } else { - pause_read(SHRPX_MSG_BLOCK); } - } else if(htperr != htparse_error_none) { + } else { if(ENABLE_LOG) { LOG(INFO) << "Upstream http parse failure: " << htparser_get_strerror(htp_); @@ -273,6 +274,10 @@ int HttpsUpstream::on_read() return 0; } +namespace { +void https_downstream_readcb(bufferevent *bev, void *ptr); +} // namespace + int HttpsUpstream::on_write() { Downstream *downstream = get_top_downstream(); @@ -309,28 +314,28 @@ void HttpsUpstream::resume_read(IOCtrlReason reason) namespace { void https_downstream_readcb(bufferevent *bev, void *ptr) { - Downstream *downstream = reinterpret_cast(ptr); + DownstreamConnection *dconn = reinterpret_cast(ptr); + Downstream *downstream = dconn->get_downstream(); HttpsUpstream *upstream; upstream = static_cast(downstream->get_upstream()); - if(downstream->get_request_state() == Downstream::IDLE) { - if(ENABLE_LOG) { - LOG(INFO) << "Delete idle downstream in https_downstream_readcb"; - } - upstream->pop_downstream(); - delete downstream; - return; - } int rv = downstream->parse_http_response(); if(rv == 0) { if(downstream->get_response_state() == Downstream::MSG_COMPLETE) { - assert(downstream == upstream->get_top_downstream()); if(downstream->get_response_connection_close()) { + // Connection close + downstream->set_downstream_connection(0); + delete dconn; + dconn = 0; + } else { + // Keep-alive + dconn->detach_downstream(downstream); + } + if(downstream->get_request_state() == Downstream::MSG_COMPLETE) { upstream->pop_downstream(); delete downstream; - } else { - downstream->idle(); + // Process next HTTP request + upstream->resume_read(SHRPX_MSG_BLOCK); } - upstream->resume_read(SHRPX_MSG_BLOCK); } else { ClientHandler *handler = upstream->get_client_handler(); bufferevent *bev = handler->get_bev(); @@ -341,13 +346,19 @@ void https_downstream_readcb(bufferevent *bev, void *ptr) } } else { if(downstream->get_response_state() == Downstream::HEADER_COMPLETE) { + // We already sent HTTP response headers to upstream + // client. Just close the upstream connection. delete upstream->get_client_handler(); } else { + // We did not sent any HTTP response, so sent error + // response. Cannot reuse downstream connection in this case. upstream->error_reply(502); - assert(downstream == upstream->get_top_downstream()); - upstream->pop_downstream(); - delete downstream; - upstream->resume_read(SHRPX_MSG_BLOCK); + if(downstream->get_request_state() == Downstream::MSG_COMPLETE) { + upstream->pop_downstream(); + delete downstream; + // Process next HTTP request + upstream->resume_read(SHRPX_MSG_BLOCK); + } } } } @@ -362,18 +373,10 @@ void https_downstream_writecb(bufferevent *bev, void *ptr) namespace { void https_downstream_eventcb(bufferevent *bev, short events, void *ptr) { - Downstream *downstream = reinterpret_cast(ptr); + DownstreamConnection *dconn = reinterpret_cast(ptr); + Downstream *downstream = dconn->get_downstream(); HttpsUpstream *upstream; upstream = static_cast(downstream->get_upstream()); - if(downstream->get_request_state() == Downstream::IDLE) { - if(ENABLE_LOG) { - LOG(INFO) << "Delete idle downstream in https_downstream_eventcb"; - } - upstream->pop_downstream(); - delete downstream; - upstream->resume_read(SHRPX_MSG_BLOCK); - return; - } if(events & BEV_EVENT_CONNECTED) { if(ENABLE_LOG) { LOG(INFO) << "Downstream connection established. downstream " @@ -400,9 +403,11 @@ void https_downstream_eventcb(bufferevent *bev, short events, void *ptr) } upstream->error_reply(502); } - upstream->pop_downstream(); - delete downstream; - upstream->resume_read(SHRPX_MSG_BLOCK); + if(downstream->get_request_state() == Downstream::MSG_COMPLETE) { + upstream->pop_downstream(); + delete downstream; + upstream->resume_read(SHRPX_MSG_BLOCK); + } } else if(events & (BEV_EVENT_ERROR | BEV_EVENT_TIMEOUT)) { if(ENABLE_LOG) { LOG(INFO) << "Downstream error/timeout. " << downstream; @@ -416,9 +421,11 @@ void https_downstream_eventcb(bufferevent *bev, short events, void *ptr) } upstream->error_reply(status); } - upstream->pop_downstream(); - delete downstream; - upstream->resume_read(SHRPX_MSG_BLOCK); + if(downstream->get_request_state() == Downstream::MSG_COMPLETE) { + upstream->pop_downstream(); + delete downstream; + upstream->resume_read(SHRPX_MSG_BLOCK); + } } } } // namespace @@ -439,6 +446,10 @@ void HttpsUpstream::error_reply(int status_code) evbuffer *output = bufferevent_get_output(handler_->get_bev()); evbuffer_add(output, header.c_str(), header.size()); evbuffer_add(output, html.c_str(), html.size()); + Downstream *downstream = get_top_downstream(); + if(downstream) { + downstream->set_response_state(Downstream::MSG_COMPLETE); + } } bufferevent_data_cb HttpsUpstream::get_downstream_readcb() diff --git a/examples/shrpx_io_control.cc b/examples/shrpx_io_control.cc index 3113958c..9f04406d 100644 --- a/examples/shrpx_io_control.cc +++ b/examples/shrpx_io_control.cc @@ -44,14 +44,18 @@ void IOControl::set_bev(bufferevent *bev) void IOControl::pause_read(IOCtrlReason reason) { rdbits_ |= reason; - bufferevent_disable(bev_, EV_READ); + if(bev_) { + bufferevent_disable(bev_, EV_READ); + } } bool IOControl::resume_read(IOCtrlReason reason) { rdbits_ &= ~reason; if(rdbits_ == 0) { - bufferevent_enable(bev_, EV_READ); + if(bev_) { + bufferevent_enable(bev_, EV_READ); + } return true; } else { return false; @@ -61,7 +65,9 @@ bool IOControl::resume_read(IOCtrlReason reason) void IOControl::force_resume_read() { rdbits_ = 0; - bufferevent_enable(bev_, EV_READ); + if(bev_) { + bufferevent_enable(bev_, EV_READ); + } } } // namespace shrpx diff --git a/examples/shrpx_spdy_upstream.cc b/examples/shrpx_spdy_upstream.cc index 1bb69306..d7c853d9 100644 --- a/examples/shrpx_spdy_upstream.cc +++ b/examples/shrpx_spdy_upstream.cc @@ -29,6 +29,7 @@ #include "shrpx_client_handler.h" #include "shrpx_downstream.h" +#include "shrpx_downstream_connection.h" #include "shrpx_config.h" #include "shrpx_http.h" #include "util.h" @@ -38,7 +39,7 @@ using namespace spdylay; namespace shrpx { namespace { -const size_t SHRPX_SPDY_UPSTREAM_OUTPUT_UPPER_THRES = 512*1024; +const size_t SHRPX_SPDY_UPSTREAM_OUTPUT_UPPER_THRES = 64*1024; } // namespace namespace { @@ -101,7 +102,16 @@ void on_stream_close_callback } else { downstream->set_request_state(Downstream::STREAM_CLOSED); if(downstream->get_response_state() == Downstream::MSG_COMPLETE) { - upstream->remove_or_idle_downstream(downstream); + // At this point, downstream response was read + if(!downstream->get_response_connection_close()) { + // Keep-alive + DownstreamConnection *dconn; + dconn = downstream->get_downstream_connection(); + if(dconn) { + dconn->detach_downstream(downstream); + } + } + upstream->remove_downstream(downstream); } else { // At this point, downstream read may be paused. To reclaim // file descriptor, enable read here and catch read @@ -126,22 +136,10 @@ void on_ctrl_recv_callback << frame->syn_stream.stream_id; } Downstream *downstream; - downstream = upstream->reuse_downstream(frame->syn_stream.stream_id); - if(downstream) { - if(ENABLE_LOG) { - LOG(INFO) << "Reusing downstream for stream_id=" - << frame->syn_stream.stream_id; - } - downstream->set_priority(frame->syn_stream.pri); - } else { - if(ENABLE_LOG) { - LOG(INFO) << "Creating new downstream for stream_id=" - << frame->syn_stream.stream_id; - } - downstream = new Downstream(upstream, - frame->syn_stream.stream_id, - frame->syn_stream.pri); - } + downstream = new Downstream(upstream, + frame->syn_stream.stream_id, + frame->syn_stream.pri); + upstream->add_downstream(downstream); downstream->init_response_body_buf(); char **nv = frame->syn_stream.nv; @@ -164,6 +162,15 @@ void on_ctrl_recv_callback LOG(INFO) << "Upstream spdy request headers:\n" << ss.str(); } + DownstreamConnection *dconn; + dconn = upstream->get_client_handler()->get_downstream_connection(); + int rv = dconn->attach_downstream(downstream); + if(rv != 0) { + // If downstream connection fails, issue RST_STREAM. + upstream->rst_stream(downstream, SPDYLAY_INTERNAL_ERROR); + downstream->set_request_state(Downstream::CONNECT_FAIL); + return; + } downstream->push_request_headers(); downstream->set_request_state(Downstream::HEADER_COMPLETE); if(frame->syn_stream.hd.flags & SPDYLAY_CTRL_FLAG_FIN) { @@ -174,14 +181,6 @@ void on_ctrl_recv_callback } downstream->set_request_state(Downstream::MSG_COMPLETE); } - if(downstream->get_counter() == 1) { - upstream->add_downstream(downstream); - if(upstream->start_downstream(downstream) != 0) { - // If downstream connection fails, issue RST_STREAM. - upstream->rst_stream(downstream, SPDYLAY_INTERNAL_ERROR); - downstream->set_request_state(Downstream::CONNECT_FAIL); - } - } break; } default: @@ -297,20 +296,14 @@ void spdy_downstream_readcb(bufferevent *bev, void *ptr) if(ENABLE_LOG) { LOG(INFO) << "spdy_downstream_readcb"; } - Downstream *downstream = reinterpret_cast(ptr); + DownstreamConnection *dconn = reinterpret_cast(ptr); + Downstream *downstream = dconn->get_downstream(); SpdyUpstream *upstream; upstream = static_cast(downstream->get_upstream()); - if(downstream->get_request_state() == Downstream::IDLE) { - if(ENABLE_LOG) { - LOG(INFO) << "Delete idle downstream in spdy_downstream_readcb"; - } - upstream->remove_downstream(downstream); - delete downstream; - return; - } - // If upstream SPDY stream was closed, we just close downstream, - // because there is no consumer now. if(downstream->get_request_state() == Downstream::STREAM_CLOSED) { + // If upstream SPDY stream was closed, we just close downstream, + // because there is no consumer now. Downstream connection is also + // closed in this case. upstream->remove_downstream(downstream); delete downstream; return; @@ -326,6 +319,11 @@ void spdy_downstream_readcb(bufferevent *bev, void *ptr) upstream->error_reply(downstream, 502); } downstream->set_response_state(Downstream::MSG_COMPLETE); + // Clearly, we have to close downstream connection on http parser + // failure. + downstream->set_downstream_connection(0); + delete dconn; + dconn = 0; } upstream->send(); } @@ -340,17 +338,10 @@ void spdy_downstream_writecb(bufferevent *bev, void *ptr) namespace { void spdy_downstream_eventcb(bufferevent *bev, short events, void *ptr) { - Downstream *downstream = reinterpret_cast(ptr); + DownstreamConnection *dconn = reinterpret_cast(ptr); + Downstream *downstream = dconn->get_downstream(); SpdyUpstream *upstream; upstream = static_cast(downstream->get_upstream()); - if(downstream->get_request_state() == Downstream::IDLE) { - if(ENABLE_LOG) { - LOG(INFO) << "Delete idle downstream in spdy_downstream_eventcb"; - } - upstream->remove_downstream(downstream); - delete downstream; - return; - } if(events & BEV_EVENT_CONNECTED) { if(ENABLE_LOG) { LOG(INFO) << "Downstream connection established. Downstream " @@ -362,11 +353,16 @@ void spdy_downstream_eventcb(bufferevent *bev, short events, void *ptr) << downstream->get_stream_id(); } if(downstream->get_request_state() == Downstream::STREAM_CLOSED) { - //If stream was closed already, we don't need to send reply at + // If stream was closed already, we don't need to send reply at // the first place. We can delete downstream. upstream->remove_downstream(downstream); delete downstream; } else { + // Delete downstream connection. If we don't delete it here, it + // will be pooled in on_stream_close_callback. + downstream->set_downstream_connection(0); + delete dconn; + dconn = 0; // downstream wil be deleted in on_stream_close_callback. if(downstream->get_response_state() == Downstream::HEADER_COMPLETE) { // Server may indicate the end of the request by EOF @@ -391,6 +387,11 @@ void spdy_downstream_eventcb(bufferevent *bev, short events, void *ptr) upstream->remove_downstream(downstream); delete downstream; } else { + // Delete downstream connection. If we don't delete it here, it + // will be pooled in on_stream_close_callback. + downstream->set_downstream_connection(0); + delete dconn; + dconn = 0; if(downstream->get_response_state() != Downstream::MSG_COMPLETE) { if(downstream->get_response_state() == Downstream::HEADER_COMPLETE) { upstream->rst_stream(downstream, SPDYLAY_INTERNAL_ERROR); @@ -499,11 +500,6 @@ void SpdyUpstream::add_downstream(Downstream *downstream) downstream_queue_.add(downstream); } -int SpdyUpstream::start_downstream(Downstream *downstream) -{ - return downstream_queue_.start(downstream); -} - void SpdyUpstream::remove_downstream(Downstream *downstream) { downstream_queue_.remove(downstream); @@ -514,21 +510,6 @@ Downstream* SpdyUpstream::find_downstream(int32_t stream_id) return downstream_queue_.find(stream_id); } -Downstream* SpdyUpstream::reuse_downstream(int32_t stream_id) -{ - return downstream_queue_.reuse(stream_id); -} - -void SpdyUpstream::remove_or_idle_downstream(Downstream *downstream) -{ - if(downstream->get_response_connection_close()) { - downstream_queue_.remove(downstream); - delete downstream; - } else { - downstream_queue_.idle(downstream); - } -} - spdylay_session* SpdyUpstream::get_spdy_session() { return session_; diff --git a/examples/shrpx_spdy_upstream.h b/examples/shrpx_spdy_upstream.h index 0d74d043..90ffc858 100644 --- a/examples/shrpx_spdy_upstream.h +++ b/examples/shrpx_spdy_upstream.h @@ -50,10 +50,7 @@ public: virtual bufferevent_event_cb get_downstream_eventcb(); void add_downstream(Downstream *downstream); void remove_downstream(Downstream *downstream); - int start_downstream(Downstream *downstream); Downstream* find_downstream(int32_t stream_id); - Downstream* reuse_downstream(int32_t stream_id); - void remove_or_idle_downstream(Downstream *downstream); spdylay_session* get_spdy_session();