diff --git a/examples/shrpx_client_handler.cc b/examples/shrpx_client_handler.cc index b62716ea..88897dad 100644 --- a/examples/shrpx_client_handler.cc +++ b/examples/shrpx_client_handler.cc @@ -66,7 +66,7 @@ void upstream_eventcb(bufferevent *bev, short events, void *arg) bool finish = false; if(events & BEV_EVENT_EOF) { if(ENABLE_LOG) { - LOG(INFO) << "Upstream handshake EOF"; + LOG(INFO) << "Upstream EOF"; } finish = true; } diff --git a/examples/shrpx_downstream.cc b/examples/shrpx_downstream.cc index e210d695..97254da0 100644 --- a/examples/shrpx_downstream.cc +++ b/examples/shrpx_downstream.cc @@ -117,6 +117,10 @@ void Downstream::idle() chunked_response_ = false; response_connection_close_ = false; response_headers_.clear(); + if(response_body_buf_) { + size_t len = evbuffer_get_length(response_body_buf_); + evbuffer_drain(response_body_buf_, len); + } } void Downstream::pause_read(IOCtrlReason reason) @@ -543,4 +547,9 @@ evbuffer* Downstream::get_response_body_buf() return response_body_buf_; } +void Downstream::set_priority(int pri) +{ + priority_ = pri; +} + } // namespace shrpx diff --git a/examples/shrpx_downstream.h b/examples/shrpx_downstream.h index 36004aa1..46479af8 100644 --- a/examples/shrpx_downstream.h +++ b/examples/shrpx_downstream.h @@ -54,6 +54,7 @@ public: int start_connection(); Upstream* get_upstream() const; int32_t get_stream_id() const; + void set_priority(int pri); void pause_read(IOCtrlReason reason); bool resume_read(IOCtrlReason reason); void force_resume_read(); diff --git a/examples/shrpx_downstream_queue.cc b/examples/shrpx_downstream_queue.cc index 6d943eb8..4214ab7c 100644 --- a/examples/shrpx_downstream_queue.cc +++ b/examples/shrpx_downstream_queue.cc @@ -24,6 +24,8 @@ */ #include "shrpx_downstream_queue.h" +#include + #include "shrpx_downstream.h" namespace shrpx { @@ -37,6 +39,10 @@ DownstreamQueue::~DownstreamQueue() i != downstreams_.end(); ++i) { delete (*i).second; } + for(std::set::iterator i = idle_downstreams_.begin(); + i != idle_downstreams_.end(); ++i) { + delete *i; + } } void DownstreamQueue::add(Downstream *downstream) @@ -51,7 +57,11 @@ int DownstreamQueue::start(Downstream *downstream) void DownstreamQueue::remove(Downstream *downstream) { - downstreams_.erase(downstream->get_stream_id()); + if(downstream->get_request_state() == Downstream::IDLE) { + idle_downstreams_.erase(downstream); + } else { + downstreams_.erase(downstream->get_stream_id()); + } } Downstream* DownstreamQueue::find(int32_t stream_id) @@ -64,4 +74,24 @@ Downstream* DownstreamQueue::find(int32_t stream_id) } } +Downstream* DownstreamQueue::reuse(int32_t stream_id) +{ + if(idle_downstreams_.empty()) { + return 0; + } + Downstream* downstream = *idle_downstreams_.begin(); + idle_downstreams_.erase(downstream); + downstream->reuse(stream_id); + add(downstream); + return downstream; +} + +void DownstreamQueue::idle(Downstream *downstream) +{ + assert(downstream->get_request_state() != Downstream::IDLE); + remove(downstream); + downstream->idle(); + idle_downstreams_.insert(downstream); +} + } // namespace shrpx diff --git a/examples/shrpx_downstream_queue.h b/examples/shrpx_downstream_queue.h index b4e100aa..1ee0078e 100644 --- a/examples/shrpx_downstream_queue.h +++ b/examples/shrpx_downstream_queue.h @@ -30,6 +30,7 @@ #include #include +#include namespace shrpx { @@ -43,8 +44,11 @@ public: int start(Downstream *downstream); void remove(Downstream *downstream); Downstream* find(int32_t stream_id); + Downstream* reuse(int32_t stream_id); + void idle(Downstream *downstream); private: std::map downstreams_; + std::set idle_downstreams_; }; } // namespace shrpx diff --git a/examples/shrpx_https_upstream.cc b/examples/shrpx_https_upstream.cc index bbd3b7c6..3b2c73a4 100644 --- a/examples/shrpx_https_upstream.cc +++ b/examples/shrpx_https_upstream.cc @@ -79,8 +79,14 @@ int htp_msg_begin(htparser *htp) Downstream *downstream = upstream->get_top_downstream(); if(downstream) { // Keep-Alived connection + if(ENABLE_LOG) { + LOG(INFO) << "Reusing downstream"; + } downstream->reuse(0); } else { + if(ENABLE_LOG) { + LOG(INFO) << "Creating new downstream"; + } downstream = new Downstream(upstream, 0, 0); upstream->add_downstream(downstream); } diff --git a/examples/shrpx_spdy_upstream.cc b/examples/shrpx_spdy_upstream.cc index 7af407e0..037e43d7 100644 --- a/examples/shrpx_spdy_upstream.cc +++ b/examples/shrpx_spdy_upstream.cc @@ -93,16 +93,15 @@ void on_stream_close_callback LOG(INFO) << "Upstream spdy Stream " << stream_id << " is being closed"; } SpdyUpstream *upstream = reinterpret_cast(user_data); - Downstream *downstream = upstream->get_downstream_queue()->find(stream_id); + Downstream *downstream = upstream->find_downstream(stream_id); if(downstream) { if(downstream->get_request_state() == Downstream::CONNECT_FAIL) { - upstream->get_downstream_queue()->remove(downstream); + upstream->remove_downstream(downstream); delete downstream; } else { downstream->set_request_state(Downstream::STREAM_CLOSED); if(downstream->get_response_state() == Downstream::MSG_COMPLETE) { - upstream->get_downstream_queue()->remove(downstream); - delete downstream; + upstream->remove_or_idle_downstream(downstream); } else { // At this point, downstream read may be paused. To reclaim // file descriptor, enable read here and catch read @@ -126,9 +125,23 @@ void on_ctrl_recv_callback LOG(INFO) << "Upstream spdy received upstream SYN_STREAM stream_id=" << frame->syn_stream.stream_id; } - Downstream *downstream = new Downstream(upstream, - frame->syn_stream.stream_id, - frame->syn_stream.pri); + Downstream *downstream; + downstream = upstream->reuse_downstream(frame->syn_stream.stream_id); + if(downstream) { + if(ENABLE_LOG) { + LOG(INFO) << "Reusing downstream for stream_id=" + << frame->syn_stream.stream_id; + } + downstream->set_priority(frame->syn_stream.pri); + } else { + if(ENABLE_LOG) { + LOG(INFO) << "Creating new downstream for stream_id=" + << frame->syn_stream.stream_id; + } + downstream = new Downstream(upstream, + frame->syn_stream.stream_id, + frame->syn_stream.pri); + } downstream->init_response_body_buf(); char **nv = frame->syn_stream.nv; @@ -161,11 +174,13 @@ void on_ctrl_recv_callback } downstream->set_request_state(Downstream::MSG_COMPLETE); } - upstream->add_downstream(downstream); - if(upstream->start_downstream(downstream) != 0) { - // If downstream connection fails, issue RST_STREAM. - upstream->rst_stream(downstream, SPDYLAY_INTERNAL_ERROR); - downstream->set_request_state(Downstream::CONNECT_FAIL); + if(downstream->get_counter() == 1) { + upstream->add_downstream(downstream); + if(upstream->start_downstream(downstream) != 0) { + // If downstream connection fails, issue RST_STREAM. + upstream->rst_stream(downstream, SPDYLAY_INTERNAL_ERROR); + downstream->set_request_state(Downstream::CONNECT_FAIL); + } } break; } @@ -186,7 +201,7 @@ void on_data_chunk_recv_callback(spdylay_session *session, << stream_id; } SpdyUpstream *upstream = reinterpret_cast(user_data); - Downstream *downstream = upstream->get_downstream_queue()->find(stream_id); + Downstream *downstream = upstream->find_downstream(stream_id); if(downstream) { downstream->push_upload_data_chunk(data, len); if(flags & SPDYLAY_DATA_FLAG_FIN) { @@ -285,6 +300,11 @@ void spdy_downstream_readcb(bufferevent *bev, void *ptr) Downstream *downstream = reinterpret_cast(ptr); SpdyUpstream *upstream; upstream = static_cast(downstream->get_upstream()); + if(downstream->get_request_state() == Downstream::IDLE) { + upstream->remove_downstream(downstream); + delete downstream; + return; + } // If upstream SPDY stream was closed, we just close downstream, // because there is no consumer now. if(downstream->get_request_state() == Downstream::STREAM_CLOSED) { @@ -320,13 +340,20 @@ void spdy_downstream_eventcb(bufferevent *bev, short events, void *ptr) Downstream *downstream = reinterpret_cast(ptr); SpdyUpstream *upstream; upstream = static_cast(downstream->get_upstream()); + if(downstream->get_request_state() == Downstream::IDLE) { + if(ENABLE_LOG) { + LOG(INFO) << "Delete idle downstream in spdy_downstream_eventcb"; + } + upstream->remove_downstream(downstream); + delete downstream; + return; + } if(events & BEV_EVENT_CONNECTED) { if(ENABLE_LOG) { LOG(INFO) << "Downstream connection established. Downstream " << downstream; } - } - if(events & BEV_EVENT_EOF) { + } else if(events & BEV_EVENT_EOF) { if(ENABLE_LOG) { LOG(INFO) << "Downstream EOF stream_id=" << downstream->get_stream_id(); @@ -479,6 +506,26 @@ void SpdyUpstream::remove_downstream(Downstream *downstream) downstream_queue_.remove(downstream); } +Downstream* SpdyUpstream::find_downstream(int32_t stream_id) +{ + return downstream_queue_.find(stream_id); +} + +Downstream* SpdyUpstream::reuse_downstream(int32_t stream_id) +{ + return downstream_queue_.reuse(stream_id); +} + +void SpdyUpstream::remove_or_idle_downstream(Downstream *downstream) +{ + if(downstream->get_response_connection_close()) { + downstream_queue_.remove(downstream); + delete downstream; + } else { + downstream_queue_.idle(downstream); + } +} + spdylay_session* SpdyUpstream::get_spdy_session() { return session_; @@ -564,9 +611,4 @@ int SpdyUpstream::on_downstream_body_complete(Downstream *downstream) return 0; } -DownstreamQueue* SpdyUpstream::get_downstream_queue() -{ - return &downstream_queue_; -} - } // namespace shrpx diff --git a/examples/shrpx_spdy_upstream.h b/examples/shrpx_spdy_upstream.h index 6a0c2bf2..0d74d043 100644 --- a/examples/shrpx_spdy_upstream.h +++ b/examples/shrpx_spdy_upstream.h @@ -51,8 +51,11 @@ public: void add_downstream(Downstream *downstream); void remove_downstream(Downstream *downstream); int start_downstream(Downstream *downstream); + Downstream* find_downstream(int32_t stream_id); + Downstream* reuse_downstream(int32_t stream_id); + void remove_or_idle_downstream(Downstream *downstream); + spdylay_session* get_spdy_session(); - DownstreamQueue* get_downstream_queue(); int rst_stream(Downstream *downstream, int status_code); int error_reply(Downstream *downstream, int status_code);