diff --git a/gennghttpxfun.py b/gennghttpxfun.py index 2c04e7d5..f322e5c1 100755 --- a/gennghttpxfun.py +++ b/gennghttpxfun.py @@ -93,6 +93,7 @@ OPTIONS = [ "include", "tls-ticket-cipher", "host-rewrite", + "tls-session-cache-memcached", "conf", ] diff --git a/src/Makefile.am b/src/Makefile.am index ff800463..304b6c09 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -120,6 +120,10 @@ NGHTTPX_SRCS = \ shrpx_downstream_connection_pool.cc shrpx_downstream_connection_pool.h \ shrpx_rate_limit.cc shrpx_rate_limit.h \ shrpx_connection.cc shrpx_connection.h \ + shrpx_memcached_dispatcher.cc shrpx_memcached_dispatcher.h \ + shrpx_memcached_connection.cc shrpx_memcached_connection.h \ + shrpx_memcached_request.h \ + shrpx_memcached_result.h \ buffer.h memchunk.h template.h if HAVE_SPDYLAY diff --git a/src/buffer.h b/src/buffer.h index e8c9a5e2..1921edf1 100644 --- a/src/buffer.h +++ b/src/buffer.h @@ -58,7 +58,17 @@ template struct Buffer { pos += count; return count; } + size_t drain_reset(size_t count) { + count = std::min(count, rleft()); + std::copy(pos + count, last, std::begin(buf)); + last = std::begin(buf) + (last - (pos + count)); + pos = std::begin(buf); + return count; + } void reset() { pos = last = std::begin(buf); } + uint8_t *begin() { return std::begin(buf); } + uint8_t &operator[](size_t n) { return buf[n]; } + const uint8_t &operator[](size_t n) const { return buf[n]; } std::array buf; uint8_t *pos, *last; }; diff --git a/src/shrpx-unittest.cc b/src/shrpx-unittest.cc index aea0eb33..a783336d 100644 --- a/src/shrpx-unittest.cc +++ b/src/shrpx-unittest.cc @@ -164,6 +164,7 @@ int main(int argc, char *argv[]) { shrpx::test_util_parse_http_date) || !CU_add_test(pSuite, "util_localtime_date", shrpx::test_util_localtime_date) || + !CU_add_test(pSuite, "util_get_uint64", shrpx::test_util_get_uint64) || !CU_add_test(pSuite, "gzip_inflate", test_nghttp2_gzip_inflate) || !CU_add_test(pSuite, "buffer_write", nghttp2::test_buffer_write) || !CU_add_test(pSuite, "pool_recycle", nghttp2::test_pool_recycle) || diff --git a/src/shrpx.cc b/src/shrpx.cc index 01d15205..0532d920 100644 --- a/src/shrpx.cc +++ b/src/shrpx.cc @@ -1365,6 +1365,10 @@ SSL/TLS: Default: )" << util::duration_str(get_config()->ocsp_update_interval) << R"( --no-ocsp Disable OCSP stapling. + --tls-session-cache-memcached=, + Specify address of memcached server to store session + cache. This enables shared session cache between + multiple nghttpx instances. HTTP/2 and SPDY: -c, --http2-max-concurrent-streams= @@ -1728,6 +1732,7 @@ int main(int argc, char **argv) { {SHRPX_OPT_INCLUDE, required_argument, &flag, 83}, {SHRPX_OPT_TLS_TICKET_CIPHER, required_argument, &flag, 84}, {SHRPX_OPT_HOST_REWRITE, no_argument, &flag, 85}, + {SHRPX_OPT_TLS_SESSION_CACHE_MEMCACHED, required_argument, &flag, 86}, {nullptr, 0, nullptr, 0}}; int option_index = 0; @@ -2102,6 +2107,10 @@ int main(int argc, char **argv) { // --host-rewrite cmdcfgs.emplace_back(SHRPX_OPT_HOST_REWRITE, "yes"); break; + case 86: + // --tls-session-cache-memcached + cmdcfgs.emplace_back(SHRPX_OPT_TLS_SESSION_CACHE_MEMCACHED, optarg); + break; default: break; } @@ -2380,6 +2389,16 @@ int main(int argc, char **argv) { } } + if (get_config()->session_cache_memcached_host) { + if (resolve_hostname(&mod_config()->session_cache_memcached_addr, + &mod_config()->session_cache_memcached_addrlen, + get_config()->session_cache_memcached_host.get(), + get_config()->session_cache_memcached_port, + AF_UNSPEC) == -1) { + exit(EXIT_FAILURE); + } + } + if (get_config()->rlimit_nofile) { struct rlimit lim = {static_cast(get_config()->rlimit_nofile), static_cast(get_config()->rlimit_nofile)}; diff --git a/src/shrpx_client_handler.cc b/src/shrpx_client_handler.cc index 0e2c8ff9..3c57a54e 100644 --- a/src/shrpx_client_handler.cc +++ b/src/shrpx_client_handler.cc @@ -380,7 +380,7 @@ ClientHandler::ClientHandler(Worker *worker, int fd, SSL *ssl, ev_timer_again(conn_.loop, &conn_.rt); if (conn_.tls.ssl) { - SSL_set_app_data(conn_.tls.ssl, &conn_); + conn_.prepare_server_handshake(); read_ = write_ = &ClientHandler::tls_handshake; on_read_ = &ClientHandler::upstream_noop; on_write_ = &ClientHandler::upstream_write; @@ -848,4 +848,6 @@ ev_io *ClientHandler::get_wev() { return &conn_.wev; } Worker *ClientHandler::get_worker() const { return worker_; } +Connection *ClientHandler::get_connection() { return &conn_; } + } // namespace shrpx diff --git a/src/shrpx_client_handler.h b/src/shrpx_client_handler.h index 4d4ccd9c..f125f82f 100644 --- a/src/shrpx_client_handler.h +++ b/src/shrpx_client_handler.h @@ -130,6 +130,8 @@ public: void signal_write(); ev_io *get_wev(); + Connection *get_connection(); + private: Connection conn_; ev_timer reneg_shutdown_timer_; diff --git a/src/shrpx_config.cc b/src/shrpx_config.cc index 8029d0bb..65fe1df9 100644 --- a/src/shrpx_config.cc +++ b/src/shrpx_config.cc @@ -704,6 +704,7 @@ enum { SHRPX_OPTID_SUBCERT, SHRPX_OPTID_SYSLOG_FACILITY, SHRPX_OPTID_TLS_PROTO_LIST, + SHRPX_OPTID_TLS_SESSION_CACHE_MEMCACHED, SHRPX_OPTID_TLS_TICKET_CIPHER, SHRPX_OPTID_TLS_TICKET_KEY_FILE, SHRPX_OPTID_USER, @@ -1180,6 +1181,11 @@ int option_lookup_token(const char *name, size_t namelen) { break; case 27: switch (name[26]) { + case 'd': + if (util::strieq_l("tls-session-cache-memcache", name, 26)) { + return SHRPX_OPTID_TLS_SESSION_CACHE_MEMCACHED; + } + break; case 's': if (util::strieq_l("worker-frontend-connection", name, 26)) { return SHRPX_OPTID_WORKER_FRONTEND_CONNECTIONS; @@ -1865,6 +1871,17 @@ int parse_config(const char *opt, const char *optarg, mod_config()->no_host_rewrite = !util::strieq(optarg, "yes"); return 0; + case SHRPX_OPTID_TLS_SESSION_CACHE_MEMCACHED: { + if (split_host_port(host, sizeof(host), &port, optarg, strlen(optarg)) == + -1) { + return -1; + } + + mod_config()->session_cache_memcached_host = strcopy(host); + mod_config()->session_cache_memcached_port = port; + + return 0; + } case SHRPX_OPTID_CONF: LOG(WARN) << "conf: ignored"; diff --git a/src/shrpx_config.h b/src/shrpx_config.h index 9f0e52f8..2793c987 100644 --- a/src/shrpx_config.h +++ b/src/shrpx_config.h @@ -173,6 +173,8 @@ constexpr char SHRPX_OPT_MAX_HEADER_FIELDS[] = "max-header-fields"; constexpr char SHRPX_OPT_INCLUDE[] = "include"; constexpr char SHRPX_OPT_TLS_TICKET_CIPHER[] = "tls-ticket-cipher"; constexpr char SHRPX_OPT_HOST_REWRITE[] = "host-rewrite"; +constexpr char SHRPX_OPT_TLS_SESSION_CACHE_MEMCACHED[] = + "tls-session-cache-memcached"; union sockaddr_union { sockaddr_storage storage; @@ -253,6 +255,7 @@ struct Config { std::vector tls_proto_list; // binary form of http proxy host and port sockaddr_union downstream_http_proxy_addr; + sockaddr_union session_cache_memcached_addr; std::chrono::seconds tls_session_timeout; ev_tstamp http2_upstream_read_timeout; ev_tstamp upstream_read_timeout; @@ -295,6 +298,7 @@ struct Config { std::unique_ptr errorlog_file; std::unique_ptr fetch_ocsp_response_file; std::unique_ptr user; + std::unique_ptr session_cache_memcached_host; FILE *http2_upstream_dump_request_header; FILE *http2_upstream_dump_response_header; nghttp2_session_callbacks *http2_upstream_callbacks; @@ -316,6 +320,7 @@ struct Config { size_t downstream_connections_per_frontend; // actual size of downstream_http_proxy_addr size_t downstream_http_proxy_addrlen; + size_t session_cache_memcached_addrlen; size_t read_rate; size_t read_burst; size_t write_rate; @@ -349,6 +354,7 @@ struct Config { uint16_t port; // port in http proxy URI uint16_t downstream_http_proxy_port; + uint16_t session_cache_memcached_port; bool verbose; bool daemon; bool verify_client; diff --git a/src/shrpx_connection.cc b/src/shrpx_connection.cc index 1ba8dfdb..8a34eac8 100644 --- a/src/shrpx_connection.cc +++ b/src/shrpx_connection.cc @@ -32,6 +32,8 @@ #include +#include "shrpx_ssl.h" +#include "shrpx_memcached_request.h" #include "memchunk.h" using namespace nghttp2; @@ -42,7 +44,7 @@ Connection::Connection(struct ev_loop *loop, int fd, SSL *ssl, size_t write_rate, size_t write_burst, size_t read_rate, size_t read_burst, IOCb writecb, IOCb readcb, TimerCb timeoutcb, void *data) - : tls{ssl}, wlimit(loop, &wev, write_rate, write_burst), + : tls{}, wlimit(loop, &wev, write_rate, write_burst), rlimit(loop, &rev, read_rate, read_burst, ssl), writecb(writecb), readcb(readcb), timeoutcb(timeoutcb), loop(loop), data(data), fd(fd) { @@ -60,6 +62,10 @@ Connection::Connection(struct ev_loop *loop, int fd, SSL *ssl, // set 0. to double field explicitly just in case tls.last_write_time = 0.; + + if (ssl) { + set_ssl(ssl); + } } Connection::~Connection() { @@ -78,15 +84,25 @@ void Connection::disconnect() { wlimit.stopw(); if (tls.ssl) { - SSL_set_app_data(tls.ssl, nullptr); SSL_set_shutdown(tls.ssl, SSL_RECEIVED_SHUTDOWN); ERR_clear_error(); + + if (tls.cached_session) { + SSL_SESSION_free(tls.cached_session); + } + + if (tls.cached_session_lookup_req) { + tls.cached_session_lookup_req->canceled = true; + } + // To reuse SSL/TLS session, we have to shutdown, and don't free // tls.ssl. if (SSL_shutdown(tls.ssl) != 1) { SSL_free(tls.ssl); tls.ssl = nullptr; } + + tls = {tls.ssl}; } if (fd != -1) { @@ -96,31 +112,274 @@ void Connection::disconnect() { } } -int Connection::tls_handshake() { - auto rv = SSL_do_handshake(tls.ssl); +namespace { +void allocate_buffer(Connection *conn) { + conn->tls.rb = make_unique>(); + conn->tls.wb = make_unique>(); +} +} // namespace - if (rv == 0) { - return SHRPX_ERR_NETWORK; +void Connection::prepare_client_handshake() { + SSL_set_connect_state(tls.ssl); + allocate_buffer(this); +} + +void Connection::prepare_server_handshake() { + SSL_set_accept_state(tls.ssl); + allocate_buffer(this); +} + +// BIO implementation is inspired by openldap implementation: +// http://www.openldap.org/devel/cvsweb.cgi/~checkout~/libraries/libldap/tls_o.c +namespace { +int shrpx_bio_write(BIO *b, const char *buf, int len) { + if (buf == nullptr || len <= 0) { + return 0; } - if (rv < 0) { + auto conn = static_cast(b->ptr); + auto &wb = conn->tls.wb; + + BIO_clear_retry_flags(b); + + if (conn->tls.initial_handshake_done) { + // After handshake finished, send |buf| of length |len| to the + // socket directly. + if (wb && wb->rleft()) { + auto nwrite = conn->write_clear(wb->pos, wb->rleft()); + if (nwrite < 0) { + return -1; + } + + wb->drain(nwrite); + if (wb->rleft()) { + BIO_set_retry_write(b); + return -1; + } + + // Here delete TLS write buffer + wb.reset(); + } + auto nwrite = conn->write_clear(buf, len); + if (nwrite < 0) { + return -1; + } + + if (nwrite == 0) { + BIO_set_retry_write(b); + return -1; + } + + return nwrite; + } + + auto nwrite = std::min(static_cast(len), wb->wleft()); + + if (nwrite == 0) { + BIO_set_retry_write(b); + return -1; + } + + wb->write(buf, nwrite); + + return nwrite; +} +} // namespace + +namespace { +int shrpx_bio_read(BIO *b, char *buf, int len) { + if (buf == nullptr || len <= 0) { + return 0; + } + + auto conn = static_cast(b->ptr); + auto &rb = conn->tls.rb; + + BIO_clear_retry_flags(b); + + if (conn->tls.initial_handshake_done && !rb) { + auto nread = conn->read_clear(buf, len); + if (nread < 0) { + return -1; + } + if (nread == 0) { + BIO_set_retry_read(b); + return -1; + } + return nread; + } + + auto nread = std::min(static_cast(len), rb->rleft()); + + if (nread == 0) { + if (conn->tls.initial_handshake_done) { + rb.reset(); + } + + BIO_set_retry_read(b); + return -1; + } + + std::copy_n(rb->pos, nread, buf); + + rb->drain(nread); + + return nread; +} +} // namespace + +namespace { +int shrpx_bio_puts(BIO *b, const char *str) { + return shrpx_bio_write(b, str, strlen(str)); +} +} // namespace + +namespace { +int shrpx_bio_gets(BIO *b, char *buf, int len) { return -1; } +} // namespace + +namespace { +long shrpx_bio_ctrl(BIO *b, int cmd, long num, void *ptr) { + switch (cmd) { + case BIO_CTRL_FLUSH: + return 1; + } + + return 0; +} +} // namespace + +namespace { +int shrpx_bio_create(BIO *b) { + b->init = 1; + b->num = 0; + b->ptr = nullptr; + b->flags = 0; + return 1; +} +} // namespace + +namespace { +int shrpx_bio_destroy(BIO *b) { + if (b == nullptr) { + return 0; + } + + b->ptr = nullptr; + b->init = 0; + b->flags = 0; + + return 1; +} +} // namespace + +namespace { +BIO_METHOD shrpx_bio_method = { + BIO_TYPE_FD, "nghttpx-bio", shrpx_bio_write, + shrpx_bio_read, shrpx_bio_puts, shrpx_bio_gets, + shrpx_bio_ctrl, shrpx_bio_create, shrpx_bio_destroy, +}; +} // namespace + +void Connection::set_ssl(SSL *ssl) { + tls.ssl = ssl; + auto bio = BIO_new(&shrpx_bio_method); + bio->ptr = this; + SSL_set_bio(tls.ssl, bio, bio); + SSL_set_app_data(tls.ssl, this); + rlimit.set_ssl(tls.ssl); +} + +int Connection::tls_handshake() { + wlimit.stopw(); + ev_timer_stop(loop, &wt); + + auto nread = read_clear(tls.rb->last, tls.rb->wleft()); + if (nread < 0) { + if (LOG_ENABLED(INFO)) { + LOG(INFO) << "tls: handshake read error"; + } + return -1; + } + tls.rb->write(nread); + + switch (tls.handshake_state) { + case TLS_CONN_WAIT_FOR_SESSION_CACHE: + if (tls.rb->wleft() == 0) { + // Input buffer is full. Disable read until cache is returned + rlimit.stopw(); + ev_timer_stop(loop, &rt); + } + return SHRPX_ERR_INPROGRESS; + case TLS_CONN_GOT_SESSION_CACHE: { + tls.wb->reset(); + tls.rb->pos = tls.rb->begin(); + + auto ssl_ctx = SSL_get_SSL_CTX(tls.ssl); + SSL_free(tls.ssl); + + auto ssl = ssl::create_ssl(ssl_ctx); + if (!ssl) { + return -1; + } + + set_ssl(ssl); + + SSL_set_accept_state(tls.ssl); + + tls.handshake_state = TLS_CONN_NORMAL; + break; + } + case TLS_CONN_CANCEL_SESSION_CACHE: + tls.handshake_state = TLS_CONN_NORMAL; + break; + } + + auto rv = SSL_do_handshake(tls.ssl); + + if (rv <= 0) { auto err = SSL_get_error(tls.ssl, rv); switch (err) { case SSL_ERROR_WANT_READ: - wlimit.stopw(); - ev_timer_stop(loop, &wt); - return SHRPX_ERR_INPROGRESS; case SSL_ERROR_WANT_WRITE: - wlimit.startw(); - ev_timer_again(loop, &wt); - return SHRPX_ERR_INPROGRESS; + break; default: + if (LOG_ENABLED(INFO)) { + LOG(INFO) << "tls: handshake libssl error " << err; + } return SHRPX_ERR_NETWORK; } } - wlimit.stopw(); - ev_timer_stop(loop, &wt); + if (tls.handshake_state == TLS_CONN_WAIT_FOR_SESSION_CACHE) { + if (LOG_ENABLED(INFO)) { + LOG(INFO) << "tls: handshake is still in progress"; + } + return SHRPX_ERR_INPROGRESS; + } + + if (tls.wb->rleft()) { + auto nwrite = write_clear(tls.wb->pos, tls.wb->rleft()); + if (nwrite < 0) { + if (LOG_ENABLED(INFO)) { + LOG(INFO) << "tls: handshake write error"; + } + return -1; + } + tls.wb->drain(nwrite); + } + + if (tls.wb->rleft()) { + wlimit.startw(); + ev_timer_again(loop, &wt); + } + + if (rv != 1) { + if (LOG_ENABLED(INFO)) { + LOG(INFO) << "tls: handshake is still in progress"; + } + return SHRPX_ERR_INPROGRESS; + } tls.initial_handshake_done = true; diff --git a/src/shrpx_connection.h b/src/shrpx_connection.h index 055d32c3..7b73bb85 100644 --- a/src/shrpx_connection.h +++ b/src/shrpx_connection.h @@ -35,19 +35,34 @@ #include "shrpx_rate_limit.h" #include "shrpx_error.h" +#include "buffer.h" namespace shrpx { +struct MemcachedRequest; + +enum { + TLS_CONN_NORMAL, + TLS_CONN_WAIT_FOR_SESSION_CACHE, + TLS_CONN_GOT_SESSION_CACHE, + TLS_CONN_CANCEL_SESSION_CACHE, +}; + struct TLSConnection { SSL *ssl; + SSL_SESSION *cached_session; + MemcachedRequest *cached_session_lookup_req; ev_tstamp last_write_time; size_t warmup_writelen; // length passed to SSL_write and SSL_read last time. This is // required since these functions require the exact same parameters // on non-blocking I/O. size_t last_writelen, last_readlen; + int handshake_state; bool initial_handshake_done; bool reneg_started; + std::unique_ptr> rb; + std::unique_ptr> wb; }; template using EVCb = void (*)(struct ev_loop *, T *, int); @@ -64,6 +79,9 @@ struct Connection { void disconnect(); + void prepare_client_handshake(); + void prepare_server_handshake(); + int tls_handshake(); // All write_* and writev_clear functions return number of bytes @@ -89,6 +107,8 @@ struct Connection { void handle_tls_pending_read(); + void set_ssl(SSL *ssl); + TLSConnection tls; ev_io wev; ev_io rev; diff --git a/src/shrpx_http2_session.cc b/src/shrpx_http2_session.cc index 164a57da..7a6765c2 100644 --- a/src/shrpx_http2_session.cc +++ b/src/shrpx_http2_session.cc @@ -323,12 +323,12 @@ int Http2Session::initiate_connection() { // We are establishing TLS connection. If conn_.tls.ssl, we may // reuse the previous session. if (!conn_.tls.ssl) { - conn_.tls.ssl = SSL_new(ssl_ctx_); - if (!conn_.tls.ssl) { - SSLOG(ERROR, this) << "SSL_new() failed: " - << ERR_error_string(ERR_get_error(), NULL); + auto ssl = ssl::create_ssl(ssl_ctx_); + if (!ssl) { return -1; } + + conn_.set_ssl(ssl); } const char *sni_name = nullptr; @@ -369,11 +369,7 @@ int Http2Session::initiate_connection() { ev_io_set(&conn_.wev, conn_.fd, EV_WRITE); } - if (SSL_set_fd(conn_.tls.ssl, conn_.fd) == 0) { - return -1; - } - - SSL_set_connect_state(conn_.tls.ssl); + conn_.prepare_client_handshake(); } else { if (state_ == DISCONNECTED) { // Without TLS and proxy. diff --git a/src/shrpx_log.h b/src/shrpx_log.h index e13be41e..bb86a92e 100644 --- a/src/shrpx_log.h +++ b/src/shrpx_log.h @@ -76,6 +76,10 @@ class Downstream; #define SSLOG(SEVERITY, HTTP2) \ (Log(SEVERITY, __FILE__, __LINE__) << "[DHTTP2:" << HTTP2 << "] ") +// Memcached connection log +#define MCLOG(SEVERITY, MCONN) \ + (Log(SEVERITY, __FILE__, __LINE__) << "[MCONN:" << MCONN << "] ") + enum SeverityLevel { INFO, NOTICE, WARN, ERROR, FATAL }; class Log { diff --git a/src/shrpx_memcached_connection.cc b/src/shrpx_memcached_connection.cc new file mode 100644 index 00000000..cf5f09fe --- /dev/null +++ b/src/shrpx_memcached_connection.cc @@ -0,0 +1,537 @@ +/* + * nghttp2 - HTTP/2 C Library + * + * Copyright (c) 2015 Tatsuhiro Tsujikawa + * + * Permission is hereby granted, free of charge, to any person obtaining + * a copy of this software and associated documentation files (the + * "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, + * distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to + * the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +#include "shrpx_memcached_connection.h" + +#include "shrpx_memcached_request.h" +#include "shrpx_memcached_result.h" +#include "shrpx_config.h" +#include "util.h" + +namespace shrpx { + +namespace { +void timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) { + auto conn = static_cast(w->data); + auto mconn = static_cast(conn->data); + + if (LOG_ENABLED(INFO)) { + MCLOG(INFO, mconn) << "Time out"; + } + + mconn->disconnect(); +} +} // namespace + +namespace { +void readcb(struct ev_loop *loop, ev_io *w, int revents) { + auto conn = static_cast(w->data); + auto mconn = static_cast(conn->data); + + if (mconn->on_read() != 0) { + mconn->disconnect(); + return; + } +} +} // namespace + +namespace { +void writecb(struct ev_loop *loop, ev_io *w, int revents) { + auto conn = static_cast(w->data); + auto mconn = static_cast(conn->data); + + if (mconn->on_write() != 0) { + mconn->disconnect(); + return; + } +} +} // namespace + +namespace { +void connectcb(struct ev_loop *loop, ev_io *w, int revents) { + auto conn = static_cast(w->data); + auto mconn = static_cast(conn->data); + + if (mconn->on_connect() != 0) { + mconn->disconnect(); + return; + } + + writecb(loop, w, revents); +} +} // namespace + +constexpr ev_tstamp write_timeout = 10.; +constexpr ev_tstamp read_timeout = 10.; + +MemcachedConnection::MemcachedConnection(const sockaddr_union *addr, + size_t addrlen, struct ev_loop *loop) + : conn_(loop, -1, nullptr, write_timeout, read_timeout, 0, 0, 0, 0, + connectcb, readcb, timeoutcb, this), + parse_state_{}, addr_(addr), addrlen_(addrlen), sendsum_(0), + connected_(false) {} + +MemcachedConnection::~MemcachedConnection() { disconnect(); } + +namespace { +void clear_request(std::deque> &q) { + for (auto &req : q) { + if (req->cb) { + req->cb(req.get(), MemcachedResult(MEMCACHED_ERR_ERROR)); + } + } + q.clear(); +} +} // namespace + +void MemcachedConnection::disconnect() { + clear_request(recvq_); + clear_request(sendq_); + + sendbufv_.clear(); + sendsum_ = 0; + + parse_state_ = {}; + + connected_ = false; + + conn_.disconnect(); + + assert(recvbuf_.rleft() == 0); + recvbuf_.reset(); +} + +int MemcachedConnection::initiate_connection() { + assert(conn_.fd == -1); + + conn_.fd = util::create_nonblock_socket(addr_->storage.ss_family); + + if (conn_.fd == -1) { + auto error = errno; + MCLOG(WARN, this) << "socket() failed; errno=" << error; + + return -1; + } + + int rv; + rv = connect(conn_.fd, &addr_->sa, addrlen_); + if (rv != 0 && errno != EINPROGRESS) { + auto error = errno; + MCLOG(WARN, this) << "connect() failed; errno=" << error; + + close(conn_.fd); + conn_.fd = -1; + + return -1; + } + + if (LOG_ENABLED(INFO)) { + MCLOG(INFO, this) << "Connecting to memcached server"; + } + + ev_io_set(&conn_.wev, conn_.fd, EV_WRITE); + ev_io_set(&conn_.rev, conn_.fd, EV_READ); + + ev_set_cb(&conn_.wev, connectcb); + + conn_.wlimit.startw(); + ev_timer_again(conn_.loop, &conn_.wt); + + return 0; +} + +int MemcachedConnection::on_connect() { + if (!util::check_socket_connected(conn_.fd)) { + conn_.wlimit.stopw(); + + if (LOG_ENABLED(INFO)) { + MCLOG(INFO, this) << "memcached connect failed"; + } + + return -1; + } + + if (LOG_ENABLED(INFO)) { + MCLOG(INFO, this) << "connected to memcached server"; + } + + connected_ = true; + + ev_set_cb(&conn_.wev, writecb); + + conn_.rlimit.startw(); + ev_timer_again(conn_.loop, &conn_.rt); + + return 0; +} + +int MemcachedConnection::on_write() { + if (!connected_) { + return 0; + } + + ev_timer_again(conn_.loop, &conn_.rt); + + if (sendq_.empty()) { + conn_.wlimit.stopw(); + ev_timer_stop(conn_.loop, &conn_.wt); + + return 0; + } + + int rv; + + for (; !sendq_.empty();) { + rv = send_request(); + + if (rv < 0) { + return -1; + } + + if (rv == 1) { + // blocked + return 0; + } + } + + conn_.wlimit.stopw(); + ev_timer_stop(conn_.loop, &conn_.wt); + + return 0; +} + +int MemcachedConnection::on_read() { + if (!connected_) { + return 0; + } + + ev_timer_again(conn_.loop, &conn_.rt); + + for (;;) { + auto nread = conn_.read_clear(recvbuf_.last, recvbuf_.wleft()); + + if (nread == 0) { + return 0; + } + + if (nread < 0) { + return -1; + } + + recvbuf_.write(nread); + + if (parse_packet() != 0) { + return -1; + } + } + + return 0; +} + +int MemcachedConnection::parse_packet() { + auto in = recvbuf_.pos; + + for (;;) { + auto busy = false; + + switch (parse_state_.state) { + case MEMCACHED_PARSE_HEADER24: { + if (recvbuf_.last - in < 24) { + recvbuf_.drain_reset(in - recvbuf_.pos); + return 0; + } + + if (recvq_.empty()) { + MCLOG(WARN, this) + << "Response received, but there is no in-flight request."; + return -1; + } + + auto &req = recvq_.front(); + + if (*in != MEMCACHED_RES_MAGIC) { + MCLOG(WARN, this) << "Response has bad magic: " + << static_cast(*in); + return -1; + } + ++in; + + parse_state_.op = *in++; + parse_state_.keylen = util::get_uint16(in); + in += 2; + parse_state_.extralen = *in++; + // skip 1 byte reserved data type + ++in; + parse_state_.status_code = util::get_uint16(in); + in += 2; + parse_state_.totalbody = util::get_uint32(in); + in += 4; + // skip 4 bytes opaque + in += 4; + parse_state_.cas = util::get_uint64(in); + in += 8; + + if (req->op != parse_state_.op) { + MCLOG(WARN, this) + << "opcode in response does not match to the request: want " + << static_cast(req->op) << ", got " << parse_state_.op; + return -1; + } + + if (parse_state_.keylen != 0) { + MCLOG(WARN, this) << "zero length keylen expected: got " + << parse_state_.keylen; + return -1; + } + + if (parse_state_.totalbody > 16_k) { + MCLOG(WARN, this) << "totalbody is too large: got " + << parse_state_.totalbody; + return -1; + } + + if (parse_state_.op == MEMCACHED_OP_GET && + parse_state_.status_code == 0 && parse_state_.extralen == 0) { + MCLOG(WARN, this) << "response for GET does not have extra"; + return -1; + } + + if (parse_state_.totalbody < + parse_state_.keylen + parse_state_.extralen) { + MCLOG(WARN, this) << "totalbody is too short: totalbody " + << parse_state_.totalbody << ", want min " + << parse_state_.keylen + parse_state_.extralen; + return -1; + } + + if (parse_state_.extralen) { + parse_state_.state = MEMCACHED_PARSE_EXTRA; + parse_state_.read_left = parse_state_.extralen; + } else { + parse_state_.state = MEMCACHED_PARSE_VALUE; + parse_state_.read_left = parse_state_.totalbody - parse_state_.keylen - + parse_state_.extralen; + } + busy = true; + break; + } + case MEMCACHED_PARSE_EXTRA: { + // We don't use extra for now. Just read and forget. + auto n = std::min(static_cast(recvbuf_.last - in), + parse_state_.read_left); + + parse_state_.read_left -= n; + in += n; + if (parse_state_.read_left) { + recvbuf_.reset(); + return 0; + } + parse_state_.state = MEMCACHED_PARSE_VALUE; + // since we require keylen == 0, totalbody - extralen == + // valuelen + parse_state_.read_left = + parse_state_.totalbody - parse_state_.keylen - parse_state_.extralen; + busy = true; + break; + } + case MEMCACHED_PARSE_VALUE: { + auto n = std::min(static_cast(recvbuf_.last - in), + parse_state_.read_left); + + parse_state_.value.insert(std::end(parse_state_.value), in, in + n); + + parse_state_.read_left -= n; + in += n; + if (parse_state_.read_left) { + recvbuf_.reset(); + return 0; + } + + if (LOG_ENABLED(INFO)) { + if (parse_state_.status_code) { + MCLOG(INFO, this) + << "response returned error status: " << parse_state_.status_code; + } + } + + auto req = std::move(recvq_.front()); + recvq_.pop_front(); + + if (!req->canceled && req->cb) { + req->cb(req.get(), MemcachedResult(parse_state_.status_code, + std::move(parse_state_.value))); + } + + parse_state_ = {}; + break; + } + } + + if (!busy && in == recvbuf_.last) { + break; + } + } + + assert(in == recvbuf_.last); + recvbuf_.reset(); + + return 0; +} + +int MemcachedConnection::send_request() { + ssize_t nwrite; + + if (sendsum_ == 0) { + for (auto &req : sendq_) { + if (req->canceled) { + continue; + } + if (serialized_size(req.get()) + sendsum_ > 1300) { + break; + } + sendbufv_.emplace_back(); + sendbufv_.back().req = req.get(); + make_request(&sendbufv_.back(), req.get()); + sendsum_ += sendbufv_.back().left(); + } + + if (sendsum_ == 0) { + sendq_.clear(); + return 0; + } + } + + std::array iov; + size_t iovlen = 0; + for (auto &buf : sendbufv_) { + if (iovlen + 2 > iov.size()) { + break; + } + + auto req = buf.req; + if (buf.headbuf.rleft()) { + iov[iovlen++] = {buf.headbuf.pos, buf.headbuf.rleft()}; + } + if (buf.send_value_left) { + iov[iovlen++] = {req->value.data() + req->value.size() - + buf.send_value_left, + buf.send_value_left}; + } + } + + nwrite = conn_.writev_clear(iov.data(), iovlen); + if (nwrite < 0) { + return -1; + } + if (nwrite == 0) { + return 1; + } + + sendsum_ -= nwrite; + + while (nwrite > 0) { + auto &buf = sendbufv_.front(); + auto &req = sendq_.front(); + if (req->canceled) { + sendq_.pop_front(); + continue; + } + assert(buf.req == req.get()); + auto n = std::min(static_cast(nwrite), buf.headbuf.rleft()); + buf.headbuf.drain(n); + nwrite -= n; + n = std::min(static_cast(nwrite), buf.send_value_left); + buf.send_value_left -= n; + nwrite -= n; + + if (buf.headbuf.rleft() || buf.send_value_left) { + break; + } + sendbufv_.pop_front(); + recvq_.push_back(std::move(sendq_.front())); + sendq_.pop_front(); + } + + return 0; +} + +size_t MemcachedConnection::serialized_size(MemcachedRequest *req) { + switch (req->op) { + case MEMCACHED_OP_GET: + return 24 + req->key.size(); + case MEMCACHED_OP_ADD: + default: + return 24 + 8 + req->key.size() + req->value.size(); + } +} + +void MemcachedConnection::make_request(MemcachedSendbuf *sendbuf, + MemcachedRequest *req) { + auto &headbuf = sendbuf->headbuf; + + std::fill(std::begin(headbuf.buf), std::end(headbuf.buf), 0); + + headbuf[0] = MEMCACHED_REQ_MAGIC; + headbuf[1] = req->op; + switch (req->op) { + case MEMCACHED_OP_GET: + util::put_uint16be(&headbuf[2], req->key.size()); + util::put_uint32be(&headbuf[8], req->key.size()); + headbuf.write(24); + break; + case MEMCACHED_OP_ADD: + util::put_uint16be(&headbuf[2], req->key.size()); + headbuf[4] = 8; + util::put_uint32be(&headbuf[8], 8 + req->key.size() + req->value.size()); + util::put_uint32be(&headbuf[28], req->expiry); + headbuf.write(32); + break; + } + + headbuf.write(req->key.c_str(), req->key.size()); + + sendbuf->send_value_left = req->value.size(); +} + +int MemcachedConnection::add_request(std::unique_ptr req) { + sendq_.push_back(std::move(req)); + + if (connected_) { + signal_write(); + return 0; + } + + if (conn_.fd == -1) { + if (initiate_connection() != 0) { + return -1; + } + } + + return 0; +} + +// TODO should we start write timer too? +void MemcachedConnection::signal_write() { conn_.wlimit.startw(); } + +} // namespace shrpx diff --git a/src/shrpx_memcached_connection.h b/src/shrpx_memcached_connection.h new file mode 100644 index 00000000..31b090c5 --- /dev/null +++ b/src/shrpx_memcached_connection.h @@ -0,0 +1,131 @@ +/* + * nghttp2 - HTTP/2 C Library + * + * Copyright (c) 2015 Tatsuhiro Tsujikawa + * + * Permission is hereby granted, free of charge, to any person obtaining + * a copy of this software and associated documentation files (the + * "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, + * distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to + * the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +#ifndef SHRPX_MEMCACHED_CONNECTION_H +#define SHRPX_MEMCACHED_CONNECTION_H + +#include "shrpx.h" + +#include +#include + +#include + +#include "shrpx_connection.h" +#include "buffer.h" + +using namespace nghttp2; + +namespace shrpx { + +struct MemcachedRequest; +union sockaddr_union; + +enum { + MEMCACHED_PARSE_HEADER24, + MEMCACHED_PARSE_EXTRA, + MEMCACHED_PARSE_VALUE, +}; + +// Stores state when parsing response from memcached server +struct MemcachedParseState { + // Buffer for value, dynamically allocated. + std::vector value; + // cas in response + uint64_t cas; + // keylen in response + size_t keylen; + // extralen in response + size_t extralen; + // totalbody in response. The length of value is totalbody - + // extralen - keylen. + size_t totalbody; + // Number of bytes left to read variable length field. + size_t read_left; + // Parser state; see enum above + int state; + // status_code in response + int status_code; + // op in response + int op; +}; + +struct MemcachedSendbuf { + // Buffer for header + extra + key + Buffer<512> headbuf; + // MemcachedRequest associated to this object + MemcachedRequest *req; + // Number of bytes left when sending value + size_t send_value_left; + // Returns the number of bytes this object transmits. + size_t left() const { return headbuf.rleft() + send_value_left; } +}; + +constexpr uint8_t MEMCACHED_REQ_MAGIC = 0x80; +constexpr uint8_t MEMCACHED_RES_MAGIC = 0x81; + +// MemcachedConnection implements part of memcached binary protocol. +// This is not full brown implementation. Just the part we need is +// implemented. We only use GET and ADD. +// +// https://github.com/memcached/memcached/blob/master/doc/protocol-binary.xml +// https://code.google.com/p/memcached/wiki/MemcacheBinaryProtocol +class MemcachedConnection { +public: + MemcachedConnection(const sockaddr_union *addr, size_t addrlen, + struct ev_loop *loop); + ~MemcachedConnection(); + + void disconnect(); + + int add_request(std::unique_ptr req); + int initiate_connection(); + + int on_connect(); + int on_write(); + int on_read(); + int send_request(); + void make_request(MemcachedSendbuf *sendbuf, MemcachedRequest *req); + int parse_packet(); + size_t serialized_size(MemcachedRequest *req); + + void signal_write(); + +private: + Connection conn_; + std::deque> recvq_; + std::deque> sendq_; + std::deque sendbufv_; + MemcachedParseState parse_state_; + const sockaddr_union *addr_; + size_t addrlen_; + // Sum of the bytes to be transmitted in sendbufv_. + size_t sendsum_; + bool connected_; + Buffer<8_k> recvbuf_; +}; + +} // namespace shrpx + +#endif // SHRPX_MEMCACHED_CONNECTION_H diff --git a/src/shrpx_memcached_dispatcher.cc b/src/shrpx_memcached_dispatcher.cc new file mode 100644 index 00000000..66bebceb --- /dev/null +++ b/src/shrpx_memcached_dispatcher.cc @@ -0,0 +1,48 @@ +/* + * nghttp2 - HTTP/2 C Library + * + * Copyright (c) 2015 Tatsuhiro Tsujikawa + * + * Permission is hereby granted, free of charge, to any person obtaining + * a copy of this software and associated documentation files (the + * "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, + * distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to + * the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +#include "shrpx_memcached_dispatcher.h" + +#include "shrpx_memcached_request.h" +#include "shrpx_memcached_connection.h" +#include "shrpx_config.h" + +namespace shrpx { + +MemcachedDispatcher::MemcachedDispatcher(const sockaddr_union *addr, + size_t addrlen, struct ev_loop *loop) + : loop_(loop), + mconn_(make_unique(addr, addrlen, loop_)) {} + +MemcachedDispatcher::~MemcachedDispatcher() {} + +int MemcachedDispatcher::add_request(std::unique_ptr req) { + if (mconn_->add_request(std::move(req)) != 0) { + return -1; + } + + return 0; +} + +} // namespace shrpx diff --git a/src/shrpx_memcached_dispatcher.h b/src/shrpx_memcached_dispatcher.h new file mode 100644 index 00000000..95f85a70 --- /dev/null +++ b/src/shrpx_memcached_dispatcher.h @@ -0,0 +1,55 @@ +/* + * nghttp2 - HTTP/2 C Library + * + * Copyright (c) 2015 Tatsuhiro Tsujikawa + * + * Permission is hereby granted, free of charge, to any person obtaining + * a copy of this software and associated documentation files (the + * "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, + * distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to + * the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +#ifndef SHRPX_MEMCACHED_DISPATCHER_H +#define SHRPX_MEMCACHED_DISPATCHER_H + +#include "shrpx.h" + +#include + +#include + +namespace shrpx { + +struct MemcachedRequest; +class MemcachedConnection; +union sockaddr_union; + +class MemcachedDispatcher { +public: + MemcachedDispatcher(const sockaddr_union *addr, size_t addrlen, + struct ev_loop *loop); + ~MemcachedDispatcher(); + + int add_request(std::unique_ptr req); + +private: + struct ev_loop *loop_; + std::unique_ptr mconn_; +}; + +} // namespace shrpx + +#endif // SHRPX_MEMCACHED_DISPATCHER_H diff --git a/src/shrpx_memcached_request.h b/src/shrpx_memcached_request.h new file mode 100644 index 00000000..3bd79fd7 --- /dev/null +++ b/src/shrpx_memcached_request.h @@ -0,0 +1,59 @@ +/* + * nghttp2 - HTTP/2 C Library + * + * Copyright (c) 2015 Tatsuhiro Tsujikawa + * + * Permission is hereby granted, free of charge, to any person obtaining + * a copy of this software and associated documentation files (the + * "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, + * distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to + * the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +#ifndef SHRPX_MEMCACHED_REQUEST_H +#define SHRPX_MEMCACHED_REQUEST_H + +#include "shrpx.h" + +#include +#include +#include + +#include "shrpx_memcached_result.h" + +namespace shrpx { + +enum { + MEMCACHED_OP_GET = 0x00, + MEMCACHED_OP_ADD = 0x02, +}; + +struct MemcachedRequest; + +using MemcachedResultCallback = + std::function; + +struct MemcachedRequest { + std::string key; + std::vector value; + MemcachedResultCallback cb; + uint32_t expiry; + int op; + bool canceled; +}; + +} // namespace shrpx + +#endif // SHRPX_MEMCACHED_REQUEST_H diff --git a/src/shrpx_memcached_result.h b/src/shrpx_memcached_result.h new file mode 100644 index 00000000..c2cd70d0 --- /dev/null +++ b/src/shrpx_memcached_result.h @@ -0,0 +1,50 @@ +/* + * nghttp2 - HTTP/2 C Library + * + * Copyright (c) 2015 Tatsuhiro Tsujikawa + * + * Permission is hereby granted, free of charge, to any person obtaining + * a copy of this software and associated documentation files (the + * "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, + * distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to + * the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +#ifndef SHRPX_MEMCACHED_RESULT_H +#define SHRPX_MEMCACHED_RESULT_H + +#include "shrpx.h" + +#include + +namespace shrpx { + +enum MemcachedStatusCode { + MEMCACHED_ERR_OK, + MEMCACHED_ERR_ERROR = 0x1001, +}; + +struct MemcachedResult { + MemcachedResult(int status_code) : status_code(status_code) {} + MemcachedResult(int status_code, std::vector value) + : value(std::move(value)), status_code(status_code) {} + + std::vector value; + int status_code; +}; + +} // namespace shrpx + +#endif // SHRPX_MEMCACHED_RESULT_H diff --git a/src/shrpx_rate_limit.cc b/src/shrpx_rate_limit.cc index 85836c14..53451f4c 100644 --- a/src/shrpx_rate_limit.cc +++ b/src/shrpx_rate_limit.cc @@ -106,4 +106,6 @@ void RateLimit::handle_tls_pending_read() { ev_feed_event(loop_, w_, EV_READ); } +void RateLimit::set_ssl(SSL *ssl) { ssl_ = ssl; } + } // namespace shrpx diff --git a/src/shrpx_rate_limit.h b/src/shrpx_rate_limit.h index 4550d012..24c96275 100644 --- a/src/shrpx_rate_limit.h +++ b/src/shrpx_rate_limit.h @@ -48,6 +48,7 @@ public: // required since it is buffered in ssl_ object, io event is not // generated unless new incoming data is received. void handle_tls_pending_read(); + void set_ssl(SSL *ssl); private: ev_timer t_; diff --git a/src/shrpx_ssl.cc b/src/shrpx_ssl.cc index 0681888e..19584ebf 100644 --- a/src/shrpx_ssl.cc +++ b/src/shrpx_ssl.cc @@ -54,6 +54,8 @@ #include "shrpx_worker.h" #include "shrpx_downstream_connection_pool.h" #include "shrpx_http2_session.h" +#include "shrpx_memcached_request.h" +#include "shrpx_memcached_dispatcher.h" #include "util.h" #include "ssl.h" #include "template.h" @@ -183,6 +185,126 @@ int ocsp_resp_cb(SSL *ssl, void *arg) { } } // namespace +constexpr char MEMCACHED_SESSION_ID_PREFIX[] = "nghttpx:session-id:"; + +namespace { +int tls_session_new_cb(SSL *ssl, SSL_SESSION *session) { + auto handler = static_cast(SSL_get_app_data(ssl)); + auto worker = handler->get_worker(); + auto dispatcher = worker->get_session_cache_memcached_dispatcher(); + + const unsigned char *id; + unsigned int idlen; + + id = SSL_SESSION_get_id(session, &idlen); + + if (LOG_ENABLED(INFO)) { + LOG(INFO) << "Memached: cache session, id=" << util::format_hex(id, idlen); + } + + auto req = make_unique(); + req->op = MEMCACHED_OP_ADD; + req->key = MEMCACHED_SESSION_ID_PREFIX; + req->key += util::format_hex(id, idlen); + + auto sessionlen = i2d_SSL_SESSION(session, nullptr); + req->value.resize(sessionlen); + auto buf = &req->value[0]; + i2d_SSL_SESSION(session, &buf); + req->expiry = 12_h; + req->cb = [](MemcachedRequest *req, MemcachedResult res) { + if (LOG_ENABLED(INFO)) { + LOG(INFO) << "Memcached: session cache done. key=" << req->key + << ", status_code=" << res.status_code << ", value=" + << std::string(std::begin(res.value), std::end(res.value)); + } + if (res.status_code != 0) { + LOG(WARN) << "Memcached: failed to cache session key=" << req->key + << ", status_code=" << res.status_code << ", value=" + << std::string(std::begin(res.value), std::end(res.value)); + } + }; + assert(!req->canceled); + + dispatcher->add_request(std::move(req)); + + return 0; +} +} // namespace + +namespace { +SSL_SESSION *tls_session_get_cb(SSL *ssl, unsigned char *id, int idlen, + int *copy) { + auto handler = static_cast(SSL_get_app_data(ssl)); + auto worker = handler->get_worker(); + auto dispatcher = worker->get_session_cache_memcached_dispatcher(); + auto conn = handler->get_connection(); + + if (conn->tls.cached_session) { + if (LOG_ENABLED(INFO)) { + LOG(INFO) << "Memcached: found cached session, id=" + << util::format_hex(id, idlen); + } + + // This is required, without this, memory leak occurs. + *copy = 0; + + auto session = conn->tls.cached_session; + conn->tls.cached_session = nullptr; + return session; + } + + if (LOG_ENABLED(INFO)) { + LOG(INFO) << "Memcached: get cached session, id=" + << util::format_hex(id, idlen); + } + + auto req = make_unique(); + req->op = MEMCACHED_OP_GET; + req->key = MEMCACHED_SESSION_ID_PREFIX; + req->key += util::format_hex(id, idlen); + req->cb = [conn](MemcachedRequest *, MemcachedResult res) { + if (LOG_ENABLED(INFO)) { + LOG(INFO) << "Memcached: returned status code " << res.status_code; + } + + // We might stop reading, so start it again + conn->rlimit.startw(); + ev_timer_again(conn->loop, &conn->rt); + + conn->wlimit.startw(); + ev_timer_again(conn->loop, &conn->wt); + + conn->tls.cached_session_lookup_req = nullptr; + if (res.status_code != 0) { + conn->tls.handshake_state = TLS_CONN_CANCEL_SESSION_CACHE; + return; + } + + const uint8_t *p = res.value.data(); + + auto session = d2i_SSL_SESSION(nullptr, &p, res.value.size()); + if (!session) { + if (LOG_ENABLED(INFO)) { + LOG(INFO) << "cannot materialize session"; + } + conn->tls.handshake_state = TLS_CONN_CANCEL_SESSION_CACHE; + return; + } + + conn->tls.cached_session = session; + conn->tls.handshake_state = TLS_CONN_GOT_SESSION_CACHE; + }; + + conn->tls.handshake_state = TLS_CONN_WAIT_FOR_SESSION_CACHE; + conn->tls.cached_session_lookup_req = req.get(); + + dispatcher->add_request(std::move(req)); + + return nullptr; +} +} // namespace + namespace { int ticket_key_cb(SSL *ssl, unsigned char *key_name, unsigned char *iv, EVP_CIPHER_CTX *ctx, HMAC_CTX *hctx, int enc) { @@ -334,18 +456,23 @@ SSL_CTX *create_ssl_context(const char *private_key_file, DIE(); } - auto ssl_opts = (SSL_OP_ALL & ~SSL_OP_DONT_INSERT_EMPTY_FRAGMENTS) | - SSL_OP_NO_SSLv2 | SSL_OP_NO_SSLv3 | SSL_OP_NO_COMPRESSION | - SSL_OP_NO_SESSION_RESUMPTION_ON_RENEGOTIATION | - SSL_OP_SINGLE_ECDH_USE | SSL_OP_SINGLE_DH_USE | - SSL_OP_CIPHER_SERVER_PREFERENCE | - get_config()->tls_proto_mask; + constexpr auto ssl_opts = + (SSL_OP_ALL & ~SSL_OP_DONT_INSERT_EMPTY_FRAGMENTS) | SSL_OP_NO_SSLv2 | + SSL_OP_NO_SSLv3 | SSL_OP_NO_COMPRESSION | + SSL_OP_NO_SESSION_RESUMPTION_ON_RENEGOTIATION | SSL_OP_SINGLE_ECDH_USE | + SSL_OP_SINGLE_DH_USE | SSL_OP_CIPHER_SERVER_PREFERENCE; - SSL_CTX_set_options(ssl_ctx, ssl_opts); + SSL_CTX_set_options(ssl_ctx, ssl_opts | get_config()->tls_proto_mask); const unsigned char sid_ctx[] = "shrpx"; SSL_CTX_set_session_id_context(ssl_ctx, sid_ctx, sizeof(sid_ctx) - 1); SSL_CTX_set_session_cache_mode(ssl_ctx, SSL_SESS_CACHE_SERVER); + + if (get_config()->session_cache_memcached_host) { + SSL_CTX_sess_set_new_cb(ssl_ctx, tls_session_new_cb); + SSL_CTX_sess_set_get_cb(ssl_ctx, tls_session_get_cb); + } + SSL_CTX_set_timeout(ssl_ctx, get_config()->tls_session_timeout.count()); const char *ciphers; @@ -493,12 +620,12 @@ SSL_CTX *create_ssl_client_context() { DIE(); } - auto ssl_opts = (SSL_OP_ALL & ~SSL_OP_DONT_INSERT_EMPTY_FRAGMENTS) | - SSL_OP_NO_SSLv2 | SSL_OP_NO_SSLv3 | SSL_OP_NO_COMPRESSION | - SSL_OP_NO_SESSION_RESUMPTION_ON_RENEGOTIATION | - get_config()->tls_proto_mask; + constexpr auto ssl_opts = (SSL_OP_ALL & ~SSL_OP_DONT_INSERT_EMPTY_FRAGMENTS) | + SSL_OP_NO_SSLv2 | SSL_OP_NO_SSLv3 | + SSL_OP_NO_COMPRESSION | + SSL_OP_NO_SESSION_RESUMPTION_ON_RENEGOTIATION; - SSL_CTX_set_options(ssl_ctx, ssl_opts); + SSL_CTX_set_options(ssl_ctx, ssl_opts | get_config()->tls_proto_mask); const char *ciphers; if (get_config()->ciphers) { @@ -564,6 +691,17 @@ SSL_CTX *create_ssl_client_context() { return ssl_ctx; } +SSL *create_ssl(SSL_CTX *ssl_ctx) { + auto ssl = SSL_new(ssl_ctx); + if (!ssl) { + LOG(ERROR) << "SSL_new() failed: " << ERR_error_string(ERR_get_error(), + nullptr); + return nullptr; + } + + return ssl; +} + ClientHandler *accept_connection(Worker *worker, int fd, sockaddr *addr, int addrlen) { char host[NI_MAXHOST]; @@ -586,21 +724,10 @@ ClientHandler *accept_connection(Worker *worker, int fd, sockaddr *addr, SSL *ssl = nullptr; auto ssl_ctx = worker->get_sv_ssl_ctx(); if (ssl_ctx) { - ssl = SSL_new(ssl_ctx); + ssl = create_ssl(ssl_ctx); if (!ssl) { - LOG(ERROR) << "SSL_new() failed: " << ERR_error_string(ERR_get_error(), - nullptr); return nullptr; } - - if (SSL_set_fd(ssl, fd) == 0) { - LOG(ERROR) << "SSL_set_fd() failed: " << ERR_error_string(ERR_get_error(), - nullptr); - SSL_free(ssl); - return nullptr; - } - - SSL_set_accept_state(ssl); } return new ClientHandler(worker, fd, ssl, host, service); diff --git a/src/shrpx_ssl.h b/src/shrpx_ssl.h index de2509a8..7fdbbd67 100644 --- a/src/shrpx_ssl.h +++ b/src/shrpx_ssl.h @@ -172,6 +172,8 @@ SSL_CTX *setup_client_ssl_context(); // this function returns nullptr. CertLookupTree *create_cert_lookup_tree(); +SSL *create_ssl(SSL_CTX *ssl_ctx); + } // namespace ssl } // namespace shrpx diff --git a/src/shrpx_worker.cc b/src/shrpx_worker.cc index 449d4c48..16e3420a 100644 --- a/src/shrpx_worker.cc +++ b/src/shrpx_worker.cc @@ -36,6 +36,7 @@ #include "shrpx_http2_session.h" #include "shrpx_log_config.h" #include "shrpx_connect_blocker.h" +#include "shrpx_memcached_dispatcher.h" #include "util.h" #include "template.h" @@ -75,6 +76,12 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx, ev_timer_init(&mcpool_clear_timer_, mcpool_clear_cb, 0., 0.); mcpool_clear_timer_.data = this; + if (get_config()->session_cache_memcached_host) { + session_cache_memcached_dispatcher_ = make_unique( + &get_config()->session_cache_memcached_addr, + get_config()->session_cache_memcached_addrlen, loop); + } + if (get_config()->downstream_proto == PROTO_HTTP2) { auto n = get_config()->http2_downstream_connections_per_worker; size_t group = 0; @@ -253,4 +260,8 @@ DownstreamGroup *Worker::get_dgrp(size_t group) { return &dgrps_[group]; } +MemcachedDispatcher *Worker::get_session_cache_memcached_dispatcher() { + return session_cache_memcached_dispatcher_.get(); +} + } // namespace shrpx diff --git a/src/shrpx_worker.h b/src/shrpx_worker.h index 1ad7e3b1..3613b0d5 100644 --- a/src/shrpx_worker.h +++ b/src/shrpx_worker.h @@ -49,6 +49,7 @@ namespace shrpx { class Http2Session; class ConnectBlocker; +class MemcachedDispatcher; namespace ssl { class CertLookupTree; @@ -121,6 +122,8 @@ public: DownstreamGroup *get_dgrp(size_t group); + MemcachedDispatcher *get_session_cache_memcached_dispatcher(); + private: #ifndef NOTHREADS std::future fut_; @@ -133,6 +136,7 @@ private: DownstreamConnectionPool dconn_pool_; WorkerStat worker_stat_; std::vector dgrps_; + std::unique_ptr session_cache_memcached_dispatcher_; struct ev_loop *loop_; // Following fields are shared across threads if diff --git a/src/util.cc b/src/util.cc index 0f70f0da..1ef77bf0 100644 --- a/src/util.cc +++ b/src/util.cc @@ -1130,6 +1130,41 @@ void hexdump(FILE *out, const uint8_t *src, size_t len) { } } +void put_uint16be(uint8_t *buf, uint16_t n) { + uint16_t x = htons(n); + memcpy(buf, &x, sizeof(uint16_t)); +} + +void put_uint32be(uint8_t *buf, uint32_t n) { + uint32_t x = htonl(n); + memcpy(buf, &x, sizeof(uint32_t)); +} + +uint16_t get_uint16(const uint8_t *data) { + uint16_t n; + memcpy(&n, data, sizeof(uint16_t)); + return ntohs(n); +} + +uint32_t get_uint32(const uint8_t *data) { + uint32_t n; + memcpy(&n, data, sizeof(uint32_t)); + return ntohl(n); +} + +uint64_t get_uint64(const uint8_t *data) { + uint64_t n = 0; + n += static_cast(data[0]) << 56; + n += static_cast(data[1]) << 48; + n += static_cast(data[2]) << 40; + n += static_cast(data[3]) << 32; + n += data[4] << 24; + n += data[5] << 16; + n += data[6] << 8; + n += data[7]; + return n; +} + } // namespace util } // namespace nghttp2 diff --git a/src/util.h b/src/util.h index 1f84c37a..3f60e810 100644 --- a/src/util.h +++ b/src/util.h @@ -631,6 +631,26 @@ std::string make_hostport(const char *host, uint16_t port); // Dumps |src| of length |len| in the format similar to `hexdump -C`. void hexdump(FILE *out, const uint8_t *src, size_t len); +// Copies 2 byte unsigned integer |n| in host byte order to |buf| in +// network byte order. +void put_uint16be(uint8_t *buf, uint16_t n); + +// Copies 4 byte unsigned integer |n| in host byte order to |buf| in +// network byte order. +void put_uint32be(uint8_t *buf, uint32_t n); + +// Retrieves 2 byte unsigned integer stored in |data| in network byte +// order and returns it in host byte order. +uint16_t get_uint16(const uint8_t *data); + +// Retrieves 4 byte unsigned integer stored in |data| in network byte +// order and returns it in host byte order. +uint32_t get_uint32(const uint8_t *data); + +// Retrieves 8 byte unsigned integer stored in |data| in network byte +// order and returns it in host byte order. +uint64_t get_uint64(const uint8_t *data); + } // namespace util } // namespace nghttp2 diff --git a/src/util_test.cc b/src/util_test.cc index f55f6520..85c791f6 100644 --- a/src/util_test.cc +++ b/src/util_test.cc @@ -393,4 +393,13 @@ void test_util_localtime_date(void) { tzset(); } +void test_util_get_uint64(void) { + auto v = std::array{ + {0x01, 0x12, 0x34, 0x56, 0x78, 0x9a, 0xab, 0xbc}}; + + auto n = util::get_uint64(v.data()); + + CU_ASSERT(0x01123456789aabbcULL == n); +} + } // namespace shrpx diff --git a/src/util_test.h b/src/util_test.h index 3c294b34..80933bff 100644 --- a/src/util_test.h +++ b/src/util_test.h @@ -55,6 +55,7 @@ void test_util_starts_with(void); void test_util_ends_with(void); void test_util_parse_http_date(void); void test_util_localtime_date(void); +void test_util_get_uint64(void); } // namespace shrpx