shrpx: Add --spdy-bridge option

With --spdy-bridge option, it listens SPDY/HTTPS connections from
front end and forwards them to the backend in SPDY. The usage will be
written later. This change fixes the crash when more than 2
outstanding SpdyDownstreamConnection objects are added to SpdySession
and establishing connection to SPDY backend is failed.
This commit is contained in:
Tatsuhiro Tsujikawa 2013-02-08 21:46:58 +09:00
parent 8925c58d71
commit 9ba19df813
16 changed files with 83 additions and 68 deletions

View File

@ -243,7 +243,8 @@ int event_loop()
event_base *evbase = event_base_new(); event_base *evbase = event_base_new();
SSL_CTX *sv_ssl_ctx = get_config()->default_ssl_ctx; SSL_CTX *sv_ssl_ctx = get_config()->default_ssl_ctx;
SSL_CTX *cl_ssl_ctx = get_config()->client_mode ? SSL_CTX *cl_ssl_ctx = (get_config()->client_mode ||
get_config()->spdy_bridge)?
ssl::create_ssl_client_context() : 0; ssl::create_ssl_client_context() : 0;
ListenHandler *listener_handler = new ListenHandler(evbase, sv_ssl_ctx, ListenHandler *listener_handler = new ListenHandler(evbase, sv_ssl_ctx,
@ -372,6 +373,7 @@ void fill_default_config()
mod_config()->backlog = 256; mod_config()->backlog = 256;
mod_config()->ciphers = 0; mod_config()->ciphers = 0;
mod_config()->spdy_proxy = false; mod_config()->spdy_proxy = false;
mod_config()->spdy_bridge = false;
mod_config()->client_proxy = false; mod_config()->client_proxy = false;
mod_config()->client = false; mod_config()->client = false;
mod_config()->client_mode = false; mod_config()->client_mode = false;
@ -599,6 +601,7 @@ int main(int argc, char **argv)
{"private-key-passwd-file", required_argument, &flag, 22}, {"private-key-passwd-file", required_argument, &flag, 22},
{"no-via", no_argument, &flag, 23}, {"no-via", no_argument, &flag, 23},
{"subcert", required_argument, &flag, 24}, {"subcert", required_argument, &flag, 24},
{"spdy-bridge", no_argument, &flag, 25},
{0, 0, 0, 0 } {0, 0, 0, 0 }
}; };
int option_index = 0; int option_index = 0;
@ -749,6 +752,10 @@ int main(int argc, char **argv)
// --subcert // --subcert
cmdcfgs.push_back(std::make_pair(SHRPX_OPT_SUBCERT, optarg)); cmdcfgs.push_back(std::make_pair(SHRPX_OPT_SUBCERT, optarg));
break; break;
case 25:
// --spdy-bridge
cmdcfgs.push_back(std::make_pair(SHRPX_OPT_SPDY_BRIDGE, "yes"));
break;
default: default:
break; break;
} }
@ -793,11 +800,10 @@ int main(int argc, char **argv)
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
int mode = get_config()->spdy_proxy | if(get_config()->spdy_proxy + get_config()->spdy_bridge +
(get_config()->client_proxy << 1) | (get_config()->client << 2); get_config()->client_proxy + get_config()->client > 1) {
if(mode != 0 && mode != 1 && mode != 2 && mode != 4) { LOG(FATAL) << "--spdy-proxy, --spdy-bridge, --client-proxy and --client "
LOG(FATAL) << "--spdy-proxy, --client-proxy and --client cannot be used " << "cannot be used at the same time.";
<< "at the same time.";
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }

View File

@ -57,6 +57,7 @@ SHRPX_OPT_SPDY_MAX_CONCURRENT_STREAMS[] = "spdy-max-concurrent-streams";
const char SHRPX_OPT_LOG_LEVEL[] = "log-level"; const char SHRPX_OPT_LOG_LEVEL[] = "log-level";
const char SHRPX_OPT_DAEMON[] = "daemon"; const char SHRPX_OPT_DAEMON[] = "daemon";
const char SHRPX_OPT_SPDY_PROXY[] = "spdy-proxy"; const char SHRPX_OPT_SPDY_PROXY[] = "spdy-proxy";
const char SHRPX_OPT_SPDY_BRIDGE[] = "spdy-bridge";
const char SHRPX_OPT_CLIENT_PROXY[] = "client-proxy"; const char SHRPX_OPT_CLIENT_PROXY[] = "client-proxy";
const char SHRPX_OPT_ADD_X_FORWARDED_FOR[] = "add-x-forwarded-for"; const char SHRPX_OPT_ADD_X_FORWARDED_FOR[] = "add-x-forwarded-for";
const char SHRPX_OPT_NO_VIA[] = "no-via"; const char SHRPX_OPT_NO_VIA[] = "no-via";
@ -208,6 +209,8 @@ int parse_config(const char *opt, const char *optarg)
mod_config()->daemon = util::strieq(optarg, "yes"); mod_config()->daemon = util::strieq(optarg, "yes");
} else if(util::strieq(opt, SHRPX_OPT_SPDY_PROXY)) { } else if(util::strieq(opt, SHRPX_OPT_SPDY_PROXY)) {
mod_config()->spdy_proxy = util::strieq(optarg, "yes"); mod_config()->spdy_proxy = util::strieq(optarg, "yes");
} else if(util::strieq(opt, SHRPX_OPT_SPDY_BRIDGE)) {
mod_config()->spdy_bridge = util::strieq(optarg, "yes");
} else if(util::strieq(opt, SHRPX_OPT_CLIENT_PROXY)) { } else if(util::strieq(opt, SHRPX_OPT_CLIENT_PROXY)) {
mod_config()->client_proxy = util::strieq(optarg, "yes"); mod_config()->client_proxy = util::strieq(optarg, "yes");
} else if(util::strieq(opt, SHRPX_OPT_ADD_X_FORWARDED_FOR)) { } else if(util::strieq(opt, SHRPX_OPT_ADD_X_FORWARDED_FOR)) {

View File

@ -55,6 +55,7 @@ extern const char SHRPX_OPT_SPDY_MAX_CONCURRENT_STREAMS[];
extern const char SHRPX_OPT_LOG_LEVEL[]; extern const char SHRPX_OPT_LOG_LEVEL[];
extern const char SHRPX_OPT_DAEMON[]; extern const char SHRPX_OPT_DAEMON[];
extern const char SHRPX_OPT_SPDY_PROXY[]; extern const char SHRPX_OPT_SPDY_PROXY[];
extern const char SHRPX_OPT_SPDY_BRIDGE[];
extern const char SHRPX_OPT_CLIENT_PROXY[]; extern const char SHRPX_OPT_CLIENT_PROXY[];
extern const char SHRPX_OPT_ADD_X_FORWARDED_FOR[]; extern const char SHRPX_OPT_ADD_X_FORWARDED_FOR[];
extern const char SHRPX_OPT_NO_VIA[]; extern const char SHRPX_OPT_NO_VIA[];
@ -112,6 +113,7 @@ struct Config {
size_t num_worker; size_t num_worker;
size_t spdy_max_concurrent_streams; size_t spdy_max_concurrent_streams;
bool spdy_proxy; bool spdy_proxy;
bool spdy_bridge;
bool client_proxy; bool client_proxy;
bool add_x_forwarded_for; bool add_x_forwarded_for;
bool no_via; bool no_via;

View File

@ -95,12 +95,12 @@ void Downstream::pause_read(IOCtrlReason reason)
} }
} }
bool Downstream::resume_read(IOCtrlReason reason) int Downstream::resume_read(IOCtrlReason reason)
{ {
if(dconn_) { if(dconn_) {
return dconn_->resume_read(reason); return dconn_->resume_read(reason);
} else { } else {
return false; return 0;
} }
} }
@ -429,7 +429,9 @@ void body_buf_cb(evbuffer *body, size_t oldlen, size_t newlen, void *arg)
{ {
Downstream *downstream = reinterpret_cast<Downstream*>(arg); Downstream *downstream = reinterpret_cast<Downstream*>(arg);
if(newlen == 0) { if(newlen == 0) {
downstream->resume_read(SHRPX_NO_BUFFER); if(downstream->resume_read(SHRPX_NO_BUFFER) == -1) {
DLOG(WARNING, downstream) << "Sending WINDOW_UPDATE failed";
}
} }
} }
} // namespace } // namespace
@ -487,13 +489,4 @@ int32_t Downstream::get_downstream_stream_id() const
return downstream_stream_id_; return downstream_stream_id_;
} }
int Downstream::on_upstream_write()
{
if(dconn_) {
return dconn_->on_upstream_write();
} else {
return 0;
}
}
} // namespace shrpx } // namespace shrpx

View File

@ -52,7 +52,7 @@ public:
int32_t get_stream_id() const; int32_t get_stream_id() const;
void set_priority(int pri); void set_priority(int pri);
void pause_read(IOCtrlReason reason); void pause_read(IOCtrlReason reason);
bool resume_read(IOCtrlReason reason); int resume_read(IOCtrlReason reason);
void force_resume_read(); void force_resume_read();
// Set stream ID for downstream SPDY connection. // Set stream ID for downstream SPDY connection.
void set_downstream_stream_id(int32_t stream_id); void set_downstream_stream_id(int32_t stream_id);
@ -132,7 +132,6 @@ public:
// Call this method when there is incoming data in downstream // Call this method when there is incoming data in downstream
// connection. // connection.
int on_read(); int on_read();
int on_upstream_write();
static const size_t OUTPUT_UPPER_THRES = 64*1024; static const size_t OUTPUT_UPPER_THRES = 64*1024;
private: private:

View File

@ -46,14 +46,13 @@ public:
virtual int end_upload_data() = 0; virtual int end_upload_data() = 0;
virtual void pause_read(IOCtrlReason reason) = 0; virtual void pause_read(IOCtrlReason reason) = 0;
virtual bool resume_read(IOCtrlReason reason) = 0; virtual int resume_read(IOCtrlReason reason) = 0;
virtual void force_resume_read() = 0; virtual void force_resume_read() = 0;
virtual bool get_output_buffer_full() = 0; virtual bool get_output_buffer_full() = 0;
virtual int on_read() = 0; virtual int on_read() = 0;
virtual int on_write() = 0; virtual int on_write() = 0;
virtual int on_upstream_write() = 0;
ClientHandler* get_client_handler(); ClientHandler* get_client_handler();
Downstream* get_downstream(); Downstream* get_downstream();

View File

@ -330,9 +330,9 @@ void HttpDownstreamConnection::pause_read(IOCtrlReason reason)
ioctrl_.pause_read(reason); ioctrl_.pause_read(reason);
} }
bool HttpDownstreamConnection::resume_read(IOCtrlReason reason) int HttpDownstreamConnection::resume_read(IOCtrlReason reason)
{ {
return ioctrl_.resume_read(reason); return ioctrl_.resume_read(reason) ? 0 : -1;
} }
void HttpDownstreamConnection::force_resume_read() void HttpDownstreamConnection::force_resume_read()
@ -463,9 +463,4 @@ int HttpDownstreamConnection::on_write()
return 0; return 0;
} }
int HttpDownstreamConnection::on_upstream_write()
{
return 0;
}
} // namespace shrpx } // namespace shrpx

View File

@ -49,14 +49,13 @@ public:
virtual int end_upload_data(); virtual int end_upload_data();
virtual void pause_read(IOCtrlReason reason); virtual void pause_read(IOCtrlReason reason);
virtual bool resume_read(IOCtrlReason reason); virtual int resume_read(IOCtrlReason reason);
virtual void force_resume_read(); virtual void force_resume_read();
virtual bool get_output_buffer_full(); virtual bool get_output_buffer_full();
virtual int on_read(); virtual int on_read();
virtual int on_write(); virtual int on_write();
virtual int on_upstream_write();
bufferevent* get_bev(); bufferevent* get_bev();
private: private:

View File

@ -333,8 +333,7 @@ int HttpsUpstream::on_write()
int rv = 0; int rv = 0;
Downstream *downstream = get_downstream(); Downstream *downstream = get_downstream();
if(downstream) { if(downstream) {
downstream->resume_read(SHRPX_NO_BUFFER); rv = downstream->resume_read(SHRPX_NO_BUFFER);
rv = downstream->on_upstream_write();
} }
return rv; return rv;
} }
@ -354,7 +353,7 @@ void HttpsUpstream::pause_read(IOCtrlReason reason)
ioctrl_.pause_read(reason); ioctrl_.pause_read(reason);
} }
int HttpsUpstream::resume_read(IOCtrlReason reason) int HttpsUpstream::resume_read(IOCtrlReason reason, Downstream *downstream)
{ {
if(ioctrl_.resume_read(reason)) { if(ioctrl_.resume_read(reason)) {
// Process remaining data in input buffer here because these bytes // Process remaining data in input buffer here because these bytes
@ -400,7 +399,7 @@ void https_downstream_readcb(bufferevent *bev, void *ptr)
} else { } else {
upstream->delete_downstream(); upstream->delete_downstream();
// Process next HTTP request // Process next HTTP request
upstream->resume_read(SHRPX_MSG_BLOCK); upstream->resume_read(SHRPX_MSG_BLOCK, 0);
} }
} }
} else { } else {
@ -426,7 +425,7 @@ void https_downstream_readcb(bufferevent *bev, void *ptr)
if(downstream->get_request_state() == Downstream::MSG_COMPLETE) { if(downstream->get_request_state() == Downstream::MSG_COMPLETE) {
upstream->delete_downstream(); upstream->delete_downstream();
// Process next HTTP request // Process next HTTP request
upstream->resume_read(SHRPX_MSG_BLOCK); upstream->resume_read(SHRPX_MSG_BLOCK, 0);
} }
} }
} }
@ -443,7 +442,7 @@ void https_downstream_writecb(bufferevent *bev, void *ptr)
Downstream *downstream = dconn->get_downstream(); Downstream *downstream = dconn->get_downstream();
HttpsUpstream *upstream; HttpsUpstream *upstream;
upstream = static_cast<HttpsUpstream*>(downstream->get_upstream()); upstream = static_cast<HttpsUpstream*>(downstream->get_upstream());
upstream->resume_read(SHRPX_NO_BUFFER); upstream->resume_read(SHRPX_NO_BUFFER, downstream);
} }
} // namespace } // namespace
@ -494,7 +493,7 @@ void https_downstream_eventcb(bufferevent *bev, short events, void *ptr)
} }
if(downstream->get_request_state() == Downstream::MSG_COMPLETE) { if(downstream->get_request_state() == Downstream::MSG_COMPLETE) {
upstream->delete_downstream(); upstream->delete_downstream();
upstream->resume_read(SHRPX_MSG_BLOCK); upstream->resume_read(SHRPX_MSG_BLOCK, 0);
} }
} else if(events & (BEV_EVENT_ERROR | BEV_EVENT_TIMEOUT)) { } else if(events & (BEV_EVENT_ERROR | BEV_EVENT_TIMEOUT)) {
if(LOG_ENABLED(INFO)) { if(LOG_ENABLED(INFO)) {
@ -518,7 +517,7 @@ void https_downstream_eventcb(bufferevent *bev, short events, void *ptr)
} }
if(downstream->get_request_state() == Downstream::MSG_COMPLETE) { if(downstream->get_request_state() == Downstream::MSG_COMPLETE) {
upstream->delete_downstream(); upstream->delete_downstream();
upstream->resume_read(SHRPX_MSG_BLOCK); upstream->resume_read(SHRPX_MSG_BLOCK, 0);
} }
} }
} }

View File

@ -55,7 +55,7 @@ public:
int error_reply(int status_code); int error_reply(int status_code);
virtual void pause_read(IOCtrlReason reason); virtual void pause_read(IOCtrlReason reason);
virtual int resume_read(IOCtrlReason reason); virtual int resume_read(IOCtrlReason reason, Downstream *downstream);
virtual int on_downstream_header_complete(Downstream *downstream); virtual int on_downstream_header_complete(Downstream *downstream);
virtual int on_downstream_body(Downstream *downstream, virtual int on_downstream_body(Downstream *downstream,

View File

@ -56,6 +56,9 @@ SpdyDownstreamConnection::SpdyDownstreamConnection
SpdyDownstreamConnection::~SpdyDownstreamConnection() SpdyDownstreamConnection::~SpdyDownstreamConnection()
{ {
if(LOG_ENABLED(INFO)) {
DCLOG(INFO, this) << "Deleting";
}
if(request_body_buf_) { if(request_body_buf_) {
evbuffer_free(request_body_buf_); evbuffer_free(request_body_buf_);
} }
@ -70,6 +73,9 @@ SpdyDownstreamConnection::~SpdyDownstreamConnection()
if(downstream_) { if(downstream_) {
downstream_->set_downstream_connection(0); downstream_->set_downstream_connection(0);
} }
if(LOG_ENABLED(INFO)) {
DCLOG(INFO, this) << "Deleted";
}
} }
int SpdyDownstreamConnection::init_request_body_buf() int SpdyDownstreamConnection::init_request_body_buf()
@ -176,7 +182,10 @@ ssize_t spdy_data_read_callback(spdylay_session *session,
*eof = 1; *eof = 1;
break; break;
} else { } else {
if(downstream->get_upstream()->resume_read(SHRPX_NO_BUFFER) == -1) { // This is important because it will handle flow control
// stuff.
if(downstream->get_upstream()->resume_read(SHRPX_NO_BUFFER,
downstream) == -1) {
// In this case, downstream may be deleted. // In this case, downstream may be deleted.
return SPDYLAY_ERR_DEFERRED; return SPDYLAY_ERR_DEFERRED;
} }
@ -386,17 +395,7 @@ int SpdyDownstreamConnection::end_upload_data()
return 0; return 0;
} }
int SpdyDownstreamConnection::on_read() int SpdyDownstreamConnection::resume_read(IOCtrlReason reason)
{
return 0;
}
int SpdyDownstreamConnection::on_write()
{
return 0;
}
int SpdyDownstreamConnection::on_upstream_write()
{ {
int rv; int rv;
if(spdy_->get_state() == SpdySession::CONNECTED && if(spdy_->get_state() == SpdySession::CONNECTED &&
@ -413,6 +412,16 @@ int SpdyDownstreamConnection::on_upstream_write()
return 0; return 0;
} }
int SpdyDownstreamConnection::on_read()
{
return 0;
}
int SpdyDownstreamConnection::on_write()
{
return 0;
}
evbuffer* SpdyDownstreamConnection::get_request_body_buf() const evbuffer* SpdyDownstreamConnection::get_request_body_buf() const
{ {
return request_body_buf_; return request_body_buf_;

View File

@ -50,14 +50,14 @@ public:
virtual int end_upload_data(); virtual int end_upload_data();
virtual void pause_read(IOCtrlReason reason) {} virtual void pause_read(IOCtrlReason reason) {}
virtual bool resume_read(IOCtrlReason reason) { return true; } virtual int resume_read(IOCtrlReason reason);
virtual void force_resume_read() {} virtual void force_resume_read() {}
virtual bool get_output_buffer_full(); virtual bool get_output_buffer_full();
virtual int on_read(); virtual int on_read();
virtual int on_write(); virtual int on_write();
virtual int on_upstream_write();
int send(); int send();
int init_request_body_buf(); int init_request_body_buf();

View File

@ -96,13 +96,20 @@ int SpdySession::disconnect()
// Delete all client handler associated to Downstream. When deleting // Delete all client handler associated to Downstream. When deleting
// SpdyDownstreamConnection, it calls this object's // SpdyDownstreamConnection, it calls this object's
// remove_downstream_connection(). So first dump them in vector and // remove_downstream_connection(). The multiple
// iterate and delete them. // SpdyDownstreamConnection objects belong to the same ClientHandler
// object. So first dump ClientHandler objects and delete them once
// and for all.
std::vector<SpdyDownstreamConnection*> vec(dconns_.begin(), dconns_.end()); std::vector<SpdyDownstreamConnection*> vec(dconns_.begin(), dconns_.end());
std::set<ClientHandler*> handlers;
for(size_t i = 0; i < vec.size(); ++i) { for(size_t i = 0; i < vec.size(); ++i) {
remove_downstream_connection(vec[i]); handlers.insert(vec[i]->get_client_handler());
delete vec[i]->get_client_handler();
} }
for(std::set<ClientHandler*>::iterator i = handlers.begin(),
eoi = handlers.end(); i != eoi; ++i) {
delete *i;
}
dconns_.clear(); dconns_.clear();
for(std::set<StreamData*>::iterator i = streams_.begin(), for(std::set<StreamData*>::iterator i = streams_.begin(),
eoi = streams_.end(); i != eoi; ++i) { eoi = streams_.end(); i != eoi; ++i) {

View File

@ -177,7 +177,11 @@ void on_ctrl_recv_callback
upstream->rst_stream(downstream, SPDYLAY_INTERNAL_ERROR); upstream->rst_stream(downstream, SPDYLAY_INTERNAL_ERROR);
return; return;
} }
if(get_config()->spdy_proxy && scheme && path[0] == '/') { // SpdyDownstreamConnection examines request path to find
// scheme. We construct abs URI for spdy_bridge mode as well as
// spdy_proxy mode.
if((get_config()->spdy_proxy || get_config()->spdy_bridge) &&
scheme && path[0] == '/') {
std::string reqpath = scheme; std::string reqpath = scheme;
reqpath += "://"; reqpath += "://";
reqpath += host; reqpath += host;
@ -485,12 +489,7 @@ void spdy_downstream_writecb(bufferevent *bev, void *ptr)
Downstream *downstream = dconn->get_downstream(); Downstream *downstream = dconn->get_downstream();
SpdyUpstream *upstream; SpdyUpstream *upstream;
upstream = static_cast<SpdyUpstream*>(downstream->get_upstream()); upstream = static_cast<SpdyUpstream*>(downstream->get_upstream());
if(upstream->get_flow_control()) { upstream->resume_read(SHRPX_NO_BUFFER, downstream);
if(downstream->get_recv_window_size() >=
upstream->get_initial_window_size()/2) {
upstream->window_update(downstream);
}
}
} }
} // namespace } // namespace
@ -858,8 +857,13 @@ int32_t SpdyUpstream::get_initial_window_size() const
void SpdyUpstream::pause_read(IOCtrlReason reason) void SpdyUpstream::pause_read(IOCtrlReason reason)
{} {}
int SpdyUpstream::resume_read(IOCtrlReason reason) int SpdyUpstream::resume_read(IOCtrlReason reason, Downstream *downstream)
{ {
if(get_flow_control()) {
if(downstream->get_recv_window_size() >= get_initial_window_size()/2) {
window_update(downstream);
}
}
return 0; return 0;
} }

View File

@ -59,7 +59,7 @@ public:
int error_reply(Downstream *downstream, int status_code); int error_reply(Downstream *downstream, int status_code);
virtual void pause_read(IOCtrlReason reason); virtual void pause_read(IOCtrlReason reason);
virtual int resume_read(IOCtrlReason reason); virtual int resume_read(IOCtrlReason reason, Downstream *downstream);
virtual int on_downstream_header_complete(Downstream *downstream); virtual int on_downstream_header_complete(Downstream *downstream);
virtual int on_downstream_body(Downstream *downstream, virtual int on_downstream_body(Downstream *downstream,

View File

@ -53,7 +53,7 @@ public:
virtual int on_downstream_body_complete(Downstream *downstream) = 0; virtual int on_downstream_body_complete(Downstream *downstream) = 0;
virtual void pause_read(IOCtrlReason reason) = 0; virtual void pause_read(IOCtrlReason reason) = 0;
virtual int resume_read(IOCtrlReason reason) = 0; virtual int resume_read(IOCtrlReason reason, Downstream *downstream) = 0;
}; };
} // namespace shrpx } // namespace shrpx