Add --client-mode option

With --client-mode option, shrpx now accepts unencrypted HTTP
connections and communicates with backend server in SPDY.  In short,
this is the "reversed" operation mode against normal mode.  This may
be useful for testing purpose because it can sit between HTTP client
and shrpx "normal" mode.
This commit is contained in:
Tatsuhiro Tsujikawa 2012-11-18 21:23:13 +09:00
parent aa64a7f7f5
commit 026f4ca3a2
27 changed files with 1313 additions and 231 deletions

View File

@ -82,6 +82,8 @@ shrpx_SOURCES = ${HELPER_OBJECTS} ${HELPER_HFILES} \
shrpx_downstream_queue.cc shrpx_downstream_queue.h \
shrpx_downstream.cc shrpx_downstream.h \
shrpx_downstream_connection.cc shrpx_downstream_connection.h \
shrpx_http_downstream_connection.cc shrpx_http_downstream_connection.h \
shrpx_spdy_downstream_connection.cc shrpx_spdy_downstream_connection.h \
shrpx_log.cc shrpx_log.h \
shrpx_http.cc shrpx_http.h \
shrpx_io_control.cc shrpx_io_control.h \

View File

@ -349,6 +349,7 @@ void fill_default_config()
mod_config()->backlog = 256;
mod_config()->ciphers = 0;
mod_config()->client_mode = false;
}
} // namespace
@ -359,6 +360,9 @@ void print_usage(std::ostream& out)
<< " [-c <NUM>] [-L <LEVEL>] [OPTIONS...]\n"
<< " <PRIVATE_KEY> <CERT>\n"
<< "\n"
<< " shrpx --client-mode [-Dh] [-b <HOST,PORT>] [-f <HOST,PORT>]\n"
<< " [-n <CORES>] [-c <NUM>] [-L <LEVEL>] [OPTIONS...]\n"
<< "\n"
<< "A reverse proxy for SPDY/HTTPS.\n"
<< std::endl;
}
@ -445,6 +449,10 @@ void print_help(std::ostream& out)
<< get_config()->backlog << "\n"
<< " --ciphers=<SUITE> Set allowed cipher list. The format of the\n"
<< " string is described in OpenSSL ciphers(1).\n"
<< " --client-mode Instead of accepting SPDY/HTTPS connection,\n"
<< " accept HTTP connection and communicate with\n"
<< " backend server in SPDY. This is for testing\n"
<< " purpose.\n"
<< " -h, --help Print this help.\n"
<< std::endl;
}
@ -483,6 +491,7 @@ int main(int argc, char **argv)
{"syslog-facility", required_argument, &flag, 14 },
{"backlog", required_argument, &flag, 15 },
{"ciphers", required_argument, &flag, 16 },
{"client-mode", no_argument, &flag, 17 },
{"help", no_argument, 0, 'h' },
{0, 0, 0, 0 }
};
@ -591,6 +600,10 @@ int main(int argc, char **argv)
// --ciphers
cmdcfgs.push_back(std::make_pair(SHRPX_OPT_CIPHERS, optarg));
break;
case 17:
// --client-mode
cmdcfgs.push_back(std::make_pair(SHRPX_OPT_CLIENT_MODE, "yes"));
break;
default:
break;
}
@ -608,12 +621,6 @@ int main(int argc, char **argv)
}
}
if((!get_config()->private_key_file || !get_config()->cert_file) &&
argc - optind < 2) {
print_usage(std::cerr);
LOG(FATAL) << "Too few arguments";
exit(EXIT_FAILURE);
}
if(argc - optind >= 2) {
cmdcfgs.push_back(std::make_pair(SHRPX_OPT_PRIVATE_KEY_FILE,
argv[optind++]));
@ -628,6 +635,20 @@ int main(int argc, char **argv)
}
}
if(!get_config()->client_mode) {
if(!get_config()->private_key_file || !get_config()->cert_file) {
print_usage(std::cerr);
LOG(FATAL) << "Too few arguments";
exit(EXIT_FAILURE);
}
}
if(get_config()->spdy_proxy && get_config()->client_mode) {
LOG(FATAL) << "--spdy-proxy and --client-mode cannot be used "
<< "at the same time.";
exit(EXIT_FAILURE);
}
char hostport[NI_MAXHOST+16];
bool downstream_ipv6_addr =
is_ipv6_numeric_addr(get_config()->downstream_host);

View File

@ -31,7 +31,8 @@
#include "shrpx_spdy_upstream.h"
#include "shrpx_https_upstream.h"
#include "shrpx_config.h"
#include "shrpx_downstream_connection.h"
#include "shrpx_http_downstream_connection.h"
#include "shrpx_spdy_downstream_connection.h"
#include "shrpx_accesslog.h"
namespace shrpx {
@ -116,8 +117,11 @@ void upstream_eventcb(bufferevent *bev, short events, void *arg)
}
} // namespace
ClientHandler::ClientHandler(bufferevent *bev, SSL *ssl, const char *ipaddr)
ClientHandler::ClientHandler(bufferevent *bev, int fd, SSL *ssl,
const char *ipaddr)
: bev_(bev),
fd_(fd),
ssl_client_ctx_(0),
ssl_(ssl),
upstream_(0),
ipaddr_(ipaddr),
@ -127,7 +131,13 @@ ClientHandler::ClientHandler(bufferevent *bev, SSL *ssl, const char *ipaddr)
bufferevent_setwatermark(bev_, EV_READ, 0, SHRPX_READ_WARTER_MARK);
set_upstream_timeouts(&get_config()->upstream_read_timeout,
&get_config()->upstream_write_timeout);
set_bev_cb(0, upstream_writecb, upstream_eventcb);
if(ssl_) {
set_bev_cb(0, upstream_writecb, upstream_eventcb);
} else {
// For client-mode
upstream_ = new HttpsUpstream(this);
set_bev_cb(upstream_readcb, upstream_writecb, upstream_eventcb);
}
}
ClientHandler::~ClientHandler()
@ -135,13 +145,16 @@ ClientHandler::~ClientHandler()
if(ENABLE_LOG) {
LOG(INFO) << "Deleting ClientHandler " << this;
}
int fd = SSL_get_fd(ssl_);
SSL_shutdown(ssl_);
if(ssl_) {
SSL_shutdown(ssl_);
}
bufferevent_disable(bev_, EV_READ | EV_WRITE);
bufferevent_free(bev_);
SSL_free(ssl_);
shutdown(fd, SHUT_WR);
close(fd);
if(ssl_) {
SSL_free(ssl_);
}
shutdown(fd_, SHUT_WR);
close(fd_);
delete upstream_;
for(std::set<DownstreamConnection*>::iterator i = dconn_pool_.begin();
i != dconn_pool_.end(); ++i) {
@ -186,8 +199,8 @@ int ClientHandler::validate_next_proto()
unsigned int next_proto_len;
SSL_get0_next_proto_negotiated(ssl_, &next_proto, &next_proto_len);
if(next_proto) {
std::string proto(next_proto, next_proto+next_proto_len);
if(ENABLE_LOG) {
std::string proto(next_proto, next_proto+next_proto_len);
LOG(INFO) << "Upstream negotiated next protocol: " << proto;
}
uint16_t version = spdylay_npn_get_version(next_proto, next_proto_len);
@ -257,7 +270,11 @@ DownstreamConnection* ClientHandler::get_downstream_connection()
if(ENABLE_LOG) {
LOG(INFO) << "Downstream connection pool is empty. Create new one";
}
return new DownstreamConnection(this);
if(get_config()->client_mode) {
return new SpdyDownstreamConnection(this);
} else {
return new HttpDownstreamConnection(this);
}
} else {
DownstreamConnection *dconn = *dconn_pool_.begin();
dconn_pool_.erase(dconn);
@ -280,4 +297,14 @@ SSL* ClientHandler::get_ssl() const
return ssl_;
}
void ClientHandler::set_ssl_client_ctx(SSL_CTX *ssl_ctx)
{
ssl_client_ctx_ = ssl_ctx;
}
SSL_CTX* ClientHandler::get_ssl_client_ctx() const
{
return ssl_client_ctx_;
}
} // namespace shrpx

View File

@ -39,7 +39,7 @@ class DownstreamConnection;
class ClientHandler {
public:
ClientHandler(bufferevent *bev, SSL *ssl, const char *ipaddr);
ClientHandler(bufferevent *bev, int fd, SSL *ssl, const char *ipaddr);
~ClientHandler();
int on_read();
int on_event();
@ -60,8 +60,13 @@ public:
DownstreamConnection* get_downstream_connection();
size_t get_pending_write_length();
SSL* get_ssl() const;
void set_ssl_client_ctx(SSL_CTX *ssl_ctx);
SSL_CTX* get_ssl_client_ctx() const;
private:
bufferevent *bev_;
int fd_;
// SSL_CTX for SSL object to connect backend SPDY server
SSL_CTX *ssl_client_ctx_;
SSL *ssl_;
Upstream *upstream_;
std::string ipaddr_;

View File

@ -68,6 +68,7 @@ const char SHRPX_OPT_SYSLOG[] = "syslog";
const char SHRPX_OPT_SYSLOG_FACILITY[] = "syslog-facility";
const char SHRPX_OPT_BACKLOG[] = "backlog";
const char SHRPX_OPT_CIPHERS[] = "ciphers";
const char SHRPX_OPT_CLIENT_MODE[] = "client-mode";
Config::Config()
: verbose(false),
@ -96,7 +97,8 @@ Config::Config()
syslog_facility(0),
use_syslog(false),
backlog(0),
ciphers(0)
ciphers(0),
client_mode(false)
{}
namespace {
@ -246,6 +248,8 @@ int parse_config(const char *opt, const char *optarg)
mod_config()->backlog = strtol(optarg, 0, 10);
} else if(util::strieq(opt, SHRPX_OPT_CIPHERS)) {
set_config_str(&mod_config()->ciphers, optarg);
} else if(util::strieq(opt, SHRPX_OPT_CLIENT_MODE)) {
mod_config()->client_mode = util::strieq(optarg, "yes");
} else if(util::strieq(opt, "conf")) {
LOG(WARNING) << "conf is ignored";
} else {

View File

@ -60,6 +60,7 @@ extern const char SHRPX_OPT_SYSLOG[];
extern const char SHRPX_OPT_SYSLOG_FACILITY[];
extern const char SHRPX_OPT_BACKLOG[];
extern const char SHRPX_OPT_CIPHERS[];
extern const char SHRPX_OPT_CLIENT_MODE[];
union sockaddr_union {
sockaddr sa;
@ -104,6 +105,7 @@ struct Config {
bool use_syslog;
int backlog;
char *ciphers;
bool client_mode;
Config();
};

View File

@ -30,7 +30,6 @@
#include "shrpx_client_handler.h"
#include "shrpx_config.h"
#include "shrpx_error.h"
#include "shrpx_http.h"
#include "shrpx_downstream_connection.h"
#include "util.h"
@ -44,6 +43,7 @@ Downstream::Downstream(Upstream *upstream, int stream_id, int priority)
stream_id_(stream_id),
priority_(priority),
ioctrl_(0),
downstream_stream_id_(-1),
request_state_(INITIAL),
request_major_(1),
request_minor_(1),
@ -156,6 +156,11 @@ void check_connection_close(bool *connection_close,
}
} // namespace
const Headers& Downstream::get_request_headers() const
{
return request_headers_;
}
void Downstream::add_request_header(const std::string& name,
const std::string& value)
{
@ -277,10 +282,6 @@ bool Downstream::get_expect_100_continue() const
return request_expect_100_continue_;
}
namespace {
const size_t DOWNSTREAM_OUTPUT_UPPER_THRES = 64*1024;
} // namespace
bool Downstream::get_output_buffer_full()
{
if(dconn_) {
@ -296,86 +297,7 @@ bool Downstream::get_output_buffer_full()
// Downstream. Otherwise, the program will crash.
int Downstream::push_request_headers()
{
std::string hdrs = request_method_;
hdrs += " ";
hdrs += request_path_;
hdrs += " ";
hdrs += "HTTP/1.1\r\n";
std::string via_value;
std::string xff_value;
for(Headers::const_iterator i = request_headers_.begin();
i != request_headers_.end(); ++i) {
if(util::strieq((*i).first.c_str(), "X-Forwarded-Proto") ||
util::strieq((*i).first.c_str(), "keep-alive") ||
util::strieq((*i).first.c_str(), "connection") ||
util::strieq((*i).first.c_str(), "proxy-connection")) {
continue;
}
if(util::strieq((*i).first.c_str(), "via")) {
via_value = (*i).second;
continue;
}
if(util::strieq((*i).first.c_str(), "x-forwarded-for")) {
xff_value = (*i).second;
continue;
}
if(util::strieq((*i).first.c_str(), "expect") &&
util::strifind((*i).second.c_str(), "100-continue")) {
continue;
}
hdrs += (*i).first;
hdrs += ": ";
hdrs += (*i).second;
hdrs += "\r\n";
}
if(request_connection_close_) {
hdrs += "Connection: close\r\n";
}
if(get_config()->add_x_forwarded_for) {
hdrs += "X-Forwarded-For: ";
if(!xff_value.empty()) {
hdrs += xff_value;
hdrs += ", ";
}
hdrs += upstream_->get_client_handler()->get_ipaddr();
hdrs += "\r\n";
} else if(!xff_value.empty()) {
hdrs += "X-Forwarded-For: ";
hdrs += xff_value;
hdrs += "\r\n";
}
if(request_method_ != "CONNECT") {
hdrs += "X-Forwarded-Proto: ";
if(util::istartsWith(request_path_, "http:")) {
hdrs += "http";
} else {
hdrs += "https";
}
hdrs += "\r\n";
}
hdrs += "Via: ";
hdrs += via_value;
if(!via_value.empty()) {
hdrs += ", ";
}
hdrs += http::create_via_header_value(request_major_, request_minor_);
hdrs += "\r\n";
hdrs += "\r\n";
if(ENABLE_LOG) {
LOG(INFO) << "Downstream request headers id="
<< stream_id_ << "\n" << hdrs;
}
bufferevent *bev = dconn_->get_bev();
evbuffer *output = bufferevent_get_output(bev);
int rv;
rv = evbuffer_add(output, hdrs.c_str(), hdrs.size());
if(rv != 0) {
return -1;
}
dconn_->start_waiting_response();
return 0;
return dconn_->push_request_headers();
}
int Downstream::push_upload_data_chunk(const uint8_t *data, size_t datalen)
@ -386,49 +308,12 @@ int Downstream::push_upload_data_chunk(const uint8_t *data, size_t datalen)
LOG(WARNING) << "dconn_ is NULL";
return 0;
}
ssize_t res = 0;
int rv;
bufferevent *bev = dconn_->get_bev();
evbuffer *output = bufferevent_get_output(bev);
if(chunked_request_) {
char chunk_size_hex[16];
rv = snprintf(chunk_size_hex, sizeof(chunk_size_hex), "%X\r\n",
static_cast<unsigned int>(datalen));
res += rv;
rv = evbuffer_add(output, chunk_size_hex, rv);
if(rv == -1) {
LOG(FATAL) << "evbuffer_add() failed";
return -1;
}
}
rv = evbuffer_add(output, data, datalen);
if(rv == -1) {
LOG(FATAL) << "evbuffer_add() failed";
return -1;
}
res += rv;
if(chunked_request_) {
rv = evbuffer_add(output, "\r\n", 2);
if(rv == -1) {
LOG(FATAL) << "evbuffer_add() failed";
return -1;
}
res += 2;
}
return res;
return dconn_->push_upload_data_chunk(data, datalen);
}
int Downstream::end_upload_data()
{
if(chunked_request_) {
bufferevent *bev = dconn_->get_bev();
evbuffer *output = bufferevent_get_output(bev);
if(evbuffer_add(output, "0\r\n\r\n", 5) != 0) {
LOG(FATAL) << "evbuffer_add() failed";
return -1;
}
}
return 0;
return dconn_->end_upload_data();
}
const Headers& Downstream::get_response_headers() const
@ -441,6 +326,8 @@ void Downstream::add_response_header(const std::string& name,
{
response_header_key_prev_ = true;
response_headers_.push_back(std::make_pair(name, value));
check_transfer_encoding_chunked(&chunked_response_,
response_headers_.back());
}
void Downstream::set_last_response_header_value(const std::string& value)
@ -512,6 +399,11 @@ bool Downstream::get_chunked_response() const
return chunked_response_;
}
void Downstream::set_chunked_response(bool f)
{
chunked_response_ = f;
}
bool Downstream::get_response_connection_close() const
{
return response_connection_close_;
@ -697,4 +589,14 @@ bool Downstream::tunnel_established() const
200 <= response_http_status_ && response_http_status_ < 300;
}
void Downstream::set_downstream_stream_id(int32_t stream_id)
{
downstream_stream_id_ = stream_id;
}
int32_t Downstream::get_downstream_stream_id() const
{
return downstream_stream_id_;
}
} // namespace shrpx

View File

@ -58,6 +58,10 @@ public:
void pause_read(IOCtrlReason reason);
bool resume_read(IOCtrlReason reason);
void force_resume_read();
// Set stream ID for downstream SPDY connection.
void set_downstream_stream_id(int32_t stream_id);
int32_t get_downstream_stream_id() const;
void set_downstream_connection(DownstreamConnection *dconn);
DownstreamConnection* get_downstream_connection();
// Returns true if output buffer is full. If underlying dconn_ is
@ -99,7 +103,8 @@ public:
MSG_COMPLETE,
STREAM_CLOSED,
CONNECT_FAIL,
IDLE
IDLE,
MSG_RESET
};
void set_request_state(int state);
int get_request_state() const;
@ -120,6 +125,7 @@ public:
int get_response_minor() const;
int get_response_version() const;
bool get_chunked_response() const;
void set_chunked_response(bool f);
bool get_response_connection_close() const;
void set_response_connection_close(bool f);
int parse_http_response();
@ -127,12 +133,17 @@ public:
int get_response_state() const;
int init_response_body_buf();
evbuffer* get_response_body_buf();
static const size_t DOWNSTREAM_OUTPUT_UPPER_THRES = 64*1024;
private:
Upstream *upstream_;
DownstreamConnection *dconn_;
int32_t stream_id_;
int priority_;
IOControl ioctrl_;
// stream ID in backend connection
int32_t downstream_stream_id_;
int request_state_;
std::string request_method_;
std::string request_path_;

View File

@ -32,19 +32,11 @@
namespace shrpx {
// Workaround for the inability for Bufferevent to remove timeout from
// bufferevent. Specify this long timeout instead of removing.
namespace {
timeval max_timeout = { 86400, 0 };
} // namespace
DownstreamConnection::DownstreamConnection(ClientHandler *client_handler)
: client_handler_(client_handler),
bev_(0),
downstream_(0)
{
}
{}
DownstreamConnection::~DownstreamConnection()
{
@ -59,48 +51,6 @@ DownstreamConnection::~DownstreamConnection()
}
}
int DownstreamConnection::attach_downstream(Downstream *downstream)
{
if(ENABLE_LOG) {
LOG(INFO) << "Attaching downstream connection " << this << " to "
<< "downstream " << downstream;
}
Upstream *upstream = downstream->get_upstream();
if(!bev_) {
event_base *evbase = client_handler_->get_evbase();
bev_ = bufferevent_socket_new
(evbase, -1,
BEV_OPT_CLOSE_ON_FREE | BEV_OPT_DEFER_CALLBACKS);
int rv = bufferevent_socket_connect
(bev_,
// TODO maybe not thread-safe?
const_cast<sockaddr*>(&get_config()->downstream_addr.sa),
get_config()->downstream_addrlen);
if(rv != 0) {
bufferevent_free(bev_);
bev_ = 0;
return SHRPX_ERR_NETWORK;
}
if(ENABLE_LOG) {
LOG(INFO) << "Connecting to downstream server " << this;
}
}
downstream->set_downstream_connection(this);
downstream_ = downstream;
bufferevent_setwatermark(bev_, EV_READ, 0, SHRPX_READ_WARTER_MARK);
bufferevent_enable(bev_, EV_READ);
bufferevent_setcb(bev_,
upstream->get_downstream_readcb(),
upstream->get_downstream_writecb(),
upstream->get_downstream_eventcb(), this);
// HTTP request/response model, we first issue request to downstream
// server, so just enable write timeout here.
bufferevent_set_timeouts(bev_,
&max_timeout,
&get_config()->downstream_write_timeout);
return 0;
}
// When downstream request is issued, call this function to set read
// timeout. We don't know when the request is completely received by
// the downstream server. This function may be called before that

View File

@ -38,16 +38,21 @@ class Downstream;
class DownstreamConnection {
public:
DownstreamConnection(ClientHandler *client_handler);
~DownstreamConnection();
int attach_downstream(Downstream *downstream);
virtual ~DownstreamConnection();
virtual int attach_downstream(Downstream *downstream) = 0;
void detach_downstream(Downstream *downstream);
bufferevent* get_bev();
int push_data(const void *data, size_t len);
virtual int push_request_headers() = 0;
virtual int push_upload_data_chunk(const uint8_t *data, size_t datalen) = 0;
virtual int end_upload_data() = 0;
virtual int on_connect() = 0;
ClientHandler* get_client_handler();
Downstream* get_downstream();
void start_waiting_response();
private:
protected:
ClientHandler *client_handler_;
bufferevent *bev_;
Downstream *downstream_;

View File

@ -129,6 +129,18 @@ std::string modify_location_header_value(const std::string& uri)
return uri;
}
void capitalize(std::string& s, size_t offset)
{
s[offset] = util::upcase(s[offset]);
for(size_t i = offset+1, eoi = s.size(); i < eoi; ++i) {
if(s[i-1] == '-') {
s[i] = util::upcase(s[i]);
} else {
s[i] = util::lowcase(s[i]);
}
}
}
} // namespace http
} // namespace shrpx

View File

@ -39,6 +39,8 @@ std::string create_via_header_value(int major, int minor);
std::string modify_location_header_value(const std::string& uri);
void capitalize(std::string& s, size_t offset);
} // namespace http
} // namespace shrpx

View File

@ -0,0 +1,231 @@
/*
* Spdylay - SPDY Library
*
* Copyright (c) 2012 Tatsuhiro Tsujikawa
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include "shrpx_http_downstream_connection.h"
#include "shrpx_client_handler.h"
#include "shrpx_upstream.h"
#include "shrpx_downstream.h"
#include "shrpx_config.h"
#include "shrpx_error.h"
#include "shrpx_http.h"
#include "util.h"
using namespace spdylay;
namespace shrpx {
// Workaround for the inability for Bufferevent to remove timeout from
// bufferevent. Specify this long timeout instead of removing.
namespace {
timeval max_timeout = { 86400, 0 };
} // namespace
HttpDownstreamConnection::HttpDownstreamConnection
(ClientHandler *client_handler)
: DownstreamConnection(client_handler)
{}
HttpDownstreamConnection::~HttpDownstreamConnection()
{}
int HttpDownstreamConnection::attach_downstream(Downstream *downstream)
{
if(ENABLE_LOG) {
LOG(INFO) << "Attaching downstream connection " << this << " to "
<< "downstream " << downstream;
}
Upstream *upstream = downstream->get_upstream();
if(!bev_) {
event_base *evbase = client_handler_->get_evbase();
bev_ = bufferevent_socket_new
(evbase, -1,
BEV_OPT_CLOSE_ON_FREE | BEV_OPT_DEFER_CALLBACKS);
int rv = bufferevent_socket_connect
(bev_,
// TODO maybe not thread-safe?
const_cast<sockaddr*>(&get_config()->downstream_addr.sa),
get_config()->downstream_addrlen);
if(rv != 0) {
bufferevent_free(bev_);
bev_ = 0;
return SHRPX_ERR_NETWORK;
}
if(ENABLE_LOG) {
LOG(INFO) << "Connecting to downstream server " << this;
}
}
downstream->set_downstream_connection(this);
downstream_ = downstream;
bufferevent_setwatermark(bev_, EV_READ, 0, SHRPX_READ_WARTER_MARK);
bufferevent_enable(bev_, EV_READ);
bufferevent_setcb(bev_,
upstream->get_downstream_readcb(),
upstream->get_downstream_writecb(),
upstream->get_downstream_eventcb(), this);
// HTTP request/response model, we first issue request to downstream
// server, so just enable write timeout here.
bufferevent_set_timeouts(bev_,
&max_timeout,
&get_config()->downstream_write_timeout);
return 0;
}
int HttpDownstreamConnection::push_request_headers()
{
std::string hdrs = downstream_->get_request_method();
hdrs += " ";
hdrs += downstream_->get_request_path();
hdrs += " ";
hdrs += "HTTP/1.1\r\n";
std::string via_value;
std::string xff_value;
const Headers& request_headers = downstream_->get_request_headers();
for(Headers::const_iterator i = request_headers.begin();
i != request_headers.end(); ++i) {
if(util::strieq((*i).first.c_str(), "X-Forwarded-Proto") ||
util::strieq((*i).first.c_str(), "keep-alive") ||
util::strieq((*i).first.c_str(), "connection") ||
util::strieq((*i).first.c_str(), "proxy-connection")) {
continue;
}
if(util::strieq((*i).first.c_str(), "via")) {
via_value = (*i).second;
continue;
}
if(util::strieq((*i).first.c_str(), "x-forwarded-for")) {
xff_value = (*i).second;
continue;
}
if(util::strieq((*i).first.c_str(), "expect") &&
util::strifind((*i).second.c_str(), "100-continue")) {
continue;
}
hdrs += (*i).first;
hdrs += ": ";
hdrs += (*i).second;
hdrs += "\r\n";
}
if(downstream_->get_request_connection_close()) {
hdrs += "Connection: close\r\n";
}
if(get_config()->add_x_forwarded_for) {
hdrs += "X-Forwarded-For: ";
if(!xff_value.empty()) {
hdrs += xff_value;
hdrs += ", ";
}
hdrs += downstream_->get_upstream()->get_client_handler()->get_ipaddr();
hdrs += "\r\n";
} else if(!xff_value.empty()) {
hdrs += "X-Forwarded-For: ";
hdrs += xff_value;
hdrs += "\r\n";
}
if(downstream_->get_request_method() != "CONNECT") {
hdrs += "X-Forwarded-Proto: ";
if(util::istartsWith(downstream_->get_request_path(), "http:")) {
hdrs += "http";
} else {
hdrs += "https";
}
hdrs += "\r\n";
}
hdrs += "Via: ";
hdrs += via_value;
if(!via_value.empty()) {
hdrs += ", ";
}
hdrs += http::create_via_header_value(downstream_->get_request_major(),
downstream_->get_request_minor());
hdrs += "\r\n";
hdrs += "\r\n";
if(ENABLE_LOG) {
LOG(INFO) << "Downstream request headers id="
<< downstream_->get_stream_id() << "\n" << hdrs;
}
evbuffer *output = bufferevent_get_output(bev_);
int rv;
rv = evbuffer_add(output, hdrs.c_str(), hdrs.size());
if(rv != 0) {
return -1;
}
start_waiting_response();
return 0;
}
int HttpDownstreamConnection::push_upload_data_chunk
(const uint8_t *data, size_t datalen)
{
ssize_t res = 0;
int rv;
int chunked = downstream_->get_chunked_request();
evbuffer *output = bufferevent_get_output(bev_);
if(chunked) {
char chunk_size_hex[16];
rv = snprintf(chunk_size_hex, sizeof(chunk_size_hex), "%X\r\n",
static_cast<unsigned int>(datalen));
res += rv;
rv = evbuffer_add(output, chunk_size_hex, rv);
if(rv == -1) {
LOG(FATAL) << "evbuffer_add() failed";
return -1;
}
}
rv = evbuffer_add(output, data, datalen);
if(rv == -1) {
LOG(FATAL) << "evbuffer_add() failed";
return -1;
}
res += rv;
if(chunked) {
rv = evbuffer_add(output, "\r\n", 2);
if(rv == -1) {
LOG(FATAL) << "evbuffer_add() failed";
return -1;
}
res += 2;
}
return res;
}
int HttpDownstreamConnection::end_upload_data()
{
if(downstream_->get_chunked_request()) {
evbuffer *output = bufferevent_get_output(bev_);
if(evbuffer_add(output, "0\r\n\r\n", 5) != 0) {
LOG(FATAL) << "evbuffer_add() failed";
return -1;
}
}
return 0;
}
int HttpDownstreamConnection::on_connect()
{
return 0;
}
} // namespace shrpx

View File

@ -0,0 +1,49 @@
/*
* Spdylay - SPDY Library
*
* Copyright (c) 2012 Tatsuhiro Tsujikawa
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef SHRPX_HTTP_DOWNSTREAM_CONNECTION_H
#define SHRPX_HTTP_DOWNSTREAM_CONNECTION_H
#include "shrpx.h"
#include "shrpx_downstream_connection.h"
namespace shrpx {
class HttpDownstreamConnection : public DownstreamConnection {
public:
HttpDownstreamConnection(ClientHandler *client_handler);
virtual ~HttpDownstreamConnection();
virtual int attach_downstream(Downstream *downstream);
virtual int push_request_headers();
virtual int push_upload_data_chunk(const uint8_t *data, size_t datalen);
virtual int end_upload_data();
virtual int on_connect();
};
} // namespace shrpx
#endif // SHRPX_HTTP_DOWNSTREAM_CONNECTION_H

View File

@ -30,6 +30,7 @@
#include "shrpx_client_handler.h"
#include "shrpx_downstream.h"
#include "shrpx_downstream_connection.h"
#include "shrpx_spdy_downstream_connection.h"
#include "shrpx_http.h"
#include "shrpx_config.h"
#include "shrpx_error.h"
@ -248,7 +249,7 @@ int HttpsUpstream::on_read()
} else {
assert(downstream->get_request_state() == Downstream::MSG_COMPLETE);
if(downstream->get_downstream_connection() == 0) {
// Error response already be sent
// Error response has already be sent
assert(downstream->get_response_state() == Downstream::MSG_COMPLETE);
delete_downstream();
} else {
@ -287,10 +288,6 @@ int HttpsUpstream::on_read()
return 0;
}
namespace {
void https_downstream_readcb(bufferevent *bev, void *ptr);
} // namespace
int HttpsUpstream::on_write()
{
Downstream *downstream = get_downstream();
@ -332,7 +329,12 @@ void https_downstream_readcb(bufferevent *bev, void *ptr)
Downstream *downstream = dconn->get_downstream();
HttpsUpstream *upstream;
upstream = static_cast<HttpsUpstream*>(downstream->get_upstream());
int rv = downstream->parse_http_response();
int rv;
if(get_config()->client_mode) {
rv = reinterpret_cast<SpdyDownstreamConnection*>(dconn)->on_read();
} else {
rv = downstream->parse_http_response();
}
if(rv == 0) {
if(downstream->get_response_state() == Downstream::MSG_COMPLETE) {
if(downstream->get_response_connection_close()) {
@ -372,6 +374,8 @@ void https_downstream_readcb(bufferevent *bev, void *ptr)
// We already sent HTTP response headers to upstream
// client. Just close the upstream connection.
delete upstream->get_client_handler();
} else if(downstream->get_response_state() == Downstream::MSG_RESET) {
delete upstream->get_client_handler();
} else {
// We did not sent any HTTP response, so sent error
// response. Cannot reuse downstream connection in this case.
@ -397,6 +401,14 @@ void https_downstream_writecb(bufferevent *bev, void *ptr)
HttpsUpstream *upstream;
upstream = static_cast<HttpsUpstream*>(downstream->get_upstream());
upstream->resume_read(SHRPX_NO_BUFFER);
if(get_config()->client_mode) {
int rv;
rv = reinterpret_cast<SpdyDownstreamConnection*>(dconn)->on_write();
if(rv != 0) {
delete upstream->get_client_handler();
return;
}
}
}
} // namespace
@ -412,6 +424,11 @@ void https_downstream_eventcb(bufferevent *bev, short events, void *ptr)
LOG(INFO) << "Downstream connection established. downstream "
<< downstream;
}
if(dconn->on_connect() != 0) {
// TODO Return error status 502
delete upstream->get_client_handler();
return;
}
} else if(events & BEV_EVENT_EOF) {
if(ENABLE_LOG) {
LOG(INFO) << "Downstream EOF. stream_id="
@ -555,6 +572,7 @@ int HttpsUpstream::on_downstream_header_complete(Downstream *downstream)
via_value = (*i).second;
} else {
hdrs += (*i).first;
http::capitalize(hdrs, hdrs.size()-(*i).first.size());
hdrs += ": ";
hdrs += (*i).second;
hdrs += "\r\n";

View File

@ -34,7 +34,6 @@ extern "C" {
}
#include "shrpx_upstream.h"
#include "shrpx_io_control.h"
namespace shrpx {
@ -57,8 +56,8 @@ public:
Downstream* get_downstream() const;
int error_reply(int status_code);
void pause_read(IOCtrlReason reason);
void resume_read(IOCtrlReason reason);
virtual void pause_read(IOCtrlReason reason);
virtual void resume_read(IOCtrlReason reason);
virtual int on_downstream_header_complete(Downstream *downstream);
virtual int on_downstream_body(Downstream *downstream,

View File

@ -35,12 +35,14 @@
#include "shrpx_thread_event_receiver.h"
#include "shrpx_ssl.h"
#include "shrpx_worker.h"
#include "shrpx_config.h"
namespace shrpx {
ListenHandler::ListenHandler(event_base *evbase)
: evbase_(evbase),
ssl_ctx_(ssl::create_ssl_context()),
ssl_ctx_(get_config()->client_mode ?
ssl::create_ssl_client_context() : ssl::create_ssl_context()),
worker_round_robin_cnt_(0),
workers_(0),
num_worker_(0)

View File

@ -51,6 +51,8 @@ public:
event_base* get_evbase() const;
private:
event_base *evbase_;
// In client-mode, this is for backend SPDY connection. Otherwise,
// frontend SSL/TLS connection.
SSL_CTX *ssl_ctx_;
unsigned int worker_round_robin_cnt_;
WorkerInfo *workers_;

View File

@ -0,0 +1,695 @@
/*
* Spdylay - SPDY Library
*
* Copyright (c) 2012 Tatsuhiro Tsujikawa
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include "shrpx_spdy_downstream_connection.h"
#include <unistd.h>
#include <openssl/err.h>
#include <event2/bufferevent_ssl.h>
#include "shrpx_client_handler.h"
#include "shrpx_upstream.h"
#include "shrpx_downstream.h"
#include "shrpx_config.h"
#include "shrpx_error.h"
#include "shrpx_http.h"
#include "util.h"
using namespace spdylay;
namespace shrpx {
SpdyDownstreamConnection::SpdyDownstreamConnection
(ClientHandler *client_handler)
: DownstreamConnection(client_handler),
ssl_(0),
session_(0),
request_body_buf_(0)
{}
SpdyDownstreamConnection::~SpdyDownstreamConnection()
{
spdylay_session_del(session_);
int fd = -1;
if(ssl_) {
fd = SSL_get_fd(ssl_);
SSL_shutdown(ssl_);
}
if(bev_) {
// We want to deallocate bev_ between SSL_shutdown and
// SSL_free. This might not be necessary for recent libevent.
bufferevent_disable(bev_, EV_READ | EV_WRITE);
bufferevent_free(bev_);
bev_ = 0;
}
if(ssl_) {
SSL_free(ssl_);
}
if(fd != -1) {
shutdown(fd, SHUT_WR);
close(fd);
}
if(request_body_buf_) {
evbuffer_free(request_body_buf_);
}
}
namespace {
void body_buf_cb(evbuffer *body, size_t oldlen, size_t newlen, void *arg)
{
SpdyDownstreamConnection *dconn;
dconn = reinterpret_cast<SpdyDownstreamConnection*>(arg);
if(newlen == 0) {
Downstream *downstream = dconn->get_downstream();
if(downstream) {
downstream->get_upstream()->resume_read(SHRPX_NO_BUFFER);
}
}
}
} // namespace
int SpdyDownstreamConnection::init_request_body_buf()
{
int rv;
if(request_body_buf_) {
rv = evbuffer_drain(request_body_buf_,
evbuffer_get_length(request_body_buf_));
if(rv != 0) {
return -1;
}
} else {
request_body_buf_ = evbuffer_new();
if(request_body_buf_ == 0) {
return -1;
}
evbuffer_setcb(request_body_buf_, body_buf_cb, this);
}
return 0;
}
int SpdyDownstreamConnection::attach_downstream(Downstream *downstream)
{
if(ENABLE_LOG) {
LOG(INFO) << "Attaching downstream connection " << this << " to "
<< "downstream " << downstream;
}
if(init_request_body_buf() == -1) {
return -1;
}
Upstream *upstream = downstream->get_upstream();
if(!bev_) {
event_base *evbase = client_handler_->get_evbase();
ssl_ = SSL_new(client_handler_->get_ssl_client_ctx());
if(!ssl_) {
LOG(ERROR) << "SSL_new() failed: "
<< ERR_error_string(ERR_get_error(), NULL);
return -1;
}
bev_ = bufferevent_openssl_socket_new(evbase, -1, ssl_,
BUFFEREVENT_SSL_CONNECTING,
BEV_OPT_DEFER_CALLBACKS);
int rv = bufferevent_socket_connect
(bev_,
// TODO maybe not thread-safe?
const_cast<sockaddr*>(&get_config()->downstream_addr.sa),
get_config()->downstream_addrlen);
if(rv != 0) {
bufferevent_free(bev_);
bev_ = 0;
return SHRPX_ERR_NETWORK;
}
if(ENABLE_LOG) {
LOG(INFO) << "Connecting to downstream server " << this;
}
}
downstream->set_downstream_connection(this);
downstream_ = downstream;
bufferevent_setwatermark(bev_, EV_READ, 0, SHRPX_READ_WARTER_MARK);
bufferevent_enable(bev_, EV_READ);
bufferevent_setcb(bev_,
upstream->get_downstream_readcb(),
upstream->get_downstream_writecb(),
upstream->get_downstream_eventcb(), this);
bufferevent_set_timeouts(bev_,
&get_config()->downstream_read_timeout,
&get_config()->downstream_write_timeout);
return 0;
}
namespace {
ssize_t spdy_data_read_callback(spdylay_session *session,
int32_t stream_id,
uint8_t *buf, size_t length,
int *eof,
spdylay_data_source *source,
void *user_data)
{
SpdyDownstreamConnection *dconn;
dconn = reinterpret_cast<SpdyDownstreamConnection*>(source->ptr);
Downstream *downstream = dconn->get_downstream();
if(!downstream) {
// In this case, RST_STREAM should have been issued. But depending
// on the priority, DATA frame may come first.
return SPDYLAY_ERR_DEFERRED;
}
evbuffer *body = dconn->get_request_body_buf();
int nread = evbuffer_remove(body, buf, length);
if(nread == 0 &&
downstream->get_request_state() == Downstream::MSG_COMPLETE) {
*eof = 1;
}
if(nread == 0 && *eof != 1) {
return SPDYLAY_ERR_DEFERRED;
}
return nread;
}
} // namespace
int SpdyDownstreamConnection::push_request_headers()
{
int rv;
if(!session_) {
// If the connection to the backend has not been established,
// session_ is not initialized. This function will be called again
// just after SSL/TLS handshake is done.
return 0;
}
if(!downstream_) {
return 0;
}
size_t nheader = downstream_->get_request_headers().size();
// 10 means :method, :scheme, :path, :version and possible via
// header field. We rename host header field as :host.
const char **nv = new const char*[nheader * 2 + 10 + 1];
size_t hdidx = 0;
std::string via_value;
nv[hdidx++] = ":method";
nv[hdidx++] = downstream_->get_request_method().c_str();
nv[hdidx++] = ":scheme";
nv[hdidx++] = "https";
nv[hdidx++] = ":path";
nv[hdidx++] = downstream_->get_request_path().c_str();
nv[hdidx++] = ":version";
nv[hdidx++] = "HTTP/1.1";
bool chunked_encoding = false;
bool content_length = false;
for(Headers::const_iterator i = downstream_->get_request_headers().begin();
i != downstream_->get_request_headers().end(); ++i) {
if(util::strieq((*i).first.c_str(), "transfer-encoding")) {
if(util::strieq((*i).second.c_str(), "chunked")) {
chunked_encoding = true;
}
// Ignore transfer-encoding
} else if(util::strieq((*i).first.c_str(), "keep-alive") || // HTTP/1.0?
util::strieq((*i).first.c_str(), "connection") ||
util:: strieq((*i).first.c_str(), "proxy-connection")) {
// These are ignored
} else if(util::strieq((*i).first.c_str(), "via")) {
via_value = (*i).second;
} else if(util::strieq((*i).first.c_str(), "host")) {
nv[hdidx++] = ":host";
nv[hdidx++] = (*i).second.c_str();
} else {
if(util::strieq((*i).first.c_str(), "content-length")) {
content_length = true;
}
nv[hdidx++] = (*i).first.c_str();
nv[hdidx++] = (*i).second.c_str();
}
}
if(!via_value.empty()) {
via_value += ", ";
}
via_value += http::create_via_header_value(downstream_->get_request_major(),
downstream_->get_request_minor());
nv[hdidx++] = "via";
nv[hdidx++] = via_value.c_str();
nv[hdidx++] = 0;
if(ENABLE_LOG) {
std::stringstream ss;
for(size_t i = 0; nv[i]; i += 2) {
ss << nv[i] << ": " << nv[i+1] << "\n";
}
LOG(INFO) << "Downstream spdy request headers id="
<< downstream_->get_stream_id() << "\n"
<< ss.str();
}
if(chunked_encoding || content_length) {
// Request-body is expected.
spdylay_data_provider data_prd;
data_prd.source.ptr = this;
data_prd.read_callback = spdy_data_read_callback;
rv = spdylay_submit_request(session_, 0, nv, &data_prd, 0);
} else {
rv = spdylay_submit_request(session_, 0, nv, 0, 0);
}
delete [] nv;
if(rv != 0) {
LOG(FATAL) << "spdylay_submit_request() failed";
return -1;
}
return send();
}
int SpdyDownstreamConnection::push_upload_data_chunk(const uint8_t *data,
size_t datalen)
{
int rv = evbuffer_add(request_body_buf_, data, datalen);
if(rv != 0) {
LOG(FATAL) << "evbuffer_add() failed";
return -1;
}
if(downstream_->get_downstream_stream_id() != -1) {
spdylay_session_resume_data(session_,
downstream_->get_downstream_stream_id());
}
size_t bodylen = evbuffer_get_length(request_body_buf_);
if(bodylen > Downstream::DOWNSTREAM_OUTPUT_UPPER_THRES) {
downstream_->get_upstream()->pause_read(SHRPX_NO_BUFFER);
}
return 0;
}
int SpdyDownstreamConnection::end_upload_data()
{
if(downstream_->get_downstream_stream_id() != -1) {
spdylay_session_resume_data(session_,
downstream_->get_downstream_stream_id());
}
return 0;
}
namespace {
ssize_t send_callback(spdylay_session *session,
const uint8_t *data, size_t len, int flags,
void *user_data)
{
int rv;
SpdyDownstreamConnection *dconn;
dconn = reinterpret_cast<SpdyDownstreamConnection*>(user_data);
bufferevent *bev = dconn->get_bev();
evbuffer *output = bufferevent_get_output(bev);
// Check buffer length and return WOULDBLOCK if it is large enough.
if(evbuffer_get_length(output) > Downstream::DOWNSTREAM_OUTPUT_UPPER_THRES) {
return SPDYLAY_ERR_WOULDBLOCK;
}
rv = evbuffer_add(output, data, len);
if(rv == -1) {
LOG(FATAL) << "evbuffer_add() failed";
return SPDYLAY_ERR_CALLBACK_FAILURE;
} else {
return len;
}
}
} // namespace
namespace {
ssize_t recv_callback(spdylay_session *session,
uint8_t *data, size_t len, int flags, void *user_data)
{
SpdyDownstreamConnection *dconn;
dconn = reinterpret_cast<SpdyDownstreamConnection*>(user_data);
bufferevent *bev = dconn->get_bev();
evbuffer *input = bufferevent_get_input(bev);
int nread = evbuffer_remove(input, data, len);
if(nread == -1) {
return SPDYLAY_ERR_CALLBACK_FAILURE;
} else if(nread == 0) {
return SPDYLAY_ERR_WOULDBLOCK;
} else {
return nread;
}
}
} // namespace
namespace {
void on_stream_close_callback
(spdylay_session *session, int32_t stream_id, spdylay_status_code status_code,
void *user_data)
{
int rv;
if(ENABLE_LOG) {
LOG(INFO) << "Downstream spdy Stream " << stream_id << " is being closed";
}
SpdyDownstreamConnection *dconn;
dconn = reinterpret_cast<SpdyDownstreamConnection*>(user_data);
Downstream *downstream = dconn->get_downstream();
if(!downstream || downstream->get_downstream_stream_id() != stream_id) {
// We might get this close callback when pushed streams are
// closed.
return;
}
downstream->set_response_state(Downstream::MSG_COMPLETE);
rv = downstream->get_upstream()->on_downstream_body_complete(downstream);
if(rv != 0) {
downstream->set_response_state(Downstream::MSG_RESET);
return;
}
}
} // namespace
namespace {
void on_ctrl_recv_callback
(spdylay_session *session, spdylay_frame_type type, spdylay_frame *frame,
void *user_data)
{
int rv;
SpdyDownstreamConnection *dconn;
dconn = reinterpret_cast<SpdyDownstreamConnection*>(user_data);
Downstream *downstream = dconn->get_downstream();
switch(type) {
case SPDYLAY_SYN_STREAM:
if(ENABLE_LOG) {
LOG(INFO) << "Downstream spdy received upstream SYN_STREAM stream_id="
<< frame->syn_stream.stream_id;
}
// We just respond pushed stream with RST_STREAM.
spdylay_submit_rst_stream(session, frame->syn_stream.stream_id,
SPDYLAY_REFUSED_STREAM);
break;
case SPDYLAY_RST_STREAM:
if(downstream &&
downstream->get_downstream_stream_id() == frame->rst_stream.stream_id) {
// If we got RST_STREAM, just flag MSG_RESET to indicate
// upstream connection must be terminated.
downstream->set_response_state(Downstream::MSG_RESET);
}
break;
case SPDYLAY_SYN_REPLY: {
if(!downstream ||
downstream->get_downstream_stream_id() != frame->syn_reply.stream_id) {
break;
}
char **nv = frame->syn_reply.nv;
const char *status = 0;
const char *version = 0;
const char *content_length = 0;
for(size_t i = 0; nv[i]; i += 2) {
if(strcmp(nv[i], ":status") == 0) {
unsigned int code = strtoul(nv[i+1], 0, 10);
downstream->set_response_http_status(code);
status = nv[i+1];
} else if(strcmp(nv[i], ":version") == 0) {
// We assume for now that most version is HTTP/1.1 from
// SPDY. So just check if it is HTTP/1.0 and then set response
// minor as so.
downstream->set_response_major(1);
if(util::strieq(nv[i+1], "HTTP/1.0")) {
downstream->set_response_minor(0);
} else {
downstream->set_response_minor(1);
}
version = nv[i+1];
} else if(nv[i][0] != ':') {
if(strcmp(nv[i], "content-length") == 0) {
content_length = nv[i+1];
}
downstream->add_response_header(nv[i], nv[i+1]);
}
}
if(!status || !version) {
spdylay_submit_rst_stream(session, frame->syn_reply.stream_id,
SPDYLAY_PROTOCOL_ERROR);
downstream->set_response_state(Downstream::MSG_RESET);
return;
}
if(!content_length && downstream->get_request_method() != "HEAD") {
unsigned int status;
status = downstream->get_response_http_status();
if(!((100 <= status && status <= 199) || status == 204 ||
status == 304)) {
// In SPDY, we are supporsed not to receive
// transfer-encoding.
downstream->add_response_header("transfer-encoding", "chunked");
}
}
if(ENABLE_LOG) {
std::stringstream ss;
for(size_t i = 0; nv[i]; i += 2) {
ss << nv[i] << ": " << nv[i+1] << "\n";
}
LOG(INFO) << "Downstream spdy response headers id="
<< frame->syn_reply.stream_id
<< "\n" << ss.str();
}
Upstream *upstream = downstream->get_upstream();
downstream->set_response_state(Downstream::HEADER_COMPLETE);
rv = upstream->on_downstream_header_complete(downstream);
if(rv != 0) {
spdylay_submit_rst_stream(session, frame->syn_reply.stream_id,
SPDYLAY_PROTOCOL_ERROR);
downstream->set_response_state(Downstream::MSG_RESET);
return;
}
break;
}
default:
break;
}
}
} // namespace
namespace {
void on_data_chunk_recv_callback(spdylay_session *session,
uint8_t flags, int32_t stream_id,
const uint8_t *data, size_t len,
void *user_data)
{
int rv;
SpdyDownstreamConnection *dconn;
dconn = reinterpret_cast<SpdyDownstreamConnection*>(user_data);
Downstream *downstream = dconn->get_downstream();
if(!downstream || downstream->get_downstream_stream_id() != stream_id) {
return;
}
// TODO No manual flow control at the moment.
Upstream *upstream = downstream->get_upstream();
rv = upstream->on_downstream_body(downstream, data, len);
if(rv != 0) {
spdylay_submit_rst_stream(session, stream_id, SPDYLAY_INTERNAL_ERROR);
downstream->set_response_state(Downstream::MSG_RESET);
}
}
} // namespace
namespace {
void before_ctrl_send_callback(spdylay_session *session,
spdylay_frame_type type,
spdylay_frame *frame,
void *user_data)
{
if(type == SPDYLAY_SYN_STREAM) {
SpdyDownstreamConnection *dconn;
dconn = reinterpret_cast<SpdyDownstreamConnection*>(user_data);
Downstream *downstream = dconn->get_downstream();
if(downstream) {
downstream->set_downstream_stream_id(frame->syn_stream.stream_id);
}
}
}
} // namespace
namespace {
void on_ctrl_not_send_callback(spdylay_session *session,
spdylay_frame_type type,
spdylay_frame *frame,
int error_code, void *user_data)
{
LOG(WARNING) << "Failed to send control frame type=" << type << ", "
<< "error_code=" << error_code << ":"
<< spdylay_strerror(error_code);
if(type == SPDYLAY_SYN_STREAM) {
// To avoid stream hanging around, flag Downstream::MSG_RESET and
// terminate the upstream and downstream connections.
SpdyDownstreamConnection *dconn;
dconn = reinterpret_cast<SpdyDownstreamConnection*>(user_data);
Downstream *downstream = dconn->get_downstream();
int32_t stream_id = frame->syn_stream.stream_id;
if(!downstream || downstream->get_downstream_stream_id() != stream_id) {
return;
}
downstream->set_response_state(Downstream::MSG_RESET);
}
}
} // namespace
namespace {
void on_ctrl_recv_parse_error_callback(spdylay_session *session,
spdylay_frame_type type,
const uint8_t *head, size_t headlen,
const uint8_t *payload,
size_t payloadlen, int error_code,
void *user_data)
{
if(ENABLE_LOG) {
LOG(INFO) << "Failed to parse received control frame. type=" << type
<< ", error_code=" << error_code << ":"
<< spdylay_strerror(error_code);
}
}
} // namespace
namespace {
void on_unknown_ctrl_recv_callback(spdylay_session *session,
const uint8_t *head, size_t headlen,
const uint8_t *payload, size_t payloadlen,
void *user_data)
{
if(ENABLE_LOG) {
LOG(INFO) << "Received unknown control frame.";
}
}
} // namespace
int SpdyDownstreamConnection::on_connect()
{
int rv;
const unsigned char *next_proto = 0;
unsigned int next_proto_len;
SSL_get0_next_proto_negotiated(ssl_, &next_proto, &next_proto_len);
if(ENABLE_LOG) {
std::string proto(next_proto, next_proto+next_proto_len);
LOG(INFO) << "Downstream negotiated next protocol: " << proto;
}
uint16_t version = spdylay_npn_get_version(next_proto, next_proto_len);
if(!version) {
return -1;
}
spdylay_session_callbacks callbacks;
memset(&callbacks, 0, sizeof(callbacks));
callbacks.send_callback = send_callback;
callbacks.recv_callback = recv_callback;
callbacks.on_stream_close_callback = on_stream_close_callback;
callbacks.on_ctrl_recv_callback = on_ctrl_recv_callback;
callbacks.on_data_chunk_recv_callback = on_data_chunk_recv_callback;
callbacks.before_ctrl_send_callback = before_ctrl_send_callback;
callbacks.on_ctrl_not_send_callback = on_ctrl_not_send_callback;
callbacks.on_ctrl_recv_parse_error_callback =
on_ctrl_recv_parse_error_callback;
callbacks.on_unknown_ctrl_recv_callback = on_unknown_ctrl_recv_callback;
rv = spdylay_session_client_new(&session_, version, &callbacks, this);
if(rv != 0) {
return -1;
}
// TODO Send initial window size when manual flow control is
// implemented.
spdylay_settings_entry entry[1];
entry[0].settings_id = SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS;
entry[0].value = get_config()->spdy_max_concurrent_streams;
entry[0].flags = SPDYLAY_ID_FLAG_SETTINGS_NONE;
rv = spdylay_submit_settings
(session_, SPDYLAY_FLAG_SETTINGS_NONE,
entry, sizeof(entry)/sizeof(spdylay_settings_entry));
if(rv != 0) {
return -1;
}
rv = send();
if(rv != 0) {
return -1;
}
// We may have pending request
push_request_headers();
return 0;
}
int SpdyDownstreamConnection::on_read()
{
int rv = 0;
if((rv = spdylay_session_recv(session_)) < 0) {
if(rv != SPDYLAY_ERR_EOF) {
LOG(ERROR) << "spdylay_session_recv() returned error: "
<< spdylay_strerror(rv);
}
} else if((rv = spdylay_session_send(session_)) < 0) {
LOG(ERROR) << "spdylay_session_send() returned error: "
<< spdylay_strerror(rv);
}
// if(rv == 0) {
// if(spdylay_session_want_read(session_) == 0 &&
// spdylay_session_want_write(session_) == 0) {
// if(ENABLE_LOG) {
// LOG(INFO) << "No more read/write for this SPDY session";
// }
// rv = -1;
// }
// }
if(rv == SPDYLAY_ERR_EOF) {
if(downstream_) {
downstream_->set_response_connection_close(true);
}
rv = 0;
} else if(rv != 0) {
if(downstream_) {
downstream_->set_response_state(Downstream::MSG_RESET);
}
}
return rv;
}
int SpdyDownstreamConnection::on_write()
{
return send();
}
int SpdyDownstreamConnection::send()
{
int rv = 0;
if((rv = spdylay_session_send(session_)) < 0) {
LOG(ERROR) << "spdylay_session_send() returned error: "
<< spdylay_strerror(rv);
}
// if(rv == 0) {
// if(spdylay_session_want_read(session_) == 0 &&
// spdylay_session_want_write(session_) == 0) {
// if(ENABLE_LOG) {
// LOG(INFO) << "No more read/write for this SPDY session";
// }
// rv = -1;
// }
// }
return rv;
}
evbuffer* SpdyDownstreamConnection::get_request_body_buf() const
{
return request_body_buf_;
}
} // namespace shrpx

View File

@ -0,0 +1,64 @@
/*
* Spdylay - SPDY Library
*
* Copyright (c) 2012 Tatsuhiro Tsujikawa
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef SHRPX_SPDY_DOWNSTREAM_CONNECTION_H
#define SHRPX_SPDY_DOWNSTREAM_CONNECTION_H
#include "shrpx.h"
#include <openssl/ssl.h>
#include <spdylay/spdylay.h>
#include "shrpx_downstream_connection.h"
namespace shrpx {
class SpdyDownstreamConnection : public DownstreamConnection {
public:
SpdyDownstreamConnection(ClientHandler *client_handler);
virtual ~SpdyDownstreamConnection();
virtual int attach_downstream(Downstream *downstream);
virtual int push_request_headers();
virtual int push_upload_data_chunk(const uint8_t *data, size_t datalen);
virtual int end_upload_data();
virtual int on_connect();
int on_read();
int on_write();
int send();
int init_request_body_buf();
evbuffer* get_request_body_buf() const;
private:
SSL *ssl_;
spdylay_session *session_;
evbuffer *request_body_buf_;
};
} // namespace shrpx
#endif // SHRPX_SPDY_DOWNSTREAM_CONNECTION_H

View File

@ -793,11 +793,11 @@ int SpdyUpstream::on_downstream_header_complete(Downstream *downstream)
int rv;
rv = spdylay_submit_response(session_, downstream->get_stream_id(), nv,
&data_prd);
delete [] nv;
if(rv != 0) {
LOG(FATAL) << "spdylay_submit_response() failed";
return -1;
}
delete [] nv;
return 0;
}
@ -843,4 +843,10 @@ int32_t SpdyUpstream::get_initial_window_size() const
return initial_window_size_;
}
void SpdyUpstream::pause_read(IOCtrlReason reason)
{}
void SpdyUpstream::resume_read(IOCtrlReason reason)
{}
} // namespace shrpx

View File

@ -58,6 +58,9 @@ public:
int window_update(Downstream *downstream);
int error_reply(Downstream *downstream, int status_code);
virtual void pause_read(IOCtrlReason reason);
virtual void resume_read(IOCtrlReason reason);
virtual int on_downstream_header_complete(Downstream *downstream);
virtual int on_downstream_body(Downstream *downstream,
const uint8_t *data, size_t len);

View File

@ -34,6 +34,8 @@
#include <event2/bufferevent.h>
#include <event2/bufferevent_ssl.h>
#include <spdylay/spdylay.h>
#include "shrpx_log.h"
#include "shrpx_client_handler.h"
#include "shrpx_config.h"
@ -141,6 +143,48 @@ SSL_CTX* create_ssl_context()
return ssl_ctx;
}
namespace {
int select_next_proto_cb(SSL* ssl,
unsigned char **out, unsigned char *outlen,
const unsigned char *in, unsigned int inlen,
void *arg)
{
if(spdylay_select_next_protocol(out, outlen, in, inlen) <= 0) {
*out = (unsigned char*)"SPDY/3";
*outlen = 6;
}
return SSL_TLSEXT_ERR_OK;
}
} // namespace
SSL_CTX* create_ssl_client_context()
{
SSL_CTX *ssl_ctx;
ssl_ctx = SSL_CTX_new(SSLv23_client_method());
if(!ssl_ctx) {
LOG(FATAL) << ERR_error_string(ERR_get_error(), 0);
DIE();
}
SSL_CTX_set_options(ssl_ctx,
SSL_OP_ALL | SSL_OP_NO_SSLv2 | SSL_OP_NO_COMPRESSION |
SSL_OP_NO_SESSION_RESUMPTION_ON_RENEGOTIATION);
if(get_config()->ciphers) {
if(SSL_CTX_set_cipher_list(ssl_ctx, get_config()->ciphers) == 0) {
LOG(FATAL) << "SSL_CTX_set_cipher_list failed: "
<< ERR_error_string(ERR_get_error(), NULL);
DIE();
}
}
SSL_CTX_set_mode(ssl_ctx, SSL_MODE_ENABLE_PARTIAL_WRITE);
SSL_CTX_set_mode(ssl_ctx, SSL_MODE_AUTO_RETRY);
SSL_CTX_set_mode(ssl_ctx, SSL_MODE_RELEASE_BUFFERS);
SSL_CTX_set_next_proto_select_cb(ssl_ctx, select_next_proto_cb, 0);
return ssl_ctx;
}
ClientHandler* accept_ssl_connection(event_base *evbase, SSL_CTX *ssl_ctx,
evutil_socket_t fd,
sockaddr *addr, int addrlen)
@ -149,12 +193,6 @@ ClientHandler* accept_ssl_connection(event_base *evbase, SSL_CTX *ssl_ctx,
int rv;
rv = getnameinfo(addr, addrlen, host, sizeof(host), 0, 0, NI_NUMERICHOST);
if(rv == 0) {
SSL *ssl = SSL_new(ssl_ctx);
if(!ssl) {
LOG(ERROR) << "SSL_new() failed: "
<< ERR_error_string(ERR_get_error(), NULL);
return 0;
}
int val = 1;
rv = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY,
reinterpret_cast<char *>(&val), sizeof(val));
@ -162,11 +200,25 @@ ClientHandler* accept_ssl_connection(event_base *evbase, SSL_CTX *ssl_ctx,
LOG(WARNING) << "Setting option TCP_NODELAY failed: "
<< strerror(errno);
}
bufferevent *bev = bufferevent_openssl_socket_new
(evbase, fd, ssl,
BUFFEREVENT_SSL_ACCEPTING, BEV_OPT_DEFER_CALLBACKS);
ClientHandler *client_handler = new ClientHandler(bev, ssl, host);
SSL *ssl = 0;
bufferevent *bev;
if(get_config()->client_mode) {
bev = bufferevent_socket_new(evbase, fd, BEV_OPT_DEFER_CALLBACKS);
} else {
ssl = SSL_new(ssl_ctx);
if(!ssl) {
LOG(ERROR) << "SSL_new() failed: "
<< ERR_error_string(ERR_get_error(), NULL);
return 0;
}
bev = bufferevent_openssl_socket_new
(evbase, fd, ssl,
BUFFEREVENT_SSL_ACCEPTING, BEV_OPT_DEFER_CALLBACKS);
}
ClientHandler *client_handler = new ClientHandler(bev, fd, ssl, host);
if(get_config()->client_mode) {
client_handler->set_ssl_client_ctx(ssl_ctx);
}
return client_handler;
} else {
LOG(ERROR) << "getnameinfo() failed: " << gai_strerror(rv);

View File

@ -40,6 +40,8 @@ namespace ssl {
SSL_CTX* create_ssl_context();
SSL_CTX* create_ssl_client_context();
ClientHandler* accept_ssl_connection(event_base *evbase, SSL_CTX *ssl_ctx,
evutil_socket_t fd,
sockaddr *addr, int addrlen);

View File

@ -29,6 +29,8 @@
#include <event2/bufferevent.h>
#include "shrpx_io_control.h"
namespace shrpx {
class ClientHandler;
@ -49,6 +51,9 @@ public:
virtual int on_downstream_body(Downstream *downstream,
const uint8_t *data, size_t len) = 0;
virtual int on_downstream_body_complete(Downstream *downstream) = 0;
virtual void pause_read(IOCtrlReason reason) = 0;
virtual void resume_read(IOCtrlReason reason) = 0;
};
} // namespace shrpx

View File

@ -119,17 +119,6 @@ time_t parse_http_date(const std::string& s)
return timegm(&tm);
}
namespace {
char lowcase(char c)
{
if('A' <= c && c <= 'Z') {
return c+('a'-'A');
} else {
return c;
}
}
} // namespace
bool startsWith(const std::string& a, const std::string& b)
{
return startsWith(a.begin(), a.end(), b.begin(), b.end());
@ -184,6 +173,24 @@ bool strifind(const char *a, const char *b)
return false;
}
char upcase(char c)
{
if('a' <= c && c <= 'z') {
return c-'a'+'A';
} else {
return c;
}
}
char lowcase(char c)
{
if('A' <= c && c <= 'Z') {
return c-'A'+'a';
} else {
return c;
}
}
} // namespace util
} // namespace spdylay

View File

@ -239,6 +239,10 @@ bool strieq(const char *a, const char *b);
bool strifind(const char *a, const char *b);
char upcase(char c);
char lowcase(char c);
} // namespace util
} // namespace spdylay