diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 78c775d8..e3822289 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -109,6 +109,7 @@ if(ENABLE_APP) shrpx_worker_process.cc shrpx_signal.cc shrpx_router.cc + shrpx_api_downstream_connection.cc ) if(HAVE_SPDYLAY) list(APPEND NGHTTPX_SRCS diff --git a/src/Makefile.am b/src/Makefile.am index 01701838..1d671c48 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -132,6 +132,7 @@ NGHTTPX_SRCS = \ shrpx_process.h \ shrpx_signal.cc shrpx_signal.h \ shrpx_router.cc shrpx_router.h \ + shrpx_api_downstream_connection.cc shrpx_api_downstream_connection.h \ buffer.h memchunk.h template.h allocator.h if HAVE_SPDYLAY diff --git a/src/shrpx.cc b/src/shrpx.cc index aa0d34a8..26fd846c 100644 --- a/src/shrpx.cc +++ b/src/shrpx.cc @@ -1157,6 +1157,11 @@ void fill_default_config() { nghttp2_option_new(&upstreamconf.option); nghttp2_option_set_no_auto_window_update(upstreamconf.option, 1); nghttp2_option_set_no_recv_client_magic(upstreamconf.option, 1); + + // For API endpoint, we enable automatic window update. This is + // because we are a sink. + nghttp2_option_new(&upstreamconf.api_option); + nghttp2_option_set_no_recv_client_magic(upstreamconf.api_option, 1); } { diff --git a/src/shrpx_api_downstream_connection.cc b/src/shrpx_api_downstream_connection.cc new file mode 100644 index 00000000..eb10ef8f --- /dev/null +++ b/src/shrpx_api_downstream_connection.cc @@ -0,0 +1,107 @@ +/* + * nghttp2 - HTTP/2 C Library + * + * Copyright (c) 2016 Tatsuhiro Tsujikawa + * + * Permission is hereby granted, free of charge, to any person obtaining + * a copy of this software and associated documentation files (the + * "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, + * distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to + * the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +#include "shrpx_api_downstream_connection.h" + +#include "shrpx_client_handler.h" +#include "shrpx_upstream.h" +#include "shrpx_downstream.h" +#include "shrpx_worker.h" + +namespace shrpx { + +APIDownstreamConnection::APIDownstreamConnection(Worker *worker) + : worker_(worker) {} + +APIDownstreamConnection::~APIDownstreamConnection() {} + +int APIDownstreamConnection::attach_downstream(Downstream *downstream) { + if (LOG_ENABLED(INFO)) { + DCLOG(INFO, this) << "Attaching to DOWNSTREAM:" << downstream; + } + + auto &req = downstream->request(); + + if (req.path != StringRef::from_lit("/api/v1/dynamicconfig")) { + // TODO this will return 503 error, which is not nice. We'd like + // to use 404 in this case. + return -1; + } + + downstream_ = downstream; + + return 0; +} + +void APIDownstreamConnection::detach_downstream(Downstream *downstream) { + if (LOG_ENABLED(INFO)) { + DCLOG(INFO, this) << "Detaching from DOWNSTREAM:" << downstream; + } + downstream_ = nullptr; +} + +int APIDownstreamConnection::push_request_headers() { return 0; } + +int APIDownstreamConnection::push_upload_data_chunk(const uint8_t *data, + size_t datalen) { + auto output = downstream_->get_request_buf(); + + // TODO limit the maximum payload size + output->append(data, datalen); + + // We don't have to call Upstream::resume_read() here, because + // request buffer is effectively unlimited. Actually, we cannot + // call it here since it could recursively call this function again. + + return 0; +} + +int APIDownstreamConnection::end_upload_data() { + // TODO process request payload here + (void)worker_; + return 0; +} + +void APIDownstreamConnection::pause_read(IOCtrlReason reason) {} + +int APIDownstreamConnection::resume_read(IOCtrlReason reason, size_t consumed) { + return 0; +} + +void APIDownstreamConnection::force_resume_read() {} + +int APIDownstreamConnection::on_read() { return 0; } + +int APIDownstreamConnection::on_write() { return 0; } + +void APIDownstreamConnection::on_upstream_change(Upstream *uptream) {} + +bool APIDownstreamConnection::poolable() const { return false; } + +DownstreamAddrGroup * +APIDownstreamConnection::get_downstream_addr_group() const { + return nullptr; +} + +} // namespace shrpx diff --git a/src/shrpx_api_downstream_connection.h b/src/shrpx_api_downstream_connection.h new file mode 100644 index 00000000..1bd6c1b6 --- /dev/null +++ b/src/shrpx_api_downstream_connection.h @@ -0,0 +1,65 @@ +/* + * nghttp2 - HTTP/2 C Library + * + * Copyright (c) 2016 Tatsuhiro Tsujikawa + * + * Permission is hereby granted, free of charge, to any person obtaining + * a copy of this software and associated documentation files (the + * "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, + * distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to + * the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +#ifndef SHRPX_API_DOWNSTREAM_CONNECTION_H +#define SHRPX_API_DOWNSTREAM_CONNECTION_H + +#include "shrpx_downstream_connection.h" + +namespace shrpx { + +class Worker; + +class APIDownstreamConnection : public DownstreamConnection { +public: + APIDownstreamConnection(Worker *worker); + virtual ~APIDownstreamConnection(); + virtual int attach_downstream(Downstream *downstream); + virtual void detach_downstream(Downstream *downstream); + + virtual int push_request_headers(); + virtual int push_upload_data_chunk(const uint8_t *data, size_t datalen); + virtual int end_upload_data(); + + virtual void pause_read(IOCtrlReason reason); + virtual int resume_read(IOCtrlReason reason, size_t consumed); + virtual void force_resume_read(); + + virtual int on_read(); + virtual int on_write(); + + virtual void on_upstream_change(Upstream *uptream); + + // true if this object is poolable. + virtual bool poolable() const; + + virtual DownstreamAddrGroup *get_downstream_addr_group() const; + +private: + Worker *worker_; +}; + +} // namespace shrpx + +#endif // SHRPX_API_DOWNSTREAM_CONNECTION_H diff --git a/src/shrpx_client_handler.cc b/src/shrpx_client_handler.cc index a2083b38..2c27e700 100644 --- a/src/shrpx_client_handler.cc +++ b/src/shrpx_client_handler.cc @@ -48,6 +48,7 @@ #include "shrpx_downstream.h" #include "shrpx_http2_session.h" #include "shrpx_connect_blocker.h" +#include "shrpx_api_downstream_connection.h" #ifdef HAVE_SPDYLAY #include "shrpx_spdy_upstream.h" #endif // HAVE_SPDYLAY @@ -820,6 +821,10 @@ ClientHandler::get_downstream_connection(Downstream *downstream) { const auto &req = downstream->request(); + if (faddr_->api) { + return make_unique(worker_); + } + // Fast path. If we have one group, it must be catch-all group. // proxy mode falls in this case. if (groups.size() == 1) { diff --git a/src/shrpx_config.h b/src/shrpx_config.h index dfd5997a..8376516e 100644 --- a/src/shrpx_config.h +++ b/src/shrpx_config.h @@ -540,6 +540,7 @@ struct Http2Config { ev_tstamp settings; } timeout; nghttp2_option *option; + nghttp2_option *api_option; nghttp2_session_callbacks *callbacks; size_t window_bits; size_t connection_window_bits; diff --git a/src/shrpx_downstream.cc b/src/shrpx_downstream.cc index b8d16066..3e1d7ba0 100644 --- a/src/shrpx_downstream.cc +++ b/src/shrpx_downstream.cc @@ -500,6 +500,14 @@ bool Downstream::get_chunked_request() const { return chunked_request_; } void Downstream::set_chunked_request(bool f) { chunked_request_ = f; } bool Downstream::request_buf_full() { + auto handler = upstream_->get_client_handler(); + auto faddr = handler->get_upstream_addr(); + + // We don't check buffer size here for API endpoint. + if (faddr->api) { + return false; + } + if (dconn_) { return request_buf_.rleft() >= get_config()->conn.downstream.request_buffer_size; diff --git a/src/shrpx_http2_upstream.cc b/src/shrpx_http2_upstream.cc index 6c8cc6f8..b05296e7 100644 --- a/src/shrpx_http2_upstream.cc +++ b/src/shrpx_http2_upstream.cc @@ -861,8 +861,11 @@ Http2Upstream::Http2Upstream(ClientHandler *handler) auto &http2conf = get_config()->http2; - rv = nghttp2_session_server_new2(&session_, http2conf.upstream.callbacks, - this, http2conf.upstream.option); + auto faddr = handler_->get_upstream_addr(); + + rv = nghttp2_session_server_new2( + &session_, http2conf.upstream.callbacks, this, + faddr->api ? http2conf.upstream.api_option : http2conf.upstream.option); assert(rv == 0); @@ -874,7 +877,11 @@ Http2Upstream::Http2Upstream(ClientHandler *handler) entry[0].value = http2conf.upstream.max_concurrent_streams; entry[1].settings_id = NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE; - entry[1].value = (1 << http2conf.upstream.window_bits) - 1; + if (faddr->api) { + entry[1].value = (1u << 31) - 1; + } else { + entry[1].value = (1 << http2conf.upstream.window_bits) - 1; + } rv = nghttp2_submit_settings(session_, NGHTTP2_FLAG_NONE, entry.data(), entry.size()); @@ -883,8 +890,11 @@ Http2Upstream::Http2Upstream(ClientHandler *handler) << nghttp2_strerror(rv); } - if (http2conf.upstream.connection_window_bits != 16) { - int32_t window_size = (1 << http2conf.upstream.connection_window_bits) - 1; + int32_t window_bits = + faddr->api ? 31 : http2conf.upstream.connection_window_bits; + + if (window_bits != 16) { + int32_t window_size = (1u << window_bits) - 1; rv = nghttp2_session_set_local_window_size(session_, NGHTTP2_FLAG_NONE, 0, window_size); @@ -1675,6 +1685,12 @@ int Http2Upstream::on_downstream_abort_request(Downstream *downstream, int Http2Upstream::consume(int32_t stream_id, size_t len) { int rv; + auto faddr = handler_->get_upstream_addr(); + + if (faddr->api) { + return 0; + } + rv = nghttp2_session_consume(session_, stream_id, len); if (rv != 0) { diff --git a/src/template.h b/src/template.h index 55491dc2..f7e8cd86 100644 --- a/src/template.h +++ b/src/template.h @@ -502,6 +502,10 @@ inline bool operator==(const char *lhs, const StringRef &rhs) { return rhs == lhs; } +inline bool operator!=(const StringRef &lhs, const StringRef &rhs) { + return !(lhs == rhs); +} + inline bool operator!=(const StringRef &lhs, const std::string &rhs) { return !(lhs == rhs); }