diff --git a/src/shrpx_downstream.cc b/src/shrpx_downstream.cc index 1fb46a0f..0e28269d 100644 --- a/src/shrpx_downstream.cc +++ b/src/shrpx_downstream.cc @@ -38,7 +38,7 @@ namespace shrpx { -Downstream::Downstream(Upstream *upstream, int stream_id, int priority) +Downstream::Downstream(Upstream *upstream, int32_t stream_id, int32_t priority) : request_bodylen_(0), response_bodylen_(0), upstream_(upstream), diff --git a/src/shrpx_downstream.h b/src/shrpx_downstream.h index 90a4b8bb..4fe9b305 100644 --- a/src/shrpx_downstream.h +++ b/src/shrpx_downstream.h @@ -49,7 +49,7 @@ class DownstreamConnection; class Downstream { public: - Downstream(Upstream *upstream, int stream_id, int priority); + Downstream(Upstream *upstream, int32_t stream_id, int32_t priority); ~Downstream(); void reset_upstream(Upstream *upstream); Upstream* get_upstream() const; diff --git a/src/shrpx_downstream_queue.cc b/src/shrpx_downstream_queue.cc index dec8b314..bcfa00b9 100644 --- a/src/shrpx_downstream_queue.cc +++ b/src/shrpx_downstream_queue.cc @@ -34,40 +34,53 @@ DownstreamQueue::DownstreamQueue() {} DownstreamQueue::~DownstreamQueue() +{} + +void DownstreamQueue::add_pending(std::unique_ptr downstream) { - for(auto& kv : pending_downstreams_) { - delete kv.second; + auto stream_id = downstream->get_stream_id(); + pending_downstreams_[stream_id] = std::move(downstream); +} + +void DownstreamQueue::add_failure(std::unique_ptr downstream) +{ + auto stream_id = downstream->get_stream_id(); + failure_downstreams_[stream_id] = std::move(downstream); +} + +void DownstreamQueue::add_active(std::unique_ptr downstream) +{ + auto stream_id = downstream->get_stream_id(); + active_downstreams_[stream_id] = std::move(downstream); +} + +std::unique_ptr DownstreamQueue::remove(int32_t stream_id) +{ + auto kv = pending_downstreams_.find(stream_id); + + if(kv != std::end(pending_downstreams_)) { + auto downstream = std::move((*kv).second); + pending_downstreams_.erase(kv); + return downstream; } - for(auto& kv : active_downstreams_) { - delete kv.second; + kv = active_downstreams_.find(stream_id); + + if(kv != std::end(active_downstreams_)) { + auto downstream = std::move((*kv).second); + active_downstreams_.erase(kv); + return downstream; } - for(auto& kv : failure_downstreams_) { - delete kv.second; + kv = failure_downstreams_.find(stream_id); + + if(kv != std::end(failure_downstreams_)) { + auto downstream = std::move((*kv).second); + failure_downstreams_.erase(kv); + return downstream; } -} -void DownstreamQueue::add_pending(Downstream *downstream) -{ - pending_downstreams_[downstream->get_stream_id()] = downstream; -} - -void DownstreamQueue::add_failure(Downstream *downstream) -{ - failure_downstreams_[downstream->get_stream_id()] = downstream; -} - -void DownstreamQueue::add_active(Downstream *downstream) -{ - active_downstreams_[downstream->get_stream_id()] = downstream; -} - -void DownstreamQueue::remove(Downstream *downstream) -{ - pending_downstreams_.erase(downstream->get_stream_id()); - active_downstreams_.erase(downstream->get_stream_id()); - failure_downstreams_.erase(downstream->get_stream_id()); + return nullptr; } Downstream* DownstreamQueue::find(int32_t stream_id) @@ -75,25 +88,25 @@ Downstream* DownstreamQueue::find(int32_t stream_id) auto kv = pending_downstreams_.find(stream_id); if(kv != std::end(pending_downstreams_)) { - return (*kv).second; + return (*kv).second.get(); } kv = active_downstreams_.find(stream_id); if(kv != std::end(active_downstreams_)) { - return (*kv).second; + return (*kv).second.get(); } kv = failure_downstreams_.find(stream_id); if(kv != std::end(failure_downstreams_)) { - return (*kv).second; + return (*kv).second.get(); } return nullptr; } -Downstream* DownstreamQueue::pop_pending() +std::unique_ptr DownstreamQueue::pop_pending() { auto i = std::begin(pending_downstreams_); @@ -101,7 +114,7 @@ Downstream* DownstreamQueue::pop_pending() return nullptr; } - auto downstream = (*i).second; + auto downstream = std::move((*i).second); pending_downstreams_.erase(i); diff --git a/src/shrpx_downstream_queue.h b/src/shrpx_downstream_queue.h index 04245b54..60cf8b83 100644 --- a/src/shrpx_downstream_queue.h +++ b/src/shrpx_downstream_queue.h @@ -30,6 +30,7 @@ #include #include +#include namespace shrpx { @@ -39,12 +40,13 @@ class DownstreamQueue { public: DownstreamQueue(); ~DownstreamQueue(); - void add_pending(Downstream *downstream); - void add_failure(Downstream *downstream); - void add_active(Downstream *downstream); + void add_pending(std::unique_ptr downstream); + void add_failure(std::unique_ptr downstream); + void add_active(std::unique_ptr downstream); // Removes |downstream| from either pending_downstreams_, - // active_downstreams_ or failure_downstreams_. - void remove(Downstream *downstream); + // active_downstreams_ or failure_downstreams_ and returns it + // wrapped in std::unique_ptr. + std::unique_ptr remove(int32_t stream_id); // Finds Downstream object denoted by |stream_id| either in // pending_downstreams_, active_downstreams_ or // failure_downstreams_. @@ -55,14 +57,14 @@ public: bool pending_empty() const; // Pops first Downstream object in pending_downstreams_ and returns // it. - Downstream* pop_pending(); + std::unique_ptr pop_pending(); private: // Downstream objects, not processed yet - std::map pending_downstreams_; + std::map> pending_downstreams_; // Downstream objects in use, consuming downstream concurrency limit - std::map active_downstreams_; + std::map> active_downstreams_; // Downstream objects, failed to connect to downstream server - std::map failure_downstreams_; + std::map> failure_downstreams_; }; } // namespace shrpx diff --git a/src/shrpx_http2_upstream.cc b/src/shrpx_http2_upstream.cc index 6b788fd7..22322c6a 100644 --- a/src/shrpx_http2_upstream.cc +++ b/src/shrpx_http2_upstream.cc @@ -73,8 +73,7 @@ int on_stream_close_callback if(downstream->get_request_state() == Downstream::CONNECT_FAIL) { upstream->remove_downstream(downstream); - - delete downstream; + // downstream was deleted return 0; } @@ -94,8 +93,7 @@ int on_stream_close_callback } upstream->remove_downstream(downstream); - - delete downstream; + // downstream was deleted return 0; } @@ -105,7 +103,8 @@ int on_stream_close_callback // If shrpx_downstream::push_request_headers() failed, the // error is handled here. upstream->remove_downstream(downstream); - delete downstream; + // downstream was deleted + // How to test this case? Request sufficient large download // and make client send RST_STREAM after it gets first DATA // frame chunk. @@ -117,9 +116,8 @@ int on_stream_close_callback int Http2Upstream::upgrade_upstream(HttpsUpstream *http) { int rv; - auto downstream = http->get_downstream(); - auto http2_settings = downstream->get_http2_settings(); + auto http2_settings = http->get_downstream()->get_http2_settings(); util::to_base64(http2_settings); auto settings_payload = base64::decode(std::begin(http2_settings), @@ -136,16 +134,17 @@ int Http2Upstream::upgrade_upstream(HttpsUpstream *http) return -1; } pre_upstream_.reset(http); - http->pop_downstream(); + auto downstream = http->pop_downstream(); downstream->reset_upstream(this); downstream->set_stream_id(1); - downstream_queue_.add_active(downstream); downstream->init_upstream_timer(); downstream->reset_upstream_rtimer(); downstream->init_response_body_buf(); downstream->set_stream_id(1); downstream->set_priority(0); + downstream_queue_.add_active(std::move(downstream)); + if(LOG_ENABLED(INFO)) { ULOG(INFO, this) << "Connection upgraded to HTTP/2"; } @@ -272,11 +271,9 @@ int on_begin_headers_callback(nghttp2_session *session, } // TODO Use priority 0 for now - auto downstream = new Downstream(upstream, - frame->hd.stream_id, - 0); + auto downstream = util::make_unique + (upstream, frame->hd.stream_id, 0); - upstream->add_pending_downstream(downstream); downstream->init_upstream_timer(); downstream->reset_upstream_rtimer(); downstream->init_response_body_buf(); @@ -286,6 +283,8 @@ int on_begin_headers_callback(nghttp2_session *session, downstream->set_request_major(2); downstream->set_request_minor(0); + upstream->add_pending_downstream(std::move(downstream)); + return 0; } } // namespace @@ -392,40 +391,41 @@ void Http2Upstream::maintain_downstream_concurrency() break; } - initiate_downstream(downstream); + initiate_downstream(std::move(downstream)); } } -void Http2Upstream::initiate_downstream(Downstream *downstream) +void Http2Upstream::initiate_downstream(std::unique_ptr downstream) { int rv; auto dconn = handler_->get_downstream_connection(); - rv = dconn->attach_downstream(downstream); + rv = dconn->attach_downstream(downstream.get()); if(rv != 0) { - downstream_queue_.add_failure(downstream); - // downstream connection fails, send error page - if(error_reply(downstream, 503) != 0) { - rst_stream(downstream, NGHTTP2_INTERNAL_ERROR); + if(error_reply(downstream.get(), 503) != 0) { + rst_stream(downstream.get(), NGHTTP2_INTERNAL_ERROR); } downstream->set_request_state(Downstream::CONNECT_FAIL); + downstream_queue_.add_failure(std::move(downstream)); + return; } rv = downstream->push_request_headers(); if(rv != 0) { - downstream_queue_.add_failure(downstream); - if(error_reply(downstream, 503) != 0) { - rst_stream(downstream, NGHTTP2_INTERNAL_ERROR); + if(error_reply(downstream.get(), 503) != 0) { + rst_stream(downstream.get(), NGHTTP2_INTERNAL_ERROR); } + downstream_queue_.add_failure(std::move(downstream)); + return; } - downstream_queue_.add_active(downstream); + downstream_queue_.add_active(std::move(downstream)); return; } @@ -837,7 +837,8 @@ void downstream_readcb(bufferevent *bev, void *ptr) // because there is no consumer now. Downstream connection is also // closed in this case. upstream->remove_downstream(downstream); - delete downstream; + // downstream was deleted + return; } @@ -925,7 +926,7 @@ void downstream_eventcb(bufferevent *bev, short events, void *ptr) // If stream was closed already, we don't need to send reply at // the first place. We can delete downstream. upstream->remove_downstream(downstream); - delete downstream; + // downstream was deleted return; } @@ -981,7 +982,7 @@ void downstream_eventcb(bufferevent *bev, short events, void *ptr) if(downstream->get_request_state() == Downstream::STREAM_CLOSED) { upstream->remove_downstream(downstream); - delete downstream; + // downstream was deleted return; } @@ -1169,14 +1170,15 @@ bufferevent_event_cb Http2Upstream::get_downstream_eventcb() return downstream_eventcb; } -void Http2Upstream::add_pending_downstream(Downstream *downstream) +void Http2Upstream::add_pending_downstream +(std::unique_ptr downstream) { - downstream_queue_.add_pending(downstream); + downstream_queue_.add_pending(std::move(downstream)); } void Http2Upstream::remove_downstream(Downstream *downstream) { - downstream_queue_.remove(downstream); + downstream_queue_.remove(downstream->get_stream_id()); maintain_downstream_concurrency(); } diff --git a/src/shrpx_http2_upstream.h b/src/shrpx_http2_upstream.h index f9130e1d..d720e683 100644 --- a/src/shrpx_http2_upstream.h +++ b/src/shrpx_http2_upstream.h @@ -54,7 +54,7 @@ public: virtual bufferevent_data_cb get_downstream_readcb(); virtual bufferevent_data_cb get_downstream_writecb(); virtual bufferevent_event_cb get_downstream_eventcb(); - void add_pending_downstream(Downstream *downstream); + void add_pending_downstream(std::unique_ptr downstream); void remove_downstream(Downstream *downstream); Downstream* find_downstream(int32_t stream_id); @@ -83,7 +83,7 @@ public: void log_response_headers(Downstream *downstream, const std::vector& nva) const; void maintain_downstream_concurrency(); - void initiate_downstream(Downstream *downstream); + void initiate_downstream(std::unique_ptr downstream); private: DownstreamQueue downstream_queue_; std::unique_ptr pre_upstream_; diff --git a/src/shrpx_https_upstream.cc b/src/shrpx_https_upstream.cc index 68651ecb..dc0a2a15 100644 --- a/src/shrpx_https_upstream.cc +++ b/src/shrpx_https_upstream.cc @@ -50,7 +50,6 @@ const size_t OUTBUF_MAX_THRES = 16*1024; HttpsUpstream::HttpsUpstream(ClientHandler *handler) : handler_(handler), current_header_length_(0), - downstream_(nullptr), ioctrl_(handler->get_bev()) { http_parser_init(&htp_, HTTP_REQUEST); @@ -58,9 +57,7 @@ HttpsUpstream::HttpsUpstream(ClientHandler *handler) } HttpsUpstream::~HttpsUpstream() -{ - delete downstream_; -} +{} void HttpsUpstream::reset_current_header_length() { @@ -76,8 +73,7 @@ int htp_msg_begin(http_parser *htp) } upstream->reset_current_header_length(); // TODO specify 0 as priority for now - auto downstream = new Downstream(upstream, 0, 0); - upstream->attach_downstream(downstream); + upstream->attach_downstream(util::make_unique(upstream, 0, 0)); return 0; } } // namespace @@ -723,28 +719,25 @@ bufferevent_event_cb HttpsUpstream::get_downstream_eventcb() return https_downstream_eventcb; } -void HttpsUpstream::attach_downstream(Downstream *downstream) +void HttpsUpstream::attach_downstream(std::unique_ptr downstream) { assert(!downstream_); - downstream_ = downstream; + downstream_ = std::move(downstream); } void HttpsUpstream::delete_downstream() { - delete downstream_; - downstream_ = 0; + downstream_.reset(); } Downstream* HttpsUpstream::get_downstream() const { - return downstream_; + return downstream_.get(); } -Downstream* HttpsUpstream::pop_downstream() +std::unique_ptr HttpsUpstream::pop_downstream() { - auto downstream = downstream_; - downstream_ = nullptr; - return downstream; + return std::unique_ptr(downstream_.release()); } int HttpsUpstream::on_downstream_header_complete(Downstream *downstream) diff --git a/src/shrpx_https_upstream.h b/src/shrpx_https_upstream.h index 53013831..b2bb3309 100644 --- a/src/shrpx_https_upstream.h +++ b/src/shrpx_https_upstream.h @@ -29,6 +29,8 @@ #include +#include + #include "http-parser/http_parser.h" #include "shrpx_upstream.h" @@ -50,10 +52,10 @@ public: virtual bufferevent_data_cb get_downstream_readcb(); virtual bufferevent_data_cb get_downstream_writecb(); virtual bufferevent_event_cb get_downstream_eventcb(); - void attach_downstream(Downstream *downstream); + void attach_downstream(std::unique_ptr downstream); void delete_downstream(); Downstream* get_downstream() const; - Downstream* pop_downstream(); + std::unique_ptr pop_downstream(); int error_reply(unsigned int status_code); virtual void pause_read(IOCtrlReason reason); @@ -70,7 +72,7 @@ private: ClientHandler *handler_; http_parser htp_; size_t current_header_length_; - Downstream *downstream_; + std::unique_ptr downstream_; IOControl ioctrl_; }; diff --git a/src/shrpx_spdy_upstream.cc b/src/shrpx_spdy_upstream.cc index f5e35ef3..2f7cad7d 100644 --- a/src/shrpx_spdy_upstream.cc +++ b/src/shrpx_spdy_upstream.cc @@ -109,7 +109,8 @@ void on_stream_close_callback if(downstream->get_request_state() == Downstream::CONNECT_FAIL) { upstream->remove_downstream(downstream); - delete downstream; + // downstrea was deleted + return; } @@ -125,7 +126,8 @@ void on_stream_close_callback } } upstream->remove_downstream(downstream); - delete downstream; + // downstrea was deleted + return; } @@ -134,7 +136,8 @@ void on_stream_close_callback // If shrpx_downstream::push_request_headers() failed, the // error is handled here. upstream->remove_downstream(downstream); - delete downstream; + // downstrea was deleted + // How to test this case? Request sufficient large download // and make client send RST_STREAM after it gets first DATA // frame chunk. @@ -153,10 +156,10 @@ void on_ctrl_recv_callback ULOG(INFO, upstream) << "Received upstream SYN_STREAM stream_id=" << frame->syn_stream.stream_id; } - auto downstream = new Downstream(upstream, - frame->syn_stream.stream_id, - frame->syn_stream.pri); - upstream->add_downstream(downstream); + + auto downstream = upstream->add_pending_downstream + (frame->syn_stream.stream_id, frame->syn_stream.pri); + downstream->init_upstream_timer(); downstream->reset_upstream_rtimer(); downstream->init_response_body_buf(); @@ -256,31 +259,33 @@ void SpdyUpstream::maintain_downstream_concurrency() break; } - initiate_downstream(downstream); + initiate_downstream(std::move(downstream)); } } -void SpdyUpstream::initiate_downstream(Downstream *downstream) +void SpdyUpstream::initiate_downstream(std::unique_ptr downstream) { auto dconn = handler_->get_downstream_connection(); - int rv = dconn->attach_downstream(downstream); + int rv = dconn->attach_downstream(downstream.get()); if(rv != 0) { - downstream_queue_.add_failure(downstream); - // If downstream connection fails, issue RST_STREAM. - rst_stream(downstream, SPDYLAY_INTERNAL_ERROR); + rst_stream(downstream.get(), SPDYLAY_INTERNAL_ERROR); downstream->set_request_state(Downstream::CONNECT_FAIL); + + downstream_queue_.add_failure(std::move(downstream)); + return; } rv = downstream->push_request_headers(); if(rv != 0) { - downstream_queue_.add_failure(downstream); + rst_stream(downstream.get(), SPDYLAY_INTERNAL_ERROR); + + downstream_queue_.add_failure(std::move(downstream)); - rst_stream(downstream, SPDYLAY_INTERNAL_ERROR); return; } - downstream_queue_.add_active(downstream); + downstream_queue_.add_active(std::move(downstream)); } namespace { @@ -566,7 +571,8 @@ void spdy_downstream_readcb(bufferevent *bev, void *ptr) // because there is no consumer now. Downstream connection is also // closed in this case. upstream->remove_downstream(downstream); - delete downstream; + // downstrea was deleted + return; } @@ -654,7 +660,8 @@ void spdy_downstream_eventcb(bufferevent *bev, short events, void *ptr) // If stream was closed already, we don't need to send reply at // the first place. We can delete downstream. upstream->remove_downstream(downstream); - delete downstream; + // downstrea was deleted + return; } @@ -709,7 +716,8 @@ void spdy_downstream_eventcb(bufferevent *bev, short events, void *ptr) } if(downstream->get_request_state() == Downstream::STREAM_CLOSED) { upstream->remove_downstream(downstream); - delete downstream; + // downstrea was deleted + return; } @@ -900,14 +908,20 @@ bufferevent_event_cb SpdyUpstream::get_downstream_eventcb() return spdy_downstream_eventcb; } -void SpdyUpstream::add_downstream(Downstream *downstream) +Downstream* SpdyUpstream::add_pending_downstream +(int32_t stream_id, int32_t priority) { - downstream_queue_.add_pending(downstream); + auto downstream = util::make_unique(this, stream_id, priority); + auto res = downstream.get(); + + downstream_queue_.add_pending(std::move(downstream)); + + return res; } void SpdyUpstream::remove_downstream(Downstream *downstream) { - downstream_queue_.remove(downstream); + downstream_queue_.remove(downstream->get_stream_id()); maintain_downstream_concurrency(); } diff --git a/src/shrpx_spdy_upstream.h b/src/shrpx_spdy_upstream.h index f42a9839..04d23297 100644 --- a/src/shrpx_spdy_upstream.h +++ b/src/shrpx_spdy_upstream.h @@ -27,6 +27,8 @@ #include "shrpx.h" +#include + #include #include "shrpx_upstream.h" @@ -52,7 +54,7 @@ public: virtual bufferevent_data_cb get_downstream_readcb(); virtual bufferevent_data_cb get_downstream_writecb(); virtual bufferevent_event_cb get_downstream_eventcb(); - void add_downstream(Downstream *downstream); + Downstream* add_pending_downstream(int32_t stream_id, int32_t priority); void remove_downstream(Downstream *downstream); Downstream* find_downstream(int32_t stream_id); @@ -75,7 +77,7 @@ public: int handle_ign_data_chunk(size_t len); void maintain_downstream_concurrency(); - void initiate_downstream(Downstream *downstream); + void initiate_downstream(std::unique_ptr downstream); nghttp2::util::EvbufferBuffer sendbuf; private: