diff --git a/examples/Makefile.am b/examples/Makefile.am index 42d97a41..d62e8d69 100644 --- a/examples/Makefile.am +++ b/examples/Makefile.am @@ -77,6 +77,7 @@ shrpx_SOURCES = ${HELPER_OBJECTS} ${HELPER_HFILES} \ shrpx_downstream.cc shrpx_downstream.h \ shrpx_log.cc shrpx_log.h \ shrpx_http.cc shrpx_http.h \ + shrpx_io_control.cc shrpx_io_control.h \ htparse/htparse.c htparse/htparse.h noinst_PROGRAMS = spdycli diff --git a/examples/shrpx.cc b/examples/shrpx.cc index ba57abbc..3200f73a 100644 --- a/examples/shrpx.cc +++ b/examples/shrpx.cc @@ -285,7 +285,7 @@ int main(int argc, char **argv) mod_config()->upstream_read_timeout.tv_sec = 30; mod_config()->upstream_read_timeout.tv_usec = 0; - mod_config()->upstream_write_timeout.tv_sec = 30; + mod_config()->upstream_write_timeout.tv_sec = 60; mod_config()->upstream_write_timeout.tv_usec = 0; mod_config()->spdy_upstream_read_timeout.tv_sec = 600; diff --git a/examples/shrpx_client_handler.cc b/examples/shrpx_client_handler.cc index 4fceaaed..4139c3a9 100644 --- a/examples/shrpx_client_handler.cc +++ b/examples/shrpx_client_handler.cc @@ -48,12 +48,12 @@ void upstream_readcb(bufferevent *bev, void *arg) namespace { void upstream_writecb(bufferevent *bev, void *arg) { - if(ENABLE_LOG) { - LOG(INFO) << " upstream_writecb"; - } ClientHandler *handler = reinterpret_cast(arg); if(handler->get_should_close_after_write()) { delete handler; + } else { + Upstream *upstream = handler->get_upstream(); + upstream->on_write(); } } } // namespace diff --git a/examples/shrpx_config.h b/examples/shrpx_config.h index 9b955882..70630813 100644 --- a/examples/shrpx_config.h +++ b/examples/shrpx_config.h @@ -25,6 +25,8 @@ #ifndef SHRPX_CONFIG_H #define SHRPX_CONFIG_H +#include "shrpx.h" + #include #include #include diff --git a/examples/shrpx_downstream.cc b/examples/shrpx_downstream.cc index b8755b8a..08ab5fda 100644 --- a/examples/shrpx_downstream.cc +++ b/examples/shrpx_downstream.cc @@ -41,6 +41,7 @@ Downstream::Downstream(Upstream *upstream, int stream_id, int priority) bev_(0), stream_id_(stream_id), priority_(priority), + ioctrl_(0), request_state_(INITIAL), chunked_request_(false), request_connection_close_(false), @@ -56,6 +57,7 @@ Downstream::Downstream(Upstream *upstream, int stream_id, int priority) bev_ = bufferevent_socket_new (evbase, -1, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_DEFER_CALLBACKS); + ioctrl_.set_bev(bev_); } Downstream::~Downstream() @@ -75,6 +77,16 @@ Downstream::~Downstream() } } +void Downstream::pause_read(IOCtrlReason reason) +{ + ioctrl_.pause_read(reason); +} + +bool Downstream::resume_read(IOCtrlReason reason) +{ + return ioctrl_.resume_read(reason); +} + namespace { void check_transfer_encoding_chunked(bool *chunked, const Headers::value_type &item) @@ -394,6 +406,16 @@ int Downstream::get_response_state() const return response_state_; } +namespace { +void body_buf_cb(evbuffer *body, size_t oldlen, size_t newlen, void *arg) +{ + Downstream *downstream = reinterpret_cast(arg); + if(newlen == 0) { + downstream->resume_read(SHRPX_NO_BUFFER); + } +} +} // namespace + int Downstream::init_response_body_buf() { assert(response_body_buf_ == 0); @@ -401,6 +423,7 @@ int Downstream::init_response_body_buf() if(response_body_buf_ == 0) { DIE(); } + evbuffer_setcb(response_body_buf_, body_buf_cb, this); return 0; } diff --git a/examples/shrpx_downstream.h b/examples/shrpx_downstream.h index d4b824db..ef5687b2 100644 --- a/examples/shrpx_downstream.h +++ b/examples/shrpx_downstream.h @@ -39,6 +39,8 @@ extern "C" { #include "htparse/htparse.h" } +#include "shrpx_io_control.h" + namespace shrpx { class Upstream; @@ -52,6 +54,8 @@ public: int start_connection(); Upstream* get_upstream() const; int32_t get_stream_id() const; + void pause_read(IOCtrlReason reason); + bool resume_read(IOCtrlReason reason); // downstream request API const Headers& get_request_headers() const; void add_request_header(const std::string& name, const std::string& value); @@ -89,7 +93,7 @@ private: bufferevent *bev_; int32_t stream_id_; int priority_; - + IOControl ioctrl_; int request_state_; std::string request_method_; std::string request_path_; diff --git a/examples/shrpx_error.h b/examples/shrpx_error.h index 0c8dbd37..40a68502 100644 --- a/examples/shrpx_error.h +++ b/examples/shrpx_error.h @@ -25,6 +25,8 @@ #ifndef SHRPX_ERROR_H #define SHRPX_ERROR_H +#include "shrpx.h" + namespace shrpx { enum ErrorCode { diff --git a/examples/shrpx_https_upstream.cc b/examples/shrpx_https_upstream.cc index 92e6e542..c33c6a9b 100644 --- a/examples/shrpx_https_upstream.cc +++ b/examples/shrpx_https_upstream.cc @@ -38,9 +38,14 @@ using namespace spdylay; namespace shrpx { +namespace { +const size_t SHRPX_HTTPS_UPSTREAM_OUTPUT_UPPER_THRES = 512*1024; +} // namespace + HttpsUpstream::HttpsUpstream(ClientHandler *handler) : handler_(handler), - htp_(htparser_new()) + htp_(htparser_new()), + ioctrl_(handler->get_bev()) { if(ENABLE_LOG) { LOG(INFO) << "HttpsUpstream ctor"; @@ -221,7 +226,7 @@ int HttpsUpstream::on_read() evbuffer_drain(input, nread); htpparse_error htperr = htparser_get_error(htp_); if(htperr == htparse_error_user) { - bufferevent_disable(bev, EV_READ); + pause_read(SHRPX_MSG_BLOCK); if(ENABLE_LOG) { LOG(INFO) << " remaining bytes " << evbuffer_get_length(input); } @@ -235,6 +240,15 @@ int HttpsUpstream::on_read() return 0; } +int HttpsUpstream::on_write() +{ + Downstream *downstream = get_top_downstream(); + if(downstream) { + downstream->resume_read(SHRPX_NO_BUFFER); + } + return 0; +} + int HttpsUpstream::on_event() { return 0; @@ -245,11 +259,18 @@ ClientHandler* HttpsUpstream::get_client_handler() const return handler_; } -void HttpsUpstream::resume_read() +void HttpsUpstream::pause_read(IOCtrlReason reason) { - bufferevent_enable(handler_->get_bev(), EV_READ); - // Process remaining data in input buffer here. - on_read(); + ioctrl_.pause_read(reason); +} + +void HttpsUpstream::resume_read(IOCtrlReason reason) +{ + if(ioctrl_.resume_read(reason)) { + // Process remaining data in input buffer here because these bytes + // are not notified by readcb until new data arrive. + on_read(); + } } namespace { @@ -264,7 +285,14 @@ void https_downstream_readcb(bufferevent *bev, void *ptr) assert(downstream == upstream->get_top_downstream()); upstream->pop_downstream(); delete downstream; - upstream->resume_read(); + upstream->resume_read(SHRPX_MSG_BLOCK); + } else { + ClientHandler *handler = upstream->get_client_handler(); + bufferevent *bev = handler->get_bev(); + size_t outputlen = evbuffer_get_length(bufferevent_get_output(bev)); + if(outputlen > SHRPX_HTTPS_UPSTREAM_OUTPUT_UPPER_THRES) { + downstream->pause_read(SHRPX_NO_BUFFER); + } } } else { if(downstream->get_response_state() == Downstream::HEADER_COMPLETE) { @@ -274,7 +302,7 @@ void https_downstream_readcb(bufferevent *bev, void *ptr) assert(downstream == upstream->get_top_downstream()); upstream->pop_downstream(); delete downstream; - upstream->resume_read(); + upstream->resume_read(SHRPX_MSG_BLOCK); } } } @@ -320,7 +348,7 @@ void https_downstream_eventcb(bufferevent *bev, short events, void *ptr) } upstream->pop_downstream(); delete downstream; - upstream->resume_read(); + upstream->resume_read(SHRPX_MSG_BLOCK); } else if(events & (BEV_EVENT_ERROR | BEV_EVENT_TIMEOUT)) { if(ENABLE_LOG) { LOG(INFO) << " error/timeout. " << downstream; @@ -336,7 +364,7 @@ void https_downstream_eventcb(bufferevent *bev, short events, void *ptr) } upstream->pop_downstream(); delete downstream; - upstream->resume_read(); + upstream->resume_read(SHRPX_MSG_BLOCK); } } } // namespace @@ -384,12 +412,20 @@ void HttpsUpstream::pop_downstream() Downstream* HttpsUpstream::get_top_downstream() { - return downstream_queue_.front(); + if(downstream_queue_.empty()) { + return 0; + } else { + return downstream_queue_.front(); + } } Downstream* HttpsUpstream::get_last_downstream() { - return downstream_queue_.back(); + if(downstream_queue_.empty()) { + return 0; + } else { + return downstream_queue_.back(); + } } int HttpsUpstream::on_downstream_header_complete(Downstream *downstream) diff --git a/examples/shrpx_https_upstream.h b/examples/shrpx_https_upstream.h index f05a48c9..3fa76f9e 100644 --- a/examples/shrpx_https_upstream.h +++ b/examples/shrpx_https_upstream.h @@ -36,6 +36,7 @@ extern "C" { } #include "shrpx_upstream.h" +#include "shrpx_io_control.h" namespace shrpx { @@ -46,6 +47,7 @@ public: HttpsUpstream(ClientHandler *handler); virtual ~HttpsUpstream(); virtual int on_read(); + virtual int on_write(); virtual int on_event(); //int send(); virtual ClientHandler* get_client_handler() const; @@ -58,7 +60,8 @@ public: Downstream* get_last_downstream(); void error_reply(Downstream *downstream, int status_code); - void resume_read(); + void pause_read(IOCtrlReason reason); + void resume_read(IOCtrlReason reason); virtual int on_downstream_header_complete(Downstream *downstream); virtual int on_downstream_body(Downstream *downstream, @@ -69,6 +72,7 @@ private: ClientHandler *handler_; htparser *htp_; std::deque downstream_queue_; + IOControl ioctrl_; }; } // namespace shrpx diff --git a/examples/shrpx_io_control.cc b/examples/shrpx_io_control.cc new file mode 100644 index 00000000..76b957ad --- /dev/null +++ b/examples/shrpx_io_control.cc @@ -0,0 +1,61 @@ +/* + * Spdylay - SPDY Library + * + * Copyright (c) 2012 Tatsuhiro Tsujikawa + * + * Permission is hereby granted, free of charge, to any person obtaining + * a copy of this software and associated documentation files (the + * "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, + * distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to + * the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +#include "shrpx_io_control.h" + +#include + +namespace shrpx { + +IOControl::IOControl(bufferevent *bev) + : bev_(bev), + ctrlv_(SHRPX_REASON_MAX) +{} + +IOControl::~IOControl() +{} + +void IOControl::set_bev(bufferevent *bev) +{ + bev_ = bev; +} + +void IOControl::pause_read(IOCtrlReason reason) +{ + ctrlv_[reason] = 1; + bufferevent_disable(bev_, EV_READ); +} + +bool IOControl::resume_read(IOCtrlReason reason) +{ + ctrlv_[reason] = 0; + if(std::find(ctrlv_.begin(), ctrlv_.end(), 1) == ctrlv_.end()) { + bufferevent_enable(bev_, EV_READ); + return true; + } else { + return false; + } +} + +} // namespace shrpx diff --git a/examples/shrpx_io_control.h b/examples/shrpx_io_control.h new file mode 100644 index 00000000..b73e36ed --- /dev/null +++ b/examples/shrpx_io_control.h @@ -0,0 +1,58 @@ +/* + * Spdylay - SPDY Library + * + * Copyright (c) 2012 Tatsuhiro Tsujikawa + * + * Permission is hereby granted, free of charge, to any person obtaining + * a copy of this software and associated documentation files (the + * "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, + * distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to + * the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +#ifndef SHRPX_IO_CONTROL_H +#define SHRPX_IO_CONTROL_H + +#include "shrpx.h" + +#include + +#include +#include + +namespace shrpx { + +enum IOCtrlReason { + SHRPX_NO_BUFFER = 0, + SHRPX_MSG_BLOCK, + SHRPX_REASON_MAX +}; + +class IOControl { +public: + IOControl(bufferevent *bev); + ~IOControl(); + void set_bev(bufferevent *bev); + void pause_read(IOCtrlReason reason); + // Returns true if read operation is enabled after this call + bool resume_read(IOCtrlReason reason); +private: + bufferevent *bev_; + std::vector ctrlv_; +}; + +} // namespace shrpx + +#endif // SHRPX_IO_CONTROL_H diff --git a/examples/shrpx_spdy_upstream.cc b/examples/shrpx_spdy_upstream.cc index f217dad2..e1a09f45 100644 --- a/examples/shrpx_spdy_upstream.cc +++ b/examples/shrpx_spdy_upstream.cc @@ -37,6 +37,10 @@ using namespace spdylay; namespace shrpx { +namespace { +const size_t SHRPX_SPDY_UPSTREAM_OUTPUT_UPPER_THRES = 512*1024; +} // namespace + namespace { ssize_t send_callback(spdylay_session *session, const uint8_t *data, size_t len, int flags, @@ -47,6 +51,11 @@ ssize_t send_callback(spdylay_session *session, ClientHandler *handler = upstream->get_client_handler(); bufferevent *bev = handler->get_bev(); evbuffer *output = bufferevent_get_output(bev); + // Check buffer length and return WOULDBLOCK if it is large enough. + if(evbuffer_get_length(output) > SHRPX_SPDY_UPSTREAM_OUTPUT_UPPER_THRES) { + return SPDYLAY_ERR_WOULDBLOCK; + } + rv = evbuffer_add(output, data, len); if(rv == -1) { return SPDYLAY_ERR_CALLBACK_FAILURE; @@ -228,6 +237,12 @@ int SpdyUpstream::on_read() return 0; } +int SpdyUpstream::on_write() +{ + send(); + return 0; +} + int SpdyUpstream::send() { int rv; @@ -306,47 +321,51 @@ void spdy_downstream_eventcb(bufferevent *bev, short events, void *ptr) LOG(INFO) << " EOF stream_id=" << downstream->get_stream_id(); } - if(downstream->get_response_state() == Downstream::HEADER_COMPLETE) { - // Server may indicate the end of the request by EOF - if(ENABLE_LOG) { - LOG(INFO) << " Assuming content-length is 0 byte"; - } - upstream->on_downstream_body_complete(downstream); - downstream->set_response_state(Downstream::MSG_COMPLETE); - // downstream wil be deleted in on_stream_close_callback. - } else if(downstream->get_response_state() == Downstream::MSG_COMPLETE) { - // nothing todo - } else { - // error - if(ENABLE_LOG) { - LOG(INFO) << " Treated as error"; - } - upstream->error_reply(downstream, 502); - upstream->send(); + if(downstream->get_request_state() == Downstream::STREAM_CLOSED) { + //If stream was closed already, we don't need to send reply at + // the first place. We can delete downstream. upstream->remove_downstream(downstream); delete downstream; + } else { + // downstream wil be deleted in on_stream_close_callback. + if(downstream->get_response_state() == Downstream::HEADER_COMPLETE) { + // Server may indicate the end of the request by EOF + if(ENABLE_LOG) { + LOG(INFO) << " Assuming content-length is 0 byte"; + } + downstream->set_response_state(Downstream::MSG_COMPLETE); + upstream->on_downstream_body_complete(downstream); + } else if(downstream->get_response_state() != Downstream::MSG_COMPLETE) { + // If stream was not closed, then we set MSG_COMPLETE and let + // on_stream_close_callback delete downstream. + upstream->error_reply(downstream, 502); + downstream->set_response_state(Downstream::MSG_COMPLETE); + upstream->send(); + } } } else if(events & (BEV_EVENT_ERROR | BEV_EVENT_TIMEOUT)) { if(ENABLE_LOG) { LOG(INFO) << " error/timeout. Downstream " << downstream; } - // For Downstream::MSG_COMPLETE case, downstream will be deleted - // in on_stream_close_callback. - if(downstream->get_response_state() != Downstream::MSG_COMPLETE) { - if(downstream->get_response_state() == Downstream::HEADER_COMPLETE) { - upstream->rst_stream(downstream, SPDYLAY_INTERNAL_ERROR); - } else { - int status; - if(events & BEV_EVENT_TIMEOUT) { - status = 504; - } else { - status = 502; - } - upstream->error_reply(downstream, status); - } - upstream->send(); + if(downstream->get_request_state() == Downstream::STREAM_CLOSED) { upstream->remove_downstream(downstream); delete downstream; + } else { + if(downstream->get_response_state() != Downstream::MSG_COMPLETE) { + if(downstream->get_response_state() == Downstream::HEADER_COMPLETE) { + upstream->rst_stream(downstream, SPDYLAY_INTERNAL_ERROR); + } else { + int status; + if(events & BEV_EVENT_TIMEOUT) { + status = 504; + } else { + status = 502; + } + upstream->error_reply(downstream, status); + } + downstream->set_response_state(Downstream::MSG_COMPLETE); + upstream->send(); + } } } } @@ -511,6 +530,12 @@ int SpdyUpstream::on_downstream_body(Downstream *downstream, evbuffer_add(body, data, len); spdylay_session_resume_data(session_, downstream->get_stream_id()); //send(); + + size_t bodylen = evbuffer_get_length(body); + if(bodylen > SHRPX_SPDY_UPSTREAM_OUTPUT_UPPER_THRES) { + downstream->pause_read(SHRPX_NO_BUFFER); + } + return 0; } diff --git a/examples/shrpx_spdy_upstream.h b/examples/shrpx_spdy_upstream.h index c9db8bb7..fc878988 100644 --- a/examples/shrpx_spdy_upstream.h +++ b/examples/shrpx_spdy_upstream.h @@ -41,6 +41,7 @@ public: SpdyUpstream(uint16_t version, ClientHandler *handler); virtual ~SpdyUpstream(); virtual int on_read(); + virtual int on_write(); virtual int on_event(); int send(); virtual ClientHandler* get_client_handler() const; diff --git a/examples/shrpx_upstream.h b/examples/shrpx_upstream.h index bb886ada..7c7593ce 100644 --- a/examples/shrpx_upstream.h +++ b/examples/shrpx_upstream.h @@ -38,6 +38,7 @@ class Upstream { public: virtual ~Upstream() {} virtual int on_read() = 0; + virtual int on_write() = 0; virtual int on_event() = 0; virtual bufferevent_data_cb get_downstream_readcb() = 0; virtual bufferevent_data_cb get_downstream_writecb() = 0;