Port shrpx to nghttp2 use

This commit is contained in:
Tatsuhiro Tsujikawa 2013-07-26 19:33:25 +09:00
parent 459a269049
commit 18f450fd2a
11 changed files with 330 additions and 394 deletions

View File

@ -35,11 +35,7 @@ AM_CXXFLAGS = -std=c++11
LDADD = $(top_builddir)/lib/libnghttp2.la
bin_PROGRAMS += nghttp nghttpd
if HAVE_LIBEVENT_OPENSSL
# bin_PROGRAMS += shrpx
endif # HAVE_LIBEVENT_OPENSSL
bin_PROGRAMS += nghttp nghttpd shrpx
HELPER_OBJECTS = util.cc timegm.c app_helper.cc
HELPER_HFILES = util.h timegm.h app_helper.h nghttp2_config.h
@ -72,31 +68,31 @@ nghttpd_SOURCES = ${HELPER_OBJECTS} ${HELPER_HFILES} nghttpd.cc \
HttpServer.cc HttpServer.h
if HAVE_LIBEVENT_OPENSSL
# SHRPX_SRCS = \
# util.cc util.h timegm.c timegm.h base64.h \
# shrpx_config.cc shrpx_config.h \
# shrpx_error.h \
# shrpx_listen_handler.cc shrpx_listen_handler.h \
# shrpx_client_handler.cc shrpx_client_handler.h \
# shrpx_upstream.h \
# shrpx_spdy_upstream.cc shrpx_spdy_upstream.h \
# shrpx_https_upstream.cc shrpx_https_upstream.h \
# shrpx_downstream_queue.cc shrpx_downstream_queue.h \
# shrpx_downstream.cc shrpx_downstream.h \
# shrpx_downstream_connection.cc shrpx_downstream_connection.h \
# shrpx_http_downstream_connection.cc shrpx_http_downstream_connection.h \
# shrpx_spdy_downstream_connection.cc shrpx_spdy_downstream_connection.h \
# shrpx_spdy_session.cc shrpx_spdy_session.h \
# shrpx_log.cc shrpx_log.h \
# shrpx_http.cc shrpx_http.h \
# shrpx_io_control.cc shrpx_io_control.h \
# shrpx_ssl.cc shrpx_ssl.h \
# shrpx_thread_event_receiver.cc shrpx_thread_event_receiver.h \
# shrpx_worker.cc shrpx_worker.h \
# shrpx_accesslog.cc shrpx_accesslog.h\
# http-parser/http_parser.c http-parser/http_parser.h
SHRPX_SRCS = \
util.cc util.h timegm.c timegm.h base64.h \
shrpx_config.cc shrpx_config.h \
shrpx_error.h \
shrpx_listen_handler.cc shrpx_listen_handler.h \
shrpx_client_handler.cc shrpx_client_handler.h \
shrpx_upstream.h \
shrpx_spdy_upstream.cc shrpx_spdy_upstream.h \
shrpx_https_upstream.cc shrpx_https_upstream.h \
shrpx_downstream_queue.cc shrpx_downstream_queue.h \
shrpx_downstream.cc shrpx_downstream.h \
shrpx_downstream_connection.cc shrpx_downstream_connection.h \
shrpx_http_downstream_connection.cc shrpx_http_downstream_connection.h \
shrpx_spdy_downstream_connection.cc shrpx_spdy_downstream_connection.h \
shrpx_spdy_session.cc shrpx_spdy_session.h \
shrpx_log.cc shrpx_log.h \
shrpx_http.cc shrpx_http.h \
shrpx_io_control.cc shrpx_io_control.h \
shrpx_ssl.cc shrpx_ssl.h \
shrpx_thread_event_receiver.cc shrpx_thread_event_receiver.h \
shrpx_worker.cc shrpx_worker.h \
shrpx_accesslog.cc shrpx_accesslog.h\
http-parser/http_parser.c http-parser/http_parser.h
# shrpx_SOURCES = ${SHRPX_SRCS} shrpx.cc shrpx.h
shrpx_SOURCES = ${SHRPX_SRCS} shrpx.cc shrpx.h
# if HAVE_CUNIT
# check_PROGRAMS += shrpx-unittest

View File

@ -358,9 +358,7 @@ void fill_default_config()
mod_config()->spdy_downstream_window_bits = 16;
mod_config()->spdy_upstream_no_tls = false;
mod_config()->spdy_upstream_version = 3;
mod_config()->spdy_downstream_no_tls = false;
mod_config()->spdy_downstream_version = 3;
set_config_str(&mod_config()->downstream_host, "127.0.0.1");
mod_config()->downstream_port = 80;
@ -543,14 +541,8 @@ void print_help(std::ostream& out)
<< get_config()->spdy_upstream_window_bits << "\n"
<< " --frontend-spdy-no-tls\n"
<< " Disable SSL/TLS on frontend SPDY\n"
<< " connections. SPDY protocol must be specified\n"
<< " using --frontend-spdy-proto. This option\n"
<< " also disables frontend HTTP/1.1.\n"
<< " --frontend-spdy-proto\n"
<< " Specify SPDY protocol used in frontend\n"
<< " connection if --frontend-spdy-no-tls is\n"
<< " used. Default: spdy/"
<< get_config()->spdy_upstream_version << "\n"
<< " connections.This option also disables\n"
<< " frontend HTTP/1.1.\n"
<< " --backend-spdy-window-bits=<N>\n"
<< " Sets the initial window size of SPDY\n"
<< " backend connection to 2**<N>.\n"
@ -558,13 +550,6 @@ void print_help(std::ostream& out)
<< get_config()->spdy_downstream_window_bits << "\n"
<< " --backend-spdy-no-tls\n"
<< " Disable SSL/TLS on backend SPDY connections.\n"
<< " SPDY protocol must be specified using\n"
<< " --backend-spdy-proto\n"
<< " --backend-spdy-proto\n"
<< " Specify SPDY protocol used in backend\n"
<< " connection if --backend-spdy-no-tls is used.\n"
<< " Default: spdy/"
<< get_config()->spdy_downstream_version << "\n"
<< "\n"
<< " Mode:\n"
<< " -s, --spdy-proxy Enable secure SPDY proxy mode.\n"
@ -664,9 +649,7 @@ int main(int argc, char **argv)
{"spdy-bridge", no_argument, &flag, 25},
{"backend-http-proxy-uri", required_argument, &flag, 26},
{"backend-spdy-no-tls", no_argument, &flag, 27},
{"backend-spdy-proto", required_argument, &flag, 28},
{"frontend-spdy-no-tls", no_argument, &flag, 29},
{"frontend-spdy-proto", required_argument, &flag, 30},
{"backend-tls-sni-field", required_argument, &flag, 31},
{"honor-cipher-order", no_argument, &flag, 32},
{0, 0, 0, 0 }
@ -833,21 +816,11 @@ int main(int argc, char **argv)
cmdcfgs.push_back(std::make_pair(SHRPX_OPT_BACKEND_SPDY_NO_TLS,
"yes"));
break;
case 28:
// --backend-spdy-proto
cmdcfgs.push_back(std::make_pair(SHRPX_OPT_BACKEND_SPDY_PROTO,
optarg));
break;
case 29:
// --frontend-spdy-no-tls
cmdcfgs.push_back(std::make_pair(SHRPX_OPT_FRONTEND_SPDY_NO_TLS,
"yes"));
break;
case 30:
// --frontend-spdy-proto
cmdcfgs.push_back(std::make_pair(SHRPX_OPT_FRONTEND_SPDY_PROTO,
optarg));
break;
case 31:
// --backend-tls-sni-field
cmdcfgs.push_back(std::make_pair(SHRPX_OPT_BACKEND_TLS_SNI_FIELD,

View File

@ -123,24 +123,24 @@ ClientHandler::ClientHandler(bufferevent *bev, int fd, SSL *ssl,
: bev_(bev),
fd_(fd),
ssl_(ssl),
upstream_(0),
upstream_(nullptr),
ipaddr_(ipaddr),
should_close_after_write_(false),
spdy_(0)
spdy_(nullptr)
{
bufferevent_enable(bev_, EV_READ | EV_WRITE);
bufferevent_setwatermark(bev_, EV_READ, 0, SHRPX_READ_WARTER_MARK);
set_upstream_timeouts(&get_config()->upstream_read_timeout,
&get_config()->upstream_write_timeout);
if(ssl_) {
set_bev_cb(0, upstream_writecb, upstream_eventcb);
set_bev_cb(nullptr, upstream_writecb, upstream_eventcb);
} else {
if(get_config()->client_mode) {
// Client mode
upstream_ = new HttpsUpstream(this);
} else {
// no-TLS SPDY
upstream_ = new SpdyUpstream(get_config()->spdy_upstream_version, this);
upstream_ = new SpdyUpstream(this);
}
set_bev_cb(upstream_readcb, upstream_writecb, upstream_eventcb);
}
@ -201,18 +201,16 @@ void ClientHandler::set_upstream_timeouts(const timeval *read_timeout,
int ClientHandler::validate_next_proto()
{
const unsigned char *next_proto = 0;
const unsigned char *next_proto = nullptr;
unsigned int next_proto_len;
SSL_get0_next_proto_negotiated(ssl_, &next_proto, &next_proto_len);
if(next_proto) {
std::string proto(next_proto, next_proto+next_proto_len);
if(LOG_ENABLED(INFO)) {
std::string proto(next_proto, next_proto+next_proto_len);
CLOG(INFO, this) << "The negotiated next protocol: " << proto;
}
uint16_t version = nghttp2_npn_get_version(next_proto, next_proto_len);
if(version) {
SpdyUpstream *spdy_upstream = new SpdyUpstream(version, this);
upstream_ = spdy_upstream;
if(proto == NGHTTP2_PROTO_VERSION_ID) {
upstream_ = new SpdyUpstream(this);
return 0;
}
} else {
@ -223,8 +221,7 @@ int ClientHandler::validate_next_proto()
if(LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "Use HTTP/1.1";
}
HttpsUpstream *https_upstream = new HttpsUpstream(this);
upstream_ = https_upstream;
upstream_ = new HttpsUpstream(this);
return 0;
}

View File

@ -76,9 +76,7 @@ SHRPX_OPT_BACKEND_KEEP_ALIVE_TIMEOUT[] = "backend-keep-alive-timeout";
const char SHRPX_OPT_FRONTEND_SPDY_WINDOW_BITS[] = "frontend-spdy-window-bits";
const char SHRPX_OPT_BACKEND_SPDY_WINDOW_BITS[] = "backend-spdy-window-bits";
const char SHRPX_OPT_FRONTEND_SPDY_NO_TLS[] = "frontend-spdy-no-tls";
const char SHRPX_OPT_FRONTEND_SPDY_PROTO[] = "frontend-spdy-proto";
const char SHRPX_OPT_BACKEND_SPDY_NO_TLS[] = "backend-spdy-no-tls";
const char SHRPX_OPT_BACKEND_SPDY_PROTO[] = "backend-spdy-proto";
const char SHRPX_OPT_BACKEND_TLS_SNI_FIELD[] = "backend-tls-sni-field";
const char SHRPX_OPT_PID_FILE[] = "pid-file";
const char SHRPX_OPT_USER[] = "user";
@ -188,23 +186,6 @@ void set_config_str(char **destp, const char *val)
*destp = strdup(val);
}
namespace {
// Parses |optarg| as SPDY NPN protocol string and returns SPDY
// protocol version number. This function returns -1 on error.
int parse_spdy_proto(const char *optarg)
{
size_t len = strlen(optarg);
const unsigned char *proto;
proto = reinterpret_cast<const unsigned char*>(optarg);
uint16_t version = nghttp2_npn_get_version(proto, len);
if(!version) {
LOG(ERROR) << "Unsupported SPDY version: " << optarg;
return -1;
}
return version;
}
} // namespace
int parse_config(const char *opt, const char *optarg)
{
char host[NI_MAXHOST];
@ -286,22 +267,8 @@ int parse_config(const char *opt, const char *optarg)
}
} else if(util::strieq(opt, SHRPX_OPT_FRONTEND_SPDY_NO_TLS)) {
mod_config()->spdy_upstream_no_tls = util::strieq(optarg, "yes");
} else if(util::strieq(opt, SHRPX_OPT_FRONTEND_SPDY_PROTO)) {
int version = parse_spdy_proto(optarg);
if(version == -1) {
return -1;
} else {
mod_config()->spdy_upstream_version = version;
}
} else if(util::strieq(opt, SHRPX_OPT_BACKEND_SPDY_NO_TLS)) {
mod_config()->spdy_downstream_no_tls = util::strieq(optarg, "yes");
} else if(util::strieq(opt, SHRPX_OPT_BACKEND_SPDY_PROTO)) {
int version = parse_spdy_proto(optarg);
if(version == -1) {
return -1;
} else {
mod_config()->spdy_downstream_version = version;
}
} else if(util::strieq(opt, SHRPX_OPT_BACKEND_TLS_SNI_FIELD)) {
set_config_str(&mod_config()->backend_tls_sni_name, optarg);
} else if(util::strieq(opt, SHRPX_OPT_PID_FILE)) {

View File

@ -69,9 +69,7 @@ extern const char SHRPX_OPT_BACKEND_KEEP_ALIVE_TIMEOUT[];
extern const char SHRPX_OPT_FRONTEND_SPDY_WINDOW_BITS[];
extern const char SHRPX_OPT_BACKEND_SPDY_WINDOW_BITS[];
extern const char SHRPX_OPT_FRONTEND_SPDY_NO_TLS[];
extern const char SHRPX_OPT_FRONTEND_SPDY_PROTO[];
extern const char SHRPX_OPT_BACKEND_SPDY_NO_TLS[];
extern const char SHRPX_OPT_BACKEND_SPDY_PROTO[];
extern const char SHRPX_OPT_PID_FILE[];
extern const char SHRPX_OPT_USER[];
extern const char SHRPX_OPT_SYSLOG[];
@ -133,9 +131,7 @@ struct Config {
size_t spdy_upstream_window_bits;
size_t spdy_downstream_window_bits;
bool spdy_upstream_no_tls;
uint16_t spdy_upstream_version;
bool spdy_downstream_no_tls;
uint16_t spdy_downstream_version;
char *backend_tls_sni_name;
char *pid_file;
uid_t uid;

View File

@ -39,7 +39,7 @@ namespace shrpx {
Downstream::Downstream(Upstream *upstream, int stream_id, int priority)
: upstream_(upstream),
dconn_(0),
dconn_(nullptr),
stream_id_(stream_id),
priority_(priority),
downstream_stream_id_(-1),
@ -57,7 +57,8 @@ Downstream::Downstream(Upstream *upstream, int stream_id, int priority)
chunked_response_(false),
response_connection_close_(false),
response_header_key_prev_(false),
response_body_buf_(0),
response_body_buf_(nullptr),
response_rst_stream_error_code_(NGHTTP2_NO_ERROR),
recv_window_size_(0)
{}
@ -489,14 +490,15 @@ int32_t Downstream::get_downstream_stream_id() const
return downstream_stream_id_;
}
uint32_t Downstream::get_response_rst_stream_status_code() const
nghttp2_error_code Downstream::get_response_rst_stream_error_code() const
{
return response_rst_stream_status_code_;
return response_rst_stream_error_code_;
}
void Downstream::set_response_rst_stream_status_code(uint32_t status_code)
void Downstream::set_response_rst_stream_error_code
(nghttp2_error_code error_code)
{
response_rst_stream_status_code_ = status_code;
response_rst_stream_error_code_ = error_code;
}
} // namespace shrpx

View File

@ -35,6 +35,8 @@
#include <event.h>
#include <event2/bufferevent.h>
#include <nghttp2/nghttp2.h>
#include "shrpx_io_control.h"
namespace shrpx {
@ -128,8 +130,8 @@ public:
int get_response_state() const;
int init_response_body_buf();
evbuffer* get_response_body_buf();
uint32_t get_response_rst_stream_status_code() const;
void set_response_rst_stream_status_code(uint32_t status_code);
nghttp2_error_code get_response_rst_stream_error_code() const;
void set_response_rst_stream_error_code(nghttp2_error_code error_code);
// Call this method when there is incoming data in downstream
// connection.
@ -166,8 +168,8 @@ private:
// This buffer is used to temporarily store downstream response
// body. Spdylay reads data from this in the callback.
evbuffer *response_body_buf_;
// RST_STREAM status_code from downstream SPDY connection
uint32_t response_rst_stream_status_code_;
// RST_STREAM error_code from downstream SPDY connection
nghttp2_error_code response_rst_stream_error_code_;
int32_t recv_window_size_;
};

View File

@ -49,14 +49,14 @@ namespace shrpx {
SpdySession::SpdySession(event_base *evbase, SSL_CTX *ssl_ctx)
: evbase_(evbase),
ssl_ctx_(ssl_ctx),
ssl_(0),
ssl_(nullptr),
fd_(-1),
session_(0),
bev_(0),
session_(nullptr),
bev_(nullptr),
state_(DISCONNECTED),
notified_(false),
wrbev_(0),
rdbev_(0),
wrbev_(nullptr),
rdbev_(nullptr),
flow_control_(false),
proxy_htp_(0)
{}
@ -565,10 +565,11 @@ int SpdySession::submit_request(SpdyDownstreamConnection *dconn,
}
int SpdySession::submit_rst_stream(SpdyDownstreamConnection *dconn,
int32_t stream_id, uint32_t status_code)
int32_t stream_id,
nghttp2_error_code error_code)
{
assert(state_ == CONNECTED);
int rv = nghttp2_submit_rst_stream(session_, stream_id, status_code);
int rv = nghttp2_submit_rst_stream(session_, stream_id, error_code);
if(rv != 0) {
SSLOG(FATAL, this) << "nghttp2_submit_rst_stream() failed: "
<< nghttp2_strerror(rv);
@ -584,7 +585,8 @@ int SpdySession::submit_window_update(SpdyDownstreamConnection *dconn,
int rv;
int32_t stream_id;
stream_id = dconn->get_downstream()->get_downstream_stream_id();
rv = nghttp2_submit_window_update(session_, stream_id, amount);
rv = nghttp2_submit_window_update(session_, NGHTTP2_FLAG_NONE,
stream_id, amount);
if(rv < NGHTTP2_ERR_FATAL) {
SSLOG(FATAL, this) << "nghttp2_submit_window_update() failed: "
<< nghttp2_strerror(rv);
@ -678,29 +680,28 @@ ssize_t recv_callback(nghttp2_session *session,
namespace {
void on_stream_close_callback
(nghttp2_session *session, int32_t stream_id, nghttp2_status_code status_code,
(nghttp2_session *session, int32_t stream_id, nghttp2_error_code error_code,
void *user_data)
{
int rv;
SpdySession *spdy = reinterpret_cast<SpdySession*>(user_data);
auto spdy = reinterpret_cast<SpdySession*>(user_data);
if(LOG_ENABLED(INFO)) {
SSLOG(INFO, spdy) << "Stream stream_id=" << stream_id
<< " is being closed";
}
StreamData *sd;
sd = reinterpret_cast<StreamData*>
auto sd = reinterpret_cast<StreamData*>
(nghttp2_session_get_stream_user_data(session, stream_id));
if(sd == 0) {
// We might get this close callback when pushed streams are
// closed.
return;
}
SpdyDownstreamConnection* dconn = sd->dconn;
auto dconn = sd->dconn;
if(dconn) {
Downstream *downstream = dconn->get_downstream();
auto downstream = dconn->get_downstream();
if(downstream && downstream->get_downstream_stream_id() == stream_id) {
Upstream *upstream = downstream->get_upstream();
if(status_code == NGHTTP2_OK) {
auto upstream = downstream->get_upstream();
if(error_code == NGHTTP2_NO_ERROR) {
downstream->set_response_state(Downstream::MSG_COMPLETE);
rv = upstream->on_downstream_body_complete(downstream);
if(rv != 0) {
@ -719,108 +720,75 @@ void on_stream_close_callback
} // namespace
namespace {
void on_ctrl_recv_callback
(nghttp2_session *session, nghttp2_frame_type type, nghttp2_frame *frame,
void *user_data)
void on_frame_recv_callback
(nghttp2_session *session, nghttp2_frame *frame, void *user_data)
{
int rv;
SpdySession *spdy = reinterpret_cast<SpdySession*>(user_data);
StreamData *sd;
Downstream *downstream;
switch(type) {
case NGHTTP2_SYN_STREAM:
if(LOG_ENABLED(INFO)) {
SSLOG(INFO, spdy) << "Received upstream SYN_STREAM stream_id="
<< frame->syn_stream.stream_id;
auto spdy = reinterpret_cast<SpdySession*>(user_data);
switch(frame->hd.type) {
case NGHTTP2_HEADERS: {
if(frame->headers.cat != NGHTTP2_HCAT_RESPONSE) {
break;
}
// We just respond pushed stream with RST_STREAM.
nghttp2_submit_rst_stream(session, frame->syn_stream.stream_id,
NGHTTP2_REFUSED_STREAM);
break;
case NGHTTP2_RST_STREAM:
sd = reinterpret_cast<StreamData*>
(nghttp2_session_get_stream_user_data(session,
frame->rst_stream.stream_id));
if(sd && sd->dconn) {
downstream = sd->dconn->get_downstream();
if(downstream &&
downstream->get_downstream_stream_id() ==
frame->rst_stream.stream_id) {
if(downstream->tunnel_established() &&
downstream->get_response_state() == Downstream::HEADER_COMPLETE) {
// For tunneled connection, we has to submit RST_STREAM to
// upstream *after* whole response body is sent. We just set
// MSG_COMPLETE here. Upstream will take care of that.
if(LOG_ENABLED(INFO)) {
SSLOG(INFO, spdy) << "RST_STREAM against tunneled stream "
<< "stream_id="
<< frame->rst_stream.stream_id;
}
downstream->get_upstream()->on_downstream_body_complete(downstream);
downstream->set_response_state(Downstream::MSG_COMPLETE);
} else {
// If we got RST_STREAM, just flag MSG_RESET to indicate
// upstream connection must be terminated.
downstream->set_response_state(Downstream::MSG_RESET);
}
downstream->set_response_rst_stream_status_code
(frame->rst_stream.status_code);
call_downstream_readcb(spdy, downstream);
}
}
break;
case NGHTTP2_SYN_REPLY: {
sd = reinterpret_cast<StreamData*>
(nghttp2_session_get_stream_user_data(session,
frame->syn_reply.stream_id));
auto sd = reinterpret_cast<StreamData*>
(nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
if(!sd || !sd->dconn) {
nghttp2_submit_rst_stream(session, frame->syn_reply.stream_id,
nghttp2_submit_rst_stream(session, frame->hd.stream_id,
NGHTTP2_INTERNAL_ERROR);
break;
}
downstream = sd->dconn->get_downstream();
auto downstream = sd->dconn->get_downstream();
if(!downstream ||
downstream->get_downstream_stream_id() != frame->syn_reply.stream_id) {
nghttp2_submit_rst_stream(session, frame->syn_reply.stream_id,
downstream->get_downstream_stream_id() != frame->hd.stream_id) {
nghttp2_submit_rst_stream(session, frame->hd.stream_id,
NGHTTP2_INTERNAL_ERROR);
break;
}
char **nv = frame->syn_reply.nv;
const char *status = 0;
const char *version = 0;
const char *content_length = 0;
for(size_t i = 0; nv[i]; i += 2) {
if(strcmp(nv[i], ":status") == 0) {
unsigned int code = strtoul(nv[i+1], 0, 10);
auto nva = frame->headers.nva;
std::string status, version, content_length;
for(size_t i = 0; i < frame->headers.nvlen; ++i) {
if(util::strieq(":status", nva[i].name, nva[i].namelen)) {
status.assign(reinterpret_cast<char*>(nva[i].value),
nva[i].valuelen);
auto code = strtoul(status.c_str(), nullptr, 10);
downstream->set_response_http_status(code);
status = nv[i+1];
} else if(strcmp(nv[i], ":version") == 0) {
} else if(util::strieq(":version", nva[i].name, nva[i].namelen)) {
// We assume for now that most version is HTTP/1.1 from
// SPDY. So just check if it is HTTP/1.0 and then set response
// minor as so.
downstream->set_response_major(1);
if(util::strieq(nv[i+1], "HTTP/1.0")) {
version.assign(reinterpret_cast<char*>(nva[i].value), nva[i].valuelen);
if(util::strieq("HTTP/1.0", version.c_str())) {
downstream->set_response_minor(0);
} else {
downstream->set_response_minor(1);
}
version = nv[i+1];
} else if(nv[i][0] != ':') {
if(strcmp(nv[i], "content-length") == 0) {
content_length = nv[i+1];
} else if(nva[i].namelen > 0 && nva[i].name[0] != ':') {
if(util::strieq("content-length", nva[i].name, nva[i].namelen)) {
content_length.assign(reinterpret_cast<char*>(nva[i].value),
nva[i].valuelen);
}
downstream->add_response_header(nv[i], nv[i+1]);
downstream->add_response_header
(std::string(reinterpret_cast<char*>(nva[i].name),
nva[i].namelen),
std::string(reinterpret_cast<char*>(nva[i].value),
nva[i].valuelen));
}
}
if(!status || !version) {
nghttp2_submit_rst_stream(session, frame->syn_reply.stream_id,
if(version.empty()) {
// If no version, just assume it is HTTP/1.1
downstream->set_response_major(1);
downstream->set_response_minor(1);
}
if(status.empty()) {
nghttp2_submit_rst_stream(session, frame->hd.stream_id,
NGHTTP2_PROTOCOL_ERROR);
downstream->set_response_state(Downstream::MSG_RESET);
call_downstream_readcb(spdy, downstream);
return;
}
if(!content_length && downstream->get_request_method() != "HEAD" &&
if(content_length.empty() && downstream->get_request_method() != "HEAD" &&
downstream->get_request_method() != "CONNECT") {
unsigned int status;
status = downstream->get_response_http_status();
@ -843,28 +811,72 @@ void on_ctrl_recv_callback
if(LOG_ENABLED(INFO)) {
std::stringstream ss;
for(size_t i = 0; nv[i]; i += 2) {
ss << TTY_HTTP_HD << nv[i] << TTY_RST << ": " << nv[i+1] << "\n";
for(size_t i = 0; i < frame->headers.nvlen; ++i) {
ss << TTY_HTTP_HD;
ss.write(reinterpret_cast<char*>(nva[i].name), nva[i].namelen);
ss << TTY_RST << ": ";
ss.write(reinterpret_cast<char*>(nva[i].value), nva[i].valuelen);
ss << "\n";
}
SSLOG(INFO, spdy) << "HTTP response headers. stream_id="
<< frame->syn_reply.stream_id
<< frame->hd.stream_id
<< "\n" << ss.str();
}
Upstream *upstream = downstream->get_upstream();
auto upstream = downstream->get_upstream();
downstream->set_response_state(Downstream::HEADER_COMPLETE);
if(downstream->tunnel_established()) {
downstream->set_response_connection_close(true);
}
rv = upstream->on_downstream_header_complete(downstream);
if(rv != 0) {
nghttp2_submit_rst_stream(session, frame->syn_reply.stream_id,
nghttp2_submit_rst_stream(session, frame->hd.stream_id,
NGHTTP2_PROTOCOL_ERROR);
downstream->set_response_state(Downstream::MSG_RESET);
}
call_downstream_readcb(spdy, downstream);
break;
}
case NGHTTP2_RST_STREAM: {
auto sd = reinterpret_cast<StreamData*>
(nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
if(sd && sd->dconn) {
auto downstream = sd->dconn->get_downstream();
if(downstream &&
downstream->get_downstream_stream_id() == frame->hd.stream_id) {
if(downstream->tunnel_established() &&
downstream->get_response_state() == Downstream::HEADER_COMPLETE) {
// For tunneled connection, we has to submit RST_STREAM to
// upstream *after* whole response body is sent. We just set
// MSG_COMPLETE here. Upstream will take care of that.
if(LOG_ENABLED(INFO)) {
SSLOG(INFO, spdy) << "RST_STREAM against tunneled stream "
<< "stream_id="
<< frame->hd.stream_id;
}
downstream->get_upstream()->on_downstream_body_complete(downstream);
downstream->set_response_state(Downstream::MSG_COMPLETE);
} else {
// If we got RST_STREAM, just flag MSG_RESET to indicate
// upstream connection must be terminated.
downstream->set_response_state(Downstream::MSG_RESET);
}
downstream->set_response_rst_stream_error_code
(frame->rst_stream.error_code);
call_downstream_readcb(spdy, downstream);
}
}
break;
}
case NGHTTP2_PUSH_PROMISE:
if(LOG_ENABLED(INFO)) {
SSLOG(INFO, spdy) << "Received downstream PUSH_PROMISE stream_id="
<< frame->hd.stream_id;
}
// We just respond with RST_STREAM.
nghttp2_submit_rst_stream(session, frame->hd.stream_id,
NGHTTP2_REFUSED_STREAM);
break;
default:
break;
}
@ -878,15 +890,14 @@ void on_data_chunk_recv_callback(nghttp2_session *session,
void *user_data)
{
int rv;
SpdySession *spdy = reinterpret_cast<SpdySession*>(user_data);
StreamData *sd;
sd = reinterpret_cast<StreamData*>
auto spdy = reinterpret_cast<SpdySession*>(user_data);
auto sd = reinterpret_cast<StreamData*>
(nghttp2_session_get_stream_user_data(session, stream_id));
if(!sd || !sd->dconn) {
nghttp2_submit_rst_stream(session, stream_id, NGHTTP2_INTERNAL_ERROR);
return;
}
Downstream *downstream = sd->dconn->get_downstream();
auto downstream = sd->dconn->get_downstream();
if(!downstream || downstream->get_downstream_stream_id() != stream_id) {
nghttp2_submit_rst_stream(session, stream_id, NGHTTP2_INTERNAL_ERROR);
return;
@ -909,7 +920,7 @@ void on_data_chunk_recv_callback(nghttp2_session *session,
}
}
Upstream *upstream = downstream->get_upstream();
auto upstream = downstream->get_upstream();
rv = upstream->on_downstream_body(downstream, data, len);
if(rv != 0) {
nghttp2_submit_rst_stream(session, stream_id, NGHTTP2_INTERNAL_ERROR);
@ -920,57 +931,52 @@ void on_data_chunk_recv_callback(nghttp2_session *session,
} // namespace
namespace {
void before_ctrl_send_callback(nghttp2_session *session,
nghttp2_frame_type type,
nghttp2_frame *frame,
void *user_data)
void before_frame_send_callback(nghttp2_session *session,
nghttp2_frame *frame,
void *user_data)
{
if(type == NGHTTP2_SYN_STREAM) {
StreamData *sd;
sd = reinterpret_cast<StreamData*>
(nghttp2_session_get_stream_user_data(session,
frame->syn_stream.stream_id));
if(frame->hd.type == NGHTTP2_HEADERS &&
frame->headers.cat == NGHTTP2_HCAT_REQUEST) {
auto sd = reinterpret_cast<StreamData*>
(nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
if(!sd || !sd->dconn) {
nghttp2_submit_rst_stream(session, frame->syn_stream.stream_id,
nghttp2_submit_rst_stream(session, frame->hd.stream_id,
NGHTTP2_CANCEL);
return;
}
Downstream *downstream = sd->dconn->get_downstream();
auto downstream = sd->dconn->get_downstream();
if(downstream) {
downstream->set_downstream_stream_id(frame->syn_stream.stream_id);
downstream->set_downstream_stream_id(frame->hd.stream_id);
} else {
nghttp2_submit_rst_stream(session, frame->syn_stream.stream_id,
NGHTTP2_CANCEL);
nghttp2_submit_rst_stream(session, frame->hd.stream_id, NGHTTP2_CANCEL);
}
}
}
} // namespace
namespace {
void on_ctrl_not_send_callback(nghttp2_session *session,
nghttp2_frame_type type,
nghttp2_frame *frame,
int error_code, void *user_data)
void on_frame_not_send_callback(nghttp2_session *session,
nghttp2_frame *frame,
int lib_error_code, void *user_data)
{
SpdySession *spdy = reinterpret_cast<SpdySession*>(user_data);
SSLOG(WARNING, spdy) << "Failed to send control frame type=" << type << ", "
<< "error_code=" << error_code << ":"
<< nghttp2_strerror(error_code);
if(type == NGHTTP2_SYN_STREAM) {
auto spdy = reinterpret_cast<SpdySession*>(user_data);
SSLOG(WARNING, spdy) << "Failed to send control frame type="
<< frame->hd.type << ", "
<< "lib_error_code=" << lib_error_code << ":"
<< nghttp2_strerror(lib_error_code);
if(frame->hd.type == NGHTTP2_HEADERS &&
frame->headers.cat == NGHTTP2_HCAT_REQUEST) {
// To avoid stream hanging around, flag Downstream::MSG_RESET and
// terminate the upstream and downstream connections.
StreamData *sd;
sd = reinterpret_cast<StreamData*>
(nghttp2_session_get_stream_user_data(session,
frame->syn_stream.stream_id));
auto sd = reinterpret_cast<StreamData*>
(nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
if(!sd) {
return;
}
if(sd->dconn) {
Downstream *downstream = sd->dconn->get_downstream();
auto downstream = sd->dconn->get_downstream();
if(!downstream ||
downstream->get_downstream_stream_id() !=
frame->syn_stream.stream_id) {
downstream->get_downstream_stream_id() != frame->hd.stream_id) {
return;
}
downstream->set_response_state(Downstream::MSG_RESET);
@ -982,30 +988,30 @@ void on_ctrl_not_send_callback(nghttp2_session *session,
} // namespace
namespace {
void on_ctrl_recv_parse_error_callback(nghttp2_session *session,
nghttp2_frame_type type,
const uint8_t *head, size_t headlen,
const uint8_t *payload,
size_t payloadlen, int error_code,
void *user_data)
void on_frame_recv_parse_error_callback(nghttp2_session *session,
nghttp2_frame_type type,
const uint8_t *head, size_t headlen,
const uint8_t *payload,
size_t payloadlen, int lib_error_code,
void *user_data)
{
SpdySession *spdy = reinterpret_cast<SpdySession*>(user_data);
auto spdy = reinterpret_cast<SpdySession*>(user_data);
if(LOG_ENABLED(INFO)) {
SSLOG(INFO, spdy) << "Failed to parse received control frame. type="
<< type
<< ", error_code=" << error_code << ":"
<< nghttp2_strerror(error_code);
<< ", lib_error_code=" << lib_error_code << ":"
<< nghttp2_strerror(lib_error_code);
}
}
} // namespace
namespace {
void on_unknown_ctrl_recv_callback(nghttp2_session *session,
const uint8_t *head, size_t headlen,
const uint8_t *payload, size_t payloadlen,
void *user_data)
void on_unknown_frame_recv_callback(nghttp2_session *session,
const uint8_t *head, size_t headlen,
const uint8_t *payload, size_t payloadlen,
void *user_data)
{
SpdySession *spdy = reinterpret_cast<SpdySession*>(user_data);
auto spdy = reinterpret_cast<SpdySession*>(user_data);
if(LOG_ENABLED(INFO)) {
SSLOG(INFO, spdy) << "Received unknown control frame";
}
@ -1015,76 +1021,70 @@ void on_unknown_ctrl_recv_callback(nghttp2_session *session,
int SpdySession::on_connect()
{
int rv;
uint16_t version;
const unsigned char *next_proto = 0;
unsigned int next_proto_len;
if(ssl_ctx_) {
SSL_get0_next_proto_negotiated(ssl_, &next_proto, &next_proto_len);
std::string proto(next_proto, next_proto+next_proto_len);
if(LOG_ENABLED(INFO)) {
std::string proto(next_proto, next_proto+next_proto_len);
SSLOG(INFO, this) << "Negotiated next protocol: " << proto;
}
version = nghttp2_npn_get_version(next_proto, next_proto_len);
if(!version) {
if(proto != NGHTTP2_PROTO_VERSION_ID) {
return -1;
}
} else {
version = get_config()->spdy_downstream_version;
}
nghttp2_session_callbacks callbacks;
memset(&callbacks, 0, sizeof(callbacks));
callbacks.send_callback = send_callback;
callbacks.recv_callback = recv_callback;
callbacks.on_stream_close_callback = on_stream_close_callback;
callbacks.on_ctrl_recv_callback = on_ctrl_recv_callback;
callbacks.on_frame_recv_callback = on_frame_recv_callback;
callbacks.on_data_chunk_recv_callback = on_data_chunk_recv_callback;
callbacks.before_ctrl_send_callback = before_ctrl_send_callback;
callbacks.on_ctrl_not_send_callback = on_ctrl_not_send_callback;
callbacks.on_ctrl_recv_parse_error_callback =
on_ctrl_recv_parse_error_callback;
callbacks.on_unknown_ctrl_recv_callback = on_unknown_ctrl_recv_callback;
callbacks.before_frame_send_callback = before_frame_send_callback;
callbacks.on_frame_not_send_callback = on_frame_not_send_callback;
callbacks.on_frame_recv_parse_error_callback =
on_frame_recv_parse_error_callback;
callbacks.on_unknown_frame_recv_callback = on_unknown_frame_recv_callback;
rv = nghttp2_session_client_new(&session_, version, &callbacks, this);
rv = nghttp2_session_client_new(&session_, &callbacks, this);
if(rv != 0) {
return -1;
}
if(version == NGHTTP2_PROTO_SPDY3) {
int val = 1;
flow_control_ = true;
rv = nghttp2_session_set_option(session_,
NGHTTP2_OPT_NO_AUTO_WINDOW_UPDATE, &val,
sizeof(val));
assert(rv == 0);
} else {
flow_control_ = false;
}
int val = 1;
flow_control_ = true;
rv = nghttp2_session_set_option(session_,
NGHTTP2_OPT_NO_AUTO_WINDOW_UPDATE, &val,
sizeof(val));
assert(rv == 0);
nghttp2_settings_entry entry[2];
entry[0].settings_id = NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS;
entry[0].value = get_config()->spdy_max_concurrent_streams;
entry[0].flags = NGHTTP2_ID_FLAG_SETTINGS_NONE;
entry[1].settings_id = NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
entry[1].value = get_initial_window_size();
entry[1].flags = NGHTTP2_ID_FLAG_SETTINGS_NONE;
rv = nghttp2_submit_settings
(session_, NGHTTP2_FLAG_SETTINGS_NONE,
entry, sizeof(entry)/sizeof(nghttp2_settings_entry));
(session_, entry, sizeof(entry)/sizeof(nghttp2_settings_entry));
if(rv != 0) {
return -1;
}
// Disable connection-level flow control
rv = nghttp2_submit_window_update(session_, NGHTTP2_FLAG_END_FLOW_CONTROL,
0, 0);
if(rv != 0) {
return -1;
}
rv = send();
if(rv != 0) {
return -1;
}
// submit pending request
for(std::set<SpdyDownstreamConnection*>::iterator i = dconns_.begin(),
eoi = dconns_.end(); i != eoi; ++i) {
if((*i)->push_request_headers() != 0) {
for(auto dconn : dconns_) {
if(dconn->push_request_headers() != 0) {
return -1;
}
}

View File

@ -68,7 +68,7 @@ public:
const nghttp2_data_provider *data_prd);
int submit_rst_stream(SpdyDownstreamConnection *dconn,
int32_t stream_id, uint32_t status_code);
int32_t stream_id, nghttp2_error_code error_code);
int submit_window_update(SpdyDownstreamConnection *dconn, int32_t amount);

View File

@ -91,7 +91,7 @@ ssize_t recv_callback(nghttp2_session *session,
namespace {
void on_stream_close_callback
(nghttp2_session *session, int32_t stream_id, nghttp2_status_code status_code,
(nghttp2_session *session, int32_t stream_id, nghttp2_error_code error_code,
void *user_data)
{
SpdyUpstream *upstream = reinterpret_cast<SpdyUpstream*>(user_data);
@ -136,52 +136,52 @@ void on_stream_close_callback
} // namespace
namespace {
void on_ctrl_recv_callback
(nghttp2_session *session, nghttp2_frame_type type, nghttp2_frame *frame,
void *user_data)
void on_frame_recv_callback
(nghttp2_session *session, nghttp2_frame *frame, void *user_data)
{
SpdyUpstream *upstream = reinterpret_cast<SpdyUpstream*>(user_data);
switch(type) {
case NGHTTP2_SYN_STREAM: {
if(LOG_ENABLED(INFO)) {
ULOG(INFO, upstream) << "Received upstream SYN_STREAM stream_id="
<< frame->syn_stream.stream_id;
auto upstream = reinterpret_cast<SpdyUpstream*>(user_data);
switch(frame->hd.type) {
case NGHTTP2_HEADERS: {
if(frame->headers.cat != NGHTTP2_HCAT_REQUEST) {
break;
}
Downstream *downstream;
downstream = new Downstream(upstream,
frame->syn_stream.stream_id,
frame->syn_stream.pri);
if(LOG_ENABLED(INFO)) {
ULOG(INFO, upstream) << "Received upstream request HEADERS stream_id="
<< frame->hd.stream_id;
}
auto downstream = new Downstream(upstream,
frame->hd.stream_id,
frame->headers.pri);
upstream->add_downstream(downstream);
downstream->init_response_body_buf();
char **nv = frame->syn_stream.nv;
const char *path = 0;
const char *scheme = 0;
const char *host = 0;
const char *method = 0;
for(size_t i = 0; nv[i]; i += 2) {
if(strcmp(nv[i], ":path") == 0) {
path = nv[i+1];
} else if(strcmp(nv[i], ":scheme") == 0) {
scheme = nv[i+1];
} else if(strcmp(nv[i], ":method") == 0) {
method = nv[i+1];
downstream->set_request_method(nv[i+1]);
} else if(strcmp(nv[i], ":host") == 0) {
host = nv[i+1];
} else if(nv[i][0] != ':') {
downstream->add_request_header(nv[i], nv[i+1]);
auto nva = frame->headers.nva;
std::string path, scheme, host, method;
for(size_t i = 0; i < frame->headers.nvlen; ++i) {
if(util::strieq(":path", nva[i].name, nva[i].namelen)) {
path.assign(reinterpret_cast<char*>(nva[i].value), nva[i].valuelen);
} else if(util::strieq(":scheme", nva[i].name, nva[i].namelen)) {
scheme.assign(reinterpret_cast<char*>(nva[i].value), nva[i].valuelen);
} else if(util::strieq(":method", nva[i].name, nva[i].namelen)) {
method.assign(reinterpret_cast<char*>(nva[i].value), nva[i].valuelen);
downstream->set_request_method(method);
} else if(util::strieq(":host", nva[i].name, nva[i].namelen)) {
host.assign(reinterpret_cast<char*>(nva[i].value), nva[i].valuelen);
} else if(nva[i].namelen > 0 && nva[i].name[0] != ':') {
downstream->add_request_header
(std::string(reinterpret_cast<char*>(nva[i].name), nva[i].namelen),
std::string(reinterpret_cast<char*>(nva[i].value), nva[i].valuelen));
}
}
if(!path || !host || !method) {
upstream->rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
if(path.empty() || host.empty() || method.empty()) {
upstream->rst_stream(downstream, NGHTTP2_PROTOCOL_ERROR);
return;
}
// SpdyDownstreamConnection examines request path to find
// scheme. We construct abs URI for spdy_bridge mode as well as
// spdy_proxy mode.
if((get_config()->spdy_proxy || get_config()->spdy_bridge) &&
scheme && path[0] == '/') {
!scheme.empty() && path[0] == '/') {
std::string reqpath = scheme;
reqpath += "://";
reqpath += host;
@ -195,16 +195,19 @@ void on_ctrl_recv_callback
if(LOG_ENABLED(INFO)) {
std::stringstream ss;
for(size_t i = 0; nv[i]; i += 2) {
ss << TTY_HTTP_HD << nv[i] << TTY_RST << ": " << nv[i+1] << "\n";
for(size_t i = 0; i < frame->headers.nvlen; ++i) {
ss << TTY_HTTP_HD;
ss.write(reinterpret_cast<char*>(nva[i].name), nva[i].namelen);
ss << TTY_RST << ": ";
ss.write(reinterpret_cast<char*>(nva[i].value), nva[i].valuelen);
ss << "\n";
}
ULOG(INFO, upstream) << "HTTP request headers. stream_id="
<< downstream->get_stream_id()
<< "\n" << ss.str();
}
DownstreamConnection *dconn;
dconn = upstream->get_client_handler()->get_downstream_connection();
auto dconn = upstream->get_client_handler()->get_downstream_connection();
int rv = dconn->attach_downstream(downstream);
if(rv != 0) {
// If downstream connection fails, issue RST_STREAM.
@ -218,7 +221,7 @@ void on_ctrl_recv_callback
return;
}
downstream->set_request_state(Downstream::HEADER_COMPLETE);
if(frame->syn_stream.hd.flags & NGHTTP2_CTRL_FLAG_FIN) {
if(frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
downstream->set_request_state(Downstream::MSG_COMPLETE);
}
break;
@ -256,7 +259,7 @@ void on_data_chunk_recv_callback(nghttp2_session *session,
return;
}
}
if(flags & NGHTTP2_DATA_FLAG_FIN) {
if(flags & NGHTTP2_FLAG_END_STREAM) {
downstream->set_request_state(Downstream::MSG_COMPLETE);
}
}
@ -264,19 +267,19 @@ void on_data_chunk_recv_callback(nghttp2_session *session,
} // namespace
namespace {
void on_ctrl_not_send_callback(nghttp2_session *session,
nghttp2_frame_type type,
nghttp2_frame *frame,
int error_code, void *user_data)
void on_frame_not_send_callback(nghttp2_session *session,
nghttp2_frame *frame,
int lib_error_code, void *user_data)
{
SpdyUpstream *upstream = reinterpret_cast<SpdyUpstream*>(user_data);
ULOG(WARNING, upstream) << "Failed to send control frame type=" << type
<< ", error_code=" << error_code << ":"
<< nghttp2_strerror(error_code);
if(type == NGHTTP2_SYN_REPLY) {
auto upstream = reinterpret_cast<SpdyUpstream*>(user_data);
ULOG(WARNING, upstream) << "Failed to send control frame type="
<< frame->hd.type
<< ", lib_error_code=" << lib_error_code << ":"
<< nghttp2_strerror(lib_error_code);
if(frame->hd.type == NGHTTP2_HEADERS &&
frame->headers.cat == NGHTTP2_HCAT_RESPONSE) {
// To avoid stream hanging around, issue RST_STREAM.
int32_t stream_id = frame->syn_reply.stream_id;
Downstream *downstream = upstream->find_downstream(stream_id);
auto downstream = upstream->find_downstream(frame->hd.stream_id);
if(downstream) {
upstream->rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
}
@ -285,30 +288,30 @@ void on_ctrl_not_send_callback(nghttp2_session *session,
} // namespace
namespace {
void on_ctrl_recv_parse_error_callback(nghttp2_session *session,
nghttp2_frame_type type,
const uint8_t *head, size_t headlen,
const uint8_t *payload,
size_t payloadlen, int error_code,
void *user_data)
void on_frame_recv_parse_error_callback(nghttp2_session *session,
nghttp2_frame_type type,
const uint8_t *head, size_t headlen,
const uint8_t *payload,
size_t payloadlen, int lib_error_code,
void *user_data)
{
SpdyUpstream *upstream = reinterpret_cast<SpdyUpstream*>(user_data);
auto upstream = reinterpret_cast<SpdyUpstream*>(user_data);
if(LOG_ENABLED(INFO)) {
ULOG(INFO, upstream) << "Failed to parse received control frame. type="
<< type
<< ", error_code=" << error_code << ":"
<< nghttp2_strerror(error_code);
<< ", error_code=" << lib_error_code << ":"
<< nghttp2_strerror(lib_error_code);
}
}
} // namespace
namespace {
void on_unknown_ctrl_recv_callback(nghttp2_session *session,
const uint8_t *head, size_t headlen,
const uint8_t *payload, size_t payloadlen,
void *user_data)
void on_unknown_frame_recv_callback(nghttp2_session *session,
const uint8_t *head, size_t headlen,
const uint8_t *payload, size_t payloadlen,
void *user_data)
{
SpdyUpstream *upstream = reinterpret_cast<SpdyUpstream*>(user_data);
auto upstream = reinterpret_cast<SpdyUpstream*>(user_data);
if(LOG_ENABLED(INFO)) {
ULOG(INFO, upstream) << "Received unknown control frame.";
}
@ -316,21 +319,22 @@ void on_unknown_ctrl_recv_callback(nghttp2_session *session,
} // namespace
namespace {
uint32_t infer_upstream_rst_stream_status_code(uint32_t downstream_status_code)
nghttp2_error_code infer_upstream_rst_stream_error_code
(nghttp2_error_code downstream_error_code)
{
// Only propagate NGHTTP2_REFUSED_STREAM so that upstream client
// can resend request.
if(downstream_status_code != NGHTTP2_REFUSED_STREAM) {
if(downstream_error_code != NGHTTP2_REFUSED_STREAM) {
return NGHTTP2_INTERNAL_ERROR;
} else {
return downstream_status_code;
return downstream_error_code;
}
}
} // namespace
SpdyUpstream::SpdyUpstream(uint16_t version, ClientHandler *handler)
SpdyUpstream::SpdyUpstream(ClientHandler *handler)
: handler_(handler),
session_(0)
session_(nullptr)
{
//handler->set_bev_cb(spdy_readcb, 0, spdy_eventcb);
handler->set_upstream_timeouts(&get_config()->spdy_upstream_read_timeout,
@ -341,44 +345,41 @@ SpdyUpstream::SpdyUpstream(uint16_t version, ClientHandler *handler)
callbacks.send_callback = send_callback;
callbacks.recv_callback = recv_callback;
callbacks.on_stream_close_callback = on_stream_close_callback;
callbacks.on_ctrl_recv_callback = on_ctrl_recv_callback;
callbacks.on_frame_recv_callback = on_frame_recv_callback;
callbacks.on_data_chunk_recv_callback = on_data_chunk_recv_callback;
callbacks.on_ctrl_not_send_callback = on_ctrl_not_send_callback;
callbacks.on_ctrl_recv_parse_error_callback =
on_ctrl_recv_parse_error_callback;
callbacks.on_unknown_ctrl_recv_callback = on_unknown_ctrl_recv_callback;
callbacks.on_frame_not_send_callback = on_frame_not_send_callback;
callbacks.on_frame_recv_parse_error_callback =
on_frame_recv_parse_error_callback;
callbacks.on_unknown_frame_recv_callback = on_unknown_frame_recv_callback;
int rv;
rv = nghttp2_session_server_new(&session_, version, &callbacks, this);
rv = nghttp2_session_server_new(&session_, &callbacks, this);
assert(rv == 0);
int val = 1;
flow_control_ = true;
initial_window_size_ = 1 << get_config()->spdy_upstream_window_bits;
rv = nghttp2_session_set_option(session_,
NGHTTP2_OPT_NO_AUTO_WINDOW_UPDATE, &val,
sizeof(val));
assert(rv == 0);
if(version == NGHTTP2_PROTO_SPDY3) {
int val = 1;
flow_control_ = true;
initial_window_size_ = 1 << get_config()->spdy_upstream_window_bits;
rv = nghttp2_session_set_option(session_,
NGHTTP2_OPT_NO_AUTO_WINDOW_UPDATE, &val,
sizeof(val));
assert(rv == 0);
} else {
flow_control_ = false;
initial_window_size_ = 0;
}
// TODO Maybe call from outside?
nghttp2_settings_entry entry[2];
entry[0].settings_id = NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS;
entry[0].value = get_config()->spdy_max_concurrent_streams;
entry[0].flags = NGHTTP2_ID_FLAG_SETTINGS_NONE;
entry[1].settings_id = NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
entry[1].value = initial_window_size_;
entry[1].flags = NGHTTP2_ID_FLAG_SETTINGS_NONE;
rv = nghttp2_submit_settings
(session_, NGHTTP2_FLAG_SETTINGS_NONE,
entry, sizeof(entry)/sizeof(nghttp2_settings_entry));
(session_, entry, sizeof(entry)/sizeof(nghttp2_settings_entry));
assert(rv == 0);
// TODO Maybe call from outside?
// Disable connection-level flow control
rv = nghttp2_submit_window_update(session_, NGHTTP2_FLAG_END_FLOW_CONTROL,
0, 0);
assert(rv == 0);
send();
}
@ -467,8 +468,8 @@ void spdy_downstream_readcb(bufferevent *bev, void *ptr)
// RST_STREAM to the upstream and delete downstream connection
// here. Deleting downstream will be taken place at
// on_stream_close_callback.
upstream->rst_stream(downstream, infer_upstream_rst_stream_status_code
(downstream->get_response_rst_stream_status_code()));
upstream->rst_stream(downstream, infer_upstream_rst_stream_error_code
(downstream->get_response_rst_stream_error_code()));
downstream->set_downstream_connection(0);
delete dconn;
dconn = 0;
@ -630,7 +631,8 @@ void spdy_downstream_eventcb(bufferevent *bev, short events, void *ptr)
}
} // namespace
int SpdyUpstream::rst_stream(Downstream *downstream, int status_code)
int SpdyUpstream::rst_stream(Downstream *downstream,
nghttp2_error_code error_code)
{
if(LOG_ENABLED(INFO)) {
ULOG(INFO, this) << "RST_STREAM stream_id="
@ -638,7 +640,7 @@ int SpdyUpstream::rst_stream(Downstream *downstream, int status_code)
}
int rv;
rv = nghttp2_submit_rst_stream(session_, downstream->get_stream_id(),
status_code);
error_code);
if(rv < NGHTTP2_ERR_FATAL) {
ULOG(FATAL, this) << "nghttp2_submit_rst_stream() failed: "
<< nghttp2_strerror(rv);
@ -650,7 +652,8 @@ int SpdyUpstream::rst_stream(Downstream *downstream, int status_code)
int SpdyUpstream::window_update(Downstream *downstream)
{
int rv;
rv = nghttp2_submit_window_update(session_, downstream->get_stream_id(),
rv = nghttp2_submit_window_update(session_, NGHTTP2_FLAG_NONE,
downstream->get_stream_id(),
downstream->get_recv_window_size());
downstream->set_recv_window_size(0);
if(rv < NGHTTP2_ERR_FATAL) {
@ -685,8 +688,8 @@ ssize_t spdy_data_read_callback(nghttp2_session *session,
ULOG(INFO, upstream) << "RST_STREAM to tunneled stream stream_id="
<< stream_id;
}
upstream->rst_stream(downstream, infer_upstream_rst_stream_status_code
(downstream->get_response_rst_stream_status_code()));
upstream->rst_stream(downstream, infer_upstream_rst_stream_error_code
(downstream->get_response_rst_stream_error_code()));
}
}
if(nread == 0 && *eof != 1) {

View File

@ -38,7 +38,7 @@ class ClientHandler;
class SpdyUpstream : public Upstream {
public:
SpdyUpstream(uint16_t version, ClientHandler *handler);
SpdyUpstream(ClientHandler *handler);
virtual ~SpdyUpstream();
virtual int on_read();
virtual int on_write();
@ -54,7 +54,7 @@ public:
nghttp2_session* get_spdy_session();
int rst_stream(Downstream *downstream, int status_code);
int rst_stream(Downstream *downstream, nghttp2_error_code error_code);
int window_update(Downstream *downstream);
int error_reply(Downstream *downstream, int status_code);