Pool Memchunk per worker
This commit is contained in:
parent
b2bb6f1db1
commit
8c3b379b66
|
@ -102,6 +102,11 @@ template <typename T> struct Pool {
|
|||
}
|
||||
freelist = m;
|
||||
}
|
||||
void clear() {
|
||||
freelist = nullptr;
|
||||
pool = nullptr;
|
||||
poolsize = 0;
|
||||
}
|
||||
using value_type = T;
|
||||
std::unique_ptr<T> pool;
|
||||
T *freelist;
|
||||
|
|
|
@ -404,6 +404,10 @@ ClientHandler::~ClientHandler() {
|
|||
auto worker_stat = worker_->get_worker_stat();
|
||||
--worker_stat->num_connections;
|
||||
|
||||
if (worker_stat->num_connections == 0) {
|
||||
worker_->get_mcpool()->clear();
|
||||
}
|
||||
|
||||
ev_timer_stop(conn_.loop, &reneg_shutdown_timer_);
|
||||
|
||||
// TODO If backend is http/2, and it is in CONNECTED state, signal
|
||||
|
@ -634,6 +638,8 @@ ClientHandler::get_downstream_connection() {
|
|||
return dconn;
|
||||
}
|
||||
|
||||
MemchunkPool *ClientHandler::get_mcpool() { return worker_->get_mcpool(); }
|
||||
|
||||
SSL *ClientHandler::get_ssl() const { return conn_.tls.ssl; }
|
||||
|
||||
ConnectBlocker *ClientHandler::get_connect_blocker() const {
|
||||
|
|
|
@ -36,6 +36,7 @@
|
|||
#include "shrpx_rate_limit.h"
|
||||
#include "shrpx_connection.h"
|
||||
#include "buffer.h"
|
||||
#include "memchunk.h"
|
||||
|
||||
using namespace nghttp2;
|
||||
|
||||
|
@ -92,6 +93,7 @@ public:
|
|||
void pool_downstream_connection(std::unique_ptr<DownstreamConnection> dconn);
|
||||
void remove_downstream_connection(DownstreamConnection *dconn);
|
||||
std::unique_ptr<DownstreamConnection> get_downstream_connection();
|
||||
MemchunkPool *get_mcpool();
|
||||
SSL *get_ssl() const;
|
||||
ConnectBlocker *get_connect_blocker() const;
|
||||
// Call this function when HTTP/2 connection header is received at
|
||||
|
|
|
@ -106,12 +106,12 @@ void downstream_wtimeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
|
|||
} // namespace
|
||||
|
||||
// upstream could be nullptr for unittests
|
||||
Downstream::Downstream(Upstream *upstream, int32_t stream_id, int32_t priority)
|
||||
Downstream::Downstream(Upstream *upstream, MemchunkPool *mcpool,
|
||||
int32_t stream_id, int32_t priority)
|
||||
: dlnext(nullptr), dlprev(nullptr),
|
||||
request_start_time_(std::chrono::high_resolution_clock::now()),
|
||||
request_buf_(upstream ? upstream->get_mcpool() : nullptr),
|
||||
response_buf_(upstream ? upstream->get_mcpool() : nullptr),
|
||||
request_bodylen_(0), response_bodylen_(0), response_sent_bodylen_(0),
|
||||
request_buf_(mcpool), response_buf_(mcpool), request_bodylen_(0),
|
||||
response_bodylen_(0), response_sent_bodylen_(0),
|
||||
request_content_length_(-1), response_content_length_(-1),
|
||||
upstream_(upstream), blocked_link_(nullptr), request_headers_sum_(0),
|
||||
response_headers_sum_(0), request_datalen_(0), response_datalen_(0),
|
||||
|
|
|
@ -52,7 +52,8 @@ struct BlockedLink;
|
|||
|
||||
class Downstream {
|
||||
public:
|
||||
Downstream(Upstream *upstream, int32_t stream_id, int32_t priority);
|
||||
Downstream(Upstream *upstream, MemchunkPool *mcpool, int32_t stream_id,
|
||||
int32_t priority);
|
||||
~Downstream();
|
||||
void reset_upstream(Upstream *upstream);
|
||||
Upstream *get_upstream() const;
|
||||
|
|
|
@ -33,7 +33,7 @@
|
|||
namespace shrpx {
|
||||
|
||||
void test_downstream_index_request_headers(void) {
|
||||
Downstream d(nullptr, 0, 0);
|
||||
Downstream d(nullptr, nullptr, 0, 0);
|
||||
d.add_request_header("1", "0");
|
||||
d.add_request_header("2", "1");
|
||||
d.add_request_header("Charlie", "2");
|
||||
|
@ -56,7 +56,7 @@ void test_downstream_index_request_headers(void) {
|
|||
}
|
||||
|
||||
void test_downstream_index_response_headers(void) {
|
||||
Downstream d(nullptr, 0, 0);
|
||||
Downstream d(nullptr, nullptr, 0, 0);
|
||||
d.add_response_header("Charlie", "0");
|
||||
d.add_response_header("Alpha", "1");
|
||||
d.add_response_header("Delta", "2");
|
||||
|
@ -69,7 +69,7 @@ void test_downstream_index_response_headers(void) {
|
|||
}
|
||||
|
||||
void test_downstream_get_request_header(void) {
|
||||
Downstream d(nullptr, 0, 0);
|
||||
Downstream d(nullptr, nullptr, 0, 0);
|
||||
d.add_request_header("alpha", "0");
|
||||
d.add_request_header(":authority", "1");
|
||||
d.add_request_header("content-length", "2");
|
||||
|
@ -86,7 +86,7 @@ void test_downstream_get_request_header(void) {
|
|||
}
|
||||
|
||||
void test_downstream_get_response_header(void) {
|
||||
Downstream d(nullptr, 0, 0);
|
||||
Downstream d(nullptr, nullptr, 0, 0);
|
||||
d.add_response_header("alpha", "0");
|
||||
d.add_response_header(":status", "1");
|
||||
d.add_response_header("content-length", "2");
|
||||
|
@ -99,7 +99,7 @@ void test_downstream_get_response_header(void) {
|
|||
}
|
||||
|
||||
void test_downstream_crumble_request_cookie(void) {
|
||||
Downstream d(nullptr, 0, 0);
|
||||
Downstream d(nullptr, nullptr, 0, 0);
|
||||
d.add_request_header(":method", "get");
|
||||
d.add_request_header(":path", "/");
|
||||
auto val = "alpha; bravo; ; ;; charlie;;";
|
||||
|
@ -122,7 +122,7 @@ void test_downstream_crumble_request_cookie(void) {
|
|||
}
|
||||
|
||||
void test_downstream_assemble_request_cookie(void) {
|
||||
Downstream d(nullptr, 0, 0);
|
||||
Downstream d(nullptr, nullptr, 0, 0);
|
||||
d.add_request_header(":method", "get");
|
||||
d.add_request_header(":path", "/");
|
||||
d.add_request_header("cookie", "alpha");
|
||||
|
@ -135,7 +135,7 @@ void test_downstream_assemble_request_cookie(void) {
|
|||
|
||||
void test_downstream_rewrite_location_response_header(void) {
|
||||
{
|
||||
Downstream d(nullptr, 0, 0);
|
||||
Downstream d(nullptr, nullptr, 0, 0);
|
||||
d.set_request_downstream_host("localhost:3000");
|
||||
d.add_request_header("host", "localhost");
|
||||
d.add_response_header("location", "http://localhost:3000/");
|
||||
|
@ -146,7 +146,7 @@ void test_downstream_rewrite_location_response_header(void) {
|
|||
CU_ASSERT("https://localhost/" == (*location).value);
|
||||
}
|
||||
{
|
||||
Downstream d(nullptr, 0, 0);
|
||||
Downstream d(nullptr, nullptr, 0, 0);
|
||||
d.set_request_downstream_host("localhost");
|
||||
d.set_request_http2_authority("localhost");
|
||||
d.add_response_header("location", "http://localhost:3000/");
|
||||
|
|
|
@ -256,8 +256,11 @@ int on_begin_headers_callback(nghttp2_session *session,
|
|||
<< frame->hd.stream_id;
|
||||
}
|
||||
|
||||
auto handler = upstream->get_client_handler();
|
||||
|
||||
// TODO Use priority 0 for now
|
||||
auto downstream = make_unique<Downstream>(upstream, frame->hd.stream_id, 0);
|
||||
auto downstream = make_unique<Downstream>(upstream, handler->get_mcpool(),
|
||||
frame->hd.stream_id, 0);
|
||||
nghttp2_session_set_stream_user_data(session, frame->hd.stream_id,
|
||||
downstream.get());
|
||||
|
||||
|
@ -484,6 +487,7 @@ int on_frame_send_callback(nghttp2_session *session, const nghttp2_frame *frame,
|
|||
verbose_on_frame_send_callback(session, frame, user_data);
|
||||
}
|
||||
auto upstream = static_cast<Http2Upstream *>(user_data);
|
||||
auto handler = upstream->get_client_handler();
|
||||
|
||||
switch (frame->hd.type) {
|
||||
case NGHTTP2_SETTINGS:
|
||||
|
@ -492,8 +496,9 @@ int on_frame_send_callback(nghttp2_session *session, const nghttp2_frame *frame,
|
|||
}
|
||||
return 0;
|
||||
case NGHTTP2_PUSH_PROMISE: {
|
||||
auto downstream = make_unique<Downstream>(
|
||||
upstream, frame->push_promise.promised_stream_id, 0);
|
||||
auto downstream =
|
||||
make_unique<Downstream>(upstream, handler->get_mcpool(),
|
||||
frame->push_promise.promised_stream_id, 0);
|
||||
|
||||
downstream->disable_upstream_rtimer();
|
||||
|
||||
|
@ -1498,8 +1503,6 @@ int Http2Upstream::on_downstream_reset(bool no_retry) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
MemchunkPool *Http2Upstream::get_mcpool() { return &mcpool_; }
|
||||
|
||||
int Http2Upstream::prepare_push_promise(Downstream *downstream) {
|
||||
int rv;
|
||||
http_parser_url u;
|
||||
|
|
|
@ -79,8 +79,6 @@ public:
|
|||
virtual void on_handler_delete();
|
||||
virtual int on_downstream_reset(bool no_retry);
|
||||
|
||||
virtual MemchunkPool *get_mcpool();
|
||||
|
||||
bool get_flow_control() const;
|
||||
// Perform HTTP/2 upgrade from |upstream|. On success, this object
|
||||
// takes ownership of the |upstream|. This function returns 0 if it
|
||||
|
@ -103,9 +101,7 @@ public:
|
|||
int on_request_headers(Downstream *downstream, const nghttp2_frame *frame);
|
||||
|
||||
private:
|
||||
// must be put before downstream_queue_
|
||||
std::unique_ptr<HttpsUpstream> pre_upstream_;
|
||||
MemchunkPool mcpool_;
|
||||
DownstreamQueue downstream_queue_;
|
||||
ev_timer settings_timer_;
|
||||
ev_timer shutdown_timer_;
|
||||
|
|
|
@ -64,8 +64,12 @@ int htp_msg_begin(http_parser *htp) {
|
|||
ULOG(INFO, upstream) << "HTTP request started";
|
||||
}
|
||||
upstream->reset_current_header_length();
|
||||
|
||||
auto handler = upstream->get_client_handler();
|
||||
|
||||
// TODO specify 0 as priority for now
|
||||
upstream->attach_downstream(make_unique<Downstream>(upstream, 0, 0));
|
||||
upstream->attach_downstream(
|
||||
make_unique<Downstream>(upstream, handler->get_mcpool(), 0, 0));
|
||||
return 0;
|
||||
}
|
||||
} // namespace
|
||||
|
@ -636,7 +640,8 @@ void HttpsUpstream::error_reply(unsigned int status_code) {
|
|||
auto downstream = get_downstream();
|
||||
|
||||
if (!downstream) {
|
||||
attach_downstream(make_unique<Downstream>(this, 1, 1));
|
||||
attach_downstream(
|
||||
make_unique<Downstream>(this, handler_->get_mcpool(), 1, 1));
|
||||
downstream = get_downstream();
|
||||
}
|
||||
|
||||
|
@ -926,6 +931,4 @@ fail:
|
|||
return 0;
|
||||
}
|
||||
|
||||
MemchunkPool *HttpsUpstream::get_mcpool() { return &mcpool_; }
|
||||
|
||||
} // namespace shrpx
|
||||
|
|
|
@ -76,8 +76,6 @@ public:
|
|||
virtual void on_handler_delete();
|
||||
virtual int on_downstream_reset(bool no_retry);
|
||||
|
||||
virtual MemchunkPool *get_mcpool();
|
||||
|
||||
void reset_current_header_length();
|
||||
void log_response_headers(const std::string &hdrs) const;
|
||||
|
||||
|
@ -85,8 +83,6 @@ private:
|
|||
ClientHandler *handler_;
|
||||
http_parser htp_;
|
||||
size_t current_header_length_;
|
||||
// must be put before downstream_
|
||||
MemchunkPool mcpool_;
|
||||
std::unique_ptr<Downstream> downstream_;
|
||||
IOControl ioctrl_;
|
||||
};
|
||||
|
|
|
@ -797,7 +797,8 @@ int SpdyUpstream::error_reply(Downstream *downstream,
|
|||
|
||||
Downstream *SpdyUpstream::add_pending_downstream(int32_t stream_id,
|
||||
int32_t priority) {
|
||||
auto downstream = make_unique<Downstream>(this, stream_id, priority);
|
||||
auto downstream = make_unique<Downstream>(this, handler_->get_mcpool(),
|
||||
stream_id, priority);
|
||||
spdylay_session_set_stream_user_data(session_, stream_id, downstream.get());
|
||||
auto res = downstream.get();
|
||||
|
||||
|
@ -1080,6 +1081,4 @@ int SpdyUpstream::on_downstream_reset(bool no_retry) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
MemchunkPool *SpdyUpstream::get_mcpool() { return &mcpool_; }
|
||||
|
||||
} // namespace shrpx
|
||||
|
|
|
@ -74,8 +74,6 @@ public:
|
|||
virtual void on_handler_delete();
|
||||
virtual int on_downstream_reset(bool no_retry);
|
||||
|
||||
virtual MemchunkPool *get_mcpool();
|
||||
|
||||
bool get_flow_control() const;
|
||||
|
||||
int consume(int32_t stream_id, size_t len);
|
||||
|
@ -84,8 +82,6 @@ public:
|
|||
void initiate_downstream(Downstream *downstream);
|
||||
|
||||
private:
|
||||
// must be put before downstream_queue_
|
||||
MemchunkPool mcpool_;
|
||||
DownstreamQueue downstream_queue_;
|
||||
ClientHandler *handler_;
|
||||
spdylay_session *session_;
|
||||
|
|
|
@ -27,9 +27,6 @@
|
|||
|
||||
#include "shrpx.h"
|
||||
#include "shrpx_io_control.h"
|
||||
#include "memchunk.h"
|
||||
|
||||
using namespace nghttp2;
|
||||
|
||||
namespace shrpx {
|
||||
|
||||
|
@ -65,8 +62,6 @@ public:
|
|||
virtual void pause_read(IOCtrlReason reason) = 0;
|
||||
virtual int resume_read(IOCtrlReason reason, Downstream *downstream,
|
||||
size_t consumed) = 0;
|
||||
|
||||
virtual MemchunkPool *get_mcpool() = 0;
|
||||
};
|
||||
|
||||
} // namespace shrpx
|
||||
|
|
|
@ -37,8 +37,6 @@
|
|||
#include "util.h"
|
||||
#include "template.h"
|
||||
|
||||
using namespace nghttp2;
|
||||
|
||||
namespace shrpx {
|
||||
|
||||
namespace {
|
||||
|
@ -218,4 +216,6 @@ void Worker::set_graceful_shutdown(bool f) { graceful_shutdown_ = f; }
|
|||
|
||||
bool Worker::get_graceful_shutdown() const { return graceful_shutdown_; }
|
||||
|
||||
MemchunkPool *Worker::get_mcpool() { return &mcpool_; }
|
||||
|
||||
} // namespace shrpx
|
||||
|
|
|
@ -42,6 +42,9 @@
|
|||
|
||||
#include "shrpx_config.h"
|
||||
#include "shrpx_downstream_connection_pool.h"
|
||||
#include "memchunk.h"
|
||||
|
||||
using namespace nghttp2;
|
||||
|
||||
namespace shrpx {
|
||||
|
||||
|
@ -104,6 +107,8 @@ public:
|
|||
void set_graceful_shutdown(bool f);
|
||||
bool get_graceful_shutdown() const;
|
||||
|
||||
MemchunkPool *get_mcpool();
|
||||
|
||||
private:
|
||||
std::vector<std::unique_ptr<Http2Session>> http2sessions_;
|
||||
size_t next_http2session_;
|
||||
|
@ -113,6 +118,7 @@ private:
|
|||
std::mutex m_;
|
||||
std::deque<WorkerEvent> q_;
|
||||
ev_async w_;
|
||||
MemchunkPool mcpool_;
|
||||
DownstreamConnectionPool dconn_pool_;
|
||||
WorkerStat worker_stat_;
|
||||
struct ev_loop *loop_;
|
||||
|
|
Loading…
Reference in New Issue