diff --git a/src/h2load.h b/src/h2load.h index 7fb0067a..4dd323ad 100644 --- a/src/h2load.h +++ b/src/h2load.h @@ -441,8 +441,8 @@ struct Client { int quic_recv_crypto_data(ngtcp2_crypto_level crypto_level, const uint8_t *data, size_t datalen); int quic_handshake_completed(); - int quic_recv_stream_data(int64_t stream_id, int fin, const uint8_t *data, - size_t datalen); + int quic_recv_stream_data(uint32_t flags, int64_t stream_id, + const uint8_t *data, size_t datalen); int quic_acked_stream_data_offset(int64_t stream_id, size_t datalen); int quic_stream_close(int64_t stream_id, uint64_t app_error_code); int quic_stream_reset(int64_t stream_id, uint64_t app_error_code); diff --git a/src/h2load_http3_session.cc b/src/h2load_http3_session.cc index aac8df45..3fad5dc7 100644 --- a/src/h2load_http3_session.cc +++ b/src/h2load_http3_session.cc @@ -343,10 +343,10 @@ int Http3Session::init_conn() { return 0; } -ssize_t Http3Session::read_stream(int64_t stream_id, const uint8_t *data, - size_t datalen, int fin) { - auto nconsumed = - nghttp3_conn_read_stream(conn_, stream_id, data, datalen, fin); +ssize_t Http3Session::read_stream(uint32_t flags, int64_t stream_id, + const uint8_t *data, size_t datalen) { + auto nconsumed = nghttp3_conn_read_stream( + conn_, stream_id, data, datalen, flags & NGTCP2_STREAM_DATA_FLAG_FIN); if (nconsumed < 0) { std::cerr << "nghttp3_conn_read_stream: " << nghttp3_strerror(nconsumed) << std::endl; diff --git a/src/h2load_http3_session.h b/src/h2load_http3_session.h index 48b4f80c..cd3d96ba 100644 --- a/src/h2load_http3_session.h +++ b/src/h2load_http3_session.h @@ -58,8 +58,8 @@ public: int extend_max_local_streams(); int64_t submit_request_internal(); - ssize_t read_stream(int64_t stream_id, const uint8_t *data, size_t datalen, - int fin); + ssize_t read_stream(uint32_t flags, int64_t stream_id, const uint8_t *data, + size_t datalen); ssize_t write_stream(int64_t &stream_id, int &fin, nghttp3_vec *vec, size_t veccnt); int block_stream(int64_t stream_id); diff --git a/src/h2load_quic.cc b/src/h2load_quic.cc index 6fb587dc..10b7caa4 100644 --- a/src/h2load_quic.cc +++ b/src/h2load_quic.cc @@ -71,11 +71,11 @@ int handshake_completed(ngtcp2_conn *conn, void *user_data) { int Client::quic_handshake_completed() { return connection_made(); } namespace { -int recv_stream_data(ngtcp2_conn *conn, int64_t stream_id, int fin, +int recv_stream_data(ngtcp2_conn *conn, uint32_t flags, int64_t stream_id, uint64_t offset, const uint8_t *data, size_t datalen, void *user_data, void *stream_user_data) { auto c = static_cast(user_data); - if (c->quic_recv_stream_data(stream_id, fin, data, datalen) != 0) { + if (c->quic_recv_stream_data(flags, stream_id, data, datalen) != 0) { // TODO Better to do this gracefully rather than // NGTCP2_ERR_CALLBACK_FAILURE. Perhaps, call // ngtcp2_conn_write_application_close() ? @@ -85,14 +85,14 @@ int recv_stream_data(ngtcp2_conn *conn, int64_t stream_id, int fin, } } // namespace -int Client::quic_recv_stream_data(int64_t stream_id, int fin, +int Client::quic_recv_stream_data(uint32_t flags, int64_t stream_id, const uint8_t *data, size_t datalen) { if (worker->current_phase == Phase::MAIN_DURATION) { worker->stats.bytes_total += datalen; } auto s = static_cast(session.get()); - auto nconsumed = s->read_stream(stream_id, data, datalen, fin); + auto nconsumed = s->read_stream(flags, stream_id, data, datalen); if (nconsumed == -1) { return -1; } @@ -570,10 +570,15 @@ int Client::write_quic() { auto v = vec.data(); auto vcnt = static_cast(sveccnt); + uint32_t flags = NGTCP2_WRITE_STREAM_FLAG_MORE; + if (fin) { + flags |= NGTCP2_WRITE_STREAM_FLAG_FIN; + } + auto nwrite = ngtcp2_conn_writev_stream( - quic.conn, &ps.path, buf.data(), quic.max_pktlen, &ndatalen, - NGTCP2_WRITE_STREAM_FLAG_MORE, stream_id, fin, - reinterpret_cast(v), vcnt, timestamp(worker->loop)); + quic.conn, &ps.path, buf.data(), quic.max_pktlen, &ndatalen, flags, + stream_id, reinterpret_cast(v), vcnt, + timestamp(worker->loop)); if (nwrite < 0) { switch (nwrite) { case NGTCP2_ERR_STREAM_DATA_BLOCKED: