Avoid too large buffering in upstream output.

This commit is contained in:
Tatsuhiro Tsujikawa 2012-06-05 03:11:43 +09:00
parent fad7f51f8d
commit c04c09ff3e
14 changed files with 268 additions and 50 deletions

View File

@ -77,6 +77,7 @@ shrpx_SOURCES = ${HELPER_OBJECTS} ${HELPER_HFILES} \
shrpx_downstream.cc shrpx_downstream.h \
shrpx_log.cc shrpx_log.h \
shrpx_http.cc shrpx_http.h \
shrpx_io_control.cc shrpx_io_control.h \
htparse/htparse.c htparse/htparse.h
noinst_PROGRAMS = spdycli

View File

@ -285,7 +285,7 @@ int main(int argc, char **argv)
mod_config()->upstream_read_timeout.tv_sec = 30;
mod_config()->upstream_read_timeout.tv_usec = 0;
mod_config()->upstream_write_timeout.tv_sec = 30;
mod_config()->upstream_write_timeout.tv_sec = 60;
mod_config()->upstream_write_timeout.tv_usec = 0;
mod_config()->spdy_upstream_read_timeout.tv_sec = 600;

View File

@ -48,12 +48,12 @@ void upstream_readcb(bufferevent *bev, void *arg)
namespace {
void upstream_writecb(bufferevent *bev, void *arg)
{
if(ENABLE_LOG) {
LOG(INFO) << "<upstream> upstream_writecb";
}
ClientHandler *handler = reinterpret_cast<ClientHandler*>(arg);
if(handler->get_should_close_after_write()) {
delete handler;
} else {
Upstream *upstream = handler->get_upstream();
upstream->on_write();
}
}
} // namespace

View File

@ -25,6 +25,8 @@
#ifndef SHRPX_CONFIG_H
#define SHRPX_CONFIG_H
#include "shrpx.h"
#include <stdint.h>
#include <sys/types.h>
#include <sys/socket.h>

View File

@ -41,6 +41,7 @@ Downstream::Downstream(Upstream *upstream, int stream_id, int priority)
bev_(0),
stream_id_(stream_id),
priority_(priority),
ioctrl_(0),
request_state_(INITIAL),
chunked_request_(false),
request_connection_close_(false),
@ -56,6 +57,7 @@ Downstream::Downstream(Upstream *upstream, int stream_id, int priority)
bev_ = bufferevent_socket_new
(evbase, -1,
BEV_OPT_CLOSE_ON_FREE | BEV_OPT_DEFER_CALLBACKS);
ioctrl_.set_bev(bev_);
}
Downstream::~Downstream()
@ -75,6 +77,16 @@ Downstream::~Downstream()
}
}
void Downstream::pause_read(IOCtrlReason reason)
{
ioctrl_.pause_read(reason);
}
bool Downstream::resume_read(IOCtrlReason reason)
{
return ioctrl_.resume_read(reason);
}
namespace {
void check_transfer_encoding_chunked(bool *chunked,
const Headers::value_type &item)
@ -394,6 +406,16 @@ int Downstream::get_response_state() const
return response_state_;
}
namespace {
void body_buf_cb(evbuffer *body, size_t oldlen, size_t newlen, void *arg)
{
Downstream *downstream = reinterpret_cast<Downstream*>(arg);
if(newlen == 0) {
downstream->resume_read(SHRPX_NO_BUFFER);
}
}
} // namespace
int Downstream::init_response_body_buf()
{
assert(response_body_buf_ == 0);
@ -401,6 +423,7 @@ int Downstream::init_response_body_buf()
if(response_body_buf_ == 0) {
DIE();
}
evbuffer_setcb(response_body_buf_, body_buf_cb, this);
return 0;
}

View File

@ -39,6 +39,8 @@ extern "C" {
#include "htparse/htparse.h"
}
#include "shrpx_io_control.h"
namespace shrpx {
class Upstream;
@ -52,6 +54,8 @@ public:
int start_connection();
Upstream* get_upstream() const;
int32_t get_stream_id() const;
void pause_read(IOCtrlReason reason);
bool resume_read(IOCtrlReason reason);
// downstream request API
const Headers& get_request_headers() const;
void add_request_header(const std::string& name, const std::string& value);
@ -89,7 +93,7 @@ private:
bufferevent *bev_;
int32_t stream_id_;
int priority_;
IOControl ioctrl_;
int request_state_;
std::string request_method_;
std::string request_path_;

View File

@ -25,6 +25,8 @@
#ifndef SHRPX_ERROR_H
#define SHRPX_ERROR_H
#include "shrpx.h"
namespace shrpx {
enum ErrorCode {

View File

@ -38,9 +38,14 @@ using namespace spdylay;
namespace shrpx {
namespace {
const size_t SHRPX_HTTPS_UPSTREAM_OUTPUT_UPPER_THRES = 512*1024;
} // namespace
HttpsUpstream::HttpsUpstream(ClientHandler *handler)
: handler_(handler),
htp_(htparser_new())
htp_(htparser_new()),
ioctrl_(handler->get_bev())
{
if(ENABLE_LOG) {
LOG(INFO) << "HttpsUpstream ctor";
@ -221,7 +226,7 @@ int HttpsUpstream::on_read()
evbuffer_drain(input, nread);
htpparse_error htperr = htparser_get_error(htp_);
if(htperr == htparse_error_user) {
bufferevent_disable(bev, EV_READ);
pause_read(SHRPX_MSG_BLOCK);
if(ENABLE_LOG) {
LOG(INFO) << "<upstream> remaining bytes " << evbuffer_get_length(input);
}
@ -235,6 +240,15 @@ int HttpsUpstream::on_read()
return 0;
}
int HttpsUpstream::on_write()
{
Downstream *downstream = get_top_downstream();
if(downstream) {
downstream->resume_read(SHRPX_NO_BUFFER);
}
return 0;
}
int HttpsUpstream::on_event()
{
return 0;
@ -245,11 +259,18 @@ ClientHandler* HttpsUpstream::get_client_handler() const
return handler_;
}
void HttpsUpstream::resume_read()
void HttpsUpstream::pause_read(IOCtrlReason reason)
{
bufferevent_enable(handler_->get_bev(), EV_READ);
// Process remaining data in input buffer here.
ioctrl_.pause_read(reason);
}
void HttpsUpstream::resume_read(IOCtrlReason reason)
{
if(ioctrl_.resume_read(reason)) {
// Process remaining data in input buffer here because these bytes
// are not notified by readcb until new data arrive.
on_read();
}
}
namespace {
@ -264,7 +285,14 @@ void https_downstream_readcb(bufferevent *bev, void *ptr)
assert(downstream == upstream->get_top_downstream());
upstream->pop_downstream();
delete downstream;
upstream->resume_read();
upstream->resume_read(SHRPX_MSG_BLOCK);
} else {
ClientHandler *handler = upstream->get_client_handler();
bufferevent *bev = handler->get_bev();
size_t outputlen = evbuffer_get_length(bufferevent_get_output(bev));
if(outputlen > SHRPX_HTTPS_UPSTREAM_OUTPUT_UPPER_THRES) {
downstream->pause_read(SHRPX_NO_BUFFER);
}
}
} else {
if(downstream->get_response_state() == Downstream::HEADER_COMPLETE) {
@ -274,7 +302,7 @@ void https_downstream_readcb(bufferevent *bev, void *ptr)
assert(downstream == upstream->get_top_downstream());
upstream->pop_downstream();
delete downstream;
upstream->resume_read();
upstream->resume_read(SHRPX_MSG_BLOCK);
}
}
}
@ -320,7 +348,7 @@ void https_downstream_eventcb(bufferevent *bev, short events, void *ptr)
}
upstream->pop_downstream();
delete downstream;
upstream->resume_read();
upstream->resume_read(SHRPX_MSG_BLOCK);
} else if(events & (BEV_EVENT_ERROR | BEV_EVENT_TIMEOUT)) {
if(ENABLE_LOG) {
LOG(INFO) << "<downstream> error/timeout. " << downstream;
@ -336,7 +364,7 @@ void https_downstream_eventcb(bufferevent *bev, short events, void *ptr)
}
upstream->pop_downstream();
delete downstream;
upstream->resume_read();
upstream->resume_read(SHRPX_MSG_BLOCK);
}
}
} // namespace
@ -384,12 +412,20 @@ void HttpsUpstream::pop_downstream()
Downstream* HttpsUpstream::get_top_downstream()
{
if(downstream_queue_.empty()) {
return 0;
} else {
return downstream_queue_.front();
}
}
Downstream* HttpsUpstream::get_last_downstream()
{
if(downstream_queue_.empty()) {
return 0;
} else {
return downstream_queue_.back();
}
}
int HttpsUpstream::on_downstream_header_complete(Downstream *downstream)

View File

@ -36,6 +36,7 @@ extern "C" {
}
#include "shrpx_upstream.h"
#include "shrpx_io_control.h"
namespace shrpx {
@ -46,6 +47,7 @@ public:
HttpsUpstream(ClientHandler *handler);
virtual ~HttpsUpstream();
virtual int on_read();
virtual int on_write();
virtual int on_event();
//int send();
virtual ClientHandler* get_client_handler() const;
@ -58,7 +60,8 @@ public:
Downstream* get_last_downstream();
void error_reply(Downstream *downstream, int status_code);
void resume_read();
void pause_read(IOCtrlReason reason);
void resume_read(IOCtrlReason reason);
virtual int on_downstream_header_complete(Downstream *downstream);
virtual int on_downstream_body(Downstream *downstream,
@ -69,6 +72,7 @@ private:
ClientHandler *handler_;
htparser *htp_;
std::deque<Downstream*> downstream_queue_;
IOControl ioctrl_;
};
} // namespace shrpx

View File

@ -0,0 +1,61 @@
/*
* Spdylay - SPDY Library
*
* Copyright (c) 2012 Tatsuhiro Tsujikawa
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include "shrpx_io_control.h"
#include <algorithm>
namespace shrpx {
IOControl::IOControl(bufferevent *bev)
: bev_(bev),
ctrlv_(SHRPX_REASON_MAX)
{}
IOControl::~IOControl()
{}
void IOControl::set_bev(bufferevent *bev)
{
bev_ = bev;
}
void IOControl::pause_read(IOCtrlReason reason)
{
ctrlv_[reason] = 1;
bufferevent_disable(bev_, EV_READ);
}
bool IOControl::resume_read(IOCtrlReason reason)
{
ctrlv_[reason] = 0;
if(std::find(ctrlv_.begin(), ctrlv_.end(), 1) == ctrlv_.end()) {
bufferevent_enable(bev_, EV_READ);
return true;
} else {
return false;
}
}
} // namespace shrpx

View File

@ -0,0 +1,58 @@
/*
* Spdylay - SPDY Library
*
* Copyright (c) 2012 Tatsuhiro Tsujikawa
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef SHRPX_IO_CONTROL_H
#define SHRPX_IO_CONTROL_H
#include "shrpx.h"
#include <vector>
#include <event.h>
#include <event2/bufferevent.h>
namespace shrpx {
enum IOCtrlReason {
SHRPX_NO_BUFFER = 0,
SHRPX_MSG_BLOCK,
SHRPX_REASON_MAX
};
class IOControl {
public:
IOControl(bufferevent *bev);
~IOControl();
void set_bev(bufferevent *bev);
void pause_read(IOCtrlReason reason);
// Returns true if read operation is enabled after this call
bool resume_read(IOCtrlReason reason);
private:
bufferevent *bev_;
std::vector<int> ctrlv_;
};
} // namespace shrpx
#endif // SHRPX_IO_CONTROL_H

View File

@ -37,6 +37,10 @@ using namespace spdylay;
namespace shrpx {
namespace {
const size_t SHRPX_SPDY_UPSTREAM_OUTPUT_UPPER_THRES = 512*1024;
} // namespace
namespace {
ssize_t send_callback(spdylay_session *session,
const uint8_t *data, size_t len, int flags,
@ -47,6 +51,11 @@ ssize_t send_callback(spdylay_session *session,
ClientHandler *handler = upstream->get_client_handler();
bufferevent *bev = handler->get_bev();
evbuffer *output = bufferevent_get_output(bev);
// Check buffer length and return WOULDBLOCK if it is large enough.
if(evbuffer_get_length(output) > SHRPX_SPDY_UPSTREAM_OUTPUT_UPPER_THRES) {
return SPDYLAY_ERR_WOULDBLOCK;
}
rv = evbuffer_add(output, data, len);
if(rv == -1) {
return SPDYLAY_ERR_CALLBACK_FAILURE;
@ -228,6 +237,12 @@ int SpdyUpstream::on_read()
return 0;
}
int SpdyUpstream::on_write()
{
send();
return 0;
}
int SpdyUpstream::send()
{
int rv;
@ -306,32 +321,36 @@ void spdy_downstream_eventcb(bufferevent *bev, short events, void *ptr)
LOG(INFO) << "<downstream> EOF stream_id="
<< downstream->get_stream_id();
}
if(downstream->get_request_state() == Downstream::STREAM_CLOSED) {
//If stream was closed already, we don't need to send reply at
// the first place. We can delete downstream.
upstream->remove_downstream(downstream);
delete downstream;
} else {
// downstream wil be deleted in on_stream_close_callback.
if(downstream->get_response_state() == Downstream::HEADER_COMPLETE) {
// Server may indicate the end of the request by EOF
if(ENABLE_LOG) {
LOG(INFO) << "<downstream> Assuming content-length is 0 byte";
}
upstream->on_downstream_body_complete(downstream);
downstream->set_response_state(Downstream::MSG_COMPLETE);
// downstream wil be deleted in on_stream_close_callback.
} else if(downstream->get_response_state() == Downstream::MSG_COMPLETE) {
// nothing todo
} else {
// error
if(ENABLE_LOG) {
LOG(INFO) << "<downstream> Treated as error";
}
upstream->on_downstream_body_complete(downstream);
} else if(downstream->get_response_state() != Downstream::MSG_COMPLETE) {
// If stream was not closed, then we set MSG_COMPLETE and let
// on_stream_close_callback delete downstream.
upstream->error_reply(downstream, 502);
downstream->set_response_state(Downstream::MSG_COMPLETE);
upstream->send();
upstream->remove_downstream(downstream);
delete downstream;
}
}
} else if(events & (BEV_EVENT_ERROR | BEV_EVENT_TIMEOUT)) {
if(ENABLE_LOG) {
LOG(INFO) << "<downstream> error/timeout. Downstream " << downstream;
}
// For Downstream::MSG_COMPLETE case, downstream will be deleted
// in on_stream_close_callback.
if(downstream->get_request_state() == Downstream::STREAM_CLOSED) {
upstream->remove_downstream(downstream);
delete downstream;
} else {
if(downstream->get_response_state() != Downstream::MSG_COMPLETE) {
if(downstream->get_response_state() == Downstream::HEADER_COMPLETE) {
upstream->rst_stream(downstream, SPDYLAY_INTERNAL_ERROR);
@ -344,9 +363,9 @@ void spdy_downstream_eventcb(bufferevent *bev, short events, void *ptr)
}
upstream->error_reply(downstream, status);
}
downstream->set_response_state(Downstream::MSG_COMPLETE);
upstream->send();
upstream->remove_downstream(downstream);
delete downstream;
}
}
}
}
@ -511,6 +530,12 @@ int SpdyUpstream::on_downstream_body(Downstream *downstream,
evbuffer_add(body, data, len);
spdylay_session_resume_data(session_, downstream->get_stream_id());
//send();
size_t bodylen = evbuffer_get_length(body);
if(bodylen > SHRPX_SPDY_UPSTREAM_OUTPUT_UPPER_THRES) {
downstream->pause_read(SHRPX_NO_BUFFER);
}
return 0;
}

View File

@ -41,6 +41,7 @@ public:
SpdyUpstream(uint16_t version, ClientHandler *handler);
virtual ~SpdyUpstream();
virtual int on_read();
virtual int on_write();
virtual int on_event();
int send();
virtual ClientHandler* get_client_handler() const;

View File

@ -38,6 +38,7 @@ class Upstream {
public:
virtual ~Upstream() {}
virtual int on_read() = 0;
virtual int on_write() = 0;
virtual int on_event() = 0;
virtual bufferevent_data_cb get_downstream_readcb() = 0;
virtual bufferevent_data_cb get_downstream_writecb() = 0;