asio: read_cb: Use similar semantics with nghttp2_data_source_read_callback

This commit is contained in:
Tatsuhiro Tsujikawa 2015-03-04 22:45:29 +09:00
parent 44642af227
commit 7fb7575f78
8 changed files with 41 additions and 58 deletions

View File

@ -61,11 +61,15 @@ void request_impl::call_on_close(uint32_t error_code) {
void request_impl::on_read(read_cb cb) { read_cb_ = std::move(cb); } void request_impl::on_read(read_cb cb) { read_cb_ = std::move(cb); }
read_cb::result_type request_impl::call_on_read(uint8_t *buf, std::size_t len) { read_cb::result_type request_impl::call_on_read(uint8_t *buf, std::size_t len,
uint32_t *data_flags) {
if (read_cb_) { if (read_cb_) {
return read_cb_(buf, len); return read_cb_(buf, len, data_flags);
} }
return read_cb::result_type{};
*data_flags |= NGHTTP2_DATA_FLAG_EOF;
return 0;
} }
void request_impl::header(header_map h) { header_ = std::move(h); } void request_impl::header(header_map h) { header_ = std::move(h); }

View File

@ -55,7 +55,8 @@ public:
void call_on_close(uint32_t error_code); void call_on_close(uint32_t error_code);
void on_read(read_cb cb); void on_read(read_cb cb);
read_cb::result_type call_on_read(uint8_t *buf, std::size_t len); read_cb::result_type call_on_read(uint8_t *buf, std::size_t len,
uint32_t *data_flags);
void header(header_map h); void header(header_map h);
header_map &header(); header_map &header();

View File

@ -409,18 +409,7 @@ const request *session_impl::submit(boost::system::error_code &ec,
size_t length, uint32_t *data_flags, nghttp2_data_source *source, size_t length, uint32_t *data_flags, nghttp2_data_source *source,
void *user_data) -> ssize_t { void *user_data) -> ssize_t {
auto strm = static_cast<stream *>(source->ptr); auto strm = static_cast<stream *>(source->ptr);
auto rv = strm->request().impl().call_on_read(buf, length); return strm->request().impl().call_on_read(buf, length, data_flags);
if (rv.first < 0) {
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
}
if (rv.second) {
*data_flags |= NGHTTP2_DATA_FLAG_EOF;
} else if (rv.first == 0) {
return NGHTTP2_ERR_DEFERRED;
}
return rv.first;
}; };
prdptr = &prd; prdptr = &prd;
} }

View File

@ -49,11 +49,16 @@ boost::system::error_code make_error_code(nghttp2_error ev) {
read_cb string_reader(std::string data) { read_cb string_reader(std::string data) {
auto strio = std::make_shared<std::pair<std::string, size_t>>(std::move(data), auto strio = std::make_shared<std::pair<std::string, size_t>>(std::move(data),
data.size()); data.size());
return [strio](uint8_t *buf, size_t len) { return [strio](uint8_t *buf, size_t len, uint32_t *data_flags) {
auto n = std::min(len, strio->second); auto &data = strio->first;
std::copy_n(strio->first.c_str(), n, buf); auto &left = strio->second;
strio->second -= n; auto n = std::min(len, left);
return std::make_pair(n, strio->second == 0); std::copy_n(data.c_str() + data.size() - left, n, buf);
left -= n;
if (left == 0) {
*data_flags |= NGHTTP2_DATA_FLAG_EOF;
}
return n;
}; };
} }

View File

@ -177,13 +177,15 @@ const header_map &response_impl::header() const { return header_; }
void response_impl::stream(http2_stream *s) { stream_ = s; } void response_impl::stream(http2_stream *s) { stream_ = s; }
std::pair<ssize_t, bool> response_impl::call_read(uint8_t *data, read_cb::result_type response_impl::call_read(uint8_t *data, std::size_t len,
std::size_t len) { uint32_t *data_flags) {
if (read_cb_) { if (read_cb_) {
return read_cb_(data, len); return read_cb_(data, len, data_flags);
} }
return std::make_pair(0, true); *data_flags |= NGHTTP2_DATA_FLAG_EOF;
return 0;
} }
http2_stream::http2_stream(http2_handler *h, int32_t stream_id) http2_stream::http2_stream(http2_handler *h, int32_t stream_id)
@ -481,18 +483,7 @@ int http2_handler::start_response(http2_stream &stream) {
size_t length, uint32_t *data_flags, nghttp2_data_source *source, size_t length, uint32_t *data_flags, nghttp2_data_source *source,
void *user_data) -> ssize_t { void *user_data) -> ssize_t {
auto &stream = *static_cast<http2_stream *>(source->ptr); auto &stream = *static_cast<http2_stream *>(source->ptr);
auto rv = stream.response().impl().call_read(buf, length); return stream.response().impl().call_read(buf, length, data_flags);
if (rv.first < 0) {
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
}
if (rv.second) {
*data_flags |= NGHTTP2_DATA_FLAG_EOF;
} else if (rv.first == 0) {
return NGHTTP2_ERR_DEFERRED;
}
return rv.first;
}; };
rv = nghttp2_submit_response(session_, stream.get_stream_id(), nva.data(), rv = nghttp2_submit_response(session_, stream.get_stream_id(), nva.data(),

View File

@ -91,7 +91,8 @@ public:
const header_map &header() const; const header_map &header() const;
bool started() const; bool started() const;
void stream(http2_stream *s); void stream(http2_stream *s);
read_cb::result_type call_read(uint8_t *data, std::size_t len); read_cb::result_type call_read(uint8_t *data, std::size_t len,
uint32_t *data_flags);
private: private:
http2_stream *stream_; http2_stream *stream_;

View File

@ -139,20 +139,21 @@ read_cb file_reader(const std::string &path) {
read_cb file_reader_from_fd(int fd) { read_cb file_reader_from_fd(int fd) {
auto d = defer_shared(close, fd); auto d = defer_shared(close, fd);
return [fd, d](uint8_t *buf, size_t len) -> read_cb::result_type { return [fd, d](uint8_t *buf, size_t len, uint32_t *data_flags)
int rv; -> read_cb::result_type {
while ((rv = read(fd, buf, len)) == -1 && errno == EINTR) ssize_t n;
while ((n = read(fd, buf, len)) == -1 && errno == EINTR)
; ;
if (rv == -1) { if (n == -1) {
return std::make_pair(-1, false); return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
} }
if (rv == 0) { if (n == 0) {
return std::make_pair(rv, true); *data_flags |= NGHTTP2_DATA_FLAG_EOF;
} }
return std::make_pair(rv, false); return n;
}; };
} }

View File

@ -76,18 +76,9 @@ typedef std::function<void(void)> void_cb;
typedef std::function<void(const boost::system::error_code &ec)> error_cb; typedef std::function<void(const boost::system::error_code &ec)> error_cb;
typedef std::function<void(uint32_t)> close_cb; typedef std::function<void(uint32_t)> close_cb;
// Callback function to generate response body. The implementation of // Callback function to generate response body. TBD
// this callback must fill at most |len| bytes data to |buf|. The typedef std::function<
// return value is pair of written bytes and bool value indicating ssize_t(uint8_t *buf, std::size_t len, uint32_t *data_flags)> read_cb;
// that this is the end of the body. If the end of the body was
// reached, return true. If there is error and application wants to
// terminate stream, return std::make_pair(-1, false). Returning
// std::make_pair(0, false) tells the library that don't call this
// callback until application calls response::resume(). This is
// useful when there is no data to send at the moment but there will
// be more to come in near future.
typedef std::function<std::pair<ssize_t, bool>(uint8_t *buf, std::size_t len)>
read_cb;
// Convenient function to create function to read file denoted by // Convenient function to create function to read file denoted by
// |path|. This can be passed to response::end(). // |path|. This can be passed to response::end().