nghttpx: Support multiple -b option for HTTP/2 backend

This commit is contained in:
Tatsuhiro Tsujikawa 2015-03-10 21:54:29 +09:00
parent 6b714030dd
commit c5860fc6f4
6 changed files with 62 additions and 29 deletions

View File

@ -949,13 +949,10 @@ Options:
Connections: Connections:
-b, --backend=<HOST,PORT> -b, --backend=<HOST,PORT>
Set backend host and port. For HTTP/1 backend, multiple Set backend host and port. The multiple backend
backend addresses are accepted by repeating this option. addresses are accepted by repeating this option. UNIX
HTTP/2 backend does not support multiple backend domain socket can be specified by prefixing path name
addresses and the first occurrence of this option is with "unix:" (e.g., unix:/var/run/backend.sock)
used. UNIX domain socket can be specified by prefixing
path name with "unix:" (e.g.,
unix:/var/run/backend.sock)
Default: )" << DEFAULT_DOWNSTREAM_HOST << "," Default: )" << DEFAULT_DOWNSTREAM_HOST << ","
<< DEFAULT_DOWNSTREAM_PORT << R"( << DEFAULT_DOWNSTREAM_PORT << R"(
-f, --frontend=<HOST,PORT> -f, --frontend=<HOST,PORT>

View File

@ -258,15 +258,20 @@ int Http2DownstreamConnection::push_request_headers() {
get_config()->client_proxy || get_config()->client_proxy ||
downstream_->get_request_method() == "CONNECT"; downstream_->get_request_method() == "CONNECT";
// http2session_ has already in CONNECTED state, so we can get
// addr_idx here.
auto addr_idx = http2session_->get_addr_idx();
auto &downstream_addr = get_config()->downstream_addrs[addr_idx];
const char *authority = nullptr, *host = nullptr; const char *authority = nullptr, *host = nullptr;
if (!no_host_rewrite) { if (!no_host_rewrite) {
// HTTP/2 backend does not support multiple address, so we always // HTTP/2 backend does not support multiple address, so we always
// use index = 0. // use index = 0.
if (!downstream_->get_request_http2_authority().empty()) { if (!downstream_->get_request_http2_authority().empty()) {
authority = get_config()->downstream_addrs[0].hostport.get(); authority = downstream_addr.hostport.get();
} }
if (downstream_->get_request_header(http2::HD_HOST)) { if (downstream_->get_request_header(http2::HD_HOST)) {
host = get_config()->downstream_addrs[0].hostport.get(); host = downstream_addr.hostport.get();
} }
} else { } else {
if (!downstream_->get_request_http2_authority().empty()) { if (!downstream_->get_request_http2_authority().empty()) {
@ -281,7 +286,7 @@ int Http2DownstreamConnection::push_request_headers() {
if (!authority && !host) { if (!authority && !host) {
// upstream is HTTP/1.0. We use backend server's host // upstream is HTTP/1.0. We use backend server's host
// nonetheless. // nonetheless.
host = get_config()->downstream_addrs[0].hostport.get(); host = downstream_addr.hostport.get();
} }
if (authority) { if (authority) {

View File

@ -39,6 +39,7 @@
#include "shrpx_client_handler.h" #include "shrpx_client_handler.h"
#include "shrpx_ssl.h" #include "shrpx_ssl.h"
#include "shrpx_http.h" #include "shrpx_http.h"
#include "shrpx_worker.h"
#include "http2.h" #include "http2.h"
#include "util.h" #include "util.h"
#include "base64.h" #include "base64.h"
@ -138,13 +139,15 @@ void writecb(struct ev_loop *loop, ev_io *w, int revents) {
} }
} // namespace } // namespace
Http2Session::Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx) Http2Session::Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx,
Worker *worker)
: conn_(loop, -1, nullptr, get_config()->downstream_write_timeout, : conn_(loop, -1, nullptr, get_config()->downstream_write_timeout,
get_config()->downstream_read_timeout, 0, 0, 0, 0, writecb, readcb, get_config()->downstream_read_timeout, 0, 0, 0, 0, writecb, readcb,
timeoutcb, this), timeoutcb, this),
ssl_ctx_(ssl_ctx), session_(nullptr), data_pending_(nullptr), worker_(worker), ssl_ctx_(ssl_ctx), session_(nullptr),
data_pendinglen_(0), state_(DISCONNECTED), data_pending_(nullptr), data_pendinglen_(0), addr_idx_(0),
connection_check_state_(CONNECTION_CHECK_NONE), flow_control_(false) { state_(DISCONNECTED), connection_check_state_(CONNECTION_CHECK_NONE),
flow_control_(false) {
read_ = write_ = &Http2Session::noop; read_ = write_ = &Http2Session::noop;
on_read_ = on_write_ = &Http2Session::noop; on_read_ = on_write_ = &Http2Session::noop;
@ -185,6 +188,8 @@ int Http2Session::disconnect(bool hard) {
conn_.disconnect(); conn_.disconnect();
addr_idx_ = 0;
if (proxy_htp_) { if (proxy_htp_) {
proxy_htp_.reset(); proxy_htp_.reset();
} }
@ -230,6 +235,21 @@ int Http2Session::check_cert() { return ssl::check_cert(conn_.tls.ssl); }
int Http2Session::initiate_connection() { int Http2Session::initiate_connection() {
int rv = 0; int rv = 0;
if (state_ == DISCONNECTED) {
auto worker_stat = worker_->get_worker_stat();
addr_idx_ = worker_stat->next_downstream;
++worker_stat->next_downstream;
worker_stat->next_downstream %= get_config()->downstream_addrs.size();
if (LOG_ENABLED(INFO)) {
SSLOG(INFO, this) << "Using downstream address idx=" << addr_idx_
<< " out of " << get_config()->downstream_addrs.size();
}
}
auto &downstream_addr = get_config()->downstream_addrs[addr_idx_];
if (get_config()->downstream_http_proxy_host && state_ == DISCONNECTED) { if (get_config()->downstream_http_proxy_host && state_ == DISCONNECTED) {
if (LOG_ENABLED(INFO)) { if (LOG_ENABLED(INFO)) {
SSLOG(INFO, this) << "Connecting to the proxy " SSLOG(INFO, this) << "Connecting to the proxy "
@ -292,7 +312,7 @@ int Http2Session::initiate_connection() {
if (get_config()->backend_tls_sni_name) { if (get_config()->backend_tls_sni_name) {
sni_name = get_config()->backend_tls_sni_name.get(); sni_name = get_config()->backend_tls_sni_name.get();
} else { } else {
sni_name = get_config()->downstream_addrs[0].host.get(); sni_name = downstream_addr.host.get();
} }
if (sni_name && !util::numeric_host(sni_name)) { if (sni_name && !util::numeric_host(sni_name)) {
@ -307,16 +327,15 @@ int Http2Session::initiate_connection() {
assert(conn_.fd == -1); assert(conn_.fd == -1);
conn_.fd = util::create_nonblock_socket( conn_.fd = util::create_nonblock_socket(
get_config()->downstream_addrs[0].addr.storage.ss_family); downstream_addr.addr.storage.ss_family);
if (conn_.fd == -1) { if (conn_.fd == -1) {
return -1; return -1;
} }
rv = connect( rv = connect(conn_.fd,
conn_.fd, // TODO maybe not thread-safe?
// TODO maybe not thread-safe? const_cast<sockaddr *>(&downstream_addr.addr.sa),
const_cast<sockaddr *>(&get_config()->downstream_addrs[0].addr.sa), downstream_addr.addrlen);
get_config()->downstream_addrs[0].addrlen);
if (rv != 0 && errno != EINPROGRESS) { if (rv != 0 && errno != EINPROGRESS) {
return -1; return -1;
} }
@ -336,15 +355,14 @@ int Http2Session::initiate_connection() {
assert(conn_.fd == -1); assert(conn_.fd == -1);
conn_.fd = util::create_nonblock_socket( conn_.fd = util::create_nonblock_socket(
get_config()->downstream_addrs[0].addr.storage.ss_family); downstream_addr.addr.storage.ss_family);
if (conn_.fd == -1) { if (conn_.fd == -1) {
return -1; return -1;
} }
rv = connect(conn_.fd, const_cast<sockaddr *>( rv = connect(conn_.fd, const_cast<sockaddr *>(&downstream_addr.addr.sa),
&get_config()->downstream_addrs[0].addr.sa), downstream_addr.addrlen);
get_config()->downstream_addrs[0].addrlen);
if (rv != 0 && errno != EINPROGRESS) { if (rv != 0 && errno != EINPROGRESS) {
return -1; return -1;
} }
@ -469,10 +487,12 @@ int Http2Session::downstream_connect_proxy() {
if (LOG_ENABLED(INFO)) { if (LOG_ENABLED(INFO)) {
SSLOG(INFO, this) << "Connected to the proxy"; SSLOG(INFO, this) << "Connected to the proxy";
} }
auto &downstream_addr = get_config()->downstream_addrs[addr_idx_];
std::string req = "CONNECT "; std::string req = "CONNECT ";
req += get_config()->downstream_addrs[0].hostport.get(); req += downstream_addr.hostport.get();
req += " HTTP/1.1\r\nHost: "; req += " HTTP/1.1\r\nHost: ";
req += get_config()->downstream_addrs[0].host.get(); req += downstream_addr.host.get();
req += "\r\n"; req += "\r\n";
if (get_config()->downstream_http_proxy_userinfo) { if (get_config()->downstream_http_proxy_userinfo) {
req += "Proxy-Authorization: Basic "; req += "Proxy-Authorization: Basic ";
@ -1694,4 +1714,6 @@ bool Http2Session::should_hard_fail() const {
} }
} }
size_t Http2Session::get_addr_idx() const { return addr_idx_; }
} // namespace shrpx } // namespace shrpx

View File

@ -46,6 +46,7 @@ using namespace nghttp2;
namespace shrpx { namespace shrpx {
class Http2DownstreamConnection; class Http2DownstreamConnection;
class Worker;
struct StreamData { struct StreamData {
Http2DownstreamConnection *dconn; Http2DownstreamConnection *dconn;
@ -53,7 +54,7 @@ struct StreamData {
class Http2Session { class Http2Session {
public: public:
Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx); Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx, Worker *worker);
~Http2Session(); ~Http2Session();
int check_cert(); int check_cert();
@ -143,6 +144,8 @@ public:
void submit_pending_requests(); void submit_pending_requests();
size_t get_addr_idx() const;
enum { enum {
// Disconnected // Disconnected
DISCONNECTED, DISCONNECTED,
@ -188,11 +191,14 @@ private:
std::function<int(Http2Session &)> on_read_, on_write_; std::function<int(Http2Session &)> on_read_, on_write_;
// Used to parse the response from HTTP proxy // Used to parse the response from HTTP proxy
std::unique_ptr<http_parser> proxy_htp_; std::unique_ptr<http_parser> proxy_htp_;
Worker *worker_;
// NULL if no TLS is configured // NULL if no TLS is configured
SSL_CTX *ssl_ctx_; SSL_CTX *ssl_ctx_;
nghttp2_session *session_; nghttp2_session *session_;
const uint8_t *data_pending_; const uint8_t *data_pending_;
size_t data_pendinglen_; size_t data_pendinglen_;
// index of get_config()->downstream_addrs this object uses
size_t addr_idx_;
int state_; int state_;
int connection_check_state_; int connection_check_state_;
bool flow_control_; bool flow_control_;

View File

@ -59,7 +59,7 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
ev_async_start(loop_, &w_); ev_async_start(loop_, &w_);
if (get_config()->downstream_proto == PROTO_HTTP2) { if (get_config()->downstream_proto == PROTO_HTTP2) {
http2session_ = make_unique<Http2Session>(loop_, cl_ssl_ctx_); http2session_ = make_unique<Http2Session>(loop_, cl_ssl_ctx, this);
} else { } else {
http1_connect_blocker_ = make_unique<ConnectBlocker>(loop_); http1_connect_blocker_ = make_unique<ConnectBlocker>(loop_);
} }
@ -197,6 +197,8 @@ struct ev_loop *Worker::get_loop() const {
SSL_CTX *Worker::get_sv_ssl_ctx() const { return sv_ssl_ctx_; } SSL_CTX *Worker::get_sv_ssl_ctx() const { return sv_ssl_ctx_; }
SSL_CTX *Worker::get_cl_ssl_ctx() const { return cl_ssl_ctx_; }
void Worker::set_graceful_shutdown(bool f) { graceful_shutdown_ = f; } void Worker::set_graceful_shutdown(bool f) { graceful_shutdown_ = f; }
bool Worker::get_graceful_shutdown() const { return graceful_shutdown_; } bool Worker::get_graceful_shutdown() const { return graceful_shutdown_; }

View File

@ -98,6 +98,7 @@ public:
ConnectBlocker *get_http1_connect_blocker() const; ConnectBlocker *get_http1_connect_blocker() const;
struct ev_loop *get_loop() const; struct ev_loop *get_loop() const;
SSL_CTX *get_sv_ssl_ctx() const; SSL_CTX *get_sv_ssl_ctx() const;
SSL_CTX *get_cl_ssl_ctx() const;
void set_graceful_shutdown(bool f); void set_graceful_shutdown(bool f);
bool get_graceful_shutdown() const; bool get_graceful_shutdown() const;