SPDY: Use keep-alive connection to downstream server

This commit is contained in:
Tatsuhiro Tsujikawa 2012-06-08 00:36:19 +09:00
parent 695dd50612
commit 17025a96d9
8 changed files with 118 additions and 23 deletions

View File

@ -66,7 +66,7 @@ void upstream_eventcb(bufferevent *bev, short events, void *arg)
bool finish = false;
if(events & BEV_EVENT_EOF) {
if(ENABLE_LOG) {
LOG(INFO) << "Upstream handshake EOF";
LOG(INFO) << "Upstream EOF";
}
finish = true;
}

View File

@ -117,6 +117,10 @@ void Downstream::idle()
chunked_response_ = false;
response_connection_close_ = false;
response_headers_.clear();
if(response_body_buf_) {
size_t len = evbuffer_get_length(response_body_buf_);
evbuffer_drain(response_body_buf_, len);
}
}
void Downstream::pause_read(IOCtrlReason reason)
@ -543,4 +547,9 @@ evbuffer* Downstream::get_response_body_buf()
return response_body_buf_;
}
void Downstream::set_priority(int pri)
{
priority_ = pri;
}
} // namespace shrpx

View File

@ -54,6 +54,7 @@ public:
int start_connection();
Upstream* get_upstream() const;
int32_t get_stream_id() const;
void set_priority(int pri);
void pause_read(IOCtrlReason reason);
bool resume_read(IOCtrlReason reason);
void force_resume_read();

View File

@ -24,6 +24,8 @@
*/
#include "shrpx_downstream_queue.h"
#include <cassert>
#include "shrpx_downstream.h"
namespace shrpx {
@ -37,6 +39,10 @@ DownstreamQueue::~DownstreamQueue()
i != downstreams_.end(); ++i) {
delete (*i).second;
}
for(std::set<Downstream*>::iterator i = idle_downstreams_.begin();
i != idle_downstreams_.end(); ++i) {
delete *i;
}
}
void DownstreamQueue::add(Downstream *downstream)
@ -51,7 +57,11 @@ int DownstreamQueue::start(Downstream *downstream)
void DownstreamQueue::remove(Downstream *downstream)
{
downstreams_.erase(downstream->get_stream_id());
if(downstream->get_request_state() == Downstream::IDLE) {
idle_downstreams_.erase(downstream);
} else {
downstreams_.erase(downstream->get_stream_id());
}
}
Downstream* DownstreamQueue::find(int32_t stream_id)
@ -64,4 +74,24 @@ Downstream* DownstreamQueue::find(int32_t stream_id)
}
}
Downstream* DownstreamQueue::reuse(int32_t stream_id)
{
if(idle_downstreams_.empty()) {
return 0;
}
Downstream* downstream = *idle_downstreams_.begin();
idle_downstreams_.erase(downstream);
downstream->reuse(stream_id);
add(downstream);
return downstream;
}
void DownstreamQueue::idle(Downstream *downstream)
{
assert(downstream->get_request_state() != Downstream::IDLE);
remove(downstream);
downstream->idle();
idle_downstreams_.insert(downstream);
}
} // namespace shrpx

View File

@ -30,6 +30,7 @@
#include <stdint.h>
#include <map>
#include <set>
namespace shrpx {
@ -43,8 +44,11 @@ public:
int start(Downstream *downstream);
void remove(Downstream *downstream);
Downstream* find(int32_t stream_id);
Downstream* reuse(int32_t stream_id);
void idle(Downstream *downstream);
private:
std::map<int32_t, Downstream*> downstreams_;
std::set<Downstream*> idle_downstreams_;
};
} // namespace shrpx

View File

@ -79,8 +79,14 @@ int htp_msg_begin(htparser *htp)
Downstream *downstream = upstream->get_top_downstream();
if(downstream) {
// Keep-Alived connection
if(ENABLE_LOG) {
LOG(INFO) << "Reusing downstream";
}
downstream->reuse(0);
} else {
if(ENABLE_LOG) {
LOG(INFO) << "Creating new downstream";
}
downstream = new Downstream(upstream, 0, 0);
upstream->add_downstream(downstream);
}

View File

@ -93,16 +93,15 @@ void on_stream_close_callback
LOG(INFO) << "Upstream spdy Stream " << stream_id << " is being closed";
}
SpdyUpstream *upstream = reinterpret_cast<SpdyUpstream*>(user_data);
Downstream *downstream = upstream->get_downstream_queue()->find(stream_id);
Downstream *downstream = upstream->find_downstream(stream_id);
if(downstream) {
if(downstream->get_request_state() == Downstream::CONNECT_FAIL) {
upstream->get_downstream_queue()->remove(downstream);
upstream->remove_downstream(downstream);
delete downstream;
} else {
downstream->set_request_state(Downstream::STREAM_CLOSED);
if(downstream->get_response_state() == Downstream::MSG_COMPLETE) {
upstream->get_downstream_queue()->remove(downstream);
delete downstream;
upstream->remove_or_idle_downstream(downstream);
} else {
// At this point, downstream read may be paused. To reclaim
// file descriptor, enable read here and catch read
@ -126,9 +125,23 @@ void on_ctrl_recv_callback
LOG(INFO) << "Upstream spdy received upstream SYN_STREAM stream_id="
<< frame->syn_stream.stream_id;
}
Downstream *downstream = new Downstream(upstream,
frame->syn_stream.stream_id,
frame->syn_stream.pri);
Downstream *downstream;
downstream = upstream->reuse_downstream(frame->syn_stream.stream_id);
if(downstream) {
if(ENABLE_LOG) {
LOG(INFO) << "Reusing downstream for stream_id="
<< frame->syn_stream.stream_id;
}
downstream->set_priority(frame->syn_stream.pri);
} else {
if(ENABLE_LOG) {
LOG(INFO) << "Creating new downstream for stream_id="
<< frame->syn_stream.stream_id;
}
downstream = new Downstream(upstream,
frame->syn_stream.stream_id,
frame->syn_stream.pri);
}
downstream->init_response_body_buf();
char **nv = frame->syn_stream.nv;
@ -161,11 +174,13 @@ void on_ctrl_recv_callback
}
downstream->set_request_state(Downstream::MSG_COMPLETE);
}
upstream->add_downstream(downstream);
if(upstream->start_downstream(downstream) != 0) {
// If downstream connection fails, issue RST_STREAM.
upstream->rst_stream(downstream, SPDYLAY_INTERNAL_ERROR);
downstream->set_request_state(Downstream::CONNECT_FAIL);
if(downstream->get_counter() == 1) {
upstream->add_downstream(downstream);
if(upstream->start_downstream(downstream) != 0) {
// If downstream connection fails, issue RST_STREAM.
upstream->rst_stream(downstream, SPDYLAY_INTERNAL_ERROR);
downstream->set_request_state(Downstream::CONNECT_FAIL);
}
}
break;
}
@ -186,7 +201,7 @@ void on_data_chunk_recv_callback(spdylay_session *session,
<< stream_id;
}
SpdyUpstream *upstream = reinterpret_cast<SpdyUpstream*>(user_data);
Downstream *downstream = upstream->get_downstream_queue()->find(stream_id);
Downstream *downstream = upstream->find_downstream(stream_id);
if(downstream) {
downstream->push_upload_data_chunk(data, len);
if(flags & SPDYLAY_DATA_FLAG_FIN) {
@ -285,6 +300,11 @@ void spdy_downstream_readcb(bufferevent *bev, void *ptr)
Downstream *downstream = reinterpret_cast<Downstream*>(ptr);
SpdyUpstream *upstream;
upstream = static_cast<SpdyUpstream*>(downstream->get_upstream());
if(downstream->get_request_state() == Downstream::IDLE) {
upstream->remove_downstream(downstream);
delete downstream;
return;
}
// If upstream SPDY stream was closed, we just close downstream,
// because there is no consumer now.
if(downstream->get_request_state() == Downstream::STREAM_CLOSED) {
@ -320,13 +340,20 @@ void spdy_downstream_eventcb(bufferevent *bev, short events, void *ptr)
Downstream *downstream = reinterpret_cast<Downstream*>(ptr);
SpdyUpstream *upstream;
upstream = static_cast<SpdyUpstream*>(downstream->get_upstream());
if(downstream->get_request_state() == Downstream::IDLE) {
if(ENABLE_LOG) {
LOG(INFO) << "Delete idle downstream in spdy_downstream_eventcb";
}
upstream->remove_downstream(downstream);
delete downstream;
return;
}
if(events & BEV_EVENT_CONNECTED) {
if(ENABLE_LOG) {
LOG(INFO) << "Downstream connection established. Downstream "
<< downstream;
}
}
if(events & BEV_EVENT_EOF) {
} else if(events & BEV_EVENT_EOF) {
if(ENABLE_LOG) {
LOG(INFO) << "Downstream EOF stream_id="
<< downstream->get_stream_id();
@ -479,6 +506,26 @@ void SpdyUpstream::remove_downstream(Downstream *downstream)
downstream_queue_.remove(downstream);
}
Downstream* SpdyUpstream::find_downstream(int32_t stream_id)
{
return downstream_queue_.find(stream_id);
}
Downstream* SpdyUpstream::reuse_downstream(int32_t stream_id)
{
return downstream_queue_.reuse(stream_id);
}
void SpdyUpstream::remove_or_idle_downstream(Downstream *downstream)
{
if(downstream->get_response_connection_close()) {
downstream_queue_.remove(downstream);
delete downstream;
} else {
downstream_queue_.idle(downstream);
}
}
spdylay_session* SpdyUpstream::get_spdy_session()
{
return session_;
@ -564,9 +611,4 @@ int SpdyUpstream::on_downstream_body_complete(Downstream *downstream)
return 0;
}
DownstreamQueue* SpdyUpstream::get_downstream_queue()
{
return &downstream_queue_;
}
} // namespace shrpx

View File

@ -51,8 +51,11 @@ public:
void add_downstream(Downstream *downstream);
void remove_downstream(Downstream *downstream);
int start_downstream(Downstream *downstream);
Downstream* find_downstream(int32_t stream_id);
Downstream* reuse_downstream(int32_t stream_id);
void remove_or_idle_downstream(Downstream *downstream);
spdylay_session* get_spdy_session();
DownstreamQueue* get_downstream_queue();
int rst_stream(Downstream *downstream, int status_code);
int error_reply(Downstream *downstream, int status_code);