Added multi thread support

This commit is contained in:
Tatsuhiro Tsujikawa 2012-06-06 01:26:04 +09:00
parent c0b564abe4
commit 8f1c49e75c
16 changed files with 592 additions and 181 deletions

View File

@ -78,6 +78,9 @@ shrpx_SOURCES = ${HELPER_OBJECTS} ${HELPER_HFILES} \
shrpx_log.cc shrpx_log.h \
shrpx_http.cc shrpx_http.h \
shrpx_io_control.cc shrpx_io_control.h \
shrpx_ssl.cc shrpx_ssl.h \
shrpx_thread_event_receiver.cc shrpx_thread_event_receiver.h \
shrpx_worker.cc shrpx_worker.h \
htparse/htparse.c htparse/htparse.h
noinst_PROGRAMS = spdycli

View File

@ -30,9 +30,7 @@
#include <netdb.h>
#include <signal.h>
#include <cstdlib>
#include <iostream>
#include <openssl/ssl.h>
#include <openssl/err.h>
@ -46,82 +44,6 @@
namespace shrpx {
namespace {
std::pair<unsigned char*, size_t> next_proto;
unsigned char proto_list[23];
} // namespace
namespace {
int next_proto_cb(SSL *s, const unsigned char **data, unsigned int *len,
void *arg)
{
std::pair<unsigned char*, size_t> *next_proto =
reinterpret_cast<std::pair<unsigned char*, size_t>* >(arg);
*data = next_proto->first;
*len = next_proto->second;
return SSL_TLSEXT_ERR_OK;
}
} // namespace
namespace {
int verify_callback(int preverify_ok, X509_STORE_CTX *ctx)
{
// We don't verify the client certificate. Just request it for the
// testing purpose.
return 1;
}
} // namespace
namespace {
SSL_CTX* create_ssl_ctx()
{
// TODO lock function
SSL_CTX *ssl_ctx;
ssl_ctx = SSL_CTX_new(SSLv23_server_method());
if(!ssl_ctx) {
std::cerr << ERR_error_string(ERR_get_error(), 0) << std::endl;
return NULL;
}
SSL_CTX_set_options(ssl_ctx,
SSL_OP_ALL | SSL_OP_NO_SSLv2 | SSL_OP_NO_COMPRESSION);
SSL_CTX_set_mode(ssl_ctx, SSL_MODE_AUTO_RETRY);
SSL_CTX_set_mode(ssl_ctx, SSL_MODE_RELEASE_BUFFERS);
if(SSL_CTX_use_PrivateKey_file(ssl_ctx,
get_config()->private_key_file,
SSL_FILETYPE_PEM) != 1) {
std::cerr << "SSL_CTX_use_PrivateKey_file failed." << std::endl;
return NULL;
}
if(SSL_CTX_use_certificate_file(ssl_ctx, get_config()->cert_file,
SSL_FILETYPE_PEM) != 1) {
std::cerr << "SSL_CTX_use_certificate_file failed." << std::endl;
return NULL;
}
if(SSL_CTX_check_private_key(ssl_ctx) != 1) {
std::cerr << "SSL_CTX_check_private_key failed." << std::endl;
return NULL;
}
if(get_config()->verify_client) {
SSL_CTX_set_verify(ssl_ctx,
SSL_VERIFY_PEER | SSL_VERIFY_CLIENT_ONCE |
SSL_VERIFY_FAIL_IF_NO_PEER_CERT,
verify_callback);
}
// We speaks "http/1.1", "spdy/2" and "spdy/3".
proto_list[0] = 6;
memcpy(&proto_list[1], "spdy/3", 6);
proto_list[7] = 6;
memcpy(&proto_list[8], "spdy/2", 6);
proto_list[14] = 8;
memcpy(&proto_list[15], "http/1.1", 8);
next_proto.first = proto_list;
next_proto.second = sizeof(proto_list);
SSL_CTX_set_next_protos_advertised_cb(ssl_ctx, next_proto_cb, &next_proto);
return ssl_ctx;
}
} // namespace
namespace {
void ssl_acceptcb(evconnlistener *listener, int fd,
sockaddr *addr, int addrlen, void *arg)
@ -144,45 +66,20 @@ int cache_downstream_host_address()
#ifdef AI_ADDRCONFIG
hints.ai_flags |= AI_ADDRCONFIG;
#endif // AI_ADDRCONFIG
addrinfo *res, *rp;
addrinfo *res;
rv = getaddrinfo(get_config()->downstream_host, service, &hints, &res);
if(rv != 0) {
LOG(ERROR) << "getaddrinfo: " << gai_strerror(rv);
return -1;
}
for(rp = res; rp; rp = rp->ai_next) {
int fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
if(fd == -1) {
continue;
}
rv = connect(fd, rp->ai_addr, rp->ai_addrlen);
close(fd);
if(rv == -1) {
continue;
}
break;
}
if(rp == 0 && res) {
LOG(INFO) << "Using first returned address for downstream "
<< get_config()->downstream_host
<< ", port "
<< get_config()->downstream_port;
rp = res;
}
if(rp != 0) {
memcpy(&mod_config()->downstream_addr, rp->ai_addr, rp->ai_addrlen);
mod_config()->downstream_addrlen = rp->ai_addrlen;
LOG(FATAL) << "Unable to get downstream address: " << gai_strerror(rv);
DIE();
}
LOG(INFO) << "Using first returned address for downstream "
<< get_config()->downstream_host
<< ", port "
<< get_config()->downstream_port;
memcpy(&mod_config()->downstream_addr, res->ai_addr, res->ai_addrlen);
mod_config()->downstream_addrlen = res->ai_addrlen;
freeaddrinfo(res);
if(rp == 0) {
LOG(ERROR) << "No usable address found for downstream "
<< get_config()->downstream_host
<< ", port "
<< get_config()->downstream_port;
return -1;
} else {
return 0;
}
return 0;
}
} // namespace
@ -245,12 +142,11 @@ evconnlistener* create_evlistener(ListenHandler *handler)
namespace {
int event_loop()
{
SSL_CTX *ssl_ctx = create_ssl_ctx();
if(ssl_ctx == NULL) {
return -1;
}
event_base *evbase = event_base_new();
ListenHandler *listener_handler = new ListenHandler(evbase, ssl_ctx);
ListenHandler *listener_handler = new ListenHandler(evbase);
if(get_config()->num_worker > 1) {
listener_handler->create_worker_thread(get_config()->num_worker);
}
evconnlistener *evlistener = create_evlistener(listener_handler);
if(evlistener == NULL) {
return -1;
@ -261,7 +157,6 @@ int event_loop()
event_base_loop(evbase, 0);
evconnlistener_free(evlistener);
SSL_CTX_free(ssl_ctx);
return 0;
}
} // namespace
@ -311,6 +206,9 @@ int main(int argc, char **argv)
if(cache_downstream_host_address() == -1) {
exit(EXIT_FAILURE);
}
mod_config()->num_worker = 4;
event_loop();
return 0;
}

View File

@ -29,6 +29,8 @@
# include <config.h>
#endif // HAVE_CONFIG_H
#include <cassert>
#include "shrpx_log.h"
#define DIE() \

View File

@ -49,6 +49,7 @@ namespace {
void upstream_writecb(bufferevent *bev, void *arg)
{
ClientHandler *handler = reinterpret_cast<ClientHandler*>(arg);
// We actually depend on write low-warter mark == 0.
if(handler->get_should_close_after_write()) {
delete handler;
} else {
@ -65,19 +66,19 @@ void upstream_eventcb(bufferevent *bev, short events, void *arg)
bool finish = false;
if(events & BEV_EVENT_EOF) {
if(ENABLE_LOG) {
LOG(INFO) << "<upstream> SSL/TLS handshake EOF";
LOG(INFO) << "Upstream handshake EOF";
}
finish = true;
}
if(events & BEV_EVENT_ERROR) {
if(ENABLE_LOG) {
LOG(INFO) << "<upstream> SSL/TLS network error";
LOG(INFO) << "Upstream network error";
}
finish = true;
}
if(events & BEV_EVENT_TIMEOUT) {
if(ENABLE_LOG) {
LOG(INFO) << "SPDY upstream SSL/TLS time out";
LOG(INFO) << "Upstream time out";
}
finish = true;
}
@ -86,8 +87,7 @@ void upstream_eventcb(bufferevent *bev, short events, void *arg)
} else {
if(events & BEV_EVENT_CONNECTED) {
if(ENABLE_LOG) {
LOG(INFO) << "Connected Handler "
<< handler;
LOG(INFO) << "Upstream connected. handler " << handler;
}
handler->set_bev_cb(upstream_readcb, upstream_writecb, upstream_eventcb);
handler->validate_next_proto();
@ -167,7 +167,7 @@ int ClientHandler::validate_next_proto()
if(next_proto) {
std::string proto(next_proto, next_proto+next_proto_len);
if(ENABLE_LOG) {
LOG(INFO) << "<upstream> The negotiated next protocol: " << proto;
LOG(INFO) << "Upstream negotiated next protocol: " << proto;
}
uint16_t version = spdylay_npn_get_version(next_proto, next_proto_len);
if(version) {
@ -177,9 +177,12 @@ int ClientHandler::validate_next_proto()
}
} else {
if(ENABLE_LOG) {
LOG(INFO) << "<upstream> No proto negotiated";
LOG(INFO) << "No proto negotiated.";
}
}
if(ENABLE_LOG) {
LOG(INFO) << "Use HTTP/1.1";
}
HttpsUpstream *https_upstream = new HttpsUpstream(this);
upstream_ = https_upstream;
return 0;

View File

@ -38,7 +38,8 @@ Config::Config()
downstream_host(0),
downstream_port(0),
downstream_hostport(0),
downstream_addrlen(0)
downstream_addrlen(0),
num_worker(0)
{}
namespace {

View File

@ -64,6 +64,7 @@ struct Config {
timeval spdy_upstream_write_timeout;
timeval downstream_read_timeout;
timeval downstream_write_timeout;
size_t num_worker;
Config();
};

View File

@ -71,7 +71,7 @@ namespace {
int htp_msg_begin(htparser *htp)
{
if(ENABLE_LOG) {
LOG(INFO) << "<upstream>::<https> request start";
LOG(INFO) << "Upstream https request start";
}
HttpsUpstream *upstream;
upstream = reinterpret_cast<HttpsUpstream*>(htparser_get_userdata(htp));
@ -108,7 +108,7 @@ namespace {
int htp_hdrs_begincb(htparser *htp)
{
if(ENABLE_LOG) {
LOG(INFO) << "<upstream>::<https> request headers start";
LOG(INFO) << "Upstream https request headers start";
}
HttpsUpstream *upstream;
upstream = reinterpret_cast<HttpsUpstream*>(htparser_get_userdata(htp));
@ -148,7 +148,7 @@ namespace {
int htp_hdrs_completecb(htparser *htp)
{
if(ENABLE_LOG) {
LOG(INFO) << "<upstream>::<https> request headers complete";
LOG(INFO) << "Upstream https request headers complete";
}
HttpsUpstream *upstream;
upstream = reinterpret_cast<HttpsUpstream*>(htparser_get_userdata(htp));
@ -178,7 +178,7 @@ namespace {
int htp_msg_completecb(htparser *htp)
{
if(ENABLE_LOG) {
LOG(INFO) << "<upstream>::<https> request complete";
LOG(INFO) << "Upstream https request complete";
}
HttpsUpstream *upstream;
upstream = reinterpret_cast<HttpsUpstream*>(htparser_get_userdata(htp));
@ -237,7 +237,7 @@ int HttpsUpstream::on_read()
}
} else if(htperr != htparse_error_none) {
if(ENABLE_LOG) {
LOG(INFO) << "<upstream> http parse failure: "
LOG(INFO) << "Upstream http parse failure: "
<< htparser_get_strerror(htp_);
}
get_client_handler()->set_should_close_after_write(true);
@ -328,18 +328,19 @@ void https_downstream_eventcb(bufferevent *bev, short events, void *ptr)
upstream = static_cast<HttpsUpstream*>(downstream->get_upstream());
if(events & BEV_EVENT_CONNECTED) {
if(ENABLE_LOG) {
LOG(INFO) << "<downstream> Connection established. " << downstream;
LOG(INFO) << "Downstream connection established. downstream "
<< downstream;
}
}
if(events & BEV_EVENT_EOF) {
if(ENABLE_LOG) {
LOG(INFO) << "<downstream> EOF stream_id="
LOG(INFO) << "Downstream EOF. stream_id="
<< downstream->get_stream_id();
}
if(downstream->get_response_state() == Downstream::HEADER_COMPLETE) {
// Server may indicate the end of the request by EOF
if(ENABLE_LOG) {
LOG(INFO) << "<downstream> Assuming content-length is 0 byte";
LOG(INFO) << "Assuming downstream content-length is 0 byte";
}
upstream->on_downstream_body_complete(downstream);
//downstream->set_response_state(Downstream::MSG_COMPLETE);
@ -348,7 +349,7 @@ void https_downstream_eventcb(bufferevent *bev, short events, void *ptr)
} else {
// error
if(ENABLE_LOG) {
LOG(INFO) << "<downstream> Treated as error";
LOG(INFO) << "Treated as downstream error";
}
upstream->error_reply(502);
}
@ -357,7 +358,7 @@ void https_downstream_eventcb(bufferevent *bev, short events, void *ptr)
upstream->resume_read(SHRPX_MSG_BLOCK);
} else if(events & (BEV_EVENT_ERROR | BEV_EVENT_TIMEOUT)) {
if(ENABLE_LOG) {
LOG(INFO) << "<downstream> error/timeout. " << downstream;
LOG(INFO) << "Downstream error/timeout. " << downstream;
}
if(downstream->get_response_state() == Downstream::INITIAL) {
int status;
@ -439,7 +440,7 @@ Downstream* HttpsUpstream::get_last_downstream()
int HttpsUpstream::on_downstream_header_complete(Downstream *downstream)
{
if(ENABLE_LOG) {
LOG(INFO) << "<downstream> on_downstream_header_complete";
LOG(INFO) << "Downstream on_downstream_header_complete";
}
std::string hdrs = "HTTP/1.1 ";
hdrs += http::get_status_string(downstream->get_response_http_status());
@ -467,7 +468,7 @@ int HttpsUpstream::on_downstream_header_complete(Downstream *downstream)
}
hdrs += "\r\n";
if(ENABLE_LOG) {
LOG(INFO) << "<upstream>::<https> Response headers\n" << hdrs;
LOG(INFO) << "Upstream https response headers\n" << hdrs;
}
evbuffer *output = bufferevent_get_output(handler_->get_bev());
evbuffer_add(output, hdrs.c_str(), hdrs.size());
@ -496,7 +497,7 @@ int HttpsUpstream::on_downstream_body_complete(Downstream *downstream)
evbuffer_add(output, "0\r\n\r\n", 5);
}
if(ENABLE_LOG) {
LOG(INFO) << "<downstream> on_downstream_body_complete";
LOG(INFO) << "Downstream on_downstream_body_complete";
}
if(downstream->get_request_connection_close()) {
ClientHandler *handler = downstream->get_upstream()->get_client_handler();

View File

@ -24,48 +24,82 @@
*/
#include "shrpx_listen_handler.h"
#include <pthread.h>
#include <cerrno>
#include <event2/bufferevent_ssl.h>
#include "shrpx_client_handler.h"
#include "shrpx_thread_event_receiver.h"
#include "shrpx_ssl.h"
#include "shrpx_worker.h"
namespace shrpx {
ListenHandler::ListenHandler(event_base *evbase, SSL_CTX *ssl_ctx)
ListenHandler::ListenHandler(event_base *evbase)
: evbase_(evbase),
ssl_ctx_(ssl_ctx)
ssl_ctx_(ssl::create_ssl_context()),
worker_round_robin_cnt_(0),
workers_(0),
num_worker_(0)
{}
ListenHandler::~ListenHandler()
{}
void ListenHandler::create_worker_thread(size_t num)
{
workers_ = new WorkerInfo[num];
num_worker_ = 0;
for(size_t i = 0; i < num; ++i) {
int rv;
pthread_t thread;
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
WorkerInfo *info = &workers_[num_worker_];
rv = socketpair(AF_UNIX, SOCK_STREAM, 0, info->sv);
if(rv == -1) {
LOG(ERROR) << "socketpair() failed: " << strerror(errno);
continue;
}
rv = pthread_create(&thread, &attr, start_threaded_worker, &info->sv[1]);
if(rv != 0) {
LOG(ERROR) << "pthread_create() failed: " << strerror(rv);
for(size_t j = 0; j < 2; ++j) {
close(info->sv[j]);
}
continue;
}
bufferevent *bev = bufferevent_socket_new(evbase_, info->sv[0],
BEV_OPT_DEFER_CALLBACKS);
info->bev = bev;
if(ENABLE_LOG) {
LOG(INFO) << "Created thread#" << num_worker_;
}
++num_worker_;
}
}
int ListenHandler::accept_connection(evutil_socket_t fd,
sockaddr *addr, int addrlen)
{
if(ENABLE_LOG) {
LOG(INFO) << "<listener> Accepted connection. fd=" << fd;
}
char host[NI_MAXHOST];
int rv;
rv = getnameinfo(addr, addrlen, host, sizeof(host), 0, 0, NI_NUMERICHOST);
if(rv == 0) {
SSL *ssl = SSL_new(ssl_ctx_);
bufferevent *bev = bufferevent_openssl_socket_new
(evbase_, fd, ssl,
BUFFEREVENT_SSL_ACCEPTING,
BEV_OPT_DEFER_CALLBACKS);
if(bev == NULL) {
if(ENABLE_LOG) {
LOG(ERROR) << "<listener> bufferevent_openssl_socket_new failed";
}
close(fd);
} else {
/*ClientHandler *client_handler =*/ new ClientHandler(bev, ssl, host);
}
if(num_worker_ == 0) {
/*ClientHandler* client = */
ssl::accept_ssl_connection(evbase_, ssl_ctx_, fd, addr, addrlen);
} else {
if(ENABLE_LOG) {
LOG(INFO) << "<listener> getnameinfo failed";
}
close(fd);
size_t idx = worker_round_robin_cnt_ % num_worker_;
++worker_round_robin_cnt_;
WorkerEvent wev;
wev.client_fd = fd;
memcpy(&wev.client_addr, addr, addrlen);
wev.client_addrlen = addrlen;
evbuffer *output = bufferevent_get_output(workers_[idx].bev);
evbuffer_add(output, &wev, sizeof(wev));
}
return 0;
}

View File

@ -31,19 +31,29 @@
#include <sys/socket.h>
#include <openssl/ssl.h>
#include <event.h>
namespace shrpx {
struct WorkerInfo {
int sv[2];
bufferevent *bev;
};
class ListenHandler {
public:
ListenHandler(event_base *evbase, SSL_CTX *ssl_ctx);
ListenHandler(event_base *evbase);
~ListenHandler();
int accept_connection(evutil_socket_t fd, sockaddr *addr, int addrlen);
void create_worker_thread(size_t num);
event_base* get_evbase() const;
private:
event_base *evbase_;
SSL_CTX *ssl_ctx_;
SSL_CTX *ssl_ctx_;
unsigned int worker_round_robin_cnt_;
WorkerInfo *workers_;
size_t num_worker_;
};
} // namespace shrpx

View File

@ -90,8 +90,7 @@ void on_stream_close_callback
void *user_data)
{
if(ENABLE_LOG) {
LOG(INFO) << "<upstream>::<spdy> Stream " << stream_id
<< " is being closed";
LOG(INFO) << "Upstream spdy Stream " << stream_id << " is being closed";
}
SpdyUpstream *upstream = reinterpret_cast<SpdyUpstream*>(user_data);
Downstream *downstream = upstream->get_downstream_queue()->find(stream_id);
@ -114,7 +113,7 @@ void on_ctrl_recv_callback
switch(type) {
case SPDYLAY_SYN_STREAM: {
if(ENABLE_LOG) {
LOG(INFO) << "<upstream>::<spdy> Received upstream SYN_STREAM stream_id="
LOG(INFO) << "Upstream spdy received upstream SYN_STREAM stream_id="
<< frame->syn_stream.stream_id;
}
Downstream *downstream = new Downstream(upstream,
@ -139,14 +138,14 @@ void on_ctrl_recv_callback
for(size_t i = 0; nv[i]; i += 2) {
ss << nv[i] << ": " << nv[i+1] << "\n";
}
LOG(INFO) << "<upstream>::<spdy> Request headers:\n" << ss.str();
LOG(INFO) << "Upstream spdy request headers:\n" << ss.str();
}
downstream->push_request_headers();
downstream->set_request_state(Downstream::HEADER_COMPLETE);
if(frame->syn_stream.hd.flags & SPDYLAY_CTRL_FLAG_FIN) {
if(ENABLE_LOG) {
LOG(INFO) << "<upstream>::<spdy> "
LOG(INFO) << "Upstream spdy "
<< "Setting Downstream::MSG_COMPLETE for Downstream "
<< downstream;
}
@ -169,7 +168,7 @@ void on_data_chunk_recv_callback(spdylay_session *session,
void *user_data)
{
if(ENABLE_LOG) {
LOG(INFO) << "<upstream>::<spdy> Received upstream DATA data stream_id="
LOG(INFO) << "Upstream spdy received upstream DATA data stream_id="
<< stream_id;
}
SpdyUpstream *upstream = reinterpret_cast<SpdyUpstream*>(user_data);
@ -178,8 +177,8 @@ void on_data_chunk_recv_callback(spdylay_session *session,
downstream->push_upload_data_chunk(data, len);
if(flags & SPDYLAY_DATA_FLAG_FIN) {
if(ENABLE_LOG) {
LOG(INFO) << "<upstream>::<spdy> "
<< "Setting Downstream::MSG_COMPLETE for Downstream "
LOG(INFO) << "Upstream spdy "
<< "setting Downstream::MSG_COMPLETE for Downstream "
<< downstream;
}
downstream->set_request_state(Downstream::MSG_COMPLETE);
@ -309,13 +308,13 @@ void spdy_downstream_eventcb(bufferevent *bev, short events, void *ptr)
upstream = static_cast<SpdyUpstream*>(downstream->get_upstream());
if(events & BEV_EVENT_CONNECTED) {
if(ENABLE_LOG) {
LOG(INFO) << "<downstream> Connection established. Downstream "
LOG(INFO) << "Downstream connection established. Downstream "
<< downstream;
}
}
if(events & BEV_EVENT_EOF) {
if(ENABLE_LOG) {
LOG(INFO) << "<downstream> EOF stream_id="
LOG(INFO) << "Downstream EOF stream_id="
<< downstream->get_stream_id();
}
if(downstream->get_request_state() == Downstream::STREAM_CLOSED) {
@ -328,7 +327,7 @@ void spdy_downstream_eventcb(bufferevent *bev, short events, void *ptr)
if(downstream->get_response_state() == Downstream::HEADER_COMPLETE) {
// Server may indicate the end of the request by EOF
if(ENABLE_LOG) {
LOG(INFO) << "<downstream> Assuming content-length is 0 byte";
LOG(INFO) << "Assuming downstream content-length is 0 byte";
}
downstream->set_response_state(Downstream::MSG_COMPLETE);
upstream->on_downstream_body_complete(downstream);
@ -342,7 +341,7 @@ void spdy_downstream_eventcb(bufferevent *bev, short events, void *ptr)
}
} else if(events & (BEV_EVENT_ERROR | BEV_EVENT_TIMEOUT)) {
if(ENABLE_LOG) {
LOG(INFO) << "<downstream> error/timeout. Downstream " << downstream;
LOG(INFO) << "Downstream error/timeout. Downstream " << downstream;
}
if(downstream->get_request_state() == Downstream::STREAM_CLOSED) {
upstream->remove_downstream(downstream);
@ -474,7 +473,7 @@ spdylay_session* SpdyUpstream::get_spdy_session()
int SpdyUpstream::on_downstream_header_complete(Downstream *downstream)
{
if(ENABLE_LOG) {
LOG(INFO) << "<downstream> on_downstream_header_complete";
LOG(INFO) << "Downstream on_downstream_header_complete";
}
size_t nheader = downstream->get_response_headers().size();
const char **nv = new const char*[nheader * 2 + 4 + 1];
@ -504,7 +503,7 @@ int SpdyUpstream::on_downstream_header_complete(Downstream *downstream)
for(size_t i = 0; nv[i]; i += 2) {
ss << nv[i] << ": " << nv[i+1] << "\n";
}
LOG(INFO) << "<upstream>::<spdy> Response headers\n" << ss.str();
LOG(INFO) << "Upstream spdy response headers\n" << ss.str();
}
spdylay_data_provider data_prd;
data_prd.source.ptr = downstream;
@ -520,7 +519,7 @@ int SpdyUpstream::on_downstream_body(Downstream *downstream,
const uint8_t *data, size_t len)
{
if(ENABLE_LOG) {
LOG(INFO) << "<downstream> on_downstream_body";
LOG(INFO) << "Downstream on_downstream_body";
}
evbuffer *body = downstream->get_response_body_buf();
evbuffer_add(body, data, len);
@ -537,7 +536,7 @@ int SpdyUpstream::on_downstream_body(Downstream *downstream,
int SpdyUpstream::on_downstream_body_complete(Downstream *downstream)
{
if(ENABLE_LOG) {
LOG(INFO) << "<downstream> on_downstream_body_complete";
LOG(INFO) << "Downstream on_downstream_body_complete";
}
spdylay_session_resume_data(session_, downstream->get_stream_id());
return 0;

141
examples/shrpx_ssl.cc Normal file
View File

@ -0,0 +1,141 @@
/*
* Spdylay - SPDY Library
*
* Copyright (c) 2012 Tatsuhiro Tsujikawa
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include "shrpx_ssl.h"
#include <sys/socket.h>
#include <netdb.h>
#include <event2/bufferevent.h>
#include <event2/bufferevent_ssl.h>
#include "shrpx_log.h"
#include "shrpx_client_handler.h"
#include "shrpx_config.h"
namespace shrpx {
namespace ssl {
namespace {
std::pair<unsigned char*, size_t> next_proto;
unsigned char proto_list[23];
} // namespace
namespace {
int next_proto_cb(SSL *s, const unsigned char **data, unsigned int *len,
void *arg)
{
std::pair<unsigned char*, size_t> *next_proto =
reinterpret_cast<std::pair<unsigned char*, size_t>* >(arg);
*data = next_proto->first;
*len = next_proto->second;
return SSL_TLSEXT_ERR_OK;
}
} // namespace
namespace {
int verify_callback(int preverify_ok, X509_STORE_CTX *ctx)
{
// We don't verify the client certificate. Just request it for the
// testing purpose.
return 1;
}
} // namespace
SSL_CTX* create_ssl_context()
{
SSL_CTX *ssl_ctx;
ssl_ctx = SSL_CTX_new(SSLv23_server_method());
if(!ssl_ctx) {
LOG(FATAL) << ERR_error_string(ERR_get_error(), 0);
DIE();
}
SSL_CTX_set_options(ssl_ctx,
SSL_OP_ALL | SSL_OP_NO_SSLv2 | SSL_OP_NO_COMPRESSION);
SSL_CTX_set_mode(ssl_ctx, SSL_MODE_AUTO_RETRY);
SSL_CTX_set_mode(ssl_ctx, SSL_MODE_RELEASE_BUFFERS);
if(SSL_CTX_use_PrivateKey_file(ssl_ctx,
get_config()->private_key_file,
SSL_FILETYPE_PEM) != 1) {
LOG(FATAL) << "SSL_CTX_use_PrivateKey_file failed.";
DIE();
}
if(SSL_CTX_use_certificate_file(ssl_ctx, get_config()->cert_file,
SSL_FILETYPE_PEM) != 1) {
LOG(FATAL) << "SSL_CTX_use_certificate_file failed.";
DIE();
}
if(SSL_CTX_check_private_key(ssl_ctx) != 1) {
LOG(FATAL) << "SSL_CTX_check_private_key failed.";
DIE();
}
if(get_config()->verify_client) {
SSL_CTX_set_verify(ssl_ctx,
SSL_VERIFY_PEER | SSL_VERIFY_CLIENT_ONCE |
SSL_VERIFY_FAIL_IF_NO_PEER_CERT,
verify_callback);
}
// We speaks "http/1.1", "spdy/2" and "spdy/3".
proto_list[0] = 6;
memcpy(&proto_list[1], "spdy/3", 6);
proto_list[7] = 6;
memcpy(&proto_list[8], "spdy/2", 6);
proto_list[14] = 8;
memcpy(&proto_list[15], "http/1.1", 8);
next_proto.first = proto_list;
next_proto.second = sizeof(proto_list);
SSL_CTX_set_next_protos_advertised_cb(ssl_ctx, next_proto_cb, &next_proto);
return ssl_ctx;
}
ClientHandler* accept_ssl_connection(event_base *evbase, SSL_CTX *ssl_ctx,
evutil_socket_t fd,
sockaddr *addr, int addrlen)
{
char host[NI_MAXHOST];
int rv;
rv = getnameinfo(addr, addrlen, host, sizeof(host), 0, 0, NI_NUMERICHOST);
if(rv == 0) {
SSL *ssl = SSL_new(ssl_ctx);
if(!ssl) {
LOG(ERROR) << "SSL_new() failed";
return 0;
}
bufferevent *bev = bufferevent_openssl_socket_new
(evbase, fd, ssl,
BUFFEREVENT_SSL_ACCEPTING, BEV_OPT_DEFER_CALLBACKS);
ClientHandler *client_handler = new ClientHandler(bev, ssl, host);
return client_handler;
} else {
LOG(ERROR) << "getnameinfo() failed: " << gai_strerror(rv);
return 0;
}
}
} // namespace ssl
} // namespace shrpx

51
examples/shrpx_ssl.h Normal file
View File

@ -0,0 +1,51 @@
/*
* Spdylay - SPDY Library
*
* Copyright (c) 2012 Tatsuhiro Tsujikawa
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef SHRPX_SSL_H
#define SHRPX_SSL_H
#include "shrpx.h"
#include <openssl/ssl.h>
#include <openssl/err.h>
#include <event.h>
namespace shrpx {
class ClientHandler;
namespace ssl {
SSL_CTX* create_ssl_context();
ClientHandler* accept_ssl_connection(event_base *evbase, SSL_CTX *ssl_ctx,
evutil_socket_t fd,
sockaddr *addr, int addrlen);
} // namespace ssl
} // namespace shrpx
#endif // SHRPX_SSL_H

View File

@ -0,0 +1,69 @@
/*
* Spdylay - SPDY Library
*
* Copyright (c) 2012 Tatsuhiro Tsujikawa
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include "shrpx_thread_event_receiver.h"
#include "shrpx_ssl.h"
#include "shrpx_log.h"
#include "shrpx_client_handler.h"
namespace shrpx {
ThreadEventReceiver::ThreadEventReceiver(SSL_CTX *ssl_ctx)
: ssl_ctx_(ssl_ctx)
{}
ThreadEventReceiver::~ThreadEventReceiver()
{}
void ThreadEventReceiver::on_read(bufferevent *bev)
{
evbuffer *input = bufferevent_get_input(bev);
while(evbuffer_get_length(input) >= sizeof(WorkerEvent)) {
WorkerEvent wev;
evbuffer_remove(input, &wev, sizeof(WorkerEvent));
if(ENABLE_LOG) {
LOG(INFO) << "WorkerEvent: client_fd=" << wev.client_fd
<< ", addrlen=" << wev.client_addrlen;
}
event_base *evbase = bufferevent_get_base(bev);
ClientHandler *client_handler;
client_handler = ssl::accept_ssl_connection(evbase, ssl_ctx_,
wev.client_fd,
&wev.client_addr.sa,
wev.client_addrlen);
if(client_handler) {
if(ENABLE_LOG) {
LOG(INFO) << "ClientHandler " << client_handler << " created";
}
} else {
if(ENABLE_LOG) {
LOG(ERROR) << "ClientHandler creation failed";
}
close(wev.client_fd);
}
}
}
} // namespace shrpx

View File

@ -0,0 +1,55 @@
/*
* Spdylay - SPDY Library
*
* Copyright (c) 2012 Tatsuhiro Tsujikawa
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef SHRPX_THREAD_EVENT_RECEIVER_H
#define SHRPX_THREAD_EVENT_RECEIVER_H
#include "shrpx.h"
#include <openssl/ssl.h>
#include <event2/bufferevent.h>
#include "shrpx_config.h"
namespace shrpx {
struct WorkerEvent {
evutil_socket_t client_fd;
sockaddr_union client_addr;
size_t client_addrlen;
};
class ThreadEventReceiver {
public:
ThreadEventReceiver(SSL_CTX *ssl_ctx);
~ThreadEventReceiver();
void on_read(bufferevent *bev);
private:
SSL_CTX *ssl_ctx_;
};
} // namespace shrpx
#endif // SHRPX_THREAD_EVENT_RECEIVER_H

93
examples/shrpx_worker.cc Normal file
View File

@ -0,0 +1,93 @@
/*
* Spdylay - SPDY Library
*
* Copyright (c) 2012 Tatsuhiro Tsujikawa
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include "shrpx_worker.h"
#include <unistd.h>
#include <sys/socket.h>
#include <event.h>
#include <event2/bufferevent.h>
#include "shrpx_ssl.h"
#include "shrpx_thread_event_receiver.h"
#include "shrpx_log.h"
namespace shrpx {
Worker::Worker(int fd)
: fd_(fd),
ssl_ctx_(ssl::create_ssl_context())
{}
Worker::~Worker()
{
SSL_CTX_free(ssl_ctx_);
shutdown(fd_, SHUT_WR);
close(fd_);
}
namespace {
void readcb(bufferevent *bev, void *arg)
{
ThreadEventReceiver *receiver = reinterpret_cast<ThreadEventReceiver*>(arg);
receiver->on_read(bev);
}
} // namespace
namespace {
void eventcb(bufferevent *bev, short events, void *arg)
{
if(events & BEV_EVENT_EOF) {
LOG(ERROR) << "Connection to main thread lost: eof";
}
if(events & BEV_EVENT_ERROR) {
LOG(ERROR) << "Connection to main thread lost: network error";
}
}
} // namespace
void Worker::run()
{
event_base *evbase = event_base_new();
bufferevent *bev = bufferevent_socket_new(evbase, fd_,
BEV_OPT_DEFER_CALLBACKS);
ThreadEventReceiver *receiver = new ThreadEventReceiver(ssl_ctx_);
bufferevent_enable(bev, EV_READ);
bufferevent_setcb(bev, readcb, 0, eventcb, receiver);
event_base_loop(evbase, 0);
delete receiver;
}
void* start_threaded_worker(void *arg)
{
int fd = *reinterpret_cast<int*>(arg);
Worker worker(fd);
worker.run();
return 0;
}
} // namespace shrpx

50
examples/shrpx_worker.h Normal file
View File

@ -0,0 +1,50 @@
/*
* Spdylay - SPDY Library
*
* Copyright (c) 2012 Tatsuhiro Tsujikawa
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef SHRPX_WORKER_H
#define SHRPX_WORKER_H
#include "shrpx.h"
#include <openssl/ssl.h>
#include <openssl/err.h>
namespace shrpx {
class Worker {
public:
Worker(int fd);
~Worker();
void run();
private:
// Channel to the main thread
int fd_;
SSL_CTX *ssl_ctx_;
};
void* start_threaded_worker(void *arg);
} // namespace shrpx
#endif // SHRPX_WORKER_H