Use libev for nghttpd

Benchmark shows 10% faster with libev compared to libevent.  Also
response time in high load condition is much faster.
This commit is contained in:
Tatsuhiro Tsujikawa 2014-12-24 00:05:45 +09:00
parent c3215af5f6
commit cd7258a7cd
8 changed files with 809 additions and 329 deletions

View File

@ -275,6 +275,21 @@ fi
AM_CONDITIONAL([HAVE_CUNIT], [ test "x${have_cunit}" = "xyes" ])
# libev (for src)
# libev does not have pkg-config file. Check it in an old way.
LIBS_OLD=$LIBS
AC_CHECK_LIB([ev], [ev_time], [have_libev=yes], [have_libev=no])
if test "x${have_libev}" = "xyes"; then
AC_CHECK_HEADER([ev.h], [have_libev=yes], [have_libev=no])
if test "x${have_libev}" = "xyes"; then
LIBEV_LIBS=-lev
LIBEV_CFLAGS=
AC_SUBST([LIBEV_LIBS])
AC_SUBST([LIBEV_CFLAGS])
fi
fi
LIBS=$LIBS_OLD
# openssl (for src)
PKG_CHECK_MODULES([OPENSSL], [openssl >= 1.0.1],
[have_openssl=yes], [have_openssl=no])
@ -375,11 +390,12 @@ if test "x${request_asio_lib}" = "xyes"; then
fi
# The nghttp, nghttpd and nghttpx under src depend on zlib, OpenSSL
# and libevent_openssl
# libev and libevent_openssl
enable_app=no
if test "x${request_app}" != "xno" &&
test "x${have_zlib}" = "xyes" &&
test "x${have_openssl}" = "xyes" &&
test "x${have_libev}" = "xyes" &&
test "x${have_libevent_openssl}" = "xyes"; then
enable_app=yes
fi
@ -649,6 +665,7 @@ AC_MSG_NOTICE([summary of build options:
Libs:
OpenSSL: ${have_openssl}
Libxml2: ${have_libxml2}
Libev: ${have_libev}
Libevent(SSL): ${have_libevent_openssl}
Spdylay: ${have_spdylay}
Jansson: ${have_jansson}

View File

@ -36,29 +36,16 @@
#include <set>
#include <iostream>
#include <thread>
#include <mutex>
#include <deque>
#include <openssl/err.h>
#include <zlib.h>
#include <event.h>
#include <event2/listener.h>
#include <event2/bufferevent_ssl.h>
#ifdef __cplusplus
extern "C" {
#endif
#include "nghttp2_helper.h"
#ifdef __cplusplus
}
#endif
#include "app_helper.h"
#include "http2.h"
#include "util.h"
#include "libevent_util.h"
#include "ssl.h"
#ifndef O_BINARY
@ -77,6 +64,19 @@ const std::string DEFAULT_HTML = "index.html";
const std::string NGHTTPD_SERVER = "nghttpd nghttp2/" NGHTTP2_VERSION;
} // namespace
namespace {
int make_socket_nonblocking(int fd) {
int flags;
int rv;
while ((flags = fcntl(fd, F_GETFL, 0)) == -1 && errno == EINTR)
;
while ((rv = fcntl(fd, F_SETFL, flags | O_NONBLOCK)) == -1 && errno == EINTR)
;
return rv;
}
} // namespace
namespace {
void delete_handler(Http2Handler *handler) {
handler->remove_self();
@ -98,7 +98,7 @@ void append_nv(Stream *stream, const std::vector<nghttp2_nv> &nva) {
} // namespace
Config::Config()
: stream_read_timeout{60, 0}, stream_write_timeout{60, 0},
: stream_read_timeout(60.), stream_write_timeout(60.),
session_option(nullptr), data_ptr(nullptr), padding(0), num_worker(1),
header_table_size(-1), port(0), verbose(false), daemon(false),
verify_client(false), no_tls(false), error_gzip(false),
@ -109,30 +109,10 @@ Config::Config()
Config::~Config() { nghttp2_option_del(session_option); }
Stream::Stream(Http2Handler *handler, int32_t stream_id)
: handler(handler), rtimer(nullptr), wtimer(nullptr), body_left(0),
stream_id(stream_id), file(-1) {
headers.reserve(10);
}
Stream::~Stream() {
if (file != -1) {
close(file);
}
if (wtimer) {
event_free(wtimer);
}
if (rtimer) {
event_free(rtimer);
}
}
namespace {
void stream_timeout_cb(evutil_socket_t fd, short what, void *arg) {
void stream_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) {
int rv;
auto stream = static_cast<Stream *>(arg);
auto stream = static_cast<Stream *>(w->data);
auto hd = stream->handler;
auto config = hd->get_config();
@ -154,19 +134,15 @@ void stream_timeout_cb(evutil_socket_t fd, short what, void *arg) {
namespace {
void add_stream_read_timeout(Stream *stream) {
auto hd = stream->handler;
auto config = hd->get_config();
evtimer_add(stream->rtimer, &config->stream_read_timeout);
ev_timer_again(hd->get_loop(), &stream->rtimer);
}
} // namespace
namespace {
void add_stream_read_timeout_if_pending(Stream *stream) {
auto hd = stream->handler;
auto config = hd->get_config();
if (evtimer_pending(stream->rtimer, nullptr)) {
evtimer_add(stream->rtimer, &config->stream_read_timeout);
if (ev_is_active(&stream->rtimer)) {
ev_timer_again(hd->get_loop(), &stream->rtimer);
}
}
} // namespace
@ -174,25 +150,21 @@ void add_stream_read_timeout_if_pending(Stream *stream) {
namespace {
void add_stream_write_timeout(Stream *stream) {
auto hd = stream->handler;
auto config = hd->get_config();
evtimer_add(stream->wtimer, &config->stream_write_timeout);
ev_timer_again(hd->get_loop(), &stream->wtimer);
}
} // namespace
namespace {
void remove_stream_read_timeout(Stream *stream) {
if (stream->rtimer) {
evtimer_del(stream->rtimer);
}
auto hd = stream->handler;
ev_timer_stop(hd->get_loop(), &stream->rtimer);
}
} // namespace
namespace {
void remove_stream_write_timeout(Stream *stream) {
if (stream->wtimer) {
evtimer_del(stream->wtimer);
}
auto hd = stream->handler;
ev_timer_stop(hd->get_loop(), &stream->wtimer);
}
} // namespace
@ -201,7 +173,7 @@ std::shared_ptr<std::string> cached_date;
} // namespace
namespace {
void refresh_cb(evutil_socket_t sig, short events, void *arg) {
void refresh_cb(struct ev_loop *loop, ev_timer *w, int revents) {
cached_date = std::make_shared<std::string>(util::http_date(time(nullptr)));
}
} // namespace
@ -212,9 +184,9 @@ void fill_callback(nghttp2_session_callbacks *callbacks, const Config *config);
class Sessions {
public:
Sessions(event_base *evbase, const Config *config, SSL_CTX *ssl_ctx)
: evbase_(evbase), config_(config), ssl_ctx_(ssl_ctx),
callbacks_(nullptr), next_session_id_(1) {
Sessions(struct ev_loop *loop, const Config *config, SSL_CTX *ssl_ctx)
: loop_(loop), config_(config), ssl_ctx_(ssl_ctx), callbacks_(nullptr),
next_session_id_(1) {
nghttp2_session_callbacks_new(&callbacks_);
fill_callback(callbacks_, config_);
@ -242,7 +214,9 @@ public:
return ssl;
}
const Config *get_config() const { return config_; }
event_base *get_evbase() const { return evbase_; }
struct ev_loop *get_loop() const {
return loop_;
}
int64_t get_next_session_id() {
auto session_id = next_session_id_;
if (next_session_id_ == std::numeric_limits<int64_t>::max()) {
@ -278,13 +252,34 @@ public:
private:
std::set<Http2Handler *> handlers_;
event_base *evbase_;
struct ev_loop *loop_;
const Config *config_;
SSL_CTX *ssl_ctx_;
nghttp2_session_callbacks *callbacks_;
int64_t next_session_id_;
};
Stream::Stream(Http2Handler *handler, int32_t stream_id)
: handler(handler), body_left(0), stream_id(stream_id), file(-1) {
auto config = handler->get_config();
ev_timer_init(&rtimer, stream_timeout_cb, 0., config->stream_read_timeout);
ev_timer_init(&wtimer, stream_timeout_cb, 0., config->stream_write_timeout);
rtimer.data = this;
wtimer.data = this;
headers.reserve(10);
}
Stream::~Stream() {
if (file != -1) {
close(file);
}
auto loop = handler->get_loop();
ev_timer_stop(loop, &rtimer);
ev_timer_stop(loop, &wtimer);
}
namespace {
void on_session_closed(Http2Handler *hd, int64_t session_id) {
if (hd->get_config()->verbose) {
@ -295,38 +290,18 @@ void on_session_closed(Http2Handler *hd, int64_t session_id) {
}
} // namespace
Http2Handler::Http2Handler(Sessions *sessions, int fd, SSL *ssl,
int64_t session_id)
: session_id_(session_id), session_(nullptr), sessions_(sessions),
ssl_(ssl), bev_(nullptr), settings_timerev_(nullptr), fd_(fd) {}
Http2Handler::~Http2Handler() {
on_session_closed(this, session_id_);
if (settings_timerev_) {
event_free(settings_timerev_);
}
nghttp2_session_del(session_);
if (ssl_) {
SSL_set_shutdown(ssl_, SSL_RECEIVED_SHUTDOWN);
SSL_shutdown(ssl_);
}
if (bev_) {
bufferevent_disable(bev_, EV_READ | EV_WRITE);
bufferevent_free(bev_);
}
if (ssl_) {
SSL_free(ssl_);
}
shutdown(fd_, SHUT_WR);
close(fd_);
namespace {
void settings_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) {
auto hd = static_cast<Http2Handler *>(w->data);
hd->terminate_session(NGHTTP2_SETTINGS_TIMEOUT);
hd->on_write();
}
void Http2Handler::remove_self() { sessions_->remove_handler(this); }
} // namespace
namespace {
void readcb(bufferevent *bev, void *arg) {
void readcb(struct ev_loop *loop, ev_io *w, int revents) {
int rv;
auto handler = static_cast<Http2Handler *>(arg);
auto handler = static_cast<Http2Handler *>(w->data);
rv = handler->on_read();
if (rv == -1) {
@ -336,9 +311,9 @@ void readcb(bufferevent *bev, void *arg) {
} // namespace
namespace {
void writecb(bufferevent *bev, void *arg) {
void writecb(struct ev_loop *loop, ev_io *w, int revents) {
int rv;
auto handler = static_cast<Http2Handler *>(arg);
auto handler = static_cast<Http2Handler *>(w->data);
rv = handler->on_write();
if (rv == -1) {
@ -347,63 +322,67 @@ void writecb(bufferevent *bev, void *arg) {
}
} // namespace
namespace {
void eventcb(bufferevent *bev, short events, void *arg) {
auto handler = static_cast<Http2Handler *>(arg);
Http2Handler::Http2Handler(Sessions *sessions, int fd, SSL *ssl,
int64_t session_id)
: session_id_(session_id), session_(nullptr), sessions_(sessions),
ssl_(ssl), data_pending_(nullptr), data_pendinglen_(0), fd_(fd) {
ev_timer_init(&settings_timerev_, settings_timeout_cb, 10., 0.);
ev_io_init(&wev_, writecb, fd, EV_WRITE);
ev_io_init(&rev_, readcb, fd, EV_READ);
if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR | BEV_EVENT_TIMEOUT)) {
delete_handler(handler);
settings_timerev_.data = this;
wev_.data = this;
rev_.data = this;
return;
}
auto loop = sessions_->get_loop();
ev_io_start(loop, &rev_);
if (events & BEV_EVENT_CONNECTED) {
if (handler->get_sessions()->get_config()->verbose) {
std::cerr << "SSL/TLS handshake completed" << std::endl;
}
if (handler->verify_npn_result() != 0) {
delete_handler(handler);
return;
}
if (handler->on_connect() != 0) {
delete_handler(handler);
return;
}
}
}
} // namespace
int Http2Handler::setup_bev() {
auto evbase = sessions_->get_evbase();
if (ssl_) {
bev_ = bufferevent_openssl_socket_new(
evbase, fd_, ssl_, BUFFEREVENT_SSL_ACCEPTING, BEV_OPT_DEFER_CALLBACKS);
if (ssl) {
SSL_set_accept_state(ssl);
read_ = &Http2Handler::tls_handshake;
write_ = &Http2Handler::tls_handshake;
} else {
bev_ = bufferevent_socket_new(evbase, fd_, BEV_OPT_DEFER_CALLBACKS);
read_ = &Http2Handler::read_clear;
write_ = &Http2Handler::write_clear;
}
bufferevent_enable(bev_, EV_READ);
bufferevent_setcb(bev_, readcb, writecb, eventcb, this);
return 0;
}
int Http2Handler::send() {
int rv;
uint8_t buf[16384];
auto output = bufferevent_get_output(bev_);
util::EvbufferBuffer evbbuf(output, buf, sizeof(buf));
for (;;) {
// Check buffer length and break if it is large enough.
if (evbuffer_get_length(output) > 0) {
break;
}
Http2Handler::~Http2Handler() {
on_session_closed(this, session_id_);
nghttp2_session_del(session_);
if (ssl_) {
SSL_set_shutdown(ssl_, SSL_RECEIVED_SHUTDOWN);
SSL_shutdown(ssl_);
}
auto loop = sessions_->get_loop();
ev_timer_stop(loop, &settings_timerev_);
ev_io_stop(loop, &rev_);
ev_io_stop(loop, &wev_);
if (ssl_) {
SSL_free(ssl_);
}
shutdown(fd_, SHUT_WR);
close(fd_);
}
void Http2Handler::remove_self() { sessions_->remove_handler(this); }
struct ev_loop *Http2Handler::get_loop() const {
return sessions_->get_loop();
}
int Http2Handler::setup_bev() { return 0; }
int Http2Handler::fill_rb() {
if (data_pending_) {
assert(rb_.wleft() >= data_pendinglen_);
rb_.write(data_pending_, data_pendinglen_);
data_pending_ = nullptr;
data_pendinglen_ = 0;
}
for (;;) {
const uint8_t *data;
auto datalen = nghttp2_session_mem_send(session_, &data);
@ -415,62 +394,220 @@ int Http2Handler::send() {
if (datalen == 0) {
break;
}
rv = evbbuf.add(data, datalen);
if (rv != 0) {
std::cerr << "evbuffer_add() failed" << std::endl;
auto n = rb_.write(data, datalen);
if (n < static_cast<decltype(n)>(datalen)) {
data_pending_ = data + n;
data_pendinglen_ = datalen - n;
break;
}
}
return 0;
}
int Http2Handler::read_clear() {
int rv;
uint8_t buf[8192];
for (;;) {
ssize_t nread;
while ((nread = read(fd_, buf, sizeof(buf))) == -1 && errno == EINTR)
;
if (nread == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
break;
}
return -1;
}
if (nread == 0) {
return -1;
}
rv = nghttp2_session_mem_recv(session_, buf, nread);
if (rv < 0) {
std::cerr << "nghttp2_session_mem_recv() returned error: "
<< nghttp2_strerror(rv) << std::endl;
return -1;
}
}
rv = evbbuf.flush();
if (rv != 0) {
std::cerr << "evbuffer_add() failed" << std::endl;
return -1;
return write_(*this);
}
int Http2Handler::write_clear() {
auto loop = sessions_->get_loop();
for (;;) {
if (rb_.rleft() > 0) {
struct iovec iov[2];
auto iovcnt = rb_.riovec(iov);
ssize_t nwrite;
while ((nwrite = writev(fd_, iov, iovcnt)) == -1 && errno == EINTR)
;
if (nwrite == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
ev_io_start(loop, &wev_);
return 0;
}
return -1;
}
rb_.drain(nwrite);
continue;
}
if (fill_rb() != 0) {
return -1;
}
if (rb_.rleft() == 0) {
break;
}
}
if (rb_.rleft() == 0) {
ev_io_stop(loop, &wev_);
} else {
ev_io_start(loop, &wev_);
}
if (nghttp2_session_want_read(session_) == 0 &&
nghttp2_session_want_write(session_) == 0 &&
evbuffer_get_length(output) == 0) {
nghttp2_session_want_write(session_) == 0 && rb_.rleft() == 0) {
return -1;
}
return 0;
}
int Http2Handler::on_read() {
int rv;
int Http2Handler::tls_handshake() {
ev_io_stop(sessions_->get_loop(), &wev_);
auto input = bufferevent_get_input(bev_);
auto len = evbuffer_get_length(input);
if (len == 0) {
return 0;
}
auto data = evbuffer_pullup(input, -1);
auto rv = SSL_do_handshake(ssl_);
rv = nghttp2_session_mem_recv(session_, data, len);
if (rv < 0) {
std::cerr << "nghttp2_session_mem_recv() returned error: "
<< nghttp2_strerror(rv) << std::endl;
if (rv == 0) {
return -1;
}
if (evbuffer_drain(input, len) == -1) {
std::cerr << "evbuffer_drain() failed" << std::endl;
if (rv < 0) {
auto err = SSL_get_error(ssl_, rv);
switch (err) {
case SSL_ERROR_WANT_READ:
return 0;
case SSL_ERROR_WANT_WRITE:
ev_io_start(sessions_->get_loop(), &wev_);
return 0;
default:
return -1;
}
}
return send();
if (sessions_->get_config()->verbose) {
std::cerr << "SSL/TLS handshake completed" << std::endl;
}
if (verify_npn_result() != 0) {
return -1;
}
read_ = &Http2Handler::read_tls;
write_ = &Http2Handler::write_tls;
if (on_connect() != 0) {
return -1;
}
return 0;
}
int Http2Handler::on_write() { return send(); }
int Http2Handler::read_tls() {
uint8_t buf[8192];
namespace {
void settings_timeout_cb(evutil_socket_t fd, short what, void *arg) {
auto hd = static_cast<Http2Handler *>(arg);
hd->terminate_session(NGHTTP2_SETTINGS_TIMEOUT);
hd->on_write();
for (;;) {
auto rv = SSL_read(ssl_, buf, sizeof(buf));
if (rv == 0) {
return -1;
}
if (rv < 0) {
auto err = SSL_get_error(ssl_, rv);
switch (err) {
case SSL_ERROR_WANT_READ:
goto fin;
case SSL_ERROR_WANT_WRITE:
ev_io_start(sessions_->get_loop(), &wev_);
goto fin;
default:
return -1;
}
}
auto nread = rv;
rv = nghttp2_session_mem_recv(session_, buf, nread);
if (rv < 0) {
std::cerr << "nghttp2_session_mem_recv() returned error: "
<< nghttp2_strerror(rv) << std::endl;
return -1;
}
}
ev_io_stop(sessions_->get_loop(), &wev_);
fin:
return write_(*this);
}
} // namespace
int Http2Handler::write_tls() {
auto loop = sessions_->get_loop();
for (;;) {
if (rb_.rleft() > 0) {
const void *p;
size_t len;
std::tie(p, len) = rb_.get();
auto rv = SSL_write(ssl_, p, len);
if (rv == 0) {
return -1;
}
if (rv < 0) {
auto err = SSL_get_error(ssl_, rv);
switch (err) {
case SSL_ERROR_WANT_READ:
ev_io_stop(loop, &wev_);
return 0;
case SSL_ERROR_WANT_WRITE:
ev_io_start(sessions_->get_loop(), &wev_);
return 0;
default:
return -1;
}
}
rb_.drain(rv);
continue;
}
if (fill_rb() != 0) {
return -1;
}
if (rb_.rleft() == 0) {
break;
}
}
if (rb_.rleft() == 0) {
ev_io_stop(loop, &wev_);
} else {
ev_io_start(loop, &wev_);
}
if (nghttp2_session_want_read(session_) == 0 &&
nghttp2_session_want_write(session_) == 0 && rb_.rleft() == 0) {
return -1;
}
return 0;
}
int Http2Handler::on_read() { return read_(*this); }
int Http2Handler::on_write() { return write_(*this); }
int Http2Handler::on_connect() {
int r;
@ -495,12 +632,8 @@ int Http2Handler::on_connect() {
if (r != 0) {
return r;
}
assert(settings_timerev_ == nullptr);
settings_timerev_ =
evtimer_new(sessions_->get_evbase(), settings_timeout_cb, this);
// SETTINGS ACK timeout is 10 seconds for now
timeval settings_timeout = {10, 0};
evtimer_add(settings_timerev_, &settings_timeout);
ev_timer_start(sessions_->get_loop(), &settings_timerev_);
return on_write();
}
@ -661,11 +794,7 @@ const Config *Http2Handler::get_config() const {
}
void Http2Handler::remove_settings_timer() {
if (settings_timerev_) {
evtimer_del(settings_timerev_);
event_free(settings_timerev_);
settings_timerev_ = nullptr;
}
ev_timer_stop(sessions_->get_loop(), &settings_timerev_);
}
void Http2Handler::terminate_session(uint32_t error_code) {
@ -919,25 +1048,6 @@ int on_header_callback(nghttp2_session *session, const nghttp2_frame *frame,
}
} // namespace
namespace {
int setup_stream_timeout(Stream *stream) {
auto hd = stream->handler;
auto evbase = hd->get_sessions()->get_evbase();
stream->rtimer = evtimer_new(evbase, stream_timeout_cb, stream);
if (!stream->rtimer) {
return -1;
}
stream->wtimer = evtimer_new(evbase, stream_timeout_cb, stream);
if (!stream->wtimer) {
return -1;
}
return 0;
}
} // namespace
namespace {
int on_begin_headers_callback(nghttp2_session *session,
const nghttp2_frame *frame, void *user_data) {
@ -949,10 +1059,6 @@ int on_begin_headers_callback(nghttp2_session *session,
}
auto stream = util::make_unique<Stream>(hd, frame->hd.stream_id);
if (setup_stream_timeout(stream.get()) != 0) {
hd->submit_rst_stream(stream.get(), NGHTTP2_INTERNAL_ERROR);
return 0;
}
add_stream_read_timeout(stream.get());
@ -1099,13 +1205,6 @@ int hd_on_frame_send_callback(nghttp2_session *session,
return 0;
}
if (setup_stream_timeout(promised_stream) != 0) {
nghttp2_submit_rst_stream(session, NGHTTP2_FLAG_NONE, promised_stream_id,
NGHTTP2_INTERNAL_ERROR);
return 0;
}
add_stream_read_timeout_if_pending(stream);
add_stream_write_timeout(stream);
@ -1195,38 +1294,37 @@ struct ClientInfo {
int fd;
};
struct Worker {
std::unique_ptr<Sessions> sessions;
ev_async w;
// protectes q
std::mutex m;
std::deque<ClientInfo> q;
};
namespace {
void worker_readcb(bufferevent *bev, void *arg) {
auto sessions = static_cast<Sessions *>(arg);
auto input = bufferevent_get_input(bev);
while (evbuffer_get_length(input) >= sizeof(ClientInfo)) {
ClientInfo client;
if (evbuffer_remove(input, &client, sizeof(client)) == -1) {
std::cerr << "evbuffer_remove() failed" << std::endl;
}
sessions->accept_connection(client.fd);
void worker_acceptcb(struct ev_loop *loop, ev_async *w, int revents) {
auto worker = static_cast<Worker *>(w->data);
auto &sessions = worker->sessions;
std::lock_guard<std::mutex> lock(worker->m);
for (auto c : worker->q) {
sessions->accept_connection(c.fd);
}
worker->q.clear();
}
} // namespace
namespace {
void run_worker(int thread_id, int fd, SSL_CTX *ssl_ctx, const Config *config) {
auto evbase = event_base_new();
auto bev = bufferevent_socket_new(evbase, fd, BEV_OPT_DEFER_CALLBACKS |
BEV_OPT_CLOSE_ON_FREE);
auto sessions = Sessions(evbase, config, ssl_ctx);
bufferevent_enable(bev, EV_READ);
bufferevent_setcb(bev, worker_readcb, nullptr, nullptr, &sessions);
event_base_loop(evbase, 0);
}
void run_worker(Worker *worker) { ev_run(worker->sessions->get_loop(), 0); }
} // namespace
class ListenEventHandler {
class AcceptHandler {
public:
ListenEventHandler(Sessions *sessions, const Config *config)
AcceptHandler(Sessions *sessions, const Config *config)
: sessions_(sessions), config_(config), next_worker_(0) {
int rv;
if (config_->num_worker == 1) {
return;
}
@ -1234,47 +1332,42 @@ public:
if (config_->verbose) {
std::cerr << "spawning thread #" << i << std::endl;
}
int socks[2];
rv = socketpair(AF_UNIX, SOCK_STREAM, 0, socks);
if (rv == -1) {
std::cerr << "socketpair() failed: errno=" << errno << std::endl;
assert(0);
}
evutil_make_socket_nonblocking(socks[0]);
evutil_make_socket_nonblocking(socks[1]);
auto bev = bufferevent_socket_new(sessions_->get_evbase(), socks[0],
BEV_OPT_DEFER_CALLBACKS |
BEV_OPT_CLOSE_ON_FREE);
if (!bev) {
std::cerr << "bufferevent_socket_new() failed" << std::endl;
assert(0);
}
workers_.push_back(bev);
auto t = std::thread(run_worker, i, socks[1], sessions_->get_ssl_ctx(),
config_);
auto worker = util::make_unique<Worker>();
auto loop = ev_loop_new(0);
worker->sessions =
util::make_unique<Sessions>(loop, config_, sessions_->get_ssl_ctx());
ev_async_init(&worker->w, worker_acceptcb);
worker->w.data = worker.get();
ev_async_start(loop, &worker->w);
auto t = std::thread(run_worker, worker.get());
t.detach();
workers_.push_back(std::move(worker));
}
}
void accept_connection(int fd, sockaddr *addr, int addrlen) {
void accept_connection(int fd) {
if (config_->num_worker == 1) {
sessions_->accept_connection(fd);
return;
}
// Dispatch client to the one of the worker threads, in a round
// robin manner.
auto client = ClientInfo{fd};
bufferevent_write(workers_[next_worker_], &client, sizeof(client));
auto &worker = workers_[next_worker_];
if (next_worker_ == config_->num_worker - 1) {
next_worker_ = 0;
} else {
++next_worker_;
}
{
std::lock_guard<std::mutex> lock(worker->m);
worker->q.push_back({fd});
}
ev_async_send(worker->sessions->get_loop(), &worker->w);
}
private:
// In multi threading mode, this includes bufferevent to dispatch
// client to the worker threads.
std::vector<bufferevent *> workers_;
std::vector<std::unique_ptr<Worker>> workers_;
Sessions *sessions_;
const Config *config_;
// In multi threading mode, this points to the next thread that
@ -1282,6 +1375,47 @@ private:
size_t next_worker_;
};
namespace {
void acceptcb(struct ev_loop *loop, ev_io *w, int revents);
} // namespace
class ListenEventHandler {
public:
ListenEventHandler(Sessions *sessions, int fd,
std::shared_ptr<AcceptHandler> acceptor)
: acceptor_(acceptor), sessions_(sessions), fd_(fd) {
ev_io_init(&w_, acceptcb, fd, EV_READ);
w_.data = this;
ev_io_start(sessions_->get_loop(), &w_);
}
void accept_connection() {
for (;;) {
auto fd = accept(fd_, nullptr, nullptr);
if (fd == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
break;
}
continue;
}
make_socket_nonblocking(fd);
acceptor_->accept_connection(fd);
}
}
private:
ev_io w_;
std::shared_ptr<AcceptHandler> acceptor_;
Sessions *sessions_;
int fd_;
};
namespace {
void acceptcb(struct ev_loop *loop, ev_io *w, int revents) {
auto handler = static_cast<ListenEventHandler *>(w->data);
handler->accept_connection();
}
} // namespace
HttpServer::HttpServer(const Config *config) : config_(config) {}
namespace {
@ -1303,24 +1437,13 @@ int verify_callback(int preverify_ok, X509_STORE_CTX *ctx) {
} // namespace
namespace {
void evlistener_acceptcb(evconnlistener *listener, int fd, sockaddr *addr,
int addrlen, void *arg) {
auto handler = static_cast<ListenEventHandler *>(arg);
handler->accept_connection(fd, addr, addrlen);
}
} // namespace
namespace {
void evlistener_errorcb(evconnlistener *listener, void *ptr) {
std::cerr << "Accepting incoming connection failed" << std::endl;
}
} // namespace
namespace {
int start_listen(event_base *evbase, Sessions *sessions, const Config *config) {
int start_listen(struct ev_loop *loop, Sessions *sessions,
const Config *config) {
addrinfo hints;
int r;
bool ok = false;
auto acceptor = std::make_shared<AcceptHandler>(sessions, config);
auto service = util::utos(config->port);
memset(&hints, 0, sizeof(addrinfo));
@ -1331,10 +1454,6 @@ int start_listen(event_base *evbase, Sessions *sessions, const Config *config) {
hints.ai_flags |= AI_ADDRCONFIG;
#endif // AI_ADDRCONFIG
auto listen_handler_store =
util::make_unique<ListenEventHandler>(sessions, config);
auto listen_handler = listen_handler_store.get();
addrinfo *res, *rp;
r = getaddrinfo(nullptr, service.c_str(), &hints, &res);
if (r != 0) {
@ -1352,7 +1471,7 @@ int start_listen(event_base *evbase, Sessions *sessions, const Config *config) {
close(fd);
continue;
}
evutil_make_socket_nonblocking(fd);
make_socket_nonblocking(fd);
#ifdef IPV6_V6ONLY
if (rp->ai_family == AF_INET6) {
if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val,
@ -1362,18 +1481,14 @@ int start_listen(event_base *evbase, Sessions *sessions, const Config *config) {
}
}
#endif // IPV6_V6ONLY
if (bind(fd, rp->ai_addr, rp->ai_addrlen) == 0) {
auto evlistener =
evconnlistener_new(evbase, evlistener_acceptcb, listen_handler,
LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE, -1, fd);
evconnlistener_set_error_cb(evlistener, evlistener_errorcb);
listen_handler_store.release();
if (bind(fd, rp->ai_addr, rp->ai_addrlen) == 0 && listen(fd, 1000) == 0) {
new ListenEventHandler(sessions, fd, acceptor);
if (config->verbose) {
std::cout << (rp->ai_family == AF_INET ? "IPv4" : "IPv6")
<< ": listen on port " << config->port << std::endl;
}
ok = true;
continue;
} else {
std::cerr << strerror(errno) << std::endl;
@ -1382,7 +1497,7 @@ int start_listen(event_base *evbase, Sessions *sessions, const Config *config) {
}
freeaddrinfo(res);
if (listen_handler_store) {
if (!ok) {
return -1;
}
return 0;
@ -1512,32 +1627,21 @@ int HttpServer::run() {
#endif // OPENSSL_VERSION_NUMBER >= 0x10002000L
}
auto evcfg = event_config_new();
event_config_set_flag(evcfg, EVENT_BASE_FLAG_NOLOCK);
auto loop = EV_DEFAULT;
auto evbase = event_base_new_with_config(evcfg);
Sessions sessions(evbase, config_, ssl_ctx);
if (start_listen(evbase, &sessions, config_) != 0) {
Sessions sessions(loop, config_, ssl_ctx);
if (start_listen(loop, &sessions, config_) != 0) {
std::cerr << "Could not listen" << std::endl;
return -1;
}
auto refresh_ev = event_new(evbase, -1, EV_PERSIST, refresh_cb, nullptr);
if (!refresh_ev) {
std::cerr << "Could not add refresh timer" << std::endl;
return -1;
}
timeval refresh_timeout = {1, 0};
if (event_add(refresh_ev, &refresh_timeout) == -1) {
std::cerr << "Adding refresh event failed" << std::endl;
return -1;
}
ev_timer refresh_wtc;
ev_timer_init(&refresh_wtc, refresh_cb, 1.0, 0.);
ev_timer_again(loop, &refresh_wtc);
cached_date = std::make_shared<std::string>(util::http_date(time(nullptr)));
event_base_loop(evbase, 0);
ev_run(loop, 0);
return 0;
}

View File

@ -39,22 +39,12 @@
#include <openssl/ssl.h>
#include <event2/event.h>
#include <event2/bufferevent.h>
#include <ev.h>
#include <nghttp2/nghttp2.h>
#ifdef __cplusplus
extern "C" {
#endif
#include "nghttp2_buf.h"
#ifdef __cplusplus
}
#endif
#include "http2.h"
#include "ringbuf.h"
namespace nghttp2 {
@ -65,8 +55,8 @@ struct Config {
std::string private_key_file;
std::string cert_file;
std::string dh_param_file;
timeval stream_read_timeout;
timeval stream_write_timeout;
ev_tstamp stream_read_timeout;
ev_tstamp stream_write_timeout;
nghttp2_option *session_option;
void *data_ptr;
size_t padding;
@ -88,8 +78,8 @@ class Http2Handler;
struct Stream {
Headers headers;
Http2Handler *handler;
event *rtimer;
event *wtimer;
ev_timer rtimer;
ev_timer wtimer;
int64_t body_left;
int32_t stream_id;
int file;
@ -106,7 +96,6 @@ public:
void remove_self();
int setup_bev();
int send();
int on_read();
int on_write();
int on_connect();
@ -137,14 +126,29 @@ public:
void remove_settings_timer();
void terminate_session(uint32_t error_code);
int fill_rb();
int read_clear();
int write_clear();
int tls_handshake();
int read_tls();
int write_tls();
struct ev_loop *get_loop() const;
private:
ev_io wev_;
ev_io rev_;
ev_timer settings_timerev_;
std::map<int32_t, std::unique_ptr<Stream>> id2stream_;
RingBuf<16384> rb_;
std::function<int(Http2Handler &)> read_, write_;
int64_t session_id_;
nghttp2_session *session_;
Sessions *sessions_;
SSL *ssl_;
bufferevent *bev_;
event *settings_timerev_;
const uint8_t *data_pending_;
size_t data_pendinglen_;
int fd_;
};

View File

@ -50,6 +50,15 @@ AM_LDFLAGS = \
@JANSSON_LIBS@ \
@ZLIB_LIBS@ \
@APPLDFLAGS@
EVLDFLAGS = \
@JEMALLOC_LIBS@ \
@LIBSPDYLAY_LIBS@ \
@XML_LIBS@ \
@LIBEV_LIBS@ \
@OPENSSL_LIBS@ \
@JANSSON_LIBS@ \
@ZLIB_LIBS@ \
@APPLDFLAGS@
LDADD = \
$(top_builddir)/lib/libnghttp2.la \
@ -59,9 +68,9 @@ if ENABLE_APP
bin_PROGRAMS += nghttp nghttpd nghttpx
HELPER_OBJECTS = util.cc libevent_util.cc \
HELPER_OBJECTS = util.cc \
http2.cc timegm.c app_helper.cc nghttp2_gzip.c
HELPER_HFILES = util.h libevent_util.h \
HELPER_HFILES = util.h \
http2.h timegm.h app_helper.h nghttp2_config.h \
nghttp2_gzip.h
@ -74,11 +83,15 @@ endif # HAVE_LIBXML2
nghttp_SOURCES = ${HELPER_OBJECTS} ${HELPER_HFILES} nghttp.cc \
${HTML_PARSER_OBJECTS} ${HTML_PARSER_HFILES} \
libevent_util.cc libevent_util.h \
ssl.cc ssl.h
nghttpd_LDFLAGS = ${EVLDFLAGS}
nghttpd_SOURCES = ${HELPER_OBJECTS} ${HELPER_HFILES} nghttpd.cc \
ssl.cc ssl.h \
HttpServer.cc HttpServer.h
HttpServer.cc HttpServer.h \
ringbuf.h
bin_PROGRAMS += h2load
@ -141,7 +154,8 @@ nghttpx_unittest_SOURCES = shrpx-unittest.cc \
http2_test.cc http2_test.h \
util_test.cc util_test.h \
nghttp2_gzip_test.c nghttp2_gzip_test.h \
nghttp2_gzip.c nghttp2_gzip.h
nghttp2_gzip.c nghttp2_gzip.h \
ringbuf_test.cc ringbuf_test.h
nghttpx_unittest_CPPFLAGS = ${AM_CPPFLAGS}\
-DNGHTTP2_TESTS_DIR=\"$(top_srcdir)/tests\"
nghttpx_unittest_LDFLAGS = ${AM_LDFLAGS} \

121
src/ringbuf.h Normal file
View File

@ -0,0 +1,121 @@
/*
* nghttp2 - HTTP/2 C Library
*
* Copyright (c) 2014 Tatsuhiro Tsujikawa
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef RINGBUF_H
#define RINGBUF_H
#include <sys/uio.h>
#include <cstring>
#include <algorithm>
namespace nghttp2 {
template <size_t N> struct RingBuf {
RingBuf() : pos(0), len(0) {}
// Returns the number of bytes to read.
size_t rleft() const { return len; }
// Returns the number of bytes this buffer can store.
size_t wleft() const { return N - len; }
// Writes up to min(wleft(), |count|) bytes from buffer pointed by
// |buf|. Returns number of bytes written.
size_t write(const void *buf, size_t count) {
count = std::min(count, wleft());
auto last = (pos + len) % N;
if (count > N - last) {
auto c = N - last;
memcpy(begin + last, buf, c);
memcpy(begin, reinterpret_cast<const uint8_t *>(buf) + c, count - c);
} else {
memcpy(begin + last, buf, count);
}
len += count;
return count;
}
// Drains min(rleft(), |count|) bytes from start of the buffer.
size_t drain(size_t count) {
count = std::min(count, rleft());
pos = (pos + count) % N;
len -= count;
return count;
}
// Returns pointer to the next contiguous readable buffer and its
// length.
std::pair<const void *, size_t> get() const {
if (pos + len > N) {
return {begin + pos, N - pos};
}
return {begin + pos, len};
}
void reset() { pos = len = 0; }
// Fills |iov| for reading. |iov| must contain at least 2 elements.
// Returns the number of filled elements.
int riovec(struct iovec *iov) {
if (len == 0) {
return 0;
}
if (pos + len > N) {
auto c = N - pos;
iov[0].iov_base = begin + pos;
iov[0].iov_len = c;
iov[1].iov_base = begin;
iov[1].iov_len = len - c;
return 2;
}
iov[0].iov_base = begin + pos;
iov[0].iov_len = len;
return 1;
}
// Fills |iov| for writing. |iov| must contain at least 2 elements.
// Returns the number of filled elements.
int wiovec(struct iovec *iov) {
if (len == N) {
return 0;
}
if (pos == 0) {
iov[0].iov_base = begin + pos + len;
iov[0].iov_len = N - pos - len;
return 1;
}
if (pos + len < N) {
auto c = N - pos - len;
iov[0].iov_base = begin + pos + len;
iov[0].iov_len = c;
iov[1].iov_base = begin;
iov[1].iov_len = N - len - c;
return 2;
}
auto last = (pos + len) % N;
iov[0].iov_base = begin + last;
iov[0].iov_len = N - len;
return 1;
}
size_t pos;
size_t len;
uint8_t begin[N];
};
} // namespace nghttp2
#endif // RINGBUF_H

182
src/ringbuf_test.cc Normal file
View File

@ -0,0 +1,182 @@
/*
* nghttp2 - HTTP/2 C Library
*
* Copyright (c) 2014 Tatsuhiro Tsujikawa
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include "ringbuf_test.h"
#include <cstring>
#include <iostream>
#include <CUnit/CUnit.h>
#include <nghttp2/nghttp2.h>
#include "ringbuf.h"
namespace nghttp2 {
void test_ringbuf_write(void) {
RingBuf<16> b;
CU_ASSERT(0 == b.rleft());
CU_ASSERT(16 == b.wleft());
b.write("012", 3);
CU_ASSERT(3 == b.rleft());
CU_ASSERT(13 == b.wleft());
CU_ASSERT(0 == b.pos);
CU_ASSERT(3 == b.len);
b.drain(3);
CU_ASSERT(0 == b.rleft());
CU_ASSERT(16 == b.wleft());
CU_ASSERT(3 == b.pos);
CU_ASSERT(0 == b.len);
b.write("0123456789ABCDEF", 16);
CU_ASSERT(16 == b.rleft());
CU_ASSERT(0 == b.wleft());
CU_ASSERT(3 == b.pos);
CU_ASSERT(16 == b.len);
CU_ASSERT(0 == memcmp(b.begin, "DEF0123456789ABC", 16));
const void *p;
size_t len;
std::tie(p, len) = b.get();
CU_ASSERT(13 == len);
CU_ASSERT(0 == memcmp(p, "0123456789ABC", len));
b.drain(14);
CU_ASSERT(2 == b.rleft());
CU_ASSERT(14 == b.wleft());
CU_ASSERT(1 == b.pos);
CU_ASSERT(2 == b.len);
std::tie(p, len) = b.get();
CU_ASSERT(2 == len);
CU_ASSERT(0 == memcmp(p, "EF", len));
}
void test_ringbuf_iovec(void) {
RingBuf<16> b;
struct iovec iov[2];
auto rv = b.riovec(iov);
CU_ASSERT(0 == rv);
rv = b.wiovec(iov);
CU_ASSERT(1 == rv);
CU_ASSERT(b.begin == iov[0].iov_base);
CU_ASSERT(16 == iov[0].iov_len);
// set pos to somewhere middle of the buffer, this will require 2
// iovec for writing.
b.pos = 6;
rv = b.riovec(iov);
CU_ASSERT(0 == rv);
rv = b.wiovec(iov);
CU_ASSERT(2 == rv);
CU_ASSERT(b.begin + b.pos == iov[0].iov_base);
CU_ASSERT(10 == iov[0].iov_len);
CU_ASSERT(b.begin == iov[1].iov_base);
CU_ASSERT(6 == iov[1].iov_len);
// occupy first region of buffer
b.pos = 0;
b.len = 10;
rv = b.riovec(iov);
CU_ASSERT(1 == rv);
CU_ASSERT(b.begin == iov[0].iov_base);
CU_ASSERT(10 == iov[0].iov_len);
rv = b.wiovec(iov);
CU_ASSERT(1 == rv);
CU_ASSERT(b.begin + b.len == iov[0].iov_base);
CU_ASSERT(6 == iov[0].iov_len);
// occupy last region of buffer
b.pos = 6;
b.len = 10;
rv = b.riovec(iov);
CU_ASSERT(1 == rv);
CU_ASSERT(b.begin + b.pos == iov[0].iov_base);
CU_ASSERT(10 == iov[0].iov_len);
rv = b.wiovec(iov);
CU_ASSERT(1 == rv);
CU_ASSERT(b.begin == iov[0].iov_base);
CU_ASSERT(6 == iov[0].iov_len);
// occupy middle of buffer
b.pos = 3;
b.len = 10;
rv = b.riovec(iov);
CU_ASSERT(1 == rv);
CU_ASSERT(b.begin + b.pos == iov[0].iov_base);
CU_ASSERT(10 == iov[0].iov_len);
rv = b.wiovec(iov);
CU_ASSERT(2 == rv);
CU_ASSERT(b.begin + b.pos + b.len == iov[0].iov_base);
CU_ASSERT(3 == iov[0].iov_len);
CU_ASSERT(b.begin == iov[1].iov_base);
CU_ASSERT(3 == iov[1].iov_len);
// crossover
b.pos = 13;
b.len = 10;
rv = b.riovec(iov);
CU_ASSERT(2 == rv);
CU_ASSERT(b.begin + b.pos == iov[0].iov_base);
CU_ASSERT(3 == iov[0].iov_len);
CU_ASSERT(b.begin == iov[1].iov_base);
CU_ASSERT(7 == iov[1].iov_len);
rv = b.wiovec(iov);
CU_ASSERT(1 == rv);
CU_ASSERT(b.begin + 7 == iov[0].iov_base);
CU_ASSERT(6 == iov[0].iov_len);
}
} // namespace nghttp2

35
src/ringbuf_test.h Normal file
View File

@ -0,0 +1,35 @@
/*
* nghttp2 - HTTP/2 C Library
*
* Copyright (c) 2014 Tatsuhiro Tsujikawa
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef RINGBUF_TEST_H
#define RINGBUF_TEST_H
namespace nghttp2 {
void test_ringbuf_write(void);
void test_ringbuf_iovec(void);
} // namespace nghttp2
#endif // RINGBUF_TEST_H

View File

@ -38,6 +38,7 @@
#include "http2_test.h"
#include "util_test.h"
#include "nghttp2_gzip_test.h"
#include "ringbuf_test.h"
static int init_suite1(void) { return 0; }
@ -115,7 +116,9 @@ int main(int argc, char *argv[]) {
!CU_add_test(pSuite, "util_utox", shrpx::test_util_utox) ||
!CU_add_test(pSuite, "util_http_date", shrpx::test_util_http_date) ||
!CU_add_test(pSuite, "util_select_h2", shrpx::test_util_select_h2) ||
!CU_add_test(pSuite, "gzip_inflate", test_nghttp2_gzip_inflate)) {
!CU_add_test(pSuite, "gzip_inflate", test_nghttp2_gzip_inflate) ||
!CU_add_test(pSuite, "ringbuf_write", nghttp2::test_ringbuf_write) ||
!CU_add_test(pSuite, "ringbuf_iovec", nghttp2::test_ringbuf_iovec)) {
CU_cleanup_registry();
return CU_get_error();
}