Share I/O code with all upstreams/downstream objects

This commit is contained in:
Tatsuhiro Tsujikawa 2015-02-04 21:15:58 +09:00
parent a4d729d36b
commit b2fb888363
17 changed files with 733 additions and 682 deletions

View File

@ -119,6 +119,7 @@ NGHTTPX_SRCS = \
shrpx_connect_blocker.cc shrpx_connect_blocker.h \ shrpx_connect_blocker.cc shrpx_connect_blocker.h \
shrpx_downstream_connection_pool.cc shrpx_downstream_connection_pool.h \ shrpx_downstream_connection_pool.cc shrpx_downstream_connection_pool.h \
shrpx_rate_limit.cc shrpx_rate_limit.h \ shrpx_rate_limit.cc shrpx_rate_limit.h \
shrpx_connection.cc shrpx_connection.h \
buffer.h memchunk.h buffer.h memchunk.h
if HAVE_SPDYLAY if HAVE_SPDYLAY

View File

@ -231,6 +231,29 @@ using Memchunk16K = Memchunk<16384>;
using MemchunkPool = Pool<Memchunk16K>; using MemchunkPool = Pool<Memchunk16K>;
using DefaultMemchunks = Memchunks<Memchunk16K>; using DefaultMemchunks = Memchunks<Memchunk16K>;
#define DEFAULT_WR_IOVCNT 16
#if defined(IOV_MAX) && IOV_MAX < DEFAULT_WR_IOVCNT
#define MAX_WR_IOVCNT IOV_MAX
#else // !defined(IOV_MAX) || IOV_MAX >= DEFAULT_WR_IOVCNT
#define MAX_WR_IOVCNT DEFAULT_WR_IOVCNT
#endif // !defined(IOV_MAX) || IOV_MAX >= DEFAULT_WR_IOVCNT
inline int limit_iovec(struct iovec *iov, int iovcnt, size_t max) {
if (max == 0) {
return 0;
}
for (int i = 0; i < iovcnt; ++i) {
auto d = std::min(max, iov[i].iov_len);
iov[i].iov_len = d;
max -= d;
if (max == 0) {
return i + 1;
}
}
return iovcnt;
}
} // namespace nghttp2 } // namespace nghttp2
#endif // MEMCHUNK_H #endif // MEMCHUNK_H

View File

@ -123,21 +123,6 @@ template <size_t N> struct RingBuf {
uint8_t begin[N]; uint8_t begin[N];
}; };
inline int limit_iovec(struct iovec *iov, int iovcnt, size_t max) {
if (max == 0) {
return 0;
}
for (int i = 0; i < iovcnt; ++i) {
auto d = std::min(max, iov[i].iov_len);
iov[i].iov_len = d;
max -= d;
if (max == 0) {
return i + 1;
}
}
return iovcnt;
}
} // namespace nghttp2 } // namespace nghttp2
#endif // RINGBUF_H #endif // RINGBUF_H

View File

@ -49,7 +49,8 @@ namespace shrpx {
namespace { namespace {
void timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) { void timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
auto handler = static_cast<ClientHandler *>(w->data); auto conn = static_cast<Connection *>(w->data);
auto handler = static_cast<ClientHandler *>(conn->data);
if (LOG_ENABLED(INFO)) { if (LOG_ENABLED(INFO)) {
CLOG(INFO, handler) << "Time out"; CLOG(INFO, handler) << "Time out";
@ -73,7 +74,8 @@ void shutdowncb(struct ev_loop *loop, ev_timer *w, int revents) {
namespace { namespace {
void readcb(struct ev_loop *loop, ev_io *w, int revents) { void readcb(struct ev_loop *loop, ev_io *w, int revents) {
auto handler = static_cast<ClientHandler *>(w->data); auto conn = static_cast<Connection *>(w->data);
auto handler = static_cast<ClientHandler *>(conn->data);
if (handler->do_read() != 0) { if (handler->do_read() != 0) {
delete handler; delete handler;
@ -84,7 +86,8 @@ void readcb(struct ev_loop *loop, ev_io *w, int revents) {
namespace { namespace {
void writecb(struct ev_loop *loop, ev_io *w, int revents) { void writecb(struct ev_loop *loop, ev_io *w, int revents) {
auto handler = static_cast<ClientHandler *>(w->data); auto conn = static_cast<Connection *>(w->data);
auto handler = static_cast<ClientHandler *>(conn->data);
if (handler->do_write() != 0) { if (handler->do_write() != 0) {
delete handler; delete handler;
@ -94,7 +97,7 @@ void writecb(struct ev_loop *loop, ev_io *w, int revents) {
} // namespace } // namespace
int ClientHandler::read_clear() { int ClientHandler::read_clear() {
ev_timer_again(loop_, &rt_); ev_timer_again(conn_.loop, &conn_.rt);
for (;;) { for (;;) {
// we should process buffered data first before we read EOF. // we should process buffered data first before we read EOF.
@ -106,53 +109,33 @@ int ClientHandler::read_clear() {
} }
rb_.reset(); rb_.reset();
ssize_t nread = std::min(rb_.wleft(), rlimit_.avail()); auto nread = conn_.read_clear(rb_.last, rb_.wleft());
if (nread == 0) {
break;
}
while ((nread = read(fd_, rb_.last, nread)) == -1 && errno == EINTR)
;
if (nread == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
break;
}
return -1;
}
if (nread == 0) { if (nread == 0) {
return 0;
}
if (nread < 0) {
return -1; return -1;
} }
rb_.write(nread); rb_.write(nread);
rlimit_.drain(nread);
} }
return 0;
} }
int ClientHandler::write_clear() { int ClientHandler::write_clear() {
ev_timer_again(loop_, &rt_); ev_timer_again(conn_.loop, &conn_.rt);
for (;;) { for (;;) {
if (wb_.rleft() > 0) { if (wb_.rleft() > 0) {
ssize_t nwrite = std::min(wb_.rleft(), wlimit_.avail()); auto nwrite = conn_.write_clear(wb_.pos, wb_.rleft());
if (nwrite == 0) { if (nwrite == 0) {
return 0; return 0;
} }
if (nwrite < 0) {
while ((nwrite = write(fd_, wb_.pos, nwrite)) == -1 && errno == EINTR)
;
if (nwrite == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
wlimit_.startw();
ev_timer_again(loop_, &wt_);
return 0;
}
return -1; return -1;
} }
wb_.drain(nwrite); wb_.drain(nwrite);
wlimit_.drain(nwrite);
continue; continue;
} }
wb_.reset(); wb_.reset();
@ -164,54 +147,34 @@ int ClientHandler::write_clear() {
} }
} }
wlimit_.stopw(); conn_.wlimit.stopw();
ev_timer_stop(loop_, &wt_); ev_timer_stop(conn_.loop, &conn_.wt);
return 0; return 0;
} }
int ClientHandler::tls_handshake() { int ClientHandler::tls_handshake() {
ev_timer_again(loop_, &rt_); ev_timer_again(conn_.loop, &conn_.rt);
ERR_clear_error(); ERR_clear_error();
auto rv = SSL_do_handshake(ssl_); auto rv = conn_.tls_handshake();
if (rv == 0) { if (rv == SHRPX_ERR_INPROGRESS) {
return -1; return 0;
} }
if (rv < 0) { if (rv < 0) {
auto err = SSL_get_error(ssl_, rv); return -1;
switch (err) {
case SSL_ERROR_WANT_READ:
wlimit_.stopw();
ev_timer_stop(loop_, &wt_);
return 0;
case SSL_ERROR_WANT_WRITE:
wlimit_.startw();
ev_timer_again(loop_, &wt_);
return 0;
default:
return -1;
}
} }
wlimit_.stopw();
ev_timer_stop(loop_, &wt_);
set_tls_handshake(true);
if (LOG_ENABLED(INFO)) { if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "SSL/TLS handshake completed"; CLOG(INFO, this) << "SSL/TLS handshake completed";
} }
if (validate_next_proto() != 0) { if (validate_next_proto() != 0) {
return -1; return -1;
} }
if (LOG_ENABLED(INFO)) {
if (SSL_session_reused(ssl_)) {
CLOG(INFO, this) << "SSL/TLS session reused";
}
}
read_ = &ClientHandler::read_tls; read_ = &ClientHandler::read_tls;
write_ = &ClientHandler::write_tls; write_ = &ClientHandler::write_tls;
@ -220,7 +183,7 @@ int ClientHandler::tls_handshake() {
} }
int ClientHandler::read_tls() { int ClientHandler::read_tls() {
ev_timer_again(loop_, &rt_); ev_timer_again(conn_.loop, &conn_.rt);
ERR_clear_error(); ERR_clear_error();
@ -234,116 +197,38 @@ int ClientHandler::read_tls() {
} }
rb_.reset(); rb_.reset();
ssize_t nread; auto nread = conn_.read_tls(rb_.last, rb_.wleft());
// SSL_read requires the same arguments (buf pointer and its
// length) on SSL_ERROR_WANT_READ or SSL_ERROR_WANT_WRITE. if (nread == 0) {
// rlimit_.avail() or rlimit_.avail() may return different length return 0;
// than the length previously passed to SSL_read, which violates
// OpenSSL assumption. To avoid this, we keep last legnth passed
// to SSL_read to tls_last_readlen_ if SSL_read indicated I/O
// blocking.
if (tls_last_readlen_ == 0) {
nread = std::min(rb_.wleft(), rlimit_.avail());
if (nread == 0) {
return 0;
}
} else {
nread = tls_last_readlen_;
tls_last_readlen_ = 0;
} }
auto rv = SSL_read(ssl_, rb_.last, nread); if (nread < 0) {
if (rv == 0) {
return -1; return -1;
} }
if (rv < 0) { rb_.write(nread);
auto err = SSL_get_error(ssl_, rv);
switch (err) {
case SSL_ERROR_WANT_READ:
tls_last_readlen_ = nread;
return 0;
case SSL_ERROR_WANT_WRITE:
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "Close connection due to TLS renegotiation";
}
return -1;
default:
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "SSL_read: SSL_get_error returned " << err;
}
return -1;
}
}
rb_.write(rv);
rlimit_.drain(rv);
} }
} }
int ClientHandler::write_tls() { int ClientHandler::write_tls() {
ev_timer_again(loop_, &rt_); ev_timer_again(conn_.loop, &conn_.rt);
ERR_clear_error(); ERR_clear_error();
for (;;) { for (;;) {
if (wb_.rleft() > 0) { if (wb_.rleft() > 0) {
ssize_t nwrite; auto nwrite = conn_.write_tls(wb_.pos, wb_.rleft());
// SSL_write requires the same arguments (buf pointer and its
// length) on SSL_ERROR_WANT_READ or SSL_ERROR_WANT_WRITE.
// get_write_limit() may return smaller length than previously
// passed to SSL_write, which violates OpenSSL assumption. To
// avoid this, we keep last legnth passed to SSL_write to
// tls_last_writelen_ if SSL_write indicated I/O blocking.
if (tls_last_writelen_ == 0) {
nwrite = std::min(wb_.rleft(), wlimit_.avail());
if (nwrite == 0) {
return 0;
}
auto limit = get_write_limit(); if (nwrite == 0) {
if (limit != -1) { return 0;
nwrite = std::min(nwrite, limit);
}
} else {
nwrite = tls_last_writelen_;
tls_last_writelen_ = 0;
} }
auto rv = SSL_write(ssl_, wb_.pos, nwrite); if (nwrite < 0) {
if (rv == 0) {
return -1; return -1;
} }
update_last_write_time(); wb_.drain(nwrite);
if (rv < 0) {
auto err = SSL_get_error(ssl_, rv);
switch (err) {
case SSL_ERROR_WANT_READ:
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "Close connection due to TLS renegotiation";
}
return -1;
case SSL_ERROR_WANT_WRITE:
tls_last_writelen_ = nwrite;
wlimit_.startw();
ev_timer_again(loop_, &wt_);
return 0;
default:
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "SSL_write: SSL_get_error returned " << err;
}
return -1;
}
}
wb_.drain(rv);
wlimit_.drain(rv);
update_warmup_writelen(rv);
continue; continue;
} }
@ -356,8 +241,8 @@ int ClientHandler::write_tls() {
} }
} }
wlimit_.stopw(); conn_.wlimit.stopw();
ev_timer_stop(loop_, &wt_); ev_timer_stop(conn_.loop, &conn_.wt);
return 0; return 0;
} }
@ -464,40 +349,27 @@ ClientHandler::ClientHandler(struct ev_loop *loop, int fd, SSL *ssl,
const char *ipaddr, const char *port, const char *ipaddr, const char *port,
WorkerStat *worker_stat, WorkerStat *worker_stat,
DownstreamConnectionPool *dconn_pool) DownstreamConnectionPool *dconn_pool)
: ipaddr_(ipaddr), port_(port), : conn_(loop, fd, ssl, get_config()->upstream_write_timeout,
wlimit_(loop, &wev_, get_config()->write_rate, get_config()->write_burst), get_config()->upstream_read_timeout, get_config()->write_rate,
rlimit_(loop, &rev_, get_config()->read_rate, get_config()->read_burst), get_config()->write_burst, get_config()->read_rate,
loop_(loop), dconn_pool_(dconn_pool), http2session_(nullptr), get_config()->read_burst, writecb, readcb, timeoutcb, this),
http1_connect_blocker_(nullptr), ssl_(ssl), worker_stat_(worker_stat), ipaddr_(ipaddr), port_(port), dconn_pool_(dconn_pool),
last_write_time_(0.), warmup_writelen_(0), http2session_(nullptr), http1_connect_blocker_(nullptr),
worker_stat_(worker_stat),
left_connhd_len_(NGHTTP2_CLIENT_CONNECTION_PREFACE_LEN), left_connhd_len_(NGHTTP2_CLIENT_CONNECTION_PREFACE_LEN),
tls_last_writelen_(0), tls_last_readlen_(0), fd_(fd), should_close_after_write_(false) {
should_close_after_write_(false), tls_handshake_(false),
tls_renegotiation_(false) {
++worker_stat->num_connections; ++worker_stat->num_connections;
ev_io_init(&wev_, writecb, fd_, EV_WRITE);
ev_io_init(&rev_, readcb, fd_, EV_READ);
wev_.data = this;
rev_.data = this;
ev_timer_init(&wt_, timeoutcb, 0., get_config()->upstream_write_timeout);
ev_timer_init(&rt_, timeoutcb, 0., get_config()->upstream_read_timeout);
wt_.data = this;
rt_.data = this;
ev_timer_init(&reneg_shutdown_timer_, shutdowncb, 0., 0.); ev_timer_init(&reneg_shutdown_timer_, shutdowncb, 0., 0.);
reneg_shutdown_timer_.data = this; reneg_shutdown_timer_.data = this;
rlimit_.startw(); conn_.rlimit.startw();
ev_timer_again(loop_, &rt_); ev_timer_again(conn_.loop, &conn_.rt);
if (ssl_) { if (conn_.tls.ssl) {
SSL_set_app_data(ssl_, reinterpret_cast<char *>(this)); SSL_set_app_data(conn_.tls.ssl, &conn_);
read_ = write_ = &ClientHandler::tls_handshake; read_ = write_ = &ClientHandler::tls_handshake;
on_read_ = &ClientHandler::upstream_noop; on_read_ = &ClientHandler::upstream_noop;
on_write_ = &ClientHandler::upstream_write; on_write_ = &ClientHandler::upstream_write;
@ -525,33 +397,14 @@ ClientHandler::~ClientHandler() {
--worker_stat_->num_connections; --worker_stat_->num_connections;
ev_timer_stop(loop_, &reneg_shutdown_timer_); ev_timer_stop(conn_.loop, &reneg_shutdown_timer_);
ev_timer_stop(loop_, &rt_);
ev_timer_stop(loop_, &wt_);
ev_io_stop(loop_, &rev_);
ev_io_stop(loop_, &wev_);
// TODO If backend is http/2, and it is in CONNECTED state, signal // TODO If backend is http/2, and it is in CONNECTED state, signal
// it and make it loopbreak when output is zero. // it and make it loopbreak when output is zero.
if (worker_config->graceful_shutdown && worker_stat_->num_connections == 0) { if (worker_config->graceful_shutdown && worker_stat_->num_connections == 0) {
ev_break(loop_); ev_break(conn_.loop);
} }
if (ssl_) {
SSL_set_app_data(ssl_, nullptr);
SSL_set_shutdown(ssl_, SSL_RECEIVED_SHUTDOWN);
ERR_clear_error();
SSL_shutdown(ssl_);
}
if (ssl_) {
SSL_free(ssl_);
}
shutdown(fd_, SHUT_WR);
close(fd_);
if (LOG_ENABLED(INFO)) { if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "Deleted"; CLOG(INFO, this) << "Deleted";
} }
@ -560,20 +413,20 @@ ClientHandler::~ClientHandler() {
Upstream *ClientHandler::get_upstream() { return upstream_.get(); } Upstream *ClientHandler::get_upstream() { return upstream_.get(); }
struct ev_loop *ClientHandler::get_loop() const { struct ev_loop *ClientHandler::get_loop() const {
return loop_; return conn_.loop;
} }
void ClientHandler::reset_upstream_read_timeout(ev_tstamp t) { void ClientHandler::reset_upstream_read_timeout(ev_tstamp t) {
rt_.repeat = t; conn_.rt.repeat = t;
if (ev_is_active(&rt_)) { if (ev_is_active(&conn_.rt)) {
ev_timer_again(loop_, &rt_); ev_timer_again(conn_.loop, &conn_.rt);
} }
} }
void ClientHandler::reset_upstream_write_timeout(ev_tstamp t) { void ClientHandler::reset_upstream_write_timeout(ev_tstamp t) {
wt_.repeat = t; conn_.wt.repeat = t;
if (ev_is_active(&wt_)) { if (ev_is_active(&conn_.wt)) {
ev_timer_again(loop_, &wt_); ev_timer_again(conn_.loop, &conn_.wt);
} }
} }
@ -585,7 +438,7 @@ int ClientHandler::validate_next_proto() {
// First set callback for catch all cases // First set callback for catch all cases
on_read_ = &ClientHandler::upstream_read; on_read_ = &ClientHandler::upstream_read;
SSL_get0_next_proto_negotiated(ssl_, &next_proto, &next_proto_len); SSL_get0_next_proto_negotiated(conn_.tls.ssl, &next_proto, &next_proto_len);
for (int i = 0; i < 2; ++i) { for (int i = 0; i < 2; ++i) {
if (next_proto) { if (next_proto) {
if (LOG_ENABLED(INFO)) { if (LOG_ENABLED(INFO)) {
@ -604,7 +457,7 @@ int ClientHandler::validate_next_proto() {
auto http2_upstream = util::make_unique<Http2Upstream>(this); auto http2_upstream = util::make_unique<Http2Upstream>(this);
if (!ssl::check_http2_requirement(ssl_)) { if (!ssl::check_http2_requirement(conn_.tls.ssl)) {
rv = http2_upstream->terminate_session(NGHTTP2_INADEQUATE_SECURITY); rv = http2_upstream->terminate_session(NGHTTP2_INADEQUATE_SECURITY);
if (rv != 0) { if (rv != 0) {
@ -744,7 +597,8 @@ ClientHandler::get_downstream_connection() {
dconn = util::make_unique<Http2DownstreamConnection>(dconn_pool_, dconn = util::make_unique<Http2DownstreamConnection>(dconn_pool_,
http2session_); http2session_);
} else { } else {
dconn = util::make_unique<HttpDownstreamConnection>(dconn_pool_, loop_); dconn =
util::make_unique<HttpDownstreamConnection>(dconn_pool_, conn_.loop);
} }
dconn->set_client_handler(this); dconn->set_client_handler(this);
return dconn; return dconn;
@ -760,7 +614,7 @@ ClientHandler::get_downstream_connection() {
return dconn; return dconn;
} }
SSL *ClientHandler::get_ssl() const { return ssl_; } SSL *ClientHandler::get_ssl() const { return conn_.tls.ssl; }
void ClientHandler::set_http2_session(Http2Session *http2session) { void ClientHandler::set_http2_session(Http2Session *http2session) {
http2session_ = http2session; http2session_ = http2session;
@ -806,70 +660,18 @@ int ClientHandler::perform_http2_upgrade(HttpsUpstream *http) {
return 0; return 0;
} }
bool ClientHandler::get_http2_upgrade_allowed() const { return !ssl_; } bool ClientHandler::get_http2_upgrade_allowed() const { return !conn_.tls.ssl; }
std::string ClientHandler::get_upstream_scheme() const { std::string ClientHandler::get_upstream_scheme() const {
if (ssl_) { if (conn_.tls.ssl) {
return "https"; return "https";
} else { } else {
return "http"; return "http";
} }
} }
void ClientHandler::set_tls_handshake(bool f) { tls_handshake_ = f; } void ClientHandler::start_immediate_shutdown() {
ev_timer_start(conn_.loop, &reneg_shutdown_timer_);
bool ClientHandler::get_tls_handshake() const { return tls_handshake_; }
void ClientHandler::set_tls_renegotiation(bool f) {
if (tls_renegotiation_ == false) {
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "TLS renegotiation detected. "
<< "Start shutdown timer now.";
}
ev_timer_start(loop_, &reneg_shutdown_timer_);
}
tls_renegotiation_ = f;
}
bool ClientHandler::get_tls_renegotiation() const { return tls_renegotiation_; }
namespace {
const size_t SHRPX_SMALL_WRITE_LIMIT = 1300;
const size_t SHRPX_WARMUP_THRESHOLD = 1 << 20;
} // namespace
ssize_t ClientHandler::get_write_limit() {
if (!ssl_) {
return -1;
}
auto t = ev_now(loop_);
if (t - last_write_time_ > 1.0) {
// Time out, use small record size
warmup_writelen_ = 0;
return SHRPX_SMALL_WRITE_LIMIT;
}
// If event_base_gettimeofday_cached() failed, we just skip timer
// checking. Don't know how to treat this.
if (warmup_writelen_ >= SHRPX_WARMUP_THRESHOLD) {
return -1;
}
return SHRPX_SMALL_WRITE_LIMIT;
}
void ClientHandler::update_warmup_writelen(size_t n) {
if (warmup_writelen_ < SHRPX_WARMUP_THRESHOLD) {
warmup_writelen_ += n;
}
}
void ClientHandler::update_last_write_time() {
last_write_time_ = ev_now(loop_);
} }
void ClientHandler::write_accesslog(Downstream *downstream) { void ClientHandler::write_accesslog(Downstream *downstream) {
@ -922,9 +724,9 @@ ClientHandler::WriteBuf *ClientHandler::get_wb() { return &wb_; }
ClientHandler::ReadBuf *ClientHandler::get_rb() { return &rb_; } ClientHandler::ReadBuf *ClientHandler::get_rb() { return &rb_; }
void ClientHandler::signal_write() { wlimit_.startw(); } void ClientHandler::signal_write() { conn_.wlimit.startw(); }
RateLimit *ClientHandler::get_rlimit() { return &rlimit_; } RateLimit *ClientHandler::get_rlimit() { return &conn_.rlimit; }
RateLimit *ClientHandler::get_wlimit() { return &wlimit_; } RateLimit *ClientHandler::get_wlimit() { return &conn_.wlimit; }
} // namespace shrpx } // namespace shrpx

View File

@ -34,6 +34,7 @@
#include <openssl/ssl.h> #include <openssl/ssl.h>
#include "shrpx_rate_limit.h" #include "shrpx_rate_limit.h"
#include "shrpx_connection.h"
#include "buffer.h" #include "buffer.h"
using namespace nghttp2; using namespace nghttp2;
@ -106,20 +107,7 @@ public:
bool get_http2_upgrade_allowed() const; bool get_http2_upgrade_allowed() const;
// Returns upstream scheme, either "http" or "https" // Returns upstream scheme, either "http" or "https"
std::string get_upstream_scheme() const; std::string get_upstream_scheme() const;
void set_tls_handshake(bool f); void start_immediate_shutdown();
bool get_tls_handshake() const;
void set_tls_renegotiation(bool f);
bool get_tls_renegotiation() const;
// Returns maximum write size. The intention of this chunk size is
// control the TLS record size.
//
// This function returns -1, if TLS is not enabled or no limitation
// is required.
ssize_t get_write_limit();
// Updates the number of bytes written in warm up period.
void update_warmup_writelen(size_t n);
// Updates the time when last write was done.
void update_last_write_time();
// Writes upstream accesslog using |downstream|. The |downstream| // Writes upstream accesslog using |downstream|. The |downstream|
// must not be nullptr. // must not be nullptr.
@ -143,10 +131,7 @@ public:
void signal_write(); void signal_write();
private: private:
ev_io wev_; Connection conn_;
ev_io rev_;
ev_timer wt_;
ev_timer rt_;
ev_timer reneg_shutdown_timer_; ev_timer reneg_shutdown_timer_;
std::unique_ptr<Upstream> upstream_; std::unique_ptr<Upstream> upstream_;
std::string ipaddr_; std::string ipaddr_;
@ -155,26 +140,15 @@ private:
std::string alpn_; std::string alpn_;
std::function<int(ClientHandler &)> read_, write_; std::function<int(ClientHandler &)> read_, write_;
std::function<int(ClientHandler &)> on_read_, on_write_; std::function<int(ClientHandler &)> on_read_, on_write_;
RateLimit wlimit_;
RateLimit rlimit_;
struct ev_loop *loop_;
DownstreamConnectionPool *dconn_pool_; DownstreamConnectionPool *dconn_pool_;
// Shared HTTP2 session for each thread. NULL if backend is not // Shared HTTP2 session for each thread. NULL if backend is not
// HTTP2. Not deleted by this object. // HTTP2. Not deleted by this object.
Http2Session *http2session_; Http2Session *http2session_;
ConnectBlocker *http1_connect_blocker_; ConnectBlocker *http1_connect_blocker_;
SSL *ssl_;
WorkerStat *worker_stat_; WorkerStat *worker_stat_;
double last_write_time_;
size_t warmup_writelen_;
// The number of bytes of HTTP/2 client connection header to read // The number of bytes of HTTP/2 client connection header to read
size_t left_connhd_len_; size_t left_connhd_len_;
size_t tls_last_writelen_;
size_t tls_last_readlen_;
int fd_;
bool should_close_after_write_; bool should_close_after_write_;
bool tls_handshake_;
bool tls_renegotiation_;
WriteBuf wb_; WriteBuf wb_;
ReadBuf rb_; ReadBuf rb_;
}; };

321
src/shrpx_connection.cc Normal file
View File

@ -0,0 +1,321 @@
/*
* nghttp2 - HTTP/2 C Library
*
* Copyright (c) 2015 Tatsuhiro Tsujikawa
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include "shrpx_connection.h"
#include <unistd.h>
#include <limits>
#include <openssl/err.h>
#include "memchunk.h"
using namespace nghttp2;
namespace shrpx {
Connection::Connection(struct ev_loop *loop, int fd, SSL *ssl,
ev_tstamp write_timeout, ev_tstamp read_timeout,
size_t write_rate, size_t write_burst, size_t read_rate,
size_t read_burst, IOCb writecb, IOCb readcb,
TimerCb timeoutcb, void *data)
: tls{ssl}, wlimit(loop, &wev, write_rate, write_burst),
rlimit(loop, &rev, read_rate, read_burst), writecb(writecb),
readcb(readcb), timeoutcb(timeoutcb), loop(loop), data(data), fd(fd) {
ev_io_init(&wev, writecb, fd, EV_WRITE);
ev_io_init(&rev, readcb, fd, EV_READ);
wev.data = this;
rev.data = this;
ev_timer_init(&wt, timeoutcb, 0., write_timeout);
ev_timer_init(&rt, timeoutcb, 0., read_timeout);
wt.data = this;
rt.data = this;
}
Connection::~Connection() { disconnect(); }
void Connection::disconnect() {
ev_timer_stop(loop, &rt);
ev_timer_stop(loop, &wt);
rlimit.stopw();
wlimit.stopw();
if (tls.ssl) {
SSL_set_app_data(tls.ssl, nullptr);
SSL_set_shutdown(tls.ssl, SSL_RECEIVED_SHUTDOWN);
ERR_clear_error();
SSL_shutdown(tls.ssl);
SSL_free(tls.ssl);
tls.ssl = nullptr;
}
if (fd != -1) {
shutdown(fd, SHUT_WR);
close(fd);
fd = -1;
}
}
int Connection::tls_handshake() {
auto rv = SSL_do_handshake(tls.ssl);
if (rv == 0) {
return SHRPX_ERR_NETWORK;
}
if (rv < 0) {
auto err = SSL_get_error(tls.ssl, rv);
switch (err) {
case SSL_ERROR_WANT_READ:
wlimit.stopw();
ev_timer_stop(loop, &wt);
return SHRPX_ERR_INPROGRESS;
case SSL_ERROR_WANT_WRITE:
wlimit.startw();
ev_timer_again(loop, &wt);
return SHRPX_ERR_INPROGRESS;
default:
return SHRPX_ERR_NETWORK;
}
}
wlimit.stopw();
ev_timer_stop(loop, &wt);
tls.initial_handshake_done = true;
if (LOG_ENABLED(INFO)) {
LOG(INFO) << "SSL/TLS handshake completed";
if (SSL_session_reused(tls.ssl)) {
LOG(INFO) << "SSL/TLS session reused";
}
}
return 0;
}
namespace {
const size_t SHRPX_SMALL_WRITE_LIMIT = 1300;
const size_t SHRPX_WARMUP_THRESHOLD = 1 << 20;
} // namespace
ssize_t Connection::get_tls_write_limit() {
auto t = ev_now(loop);
if (t - tls.last_write_time > 1.) {
// Time out, use small record size
tls.warmup_writelen = 0;
return SHRPX_SMALL_WRITE_LIMIT;
}
if (tls.warmup_writelen >= SHRPX_WARMUP_THRESHOLD) {
return std::numeric_limits<ssize_t>::max();
}
return SHRPX_SMALL_WRITE_LIMIT;
}
void Connection::update_tls_warmup_writelen(size_t n) {
if (tls.warmup_writelen < SHRPX_WARMUP_THRESHOLD) {
tls.warmup_writelen += n;
}
}
ssize_t Connection::write_tls(const void *data, size_t len) {
ssize_t nwrite;
// SSL_write requires the same arguments (buf pointer and its
// length) on SSL_ERROR_WANT_READ or SSL_ERROR_WANT_WRITE.
// get_write_limit() may return smaller length than previously
// passed to SSL_write, which violates OpenSSL assumption. To avoid
// this, we keep last legnth passed to SSL_write to
// tls.last_writelen if SSL_write indicated I/O blocking.
if (tls.last_writelen == 0) {
nwrite = std::min(len, wlimit.avail());
nwrite = std::min(nwrite, get_tls_write_limit());
if (nwrite == 0) {
return 0;
}
} else {
nwrite = tls.last_writelen;
tls.last_writelen = 0;
}
auto rv = SSL_write(tls.ssl, data, nwrite);
if (rv == 0) {
return SHRPX_ERR_NETWORK;
}
tls.last_write_time = ev_now(loop);
if (rv < 0) {
auto err = SSL_get_error(tls.ssl, rv);
switch (err) {
case SSL_ERROR_WANT_READ:
if (LOG_ENABLED(INFO)) {
LOG(INFO) << "Close connection due to TLS renegotiation";
}
return SHRPX_ERR_NETWORK;
case SSL_ERROR_WANT_WRITE:
tls.last_writelen = nwrite;
wlimit.startw();
ev_timer_again(loop, &wt);
return 0;
default:
if (LOG_ENABLED(INFO)) {
LOG(INFO) << "SSL_write: SSL_get_error returned " << err;
}
return SHRPX_ERR_NETWORK;
}
}
wlimit.drain(rv);
update_tls_warmup_writelen(rv);
return rv;
}
ssize_t Connection::read_tls(void *data, size_t len) {
ssize_t nread;
// SSL_read requires the same arguments (buf pointer and its
// length) on SSL_ERROR_WANT_READ or SSL_ERROR_WANT_WRITE.
// rlimit_.avail() or rlimit_.avail() may return different length
// than the length previously passed to SSL_read, which violates
// OpenSSL assumption. To avoid this, we keep last legnth passed
// to SSL_read to tls_last_readlen_ if SSL_read indicated I/O
// blocking.
if (tls.last_readlen == 0) {
nread = std::min(len, rlimit.avail());
if (nread == 0) {
return 0;
}
} else {
nread = tls.last_readlen;
tls.last_readlen = 0;
}
auto rv = SSL_read(tls.ssl, data, nread);
if (rv <= 0) {
auto err = SSL_get_error(tls.ssl, rv);
switch (err) {
case SSL_ERROR_WANT_READ:
tls.last_readlen = nread;
return 0;
case SSL_ERROR_WANT_WRITE:
if (LOG_ENABLED(INFO)) {
LOG(INFO) << "Close connection due to TLS renegotiation";
}
return SHRPX_ERR_NETWORK;
case SSL_ERROR_ZERO_RETURN:
return SHRPX_ERR_EOF;
default:
if (LOG_ENABLED(INFO)) {
LOG(INFO) << "SSL_read: SSL_get_error returned " << err;
}
return SHRPX_ERR_NETWORK;
}
}
rlimit.drain(rv);
return rv;
}
ssize_t Connection::write_clear(const void *data, size_t len) {
ssize_t nwrite = std::min(len, wlimit.avail());
if (nwrite == 0) {
return 0;
}
while ((nwrite = write(fd, data, nwrite)) == -1 && errno == EINTR)
;
if (nwrite == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
wlimit.startw();
ev_timer_again(loop, &wt);
return 0;
}
return SHRPX_ERR_NETWORK;
}
wlimit.drain(nwrite);
return nwrite;
}
ssize_t Connection::writev_clear(struct iovec *iov, int iovcnt) {
iovcnt = limit_iovec(iov, iovcnt, wlimit.avail());
if (iovcnt == 0) {
return 0;
}
ssize_t nwrite;
while ((nwrite = writev(fd, iov, iovcnt)) == -1 && errno == EINTR)
;
if (nwrite == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
wlimit.startw();
ev_timer_again(loop, &wt);
return 0;
}
return SHRPX_ERR_NETWORK;
}
wlimit.drain(nwrite);
return nwrite;
}
ssize_t Connection::read_clear(void *data, size_t len) {
ssize_t nread = std::min(len, rlimit.avail());
if (nread == 0) {
return 0;
}
while ((nread = read(fd, data, nread)) == -1 && errno == EINTR)
;
if (nread == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
return 0;
}
return SHRPX_ERR_NETWORK;
}
if (nread == 0) {
return SHRPX_ERR_EOF;
}
rlimit.drain(nread);
return nread;
}
} // namespace shrpx

107
src/shrpx_connection.h Normal file
View File

@ -0,0 +1,107 @@
/*
* nghttp2 - HTTP/2 C Library
*
* Copyright (c) 2015 Tatsuhiro Tsujikawa
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef SHRPX_CONNECTION_H
#define SHRPX_CONNECTION_H
#include "shrpx_config.h"
#include <sys/uio.h>
#include <ev.h>
#include <openssl/ssl.h>
#include "shrpx_rate_limit.h"
#include "shrpx_error.h"
namespace shrpx {
struct TLSConnection {
SSL *ssl;
ev_tstamp last_write_time;
size_t warmup_writelen;
// length passed to SSL_write and SSL_read last time. This is
// required since these functions require the exact same parameters
// on non-blocking I/O.
size_t last_writelen, last_readlen;
bool initial_handshake_done;
bool reneg_started;
};
template <typename T> using EVCb = void (*)(struct ev_loop *, T *, int);
using IOCb = EVCb<ev_io>;
using TimerCb = EVCb<ev_timer>;
struct Connection {
Connection(struct ev_loop *loop, int fd, SSL *ssl, ev_tstamp write_timeout,
ev_tstamp read_timeout, size_t write_rate, size_t write_burst,
size_t read_rate, size_t read_burst, IOCb writecb, IOCb readcb,
TimerCb timeoutcb, void *data);
~Connection();
void disconnect();
int tls_handshake();
// All write_* and writev_clear functions return number of bytes
// written. If nothing cannot be written (e.g., there is no
// allowance in RateLimit or underlying connection blocks), return
// 0. SHRPX_ERR_NETWORK is returned in case of error.
//
// All read_* functions return number of bytes read. If nothing
// cannot be read (e.g., there is no allowance in Ratelimit or
// underlying connection blocks), return 0. SHRPX_ERR_EOF is
// returned in case of EOF and no data was read. Otherwise
// SHRPX_ERR_NETWORK is return in case of error.
ssize_t write_tls(const void *data, size_t len);
ssize_t read_tls(void *data, size_t len);
ssize_t get_tls_write_limit();
// Updates the number of bytes written in warm up period.
void update_tls_warmup_writelen(size_t n);
ssize_t write_clear(const void *data, size_t len);
ssize_t writev_clear(struct iovec *iov, int iovcnt);
ssize_t read_clear(void *data, size_t len);
TLSConnection tls;
ev_io wev;
ev_io rev;
ev_timer wt;
ev_timer rt;
RateLimit wlimit;
RateLimit rlimit;
IOCb writecb;
IOCb readcb;
TimerCb timeoutcb;
struct ev_loop *loop;
void *data;
int fd;
};
} // namespace shrpx
#endif // SHRPX_CONNECTION_H

View File

@ -51,8 +51,6 @@ public:
virtual int resume_read(IOCtrlReason reason, size_t consumed) = 0; virtual int resume_read(IOCtrlReason reason, size_t consumed) = 0;
virtual void force_resume_read() = 0; virtual void force_resume_read() = 0;
enum { ERR_EOF = -100, ERR_NET = -101 };
virtual int on_read() = 0; virtual int on_read() = 0;
virtual int on_write() = 0; virtual int on_write() = 0;
virtual int on_timeout() { return 0; } virtual int on_timeout() { return 0; }

View File

@ -29,11 +29,13 @@
namespace shrpx { namespace shrpx {
// Deprecated, do not use.
enum ErrorCode { enum ErrorCode {
SHRPX_ERR_SUCCESS = 0, SHRPX_ERR_SUCCESS = 0,
SHRPX_ERR_UNKNOWN = -1, SHRPX_ERR_ERROR = -1,
SHRPX_ERR_HTTP_PARSE = -2, SHRPX_ERR_NETWORK = -100,
SHRPX_ERR_NETWORK = -3 SHRPX_ERR_EOF = -101,
SHRPX_ERR_INPROGRESS = -102,
}; };
} // namespace shrpx } // namespace shrpx

View File

@ -73,7 +73,8 @@ void settings_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) {
namespace { namespace {
void timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) { void timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
auto http2session = static_cast<Http2Session *>(w->data); auto conn = static_cast<Connection *>(w->data);
auto http2session = static_cast<Http2Session *>(conn->data);
if (LOG_ENABLED(INFO)) { if (LOG_ENABLED(INFO)) {
SSLOG(INFO, http2session) << "Timeout"; SSLOG(INFO, http2session) << "Timeout";
@ -87,7 +88,8 @@ void timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
namespace { namespace {
void readcb(struct ev_loop *loop, ev_io *w, int revents) { void readcb(struct ev_loop *loop, ev_io *w, int revents) {
int rv; int rv;
auto http2session = static_cast<Http2Session *>(w->data); auto conn = static_cast<Connection *>(w->data);
auto http2session = static_cast<Http2Session *>(conn->data);
http2session->connection_alive(); http2session->connection_alive();
rv = http2session->do_read(); rv = http2session->do_read();
if (rv != 0) { if (rv != 0) {
@ -99,7 +101,8 @@ void readcb(struct ev_loop *loop, ev_io *w, int revents) {
namespace { namespace {
void writecb(struct ev_loop *loop, ev_io *w, int revents) { void writecb(struct ev_loop *loop, ev_io *w, int revents) {
int rv; int rv;
auto http2session = static_cast<Http2Session *>(w->data); auto conn = static_cast<Connection *>(w->data);
auto http2session = static_cast<Http2Session *>(conn->data);
http2session->clear_write_request(); http2session->clear_write_request();
http2session->connection_alive(); http2session->connection_alive();
rv = http2session->do_write(); rv = http2session->do_write();
@ -132,26 +135,17 @@ void wrschedcb(struct ev_loop *loop, ev_prepare *w, int revents) {
} // namespace } // namespace
Http2Session::Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx) Http2Session::Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx)
: loop_(loop), ssl_ctx_(ssl_ctx), ssl_(nullptr), session_(nullptr), : conn_(loop, -1, nullptr, get_config()->downstream_write_timeout,
data_pending_(nullptr), data_pendinglen_(0), fd_(-1), get_config()->downstream_read_timeout, 0, 0, 0, 0, writecb, readcb,
state_(DISCONNECTED), connection_check_state_(CONNECTION_CHECK_NONE), timeoutcb, this),
flow_control_(false), write_requested_(false) { ssl_ctx_(ssl_ctx), session_(nullptr), data_pending_(nullptr),
// We do not know fd yet, so just set dummy fd 0 data_pendinglen_(0), state_(DISCONNECTED),
ev_io_init(&wev_, writecb, 0, EV_WRITE); connection_check_state_(CONNECTION_CHECK_NONE), flow_control_(false),
ev_io_init(&rev_, readcb, 0, EV_READ); write_requested_(false) {
wev_.data = this;
rev_.data = this;
read_ = write_ = &Http2Session::noop; read_ = write_ = &Http2Session::noop;
on_read_ = on_write_ = &Http2Session::noop; on_read_ = on_write_ = &Http2Session::noop;
ev_timer_init(&wt_, timeoutcb, 0., get_config()->downstream_write_timeout);
ev_timer_init(&rt_, timeoutcb, 0., get_config()->downstream_read_timeout);
wt_.data = this;
rt_.data = this;
// We will resuse this many times, so use repeat timeout value. // We will resuse this many times, so use repeat timeout value.
ev_timer_init(&connchk_timer_, connchk_timeout_cb, 0., 5.); ev_timer_init(&connchk_timer_, connchk_timeout_cb, 0., 5.);
@ -166,7 +160,7 @@ Http2Session::Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx)
ev_prepare_init(&wrsched_prep_, &wrschedcb); ev_prepare_init(&wrsched_prep_, &wrschedcb);
wrsched_prep_.data = this; wrsched_prep_.data = this;
ev_prepare_start(loop_, &wrsched_prep_); ev_prepare_start(conn_.loop, &wrsched_prep_);
} }
Http2Session::~Http2Session() { disconnect(); } Http2Session::~Http2Session() { disconnect(); }
@ -181,34 +175,13 @@ int Http2Session::disconnect(bool hard) {
rb_.reset(); rb_.reset();
wb_.reset(); wb_.reset();
ev_timer_stop(loop_, &settings_timer_); ev_timer_stop(conn_.loop, &settings_timer_);
ev_timer_stop(loop_, &connchk_timer_); ev_timer_stop(conn_.loop, &connchk_timer_);
ev_timer_stop(loop_, &rt_);
ev_timer_stop(loop_, &wt_);
read_ = write_ = &Http2Session::noop; read_ = write_ = &Http2Session::noop;
on_read_ = on_write_ = &Http2Session::noop; on_read_ = on_write_ = &Http2Session::noop;
ev_io_stop(loop_, &rev_); conn_.disconnect();
ev_io_stop(loop_, &wev_);
if (ssl_) {
SSL_set_shutdown(ssl_, SSL_RECEIVED_SHUTDOWN);
ERR_clear_error();
SSL_shutdown(ssl_);
SSL_free(ssl_);
ssl_ = nullptr;
}
if (fd_ != -1) {
if (LOG_ENABLED(INFO)) {
SSLOG(INFO, this) << "Closing fd=" << fd_;
}
shutdown(fd_, SHUT_WR);
close(fd_);
fd_ = -1;
}
if (proxy_htp_) { if (proxy_htp_) {
proxy_htp_.reset(); proxy_htp_.reset();
@ -251,7 +224,7 @@ int Http2Session::disconnect(bool hard) {
return 0; return 0;
} }
int Http2Session::check_cert() { return ssl::check_cert(ssl_); } int Http2Session::check_cert() { return ssl::check_cert(conn_.tls.ssl); }
int Http2Session::initiate_connection() { int Http2Session::initiate_connection() {
int rv = 0; int rv = 0;
@ -262,15 +235,15 @@ int Http2Session::initiate_connection() {
<< get_config()->downstream_http_proxy_port; << get_config()->downstream_http_proxy_port;
} }
fd_ = util::create_nonblock_socket( conn_.fd = util::create_nonblock_socket(
get_config()->downstream_http_proxy_addr.storage.ss_family); get_config()->downstream_http_proxy_addr.storage.ss_family);
if (fd_ == -1) { if (conn_.fd == -1) {
return -1; return -1;
} }
rv = connect(fd_, const_cast<sockaddr *>( rv = connect(conn_.fd, const_cast<sockaddr *>(
&get_config()->downstream_http_proxy_addr.sa), &get_config()->downstream_http_proxy_addr.sa),
get_config()->downstream_http_proxy_addrlen); get_config()->downstream_http_proxy_addrlen);
if (rv != 0 && errno != EINPROGRESS) { if (rv != 0 && errno != EINPROGRESS) {
SSLOG(ERROR, this) << "Failed to connect to the proxy " SSLOG(ERROR, this) << "Failed to connect to the proxy "
@ -279,13 +252,13 @@ int Http2Session::initiate_connection() {
return -1; return -1;
} }
ev_io_set(&rev_, fd_, EV_READ); ev_io_set(&conn_.rev, conn_.fd, EV_READ);
ev_io_set(&wev_, fd_, EV_WRITE); ev_io_set(&conn_.wev, conn_.fd, EV_WRITE);
ev_io_start(loop_, &wev_); conn_.wlimit.startw();
// TODO we should have timeout for connection establishment // TODO we should have timeout for connection establishment
ev_timer_again(loop_, &wt_); ev_timer_again(conn_.loop, &conn_.wt);
write_ = &Http2Session::connected; write_ = &Http2Session::connected;
@ -307,8 +280,8 @@ int Http2Session::initiate_connection() {
} }
if (ssl_ctx_) { if (ssl_ctx_) {
// We are establishing TLS connection. // We are establishing TLS connection.
ssl_ = SSL_new(ssl_ctx_); conn_.tls.ssl = SSL_new(ssl_ctx_);
if (!ssl_) { if (!conn_.tls.ssl) {
SSLOG(ERROR, this) << "SSL_new() failed: " SSLOG(ERROR, this) << "SSL_new() failed: "
<< ERR_error_string(ERR_get_error(), NULL); << ERR_error_string(ERR_get_error(), NULL);
return -1; return -1;
@ -325,21 +298,21 @@ int Http2Session::initiate_connection() {
// TLS extensions: SNI. There is no documentation about the return // TLS extensions: SNI. There is no documentation about the return
// code for this function (actually this is macro wrapping SSL_ctrl // code for this function (actually this is macro wrapping SSL_ctrl
// at the time of this writing). // at the time of this writing).
SSL_set_tlsext_host_name(ssl_, sni_name); SSL_set_tlsext_host_name(conn_.tls.ssl, sni_name);
} }
// If state_ == PROXY_CONNECTED, we has connected to the proxy // If state_ == PROXY_CONNECTED, we has connected to the proxy
// using fd_ and tunnel has been established. // using conn_.fd and tunnel has been established.
if (state_ == DISCONNECTED) { if (state_ == DISCONNECTED) {
assert(fd_ == -1); assert(conn_.fd == -1);
fd_ = util::create_nonblock_socket( conn_.fd = util::create_nonblock_socket(
get_config()->downstream_addrs[0].addr.storage.ss_family); get_config()->downstream_addrs[0].addr.storage.ss_family);
if (fd_ == -1) { if (conn_.fd == -1) {
return -1; return -1;
} }
rv = connect( rv = connect(
fd_, conn_.fd,
// TODO maybe not thread-safe? // TODO maybe not thread-safe?
const_cast<sockaddr *>(&get_config()->downstream_addrs[0].addr.sa), const_cast<sockaddr *>(&get_config()->downstream_addrs[0].addr.sa),
get_config()->downstream_addrs[0].addrlen); get_config()->downstream_addrs[0].addrlen);
@ -348,25 +321,25 @@ int Http2Session::initiate_connection() {
} }
} }
if (SSL_set_fd(ssl_, fd_) == 0) { if (SSL_set_fd(conn_.tls.ssl, conn_.fd) == 0) {
return -1; return -1;
} }
SSL_set_connect_state(ssl_); SSL_set_connect_state(conn_.tls.ssl);
} else { } else {
if (state_ == DISCONNECTED) { if (state_ == DISCONNECTED) {
// Without TLS and proxy. // Without TLS and proxy.
assert(fd_ == -1); assert(conn_.fd == -1);
fd_ = util::create_nonblock_socket( conn_.fd = util::create_nonblock_socket(
get_config()->downstream_addrs[0].addr.storage.ss_family); get_config()->downstream_addrs[0].addr.storage.ss_family);
if (fd_ == -1) { if (conn_.fd == -1) {
return -1; return -1;
} }
rv = connect(fd_, const_cast<sockaddr *>( rv = connect(conn_.fd, const_cast<sockaddr *>(
&get_config()->downstream_addrs[0].addr.sa), &get_config()->downstream_addrs[0].addr.sa),
get_config()->downstream_addrs[0].addrlen); get_config()->downstream_addrs[0].addrlen);
if (rv != 0 && errno != EINPROGRESS) { if (rv != 0 && errno != EINPROGRESS) {
return -1; return -1;
@ -384,13 +357,11 @@ int Http2Session::initiate_connection() {
// rev_ and wev_ could possibly be active here. Since calling // rev_ and wev_ could possibly be active here. Since calling
// ev_io_set is not allowed while watcher is active, we have to // ev_io_set is not allowed while watcher is active, we have to
// stop them just in case. // stop them just in case.
ev_io_stop(loop_, &rev_); conn_.rlimit.stopw();
ev_io_stop(loop_, &wev_); conn_.wlimit.stopw();
ev_io_set(&rev_, fd_, EV_READ); ev_io_set(&conn_.rev, conn_.fd, EV_READ);
ev_io_set(&wev_, fd_, EV_WRITE); ev_io_set(&conn_.wev, conn_.fd, EV_WRITE);
ev_io_start(loop_, &wev_);
write_ = &Http2Session::connected; write_ = &Http2Session::connected;
@ -400,11 +371,11 @@ int Http2Session::initiate_connection() {
// We have been already connected when no TLS and proxy is used. // We have been already connected when no TLS and proxy is used.
if (state_ != CONNECTED) { if (state_ != CONNECTED) {
state_ = CONNECTING; state_ = CONNECTING;
ev_io_start(loop_, &wev_); conn_.wlimit.startw();
// TODO we should have timeout for connection establishment // TODO we should have timeout for connection establishment
ev_timer_again(loop_, &wt_); ev_timer_again(conn_.loop, &conn_.wt);
} else { } else {
ev_timer_again(loop_, &rt_); ev_timer_again(conn_.loop, &conn_.rt);
} }
return 0; return 0;
@ -450,17 +421,13 @@ http_parser_settings htp_hooks = {
int Http2Session::downstream_read_proxy() { int Http2Session::downstream_read_proxy() {
for (;;) { for (;;) {
const void *data; if (rb_.rleft() == 0) {
size_t datalen; return 0;
std::tie(data, datalen) = rb_.get();
if (datalen == 0) {
break;
} }
size_t nread = size_t nread = http_parser_execute(proxy_htp_.get(), &htp_hooks,
http_parser_execute(proxy_htp_.get(), &htp_hooks, reinterpret_cast<const char *>(rb_.pos),
reinterpret_cast<const char *>(data), datalen); rb_.rleft());
rb_.drain(nread); rb_.drain(nread);
@ -476,12 +443,11 @@ int Http2Session::downstream_read_proxy() {
if (initiate_connection() != 0) { if (initiate_connection() != 0) {
return -1; return -1;
} }
break; return 0;
case Http2Session::PROXY_FAILED: case Http2Session::PROXY_FAILED:
return -1; return -1;
} }
} }
return 0;
} }
int Http2Session::downstream_connect_proxy() { int Http2Session::downstream_connect_proxy() {
@ -679,11 +645,11 @@ int on_stream_close_callback(nghttp2_session *session, int32_t stream_id,
} // namespace } // namespace
void Http2Session::start_settings_timer() { void Http2Session::start_settings_timer() {
ev_timer_again(loop_, &settings_timer_); ev_timer_again(conn_.loop, &settings_timer_);
} }
void Http2Session::stop_settings_timer() { void Http2Session::stop_settings_timer() {
ev_timer_stop(loop_, &settings_timer_); ev_timer_stop(conn_.loop, &settings_timer_);
} }
namespace { namespace {
@ -1176,7 +1142,7 @@ int Http2Session::on_connect() {
if (ssl_ctx_) { if (ssl_ctx_) {
const unsigned char *next_proto = nullptr; const unsigned char *next_proto = nullptr;
unsigned int next_proto_len; unsigned int next_proto_len;
SSL_get0_next_proto_negotiated(ssl_, &next_proto, &next_proto_len); SSL_get0_next_proto_negotiated(conn_.tls.ssl, &next_proto, &next_proto_len);
for (int i = 0; i < 2; ++i) { for (int i = 0; i < 2; ++i) {
if (next_proto) { if (next_proto) {
if (LOG_ENABLED(INFO)) { if (LOG_ENABLED(INFO)) {
@ -1276,8 +1242,8 @@ int Http2Session::on_connect() {
return -1; return -1;
} }
auto must_terminate = auto must_terminate = !get_config()->downstream_no_tls &&
!get_config()->downstream_no_tls && !ssl::check_http2_requirement(ssl_); !ssl::check_http2_requirement(conn_.tls.ssl);
if (must_terminate) { if (must_terminate) {
rv = terminate_session(NGHTTP2_INADEQUATE_SECURITY); rv = terminate_session(NGHTTP2_INADEQUATE_SECURITY);
@ -1329,16 +1295,9 @@ int Http2Session::on_write() { return on_write_(*this); }
int Http2Session::downstream_read() { int Http2Session::downstream_read() {
ssize_t rv = 0; ssize_t rv = 0;
for (;;) { if (rb_.rleft() > 0) {
const void *data;
size_t nread;
std::tie(data, nread) = rb_.get();
if (nread == 0) {
break;
}
rv = nghttp2_session_mem_recv( rv = nghttp2_session_mem_recv(
session_, reinterpret_cast<const uint8_t *>(data), nread); session_, reinterpret_cast<const uint8_t *>(rb_.pos), rb_.rleft());
if (rv < 0) { if (rv < 0) {
SSLOG(ERROR, this) << "nghttp2_session_recv() returned error: " SSLOG(ERROR, this) << "nghttp2_session_recv() returned error: "
@ -1346,7 +1305,9 @@ int Http2Session::downstream_read() {
return -1; return -1;
} }
rb_.drain(nread); // nghttp2_session_mem_recv() should consume all input data in
// case of success.
rb_.reset();
} }
if (nghttp2_session_want_read(session_) == 0 && if (nghttp2_session_want_read(session_) == 0 &&
@ -1413,10 +1374,10 @@ void Http2Session::clear_write_request() { write_requested_ = false; }
bool Http2Session::write_requested() const { return write_requested_; } bool Http2Session::write_requested() const { return write_requested_; }
struct ev_loop *Http2Session::get_loop() const { struct ev_loop *Http2Session::get_loop() const {
return loop_; return conn_.loop;
} }
ev_io *Http2Session::get_wev() { return &wev_; } ev_io *Http2Session::get_wev() { return &conn_.wev; }
int Http2Session::get_state() const { return state_; } int Http2Session::get_state() const { return state_; }
@ -1431,7 +1392,7 @@ int Http2Session::terminate_session(uint32_t error_code) {
return 0; return 0;
} }
SSL *Http2Session::get_ssl() const { return ssl_; } SSL *Http2Session::get_ssl() const { return conn_.tls.ssl; }
int Http2Session::consume(int32_t stream_id, size_t len) { int Http2Session::consume(int32_t stream_id, size_t len) {
int rv; int rv;
@ -1473,7 +1434,7 @@ void Http2Session::start_checking_connection() {
} }
void Http2Session::reset_connection_check_timer() { void Http2Session::reset_connection_check_timer() {
ev_timer_again(loop_, &connchk_timer_); ev_timer_again(conn_.loop, &connchk_timer_);
} }
void Http2Session::connection_alive() { void Http2Session::connection_alive() {
@ -1518,7 +1479,7 @@ void Http2Session::set_connection_check_state(int state) {
int Http2Session::noop() { return 0; } int Http2Session::noop() { return 0; }
int Http2Session::connected() { int Http2Session::connected() {
if (!util::check_socket_connected(fd_)) { if (!util::check_socket_connected(conn_.fd)) {
return -1; return -1;
} }
@ -1526,9 +1487,9 @@ int Http2Session::connected() {
SSLOG(INFO, this) << "Connection established"; SSLOG(INFO, this) << "Connection established";
} }
ev_io_start(loop_, &rev_); conn_.rlimit.startw();
if (ssl_) { if (conn_.tls.ssl) {
read_ = &Http2Session::tls_handshake; read_ = &Http2Session::tls_handshake;
write_ = &Http2Session::tls_handshake; write_ = &Http2Session::tls_handshake;
@ -1551,7 +1512,7 @@ int Http2Session::connected() {
} }
int Http2Session::read_clear() { int Http2Session::read_clear() {
ev_timer_again(loop_, &rt_); ev_timer_again(conn_.loop, &conn_.rt);
for (;;) { for (;;) {
// we should process buffered data first before we read EOF. // we should process buffered data first before we read EOF.
@ -1562,50 +1523,36 @@ int Http2Session::read_clear() {
return 0; return 0;
} }
rb_.reset(); rb_.reset();
struct iovec iov[2];
auto iovcnt = rb_.wiovec(iov);
if (iovcnt > 0) { auto nread = conn_.read_clear(rb_.last, rb_.wleft());
ssize_t nread;
while ((nread = readv(fd_, iov, iovcnt)) == -1 && errno == EINTR)
;
if (nread == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
break;
}
return -1;
}
if (nread == 0) { if (nread == 0) {
return -1; return 0;
}
rb_.write(nread);
} }
}
return 0; if (nread < 0) {
return nread;
}
rb_.write(nread);
}
} }
int Http2Session::write_clear() { int Http2Session::write_clear() {
ev_timer_again(loop_, &rt_); ev_timer_again(conn_.loop, &conn_.rt);
for (;;) { for (;;) {
if (wb_.rleft() > 0) { if (wb_.rleft() > 0) {
struct iovec iov[2]; auto nwrite = conn_.write_clear(wb_.pos, wb_.rleft());
auto iovcnt = wb_.riovec(iov);
ssize_t nwrite; if (nwrite == 0) {
while ((nwrite = writev(fd_, iov, iovcnt)) == -1 && errno == EINTR) return 0;
;
if (nwrite == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
ev_io_start(loop_, &wev_);
ev_timer_again(loop_, &wt_);
return 0;
}
return -1;
} }
if (nwrite < 0) {
return nwrite;
}
wb_.drain(nwrite); wb_.drain(nwrite);
continue; continue;
} }
@ -1619,50 +1566,30 @@ int Http2Session::write_clear() {
} }
} }
ev_io_stop(loop_, &wev_); conn_.wlimit.stopw();
ev_timer_stop(loop_, &wt_); ev_timer_stop(conn_.loop, &conn_.wt);
return 0; return 0;
} }
int Http2Session::tls_handshake() { int Http2Session::tls_handshake() {
ev_timer_again(loop_, &rt_); ev_timer_again(conn_.loop, &conn_.rt);
ERR_clear_error(); ERR_clear_error();
auto rv = SSL_do_handshake(ssl_); auto rv = conn_.tls_handshake();
if (rv == 0) { if (rv == SHRPX_ERR_INPROGRESS) {
return -1; return 0;
} }
if (rv < 0) { if (rv < 0) {
auto err = SSL_get_error(ssl_, rv); return rv;
switch (err) {
case SSL_ERROR_WANT_READ:
ev_io_stop(loop_, &wev_);
ev_timer_stop(loop_, &wt_);
return 0;
case SSL_ERROR_WANT_WRITE:
ev_io_start(loop_, &wev_);
ev_timer_again(loop_, &wt_);
return 0;
default:
return -1;
}
} }
ev_io_stop(loop_, &wev_);
ev_timer_stop(loop_, &wt_);
if (LOG_ENABLED(INFO)) { if (LOG_ENABLED(INFO)) {
SSLOG(INFO, this) << "SSL/TLS handshake completed"; SSLOG(INFO, this) << "SSL/TLS handshake completed";
} }
if (LOG_ENABLED(INFO)) {
if (SSL_session_reused(ssl_)) {
CLOG(INFO, this) << "SSL/TLS session reused";
}
}
if (!get_config()->downstream_no_tls && !get_config()->insecure && if (!get_config()->downstream_no_tls && !get_config()->insecure &&
check_cert() != 0) { check_cert() != 0) {
@ -1681,7 +1608,7 @@ int Http2Session::tls_handshake() {
} }
int Http2Session::read_tls() { int Http2Session::read_tls() {
ev_timer_again(loop_, &rt_); ev_timer_again(conn_.loop, &conn_.rt);
ERR_clear_error(); ERR_clear_error();
@ -1694,78 +1621,39 @@ int Http2Session::read_tls() {
return 0; return 0;
} }
rb_.reset(); rb_.reset();
struct iovec iov[2];
auto iovcnt = rb_.wiovec(iov); auto nread = conn_.read_tls(rb_.last, rb_.wleft());
if (iovcnt == 0) {
if (nread == 0) {
return 0; return 0;
} }
auto rv = SSL_read(ssl_, iov[0].iov_base, iov[0].iov_len); if (nread < 0) {
return nread;
if (rv == 0) {
return -1;
} }
if (rv < 0) { rb_.write(nread);
auto err = SSL_get_error(ssl_, rv);
switch (err) {
case SSL_ERROR_WANT_READ:
return 0;
case SSL_ERROR_WANT_WRITE:
if (LOG_ENABLED(INFO)) {
SSLOG(INFO, this) << "Close connection due to TLS renegotiation";
}
return -1;
default:
if (LOG_ENABLED(INFO)) {
SSLOG(INFO, this) << "SSL_read: SSL_get_error returned " << err;
}
return -1;
}
}
rb_.write(rv);
} }
} }
int Http2Session::write_tls() { int Http2Session::write_tls() {
ev_timer_again(loop_, &rt_); ev_timer_again(conn_.loop, &conn_.rt);
ERR_clear_error(); ERR_clear_error();
for (;;) { for (;;) {
if (wb_.rleft() > 0) { if (wb_.rleft() > 0) {
const void *p; auto nwrite = conn_.write_tls(wb_.pos, wb_.rleft());
size_t len;
std::tie(p, len) = wb_.get();
auto rv = SSL_write(ssl_, p, len); if (nwrite == 0) {
return 0;
if (rv == 0) {
return -1;
} }
if (rv < 0) { if (nwrite < 0) {
auto err = SSL_get_error(ssl_, rv); return nwrite;
switch (err) {
case SSL_ERROR_WANT_READ:
if (LOG_ENABLED(INFO)) {
SSLOG(INFO, this) << "Close connection due to TLS renegotiation";
}
return -1;
case SSL_ERROR_WANT_WRITE:
ev_io_start(loop_, &wev_);
ev_timer_again(loop_, &wt_);
return 0;
default:
if (LOG_ENABLED(INFO)) {
SSLOG(INFO, this) << "SSL_write: SSL_get_error returned " << err;
}
return -1;
}
} }
wb_.drain(rv); wb_.drain(nwrite);
continue; continue;
} }
@ -1778,8 +1666,8 @@ int Http2Session::write_tls() {
} }
} }
ev_io_stop(loop_, &wev_); conn_.wlimit.stopw();
ev_timer_stop(loop_, &wt_); ev_timer_stop(conn_.loop, &conn_.wt);
return 0; return 0;
} }

View File

@ -38,7 +38,8 @@
#include "http-parser/http_parser.h" #include "http-parser/http_parser.h"
#include "ringbuf.h" #include "shrpx_connection.h"
#include "buffer.h"
using namespace nghttp2; using namespace nghttp2;
@ -168,14 +169,11 @@ public:
CONNECTION_CHECK_STARTED CONNECTION_CHECK_STARTED
}; };
using ReadBuf = RingBuf<8192>; using ReadBuf = Buffer<8192>;
using WriteBuf = RingBuf<65536>; using WriteBuf = Buffer<32768>;
private: private:
ev_io wev_; Connection conn_;
ev_io rev_;
ev_timer wt_;
ev_timer rt_;
ev_timer settings_timer_; ev_timer settings_timer_;
ev_timer connchk_timer_; ev_timer connchk_timer_;
ev_prepare wrsched_prep_; ev_prepare wrsched_prep_;
@ -185,18 +183,11 @@ private:
std::function<int(Http2Session &)> on_read_, on_write_; std::function<int(Http2Session &)> on_read_, on_write_;
// Used to parse the response from HTTP proxy // Used to parse the response from HTTP proxy
std::unique_ptr<http_parser> proxy_htp_; std::unique_ptr<http_parser> proxy_htp_;
struct ev_loop *loop_;
// NULL if no TLS is configured // NULL if no TLS is configured
SSL_CTX *ssl_ctx_; SSL_CTX *ssl_ctx_;
SSL *ssl_;
nghttp2_session *session_; nghttp2_session *session_;
const uint8_t *data_pending_; const uint8_t *data_pending_;
size_t data_pendinglen_; size_t data_pendinglen_;
// fd_ is used for proxy connection and no TLS connection. For
// direct or TLS connection, it may be -1 even after connection is
// established. Use bufferevent_getfd(bev_) to get file descriptor
// in these cases.
int fd_;
int state_; int state_;
int connection_check_state_; int connection_check_state_;
bool flow_control_; bool flow_control_;

View File

@ -889,11 +889,11 @@ int Http2Upstream::downstream_read(DownstreamConnection *dconn) {
dconn = nullptr; dconn = nullptr;
} else { } else {
auto rv = downstream->on_read(); auto rv = downstream->on_read();
if (rv == DownstreamConnection::ERR_EOF) { if (rv == SHRPX_ERR_EOF) {
return downstream_eof(dconn); return downstream_eof(dconn);
} }
if (rv != 0) { if (rv != 0) {
if (rv != DownstreamConnection::ERR_NET) { if (rv != SHRPX_ERR_NETWORK) {
if (LOG_ENABLED(INFO)) { if (LOG_ENABLED(INFO)) {
DCLOG(INFO, dconn) << "HTTP parser failure"; DCLOG(INFO, dconn) << "HTTP parser failure";
} }
@ -919,7 +919,7 @@ int Http2Upstream::downstream_read(DownstreamConnection *dconn) {
int Http2Upstream::downstream_write(DownstreamConnection *dconn) { int Http2Upstream::downstream_write(DownstreamConnection *dconn) {
int rv; int rv;
rv = dconn->on_write(); rv = dconn->on_write();
if (rv == DownstreamConnection::ERR_NET) { if (rv == SHRPX_ERR_NETWORK) {
return downstream_error(dconn, Downstream::EVENT_ERROR); return downstream_error(dconn, Downstream::EVENT_ERROR);
} }
if (rv != 0) { if (rv != 0) {

View File

@ -43,7 +43,8 @@ namespace shrpx {
namespace { namespace {
void timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) { void timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
auto dconn = static_cast<HttpDownstreamConnection *>(w->data); auto conn = static_cast<Connection *>(w->data);
auto dconn = static_cast<HttpDownstreamConnection *>(conn->data);
if (LOG_ENABLED(INFO)) { if (LOG_ENABLED(INFO)) {
DCLOG(INFO, dconn) << "Time out"; DCLOG(INFO, dconn) << "Time out";
@ -64,7 +65,8 @@ void timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
namespace { namespace {
void readcb(struct ev_loop *loop, ev_io *w, int revents) { void readcb(struct ev_loop *loop, ev_io *w, int revents) {
auto dconn = static_cast<HttpDownstreamConnection *>(w->data); auto conn = static_cast<Connection *>(w->data);
auto dconn = static_cast<HttpDownstreamConnection *>(conn->data);
auto downstream = dconn->get_downstream(); auto downstream = dconn->get_downstream();
auto upstream = downstream->get_upstream(); auto upstream = downstream->get_upstream();
auto handler = upstream->get_client_handler(); auto handler = upstream->get_client_handler();
@ -77,7 +79,8 @@ void readcb(struct ev_loop *loop, ev_io *w, int revents) {
namespace { namespace {
void writecb(struct ev_loop *loop, ev_io *w, int revents) { void writecb(struct ev_loop *loop, ev_io *w, int revents) {
auto dconn = static_cast<HttpDownstreamConnection *>(w->data); auto conn = static_cast<Connection *>(w->data);
auto dconn = static_cast<HttpDownstreamConnection *>(conn->data);
auto downstream = dconn->get_downstream(); auto downstream = dconn->get_downstream();
auto upstream = downstream->get_upstream(); auto upstream = downstream->get_upstream();
auto handler = upstream->get_client_handler(); auto handler = upstream->get_client_handler();
@ -90,7 +93,8 @@ void writecb(struct ev_loop *loop, ev_io *w, int revents) {
namespace { namespace {
void connectcb(struct ev_loop *loop, ev_io *w, int revents) { void connectcb(struct ev_loop *loop, ev_io *w, int revents) {
auto dconn = static_cast<HttpDownstreamConnection *>(w->data); auto conn = static_cast<Connection *>(w->data);
auto dconn = static_cast<HttpDownstreamConnection *>(conn->data);
auto downstream = dconn->get_downstream(); auto downstream = dconn->get_downstream();
auto upstream = downstream->get_upstream(); auto upstream = downstream->get_upstream();
auto handler = upstream->get_client_handler(); auto handler = upstream->get_client_handler();
@ -106,32 +110,13 @@ void connectcb(struct ev_loop *loop, ev_io *w, int revents) {
HttpDownstreamConnection::HttpDownstreamConnection( HttpDownstreamConnection::HttpDownstreamConnection(
DownstreamConnectionPool *dconn_pool, struct ev_loop *loop) DownstreamConnectionPool *dconn_pool, struct ev_loop *loop)
: DownstreamConnection(dconn_pool), rlimit_(loop, &rev_, 0, 0), : DownstreamConnection(dconn_pool),
ioctrl_(&rlimit_), response_htp_{0}, loop_(loop), addr_idx_(0), fd_(-1) { conn_(loop, -1, nullptr, get_config()->downstream_write_timeout,
// We do not know fd yet, so just set dummy fd 0 get_config()->downstream_read_timeout, 0, 0, 0, 0, connectcb,
ev_io_init(&wev_, connectcb, 0, EV_WRITE); readcb, timeoutcb, this),
ev_io_init(&rev_, readcb, 0, EV_READ); ioctrl_(&conn_.rlimit), response_htp_{0}, addr_idx_(0) {}
wev_.data = this;
rev_.data = this;
ev_timer_init(&wt_, timeoutcb, 0., get_config()->downstream_write_timeout);
ev_timer_init(&rt_, timeoutcb, 0., get_config()->downstream_read_timeout);
wt_.data = this;
rt_.data = this;
}
HttpDownstreamConnection::~HttpDownstreamConnection() { HttpDownstreamConnection::~HttpDownstreamConnection() {
ev_timer_stop(loop_, &rt_);
ev_timer_stop(loop_, &wt_);
ev_io_stop(loop_, &rev_);
ev_io_stop(loop_, &wev_);
if (fd_ != -1) {
shutdown(fd_, SHUT_WR);
close(fd_);
}
// Downstream and DownstreamConnection may be deleted // Downstream and DownstreamConnection may be deleted
// asynchronously. // asynchronously.
if (downstream_) { if (downstream_) {
@ -144,7 +129,7 @@ int HttpDownstreamConnection::attach_downstream(Downstream *downstream) {
DCLOG(INFO, this) << "Attaching to DOWNSTREAM:" << downstream; DCLOG(INFO, this) << "Attaching to DOWNSTREAM:" << downstream;
} }
if (fd_ == -1) { if (conn_.fd == -1) {
auto connect_blocker = client_handler_->get_http1_connect_blocker(); auto connect_blocker = client_handler_->get_http1_connect_blocker();
if (connect_blocker->blocked()) { if (connect_blocker->blocked()) {
@ -162,10 +147,10 @@ int HttpDownstreamConnection::attach_downstream(Downstream *downstream) {
++worker_stat->next_downstream; ++worker_stat->next_downstream;
worker_stat->next_downstream %= get_config()->downstream_addrs.size(); worker_stat->next_downstream %= get_config()->downstream_addrs.size();
fd_ = util::create_nonblock_socket( conn_.fd = util::create_nonblock_socket(
get_config()->downstream_addrs[i].addr.storage.ss_family); get_config()->downstream_addrs[i].addr.storage.ss_family);
if (fd_ == -1) { if (conn_.fd == -1) {
auto error = errno; auto error = errno;
DCLOG(WARN, this) << "socket() failed; errno=" << error; DCLOG(WARN, this) << "socket() failed; errno=" << error;
@ -175,16 +160,16 @@ int HttpDownstreamConnection::attach_downstream(Downstream *downstream) {
} }
int rv; int rv;
rv = connect(fd_, const_cast<sockaddr *>( rv = connect(conn_.fd, const_cast<sockaddr *>(
&get_config()->downstream_addrs[i].addr.sa), &get_config()->downstream_addrs[i].addr.sa),
get_config()->downstream_addrs[i].addrlen); get_config()->downstream_addrs[i].addrlen);
if (rv != 0 && errno != EINPROGRESS) { if (rv != 0 && errno != EINPROGRESS) {
auto error = errno; auto error = errno;
DCLOG(WARN, this) << "connect() failed; errno=" << error; DCLOG(WARN, this) << "connect() failed; errno=" << error;
connect_blocker->on_failure(); connect_blocker->on_failure();
close(fd_); close(conn_.fd);
fd_ = -1; conn_.fd = -1;
if (end == worker_stat->next_downstream) { if (end == worker_stat->next_downstream) {
return SHRPX_ERR_NETWORK; return SHRPX_ERR_NETWORK;
@ -200,10 +185,10 @@ int HttpDownstreamConnection::attach_downstream(Downstream *downstream) {
addr_idx_ = i; addr_idx_ = i;
ev_io_set(&wev_, fd_, EV_WRITE); ev_io_set(&conn_.wev, conn_.fd, EV_WRITE);
ev_io_set(&rev_, fd_, EV_READ); ev_io_set(&conn_.rev, conn_.fd, EV_READ);
ev_io_start(loop_, &wev_); conn_.wlimit.startw();
break; break;
} }
@ -214,12 +199,12 @@ int HttpDownstreamConnection::attach_downstream(Downstream *downstream) {
http_parser_init(&response_htp_, HTTP_RESPONSE); http_parser_init(&response_htp_, HTTP_RESPONSE);
response_htp_.data = downstream_; response_htp_.data = downstream_;
ev_set_cb(&rev_, readcb); ev_set_cb(&conn_.rev, readcb);
rt_.repeat = get_config()->downstream_read_timeout; conn_.rt.repeat = get_config()->downstream_read_timeout;
ev_timer_again(loop_, &rt_); ev_timer_again(conn_.loop, &conn_.rt);
// TODO we should have timeout for connection establishment // TODO we should have timeout for connection establishment
ev_timer_again(loop_, &wt_); ev_timer_again(conn_.loop, &conn_.wt);
return 0; return 0;
} }
@ -421,7 +406,8 @@ int HttpDownstreamConnection::end_upload_data() {
namespace { namespace {
void idle_readcb(struct ev_loop *loop, ev_io *w, int revents) { void idle_readcb(struct ev_loop *loop, ev_io *w, int revents) {
auto dconn = static_cast<HttpDownstreamConnection *>(w->data); auto conn = static_cast<Connection *>(w->data);
auto dconn = static_cast<HttpDownstreamConnection *>(conn->data);
if (LOG_ENABLED(INFO)) { if (LOG_ENABLED(INFO)) {
DCLOG(INFO, dconn) << "Idle connection EOF"; DCLOG(INFO, dconn) << "Idle connection EOF";
} }
@ -433,7 +419,8 @@ void idle_readcb(struct ev_loop *loop, ev_io *w, int revents) {
namespace { namespace {
void idle_timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) { void idle_timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
auto dconn = static_cast<HttpDownstreamConnection *>(w->data); auto conn = static_cast<Connection *>(w->data);
auto dconn = static_cast<HttpDownstreamConnection *>(conn->data);
if (LOG_ENABLED(INFO)) { if (LOG_ENABLED(INFO)) {
DCLOG(INFO, dconn) << "Idle connection timeout"; DCLOG(INFO, dconn) << "Idle connection timeout";
} }
@ -450,16 +437,16 @@ void HttpDownstreamConnection::detach_downstream(Downstream *downstream) {
downstream_ = nullptr; downstream_ = nullptr;
ioctrl_.force_resume_read(); ioctrl_.force_resume_read();
ev_io_start(loop_, &rev_); conn_.rlimit.startw();
ev_io_stop(loop_, &wev_); conn_.wlimit.stopw();
ev_set_cb(&rev_, idle_readcb); ev_set_cb(&conn_.rev, idle_readcb);
ev_timer_stop(loop_, &wt_); ev_timer_stop(conn_.loop, &conn_.wt);
rt_.repeat = get_config()->downstream_idle_read_timeout; conn_.rt.repeat = get_config()->downstream_idle_read_timeout;
ev_set_cb(&rt_, idle_timeoutcb); ev_set_cb(&conn_.rt, idle_timeoutcb);
ev_timer_again(loop_, &rt_); ev_timer_again(conn_.loop, &conn_.rt);
} }
void HttpDownstreamConnection::pause_read(IOCtrlReason reason) { void HttpDownstreamConnection::pause_read(IOCtrlReason reason) {
@ -649,25 +636,21 @@ http_parser_settings htp_hooks = {
} // namespace } // namespace
int HttpDownstreamConnection::on_read() { int HttpDownstreamConnection::on_read() {
ev_timer_again(loop_, &rt_); ev_timer_again(conn_.loop, &conn_.rt);
uint8_t buf[8192]; uint8_t buf[8192];
int rv; int rv;
if (downstream_->get_upgraded()) { if (downstream_->get_upgraded()) {
// For upgraded connection, just pass data to the upstream. // For upgraded connection, just pass data to the upstream.
for (;;) { for (;;) {
ssize_t nread; auto nread = conn_.read_clear(buf, sizeof(buf));
while ((nread = read(fd_, buf, sizeof(buf))) == -1 && errno == EINTR)
;
if (nread == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
return 0;
}
return DownstreamConnection::ERR_NET;
}
if (nread == 0) { if (nread == 0) {
return DownstreamConnection::ERR_EOF; return 0;
}
if (nread < 0) {
return nread;
} }
rv = downstream_->get_upstream()->on_downstream_body(downstream_, buf, rv = downstream_->get_upstream()->on_downstream_body(downstream_, buf,
@ -684,18 +667,14 @@ int HttpDownstreamConnection::on_read() {
} }
for (;;) { for (;;) {
ssize_t nread; auto nread = conn_.read_clear(buf, sizeof(buf));
while ((nread = read(fd_, buf, sizeof(buf))) == -1 && errno == EINTR)
;
if (nread == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
return 0;
}
return DownstreamConnection::ERR_NET;
}
if (nread == 0) { if (nread == 0) {
return DownstreamConnection::ERR_EOF; return 0;
}
if (nread < 0) {
return nread;
} }
auto nproc = http_parser_execute(&response_htp_, &htp_hooks, auto nproc = http_parser_execute(&response_htp_, &htp_hooks,
@ -727,16 +706,8 @@ int HttpDownstreamConnection::on_read() {
} }
} }
#define DEFAULT_WR_IOVCNT 16
#if defined(IOV_MAX) && IOV_MAX < DEFAULT_WR_IOVCNT
#define MAX_WR_IOVCNT IOV_MAX
#else // !defined(IOV_MAX) || IOV_MAX >= DEFAULT_WR_IOVCNT
#define MAX_WR_IOVCNT DEFAULT_WR_IOVCNT
#endif // !defined(IOV_MAX) || IOV_MAX >= DEFAULT_WR_IOVCNT
int HttpDownstreamConnection::on_write() { int HttpDownstreamConnection::on_write() {
ev_timer_again(loop_, &rt_); ev_timer_again(conn_.loop, &conn_.rt);
auto upstream = downstream_->get_upstream(); auto upstream = downstream_->get_upstream();
auto input = downstream_->get_request_buf(); auto input = downstream_->get_request_buf();
@ -746,29 +717,22 @@ int HttpDownstreamConnection::on_write() {
while (input->rleft() > 0) { while (input->rleft() > 0) {
auto iovcnt = input->riovec(iov, util::array_size(iov)); auto iovcnt = input->riovec(iov, util::array_size(iov));
ssize_t nwrite; auto nwrite = conn_.writev_clear(iov, iovcnt);
while ((nwrite = writev(fd_, iov, iovcnt)) == -1 && errno == EINTR)
; if (nwrite == 0) {
if (nwrite == -1) { return 0;
if (errno == EAGAIN || errno == EWOULDBLOCK) {
ev_io_start(loop_, &wev_);
ev_timer_again(loop_, &wt_);
goto end;
}
return DownstreamConnection::ERR_NET;
} }
if (nwrite < 0) {
return nwrite;
}
input->drain(nwrite); input->drain(nwrite);
} }
if (input->rleft() == 0) { conn_.wlimit.stopw();
ev_io_stop(loop_, &wev_); ev_timer_stop(conn_.loop, &conn_.wt);
ev_timer_stop(loop_, &wt_);
} else {
ev_io_start(loop_, &wev_);
ev_timer_again(loop_, &wt_);
}
end:
if (input->rleft() == 0) { if (input->rleft() == 0) {
upstream->resume_read(SHRPX_NO_BUFFER, downstream_, upstream->resume_read(SHRPX_NO_BUFFER, downstream_,
downstream_->get_request_datalen()); downstream_->get_request_datalen());
@ -780,8 +744,8 @@ end:
int HttpDownstreamConnection::on_connect() { int HttpDownstreamConnection::on_connect() {
auto connect_blocker = client_handler_->get_http1_connect_blocker(); auto connect_blocker = client_handler_->get_http1_connect_blocker();
if (!util::check_socket_connected(fd_)) { if (!util::check_socket_connected(conn_.fd)) {
ev_io_stop(loop_, &wev_); conn_.wlimit.stopw();
if (LOG_ENABLED(INFO)) { if (LOG_ENABLED(INFO)) {
DLOG(INFO, this) << "downstream connect failed"; DLOG(INFO, this) << "downstream connect failed";
@ -792,14 +756,14 @@ int HttpDownstreamConnection::on_connect() {
connect_blocker->on_success(); connect_blocker->on_success();
ev_io_start(loop_, &rev_); conn_.rlimit.startw();
ev_set_cb(&wev_, writecb); ev_set_cb(&conn_.wev, writecb);
return 0; return 0;
} }
void HttpDownstreamConnection::on_upstream_change(Upstream *upstream) {} void HttpDownstreamConnection::on_upstream_change(Upstream *upstream) {}
void HttpDownstreamConnection::signal_write() { ev_io_start(loop_, &wev_); } void HttpDownstreamConnection::signal_write() { conn_.wlimit.startw(); }
} // namespace shrpx } // namespace shrpx

View File

@ -31,6 +31,7 @@
#include "shrpx_downstream_connection.h" #include "shrpx_downstream_connection.h"
#include "shrpx_io_control.h" #include "shrpx_io_control.h"
#include "shrpx_connection.h"
namespace shrpx { namespace shrpx {
@ -62,17 +63,11 @@ public:
void signal_write(); void signal_write();
private: private:
ev_io wev_; Connection conn_;
ev_io rev_;
ev_timer wt_;
ev_timer rt_;
RateLimit rlimit_;
IOControl ioctrl_; IOControl ioctrl_;
http_parser response_htp_; http_parser response_htp_;
struct ev_loop *loop_;
// index of get_config()->downstream_addrs this object is using // index of get_config()->downstream_addrs this object is using
size_t addr_idx_; size_t addr_idx_;
int fd_;
}; };
} // namespace shrpx } // namespace shrpx

View File

@ -477,7 +477,7 @@ int HttpsUpstream::downstream_read(DownstreamConnection *dconn) {
downstream->detach_downstream_connection(); downstream->detach_downstream_connection();
} }
if (rv == DownstreamConnection::ERR_EOF) { if (rv == SHRPX_ERR_EOF) {
return downstream_eof(dconn); return downstream_eof(dconn);
} }
@ -494,7 +494,7 @@ end:
int HttpsUpstream::downstream_write(DownstreamConnection *dconn) { int HttpsUpstream::downstream_write(DownstreamConnection *dconn) {
int rv; int rv;
rv = dconn->on_write(); rv = dconn->on_write();
if (rv == DownstreamConnection::ERR_NET) { if (rv == SHRPX_ERR_NETWORK) {
return downstream_error(dconn, Downstream::EVENT_ERROR); return downstream_error(dconn, Downstream::EVENT_ERROR);
} }

View File

@ -57,8 +57,6 @@ ssize_t send_callback(spdylay_session *session, const uint8_t *data, size_t len,
auto nread = wb->write(data, len); auto nread = wb->write(data, len);
handler->update_warmup_writelen(nread);
return nread; return nread;
} }
} // namespace } // namespace
@ -541,11 +539,11 @@ int SpdyUpstream::downstream_read(DownstreamConnection *dconn) {
dconn = nullptr; dconn = nullptr;
} else { } else {
auto rv = downstream->on_read(); auto rv = downstream->on_read();
if (rv == DownstreamConnection::ERR_EOF) { if (rv == SHRPX_ERR_EOF) {
return downstream_eof(dconn); return downstream_eof(dconn);
} }
if (rv != 0) { if (rv != 0) {
if (rv != DownstreamConnection::ERR_NET) { if (rv != SHRPX_ERR_NETWORK) {
if (LOG_ENABLED(INFO)) { if (LOG_ENABLED(INFO)) {
DCLOG(INFO, dconn) << "HTTP parser failure"; DCLOG(INFO, dconn) << "HTTP parser failure";
} }
@ -570,7 +568,7 @@ int SpdyUpstream::downstream_read(DownstreamConnection *dconn) {
int SpdyUpstream::downstream_write(DownstreamConnection *dconn) { int SpdyUpstream::downstream_write(DownstreamConnection *dconn) {
int rv; int rv;
rv = dconn->on_write(); rv = dconn->on_write();
if (rv == DownstreamConnection::ERR_NET) { if (rv == SHRPX_ERR_NETWORK) {
return downstream_error(dconn, Downstream::EVENT_ERROR); return downstream_error(dconn, Downstream::EVENT_ERROR);
} }
if (rv != 0) { if (rv != 0) {

View File

@ -218,12 +218,14 @@ void info_callback(const SSL *ssl, int where, int ret) {
// to disable it, we check that renegotiation is started in this // to disable it, we check that renegotiation is started in this
// callback. // callback.
if (where & SSL_CB_HANDSHAKE_START) { if (where & SSL_CB_HANDSHAKE_START) {
auto handler = static_cast<ClientHandler *>(SSL_get_app_data(ssl)); auto conn = static_cast<Connection *>(SSL_get_app_data(ssl));
if (handler && handler->get_tls_handshake()) { if (conn && conn->tls.initial_handshake_done) {
handler->set_tls_renegotiation(true); // We only set SSL_get_app_data for ClientHandler for now.
auto handler = static_cast<ClientHandler *>(conn->data);
if (LOG_ENABLED(INFO)) { if (LOG_ENABLED(INFO)) {
CLOG(INFO, handler) << "TLS renegotiation started"; CLOG(INFO, handler) << "TLS renegotiation started";
} }
handler->start_immediate_shutdown();
} }
} }
} }