From 9ba19df813fcf8ac133973214021409797c30fbf Mon Sep 17 00:00:00 2001 From: Tatsuhiro Tsujikawa Date: Fri, 8 Feb 2013 21:46:58 +0900 Subject: [PATCH] shrpx: Add --spdy-bridge option With --spdy-bridge option, it listens SPDY/HTTPS connections from front end and forwards them to the backend in SPDY. The usage will be written later. This change fixes the crash when more than 2 outstanding SpdyDownstreamConnection objects are added to SpdySession and establishing connection to SPDY backend is failed. --- src/shrpx.cc | 18 +++++++++----- src/shrpx_config.cc | 3 +++ src/shrpx_config.h | 2 ++ src/shrpx_downstream.cc | 17 ++++--------- src/shrpx_downstream.h | 3 +-- src/shrpx_downstream_connection.h | 3 +-- src/shrpx_http_downstream_connection.cc | 9 ++----- src/shrpx_http_downstream_connection.h | 3 +-- src/shrpx_https_upstream.cc | 15 ++++++----- src/shrpx_https_upstream.h | 2 +- src/shrpx_spdy_downstream_connection.cc | 33 ++++++++++++++++--------- src/shrpx_spdy_downstream_connection.h | 4 +-- src/shrpx_spdy_session.cc | 15 ++++++++--- src/shrpx_spdy_upstream.cc | 20 +++++++++------ src/shrpx_spdy_upstream.h | 2 +- src/shrpx_upstream.h | 2 +- 16 files changed, 83 insertions(+), 68 deletions(-) diff --git a/src/shrpx.cc b/src/shrpx.cc index 73a3a254..d3162629 100644 --- a/src/shrpx.cc +++ b/src/shrpx.cc @@ -243,7 +243,8 @@ int event_loop() event_base *evbase = event_base_new(); SSL_CTX *sv_ssl_ctx = get_config()->default_ssl_ctx; - SSL_CTX *cl_ssl_ctx = get_config()->client_mode ? + SSL_CTX *cl_ssl_ctx = (get_config()->client_mode || + get_config()->spdy_bridge)? ssl::create_ssl_client_context() : 0; ListenHandler *listener_handler = new ListenHandler(evbase, sv_ssl_ctx, @@ -372,6 +373,7 @@ void fill_default_config() mod_config()->backlog = 256; mod_config()->ciphers = 0; mod_config()->spdy_proxy = false; + mod_config()->spdy_bridge = false; mod_config()->client_proxy = false; mod_config()->client = false; mod_config()->client_mode = false; @@ -599,6 +601,7 @@ int main(int argc, char **argv) {"private-key-passwd-file", required_argument, &flag, 22}, {"no-via", no_argument, &flag, 23}, {"subcert", required_argument, &flag, 24}, + {"spdy-bridge", no_argument, &flag, 25}, {0, 0, 0, 0 } }; int option_index = 0; @@ -749,6 +752,10 @@ int main(int argc, char **argv) // --subcert cmdcfgs.push_back(std::make_pair(SHRPX_OPT_SUBCERT, optarg)); break; + case 25: + // --spdy-bridge + cmdcfgs.push_back(std::make_pair(SHRPX_OPT_SPDY_BRIDGE, "yes")); + break; default: break; } @@ -793,11 +800,10 @@ int main(int argc, char **argv) exit(EXIT_FAILURE); } - int mode = get_config()->spdy_proxy | - (get_config()->client_proxy << 1) | (get_config()->client << 2); - if(mode != 0 && mode != 1 && mode != 2 && mode != 4) { - LOG(FATAL) << "--spdy-proxy, --client-proxy and --client cannot be used " - << "at the same time."; + if(get_config()->spdy_proxy + get_config()->spdy_bridge + + get_config()->client_proxy + get_config()->client > 1) { + LOG(FATAL) << "--spdy-proxy, --spdy-bridge, --client-proxy and --client " + << "cannot be used at the same time."; exit(EXIT_FAILURE); } diff --git a/src/shrpx_config.cc b/src/shrpx_config.cc index 601ce2cf..b062a577 100644 --- a/src/shrpx_config.cc +++ b/src/shrpx_config.cc @@ -57,6 +57,7 @@ SHRPX_OPT_SPDY_MAX_CONCURRENT_STREAMS[] = "spdy-max-concurrent-streams"; const char SHRPX_OPT_LOG_LEVEL[] = "log-level"; const char SHRPX_OPT_DAEMON[] = "daemon"; const char SHRPX_OPT_SPDY_PROXY[] = "spdy-proxy"; +const char SHRPX_OPT_SPDY_BRIDGE[] = "spdy-bridge"; const char SHRPX_OPT_CLIENT_PROXY[] = "client-proxy"; const char SHRPX_OPT_ADD_X_FORWARDED_FOR[] = "add-x-forwarded-for"; const char SHRPX_OPT_NO_VIA[] = "no-via"; @@ -208,6 +209,8 @@ int parse_config(const char *opt, const char *optarg) mod_config()->daemon = util::strieq(optarg, "yes"); } else if(util::strieq(opt, SHRPX_OPT_SPDY_PROXY)) { mod_config()->spdy_proxy = util::strieq(optarg, "yes"); + } else if(util::strieq(opt, SHRPX_OPT_SPDY_BRIDGE)) { + mod_config()->spdy_bridge = util::strieq(optarg, "yes"); } else if(util::strieq(opt, SHRPX_OPT_CLIENT_PROXY)) { mod_config()->client_proxy = util::strieq(optarg, "yes"); } else if(util::strieq(opt, SHRPX_OPT_ADD_X_FORWARDED_FOR)) { diff --git a/src/shrpx_config.h b/src/shrpx_config.h index c05c556e..f9750fed 100644 --- a/src/shrpx_config.h +++ b/src/shrpx_config.h @@ -55,6 +55,7 @@ extern const char SHRPX_OPT_SPDY_MAX_CONCURRENT_STREAMS[]; extern const char SHRPX_OPT_LOG_LEVEL[]; extern const char SHRPX_OPT_DAEMON[]; extern const char SHRPX_OPT_SPDY_PROXY[]; +extern const char SHRPX_OPT_SPDY_BRIDGE[]; extern const char SHRPX_OPT_CLIENT_PROXY[]; extern const char SHRPX_OPT_ADD_X_FORWARDED_FOR[]; extern const char SHRPX_OPT_NO_VIA[]; @@ -112,6 +113,7 @@ struct Config { size_t num_worker; size_t spdy_max_concurrent_streams; bool spdy_proxy; + bool spdy_bridge; bool client_proxy; bool add_x_forwarded_for; bool no_via; diff --git a/src/shrpx_downstream.cc b/src/shrpx_downstream.cc index 930b9f9a..e52da2d3 100644 --- a/src/shrpx_downstream.cc +++ b/src/shrpx_downstream.cc @@ -95,12 +95,12 @@ void Downstream::pause_read(IOCtrlReason reason) } } -bool Downstream::resume_read(IOCtrlReason reason) +int Downstream::resume_read(IOCtrlReason reason) { if(dconn_) { return dconn_->resume_read(reason); } else { - return false; + return 0; } } @@ -429,7 +429,9 @@ void body_buf_cb(evbuffer *body, size_t oldlen, size_t newlen, void *arg) { Downstream *downstream = reinterpret_cast(arg); if(newlen == 0) { - downstream->resume_read(SHRPX_NO_BUFFER); + if(downstream->resume_read(SHRPX_NO_BUFFER) == -1) { + DLOG(WARNING, downstream) << "Sending WINDOW_UPDATE failed"; + } } } } // namespace @@ -487,13 +489,4 @@ int32_t Downstream::get_downstream_stream_id() const return downstream_stream_id_; } -int Downstream::on_upstream_write() -{ - if(dconn_) { - return dconn_->on_upstream_write(); - } else { - return 0; - } -} - } // namespace shrpx diff --git a/src/shrpx_downstream.h b/src/shrpx_downstream.h index 2cdbe05f..06d0e4a8 100644 --- a/src/shrpx_downstream.h +++ b/src/shrpx_downstream.h @@ -52,7 +52,7 @@ public: int32_t get_stream_id() const; void set_priority(int pri); void pause_read(IOCtrlReason reason); - bool resume_read(IOCtrlReason reason); + int resume_read(IOCtrlReason reason); void force_resume_read(); // Set stream ID for downstream SPDY connection. void set_downstream_stream_id(int32_t stream_id); @@ -132,7 +132,6 @@ public: // Call this method when there is incoming data in downstream // connection. int on_read(); - int on_upstream_write(); static const size_t OUTPUT_UPPER_THRES = 64*1024; private: diff --git a/src/shrpx_downstream_connection.h b/src/shrpx_downstream_connection.h index 98082f1b..80b8fbf0 100644 --- a/src/shrpx_downstream_connection.h +++ b/src/shrpx_downstream_connection.h @@ -46,14 +46,13 @@ public: virtual int end_upload_data() = 0; virtual void pause_read(IOCtrlReason reason) = 0; - virtual bool resume_read(IOCtrlReason reason) = 0; + virtual int resume_read(IOCtrlReason reason) = 0; virtual void force_resume_read() = 0; virtual bool get_output_buffer_full() = 0; virtual int on_read() = 0; virtual int on_write() = 0; - virtual int on_upstream_write() = 0; ClientHandler* get_client_handler(); Downstream* get_downstream(); diff --git a/src/shrpx_http_downstream_connection.cc b/src/shrpx_http_downstream_connection.cc index da4b13fa..b01dee4c 100644 --- a/src/shrpx_http_downstream_connection.cc +++ b/src/shrpx_http_downstream_connection.cc @@ -330,9 +330,9 @@ void HttpDownstreamConnection::pause_read(IOCtrlReason reason) ioctrl_.pause_read(reason); } -bool HttpDownstreamConnection::resume_read(IOCtrlReason reason) +int HttpDownstreamConnection::resume_read(IOCtrlReason reason) { - return ioctrl_.resume_read(reason); + return ioctrl_.resume_read(reason) ? 0 : -1; } void HttpDownstreamConnection::force_resume_read() @@ -463,9 +463,4 @@ int HttpDownstreamConnection::on_write() return 0; } -int HttpDownstreamConnection::on_upstream_write() -{ - return 0; -} - } // namespace shrpx diff --git a/src/shrpx_http_downstream_connection.h b/src/shrpx_http_downstream_connection.h index 7fc9d53f..025ba939 100644 --- a/src/shrpx_http_downstream_connection.h +++ b/src/shrpx_http_downstream_connection.h @@ -49,14 +49,13 @@ public: virtual int end_upload_data(); virtual void pause_read(IOCtrlReason reason); - virtual bool resume_read(IOCtrlReason reason); + virtual int resume_read(IOCtrlReason reason); virtual void force_resume_read(); virtual bool get_output_buffer_full(); virtual int on_read(); virtual int on_write(); - virtual int on_upstream_write(); bufferevent* get_bev(); private: diff --git a/src/shrpx_https_upstream.cc b/src/shrpx_https_upstream.cc index e41e9dfe..aea4b99c 100644 --- a/src/shrpx_https_upstream.cc +++ b/src/shrpx_https_upstream.cc @@ -333,8 +333,7 @@ int HttpsUpstream::on_write() int rv = 0; Downstream *downstream = get_downstream(); if(downstream) { - downstream->resume_read(SHRPX_NO_BUFFER); - rv = downstream->on_upstream_write(); + rv = downstream->resume_read(SHRPX_NO_BUFFER); } return rv; } @@ -354,7 +353,7 @@ void HttpsUpstream::pause_read(IOCtrlReason reason) ioctrl_.pause_read(reason); } -int HttpsUpstream::resume_read(IOCtrlReason reason) +int HttpsUpstream::resume_read(IOCtrlReason reason, Downstream *downstream) { if(ioctrl_.resume_read(reason)) { // Process remaining data in input buffer here because these bytes @@ -400,7 +399,7 @@ void https_downstream_readcb(bufferevent *bev, void *ptr) } else { upstream->delete_downstream(); // Process next HTTP request - upstream->resume_read(SHRPX_MSG_BLOCK); + upstream->resume_read(SHRPX_MSG_BLOCK, 0); } } } else { @@ -426,7 +425,7 @@ void https_downstream_readcb(bufferevent *bev, void *ptr) if(downstream->get_request_state() == Downstream::MSG_COMPLETE) { upstream->delete_downstream(); // Process next HTTP request - upstream->resume_read(SHRPX_MSG_BLOCK); + upstream->resume_read(SHRPX_MSG_BLOCK, 0); } } } @@ -443,7 +442,7 @@ void https_downstream_writecb(bufferevent *bev, void *ptr) Downstream *downstream = dconn->get_downstream(); HttpsUpstream *upstream; upstream = static_cast(downstream->get_upstream()); - upstream->resume_read(SHRPX_NO_BUFFER); + upstream->resume_read(SHRPX_NO_BUFFER, downstream); } } // namespace @@ -494,7 +493,7 @@ void https_downstream_eventcb(bufferevent *bev, short events, void *ptr) } if(downstream->get_request_state() == Downstream::MSG_COMPLETE) { upstream->delete_downstream(); - upstream->resume_read(SHRPX_MSG_BLOCK); + upstream->resume_read(SHRPX_MSG_BLOCK, 0); } } else if(events & (BEV_EVENT_ERROR | BEV_EVENT_TIMEOUT)) { if(LOG_ENABLED(INFO)) { @@ -518,7 +517,7 @@ void https_downstream_eventcb(bufferevent *bev, short events, void *ptr) } if(downstream->get_request_state() == Downstream::MSG_COMPLETE) { upstream->delete_downstream(); - upstream->resume_read(SHRPX_MSG_BLOCK); + upstream->resume_read(SHRPX_MSG_BLOCK, 0); } } } diff --git a/src/shrpx_https_upstream.h b/src/shrpx_https_upstream.h index 09787d6d..e0513d86 100644 --- a/src/shrpx_https_upstream.h +++ b/src/shrpx_https_upstream.h @@ -55,7 +55,7 @@ public: int error_reply(int status_code); virtual void pause_read(IOCtrlReason reason); - virtual int resume_read(IOCtrlReason reason); + virtual int resume_read(IOCtrlReason reason, Downstream *downstream); virtual int on_downstream_header_complete(Downstream *downstream); virtual int on_downstream_body(Downstream *downstream, diff --git a/src/shrpx_spdy_downstream_connection.cc b/src/shrpx_spdy_downstream_connection.cc index bf9a6d1c..69c781f4 100644 --- a/src/shrpx_spdy_downstream_connection.cc +++ b/src/shrpx_spdy_downstream_connection.cc @@ -56,6 +56,9 @@ SpdyDownstreamConnection::SpdyDownstreamConnection SpdyDownstreamConnection::~SpdyDownstreamConnection() { + if(LOG_ENABLED(INFO)) { + DCLOG(INFO, this) << "Deleting"; + } if(request_body_buf_) { evbuffer_free(request_body_buf_); } @@ -70,6 +73,9 @@ SpdyDownstreamConnection::~SpdyDownstreamConnection() if(downstream_) { downstream_->set_downstream_connection(0); } + if(LOG_ENABLED(INFO)) { + DCLOG(INFO, this) << "Deleted"; + } } int SpdyDownstreamConnection::init_request_body_buf() @@ -176,7 +182,10 @@ ssize_t spdy_data_read_callback(spdylay_session *session, *eof = 1; break; } else { - if(downstream->get_upstream()->resume_read(SHRPX_NO_BUFFER) == -1) { + // This is important because it will handle flow control + // stuff. + if(downstream->get_upstream()->resume_read(SHRPX_NO_BUFFER, + downstream) == -1) { // In this case, downstream may be deleted. return SPDYLAY_ERR_DEFERRED; } @@ -386,17 +395,7 @@ int SpdyDownstreamConnection::end_upload_data() return 0; } -int SpdyDownstreamConnection::on_read() -{ - return 0; -} - -int SpdyDownstreamConnection::on_write() -{ - return 0; -} - -int SpdyDownstreamConnection::on_upstream_write() +int SpdyDownstreamConnection::resume_read(IOCtrlReason reason) { int rv; if(spdy_->get_state() == SpdySession::CONNECTED && @@ -413,6 +412,16 @@ int SpdyDownstreamConnection::on_upstream_write() return 0; } +int SpdyDownstreamConnection::on_read() +{ + return 0; +} + +int SpdyDownstreamConnection::on_write() +{ + return 0; +} + evbuffer* SpdyDownstreamConnection::get_request_body_buf() const { return request_body_buf_; diff --git a/src/shrpx_spdy_downstream_connection.h b/src/shrpx_spdy_downstream_connection.h index ff91ef96..8ee8d09c 100644 --- a/src/shrpx_spdy_downstream_connection.h +++ b/src/shrpx_spdy_downstream_connection.h @@ -50,14 +50,14 @@ public: virtual int end_upload_data(); virtual void pause_read(IOCtrlReason reason) {} - virtual bool resume_read(IOCtrlReason reason) { return true; } + virtual int resume_read(IOCtrlReason reason); virtual void force_resume_read() {} virtual bool get_output_buffer_full(); virtual int on_read(); virtual int on_write(); - virtual int on_upstream_write(); + int send(); int init_request_body_buf(); diff --git a/src/shrpx_spdy_session.cc b/src/shrpx_spdy_session.cc index 25acf541..a64175d6 100644 --- a/src/shrpx_spdy_session.cc +++ b/src/shrpx_spdy_session.cc @@ -96,13 +96,20 @@ int SpdySession::disconnect() // Delete all client handler associated to Downstream. When deleting // SpdyDownstreamConnection, it calls this object's - // remove_downstream_connection(). So first dump them in vector and - // iterate and delete them. + // remove_downstream_connection(). The multiple + // SpdyDownstreamConnection objects belong to the same ClientHandler + // object. So first dump ClientHandler objects and delete them once + // and for all. std::vector vec(dconns_.begin(), dconns_.end()); + std::set handlers; for(size_t i = 0; i < vec.size(); ++i) { - remove_downstream_connection(vec[i]); - delete vec[i]->get_client_handler(); + handlers.insert(vec[i]->get_client_handler()); } + for(std::set::iterator i = handlers.begin(), + eoi = handlers.end(); i != eoi; ++i) { + delete *i; + } + dconns_.clear(); for(std::set::iterator i = streams_.begin(), eoi = streams_.end(); i != eoi; ++i) { diff --git a/src/shrpx_spdy_upstream.cc b/src/shrpx_spdy_upstream.cc index dfdff29f..bb30c0de 100644 --- a/src/shrpx_spdy_upstream.cc +++ b/src/shrpx_spdy_upstream.cc @@ -177,7 +177,11 @@ void on_ctrl_recv_callback upstream->rst_stream(downstream, SPDYLAY_INTERNAL_ERROR); return; } - if(get_config()->spdy_proxy && scheme && path[0] == '/') { + // SpdyDownstreamConnection examines request path to find + // scheme. We construct abs URI for spdy_bridge mode as well as + // spdy_proxy mode. + if((get_config()->spdy_proxy || get_config()->spdy_bridge) && + scheme && path[0] == '/') { std::string reqpath = scheme; reqpath += "://"; reqpath += host; @@ -485,12 +489,7 @@ void spdy_downstream_writecb(bufferevent *bev, void *ptr) Downstream *downstream = dconn->get_downstream(); SpdyUpstream *upstream; upstream = static_cast(downstream->get_upstream()); - if(upstream->get_flow_control()) { - if(downstream->get_recv_window_size() >= - upstream->get_initial_window_size()/2) { - upstream->window_update(downstream); - } - } + upstream->resume_read(SHRPX_NO_BUFFER, downstream); } } // namespace @@ -858,8 +857,13 @@ int32_t SpdyUpstream::get_initial_window_size() const void SpdyUpstream::pause_read(IOCtrlReason reason) {} -int SpdyUpstream::resume_read(IOCtrlReason reason) +int SpdyUpstream::resume_read(IOCtrlReason reason, Downstream *downstream) { + if(get_flow_control()) { + if(downstream->get_recv_window_size() >= get_initial_window_size()/2) { + window_update(downstream); + } + } return 0; } diff --git a/src/shrpx_spdy_upstream.h b/src/shrpx_spdy_upstream.h index 5f673960..8734edee 100644 --- a/src/shrpx_spdy_upstream.h +++ b/src/shrpx_spdy_upstream.h @@ -59,7 +59,7 @@ public: int error_reply(Downstream *downstream, int status_code); virtual void pause_read(IOCtrlReason reason); - virtual int resume_read(IOCtrlReason reason); + virtual int resume_read(IOCtrlReason reason, Downstream *downstream); virtual int on_downstream_header_complete(Downstream *downstream); virtual int on_downstream_body(Downstream *downstream, diff --git a/src/shrpx_upstream.h b/src/shrpx_upstream.h index c4e4cf84..ab655ba9 100644 --- a/src/shrpx_upstream.h +++ b/src/shrpx_upstream.h @@ -53,7 +53,7 @@ public: virtual int on_downstream_body_complete(Downstream *downstream) = 0; virtual void pause_read(IOCtrlReason reason) = 0; - virtual int resume_read(IOCtrlReason reason) = 0; + virtual int resume_read(IOCtrlReason reason, Downstream *downstream) = 0; }; } // namespace shrpx