shrpx: Share SPDY session among multiple frontend connections per thread

In client mode, now SPDY connection to the backend server is
established per thread.  The frontend connections which belong to the
same thread share the SPDY connection.
This commit is contained in:
Tatsuhiro Tsujikawa 2012-11-21 01:29:39 +09:00
parent ae30e7f71b
commit fa552c6788
25 changed files with 1392 additions and 744 deletions

View File

@ -84,6 +84,7 @@ shrpx_SOURCES = ${HELPER_OBJECTS} ${HELPER_HFILES} \
shrpx_downstream_connection.cc shrpx_downstream_connection.h \ shrpx_downstream_connection.cc shrpx_downstream_connection.h \
shrpx_http_downstream_connection.cc shrpx_http_downstream_connection.h \ shrpx_http_downstream_connection.cc shrpx_http_downstream_connection.h \
shrpx_spdy_downstream_connection.cc shrpx_spdy_downstream_connection.h \ shrpx_spdy_downstream_connection.cc shrpx_spdy_downstream_connection.h \
shrpx_spdy_session.cc shrpx_spdy_session.h \
shrpx_log.cc shrpx_log.h \ shrpx_log.cc shrpx_log.h \
shrpx_http.cc shrpx_http.h \ shrpx_http.cc shrpx_http.h \
shrpx_io_control.cc shrpx_io_control.h \ shrpx_io_control.cc shrpx_io_control.h \

View File

@ -252,6 +252,8 @@ int event_loop()
if(get_config()->num_worker > 1) { if(get_config()->num_worker > 1) {
listener_handler->create_worker_thread(get_config()->num_worker); listener_handler->create_worker_thread(get_config()->num_worker);
} else if(get_config()->client_mode) {
listener_handler->create_spdy_session();
} }
if(ENABLE_LOG) { if(ENABLE_LOG) {

View File

@ -118,11 +118,11 @@ ClientHandler::ClientHandler(bufferevent *bev, int fd, SSL *ssl,
const char *ipaddr) const char *ipaddr)
: bev_(bev), : bev_(bev),
fd_(fd), fd_(fd),
ssl_client_ctx_(0),
ssl_(ssl), ssl_(ssl),
upstream_(0), upstream_(0),
ipaddr_(ipaddr), ipaddr_(ipaddr),
should_close_after_write_(false) should_close_after_write_(false),
spdy_(0)
{ {
bufferevent_enable(bev_, EV_READ | EV_WRITE); bufferevent_enable(bev_, EV_READ | EV_WRITE);
bufferevent_setwatermark(bev_, EV_READ, 0, SHRPX_READ_WARTER_MARK); bufferevent_setwatermark(bev_, EV_READ, 0, SHRPX_READ_WARTER_MARK);
@ -294,14 +294,14 @@ SSL* ClientHandler::get_ssl() const
return ssl_; return ssl_;
} }
void ClientHandler::set_ssl_client_ctx(SSL_CTX *ssl_ctx) void ClientHandler::set_spdy_session(SpdySession *spdy)
{ {
ssl_client_ctx_ = ssl_ctx; spdy_ = spdy;
} }
SSL_CTX* ClientHandler::get_ssl_client_ctx() const SpdySession* ClientHandler::get_spdy_session() const
{ {
return ssl_client_ctx_; return spdy_;
} }
} // namespace shrpx } // namespace shrpx

View File

@ -36,6 +36,7 @@ namespace shrpx {
class Upstream; class Upstream;
class DownstreamConnection; class DownstreamConnection;
class SpdySession;
class ClientHandler { class ClientHandler {
public: public:
@ -60,19 +61,19 @@ public:
DownstreamConnection* get_downstream_connection(); DownstreamConnection* get_downstream_connection();
size_t get_pending_write_length(); size_t get_pending_write_length();
SSL* get_ssl() const; SSL* get_ssl() const;
void set_ssl_client_ctx(SSL_CTX *ssl_ctx); void set_spdy_session(SpdySession *spdy);
SSL_CTX* get_ssl_client_ctx() const; SpdySession* get_spdy_session() const;
private: private:
bufferevent *bev_; bufferevent *bev_;
int fd_; int fd_;
// SSL_CTX for SSL object to connect backend SPDY server
SSL_CTX *ssl_client_ctx_;
SSL *ssl_; SSL *ssl_;
Upstream *upstream_; Upstream *upstream_;
std::string ipaddr_; std::string ipaddr_;
bool should_close_after_write_; bool should_close_after_write_;
std::set<DownstreamConnection*> dconn_pool_; std::set<DownstreamConnection*> dconn_pool_;
// Shared SPDY session for each thread. NULL if not client mode. Not
// deleted by this object.
SpdySession *spdy_;
}; };
} // namespace shrpx } // namespace shrpx

View File

@ -42,7 +42,6 @@ Downstream::Downstream(Upstream *upstream, int stream_id, int priority)
dconn_(0), dconn_(0),
stream_id_(stream_id), stream_id_(stream_id),
priority_(priority), priority_(priority),
ioctrl_(0),
downstream_stream_id_(-1), downstream_stream_id_(-1),
request_state_(INITIAL), request_state_(INITIAL),
request_major_(1), request_major_(1),
@ -58,13 +57,9 @@ Downstream::Downstream(Upstream *upstream, int stream_id, int priority)
chunked_response_(false), chunked_response_(false),
response_connection_close_(false), response_connection_close_(false),
response_header_key_prev_(false), response_header_key_prev_(false),
response_htp_(new http_parser()),
response_body_buf_(0), response_body_buf_(0),
recv_window_size_(0) recv_window_size_(0)
{ {}
http_parser_init(response_htp_, HTTP_RESPONSE);
response_htp_->data = this;
}
Downstream::~Downstream() Downstream::~Downstream()
{ {
@ -78,7 +73,6 @@ Downstream::~Downstream()
if(dconn_) { if(dconn_) {
delete dconn_; delete dconn_;
} }
delete response_htp_;
if(ENABLE_LOG) { if(ENABLE_LOG) {
LOG(INFO) << "Deleted"; LOG(INFO) << "Deleted";
} }
@ -87,11 +81,6 @@ Downstream::~Downstream()
void Downstream::set_downstream_connection(DownstreamConnection *dconn) void Downstream::set_downstream_connection(DownstreamConnection *dconn)
{ {
dconn_ = dconn; dconn_ = dconn;
if(dconn_) {
ioctrl_.set_bev(dconn_->get_bev());
} else {
ioctrl_.set_bev(0);
}
} }
DownstreamConnection* Downstream::get_downstream_connection() DownstreamConnection* Downstream::get_downstream_connection()
@ -101,17 +90,25 @@ DownstreamConnection* Downstream::get_downstream_connection()
void Downstream::pause_read(IOCtrlReason reason) void Downstream::pause_read(IOCtrlReason reason)
{ {
ioctrl_.pause_read(reason); if(dconn_) {
dconn_->pause_read(reason);
}
} }
bool Downstream::resume_read(IOCtrlReason reason) bool Downstream::resume_read(IOCtrlReason reason)
{ {
return ioctrl_.resume_read(reason); if(dconn_) {
return dconn_->resume_read(reason);
} else {
return false;
}
} }
void Downstream::force_resume_read() void Downstream::force_resume_read()
{ {
ioctrl_.force_resume_read(); if(dconn_) {
dconn_->force_resume_read();
}
} }
namespace { namespace {
@ -285,9 +282,7 @@ bool Downstream::get_expect_100_continue() const
bool Downstream::get_output_buffer_full() bool Downstream::get_output_buffer_full()
{ {
if(dconn_) { if(dconn_) {
bufferevent *bev = dconn_->get_bev(); return dconn_->get_output_buffer_full();
evbuffer *output = bufferevent_get_output(bev);
return evbuffer_get_length(output) >= DOWNSTREAM_OUTPUT_UPPER_THRES;
} else { } else {
return false; return false;
} }
@ -414,116 +409,9 @@ void Downstream::set_response_connection_close(bool f)
response_connection_close_ = f; response_connection_close_ = f;
} }
namespace { int Downstream::on_read()
int htp_hdrs_completecb(http_parser *htp)
{ {
Downstream *downstream; return dconn_->on_read();
downstream = reinterpret_cast<Downstream*>(htp->data);
downstream->set_response_http_status(htp->status_code);
downstream->set_response_major(htp->http_major);
downstream->set_response_minor(htp->http_minor);
downstream->set_response_connection_close(!http_should_keep_alive(htp));
downstream->set_response_state(Downstream::HEADER_COMPLETE);
if(downstream->get_upstream()->on_downstream_header_complete(downstream)
!= 0) {
return -1;
}
unsigned int status = downstream->get_response_http_status();
// Ignore the response body. HEAD response may contain
// Content-Length or Transfer-Encoding: chunked. Some server send
// 304 status code with nonzero Content-Length, but without response
// body. See
// http://tools.ietf.org/html/draft-ietf-httpbis-p1-messaging-20#section-3.3
return downstream->get_request_method() == "HEAD" ||
(100 <= status && status <= 199) || status == 204 ||
status == 304 ? 1 : 0;
}
} // namespace
namespace {
int htp_hdr_keycb(http_parser *htp, const char *data, size_t len)
{
Downstream *downstream;
downstream = reinterpret_cast<Downstream*>(htp->data);
if(downstream->get_response_header_key_prev()) {
downstream->append_last_response_header_key(data, len);
} else {
downstream->add_response_header(std::string(data, len), "");
}
return 0;
}
} // namespace
namespace {
int htp_hdr_valcb(http_parser *htp, const char *data, size_t len)
{
Downstream *downstream;
downstream = reinterpret_cast<Downstream*>(htp->data);
if(downstream->get_response_header_key_prev()) {
downstream->set_last_response_header_value(std::string(data, len));
} else {
downstream->append_last_response_header_value(data, len);
}
return 0;
}
} // namespace
namespace {
int htp_bodycb(http_parser *htp, const char *data, size_t len)
{
Downstream *downstream;
downstream = reinterpret_cast<Downstream*>(htp->data);
return downstream->get_upstream()->on_downstream_body
(downstream, reinterpret_cast<const uint8_t*>(data), len);
}
} // namespace
namespace {
int htp_msg_completecb(http_parser *htp)
{
Downstream *downstream;
downstream = reinterpret_cast<Downstream*>(htp->data);
downstream->set_response_state(Downstream::MSG_COMPLETE);
return downstream->get_upstream()->on_downstream_body_complete(downstream);
}
} // namespace
namespace {
http_parser_settings htp_hooks = {
0, /*http_cb on_message_begin;*/
0, /*http_data_cb on_url;*/
htp_hdr_keycb, /*http_data_cb on_header_field;*/
htp_hdr_valcb, /*http_data_cb on_header_value;*/
htp_hdrs_completecb, /*http_cb on_headers_complete;*/
htp_bodycb, /*http_data_cb on_body;*/
htp_msg_completecb /*http_cb on_message_complete;*/
};
} // namespace
int Downstream::parse_http_response()
{
bufferevent *bev = dconn_->get_bev();
evbuffer *input = bufferevent_get_input(bev);
unsigned char *mem = evbuffer_pullup(input, -1);
size_t nread = http_parser_execute(response_htp_, &htp_hooks,
reinterpret_cast<const char*>(mem),
evbuffer_get_length(input));
evbuffer_drain(input, nread);
http_errno htperr = HTTP_PARSER_ERRNO(response_htp_);
if(htperr == HPE_OK) {
return 0;
} else {
if(ENABLE_LOG) {
LOG(INFO) << "Downstream HTTP parser failure: "
<< "(" << http_errno_name(htperr) << ") "
<< http_errno_description(htperr);
}
return SHRPX_ERR_HTTP_PARSE;
}
} }
void Downstream::set_response_state(int state) void Downstream::set_response_state(int state)

View File

@ -35,10 +35,6 @@
#include <event.h> #include <event.h>
#include <event2/bufferevent.h> #include <event2/bufferevent.h>
extern "C" {
#include "http-parser/http_parser.h"
}
#include "shrpx_io_control.h" #include "shrpx_io_control.h"
namespace shrpx { namespace shrpx {
@ -128,19 +124,21 @@ public:
void set_chunked_response(bool f); void set_chunked_response(bool f);
bool get_response_connection_close() const; bool get_response_connection_close() const;
void set_response_connection_close(bool f); void set_response_connection_close(bool f);
int parse_http_response();
void set_response_state(int state); void set_response_state(int state);
int get_response_state() const; int get_response_state() const;
int init_response_body_buf(); int init_response_body_buf();
evbuffer* get_response_body_buf(); evbuffer* get_response_body_buf();
static const size_t DOWNSTREAM_OUTPUT_UPPER_THRES = 64*1024; // Call this method when there is incoming data in downstream
// connection.
int on_read();
static const size_t OUTPUT_UPPER_THRES = 64*1024;
private: private:
Upstream *upstream_; Upstream *upstream_;
DownstreamConnection *dconn_; DownstreamConnection *dconn_;
int32_t stream_id_; int32_t stream_id_;
int priority_; int priority_;
IOControl ioctrl_;
// stream ID in backend connection // stream ID in backend connection
int32_t downstream_stream_id_; int32_t downstream_stream_id_;
@ -163,7 +161,6 @@ private:
bool response_connection_close_; bool response_connection_close_;
Headers response_headers_; Headers response_headers_;
bool response_header_key_prev_; bool response_header_key_prev_;
http_parser *response_htp_;
// This buffer is used to temporarily store downstream response // This buffer is used to temporarily store downstream response
// body. Spdylay reads data from this in the callback. // body. Spdylay reads data from this in the callback.
evbuffer *response_body_buf_; evbuffer *response_body_buf_;

View File

@ -25,96 +25,17 @@
#include "shrpx_downstream_connection.h" #include "shrpx_downstream_connection.h"
#include "shrpx_client_handler.h" #include "shrpx_client_handler.h"
#include "shrpx_upstream.h"
#include "shrpx_downstream.h" #include "shrpx_downstream.h"
#include "shrpx_config.h"
#include "shrpx_error.h"
namespace shrpx { namespace shrpx {
DownstreamConnection::DownstreamConnection(ClientHandler *client_handler) DownstreamConnection::DownstreamConnection(ClientHandler *client_handler)
: client_handler_(client_handler), : client_handler_(client_handler),
bev_(0),
downstream_(0) downstream_(0)
{} {}
DownstreamConnection::~DownstreamConnection() DownstreamConnection::~DownstreamConnection()
{ {}
if(bev_) {
bufferevent_disable(bev_, EV_READ | EV_WRITE);
bufferevent_free(bev_);
}
// Downstream and DownstreamConnection may be deleted
// asynchronously.
if(downstream_) {
downstream_->set_downstream_connection(0);
}
}
// When downstream request is issued, call this function to set read
// timeout. We don't know when the request is completely received by
// the downstream server. This function may be called before that
// happens. Overall it does not cause problem for most of the time.
// If the downstream server is too slow to recv/send, the connection
// will be dropped by read timeout.
void DownstreamConnection::start_waiting_response()
{
if(bev_) {
bufferevent_set_timeouts(bev_,
&get_config()->downstream_read_timeout,
&get_config()->downstream_write_timeout);
}
}
namespace {
// Gets called when DownstreamConnection is pooled in ClientHandler.
void idle_eventcb(bufferevent *bev, short events, void *arg)
{
DownstreamConnection *dconn = reinterpret_cast<DownstreamConnection*>(arg);
if(events & BEV_EVENT_CONNECTED) {
// Downstream was detached before connection established?
// This may be safe to be left.
if(ENABLE_LOG) {
LOG(INFO) << "Idle downstream connected?" << dconn;
}
return;
}
if(events & BEV_EVENT_EOF) {
if(ENABLE_LOG) {
LOG(INFO) << "Idle downstream connection EOF " << dconn;
}
} else if(events & BEV_EVENT_TIMEOUT) {
if(ENABLE_LOG) {
LOG(INFO) << "Idle downstream connection timeout " << dconn;
}
} else if(events & BEV_EVENT_ERROR) {
if(ENABLE_LOG) {
LOG(INFO) << "Idle downstream connection error " << dconn;
}
}
ClientHandler *client_handler = dconn->get_client_handler();
client_handler->remove_downstream_connection(dconn);
delete dconn;
}
} // namespace
void DownstreamConnection::detach_downstream(Downstream *downstream)
{
if(ENABLE_LOG) {
LOG(INFO) << "Detaching downstream connection " << this << " from "
<< "downstream " << downstream;
}
downstream->set_downstream_connection(0);
downstream_ = 0;
bufferevent_enable(bev_, EV_READ);
bufferevent_setcb(bev_, 0, 0, idle_eventcb, this);
// On idle state, just enable read timeout. Normally idle downstream
// connection will get EOF from the downstream server and closed.
bufferevent_set_timeouts(bev_,
&get_config()->downstream_idle_read_timeout,
&get_config()->downstream_write_timeout);
client_handler_->pool_downstream_connection(this);
}
ClientHandler* DownstreamConnection::get_client_handler() ClientHandler* DownstreamConnection::get_client_handler()
{ {
@ -126,9 +47,4 @@ Downstream* DownstreamConnection::get_downstream()
return downstream_; return downstream_;
} }
bufferevent* DownstreamConnection::get_bev()
{
return bev_;
}
} // namespace shrpx } // namespace shrpx

View File

@ -27,8 +27,7 @@
#include "shrpx.h" #include "shrpx.h"
#include <event.h> #include "shrpx_io_control.h"
#include <event2/bufferevent.h>
namespace shrpx { namespace shrpx {
@ -40,21 +39,25 @@ public:
DownstreamConnection(ClientHandler *client_handler); DownstreamConnection(ClientHandler *client_handler);
virtual ~DownstreamConnection(); virtual ~DownstreamConnection();
virtual int attach_downstream(Downstream *downstream) = 0; virtual int attach_downstream(Downstream *downstream) = 0;
void detach_downstream(Downstream *downstream); virtual void detach_downstream(Downstream *downstream) = 0;
bufferevent* get_bev();
virtual int push_request_headers() = 0; virtual int push_request_headers() = 0;
virtual int push_upload_data_chunk(const uint8_t *data, size_t datalen) = 0; virtual int push_upload_data_chunk(const uint8_t *data, size_t datalen) = 0;
virtual int end_upload_data() = 0; virtual int end_upload_data() = 0;
virtual int on_connect() = 0; virtual void pause_read(IOCtrlReason reason) = 0;
virtual bool resume_read(IOCtrlReason reason) = 0;
virtual void force_resume_read() = 0;
virtual bool get_output_buffer_full() = 0;
virtual int on_read() = 0;
virtual int on_write() = 0;
ClientHandler* get_client_handler(); ClientHandler* get_client_handler();
Downstream* get_downstream(); Downstream* get_downstream();
void start_waiting_response();
protected: protected:
ClientHandler *client_handler_; ClientHandler *client_handler_;
bufferevent *bev_;
Downstream *downstream_; Downstream *downstream_;
}; };

View File

@ -44,11 +44,25 @@ timeval max_timeout = { 86400, 0 };
HttpDownstreamConnection::HttpDownstreamConnection HttpDownstreamConnection::HttpDownstreamConnection
(ClientHandler *client_handler) (ClientHandler *client_handler)
: DownstreamConnection(client_handler) : DownstreamConnection(client_handler),
bev_(0),
ioctrl_(0),
response_htp_(new http_parser())
{} {}
HttpDownstreamConnection::~HttpDownstreamConnection() HttpDownstreamConnection::~HttpDownstreamConnection()
{} {
delete response_htp_;
if(bev_) {
bufferevent_disable(bev_, EV_READ | EV_WRITE);
bufferevent_free(bev_);
}
// Downstream and DownstreamConnection may be deleted
// asynchronously.
if(downstream_) {
downstream_->set_downstream_connection(0);
}
}
int HttpDownstreamConnection::attach_downstream(Downstream *downstream) int HttpDownstreamConnection::attach_downstream(Downstream *downstream)
{ {
@ -78,6 +92,12 @@ int HttpDownstreamConnection::attach_downstream(Downstream *downstream)
} }
downstream->set_downstream_connection(this); downstream->set_downstream_connection(this);
downstream_ = downstream; downstream_ = downstream;
ioctrl_.set_bev(bev_);
http_parser_init(response_htp_, HTTP_RESPONSE);
response_htp_->data = downstream_;
bufferevent_setwatermark(bev_, EV_READ, 0, SHRPX_READ_WARTER_MARK); bufferevent_setwatermark(bev_, EV_READ, 0, SHRPX_READ_WARTER_MARK);
bufferevent_enable(bev_, EV_READ); bufferevent_enable(bev_, EV_READ);
bufferevent_setcb(bev_, bufferevent_setcb(bev_,
@ -172,7 +192,17 @@ int HttpDownstreamConnection::push_request_headers()
if(rv != 0) { if(rv != 0) {
return -1; return -1;
} }
start_waiting_response();
// When downstream request is issued, set read timeout. We don't
// know when the request is completely received by the downstream
// server. This function may be called before that happens. Overall
// it does not cause problem for most of the time. If the
// downstream server is too slow to recv/send, the connection will
// be dropped by read timeout.
bufferevent_set_timeouts(bev_,
&get_config()->downstream_read_timeout,
&get_config()->downstream_write_timeout);
return 0; return 0;
} }
@ -223,7 +253,196 @@ int HttpDownstreamConnection::end_upload_data()
return 0; return 0;
} }
int HttpDownstreamConnection::on_connect() namespace {
// Gets called when DownstreamConnection is pooled in ClientHandler.
void idle_eventcb(bufferevent *bev, short events, void *arg)
{
HttpDownstreamConnection *dconn;
dconn = reinterpret_cast<HttpDownstreamConnection*>(arg);
if(events & BEV_EVENT_CONNECTED) {
// Downstream was detached before connection established?
// This may be safe to be left.
if(ENABLE_LOG) {
LOG(INFO) << "Idle downstream connected?" << dconn;
}
return;
}
if(events & BEV_EVENT_EOF) {
if(ENABLE_LOG) {
LOG(INFO) << "Idle downstream connection EOF " << dconn;
}
} else if(events & BEV_EVENT_TIMEOUT) {
if(ENABLE_LOG) {
LOG(INFO) << "Idle downstream connection timeout " << dconn;
}
} else if(events & BEV_EVENT_ERROR) {
if(ENABLE_LOG) {
LOG(INFO) << "Idle downstream connection error " << dconn;
}
}
ClientHandler *client_handler = dconn->get_client_handler();
client_handler->remove_downstream_connection(dconn);
delete dconn;
}
} // namespace
void HttpDownstreamConnection::detach_downstream(Downstream *downstream)
{
if(ENABLE_LOG) {
LOG(INFO) << "Detaching downstream connection " << this << " from "
<< "downstream " << downstream;
}
downstream->set_downstream_connection(0);
downstream_ = 0;
ioctrl_.force_resume_read();
bufferevent_enable(bev_, EV_READ);
bufferevent_setcb(bev_, 0, 0, idle_eventcb, this);
// On idle state, just enable read timeout. Normally idle downstream
// connection will get EOF from the downstream server and closed.
bufferevent_set_timeouts(bev_,
&get_config()->downstream_idle_read_timeout,
&get_config()->downstream_write_timeout);
client_handler_->pool_downstream_connection(this);
}
bufferevent* HttpDownstreamConnection::get_bev()
{
return bev_;
}
void HttpDownstreamConnection::pause_read(IOCtrlReason reason)
{
ioctrl_.pause_read(reason);
}
bool HttpDownstreamConnection::resume_read(IOCtrlReason reason)
{
return ioctrl_.resume_read(reason);
}
void HttpDownstreamConnection::force_resume_read()
{
ioctrl_.force_resume_read();
}
bool HttpDownstreamConnection::get_output_buffer_full()
{
evbuffer *output = bufferevent_get_output(bev_);
return evbuffer_get_length(output) >= Downstream::OUTPUT_UPPER_THRES;
}
namespace {
int htp_hdrs_completecb(http_parser *htp)
{
Downstream *downstream;
downstream = reinterpret_cast<Downstream*>(htp->data);
downstream->set_response_http_status(htp->status_code);
downstream->set_response_major(htp->http_major);
downstream->set_response_minor(htp->http_minor);
downstream->set_response_connection_close(!http_should_keep_alive(htp));
downstream->set_response_state(Downstream::HEADER_COMPLETE);
if(downstream->get_upstream()->on_downstream_header_complete(downstream)
!= 0) {
return -1;
}
unsigned int status = downstream->get_response_http_status();
// Ignore the response body. HEAD response may contain
// Content-Length or Transfer-Encoding: chunked. Some server send
// 304 status code with nonzero Content-Length, but without response
// body. See
// http://tools.ietf.org/html/draft-ietf-httpbis-p1-messaging-20#section-3.3
return downstream->get_request_method() == "HEAD" ||
(100 <= status && status <= 199) || status == 204 ||
status == 304 ? 1 : 0;
}
} // namespace
namespace {
int htp_hdr_keycb(http_parser *htp, const char *data, size_t len)
{
Downstream *downstream;
downstream = reinterpret_cast<Downstream*>(htp->data);
if(downstream->get_response_header_key_prev()) {
downstream->append_last_response_header_key(data, len);
} else {
downstream->add_response_header(std::string(data, len), "");
}
return 0;
}
} // namespace
namespace {
int htp_hdr_valcb(http_parser *htp, const char *data, size_t len)
{
Downstream *downstream;
downstream = reinterpret_cast<Downstream*>(htp->data);
if(downstream->get_response_header_key_prev()) {
downstream->set_last_response_header_value(std::string(data, len));
} else {
downstream->append_last_response_header_value(data, len);
}
return 0;
}
} // namespace
namespace {
int htp_bodycb(http_parser *htp, const char *data, size_t len)
{
Downstream *downstream;
downstream = reinterpret_cast<Downstream*>(htp->data);
return downstream->get_upstream()->on_downstream_body
(downstream, reinterpret_cast<const uint8_t*>(data), len);
}
} // namespace
namespace {
int htp_msg_completecb(http_parser *htp)
{
Downstream *downstream;
downstream = reinterpret_cast<Downstream*>(htp->data);
downstream->set_response_state(Downstream::MSG_COMPLETE);
return downstream->get_upstream()->on_downstream_body_complete(downstream);
}
} // namespace
namespace {
http_parser_settings htp_hooks = {
0, /*http_cb on_message_begin;*/
0, /*http_data_cb on_url;*/
htp_hdr_keycb, /*http_data_cb on_header_field;*/
htp_hdr_valcb, /*http_data_cb on_header_value;*/
htp_hdrs_completecb, /*http_cb on_headers_complete;*/
htp_bodycb, /*http_data_cb on_body;*/
htp_msg_completecb /*http_cb on_message_complete;*/
};
} // namespace
int HttpDownstreamConnection::on_read()
{
evbuffer *input = bufferevent_get_input(bev_);
unsigned char *mem = evbuffer_pullup(input, -1);
size_t nread = http_parser_execute(response_htp_, &htp_hooks,
reinterpret_cast<const char*>(mem),
evbuffer_get_length(input));
evbuffer_drain(input, nread);
http_errno htperr = HTTP_PARSER_ERRNO(response_htp_);
if(htperr == HPE_OK) {
return 0;
} else {
if(ENABLE_LOG) {
LOG(INFO) << "Downstream HTTP parser failure: "
<< "(" << http_errno_name(htperr) << ") "
<< http_errno_description(htperr);
}
return SHRPX_ERR_HTTP_PARSE;
}
}
int HttpDownstreamConnection::on_write()
{ {
return 0; return 0;
} }

View File

@ -27,7 +27,13 @@
#include "shrpx.h" #include "shrpx.h"
#include <event.h>
#include <event2/bufferevent.h>
#include "http-parser/http_parser.h"
#include "shrpx_downstream_connection.h" #include "shrpx_downstream_connection.h"
#include "shrpx_io_control.h"
namespace shrpx { namespace shrpx {
@ -36,12 +42,26 @@ public:
HttpDownstreamConnection(ClientHandler *client_handler); HttpDownstreamConnection(ClientHandler *client_handler);
virtual ~HttpDownstreamConnection(); virtual ~HttpDownstreamConnection();
virtual int attach_downstream(Downstream *downstream); virtual int attach_downstream(Downstream *downstream);
virtual void detach_downstream(Downstream *downstream);
virtual int push_request_headers(); virtual int push_request_headers();
virtual int push_upload_data_chunk(const uint8_t *data, size_t datalen); virtual int push_upload_data_chunk(const uint8_t *data, size_t datalen);
virtual int end_upload_data(); virtual int end_upload_data();
virtual int on_connect(); virtual void pause_read(IOCtrlReason reason);
virtual bool resume_read(IOCtrlReason reason);
virtual void force_resume_read();
virtual bool get_output_buffer_full();
virtual int on_read();
virtual int on_write();
bufferevent* get_bev();
private:
bufferevent *bev_;
IOControl ioctrl_;
http_parser *response_htp_;
}; };
} // namespace shrpx } // namespace shrpx

View File

@ -317,13 +317,15 @@ void HttpsUpstream::pause_read(IOCtrlReason reason)
ioctrl_.pause_read(reason); ioctrl_.pause_read(reason);
} }
void HttpsUpstream::resume_read(IOCtrlReason reason) int HttpsUpstream::resume_read(IOCtrlReason reason)
{ {
if(ioctrl_.resume_read(reason)) { if(ioctrl_.resume_read(reason)) {
// Process remaining data in input buffer here because these bytes // Process remaining data in input buffer here because these bytes
// are not notified by readcb until new data arrive. // are not notified by readcb until new data arrive.
http_parser_pause(htp_, 0); http_parser_pause(htp_, 0);
on_read(); return on_read();
} else {
return 0;
} }
} }
@ -335,12 +337,10 @@ void https_downstream_readcb(bufferevent *bev, void *ptr)
HttpsUpstream *upstream; HttpsUpstream *upstream;
upstream = static_cast<HttpsUpstream*>(downstream->get_upstream()); upstream = static_cast<HttpsUpstream*>(downstream->get_upstream());
int rv; int rv;
if(get_config()->client_mode) { rv = downstream->on_read();
rv = reinterpret_cast<SpdyDownstreamConnection*>(dconn)->on_read(); if(downstream->get_response_state() == Downstream::MSG_RESET) {
} else { delete upstream->get_client_handler();
rv = downstream->parse_http_response(); } else if(rv == 0) {
}
if(rv == 0) {
if(downstream->get_response_state() == Downstream::MSG_COMPLETE) { if(downstream->get_response_state() == Downstream::MSG_COMPLETE) {
if(downstream->get_response_connection_close()) { if(downstream->get_response_connection_close()) {
// Connection close // Connection close
@ -379,8 +379,6 @@ void https_downstream_readcb(bufferevent *bev, void *ptr)
// We already sent HTTP response headers to upstream // We already sent HTTP response headers to upstream
// client. Just close the upstream connection. // client. Just close the upstream connection.
delete upstream->get_client_handler(); delete upstream->get_client_handler();
} else if(downstream->get_response_state() == Downstream::MSG_RESET) {
delete upstream->get_client_handler();
} else { } else {
// We did not sent any HTTP response, so sent error // We did not sent any HTTP response, so sent error
// response. Cannot reuse downstream connection in this case. // response. Cannot reuse downstream connection in this case.
@ -406,14 +404,6 @@ void https_downstream_writecb(bufferevent *bev, void *ptr)
HttpsUpstream *upstream; HttpsUpstream *upstream;
upstream = static_cast<HttpsUpstream*>(downstream->get_upstream()); upstream = static_cast<HttpsUpstream*>(downstream->get_upstream());
upstream->resume_read(SHRPX_NO_BUFFER); upstream->resume_read(SHRPX_NO_BUFFER);
if(get_config()->client_mode) {
int rv;
rv = reinterpret_cast<SpdyDownstreamConnection*>(dconn)->on_write();
if(rv != 0) {
delete upstream->get_client_handler();
return;
}
}
} }
} // namespace } // namespace
@ -429,11 +419,6 @@ void https_downstream_eventcb(bufferevent *bev, short events, void *ptr)
LOG(INFO) << "Downstream connection established. downstream " LOG(INFO) << "Downstream connection established. downstream "
<< downstream; << downstream;
} }
if(dconn->on_connect() != 0) {
// TODO Return error status 502
delete upstream->get_client_handler();
return;
}
} else if(events & BEV_EVENT_EOF) { } else if(events & BEV_EVENT_EOF) {
if(ENABLE_LOG) { if(ENABLE_LOG) {
LOG(INFO) << "Downstream EOF. stream_id=" LOG(INFO) << "Downstream EOF. stream_id="

View File

@ -57,7 +57,7 @@ public:
int error_reply(int status_code); int error_reply(int status_code);
virtual void pause_read(IOCtrlReason reason); virtual void pause_read(IOCtrlReason reason);
virtual void resume_read(IOCtrlReason reason); virtual int resume_read(IOCtrlReason reason);
virtual int on_downstream_header_complete(Downstream *downstream); virtual int on_downstream_header_complete(Downstream *downstream);
virtual int on_downstream_body(Downstream *downstream, virtual int on_downstream_body(Downstream *downstream,

View File

@ -36,6 +36,7 @@
#include "shrpx_ssl.h" #include "shrpx_ssl.h"
#include "shrpx_worker.h" #include "shrpx_worker.h"
#include "shrpx_config.h" #include "shrpx_config.h"
#include "shrpx_spdy_session.h"
namespace shrpx { namespace shrpx {
@ -45,7 +46,8 @@ ListenHandler::ListenHandler(event_base *evbase)
ssl::create_ssl_client_context() : ssl::create_ssl_context()), ssl::create_ssl_client_context() : ssl::create_ssl_context()),
worker_round_robin_cnt_(0), worker_round_robin_cnt_(0),
workers_(0), workers_(0),
num_worker_(0) num_worker_(0),
spdy_(0)
{} {}
ListenHandler::~ListenHandler() ListenHandler::~ListenHandler()
@ -93,8 +95,11 @@ int ListenHandler::accept_connection(evutil_socket_t fd,
LOG(INFO) << "<listener> Accepted connection. fd=" << fd; LOG(INFO) << "<listener> Accepted connection. fd=" << fd;
} }
if(num_worker_ == 0) { if(num_worker_ == 0) {
/*ClientHandler* client = */ ClientHandler* client =
ssl::accept_ssl_connection(evbase_, ssl_ctx_, fd, addr, addrlen); ssl::accept_ssl_connection(evbase_, ssl_ctx_, fd, addr, addrlen);
if(get_config()->client_mode) {
client->set_spdy_session(spdy_);
}
} else { } else {
size_t idx = worker_round_robin_cnt_ % num_worker_; size_t idx = worker_round_robin_cnt_ % num_worker_;
++worker_round_robin_cnt_; ++worker_round_robin_cnt_;
@ -114,4 +119,12 @@ event_base* ListenHandler::get_evbase() const
return evbase_; return evbase_;
} }
int ListenHandler::create_spdy_session()
{
int rv;
spdy_ = new SpdySession(evbase_, ssl_ctx_);
rv = spdy_->init_notification();
return rv;
}
} // namespace shrpx } // namespace shrpx

View File

@ -42,6 +42,8 @@ struct WorkerInfo {
bufferevent *bev; bufferevent *bev;
}; };
class SpdySession;
class ListenHandler { class ListenHandler {
public: public:
ListenHandler(event_base *evbase); ListenHandler(event_base *evbase);
@ -49,14 +51,18 @@ public:
int accept_connection(evutil_socket_t fd, sockaddr *addr, int addrlen); int accept_connection(evutil_socket_t fd, sockaddr *addr, int addrlen);
void create_worker_thread(size_t num); void create_worker_thread(size_t num);
event_base* get_evbase() const; event_base* get_evbase() const;
int create_spdy_session();
private: private:
event_base *evbase_; event_base *evbase_;
// In client-mode, this is for backend SPDY connection. Otherwise, // In client-mode, this is for backend SPDY connection. Otherwise,
// frontend SSL/TLS connection. // for frontend.
SSL_CTX *ssl_ctx_; SSL_CTX *ssl_ctx_;
unsigned int worker_round_robin_cnt_; unsigned int worker_round_robin_cnt_;
WorkerInfo *workers_; WorkerInfo *workers_;
size_t num_worker_; size_t num_worker_;
// Shared SPDY session. NULL if not client mode or
// multi-threaded. In multi-threaded case, see shrpx_worker.cc.
SpdySession *spdy_;
}; };
} // namespace shrpx } // namespace shrpx

View File

@ -30,12 +30,15 @@
#include <event2/bufferevent_ssl.h> #include <event2/bufferevent_ssl.h>
#include <http-parser/http_parser.h>
#include "shrpx_client_handler.h" #include "shrpx_client_handler.h"
#include "shrpx_upstream.h" #include "shrpx_upstream.h"
#include "shrpx_downstream.h" #include "shrpx_downstream.h"
#include "shrpx_config.h" #include "shrpx_config.h"
#include "shrpx_error.h" #include "shrpx_error.h"
#include "shrpx_http.h" #include "shrpx_http.h"
#include "shrpx_spdy_session.h"
#include "util.h" #include "util.h"
using namespace spdylay; using namespace spdylay;
@ -45,36 +48,23 @@ namespace shrpx {
SpdyDownstreamConnection::SpdyDownstreamConnection SpdyDownstreamConnection::SpdyDownstreamConnection
(ClientHandler *client_handler) (ClientHandler *client_handler)
: DownstreamConnection(client_handler), : DownstreamConnection(client_handler),
ssl_(0), spdy_(client_handler->get_spdy_session()),
session_(0), request_body_buf_(0),
request_body_buf_(0) sd_(0)
{} {}
SpdyDownstreamConnection::~SpdyDownstreamConnection() SpdyDownstreamConnection::~SpdyDownstreamConnection()
{ {
spdylay_session_del(session_);
int fd = -1;
if(ssl_) {
fd = SSL_get_fd(ssl_);
SSL_shutdown(ssl_);
}
if(bev_) {
// We want to deallocate bev_ between SSL_shutdown and
// SSL_free. This might not be necessary for recent libevent.
bufferevent_disable(bev_, EV_READ | EV_WRITE);
bufferevent_free(bev_);
bev_ = 0;
}
if(ssl_) {
SSL_free(ssl_);
}
if(fd != -1) {
shutdown(fd, SHUT_WR);
close(fd);
}
if(request_body_buf_) { if(request_body_buf_) {
evbuffer_free(request_body_buf_); evbuffer_free(request_body_buf_);
} }
// TODO need RST_STREAM?
spdy_->remove_downstream_connection(this);
// Downstream and DownstreamConnection may be deleted
// asynchronously.
if(downstream_) {
downstream_->set_downstream_connection(0);
}
} }
int SpdyDownstreamConnection::init_request_body_buf() int SpdyDownstreamConnection::init_request_body_buf()
@ -105,47 +95,30 @@ int SpdyDownstreamConnection::attach_downstream(Downstream *downstream)
if(init_request_body_buf() == -1) { if(init_request_body_buf() == -1) {
return -1; return -1;
} }
Upstream *upstream = downstream->get_upstream(); spdy_->add_downstream_connection(this);
if(!bev_) { if(spdy_->get_state() == SpdySession::DISCONNECTED) {
event_base *evbase = client_handler_->get_evbase(); spdy_->notify();
ssl_ = SSL_new(client_handler_->get_ssl_client_ctx());
if(!ssl_) {
LOG(ERROR) << "SSL_new() failed: "
<< ERR_error_string(ERR_get_error(), NULL);
return -1;
}
bev_ = bufferevent_openssl_socket_new(evbase, -1, ssl_,
BUFFEREVENT_SSL_CONNECTING,
BEV_OPT_DEFER_CALLBACKS);
int rv = bufferevent_socket_connect
(bev_,
// TODO maybe not thread-safe?
const_cast<sockaddr*>(&get_config()->downstream_addr.sa),
get_config()->downstream_addrlen);
if(rv != 0) {
bufferevent_free(bev_);
bev_ = 0;
return SHRPX_ERR_NETWORK;
}
if(ENABLE_LOG) {
LOG(INFO) << "Connecting to downstream server " << this;
}
} }
downstream->set_downstream_connection(this); downstream->set_downstream_connection(this);
downstream_ = downstream; downstream_ = downstream;
bufferevent_setwatermark(bev_, EV_READ, 0, SHRPX_READ_WARTER_MARK);
bufferevent_enable(bev_, EV_READ);
bufferevent_setcb(bev_,
upstream->get_downstream_readcb(),
upstream->get_downstream_writecb(),
upstream->get_downstream_eventcb(), this);
bufferevent_set_timeouts(bev_,
&get_config()->downstream_read_timeout,
&get_config()->downstream_write_timeout);
return 0; return 0;
} }
void SpdyDownstreamConnection::detach_downstream(Downstream *downstream)
{
if(ENABLE_LOG) {
LOG(INFO) << "Detaching spdy downstream connection " << this << " from "
<< "downstream " << downstream;
}
downstream->set_downstream_connection(0);
downstream_ = 0;
// TODO do something to SpdySession? RST_STREAM?
client_handler_->pool_downstream_connection(this);
}
namespace { namespace {
ssize_t spdy_data_read_callback(spdylay_session *session, ssize_t spdy_data_read_callback(spdylay_session *session,
int32_t stream_id, int32_t stream_id,
@ -154,6 +127,12 @@ ssize_t spdy_data_read_callback(spdylay_session *session,
spdylay_data_source *source, spdylay_data_source *source,
void *user_data) void *user_data)
{ {
StreamData *sd;
sd = reinterpret_cast<StreamData*>
(spdylay_session_get_stream_user_data(session, stream_id));
if(!sd || !sd->dconn) {
return SPDYLAY_ERR_DEFERRED;
}
SpdyDownstreamConnection *dconn; SpdyDownstreamConnection *dconn;
dconn = reinterpret_cast<SpdyDownstreamConnection*>(source->ptr); dconn = reinterpret_cast<SpdyDownstreamConnection*>(source->ptr);
Downstream *downstream = dconn->get_downstream(); Downstream *downstream = dconn->get_downstream();
@ -163,14 +142,26 @@ ssize_t spdy_data_read_callback(spdylay_session *session,
return SPDYLAY_ERR_DEFERRED; return SPDYLAY_ERR_DEFERRED;
} }
evbuffer *body = dconn->get_request_body_buf(); evbuffer *body = dconn->get_request_body_buf();
int nread = evbuffer_remove(body, buf, length); int nread = 0;
if(nread == 0 && for(;;) {
downstream->get_request_state() == Downstream::MSG_COMPLETE) { nread = evbuffer_remove(body, buf, length);
if(nread == 0) {
if(downstream->get_request_state() == Downstream::MSG_COMPLETE) {
*eof = 1; *eof = 1;
} break;
if(nread == 0 && *eof != 1) { } else {
if(downstream->get_upstream()->resume_read(SHRPX_NO_BUFFER) == -1) {
// In this case, downstream may be deleted.
return SPDYLAY_ERR_DEFERRED; return SPDYLAY_ERR_DEFERRED;
} }
if(evbuffer_get_length(body) == 0) {
return SPDYLAY_ERR_DEFERRED;
}
}
} else {
break;
}
}
return nread; return nread;
} }
} // namespace } // namespace
@ -188,10 +179,9 @@ void copy_url_component(std::string& dest, http_parser_url *u, int field,
int SpdyDownstreamConnection::push_request_headers() int SpdyDownstreamConnection::push_request_headers()
{ {
int rv; int rv;
if(!session_) { if(spdy_->get_state() != SpdySession::CONNECTED) {
// If the connection to the backend has not been established, // The SPDY session to the backend has not been established. This
// session_ is not initialized. This function will be called again // function will be called again just after it is established.
// just after SSL/TLS handshake is done.
return 0; return 0;
} }
if(!downstream_) { if(!downstream_) {
@ -318,16 +308,17 @@ int SpdyDownstreamConnection::push_request_headers()
spdylay_data_provider data_prd; spdylay_data_provider data_prd;
data_prd.source.ptr = this; data_prd.source.ptr = this;
data_prd.read_callback = spdy_data_read_callback; data_prd.read_callback = spdy_data_read_callback;
rv = spdylay_submit_request(session_, 0, nv, &data_prd, 0); rv = spdy_->submit_request(this, 0, nv, &data_prd);
} else { } else {
rv = spdylay_submit_request(session_, 0, nv, 0, 0); rv = spdy_->submit_request(this, 0, nv, 0);
} }
delete [] nv; delete [] nv;
if(rv != 0) { if(rv != 0) {
LOG(FATAL) << "spdylay_submit_request() failed"; LOG(FATAL) << "spdylay_submit_request() failed";
return -1; return -1;
} }
return send(); spdy_->notify();
return 0;
} }
int SpdyDownstreamConnection::push_upload_data_chunk(const uint8_t *data, int SpdyDownstreamConnection::push_upload_data_chunk(const uint8_t *data,
@ -339,413 +330,36 @@ int SpdyDownstreamConnection::push_upload_data_chunk(const uint8_t *data,
return -1; return -1;
} }
if(downstream_->get_downstream_stream_id() != -1) { if(downstream_->get_downstream_stream_id() != -1) {
spdylay_session_resume_data(session_, rv = spdy_->resume_data(this);
downstream_->get_downstream_stream_id());
rv = send();
if(rv != 0) { if(rv != 0) {
return -1; return -1;
} }
} spdy_->notify();
size_t bodylen = evbuffer_get_length(request_body_buf_);
if(bodylen > Downstream::DOWNSTREAM_OUTPUT_UPPER_THRES) {
downstream_->get_upstream()->pause_read(SHRPX_NO_BUFFER);
} }
return 0; return 0;
} }
int SpdyDownstreamConnection::end_upload_data() int SpdyDownstreamConnection::end_upload_data()
{ {
int rv;
if(downstream_->get_downstream_stream_id() != -1) { if(downstream_->get_downstream_stream_id() != -1) {
spdylay_session_resume_data(session_, rv = spdy_->resume_data(this);
downstream_->get_downstream_stream_id());
return send();
} else {
return 0;
}
}
namespace {
ssize_t send_callback(spdylay_session *session,
const uint8_t *data, size_t len, int flags,
void *user_data)
{
int rv;
SpdyDownstreamConnection *dconn;
dconn = reinterpret_cast<SpdyDownstreamConnection*>(user_data);
bufferevent *bev = dconn->get_bev();
evbuffer *output = bufferevent_get_output(bev);
// Check buffer length and return WOULDBLOCK if it is large enough.
if(evbuffer_get_length(output) > Downstream::DOWNSTREAM_OUTPUT_UPPER_THRES) {
return SPDYLAY_ERR_WOULDBLOCK;
}
rv = evbuffer_add(output, data, len);
if(rv == -1) {
LOG(FATAL) << "evbuffer_add() failed";
return SPDYLAY_ERR_CALLBACK_FAILURE;
} else {
return len;
}
}
} // namespace
namespace {
ssize_t recv_callback(spdylay_session *session,
uint8_t *data, size_t len, int flags, void *user_data)
{
SpdyDownstreamConnection *dconn;
dconn = reinterpret_cast<SpdyDownstreamConnection*>(user_data);
bufferevent *bev = dconn->get_bev();
evbuffer *input = bufferevent_get_input(bev);
int nread = evbuffer_remove(input, data, len);
if(nread == -1) {
return SPDYLAY_ERR_CALLBACK_FAILURE;
} else if(nread == 0) {
return SPDYLAY_ERR_WOULDBLOCK;
} else {
return nread;
}
}
} // namespace
namespace {
void on_stream_close_callback
(spdylay_session *session, int32_t stream_id, spdylay_status_code status_code,
void *user_data)
{
int rv;
if(ENABLE_LOG) {
LOG(INFO) << "Downstream spdy Stream " << stream_id << " is being closed";
}
SpdyDownstreamConnection *dconn;
dconn = reinterpret_cast<SpdyDownstreamConnection*>(user_data);
Downstream *downstream = dconn->get_downstream();
if(!downstream || downstream->get_downstream_stream_id() != stream_id) {
// We might get this close callback when pushed streams are
// closed.
return;
}
downstream->set_response_state(Downstream::MSG_COMPLETE);
rv = downstream->get_upstream()->on_downstream_body_complete(downstream);
if(rv != 0) {
downstream->set_response_state(Downstream::MSG_RESET);
return;
}
}
} // namespace
namespace {
void on_ctrl_recv_callback
(spdylay_session *session, spdylay_frame_type type, spdylay_frame *frame,
void *user_data)
{
int rv;
SpdyDownstreamConnection *dconn;
dconn = reinterpret_cast<SpdyDownstreamConnection*>(user_data);
Downstream *downstream = dconn->get_downstream();
switch(type) {
case SPDYLAY_SYN_STREAM:
if(ENABLE_LOG) {
LOG(INFO) << "Downstream spdy received upstream SYN_STREAM stream_id="
<< frame->syn_stream.stream_id;
}
// We just respond pushed stream with RST_STREAM.
spdylay_submit_rst_stream(session, frame->syn_stream.stream_id,
SPDYLAY_REFUSED_STREAM);
break;
case SPDYLAY_RST_STREAM:
if(downstream &&
downstream->get_downstream_stream_id() == frame->rst_stream.stream_id) {
// If we got RST_STREAM, just flag MSG_RESET to indicate
// upstream connection must be terminated.
downstream->set_response_state(Downstream::MSG_RESET);
}
break;
case SPDYLAY_SYN_REPLY: {
if(!downstream ||
downstream->get_downstream_stream_id() != frame->syn_reply.stream_id) {
break;
}
char **nv = frame->syn_reply.nv;
const char *status = 0;
const char *version = 0;
const char *content_length = 0;
for(size_t i = 0; nv[i]; i += 2) {
if(strcmp(nv[i], ":status") == 0) {
unsigned int code = strtoul(nv[i+1], 0, 10);
downstream->set_response_http_status(code);
status = nv[i+1];
} else if(strcmp(nv[i], ":version") == 0) {
// We assume for now that most version is HTTP/1.1 from
// SPDY. So just check if it is HTTP/1.0 and then set response
// minor as so.
downstream->set_response_major(1);
if(util::strieq(nv[i+1], "HTTP/1.0")) {
downstream->set_response_minor(0);
} else {
downstream->set_response_minor(1);
}
version = nv[i+1];
} else if(nv[i][0] != ':') {
if(strcmp(nv[i], "content-length") == 0) {
content_length = nv[i+1];
}
downstream->add_response_header(nv[i], nv[i+1]);
}
}
if(!status || !version) {
spdylay_submit_rst_stream(session, frame->syn_reply.stream_id,
SPDYLAY_PROTOCOL_ERROR);
downstream->set_response_state(Downstream::MSG_RESET);
return;
}
if(!content_length && downstream->get_request_method() != "HEAD" &&
downstream->get_request_method() != "CONNECT") {
unsigned int status;
status = downstream->get_response_http_status();
if(!((100 <= status && status <= 199) || status == 204 ||
status == 304)) {
// In SPDY, we are supporsed not to receive
// transfer-encoding.
downstream->add_response_header("transfer-encoding", "chunked");
}
}
if(ENABLE_LOG) {
std::stringstream ss;
for(size_t i = 0; nv[i]; i += 2) {
ss << nv[i] << ": " << nv[i+1] << "\n";
}
LOG(INFO) << "Downstream spdy response headers id="
<< frame->syn_reply.stream_id
<< "\n" << ss.str();
}
Upstream *upstream = downstream->get_upstream();
downstream->set_response_state(Downstream::HEADER_COMPLETE);
rv = upstream->on_downstream_header_complete(downstream);
if(rv != 0) {
spdylay_submit_rst_stream(session, frame->syn_reply.stream_id,
SPDYLAY_PROTOCOL_ERROR);
downstream->set_response_state(Downstream::MSG_RESET);
return;
}
break;
}
default:
break;
}
}
} // namespace
namespace {
void on_data_chunk_recv_callback(spdylay_session *session,
uint8_t flags, int32_t stream_id,
const uint8_t *data, size_t len,
void *user_data)
{
int rv;
SpdyDownstreamConnection *dconn;
dconn = reinterpret_cast<SpdyDownstreamConnection*>(user_data);
Downstream *downstream = dconn->get_downstream();
if(!downstream || downstream->get_downstream_stream_id() != stream_id) {
return;
}
// TODO No manual flow control at the moment.
Upstream *upstream = downstream->get_upstream();
rv = upstream->on_downstream_body(downstream, data, len);
if(rv != 0) {
spdylay_submit_rst_stream(session, stream_id, SPDYLAY_INTERNAL_ERROR);
downstream->set_response_state(Downstream::MSG_RESET);
}
}
} // namespace
namespace {
void before_ctrl_send_callback(spdylay_session *session,
spdylay_frame_type type,
spdylay_frame *frame,
void *user_data)
{
if(type == SPDYLAY_SYN_STREAM) {
SpdyDownstreamConnection *dconn;
dconn = reinterpret_cast<SpdyDownstreamConnection*>(user_data);
Downstream *downstream = dconn->get_downstream();
if(downstream) {
downstream->set_downstream_stream_id(frame->syn_stream.stream_id);
}
}
}
} // namespace
namespace {
void on_ctrl_not_send_callback(spdylay_session *session,
spdylay_frame_type type,
spdylay_frame *frame,
int error_code, void *user_data)
{
LOG(WARNING) << "Failed to send control frame type=" << type << ", "
<< "error_code=" << error_code << ":"
<< spdylay_strerror(error_code);
if(type == SPDYLAY_SYN_STREAM) {
// To avoid stream hanging around, flag Downstream::MSG_RESET and
// terminate the upstream and downstream connections.
SpdyDownstreamConnection *dconn;
dconn = reinterpret_cast<SpdyDownstreamConnection*>(user_data);
Downstream *downstream = dconn->get_downstream();
int32_t stream_id = frame->syn_stream.stream_id;
if(!downstream || downstream->get_downstream_stream_id() != stream_id) {
return;
}
downstream->set_response_state(Downstream::MSG_RESET);
}
}
} // namespace
namespace {
void on_ctrl_recv_parse_error_callback(spdylay_session *session,
spdylay_frame_type type,
const uint8_t *head, size_t headlen,
const uint8_t *payload,
size_t payloadlen, int error_code,
void *user_data)
{
if(ENABLE_LOG) {
LOG(INFO) << "Failed to parse received control frame. type=" << type
<< ", error_code=" << error_code << ":"
<< spdylay_strerror(error_code);
}
}
} // namespace
namespace {
void on_unknown_ctrl_recv_callback(spdylay_session *session,
const uint8_t *head, size_t headlen,
const uint8_t *payload, size_t payloadlen,
void *user_data)
{
if(ENABLE_LOG) {
LOG(INFO) << "Received unknown control frame.";
}
}
} // namespace
int SpdyDownstreamConnection::on_connect()
{
int rv;
const unsigned char *next_proto = 0;
unsigned int next_proto_len;
SSL_get0_next_proto_negotiated(ssl_, &next_proto, &next_proto_len);
if(ENABLE_LOG) {
std::string proto(next_proto, next_proto+next_proto_len);
LOG(INFO) << "Downstream negotiated next protocol: " << proto;
}
uint16_t version = spdylay_npn_get_version(next_proto, next_proto_len);
if(!version) {
return -1;
}
spdylay_session_callbacks callbacks;
memset(&callbacks, 0, sizeof(callbacks));
callbacks.send_callback = send_callback;
callbacks.recv_callback = recv_callback;
callbacks.on_stream_close_callback = on_stream_close_callback;
callbacks.on_ctrl_recv_callback = on_ctrl_recv_callback;
callbacks.on_data_chunk_recv_callback = on_data_chunk_recv_callback;
callbacks.before_ctrl_send_callback = before_ctrl_send_callback;
callbacks.on_ctrl_not_send_callback = on_ctrl_not_send_callback;
callbacks.on_ctrl_recv_parse_error_callback =
on_ctrl_recv_parse_error_callback;
callbacks.on_unknown_ctrl_recv_callback = on_unknown_ctrl_recv_callback;
rv = spdylay_session_client_new(&session_, version, &callbacks, this);
if(rv != 0) { if(rv != 0) {
return -1; return -1;
} }
spdy_->notify();
// TODO Send initial window size when manual flow control is
// implemented.
spdylay_settings_entry entry[1];
entry[0].settings_id = SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS;
entry[0].value = get_config()->spdy_max_concurrent_streams;
entry[0].flags = SPDYLAY_ID_FLAG_SETTINGS_NONE;
rv = spdylay_submit_settings
(session_, SPDYLAY_FLAG_SETTINGS_NONE,
entry, sizeof(entry)/sizeof(spdylay_settings_entry));
if(rv != 0) {
return -1;
} }
rv = send();
if(rv != 0) {
return -1;
}
// We may have pending request
push_request_headers();
return 0; return 0;
} }
int SpdyDownstreamConnection::on_read() int SpdyDownstreamConnection::on_read()
{ {
int rv = 0; return 0;
if((rv = spdylay_session_recv(session_)) < 0) {
if(rv != SPDYLAY_ERR_EOF) {
LOG(ERROR) << "spdylay_session_recv() returned error: "
<< spdylay_strerror(rv);
}
} else if((rv = spdylay_session_send(session_)) < 0) {
LOG(ERROR) << "spdylay_session_send() returned error: "
<< spdylay_strerror(rv);
}
// if(rv == 0) {
// if(spdylay_session_want_read(session_) == 0 &&
// spdylay_session_want_write(session_) == 0) {
// if(ENABLE_LOG) {
// LOG(INFO) << "No more read/write for this SPDY session";
// }
// rv = -1;
// }
// }
if(rv == SPDYLAY_ERR_EOF) {
if(downstream_) {
downstream_->set_response_connection_close(true);
}
rv = 0;
} else if(rv != 0) {
if(downstream_) {
downstream_->set_response_state(Downstream::MSG_RESET);
}
}
return rv;
} }
int SpdyDownstreamConnection::on_write() int SpdyDownstreamConnection::on_write()
{ {
return send(); return 0;
}
int SpdyDownstreamConnection::send()
{
int rv = 0;
if((rv = spdylay_session_send(session_)) < 0) {
LOG(ERROR) << "spdylay_session_send() returned error: "
<< spdylay_strerror(rv);
}
// if(rv == 0) {
// if(spdylay_session_want_read(session_) == 0 &&
// spdylay_session_want_write(session_) == 0) {
// if(ENABLE_LOG) {
// LOG(INFO) << "No more read/write for this SPDY session";
// }
// rv = -1;
// }
// }
return rv;
} }
evbuffer* SpdyDownstreamConnection::get_request_body_buf() const evbuffer* SpdyDownstreamConnection::get_request_body_buf() const
@ -753,4 +367,33 @@ evbuffer* SpdyDownstreamConnection::get_request_body_buf() const
return request_body_buf_; return request_body_buf_;
} }
void SpdyDownstreamConnection::attach_stream_data(StreamData *sd)
{
assert(sd_ == 0 && sd->dconn == 0);
sd_ = sd;
sd_->dconn = this;
}
StreamData* SpdyDownstreamConnection::detach_stream_data()
{
if(sd_) {
StreamData *sd = sd_;
sd_ = 0;
sd->dconn = 0;
return sd;
} else {
return 0;
}
}
bool SpdyDownstreamConnection::get_output_buffer_full()
{
if(request_body_buf_) {
return
evbuffer_get_length(request_body_buf_) >= Downstream::OUTPUT_UPPER_THRES;
} else {
return false;
}
}
} // namespace shrpx } // namespace shrpx

View File

@ -35,28 +35,39 @@
namespace shrpx { namespace shrpx {
struct StreamData;
class SpdySession;
class SpdyDownstreamConnection : public DownstreamConnection { class SpdyDownstreamConnection : public DownstreamConnection {
public: public:
SpdyDownstreamConnection(ClientHandler *client_handler); SpdyDownstreamConnection(ClientHandler *client_handler);
virtual ~SpdyDownstreamConnection(); virtual ~SpdyDownstreamConnection();
virtual int attach_downstream(Downstream *downstream); virtual int attach_downstream(Downstream *downstream);
virtual void detach_downstream(Downstream *downstream);
virtual int push_request_headers(); virtual int push_request_headers();
virtual int push_upload_data_chunk(const uint8_t *data, size_t datalen); virtual int push_upload_data_chunk(const uint8_t *data, size_t datalen);
virtual int end_upload_data(); virtual int end_upload_data();
virtual int on_connect(); virtual void pause_read(IOCtrlReason reason) {}
virtual bool resume_read(IOCtrlReason reason) { return true; }
virtual void force_resume_read() {}
int on_read(); virtual bool get_output_buffer_full();
int on_write();
virtual int on_read();
virtual int on_write();
int send(); int send();
int init_request_body_buf(); int init_request_body_buf();
evbuffer* get_request_body_buf() const; evbuffer* get_request_body_buf() const;
void attach_stream_data(StreamData *sd);
StreamData* detach_stream_data();
private: private:
SSL *ssl_; SpdySession *spdy_;
spdylay_session *session_;
evbuffer *request_body_buf_; evbuffer *request_body_buf_;
StreamData *sd_;
}; };
} // namespace shrpx } // namespace shrpx

822
src/shrpx_spdy_session.cc Normal file
View File

@ -0,0 +1,822 @@
/*
* Spdylay - SPDY Library
*
* Copyright (c) 2012 Tatsuhiro Tsujikawa
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include "shrpx_spdy_session.h"
#include <unistd.h>
#include <vector>
#include <openssl/err.h>
#include <event2/bufferevent_ssl.h>
#include "shrpx_upstream.h"
#include "shrpx_downstream.h"
#include "shrpx_config.h"
#include "shrpx_error.h"
#include "shrpx_spdy_downstream_connection.h"
#include "shrpx_client_handler.h"
#include "util.h"
using namespace spdylay;
namespace shrpx {
SpdySession::SpdySession(event_base *evbase, SSL_CTX *ssl_ctx)
: evbase_(evbase),
ssl_ctx_(ssl_ctx),
ssl_(0),
session_(0),
bev_(0),
state_(DISCONNECTED),
notified_(false),
wrbev_(0),
rdbev_(0)
{}
SpdySession::~SpdySession()
{
disconnect();
}
int SpdySession::disconnect()
{
if(ENABLE_LOG) {
LOG(INFO) << "Downstream spdy session disconnecting " << this;
}
spdylay_session_del(session_);
session_ = 0;
int fd = -1;
if(ssl_) {
fd = SSL_get_fd(ssl_);
SSL_shutdown(ssl_);
}
if(bev_) {
bufferevent_disable(bev_, EV_READ | EV_WRITE);
bufferevent_free(bev_);
bev_ = 0;
}
if(ssl_) {
SSL_free(ssl_);
}
ssl_ = 0;
if(fd != -1) {
shutdown(fd, SHUT_WR);
close(fd);
}
notified_ = false;
state_ = DISCONNECTED;
// Delete all client handler associated to Downstream. When deleting
// SpdyDownstreamConnection, it calls this object's
// remove_downstream_connection(). So first dump them in vector and
// iterate and delete them.
std::vector<SpdyDownstreamConnection*> vec(dconns_.begin(), dconns_.end());
for(size_t i = 0; i < vec.size(); ++i) {
remove_downstream_connection(vec[i]);
delete vec[i]->get_client_handler();
}
dconns_.clear();
for(std::set<StreamData*>::iterator i = streams_.begin(),
eoi = streams_.end(); i != eoi; ++i) {
delete *i;
}
streams_.clear();
return 0;
}
namespace {
void notify_readcb(bufferevent *bev, void *arg)
{
int rv;
SpdySession *spdy = reinterpret_cast<SpdySession*>(arg);
spdy->clear_notify();
switch(spdy->get_state()) {
case SpdySession::DISCONNECTED:
rv = spdy->initiate_connection();
if(rv != 0) {
LOG(FATAL) << "Downstream spdy could not initiate connection " << spdy;
DIE();
}
break;
case SpdySession::CONNECTED:
rv = spdy->send();
if(rv != 0) {
spdy->disconnect();
}
break;
}
}
} // namespace
namespace {
void notify_eventcb(bufferevent *bev, short events, void *arg)
{
// TODO should DIE()?
if(events & BEV_EVENT_EOF) {
LOG(ERROR) << "Connection to main thread lost: eof";
}
if(events & BEV_EVENT_ERROR) {
LOG(ERROR) << "Connection to main thread lost: network error";
}
}
} // namespace
int SpdySession::init_notification()
{
int rv;
int sockpair[2];
rv = socketpair(AF_UNIX, SOCK_STREAM, 0, sockpair);
if(rv == -1) {
LOG(FATAL) << "socketpair() failed: " << strerror(errno);
return -1;
}
wrbev_ = bufferevent_socket_new(evbase_, sockpair[0],
BEV_OPT_CLOSE_ON_FREE|
BEV_OPT_DEFER_CALLBACKS);
if(!wrbev_) {
LOG(FATAL) << "bufferevent_socket_new() failed";
for(int i = 0; i < 2; ++i) {
close(sockpair[i]);
}
return -1;
}
rdbev_ = bufferevent_socket_new(evbase_, sockpair[1],
BEV_OPT_CLOSE_ON_FREE|
BEV_OPT_DEFER_CALLBACKS);
if(!rdbev_) {
LOG(FATAL) << "bufferevent_socket_new() failed";
close(sockpair[1]);
return -1;
}
bufferevent_enable(rdbev_, EV_READ);
bufferevent_setcb(rdbev_, notify_readcb, 0, notify_eventcb, this);
return 0;
}
namespace {
void readcb(bufferevent *bev, void *ptr)
{
int rv;
SpdySession *spdy = reinterpret_cast<SpdySession*>(ptr);
rv = spdy->on_read();
if(rv != 0) {
spdy->disconnect();
}
}
} // namespace
namespace {
void writecb(bufferevent *bev, void *ptr)
{
int rv;
SpdySession *spdy = reinterpret_cast<SpdySession*>(ptr);
rv = spdy->on_write();
if(rv != 0) {
spdy->disconnect();
}
}
} // namespace
namespace {
void eventcb(bufferevent *bev, short events, void *ptr)
{
SpdySession *spdy = reinterpret_cast<SpdySession*>(ptr);
if(events & BEV_EVENT_CONNECTED) {
if(ENABLE_LOG) {
LOG(INFO) << "Downstream spdy connection established. " << spdy;
}
spdy->connected();
if(spdy->on_connect() != 0) {
spdy->disconnect();
return;
}
} else if(events & BEV_EVENT_EOF) {
if(ENABLE_LOG) {
LOG(INFO) << "Downstream spdy EOF. " << spdy;
}
spdy->disconnect();
} else if(events & (BEV_EVENT_ERROR | BEV_EVENT_TIMEOUT)) {
if(ENABLE_LOG) {
LOG(INFO) << "Downstream spdy error/timeout. " << spdy;
}
spdy->disconnect();
}
}
} // namespace
int SpdySession::initiate_connection()
{
int rv;
assert(state_ == DISCONNECTED);
if(ENABLE_LOG) {
LOG(INFO) << "Downstream spdy initiating connection " << this;
}
ssl_ = SSL_new(ssl_ctx_);
if(!ssl_) {
LOG(ERROR) << "SSL_new() failed: "
<< ERR_error_string(ERR_get_error(), NULL);
return -1;
}
bev_ = bufferevent_openssl_socket_new(evbase_, -1, ssl_,
BUFFEREVENT_SSL_CONNECTING,
BEV_OPT_DEFER_CALLBACKS);
rv = bufferevent_socket_connect
(bev_,
// TODO maybe not thread-safe?
const_cast<sockaddr*>(&get_config()->downstream_addr.sa),
get_config()->downstream_addrlen);
if(rv != 0) {
bufferevent_free(bev_);
bev_ = 0;
return SHRPX_ERR_NETWORK;
}
if(ENABLE_LOG) {
LOG(INFO) << "Connecting to downstream " << this;
}
bufferevent_setwatermark(bev_, EV_READ, 0, SHRPX_READ_WARTER_MARK);
bufferevent_enable(bev_, EV_READ);
bufferevent_setcb(bev_, readcb, writecb, eventcb, this);
// No timeout for SPDY session
state_ = CONNECTING;
return 0;
}
void SpdySession::connected()
{
state_ = CONNECTED;
}
void SpdySession::add_downstream_connection(SpdyDownstreamConnection *dconn)
{
dconns_.insert(dconn);
}
void SpdySession::remove_downstream_connection(SpdyDownstreamConnection *dconn)
{
dconns_.erase(dconn);
dconn->detach_stream_data();
}
void SpdySession::remove_stream_data(StreamData *sd)
{
streams_.erase(sd);
if(sd->dconn) {
sd->dconn->detach_stream_data();
}
delete sd;
}
int SpdySession::submit_request(SpdyDownstreamConnection *dconn,
uint8_t pri, const char **nv,
const spdylay_data_provider *data_prd)
{
assert(state_ == CONNECTED);
StreamData *sd = new StreamData();
int rv = spdylay_submit_request(session_, pri, nv, data_prd, sd);
if(rv == 0) {
dconn->attach_stream_data(sd);
streams_.insert(sd);
} else {
LOG(FATAL) << "spdylay_submit_request() failed: "
<< spdylay_strerror(rv);
delete sd;
return -1;
}
return 0;
}
int SpdySession::submit_rst_stream(SpdyDownstreamConnection *docnn,
int32_t stream_id, uint32_t status_code)
{
assert(state_ == CONNECTED);
int rv = spdylay_submit_rst_stream(session_, stream_id, status_code);
if(rv != 0) {
LOG(FATAL) << "spdylay_submit_rst_stream() failed: "
<< spdylay_strerror(rv);
return -1;
}
return 0;
}
int SpdySession::resume_data(SpdyDownstreamConnection *dconn)
{
assert(state_ == CONNECTED);
Downstream *downstream = dconn->get_downstream();
int rv = spdylay_session_resume_data(session_,
downstream->get_downstream_stream_id());
switch(rv) {
case 0:
case SPDYLAY_ERR_INVALID_ARGUMENT:
return 0;
default:
LOG(FATAL) << "spdylay_resume_session() failed: "
<< spdylay_strerror(rv);
return -1;
}
}
namespace {
void call_downstream_readcb(SpdySession *spdy, Downstream *downstream)
{
Upstream *upstream = downstream->get_upstream();
if(upstream) {
(upstream->get_downstream_readcb())
(spdy->get_bev(),
downstream->get_downstream_connection());
}
}
} // namespace
namespace {
ssize_t send_callback(spdylay_session *session,
const uint8_t *data, size_t len, int flags,
void *user_data)
{
int rv;
SpdySession *spdy = reinterpret_cast<SpdySession*>(user_data);
bufferevent *bev = spdy->get_bev();
evbuffer *output = bufferevent_get_output(bev);
// Check buffer length and return WOULDBLOCK if it is large enough.
if(evbuffer_get_length(output) > Downstream::OUTPUT_UPPER_THRES) {
return SPDYLAY_ERR_WOULDBLOCK;
}
rv = evbuffer_add(output, data, len);
if(rv == -1) {
LOG(FATAL) << "evbuffer_add() failed";
return SPDYLAY_ERR_CALLBACK_FAILURE;
} else {
return len;
}
}
} // namespace
namespace {
ssize_t recv_callback(spdylay_session *session,
uint8_t *data, size_t len, int flags, void *user_data)
{
SpdySession *spdy = reinterpret_cast<SpdySession*>(user_data);
bufferevent *bev = spdy->get_bev();
evbuffer *input = bufferevent_get_input(bev);
int nread = evbuffer_remove(input, data, len);
if(nread == -1) {
return SPDYLAY_ERR_CALLBACK_FAILURE;
} else if(nread == 0) {
return SPDYLAY_ERR_WOULDBLOCK;
} else {
return nread;
}
}
} // namespace
namespace {
void on_stream_close_callback
(spdylay_session *session, int32_t stream_id, spdylay_status_code status_code,
void *user_data)
{
int rv;
if(ENABLE_LOG) {
LOG(INFO) << "Downstream spdy Stream " << stream_id << " is being closed";
}
SpdySession *spdy = reinterpret_cast<SpdySession*>(user_data);
StreamData *sd;
sd = reinterpret_cast<StreamData*>
(spdylay_session_get_stream_user_data(session, stream_id));
if(sd == 0) {
// We might get this close callback when pushed streams are
// closed.
return;
}
SpdyDownstreamConnection* dconn = sd->dconn;
if(dconn) {
Downstream *downstream = dconn->get_downstream();
if(downstream && downstream->get_downstream_stream_id() == stream_id) {
Upstream *upstream = downstream->get_upstream();
if(status_code == SPDYLAY_OK) {
downstream->set_response_state(Downstream::MSG_COMPLETE);
rv = upstream->on_downstream_body_complete(downstream);
if(rv != 0) {
downstream->set_response_state(Downstream::MSG_RESET);
}
} else {
downstream->set_response_state(Downstream::MSG_RESET);
}
call_downstream_readcb(spdy, downstream);
// dconn may be deleted
}
}
// The life time of StreamData ends here
spdy->remove_stream_data(sd);
}
} // namespace
namespace {
void on_ctrl_recv_callback
(spdylay_session *session, spdylay_frame_type type, spdylay_frame *frame,
void *user_data)
{
int rv;
SpdySession *spdy = reinterpret_cast<SpdySession*>(user_data);
StreamData *sd;
Downstream *downstream;
switch(type) {
case SPDYLAY_SYN_STREAM:
if(ENABLE_LOG) {
LOG(INFO) << "Downstream spdy received upstream SYN_STREAM stream_id="
<< frame->syn_stream.stream_id;
}
// We just respond pushed stream with RST_STREAM.
spdylay_submit_rst_stream(session, frame->syn_stream.stream_id,
SPDYLAY_REFUSED_STREAM);
break;
case SPDYLAY_RST_STREAM:
sd = reinterpret_cast<StreamData*>
(spdylay_session_get_stream_user_data(session,
frame->rst_stream.stream_id));
if(sd && sd->dconn) {
downstream = sd->dconn->get_downstream();
if(downstream &&
downstream->get_downstream_stream_id() ==
frame->rst_stream.stream_id) {
// If we got RST_STREAM, just flag MSG_RESET to indicate
// upstream connection must be terminated.
downstream->set_response_state(Downstream::MSG_RESET);
call_downstream_readcb(spdy, downstream);
}
}
break;
case SPDYLAY_SYN_REPLY: {
sd = reinterpret_cast<StreamData*>
(spdylay_session_get_stream_user_data(session,
frame->syn_reply.stream_id));
if(!sd || !sd->dconn) {
spdylay_submit_rst_stream(session, frame->syn_reply.stream_id,
SPDYLAY_INTERNAL_ERROR);
break;
}
downstream = sd->dconn->get_downstream();
if(!downstream ||
downstream->get_downstream_stream_id() != frame->syn_reply.stream_id) {
spdylay_submit_rst_stream(session, frame->syn_reply.stream_id,
SPDYLAY_INTERNAL_ERROR);
break;
}
char **nv = frame->syn_reply.nv;
const char *status = 0;
const char *version = 0;
const char *content_length = 0;
for(size_t i = 0; nv[i]; i += 2) {
if(strcmp(nv[i], ":status") == 0) {
unsigned int code = strtoul(nv[i+1], 0, 10);
downstream->set_response_http_status(code);
status = nv[i+1];
} else if(strcmp(nv[i], ":version") == 0) {
// We assume for now that most version is HTTP/1.1 from
// SPDY. So just check if it is HTTP/1.0 and then set response
// minor as so.
downstream->set_response_major(1);
if(util::strieq(nv[i+1], "HTTP/1.0")) {
downstream->set_response_minor(0);
} else {
downstream->set_response_minor(1);
}
version = nv[i+1];
} else if(nv[i][0] != ':') {
if(strcmp(nv[i], "content-length") == 0) {
content_length = nv[i+1];
}
downstream->add_response_header(nv[i], nv[i+1]);
}
}
if(!status || !version) {
spdylay_submit_rst_stream(session, frame->syn_reply.stream_id,
SPDYLAY_PROTOCOL_ERROR);
downstream->set_response_state(Downstream::MSG_RESET);
call_downstream_readcb(spdy, downstream);
return;
}
if(!content_length && downstream->get_request_method() != "HEAD" &&
downstream->get_request_method() != "CONNECT") {
unsigned int status;
status = downstream->get_response_http_status();
if(!((100 <= status && status <= 199) || status == 204 ||
status == 304)) {
// In SPDY, we are supporsed not to receive
// transfer-encoding.
downstream->add_response_header("transfer-encoding", "chunked");
}
}
if(ENABLE_LOG) {
std::stringstream ss;
for(size_t i = 0; nv[i]; i += 2) {
ss << nv[i] << ": " << nv[i+1] << "\n";
}
LOG(INFO) << "Downstream spdy response headers id="
<< frame->syn_reply.stream_id
<< "\n" << ss.str();
}
Upstream *upstream = downstream->get_upstream();
downstream->set_response_state(Downstream::HEADER_COMPLETE);
rv = upstream->on_downstream_header_complete(downstream);
if(rv != 0) {
spdylay_submit_rst_stream(session, frame->syn_reply.stream_id,
SPDYLAY_PROTOCOL_ERROR);
downstream->set_response_state(Downstream::MSG_RESET);
}
call_downstream_readcb(spdy, downstream);
break;
}
default:
break;
}
}
} // namespace
namespace {
void on_data_chunk_recv_callback(spdylay_session *session,
uint8_t flags, int32_t stream_id,
const uint8_t *data, size_t len,
void *user_data)
{
int rv;
SpdySession *spdy = reinterpret_cast<SpdySession*>(user_data);
StreamData *sd;
sd = reinterpret_cast<StreamData*>
(spdylay_session_get_stream_user_data(session, stream_id));
if(!sd || !sd->dconn) {
spdylay_submit_rst_stream(session, stream_id, SPDYLAY_INTERNAL_ERROR);
return;
}
Downstream *downstream = sd->dconn->get_downstream();
if(!downstream || downstream->get_downstream_stream_id() != stream_id) {
spdylay_submit_rst_stream(session, stream_id, SPDYLAY_INTERNAL_ERROR);
return;
}
// TODO No manual flow control at the moment.
Upstream *upstream = downstream->get_upstream();
rv = upstream->on_downstream_body(downstream, data, len);
if(rv != 0) {
spdylay_submit_rst_stream(session, stream_id, SPDYLAY_INTERNAL_ERROR);
downstream->set_response_state(Downstream::MSG_RESET);
}
call_downstream_readcb(spdy, downstream);
}
} // namespace
namespace {
void before_ctrl_send_callback(spdylay_session *session,
spdylay_frame_type type,
spdylay_frame *frame,
void *user_data)
{
if(type == SPDYLAY_SYN_STREAM) {
StreamData *sd;
sd = reinterpret_cast<StreamData*>
(spdylay_session_get_stream_user_data(session,
frame->syn_stream.stream_id));
if(!sd || !sd->dconn) {
spdylay_submit_rst_stream(session, frame->syn_stream.stream_id,
SPDYLAY_CANCEL);
return;
}
Downstream *downstream = sd->dconn->get_downstream();
if(downstream) {
downstream->set_downstream_stream_id(frame->syn_stream.stream_id);
} else {
spdylay_submit_rst_stream(session, frame->syn_stream.stream_id,
SPDYLAY_CANCEL);
}
}
}
} // namespace
namespace {
void on_ctrl_not_send_callback(spdylay_session *session,
spdylay_frame_type type,
spdylay_frame *frame,
int error_code, void *user_data)
{
LOG(WARNING) << "Failed to send control frame type=" << type << ", "
<< "error_code=" << error_code << ":"
<< spdylay_strerror(error_code);
if(type == SPDYLAY_SYN_STREAM) {
// To avoid stream hanging around, flag Downstream::MSG_RESET and
// terminate the upstream and downstream connections.
SpdySession *spdy = reinterpret_cast<SpdySession*>(user_data);
StreamData *sd;
sd = reinterpret_cast<StreamData*>
(spdylay_session_get_stream_user_data(session,
frame->syn_stream.stream_id));
if(!sd) {
return;
}
if(sd->dconn) {
Downstream *downstream = sd->dconn->get_downstream();
if(!downstream ||
downstream->get_downstream_stream_id() !=
frame->syn_stream.stream_id) {
return;
}
downstream->set_response_state(Downstream::MSG_RESET);
call_downstream_readcb(spdy, downstream);
}
spdy->remove_stream_data(sd);
}
}
} // namespace
namespace {
void on_ctrl_recv_parse_error_callback(spdylay_session *session,
spdylay_frame_type type,
const uint8_t *head, size_t headlen,
const uint8_t *payload,
size_t payloadlen, int error_code,
void *user_data)
{
if(ENABLE_LOG) {
LOG(INFO) << "Failed to parse received control frame. type=" << type
<< ", error_code=" << error_code << ":"
<< spdylay_strerror(error_code);
}
}
} // namespace
namespace {
void on_unknown_ctrl_recv_callback(spdylay_session *session,
const uint8_t *head, size_t headlen,
const uint8_t *payload, size_t payloadlen,
void *user_data)
{
if(ENABLE_LOG) {
LOG(INFO) << "Received unknown control frame.";
}
}
} // namespace
int SpdySession::on_connect()
{
int rv;
const unsigned char *next_proto = 0;
unsigned int next_proto_len;
SSL_get0_next_proto_negotiated(ssl_, &next_proto, &next_proto_len);
if(ENABLE_LOG) {
std::string proto(next_proto, next_proto+next_proto_len);
LOG(INFO) << "Downstream negotiated next protocol: " << proto;
}
uint16_t version = spdylay_npn_get_version(next_proto, next_proto_len);
if(!version) {
return -1;
}
spdylay_session_callbacks callbacks;
memset(&callbacks, 0, sizeof(callbacks));
callbacks.send_callback = send_callback;
callbacks.recv_callback = recv_callback;
callbacks.on_stream_close_callback = on_stream_close_callback;
callbacks.on_ctrl_recv_callback = on_ctrl_recv_callback;
callbacks.on_data_chunk_recv_callback = on_data_chunk_recv_callback;
callbacks.before_ctrl_send_callback = before_ctrl_send_callback;
callbacks.on_ctrl_not_send_callback = on_ctrl_not_send_callback;
callbacks.on_ctrl_recv_parse_error_callback =
on_ctrl_recv_parse_error_callback;
callbacks.on_unknown_ctrl_recv_callback = on_unknown_ctrl_recv_callback;
rv = spdylay_session_client_new(&session_, version, &callbacks, this);
if(rv != 0) {
return -1;
}
// TODO Send initial window size when manual flow control is
// implemented.
spdylay_settings_entry entry[1];
entry[0].settings_id = SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS;
entry[0].value = get_config()->spdy_max_concurrent_streams;
entry[0].flags = SPDYLAY_ID_FLAG_SETTINGS_NONE;
rv = spdylay_submit_settings
(session_, SPDYLAY_FLAG_SETTINGS_NONE,
entry, sizeof(entry)/sizeof(spdylay_settings_entry));
if(rv != 0) {
return -1;
}
rv = send();
if(rv != 0) {
return -1;
}
// submit pending request
for(std::set<SpdyDownstreamConnection*>::iterator i = dconns_.begin(),
eoi = dconns_.end(); i != eoi; ++i) {
if((*i)->push_request_headers() != 0) {
return -1;
}
}
return 0;
}
int SpdySession::on_read()
{
int rv = 0;
if((rv = spdylay_session_recv(session_)) < 0) {
if(rv != SPDYLAY_ERR_EOF) {
LOG(ERROR) << "spdylay_session_recv() returned error: "
<< spdylay_strerror(rv);
}
} else if((rv = spdylay_session_send(session_)) < 0) {
LOG(ERROR) << "spdylay_session_send() returned error: "
<< spdylay_strerror(rv);
}
// if(rv == 0) {
// if(spdylay_session_want_read(session_) == 0 &&
// spdylay_session_want_write(session_) == 0) {
// if(ENABLE_LOG) {
// LOG(INFO) << "No more read/write for this SPDY session";
// }
// rv = -1;
// }
// }
return rv;
}
int SpdySession::on_write()
{
return send();
}
int SpdySession::send()
{
int rv = 0;
if((rv = spdylay_session_send(session_)) < 0) {
LOG(ERROR) << "spdylay_session_send() returned error: "
<< spdylay_strerror(rv);
}
// if(rv == 0) {
// if(spdylay_session_want_read(session_) == 0 &&
// spdylay_session_want_write(session_) == 0) {
// if(ENABLE_LOG) {
// LOG(INFO) << "No more read/write for this SPDY session";
// }
// rv = -1;
// }
// }
return rv;
}
void SpdySession::clear_notify()
{
evbuffer *input = bufferevent_get_output(rdbev_);
evbuffer_drain(input, evbuffer_get_length(input));
notified_ = false;
}
void SpdySession::notify()
{
if(!notified_) {
bufferevent_write(wrbev_, "1", 1);
notified_ = true;
}
}
bufferevent* SpdySession::get_bev() const
{
return bev_;
}
int SpdySession::get_state() const
{
return state_;
}
} // namespace shrpx

106
src/shrpx_spdy_session.h Normal file
View File

@ -0,0 +1,106 @@
/*
* Spdylay - SPDY Library
*
* Copyright (c) 2012 Tatsuhiro Tsujikawa
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef SHRPX_SPDY_SESSION_H
#define SHRPX_SPDY_SESSION_H
#include "shrpx.h"
#include <set>
#include <openssl/ssl.h>
#include <event.h>
#include <event2/bufferevent.h>
#include <spdylay/spdylay.h>
namespace shrpx {
class SpdyDownstreamConnection;
struct StreamData {
SpdyDownstreamConnection *dconn;
};
class SpdySession {
public:
SpdySession(event_base *evbase, SSL_CTX *ssl_ctx);
~SpdySession();
int init_notification();
int disconnect();
int initiate_connection();
void connected();
void add_downstream_connection(SpdyDownstreamConnection *dconn);
void remove_downstream_connection(SpdyDownstreamConnection *dconn);
void remove_stream_data(StreamData *sd);
int submit_request(SpdyDownstreamConnection *dconn,
uint8_t pri, const char **nv,
const spdylay_data_provider *data_prd);
int submit_rst_stream(SpdyDownstreamConnection *docnn,
int32_t stream_id, uint32_t status_code);
int resume_data(SpdyDownstreamConnection *dconn);
int on_connect();
int on_read();
int on_write();
int send();
void clear_notify();
void notify();
bufferevent* get_bev() const;
int get_state() const;
enum {
DISCONNECTED,
CONNECTING,
CONNECTED
};
private:
event_base *evbase_;
SSL_CTX *ssl_ctx_;
SSL *ssl_;
spdylay_session *session_;
bufferevent *bev_;
std::set<SpdyDownstreamConnection*> dconns_;
std::set<StreamData*> streams_;
int state_;
bool notified_;
bufferevent *wrbev_;
bufferevent *rdbev_;
};
} // namespace shrpx
#endif // SHRPX_SPDY_SESSION_H

View File

@ -460,7 +460,7 @@ void spdy_downstream_readcb(bufferevent *bev, void *ptr)
delete downstream; delete downstream;
return; return;
} }
int rv = downstream->parse_http_response(); int rv = downstream->on_read();
if(rv != 0) { if(rv != 0) {
if(ENABLE_LOG) { if(ENABLE_LOG) {
LOG(INFO) << "Downstream HTTP parser failure"; LOG(INFO) << "Downstream HTTP parser failure";
@ -846,7 +846,9 @@ int32_t SpdyUpstream::get_initial_window_size() const
void SpdyUpstream::pause_read(IOCtrlReason reason) void SpdyUpstream::pause_read(IOCtrlReason reason)
{} {}
void SpdyUpstream::resume_read(IOCtrlReason reason) int SpdyUpstream::resume_read(IOCtrlReason reason)
{} {
return 0;
}
} // namespace shrpx } // namespace shrpx

View File

@ -59,7 +59,7 @@ public:
int error_reply(Downstream *downstream, int status_code); int error_reply(Downstream *downstream, int status_code);
virtual void pause_read(IOCtrlReason reason); virtual void pause_read(IOCtrlReason reason);
virtual void resume_read(IOCtrlReason reason); virtual int resume_read(IOCtrlReason reason);
virtual int on_downstream_header_complete(Downstream *downstream); virtual int on_downstream_header_complete(Downstream *downstream);
virtual int on_downstream_body(Downstream *downstream, virtual int on_downstream_body(Downstream *downstream,

View File

@ -221,9 +221,6 @@ ClientHandler* accept_ssl_connection(event_base *evbase, SSL_CTX *ssl_ctx,
BUFFEREVENT_SSL_ACCEPTING, BEV_OPT_DEFER_CALLBACKS); BUFFEREVENT_SSL_ACCEPTING, BEV_OPT_DEFER_CALLBACKS);
} }
ClientHandler *client_handler = new ClientHandler(bev, fd, ssl, host); ClientHandler *client_handler = new ClientHandler(bev, fd, ssl, host);
if(get_config()->client_mode) {
client_handler->set_ssl_client_ctx(ssl_ctx);
}
return client_handler; return client_handler;
} else { } else {
LOG(ERROR) << "getnameinfo() failed: " << gai_strerror(rv); LOG(ERROR) << "getnameinfo() failed: " << gai_strerror(rv);

View File

@ -29,11 +29,13 @@
#include "shrpx_ssl.h" #include "shrpx_ssl.h"
#include "shrpx_log.h" #include "shrpx_log.h"
#include "shrpx_client_handler.h" #include "shrpx_client_handler.h"
#include "shrpx_spdy_session.h"
namespace shrpx { namespace shrpx {
ThreadEventReceiver::ThreadEventReceiver(SSL_CTX *ssl_ctx) ThreadEventReceiver::ThreadEventReceiver(SSL_CTX *ssl_ctx, SpdySession *spdy)
: ssl_ctx_(ssl_ctx) : ssl_ctx_(ssl_ctx),
spdy_(spdy)
{} {}
ThreadEventReceiver::~ThreadEventReceiver() ThreadEventReceiver::~ThreadEventReceiver()
@ -56,6 +58,7 @@ void ThreadEventReceiver::on_read(bufferevent *bev)
&wev.client_addr.sa, &wev.client_addr.sa,
wev.client_addrlen); wev.client_addrlen);
if(client_handler) { if(client_handler) {
client_handler->set_spdy_session(spdy_);
if(ENABLE_LOG) { if(ENABLE_LOG) {
LOG(INFO) << "ClientHandler " << client_handler << " created"; LOG(INFO) << "ClientHandler " << client_handler << " created";
} }

View File

@ -35,6 +35,8 @@
namespace shrpx { namespace shrpx {
class SpdySession;
struct WorkerEvent { struct WorkerEvent {
evutil_socket_t client_fd; evutil_socket_t client_fd;
sockaddr_union client_addr; sockaddr_union client_addr;
@ -43,11 +45,14 @@ struct WorkerEvent {
class ThreadEventReceiver { class ThreadEventReceiver {
public: public:
ThreadEventReceiver(SSL_CTX *ssl_ctx); ThreadEventReceiver(SSL_CTX *ssl_ctx, SpdySession *spdy);
~ThreadEventReceiver(); ~ThreadEventReceiver();
void on_read(bufferevent *bev); void on_read(bufferevent *bev);
private: private:
SSL_CTX *ssl_ctx_; SSL_CTX *ssl_ctx_;
// Shared SPDY session for each thread. NULL if not client mode. Not
// deleted by this object.
SpdySession *spdy_;
}; };
} // namespace shrpx } // namespace shrpx

View File

@ -53,7 +53,7 @@ public:
virtual int on_downstream_body_complete(Downstream *downstream) = 0; virtual int on_downstream_body_complete(Downstream *downstream) = 0;
virtual void pause_read(IOCtrlReason reason) = 0; virtual void pause_read(IOCtrlReason reason) = 0;
virtual void resume_read(IOCtrlReason reason) = 0; virtual int resume_read(IOCtrlReason reason) = 0;
}; };
} // namespace shrpx } // namespace shrpx

View File

@ -33,6 +33,7 @@
#include "shrpx_ssl.h" #include "shrpx_ssl.h"
#include "shrpx_thread_event_receiver.h" #include "shrpx_thread_event_receiver.h"
#include "shrpx_log.h" #include "shrpx_log.h"
#include "shrpx_spdy_session.h"
namespace shrpx { namespace shrpx {
@ -72,7 +73,14 @@ void Worker::run()
event_base *evbase = event_base_new(); event_base *evbase = event_base_new();
bufferevent *bev = bufferevent_socket_new(evbase, fd_, bufferevent *bev = bufferevent_socket_new(evbase, fd_,
BEV_OPT_DEFER_CALLBACKS); BEV_OPT_DEFER_CALLBACKS);
ThreadEventReceiver *receiver = new ThreadEventReceiver(ssl_ctx_); SpdySession *spdy = 0;
if(get_config()->client_mode) {
spdy = new SpdySession(evbase, ssl_ctx_);
if(spdy->init_notification() == -1) {
DIE();
}
}
ThreadEventReceiver *receiver = new ThreadEventReceiver(ssl_ctx_, spdy);
bufferevent_enable(bev, EV_READ); bufferevent_enable(bev, EV_READ);
bufferevent_setcb(bev, readcb, 0, eventcb, receiver); bufferevent_setcb(bev, readcb, 0, eventcb, receiver);