From 924b1bd61aad9f4901838603a741fd2c7bf477a5 Mon Sep 17 00:00:00 2001 From: Tatsuhiro Tsujikawa Date: Wed, 31 Jul 2013 21:48:37 +0900 Subject: [PATCH] Use unmodified http-parser Handle HTTP Upgrade and CONNECT explicitly --- src/http-parser/http_parser.c | 11 ++----- src/shrpx_downstream.cc | 42 +++++++++++++++++++++++-- src/shrpx_downstream.h | 17 ++++++++-- src/shrpx_http2_upstream.cc | 7 +++-- src/shrpx_http_downstream_connection.cc | 27 ++++++++++++++-- src/shrpx_https_upstream.cc | 33 +++++++++++++++---- src/shrpx_spdy_downstream_connection.cc | 4 ++- src/shrpx_spdy_session.cc | 12 +++++-- src/shrpx_spdy_upstream.cc | 7 +++-- 9 files changed, 129 insertions(+), 31 deletions(-) diff --git a/src/http-parser/http_parser.c b/src/http-parser/http_parser.c index f6a44e1e..ed3a9232 100644 --- a/src/http-parser/http_parser.c +++ b/src/http-parser/http_parser.c @@ -1606,14 +1606,9 @@ size_t http_parser_execute (http_parser *parser, /* Exit, the rest of the connect is in a different protocol. */ if (parser->upgrade) { - /* We want to use http_parser for tunneling connection - transparently */ - /* Read body until EOF */ - parser->state = s_body_identity_eof; - break; - /* parser->state = NEW_MESSAGE(); */ - /* CALLBACK_NOTIFY(message_complete); */ - /* return (p - data) + 1; */ + parser->state = NEW_MESSAGE(); + CALLBACK_NOTIFY(message_complete); + return (p - data) + 1; } if (parser->flags & F_SKIPBODY) { diff --git a/src/shrpx_downstream.cc b/src/shrpx_downstream.cc index b2c940a7..e7c8cb87 100644 --- a/src/shrpx_downstream.cc +++ b/src/shrpx_downstream.cc @@ -43,6 +43,8 @@ Downstream::Downstream(Upstream *upstream, int stream_id, int priority) stream_id_(stream_id), priority_(priority), downstream_stream_id_(-1), + upgrade_request_(false), + upgraded_(false), request_state_(INITIAL), request_major_(1), request_minor_(1), @@ -474,10 +476,44 @@ void Downstream::set_recv_window_size(int32_t new_size) recv_window_size_ = new_size; } -bool Downstream::tunnel_established() const +void Downstream::check_upgrade_fulfilled() { - return request_method_ == "CONNECT" && - 200 <= response_http_status_ && response_http_status_ < 300; + if(request_method_ == "CONNECT") { + upgraded_ = 200 <= response_http_status_ && response_http_status_ < 300; + } else { + // TODO Do more strict checking for upgrade headers + for(auto& hd : request_headers_) { + if(util::strieq("upgrade", hd.first.c_str())) { + upgraded_ = true; + break; + } + } + } +} + +bool Downstream::get_upgraded() const +{ + return upgraded_; +} + +void Downstream::check_upgrade_request() +{ + if(request_method_ == "CONNECT") { + upgrade_request_ = true; + } else { + // TODO Do more strict checking for upgrade headers + for(auto& hd : request_headers_) { + if(util::strieq("upgrade", hd.first.c_str())) { + upgrade_request_ = true; + break; + } + } + } +} + +bool Downstream::get_upgrade_request() const +{ + return upgrade_request_; } void Downstream::set_downstream_stream_id(int32_t stream_id) diff --git a/src/shrpx_downstream.h b/src/shrpx_downstream.h index 38090e17..578d05b1 100644 --- a/src/shrpx_downstream.h +++ b/src/shrpx_downstream.h @@ -68,8 +68,16 @@ public: int32_t get_recv_window_size() const; void inc_recv_window_size(int32_t amount); void set_recv_window_size(int32_t new_size); - // Returns true if tunnel connection has been established. - bool tunnel_established() const; + // Returns true if upgrade (HTTP Upgrade or CONNECT) is succeeded. + void check_upgrade_fulfilled(); + // Checks request headers whether the request is upgrade request or + // not. + void check_upgrade_request(); + // Returns true if the request is upgrade. + bool get_upgrade_request() const; + // Returns true if the upgrade is succeded as a result of the call + // check_upgrade_fulfilled(). + bool get_upgraded() const; // downstream request API const Headers& get_request_headers() const; void add_request_header(const std::string& name, const std::string& value); @@ -145,6 +153,11 @@ private: int priority_; // stream ID in backend connection int32_t downstream_stream_id_; + // true if the request contains upgrade token (HTTP Upgrade or + // CONNECT) + bool upgrade_request_; + // true if the connection is upgraded (HTTP Upgrade or CONNECT) + bool upgraded_; int request_state_; std::string request_method_; diff --git a/src/shrpx_http2_upstream.cc b/src/shrpx_http2_upstream.cc index 0f14f1b8..44554e98 100644 --- a/src/shrpx_http2_upstream.cc +++ b/src/shrpx_http2_upstream.cc @@ -108,7 +108,7 @@ void on_stream_close_callback downstream->set_request_state(Downstream::STREAM_CLOSED); if(downstream->get_response_state() == Downstream::MSG_COMPLETE) { // At this point, downstream response was read - if(!downstream->tunnel_established() && + if(!downstream->get_upgraded() && !downstream->get_response_connection_close()) { // Keep-alive DownstreamConnection *dconn; @@ -192,6 +192,7 @@ void on_frame_recv_callback } downstream->add_request_header("host", host); + downstream->check_upgrade_request(); if(LOG_ENABLED(INFO)) { std::stringstream ss; @@ -588,7 +589,7 @@ void spdy_downstream_eventcb(bufferevent *bev, short events, void *ptr) } else { DCLOG(INFO, dconn) << "Timeout"; } - if(downstream->tunnel_established()) { + if(downstream->get_upgraded()) { DCLOG(INFO, dconn) << "Note: this is tunnel connection"; } } @@ -678,7 +679,7 @@ ssize_t spdy_data_read_callback(nghttp2_session *session, int nread = evbuffer_remove(body, buf, length); if(nread == 0 && downstream->get_response_state() == Downstream::MSG_COMPLETE) { - if(!downstream->tunnel_established()) { + if(!downstream->get_upgraded()) { *eof = 1; } else { // For tunneling, issue RST_STREAM to finish the stream. diff --git a/src/shrpx_http_downstream_connection.cc b/src/shrpx_http_downstream_connection.cc index 68ca6a87..948d393b 100644 --- a/src/shrpx_http_downstream_connection.cc +++ b/src/shrpx_http_downstream_connection.cc @@ -356,13 +356,26 @@ int htp_hdrs_completecb(http_parser *htp) downstream->set_response_minor(htp->http_minor); downstream->set_response_connection_close(!http_should_keep_alive(htp)); downstream->set_response_state(Downstream::HEADER_COMPLETE); - if(downstream->tunnel_established()) { + downstream->check_upgrade_fulfilled(); + if(downstream->get_upgraded()) { downstream->set_response_connection_close(true); } if(downstream->get_upstream()->on_downstream_header_complete(downstream) != 0) { return -1; } + + if(downstream->get_upgraded()) { + // Upgrade complete, read until EOF in both ends + downstream->get_upstream()->resume_read(SHRPX_MSG_BLOCK, downstream); + downstream->set_request_state(Downstream::HEADER_COMPLETE); + if(LOG_ENABLED(INFO)) { + LOG(INFO) << "HTTP upgrade success. stream_id=" + << downstream->get_stream_id(); + } + } + + unsigned int status = downstream->get_response_http_status(); // Ignore the response body. HEAD response may contain // Content-Length or Transfer-Encoding: chunked. Some server send @@ -441,11 +454,19 @@ http_parser_settings htp_hooks = { int HttpDownstreamConnection::on_read() { evbuffer *input = bufferevent_get_input(bev_); + size_t inputlen = evbuffer_get_length(input); unsigned char *mem = evbuffer_pullup(input, -1); - + if(downstream_->get_upgraded()) { + // For upgraded connection, just pass data to the upstream. + int rv; + rv = downstream_->get_upstream()->on_downstream_body + (downstream_, reinterpret_cast(mem), inputlen); + evbuffer_drain(input, inputlen); + return rv; + } size_t nread = http_parser_execute(response_htp_, &htp_hooks, reinterpret_cast(mem), - evbuffer_get_length(input)); + inputlen); evbuffer_drain(input, nread); http_errno htperr = HTTP_PARSER_ERRNO(response_htp_); diff --git a/src/shrpx_https_upstream.cc b/src/shrpx_https_upstream.cc index 5d18748e..9b0f5e67 100644 --- a/src/shrpx_https_upstream.cc +++ b/src/shrpx_https_upstream.cc @@ -142,6 +142,7 @@ int htp_hdrs_completecb(http_parser *htp) downstream->set_request_connection_close(!http_should_keep_alive(htp)); + downstream->check_upgrade_request(); if(LOG_ENABLED(INFO)) { std::stringstream ss; ss << downstream->get_request_method() << " " @@ -258,20 +259,40 @@ int HttpsUpstream::on_read() { bufferevent *bev = handler_->get_bev(); evbuffer *input = bufferevent_get_input(bev); - + size_t inputlen = evbuffer_get_length(input); unsigned char *mem = evbuffer_pullup(input, -1); - if(evbuffer_get_length(input) == 0) { + if(inputlen == 0) { + return 0; + } + auto downstream = get_downstream(); + // downstream can be nullptr here, because it is initialized in the + // callback chain called by http_parser_execute() + if(downstream && downstream->get_upgraded()) { + int rv = downstream->push_upload_data_chunk + (reinterpret_cast(mem), inputlen); + evbuffer_drain(input, inputlen); + if(rv != 0) { + return -1; + } + if(downstream->get_output_buffer_full()) { + if(LOG_ENABLED(INFO)) { + ULOG(INFO, this) << "Downstream output buffer is full"; + } + pause_read(SHRPX_NO_BUFFER); + } return 0; } size_t nread = http_parser_execute(htp_, &htp_hooks, reinterpret_cast(mem), - evbuffer_get_length(input)); + inputlen); evbuffer_drain(input, nread); // Well, actually header length + some body bytes current_header_length_ += nread; - Downstream *downstream = get_downstream(); + // Get downstream again because it may be initialized in http parser + // execution + downstream = get_downstream(); http_errno htperr = HTTP_PARSER_ERRNO(htp_); if(htperr == HPE_PAUSED) { @@ -403,7 +424,7 @@ void https_downstream_readcb(bufferevent *bev, void *ptr) return; } } - } else if(downstream->tunnel_established()) { + } else if(downstream->get_upgraded()) { // This path is effectively only taken for SPDY downstream // because only SPDY downstream sets response_state to // MSG_COMPLETE and this function. For HTTP downstream, EOF @@ -660,7 +681,7 @@ int HttpsUpstream::on_downstream_header_complete(Downstream *downstream) } else if(connection_upgrade) { hdrs += "Connection: upgrade\r\n"; } - } else { + } else if(!downstream->get_upgraded()) { hdrs += "Connection: close\r\n"; } if(!get_config()->no_via) { diff --git a/src/shrpx_spdy_downstream_connection.cc b/src/shrpx_spdy_downstream_connection.cc index 282c4c4c..35892174 100644 --- a/src/shrpx_spdy_downstream_connection.cc +++ b/src/shrpx_spdy_downstream_connection.cc @@ -180,7 +180,9 @@ ssize_t spdy_data_read_callback(nghttp2_session *session, nread = evbuffer_remove(body, buf, length); if(nread == 0) { if(downstream->get_request_state() == Downstream::MSG_COMPLETE) { - *eof = 1; + if(!downstream->get_upgrade_request()) { + *eof = 1; + } break; } else { // This is important because it will handle flow control diff --git a/src/shrpx_spdy_session.cc b/src/shrpx_spdy_session.cc index bd9b670b..647e1820 100644 --- a/src/shrpx_spdy_session.cc +++ b/src/shrpx_spdy_session.cc @@ -814,8 +814,16 @@ void on_frame_recv_callback auto upstream = downstream->get_upstream(); downstream->set_response_state(Downstream::HEADER_COMPLETE); - if(downstream->tunnel_established()) { + downstream->check_upgrade_fulfilled(); + if(downstream->get_upgraded()) { downstream->set_response_connection_close(true); + // On upgrade sucess, both ends can send data + upstream->resume_read(SHRPX_MSG_BLOCK, downstream); + downstream->set_request_state(Downstream::HEADER_COMPLETE); + if(LOG_ENABLED(INFO)) { + SSLOG(INFO, spdy) << "HTTP upgrade success. stream_id=" + << frame->hd.stream_id; + } } rv = upstream->on_downstream_header_complete(downstream); if(rv != 0) { @@ -833,7 +841,7 @@ void on_frame_recv_callback auto downstream = sd->dconn->get_downstream(); if(downstream && downstream->get_downstream_stream_id() == frame->hd.stream_id) { - if(downstream->tunnel_established() && + if(downstream->get_upgraded() && downstream->get_response_state() == Downstream::HEADER_COMPLETE) { // For tunneled connection, we has to submit RST_STREAM to // upstream *after* whole response body is sent. We just set diff --git a/src/shrpx_spdy_upstream.cc b/src/shrpx_spdy_upstream.cc index 0bc6c943..1cb76314 100644 --- a/src/shrpx_spdy_upstream.cc +++ b/src/shrpx_spdy_upstream.cc @@ -110,7 +110,7 @@ void on_stream_close_callback downstream->set_request_state(Downstream::STREAM_CLOSED); if(downstream->get_response_state() == Downstream::MSG_COMPLETE) { // At this point, downstream response was read - if(!downstream->tunnel_established() && + if(!downstream->get_upgraded() && !downstream->get_response_connection_close()) { // Keep-alive DownstreamConnection *dconn; @@ -194,6 +194,7 @@ void on_ctrl_recv_callback } downstream->add_request_header("host", host); + downstream->check_upgrade_request(); if(LOG_ENABLED(INFO)) { std::stringstream ss; @@ -592,7 +593,7 @@ void spdy_downstream_eventcb(bufferevent *bev, short events, void *ptr) } else { DCLOG(INFO, dconn) << "Timeout"; } - if(downstream->tunnel_established()) { + if(downstream->get_upgraded()) { DCLOG(INFO, dconn) << "Note: this is tunnel connection"; } } @@ -680,7 +681,7 @@ ssize_t spdy_data_read_callback(spdylay_session *session, int nread = evbuffer_remove(body, buf, length); if(nread == 0 && downstream->get_response_state() == Downstream::MSG_COMPLETE) { - if(!downstream->tunnel_established()) { + if(!downstream->get_upgraded()) { *eof = 1; } else { // For tunneling, issue RST_STREAM to finish the stream.