/*
 * nghttp2 - HTTP/2 C Library
 *
 * Copyright (c) 2012 Tatsuhiro Tsujikawa
 *
 * Permission is hereby granted, free of charge, to any person obtaining
 * a copy of this software and associated documentation files (the
 * "Software"), to deal in the Software without restriction, including
 * without limitation the rights to use, copy, modify, merge, publish,
 * distribute, sublicense, and/or sell copies of the Software, and to
 * permit persons to whom the Software is furnished to do so, subject to
 * the following conditions:
 *
 * The above copyright notice and this permission notice shall be
 * included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 */
#include "shrpx_http2_session.h"

// NOTE(review): the names of these five system headers were lost
// when this file was extracted; they are reconstructed from the
// symbols used below (TCP_NODELAY, close(), std::vector,
// ERR_error_string(), bufferevent_openssl_socket_new()).  Confirm
// against the upstream file.
#include <netinet/tcp.h>
#include <unistd.h>
#include <vector>

#include <openssl/err.h>

#include <event2/bufferevent_ssl.h>

#include "shrpx_upstream.h"
#include "shrpx_downstream.h"
#include "shrpx_config.h"
#include "shrpx_error.h"
#include "shrpx_http2_downstream_connection.h"
#include "shrpx_client_handler.h"
#include "shrpx_ssl.h"
#include "shrpx_http.h"
#include "shrpx_worker_config.h"
#include "http2.h"
#include "util.h"
#include "libevent_util.h"
#include "base64.h"

using namespace nghttp2;

namespace shrpx {

// Creates a session object in DISCONNECTED state.  No socket, TLS
// object or nghttp2 session is created here; that happens in
// initiate_connection() and on_connect().
Http2Session::Http2Session(event_base *evbase, SSL_CTX *ssl_ctx)
  : evbase_(evbase),
    ssl_ctx_(ssl_ctx),
    ssl_(nullptr),
    session_(nullptr),
    bev_(nullptr),
    wrbev_(nullptr),
    rdbev_(nullptr),
    settings_timerev_(nullptr),
    fd_(-1),
    state_(DISCONNECTED),
    notified_(false),
    flow_control_(false)
{}

// Tears down the backend connection via disconnect().
Http2Session::~Http2Session()
{
  disconnect();
}

// Releases everything that belongs to the current backend connection
// (nghttp2 session, SETTINGS timer, TLS object, bufferevent, socket,
// proxy response parser), resets state_ to DISCONNECTED, deletes the
// ClientHandler of every attached downstream connection and deletes
// all StreamData objects.  Always returns 0.
int Http2Session::disconnect()
{
  if(LOG_ENABLED(INFO)) {
    SSLOG(INFO, this) << "Disconnecting";
  }
  nghttp2_session_del(session_);
  session_ = nullptr;

  if(settings_timerev_) {
    event_free(settings_timerev_);
    settings_timerev_ = nullptr;
  }

  if(ssl_) {
    SSL_set_shutdown(ssl_, SSL_RECEIVED_SHUTDOWN);
    SSL_shutdown(ssl_);
  }
  if(bev_) {
    int fd = bufferevent_getfd(bev_);
    util::bev_disable_unless(bev_, EV_READ | EV_WRITE);
    bufferevent_free(bev_);
    bev_ = nullptr;
    if(fd != -1) {
      if(fd_ == -1) {
        // Adopt the descriptor so that it is closed below.
        fd_ = fd;
      } else if(fd != fd_) {
        SSLOG(WARNING, this) << "fd in bev_ != fd_";
        shutdown(fd, SHUT_WR);
        close(fd);
      }
    }
  }
  if(ssl_) {
    SSL_free(ssl_);
  }
  ssl_ = nullptr;

  if(fd_ != -1) {
    if(LOG_ENABLED(INFO)) {
      SSLOG(INFO, this) << "Closing fd=" << fd_;
    }
    shutdown(fd_, SHUT_WR);
    close(fd_);
    fd_ = -1;
  }

  if(proxy_htp_) {
    proxy_htp_.reset();
  }

  notified_ = false;
  state_ = DISCONNECTED;

  // Delete all client handler associated to Downstream. When deleting
  // Http2DownstreamConnection, it calls this object's
  // remove_downstream_connection(). The multiple
  // Http2DownstreamConnection objects belong to the same ClientHandler
  // object. So first dump ClientHandler objects and delete them once
  // and for all.
  std::vector<Http2DownstreamConnection*> vec(std::begin(dconns_),
                                              std::end(dconns_));
  std::set<ClientHandler*> handlers;
  for(auto dc : vec) {
    handlers.insert(dc->get_client_handler());
  }
  for(auto& h : handlers) {
    delete h;
  }

  dconns_.clear();
  for(auto& s : streams_) {
    delete s;
  }
  streams_.clear();
  return 0;
}

namespace {
// Read callback of the notification bufferevent.  Clears the pending
// notification flag, then either starts a backend connection (when
// DISCONNECTED) or flushes pending output (when CONNECTED).  Any
// failure disconnects the session.
void notify_readcb(bufferevent *bev, void *arg)
{
  int rv;
  auto http2session = static_cast<Http2Session*>(arg);
  http2session->clear_notify();
  switch(http2session->get_state()) {
  case Http2Session::DISCONNECTED:
    rv = http2session->initiate_connection();
    if(rv != 0) {
      SSLOG(FATAL, http2session)
        << "Could not initiate backend connection";
      http2session->disconnect();
    }
    break;
  case Http2Session::CONNECTED:
    rv = http2session->send();
    if(rv != 0) {
      http2session->disconnect();
    }
    break;
  }
}
} // namespace

namespace {
// Event callback of the notification bufferevent.  Only logs the
// condition; no recovery is attempted.
void notify_eventcb(bufferevent *bev, short events, void *arg)
{
  auto http2session = static_cast<Http2Session*>(arg);
  // TODO should DIE()?
  if(events & BEV_EVENT_EOF) {
    SSLOG(ERROR, http2session) << "Notification connection lost: EOF";
  }
  if(events & BEV_EVENT_TIMEOUT) {
    SSLOG(ERROR, http2session) << "Notification connection lost: timeout";
  }
  if(events & BEV_EVENT_ERROR) {
    SSLOG(ERROR, http2session) << "Notification connection lost: network error";
  }
}
} // namespace

// Creates a non-blocking AF_UNIX socket pair and wraps each end in a
// bufferevent: wrbev_ on sockpair[0] and rdbev_ on sockpair[1].
// rdbev_ gets notify_readcb/notify_eventcb installed.  Returns 0 on
// success, -1 on failure.
int Http2Session::init_notification()
{
  int rv;
  int sockpair[2];
  rv = socketpair(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0,
                  sockpair);
  if(rv == -1) {
    auto error = errno;
    SSLOG(FATAL, this) << "socketpair() failed: errno=" << error;
    return -1;
  }

  wrbev_ = bufferevent_socket_new(evbase_, sockpair[0],
                                  BEV_OPT_CLOSE_ON_FREE|
                                  BEV_OPT_DEFER_CALLBACKS);
  if(!wrbev_) {
    SSLOG(FATAL, this) << "bufferevent_socket_new() failed";
    for(int i = 0; i < 2; ++i) {
      close(sockpair[i]);
    }
    return -1;
  }
  rdbev_ = bufferevent_socket_new(evbase_, sockpair[1],
                                  BEV_OPT_CLOSE_ON_FREE|
                                  BEV_OPT_DEFER_CALLBACKS);
  if(!rdbev_) {
    SSLOG(FATAL, this) << "bufferevent_socket_new() failed";
    // NOTE(review): wrbev_ (which owns sockpair[0]) is left allocated
    // on this path; nothing visible in this file frees it.
    close(sockpair[1]);
    return -1;
  }
  util::bev_enable_unless(rdbev_, EV_READ);
  bufferevent_setcb(rdbev_, notify_readcb, nullptr, notify_eventcb, this);
  return 0;
}

namespace {
// Read callback of the backend bufferevent: refreshes timeouts and
// feeds received bytes to the session; disconnects on error.
void readcb(bufferevent *bev, void *ptr)
{
  int rv;
  auto http2session = static_cast<Http2Session*>(ptr);
  http2session->reset_timeouts();
  rv = http2session->on_read();
  if(rv != 0) {
    http2session->disconnect();
  }
}
} // namespace

namespace {
// Write callback of the backend bufferevent: refreshes timeouts and,
// once the output buffer has been drained completely, asks the
// session for more data to send; disconnects on error.
void writecb(bufferevent *bev, void *ptr)
{
  auto http2session = static_cast<Http2Session*>(ptr);
  http2session->reset_timeouts();
  if(evbuffer_get_length(bufferevent_get_output(bev)) > 0) {
    return;
  }
  int rv;
  rv = http2session->on_write();
  if(rv != 0) {
    http2session->disconnect();
  }
}
} // namespace

namespace {
// Event callback of the backend bufferevent.  On CONNECTED it marks
// the session CONNECTED, verifies the certificate (unless TLS is
// disabled or the insecure option is set), runs on_connect() and
// enables TCP_NODELAY.  EOF, error and timeout disconnect the
// session.
void eventcb(bufferevent *bev, short events, void *ptr)
{
  auto http2session = static_cast<Http2Session*>(ptr);
  if(events & BEV_EVENT_CONNECTED) {
    if(LOG_ENABLED(INFO)) {
      SSLOG(INFO, http2session) << "Connection established";
    }
    http2session->set_state(Http2Session::CONNECTED);
    if(!get_config()->downstream_no_tls &&
       !get_config()->insecure && http2session->check_cert() != 0) {
      http2session->disconnect();
      return;
    }
    if(http2session->on_connect() != 0) {
      http2session->disconnect();
      return;
    }
    auto fd = bufferevent_getfd(bev);
    int val = 1;
    if(setsockopt(fd, IPPROTO_TCP, TCP_NODELAY,
                  reinterpret_cast<char*>(&val), sizeof(val)) == -1) {
      // Not fatal: only a warning is logged.
      auto error = errno;
      SSLOG(WARNING, http2session)
        << "Setting option TCP_NODELAY failed: errno=" << error;
    }
    return;
  }

  if(events & BEV_EVENT_EOF) {
    if(LOG_ENABLED(INFO)) {
      SSLOG(INFO, http2session) << "EOF";
    }
    http2session->disconnect();
    return;
  }

  if(events & (BEV_EVENT_ERROR | BEV_EVENT_TIMEOUT)) {
    if(LOG_ENABLED(INFO)) {
      if(events & BEV_EVENT_ERROR) {
        SSLOG(INFO, http2session) << "Network error";
      } else {
        SSLOG(INFO, http2session) << "Timeout";
      }
    }
    http2session->disconnect();
    return;
  }
}
} // namespace

namespace {
// Read callback used while talking to the HTTP proxy.  Parses the
// proxy response; when the tunnel is established it drops the plain
// bufferevent (keeping the descriptor) and continues with
// initiate_connection().  A failed tunnel or a parse error
// disconnects the session.
void proxy_readcb(bufferevent *bev, void *ptr)
{
  auto http2session = static_cast<Http2Session*>(ptr);
  if(http2session->on_read_proxy() == 0) {
    switch(http2session->get_state()) {
    case Http2Session::PROXY_CONNECTED:
      // The current bufferevent is no longer necessary, so delete it
      // here. But we keep fd_ inside it.
      http2session->unwrap_free_bev();
      // Initiate SSL/TLS handshake through established tunnel.
      if(http2session->initiate_connection() != 0) {
        http2session->disconnect();
      }
      break;
    case Http2Session::PROXY_FAILED:
      http2session->disconnect();
      break;
    }
  } else {
    http2session->disconnect();
  }
}
} // namespace

namespace {
// Event callback used while talking to the HTTP proxy.  On CONNECTED
// it writes a CONNECT request for the configured downstream
// host:port, adding Proxy-Authorization when proxy userinfo is
// configured.  EOF, error and timeout disconnect the session.
void proxy_eventcb(bufferevent *bev, short events, void *ptr)
{
  auto http2session = static_cast<Http2Session*>(ptr);
  if(events & BEV_EVENT_CONNECTED) {
    if(LOG_ENABLED(INFO)) {
      SSLOG(INFO, http2session) << "Connected to the proxy";
    }
    std::string req = "CONNECT ";
    req += get_config()->downstream_hostport.get();
    req += " HTTP/1.1\r\nHost: ";
    req += get_config()->downstream_host.get();
    req += "\r\n";
    if(get_config()->downstream_http_proxy_userinfo) {
      req += "Proxy-Authorization: Basic ";
      size_t len = strlen(get_config()->downstream_http_proxy_userinfo.get());
      req += base64::encode
        (get_config()->downstream_http_proxy_userinfo.get(),
         get_config()->downstream_http_proxy_userinfo.get() + len);
      req += "\r\n";
    }
    req += "\r\n";
    if(LOG_ENABLED(INFO)) {
      SSLOG(INFO, http2session) << "HTTP proxy request headers\n" << req;
    }
    if(bufferevent_write(bev, req.c_str(), req.size()) != 0) {
      SSLOG(ERROR, http2session) << "bufferevent_write() failed";
      http2session->disconnect();
    }
    return;
  }

  if(events & BEV_EVENT_EOF) {
    if(LOG_ENABLED(INFO)) {
      SSLOG(INFO, http2session) << "Proxy EOF";
    }
    http2session->disconnect();
    return;
  }

  if(events & (BEV_EVENT_ERROR | BEV_EVENT_TIMEOUT)) {
    if(LOG_ENABLED(INFO)) {
      if(events & BEV_EVENT_ERROR) {
        SSLOG(INFO, http2session) << "Network error";
      } else {
        SSLOG(INFO, http2session) << "Timeout";
      }
    }
    http2session->disconnect();
    return;
  }
}
} // namespace

// Delegates to ssl::check_cert() for the current TLS object and
// returns its result.
int Http2Session::check_cert()
{
  return ssl::check_cert(ssl_);
}

// Starts (or continues) establishing the backend connection.
//
// * If an HTTP proxy is configured and state_ is DISCONNECTED, this
//   connects to the proxy and moves to PROXY_CONNECTING.
// * If state_ is DISCONNECTED or PROXY_CONNECTED, this sets up the
//   bufferevent towards the backend (TLS when ssl_ctx_ is set, plain
//   otherwise) and moves to CONNECTING unless the connection is
//   already CONNECTED.
//
// Returns 0 on success, SHRPX_ERR_NETWORK or -1 on failure.  This
// function does not release resources on failure; the callers in this
// file call disconnect() when it fails.
int Http2Session::initiate_connection()
{
  int rv = 0;
  if(get_config()->downstream_http_proxy_host && state_ == DISCONNECTED) {
    if(LOG_ENABLED(INFO)) {
      SSLOG(INFO, this) << "Connecting to the proxy "
                        << get_config()->downstream_http_proxy_host.get()
                        << ":" << get_config()->downstream_http_proxy_port;
    }

    auto fd = socket(get_config()->downstream_http_proxy_addr.storage.ss_family,
                     SOCK_STREAM | SOCK_CLOEXEC, 0);

    if(fd == -1) {
      return SHRPX_ERR_NETWORK;
    }

    bev_ = bufferevent_socket_new(evbase_, fd, BEV_OPT_DEFER_CALLBACKS);
    if(!bev_) {
      SSLOG(ERROR, this) << "bufferevent_socket_new() failed";
      close(fd);
      return SHRPX_ERR_NETWORK;
    }
    util::bev_enable_unless(bev_, EV_READ);
    bufferevent_set_timeouts(bev_, &get_config()->downstream_read_timeout,
                             &get_config()->downstream_write_timeout);

    // No need to set writecb because we write the request when
    // connected at once.
    bufferevent_setcb(bev_, proxy_readcb, nullptr, proxy_eventcb, this);
    rv = bufferevent_socket_connect
      (bev_,
       const_cast<sockaddr*>(&get_config()->downstream_http_proxy_addr.sa),
       get_config()->downstream_http_proxy_addrlen);
    if(rv != 0) {
      SSLOG(ERROR, this) << "Failed to connect to the proxy "
                         << get_config()->downstream_http_proxy_host.get()
                         << ":" << get_config()->downstream_http_proxy_port;
      return SHRPX_ERR_NETWORK;
    }
    proxy_htp_ = util::make_unique<http_parser>();
    http_parser_init(proxy_htp_.get(), HTTP_RESPONSE);
    proxy_htp_->data = this;

    state_ = PROXY_CONNECTING;

    return 0;
  }

  if(state_ == DISCONNECTED || state_ == PROXY_CONNECTED) {
    if(LOG_ENABLED(INFO)) {
      SSLOG(INFO, this) << "Connecting to downstream server";
    }
    if(ssl_ctx_) {
      // We are establishing TLS connection.
      ssl_ = SSL_new(ssl_ctx_);
      if(!ssl_) {
        SSLOG(ERROR, this) << "SSL_new() failed: "
                           << ERR_error_string(ERR_get_error(), nullptr);
        return -1;
      }

      const char *sni_name = nullptr;
      if ( get_config()->backend_tls_sni_name ) {
        sni_name = get_config()->backend_tls_sni_name.get();
      }
      else {
        sni_name = get_config()->downstream_host.get();
      }

      if(sni_name && !util::numeric_host(sni_name)) {
        // TLS extensions: SNI. There is no documentation about the return
        // code for this function (actually this is macro wrapping SSL_ctrl
        // at the time of this writing).
        SSL_set_tlsext_host_name(ssl_, sni_name);
      }
      // If state_ == PROXY_CONNECTED, we has connected to the proxy
      // using fd_ and tunnel has been established.
      if(state_ == DISCONNECTED) {
        assert(fd_ == -1);

        fd_ = socket(get_config()->downstream_addr.storage.ss_family,
                     SOCK_STREAM | SOCK_CLOEXEC, 0);
        // Fail early like the non-TLS branch does instead of handing
        // an invalid descriptor to the bufferevent.
        if(fd_ == -1) {
          return SHRPX_ERR_NETWORK;
        }
      }
      bev_ = bufferevent_openssl_socket_new(evbase_, fd_, ssl_,
                                            BUFFEREVENT_SSL_CONNECTING,
                                            BEV_OPT_DEFER_CALLBACKS);
      if(!bev_) {
        SSLOG(ERROR, this) << "bufferevent_openssl_socket_new() failed";
        return SHRPX_ERR_NETWORK;
      }
      rv = bufferevent_socket_connect
        (bev_,
         // TODO maybe not thread-safe?
         const_cast<sockaddr*>(&get_config()->downstream_addr.sa),
         get_config()->downstream_addrlen);
    } else {
      if(state_ == DISCONNECTED) {
        // Without TLS and proxy.
        assert(fd_ == -1);

        fd_ = socket(get_config()->downstream_addr.storage.ss_family,
                     SOCK_STREAM | SOCK_CLOEXEC, 0);

        if(fd_ == -1) {
          return SHRPX_ERR_NETWORK;
        }
      }

      bev_ = bufferevent_socket_new(evbase_, fd_, BEV_OPT_DEFER_CALLBACKS);
      if(!bev_) {
        SSLOG(ERROR, this) << "bufferevent_socket_new() failed";
        return SHRPX_ERR_NETWORK;
      }

      if(state_ == DISCONNECTED) {
        rv = bufferevent_socket_connect
          (bev_,
           const_cast<sockaddr*>(&get_config()->downstream_addr.sa),
           get_config()->downstream_addrlen);
      } else {
        // Without TLS but with proxy.

        // Connection already established.
        eventcb(bev_, BEV_EVENT_CONNECTED, this);
        // eventcb() has no return value. Check state_ to whether it was
        // failed or not.
        if(state_ == DISCONNECTED) {
          return -1;
        }
      }
    }

    if(rv != 0) {
      return SHRPX_ERR_NETWORK;
    }

    bufferevent_setwatermark(bev_, EV_READ, 0, SHRPX_READ_WATERMARK);
    util::bev_enable_unless(bev_, EV_READ);
    bufferevent_setcb(bev_, readcb, writecb, eventcb, this);
    // Set timeout for HTTP2 session
    reset_timeouts();

    // We have been already connected when no TLS and proxy is used.
    if(state_ != CONNECTED) {
      state_ = CONNECTING;
    }

    return 0;
  }

  // Unreachable
  DIE();
  return 0;
}

// Frees bev_ but keeps its file descriptor in fd_.  fd_ must be -1
// on entry.
void Http2Session::unwrap_free_bev()
{
  assert(fd_ == -1);
  fd_ = bufferevent_getfd(bev_);
  bufferevent_free(bev_);
  bev_ = nullptr;
}

namespace {
// http_parser callback fired when the proxy response header block is
// complete.  Status 200 marks the tunnel as established
// (PROXY_CONNECTED); any other status marks it PROXY_FAILED.  Always
// returns 0.
int htp_hdrs_completecb(http_parser *htp)
{
  auto http2session = static_cast<Http2Session*>(htp->data);
  // We just check status code here
  if(htp->status_code == 200) {
    if(LOG_ENABLED(INFO)) {
      SSLOG(INFO, http2session) << "Tunneling success";
    }
    http2session->set_state(Http2Session::PROXY_CONNECTED);
    return 0;
  }

  SSLOG(WARNING, http2session) << "Tunneling failed";
  http2session->set_state(Http2Session::PROXY_FAILED);
  return 0;
}
} // namespace

namespace {
// Parser hooks for the proxy response: only header completion is of
// interest.
http_parser_settings htp_hooks = {
  nullptr, // http_cb      on_message_begin;
  nullptr, // http_data_cb on_url;
  nullptr, // http_data_cb on_status;
  nullptr, // http_data_cb on_header_field;
  nullptr, // http_data_cb on_header_value;
  htp_hdrs_completecb, // http_cb      on_headers_complete;
  nullptr, // http_data_cb on_body;
  nullptr  // http_cb      on_message_complete;
};
} // namespace

// Feeds everything buffered on bev_'s input to the proxy response
// parser, draining what the parser consumed.  Returns 0 once the
// input buffer is empty, -1 if draining fails or the parser reports
// an error.
int Http2Session::on_read_proxy()
{
  auto input = bufferevent_get_input(bev_);

  for(;;) {
    auto inputlen = evbuffer_get_contiguous_space(input);

    if(inputlen == 0) {
      assert(evbuffer_get_length(input) == 0);

      return 0;
    }

    auto mem = evbuffer_pullup(input, inputlen);

    size_t nread = http_parser_execute(proxy_htp_.get(), &htp_hooks,
                                       reinterpret_cast<const char*>(mem),
                                       inputlen);

    if(evbuffer_drain(input, nread) != 0) {
      SSLOG(FATAL, this) << "evbuffer_drain() failed";
      return -1;
    }

    auto htperr = HTTP_PARSER_ERRNO(proxy_htp_.get());

    if(htperr != HPE_OK) {
      return -1;
    }
  }
}

// Registers dconn with this session.
void Http2Session::add_downstream_connection(Http2DownstreamConnection *dconn)
{
  dconns_.insert(dconn);
}

// Unregisters dconn from this session and detaches its stream data.
void Http2Session::remove_downstream_connection
(Http2DownstreamConnection *dconn)
{
  dconns_.erase(dconn);
  dconn->detach_stream_data();
}

// Unregisters sd, detaches it from its downstream connection (if
// any) and deletes it.
void Http2Session::remove_stream_data(StreamData *sd)
{
  streams_.erase(sd);
  if(sd->dconn) {
    sd->dconn->detach_stream_data();
  }
  delete sd;
}

// Submits request headers (and optional body provider) for dconn on a
// new stream.  On success the new StreamData is attached to dconn,
// the stream ID is stored in dconn's Downstream and 0 is returned;
// otherwise -1.  |pri| is currently unused.  The session must be
// CONNECTED.
int Http2Session::submit_request(Http2DownstreamConnection *dconn,
                                 int32_t pri,
                                 const nghttp2_nv *nva, size_t nvlen,
                                 const nghttp2_data_provider *data_prd)
{
  assert(state_ == CONNECTED);
  auto sd = util::make_unique<StreamData>();
  // TODO Specify nullptr to pri_spec for now
  auto stream_id = nghttp2_submit_request(session_, nullptr, nva, nvlen,
                                          data_prd, sd.get());
  if(stream_id < 0) {
    SSLOG(FATAL, this) << "nghttp2_submit_request() failed: "
                       << nghttp2_strerror(stream_id);
    return -1;
  }

  dconn->attach_stream_data(sd.get());
  dconn->get_downstream()->set_downstream_stream_id(stream_id);
  // Ownership moves to streams_; released in remove_stream_data() or
  // disconnect().
  streams_.insert(sd.release());

  return 0;
}

// Queues RST_STREAM for stream_id with error_code.  Returns 0 on
// success, -1 on failure.  The session must be CONNECTED.
int Http2Session::submit_rst_stream(int32_t stream_id, uint32_t error_code)
{
  assert(state_ == CONNECTED);
  if(LOG_ENABLED(INFO)) {
    SSLOG(INFO, this) << "RST_STREAM stream_id="
                      << stream_id
                      << " with error_code="
                      << error_code;
  }
  int rv = nghttp2_submit_rst_stream(session_, NGHTTP2_FLAG_NONE,
                                     stream_id, error_code);
  if(rv != 0) {
    SSLOG(FATAL, this) << "nghttp2_submit_rst_stream() failed: "
                       << nghttp2_strerror(rv);
    return -1;
  }
  return 0;
}

// Priority submission is disabled for now, so this currently always
// returns 0.  The session must be CONNECTED.
int Http2Session::submit_priority(Http2DownstreamConnection *dconn,
                                  int32_t pri)
{
  assert(state_ == CONNECTED);
  if(!dconn) {
    return 0;
  }
  int rv;

  // TODO Disabled temporarily

  // rv = nghttp2_submit_priority(session_, NGHTTP2_FLAG_NONE,
  //                              dconn->get_downstream()->
  //                              get_downstream_stream_id(), pri);

  rv = 0;

  if(rv < NGHTTP2_ERR_FATAL) {
    SSLOG(FATAL, this) << "nghttp2_submit_priority() failed: "
                       << nghttp2_strerror(rv);
    return -1;
  }
  return 0;
}

// Returns the underlying nghttp2 session (nullptr when none exists).
nghttp2_session* Http2Session::get_session() const
{
  return session_;
}

// Returns the flow control flag; on_connect() sets it to true.
bool Http2Session::get_flow_control() const
{
  return flow_control_;
}

// Resumes deferred DATA for dconn's stream.
// NGHTTP2_ERR_INVALID_ARGUMENT is treated as success.  Returns 0 on
// success, -1 on any other error.  The session must be CONNECTED.
int Http2Session::resume_data(Http2DownstreamConnection *dconn)
{
  assert(state_ == CONNECTED);
  auto downstream = dconn->get_downstream();
  int rv = nghttp2_session_resume_data(session_,
                                       downstream->get_downstream_stream_id());
  switch(rv) {
  case 0:
  case NGHTTP2_ERR_INVALID_ARGUMENT:
    return 0;
  default:
    SSLOG(FATAL, this) << "nghttp2_session_resume_data() failed: "
                       << nghttp2_strerror(rv);
    return -1;
  }
}

namespace {
// Invokes the upstream's downstream-read callback for downstream, if
// it still has an upstream.
void call_downstream_readcb(Http2Session *http2session,
                            Downstream *downstream)
{
  auto upstream = downstream->get_upstream();
  if(upstream) {
    (upstream->get_downstream_readcb())
      (http2session->get_bev(),
       downstream->get_downstream_connection());
  }
}
} // namespace

namespace {
// nghttp2 callback: a stream is being closed.  Updates the response
// state of the associated Downstream, notifies the upstream and
// finally destroys the StreamData.  Always returns 0.
int on_stream_close_callback
(nghttp2_session *session, int32_t stream_id, uint32_t error_code,
 void *user_data)
{
  auto http2session = static_cast<Http2Session*>(user_data);
  if(LOG_ENABLED(INFO)) {
    SSLOG(INFO, http2session) << "Stream stream_id=" << stream_id
                              << " is being closed";
  }
  auto sd = static_cast<StreamData*>
    (nghttp2_session_get_stream_user_data(session, stream_id));
  if(!sd) {
    // We might get this close callback when pushed streams are
    // closed.
    return 0;
  }
  auto dconn = sd->dconn;
  if(dconn) {
    auto downstream = dconn->get_downstream();
    if(downstream && downstream->get_downstream_stream_id() == stream_id) {

      if(downstream->get_upgraded() &&
         downstream->get_response_state() == Downstream::HEADER_COMPLETE) {
        // For tunneled connection, we have to submit RST_STREAM to
        // upstream *after* whole response body is sent. We just set
        // MSG_COMPLETE here. Upstream will take care of that.
        if(LOG_ENABLED(INFO)) {
          SSLOG(INFO, http2session) << "RST_STREAM against tunneled stream "
                                    << "stream_id="
                                    << stream_id;
        }
        downstream->get_upstream()->on_downstream_body_complete(downstream);
        downstream->set_response_state(Downstream::MSG_COMPLETE);
      } else if(error_code == NGHTTP2_NO_ERROR) {
        // Closed cleanly but before the response was complete.
        if(downstream->get_response_state() != Downstream::MSG_COMPLETE) {
          downstream->set_response_state(Downstream::MSG_RESET);
        }
      } else {
        downstream->set_response_state(Downstream::MSG_RESET);
      }
      call_downstream_readcb(http2session, downstream);
      // dconn may be deleted
    }
  }
  // The life time of StreamData ends here
  http2session->remove_stream_data(sd);
  return 0;
}
} // namespace

namespace {
// Timer callback fired when the SETTINGS timer expires.  Terminates
// the session with SETTINGS_TIMEOUT and flushes; disconnects if
// either step fails.
void settings_timeout_cb(evutil_socket_t fd, short what, void *arg)
{
  auto http2session = static_cast<Http2Session*>(arg);
  SSLOG(INFO, http2session) << "SETTINGS timeout";
  if(http2session->terminate_session(NGHTTP2_SETTINGS_TIMEOUT) != 0) {
    http2session->disconnect();
    return;
  }
  if(http2session->send() != 0) {
    http2session->disconnect();
  }
}
} // namespace

// Arms the 10-second SETTINGS timer.  Does nothing if a timer already
// exists.  Returns 0 on success, -1 on failure.
int Http2Session::start_settings_timer()
{
  int rv;
  // We submit SETTINGS only once
  if(settings_timerev_) {
    return 0;
  }
  settings_timerev_ = evtimer_new(evbase_, settings_timeout_cb, this);
  if(settings_timerev_ == nullptr) {
    return -1;
  }
  // SETTINGS ACK timeout is 10 seconds for now
  timeval settings_timeout = { 10, 0 };
  rv = evtimer_add(settings_timerev_, &settings_timeout);
  if(rv == -1) {
    return -1;
  }
  return 0;
}

// Cancels and frees the SETTINGS timer, if any.
void Http2Session::stop_settings_timer()
{
  if(settings_timerev_ == nullptr) {
    return;
  }
  event_free(settings_timerev_);
  settings_timerev_ = nullptr;
}

namespace {
// nghttp2 callback: one header name/value pair was received.  Stores
// response headers in the associated Downstream.  Returns
// NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE when the header block is too
// large or a pseudo header is not acceptable; 0 otherwise.
int on_header_callback(nghttp2_session *session,
                       const nghttp2_frame *frame,
                       const uint8_t *name, size_t namelen,
                       const uint8_t *value, size_t valuelen,
                       uint8_t flags,
                       void *user_data)
{
  auto http2session = static_cast<Http2Session*>(user_data);
  auto sd = static_cast<StreamData*>
    (nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
  if(!sd || !sd->dconn) {
    return 0;
  }
  auto downstream = sd->dconn->get_downstream();
  if(!downstream) {
    return 0;
  }

  // Only a response HEADERS, or a HEADERS that follows a non-final
  // response, is of interest.
  if(frame->hd.type != NGHTTP2_HEADERS ||
     (frame->headers.cat != NGHTTP2_HCAT_RESPONSE &&
      !downstream->get_expect_final_response())) {
    return 0;
  }

  if(downstream->get_response_headers_sum() > Downstream::MAX_HEADERS_SUM) {
    if(LOG_ENABLED(INFO)) {
      DLOG(INFO, downstream) << "Too large header block size="
                             << downstream->get_response_headers_sum();
    }
    return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
  }

  // Header fields that fail validation are ignored.
  if(!http2::check_nv(name, namelen, value, valuelen)) {
    return 0;
  }

  if(namelen > 0 && name[0] == ':') {
    if(!downstream->response_pseudo_header_allowed() ||
       !http2::check_http2_response_pseudo_header(name, namelen)) {

      http2session->submit_rst_stream(frame->hd.stream_id,
                                      NGHTTP2_PROTOCOL_ERROR);
      return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
    }
  }

  downstream->split_add_response_header(name, namelen, value, valuelen,
                                        flags & NGHTTP2_NV_FLAG_NO_INDEX);
  return 0;
}
} // namespace

namespace {
// nghttp2 callback: a header block is about to be received.  Refuses
// request HEADERS sent by the server and resets response streams that
// cannot be matched to a Downstream.  Always returns 0.
int on_begin_headers_callback(nghttp2_session *session,
                              const nghttp2_frame *frame,
                              void *user_data)
{
  auto http2session = static_cast<Http2Session*>(user_data);
  if(frame->headers.cat == NGHTTP2_HCAT_REQUEST) {
    // server sends request HEADERS
    http2session->submit_rst_stream(frame->hd.stream_id,
                                    NGHTTP2_REFUSED_STREAM);
    return 0;
  }
  if(frame->headers.cat != NGHTTP2_HCAT_RESPONSE) {
    return 0;
  }
  auto sd = static_cast<StreamData*>
    (nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
  if(!sd || !sd->dconn) {
    http2session->submit_rst_stream(frame->hd.stream_id,
                                    NGHTTP2_INTERNAL_ERROR);
    return 0;
  }
  auto downstream = sd->dconn->get_downstream();
  if(!downstream ||
     downstream->get_downstream_stream_id() != frame->hd.stream_id) {
    http2session->submit_rst_stream(frame->hd.stream_id,
                                    NGHTTP2_INTERNAL_ERROR);
    return 0;
  }
  return 0;
}
} // namespace

namespace {
// Processes a completed response header block for downstream:
// validates it, records the status code, decides how the body is
// framed towards the upstream, handles upgrade/CONNECT and finally
// notifies the upstream.  Returns -1 only when resuming the upstream
// read fails after a successful upgrade (the client handler is
// deleted in that case); 0 otherwise.
int on_response_headers(Http2Session *http2session,
                        Downstream *downstream,
                        nghttp2_session *session,
                        const nghttp2_frame *frame)
{
  int rv;

  auto upstream = downstream->get_upstream();

  downstream->normalize_response_headers();
  auto& nva = downstream->get_response_headers();

  downstream->set_expect_final_response(false);

  if(!http2::check_http2_response_headers(nva)) {
    http2session->submit_rst_stream(frame->hd.stream_id,
                                    NGHTTP2_PROTOCOL_ERROR);
    downstream->set_response_state(Downstream::MSG_RESET);
    call_downstream_readcb(http2session, downstream);

    return 0;
  }

  auto status = http2::get_unique_header(nva, ":status");
  int status_code;

  if(!status || http2::value_lws(status) ||
     (status_code = http2::parse_http_status_code(status->value)) == -1) {

    http2session->submit_rst_stream(frame->hd.stream_id,
                                    NGHTTP2_PROTOCOL_ERROR);
    downstream->set_response_state(Downstream::MSG_RESET);
    call_downstream_readcb(http2session, downstream);

    return 0;
  }

  downstream->set_response_http_status(status_code);
  downstream->set_response_major(2);
  downstream->set_response_minor(0);

  if(LOG_ENABLED(INFO)) {
    std::stringstream ss;
    for(auto& nv : nva) {
      ss << TTY_HTTP_HD << nv.name << TTY_RST << ": " << nv.value << "\n";
    }
    SSLOG(INFO, http2session) << "HTTP response headers. stream_id="
                              << frame->hd.stream_id
                              << "\n" << ss.str();
  }

  if(downstream->get_non_final_response()) {

    if(LOG_ENABLED(INFO)) {
      SSLOG(INFO, http2session) << "This is non-final response.";
    }

    downstream->set_expect_final_response(true);
    rv = upstream->on_downstream_header_complete(downstream);

    // Now Dowstream's response headers are erased.

    if(rv != 0) {
      http2session->submit_rst_stream(frame->hd.stream_id,
                                      NGHTTP2_PROTOCOL_ERROR);
      downstream->set_response_state(Downstream::MSG_RESET);
    }

    return 0;
  }

  auto content_length = http2::get_header(nva, "content-length");
  if(!content_length &&
     downstream->get_request_method() != "HEAD" &&
     downstream->get_request_method() != "CONNECT") {
    const unsigned int http_status = downstream->get_response_http_status();
    if(!((100 <= http_status && http_status <= 199) ||
         http_status == 204 || http_status == 304)) {
      // Here we have response body but Content-Length is not known
      // in advance.
      // NOTE(review): a request with minor version 0 takes the
      // "pre-HTTP/1.1" branch regardless of its major version;
      // confirm this is intended for HTTP/2 upstream requests.
      if(downstream->get_request_major() <= 0 ||
         downstream->get_request_minor() <= 0) {
        // We simply close connection for pre-HTTP/1.1 in this case.
        downstream->set_response_connection_close(true);
      } else {
        // Otherwise, use chunked encoding to keep upstream
        // connection open.  In HTTP2, we are supporsed not to
        // receive transfer-encoding.
        downstream->add_response_header("transfer-encoding", "chunked");
        downstream->set_chunked_response(true);
      }
    }
  }

  downstream->set_response_state(Downstream::HEADER_COMPLETE);
  downstream->check_upgrade_fulfilled();
  if(downstream->get_upgraded()) {
    downstream->set_response_connection_close(true);
    // On upgrade sucess, both ends can send data
    if(upstream->resume_read(SHRPX_MSG_BLOCK, downstream, 0) != 0) {
      // If resume_read fails, just drop connection. Not ideal.
      delete upstream->get_client_handler();
      return -1;
    }
    downstream->set_request_state(Downstream::HEADER_COMPLETE);
    if(LOG_ENABLED(INFO)) {
      SSLOG(INFO, http2session) << "HTTP upgrade success. stream_id="
                                << frame->hd.stream_id;
    }
  } else if(downstream->get_request_method() == "CONNECT") {
    // If request is CONNECT, terminate request body to avoid for
    // stream to stall.
    downstream->end_upload_data();
  }
  rv = upstream->on_downstream_header_complete(downstream);
  if(rv != 0) {
    http2session->submit_rst_stream(frame->hd.stream_id,
                                    NGHTTP2_PROTOCOL_ERROR);
    downstream->set_response_state(Downstream::MSG_RESET);
  }

  return 0;
}
} // namespace

namespace {
// nghttp2 callback: a complete frame was received.  Dispatches on
// the frame type to update the associated Downstream and notify the
// upstream.  Always returns 0.
int on_frame_recv_callback
(nghttp2_session *session, const nghttp2_frame *frame, void *user_data)
{
  int rv;
  auto http2session = static_cast<Http2Session*>(user_data);

  switch(frame->hd.type) {
  case NGHTTP2_DATA: {
    auto sd = static_cast<StreamData*>
      (nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
    if(!sd || !sd->dconn) {
      break;
    }
    auto downstream = sd->dconn->get_downstream();
    if(!downstream ||
       downstream->get_downstream_stream_id() != frame->hd.stream_id) {
      break;
    }

    auto upstream = downstream->get_upstream();
    // Zero-length call with the last argument set to true.
    rv = upstream->on_downstream_body(downstream, nullptr, 0, true);
    if(rv != 0) {
      http2session->submit_rst_stream(frame->hd.stream_id,
                                      NGHTTP2_INTERNAL_ERROR);
      downstream->set_response_state(Downstream::MSG_RESET);

    } else if(frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {

      downstream->disable_downstream_rtimer();

      if(downstream->get_response_state() == Downstream::HEADER_COMPLETE) {

        downstream->set_response_state(Downstream::MSG_COMPLETE);

        rv = upstream->on_downstream_body_complete(downstream);

        if(rv != 0) {
          downstream->set_response_state(Downstream::MSG_RESET);
        }
      }
    }

    call_downstream_readcb(http2session, downstream);
    break;
  }
  case NGHTTP2_HEADERS: {
    auto sd = static_cast<StreamData*>
      (nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
    if(!sd || !sd->dconn) {
      break;
    }
    auto downstream = sd->dconn->get_downstream();

    if(!downstream) {
      return 0;
    }

    if(frame->headers.cat == NGHTTP2_HCAT_RESPONSE) {
      rv = on_response_headers(http2session, downstream, session, frame);

      if(rv != 0) {
        return 0;
      }
    }

    if(frame->headers.cat == NGHTTP2_HCAT_HEADERS) {
      if(downstream->get_expect_final_response()) {
        // This HEADERS follows a non-final response and is handled
        // as response headers.
        rv = on_response_headers(http2session, downstream, session, frame);

        if(rv != 0) {
          return 0;
        }
      } else if((frame->hd.flags & NGHTTP2_FLAG_END_STREAM) == 0) {
        // HEADERS after the final response must carry END_STREAM.
        http2session->submit_rst_stream(frame->hd.stream_id,
                                        NGHTTP2_PROTOCOL_ERROR);
        return 0;
      }
    }

    if(frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
      downstream->disable_downstream_rtimer();

      if(downstream->get_response_state() == Downstream::HEADER_COMPLETE) {
        downstream->set_response_state(Downstream::MSG_COMPLETE);

        auto upstream = downstream->get_upstream();

        rv = upstream->on_downstream_body_complete(downstream);

        if(rv != 0) {
          downstream->set_response_state(Downstream::MSG_RESET);
        }
      }
    } else {
      downstream->reset_downstream_rtimer();
    }

    // This may delete downstream
    call_downstream_readcb(http2session, downstream);

    break;
  }
  case NGHTTP2_RST_STREAM: {
    auto sd = static_cast<StreamData*>
      (nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
    if(sd && sd->dconn) {
      auto downstream = sd->dconn->get_downstream();
      if(downstream &&
         downstream->get_downstream_stream_id() == frame->hd.stream_id) {

        downstream->set_response_rst_stream_error_code
          (frame->rst_stream.error_code);
        call_downstream_readcb(http2session, downstream);
      }
    }
    break;
  }
  case NGHTTP2_SETTINGS:
    // Only the ACK of our SETTINGS stops the timer.
    if((frame->hd.flags & NGHTTP2_FLAG_ACK) == 0) {
      break;
    }
    http2session->stop_settings_timer();
    break;
  case NGHTTP2_PUSH_PROMISE:
    if(LOG_ENABLED(INFO)) {
      SSLOG(INFO, http2session)
        << "Received downstream PUSH_PROMISE stream_id="
        << frame->hd.stream_id
        << ", promised_stream_id="
        << frame->push_promise.promised_stream_id;
    }
    // We just respond with RST_STREAM.
    http2session->submit_rst_stream(frame->push_promise.promised_stream_id,
                                    NGHTTP2_REFUSED_STREAM);
    break;
  default:
    break;
  }
  return 0;
}
} // namespace

namespace {
// nghttp2 callback: a chunk of DATA payload was received.  Forwards
// the chunk to the upstream.  When the chunk cannot be delivered the
// stream is reset and the bytes are consumed via consume().  Returns
// NGHTTP2_ERR_CALLBACK_FAILURE if consume() fails, 0 otherwise.
int on_data_chunk_recv_callback(nghttp2_session *session,
                                uint8_t flags, int32_t stream_id,
                                const uint8_t *data, size_t len,
                                void *user_data)
{
  int rv;
  auto http2session = static_cast<Http2Session*>(user_data);
  auto sd = static_cast<StreamData*>
    (nghttp2_session_get_stream_user_data(session, stream_id));
  if(!sd || !sd->dconn) {
    http2session->submit_rst_stream(stream_id, NGHTTP2_INTERNAL_ERROR);

    if(http2session->consume(stream_id, len) != 0) {
      return NGHTTP2_ERR_CALLBACK_FAILURE;
    }

    return 0;
  }
  auto downstream = sd->dconn->get_downstream();
  if(!downstream ||
     downstream->get_downstream_stream_id() != stream_id ||
     !downstream->expect_response_body()) {

    http2session->submit_rst_stream(stream_id, NGHTTP2_INTERNAL_ERROR);

    if(http2session->consume(stream_id, len) != 0) {
      return NGHTTP2_ERR_CALLBACK_FAILURE;
    }

    return 0;
  }

  // We don't want DATA after non-final response, which is illegal in
  // HTTP.
  if(downstream->get_non_final_response()) {
    http2session->submit_rst_stream(stream_id, NGHTTP2_PROTOCOL_ERROR);

    if(http2session->consume(stream_id, len) != 0) {
      return NGHTTP2_ERR_CALLBACK_FAILURE;
    }

    return 0;
  }

  downstream->reset_downstream_rtimer();

  downstream->add_response_bodylen(len);

  auto upstream = downstream->get_upstream();
  rv = upstream->on_downstream_body(downstream, data, len, false);
  if(rv != 0) {
    http2session->submit_rst_stream(stream_id, NGHTTP2_INTERNAL_ERROR);

    if(http2session->consume(stream_id, len) != 0) {
      return NGHTTP2_ERR_CALLBACK_FAILURE;
    }

    downstream->set_response_state(Downstream::MSG_RESET);
  }

  downstream->add_response_datalen(len);

  call_downstream_readcb(http2session, downstream);
  return 0;
}
} // namespace

namespace {
// nghttp2 callback: a frame was sent.  Resets the downstream read
// timer once the request has been sent completely (END_STREAM) and
// starts the SETTINGS timer when a non-ACK SETTINGS went out.
// Returns NGHTTP2_ERR_CALLBACK_FAILURE if the timer cannot be
// started, 0 otherwise.
int on_frame_send_callback(nghttp2_session* session,
                           const nghttp2_frame *frame, void *user_data)
{
  auto http2session = static_cast<Http2Session*>(user_data);

  if(frame->hd.type == NGHTTP2_DATA || frame->hd.type == NGHTTP2_HEADERS) {
    if((frame->hd.flags & NGHTTP2_FLAG_END_STREAM) == 0) {
      return 0;
    }

    auto sd = static_cast<StreamData*>
      (nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));

    if(!sd || !sd->dconn) {
      return 0;
    }

    auto downstream = sd->dconn->get_downstream();

    if(!downstream ||
       downstream->get_downstream_stream_id() != frame->hd.stream_id) {
      return 0;
    }

    downstream->reset_downstream_rtimer();

    return 0;
  }

  if(frame->hd.type == NGHTTP2_SETTINGS &&
     (frame->hd.flags & NGHTTP2_FLAG_ACK) == 0) {
    if(http2session->start_settings_timer() != 0) {
      return NGHTTP2_ERR_CALLBACK_FAILURE;
    }
  }
  return 0;
}
} // namespace

namespace {
// nghttp2 callback: a frame could not be sent.  Logs the failure; if
// the frame was a request HEADERS, the associated Downstream is
// flagged MSG_RESET, the upstream is notified and the StreamData is
// destroyed.  Always returns 0.
int on_frame_not_send_callback(nghttp2_session *session,
                               const nghttp2_frame *frame,
                               int lib_error_code, void *user_data)
{
  auto http2session = static_cast<Http2Session*>(user_data);
  SSLOG(WARNING, http2session) << "Failed to send control frame type="
                               << static_cast<uint32_t>(frame->hd.type)
                               << ", lib_error_code=" << lib_error_code << ":"
                               << nghttp2_strerror(lib_error_code);
  if(frame->hd.type == NGHTTP2_HEADERS &&
     frame->headers.cat == NGHTTP2_HCAT_REQUEST) {
    // To avoid stream hanging around, flag Downstream::MSG_RESET and
    // terminate the upstream and downstream connections.
    auto sd = static_cast<StreamData*>
      (nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
    if(!sd) {
      return 0;
    }
    if(sd->dconn) {
      auto downstream = sd->dconn->get_downstream();
      if(!downstream ||
         downstream->get_downstream_stream_id() != frame->hd.stream_id) {
        return 0;
      }
      downstream->set_response_state(Downstream::MSG_RESET);
      call_downstream_readcb(http2session, downstream);
    }
    http2session->remove_stream_data(sd);
  }
  return 0;
}
} // namespace

// Called once the transport is connected.  For TLS it checks that
// the negotiated protocol (NPN, then ALPN where available) is
// NGHTTP2_PROTO_VERSION_ID.  It then creates the nghttp2 client
// session, submits the initial SETTINGS (and a connection-level
// WINDOW_UPDATE when configured), writes the client connection
// preface, flushes, and finally submits the requests of all
// downstream connections that were waiting.  Returns 0 on success,
// -1 on failure.
int Http2Session::on_connect()
{
  int rv;
  if(ssl_ctx_) {
    const unsigned char *next_proto = nullptr;
    unsigned int next_proto_len = 0;
    SSL_get0_next_proto_negotiated(ssl_, &next_proto, &next_proto_len);
    // First pass examines the NPN result, second pass the ALPN one.
    for(int i = 0; i < 2; ++i) {
      if(next_proto) {
        if(LOG_ENABLED(INFO)) {
          std::string proto(next_proto, next_proto+next_proto_len);
          SSLOG(INFO, this) << "Negotiated next protocol: " << proto;
        }
        if(next_proto_len != NGHTTP2_PROTO_VERSION_ID_LEN ||
           memcmp(NGHTTP2_PROTO_VERSION_ID, next_proto,
                  NGHTTP2_PROTO_VERSION_ID_LEN) != 0) {
          return -1;
        }
        break;
      }
#if OPENSSL_VERSION_NUMBER >= 0x10002000L
      SSL_get0_alpn_selected(ssl_, &next_proto, &next_proto_len);
#else // OPENSSL_VERSION_NUMBER < 0x10002000L
      break;
#endif // OPENSSL_VERSION_NUMBER < 0x10002000L
    }
    if(!next_proto) {
      return -1;
    }
  }

  nghttp2_session_callbacks *callbacks;
  rv = nghttp2_session_callbacks_new(&callbacks);

  if(rv != 0) {
    return -1;
  }

  // Releases callbacks when this function returns.
  auto callbacks_deleter =
    util::defer(callbacks, nghttp2_session_callbacks_del);

  nghttp2_session_callbacks_set_on_stream_close_callback
    (callbacks, on_stream_close_callback);

  nghttp2_session_callbacks_set_on_frame_recv_callback
    (callbacks, on_frame_recv_callback);

  nghttp2_session_callbacks_set_on_data_chunk_recv_callback
    (callbacks, on_data_chunk_recv_callback);

  nghttp2_session_callbacks_set_on_frame_send_callback
    (callbacks, on_frame_send_callback);

  nghttp2_session_callbacks_set_on_frame_not_send_callback
    (callbacks, on_frame_not_send_callback);

  nghttp2_session_callbacks_set_on_header_callback
    (callbacks, on_header_callback);

  nghttp2_session_callbacks_set_on_begin_headers_callback
    (callbacks, on_begin_headers_callback);

  if(get_config()->padding) {
    nghttp2_session_callbacks_set_select_padding_callback
      (callbacks, http::select_padding_callback);
  }

  rv = nghttp2_session_client_new2(&session_, callbacks, this,
                                   get_config()->http2_option);

  if(rv != 0) {
    return -1;
  }

  flow_control_ = true;

  nghttp2_settings_entry entry[3];
  entry[0].settings_id = NGHTTP2_SETTINGS_ENABLE_PUSH;
  entry[0].value = 0;
  entry[1].settings_id = NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS;
  entry[1].value = get_config()->http2_max_concurrent_streams;

  entry[2].settings_id = NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
  entry[2].value = (1 << get_config()->http2_downstream_window_bits) - 1;

  rv = nghttp2_submit_settings(session_, NGHTTP2_FLAG_NONE, entry,
                               util::array_size(entry));
  if(rv != 0) {
    return -1;
  }

  if(get_config()->http2_downstream_connection_window_bits > 16) {
    int32_t delta =
      (1 << get_config()->http2_downstream_connection_window_bits) - 1
      - NGHTTP2_INITIAL_CONNECTION_WINDOW_SIZE;
    rv = nghttp2_submit_window_update(session_, NGHTTP2_FLAG_NONE, 0, delta);
    if(rv != 0) {
      return -1;
    }
  }

  rv = bufferevent_write(bev_, NGHTTP2_CLIENT_CONNECTION_PREFACE,
                         NGHTTP2_CLIENT_CONNECTION_PREFACE_LEN);
  if(rv != 0) {
    SSLOG(FATAL, this) << "bufferevent_write() failed";
    return -1;
  }

  if(!get_config()->downstream_no_tls &&
     !ssl::check_http2_requirement(ssl_)) {

    rv = terminate_session(NGHTTP2_INADEQUATE_SECURITY);

    if(rv != 0) {
      return -1;
    }
  }

  rv = send();
  if(rv != 0) {
    return -1;
  }

  // The session is being terminated in this case, so pending
  // requests are not submitted.
  if(!get_config()->downstream_no_tls &&
     !ssl::check_http2_requirement(ssl_)) {
    return 0;
  }

  // submit pending request
  for(auto dconn : dconns_) {
    if(dconn->push_request_headers() == 0) {
      auto downstream = dconn->get_downstream();
      auto upstream = downstream->get_upstream();
      upstream->resume_read(SHRPX_NO_BUFFER, downstream, 0);
      continue;
    }

    if(LOG_ENABLED(INFO)) {
      SSLOG(INFO, this) << "backend request failed";
    }

    auto downstream = dconn->get_downstream();

    if(!downstream) {
      continue;
    }

    auto upstream = downstream->get_upstream();

    upstream->on_downstream_abort_request(downstream, 400);
  }
  return 0;
}

// Feeds everything buffered on bev_'s input to the nghttp2 session,
// draining what was consumed.  Once the input buffer is empty the
// result of send() is returned.  Returns -1 on error.
int Http2Session::on_read()
{
  ssize_t rv = 0;
  auto input = bufferevent_get_input(bev_);

  for(;;) {
    auto inputlen = evbuffer_get_contiguous_space(input);

    if(inputlen == 0) {
      assert(evbuffer_get_length(input) == 0);

      return send();
    }

    auto mem = evbuffer_pullup(input, inputlen);

    rv = nghttp2_session_mem_recv(session_, mem, inputlen);

    if(rv < 0) {
      SSLOG(ERROR, this) << "nghttp2_session_mem_recv() returned error: "
                         << nghttp2_strerror(rv);
      return -1;
    }

    if(evbuffer_drain(input, rv) != 0) {
      SSLOG(FATAL, this) << "evbuffer_drain() failed";
      return -1;
    }
  }
}

// Called when the output buffer has been drained; simply forwards to
// send().
int Http2Session::on_write()
{
  return send();
}

int Http2Session::send()
{
  int rv;
  uint8_t buf[16384];
  auto output = bufferevent_get_output(bev_);
  util::EvbufferBuffer evbbuf(output, buf, sizeof(buf));
  for(;;) {
    // Check buffer length and return WOULDBLOCK if it is large enough.
if(evbuffer_get_length(output) + evbbuf.get_buflen() > Http2Session::OUTBUF_MAX_THRES) { return NGHTTP2_ERR_WOULDBLOCK; } const uint8_t *data; auto datalen = nghttp2_session_mem_send(session_, &data); if(datalen < 0) { SSLOG(ERROR, this) << "nghttp2_session_mem_send() returned error: " << nghttp2_strerror(datalen); break; } if(datalen == 0) { break; } rv = evbbuf.add(data, datalen); if(rv != 0) { SSLOG(FATAL, this) << "evbuffer_add() failed"; return -1; } } rv = evbbuf.flush(); if(rv != 0) { SSLOG(FATAL, this) << "evbuffer_add() failed"; return -1; } if(nghttp2_session_want_read(session_) == 0 && nghttp2_session_want_write(session_) == 0 && evbuffer_get_length(output) == 0) { if(LOG_ENABLED(INFO)) { SSLOG(INFO, this) << "No more read/write for this session"; } return -1; } return 0; } void Http2Session::clear_notify() { auto input = bufferevent_get_output(rdbev_); if(evbuffer_drain(input, evbuffer_get_length(input)) != 0) { SSLOG(FATAL, this) << "evbuffer_drain() failed"; } notified_ = false; } void Http2Session::notify() { if(!notified_) { if(bufferevent_write(wrbev_, "1", 1) != 0) { SSLOG(FATAL, this) << "bufferevent_write failed"; } notified_ = true; } } bufferevent* Http2Session::get_bev() const { return bev_; } int Http2Session::get_state() const { return state_; } void Http2Session::set_state(int state) { state_ = state; } int Http2Session::terminate_session(uint32_t error_code) { int rv; rv = nghttp2_session_terminate_session(session_, error_code); if(rv != 0) { return -1; } return 0; } size_t Http2Session::get_outbuf_length() const { if(bev_) { return evbuffer_get_length(bufferevent_get_output(bev_)); } else { return OUTBUF_MAX_THRES; } } SSL* Http2Session::get_ssl() const { return ssl_; } int Http2Session::consume(int32_t stream_id, size_t len) { int rv; if(!session_) { return 0; } rv = nghttp2_session_consume(session_, stream_id, len); if(rv != 0) { SSLOG(WARNING, this) << "nghttp2_session_consume() returned error: " << nghttp2_strerror(rv); return -1; } 
return 0; } void Http2Session::reset_timeouts() { bufferevent_set_timeouts(bev_, &get_config()->downstream_read_timeout, &get_config()->downstream_write_timeout); } } // namespace shrpx