Add nghttp2_session_mem_send() API function

This function behaves like nghttp2_session_send(), but it does not
use nghttp2_send_callback to send data. Instead, it returns the
serialized data to trasmit and its length to the caller.
This commit is contained in:
Tatsuhiro Tsujikawa 2014-02-18 23:23:11 +09:00
parent a9991133af
commit 649586fff6
6 changed files with 200 additions and 157 deletions

View File

@ -14,9 +14,8 @@ The header files are also available online: :doc:`nghttp2.h` and
Remarks
-------
Do not call `nghttp2_session_send`, `nghttp2_session_recv` or
`nghttp2_session_mem_recv` from the nghttp2 callback functions
directly or indirectly. It will lead to the crash. You can submit
requests or frames in the callbacks then call `nghttp2_session_send`,
`nghttp2_session_recv` or `nghttp2_session_mem_recv` outside of the
callbacks.
Do not call `nghttp2_session_send`, `nghttp2_session_mem_send`,
`nghttp2_session_recv` or `nghttp2_session_mem_recv` from the nghttp2
callback functions directly or indirectly. It will lead to the
crash. You can submit requests or frames in the callbacks then call
these functions outside the callbacks.

View File

@ -882,6 +882,11 @@ typedef union {
* must return :enum:`NGHTTP2_ERR_CALLBACK_FAILURE`. The |user_data|
* pointer is the third argument passed in to the call to
* `nghttp2_session_client_new()` or `nghttp2_session_server_new()`.
*
* This callback is required if the application uses
* `nghttp2_session_send()` to send data to the remote endpoint. If
* the application uses `nghttp2_session_mem_send()` instead, this
* callback function is unnecessary.
*/
typedef ssize_t (*nghttp2_send_callback)
(nghttp2_session *session,
@ -902,6 +907,11 @@ typedef ssize_t (*nghttp2_send_callback)
* :enum:`NGHTTP2_ERR_WOULDBLOCK`. The |user_data| pointer is the
* third argument passed in to the call to
* `nghttp2_session_client_new()` or `nghttp2_session_server_new()`.
*
* This callback is required if the application uses
* `nghttp2_session_recv()` to receive data from the remote
* endpoint. If the application uses `nghttp2_session_mem_recv()`
* instead, this callback function is unnecessary.
*/
typedef ssize_t (*nghttp2_recv_callback)
(nghttp2_session *session,
@ -1204,12 +1214,16 @@ typedef ssize_t (*nghttp2_select_padding_callback)
typedef struct {
/**
* Callback function invoked when the |session| wants to send data
* to the remote peer.
* to the remote peer. This callback is not necessary if the
* application uses `nghttp2_session_mem_send()` to serialize data
* to transmit.
*/
nghttp2_send_callback send_callback;
/**
* Callback function invoked when the |session| wants to receive
* data from the remote peer.
* data from the remote peer. This callback is not necessary if the
* application uses `nghttp2_session_mem_recv()` to process received
* data.
*/
nghttp2_recv_callback recv_callback;
/**
@ -1478,6 +1492,39 @@ void nghttp2_session_del(nghttp2_session *session);
*/
int nghttp2_session_send(nghttp2_session *session);
/**
* @function
*
* Returns the serialized data to send.
*
* This function behaves like `nghttp2_session_mem_send()` except that
* it does not use :member:`nghttp2_session_callbacks.send_callback`
* to transmit data. Instead, it assigns the pointer to the serialized
* data to the |*data_ptr| and returns its length. The other callbacks
* are called in the same way as they are in `nghttp2_session_send()`.
*
* If no data is available to send, this function returns 0.
*
* This function may not return all serialized data in one
* invocation. To get all data, call this function repeatedly until it
* returns 0 or one of negative error codes.
*
* The assigned |*data_ptr| is valid until the next call of
* `nghttp2_session_mem_send()` or `nghttp2_session_send()`.
*
* The caller must send all data before sending the next chunk of
* data.
*
* This function returns the length of the data pointed by the
* |*data_ptr| if it succeeds, or one of the following negative error
* codes:
*
* :enum:`NGHTTP2_ERR_NOMEM`
* Out of memory.
*/
ssize_t nghttp2_session_mem_send(nghttp2_session *session,
const uint8_t **data_ptr);
/**
* @function
*

View File

@ -188,6 +188,18 @@ static void init_settings(uint32_t *settings)
NGHTTP2_INITIAL_WINDOW_SIZE;
}
static void nghttp2_active_outbound_item_reset
(nghttp2_active_outbound_item *aob)
{
DEBUGF(fprintf(stderr, "reset nghttp2_active_outbound_item\n"));
DEBUGF(fprintf(stderr, "aob->item = %p\n", aob->item));
nghttp2_outbound_item_free(aob->item);
free(aob->item);
aob->item = NULL;
aob->framebuflen = aob->framebufoff = aob->framebufmark = 0;
aob->state = NGHTTP2_OB_POP_ITEM;
}
typedef struct {
nghttp2_session *session;
int rv;
@ -265,6 +277,7 @@ static int nghttp2_session_new(nghttp2_session **session_ptr,
goto fail_aob_framebuf;
}
(*session_ptr)->aob.framebufmax = NGHTTP2_INITIAL_OUTBOUND_FRAMEBUF_LENGTH;
nghttp2_active_outbound_item_reset(&(*session_ptr)->aob);
memset((*session_ptr)->remote_settings, 0,
sizeof((*session_ptr)->remote_settings));
@ -370,17 +383,6 @@ static void nghttp2_session_ob_pq_free(nghttp2_pq *pq)
nghttp2_pq_free(pq);
}
static void nghttp2_active_outbound_item_reset
(nghttp2_active_outbound_item *aob)
{
DEBUGF(fprintf(stderr, "reset nghttp2_active_outbound_item\n"));
DEBUGF(fprintf(stderr, "aob->item = %p\n", aob->item));
nghttp2_outbound_item_free(aob->item);
free(aob->item);
aob->item = NULL;
aob->framebuflen = aob->framebufoff = aob->framebufmark = 0;
}
void nghttp2_session_del(nghttp2_session *session)
{
if(session == NULL) {
@ -1825,24 +1827,26 @@ static int nghttp2_session_after_frame_sent(nghttp2_session *session)
assert(0);
}
int nghttp2_session_send(nghttp2_session *session)
ssize_t nghttp2_session_mem_send(nghttp2_session *session,
const uint8_t **data_ptr)
{
int r;
while(1) {
const uint8_t *data;
size_t datalen;
ssize_t sentlen;
if(session->aob.item == NULL) {
int rv;
*data_ptr = NULL;
for(;;) {
switch(session->aob.state) {
case NGHTTP2_OB_POP_ITEM: {
nghttp2_outbound_item *item;
ssize_t framebuflen;
item = nghttp2_session_pop_next_ob_item(session);
if(item == NULL) {
break;
return 0;
}
framebuflen = nghttp2_session_prep_frame(session, item);
if(framebuflen == NGHTTP2_ERR_DEFERRED) {
DEBUGF(fprintf(stderr, "frame deferred\n"));
continue;
break;
}
if(framebuflen < 0) {
DEBUGF(fprintf(stderr, "frame preparation failed with %s\n",
@ -1875,9 +1879,8 @@ int nghttp2_session_send(nghttp2_session *session)
}
if(nghttp2_is_fatal(framebuflen)) {
return framebuflen;
} else {
continue;
}
break;
}
session->aob.item = item;
session->aob.framebuflen = framebuflen;
@ -1891,37 +1894,66 @@ int nghttp2_session_send(nghttp2_session *session)
session->aob.framebufmark =
session->aob.framebufoff + NGHTTP2_FRAME_HEAD_LENGTH +
nghttp2_get_uint16(session->aob.framebuf + session->aob.framebufoff);
r = session_call_before_frame_send(session, frame);
if(nghttp2_is_fatal(r)) {
return r;
rv = session_call_before_frame_send(session, frame);
if(nghttp2_is_fatal(rv)) {
return rv;
}
} else {
session->aob.framebufmark = session->aob.framebuflen;
}
session->aob.state = NGHTTP2_OB_SEND_DATA;
break;
}
case NGHTTP2_OB_SEND_DATA: {
size_t datalen;
data = session->aob.framebuf + session->aob.framebufoff;
datalen = session->aob.framebufmark - session->aob.framebufoff;
if(session->aob.framebufoff == session->aob.framebufmark) {
/* Frame has completely sent */
rv = nghttp2_session_after_frame_sent(session);
if(rv < 0) {
/* FATAL */
assert(nghttp2_is_fatal(rv));
return rv;
}
/* We have already adjusted the next state */
break;
}
*data_ptr = session->aob.framebuf + session->aob.framebufoff;
datalen = session->aob.framebufmark - session->aob.framebufoff;
/* We increment the offset here. If send_callback does not send
everything, we will adjust it. */
session->aob.framebufoff += datalen;
return datalen;
}
}
}
}
int nghttp2_session_send(nghttp2_session *session)
{
const uint8_t *data;
ssize_t datalen;
ssize_t sentlen;
for(;;) {
datalen = nghttp2_session_mem_send(session, &data);
if(datalen <= 0) {
return datalen;
}
sentlen = session->callbacks.send_callback(session, data, datalen, 0,
session->user_data);
if(sentlen < 0) {
if(sentlen == NGHTTP2_ERR_WOULDBLOCK) {
/* Transmission canceled. Rewind the offset */
session->aob.framebufoff -= datalen;
return 0;
} else {
return NGHTTP2_ERR_CALLBACK_FAILURE;
}
} else {
session->aob.framebufoff += sentlen;
if(session->aob.framebufoff == session->aob.framebufmark) {
/* Frame has completely sent */
r = nghttp2_session_after_frame_sent(session);
if(r < 0) {
/* FATAL */
assert(r < NGHTTP2_ERR_FATAL);
return r;
}
}
return NGHTTP2_ERR_CALLBACK_FAILURE;
}
/* Rewind the offset to the amount of unsent bytes */
session->aob.framebufoff -= datalen - sentlen;
}
return 0;
}

View File

@ -47,6 +47,11 @@ typedef enum {
NGHTTP2_OPTMASK_NO_AUTO_CONNECTION_WINDOW_UPDATE = 1 << 1
} nghttp2_optmask;
typedef enum {
NGHTTP2_OB_POP_ITEM,
NGHTTP2_OB_SEND_DATA
} nghttp2_outbound_state;
typedef struct {
nghttp2_outbound_item *item;
/* Buffer for outbound frames. Used to pack one frame. The memory
@ -63,6 +68,7 @@ typedef struct {
/* Marks the last position to send. This is used to implement
CONTINUATION */
size_t framebufmark;
nghttp2_outbound_state state;
} nghttp2_active_outbound_item;
/* Buffer length for inbound raw byte stream. */

View File

@ -687,30 +687,6 @@ void call_downstream_readcb(Http2Session *http2session, Downstream *downstream)
}
} // namespace
namespace {
ssize_t send_callback(nghttp2_session *session,
const uint8_t *data, size_t len, int flags,
void *user_data)
{
int rv;
auto http2session = static_cast<Http2Session*>(user_data);
auto bev = http2session->get_bev();
auto output = bufferevent_get_output(bev);
// Check buffer length and return WOULDBLOCK if it is large enough.
if(evbuffer_get_length(output) > Http2Session::OUTBUF_MAX_THRES) {
return NGHTTP2_ERR_WOULDBLOCK;
}
rv = evbuffer_add(output, data, len);
if(rv == -1) {
SSLOG(FATAL, http2session) << "evbuffer_add() failed";
return NGHTTP2_ERR_CALLBACK_FAILURE;
} else {
return len;
}
}
} // namespace
namespace {
int on_stream_close_callback
(nghttp2_session *session, int32_t stream_id, nghttp2_error_code error_code,
@ -1183,7 +1159,6 @@ int Http2Session::on_connect()
}
nghttp2_session_callbacks callbacks;
memset(&callbacks, 0, sizeof(callbacks));
callbacks.send_callback = send_callback;
callbacks.on_stream_close_callback = on_stream_close_callback;
callbacks.on_frame_recv_callback = on_frame_recv_callback;
callbacks.on_data_chunk_recv_callback = on_data_chunk_recv_callback;
@ -1271,21 +1246,7 @@ int Http2Session::on_read()
return -1;
}
evbuffer_drain(input, rv);
rv = nghttp2_session_send(session_);
if(rv < 0) {
SSLOG(ERROR, this) << "nghttp2_session_send() returned error: "
<< nghttp2_strerror(rv);
return -1;
}
if(nghttp2_session_want_read(session_) == 0 &&
nghttp2_session_want_write(session_) == 0 &&
evbuffer_get_length(bufferevent_get_output(bev_)) == 0) {
if(LOG_ENABLED(INFO)) {
SSLOG(INFO, this) << "No more read/write for this session";
}
return -1;
}
return 0;
return send();
}
int Http2Session::on_write()
@ -1295,22 +1256,41 @@ int Http2Session::on_write()
int Http2Session::send()
{
int rv = 0;
if((rv = nghttp2_session_send(session_)) < 0) {
SSLOG(ERROR, this) << "nghttp2_session_send() returned error: "
<< nghttp2_strerror(rv);
}
if(rv == 0) {
if(nghttp2_session_want_read(session_) == 0 &&
nghttp2_session_want_write(session_) == 0 &&
evbuffer_get_length(bufferevent_get_output(bev_)) == 0) {
if(LOG_ENABLED(INFO)) {
SSLOG(INFO, this) << "No more read/write for this session";
}
rv = -1;
int rv;
auto output = bufferevent_get_output(bev_);
for(;;) {
// Check buffer length and return WOULDBLOCK if it is large enough.
if(evbuffer_get_length(output) > Http2Session::OUTBUF_MAX_THRES) {
return NGHTTP2_ERR_WOULDBLOCK;
}
const uint8_t *data;
auto datalen = nghttp2_session_mem_send(session_, &data);
if(datalen < 0) {
SSLOG(ERROR, this) << "nghttp2_session_mem_send() returned error: "
<< nghttp2_strerror(datalen);
break;
}
if(datalen == 0) {
break;
}
rv = evbuffer_add(output, data, datalen);
if(rv == -1) {
SSLOG(FATAL, this) << "evbuffer_add() failed";
return -1;
}
}
return rv;
if(nghttp2_session_want_read(session_) == 0 &&
nghttp2_session_want_write(session_) == 0 &&
evbuffer_get_length(bufferevent_get_output(bev_)) == 0) {
if(LOG_ENABLED(INFO)) {
SSLOG(INFO, this) << "No more read/write for this session";
}
return -1;
}
return 0;
}
void Http2Session::clear_notify()

View File

@ -49,31 +49,6 @@ namespace {
const size_t OUTBUF_MAX_THRES = 64*1024;
} // namespace
namespace {
ssize_t send_callback(nghttp2_session *session,
const uint8_t *data, size_t len, int flags,
void *user_data)
{
int rv;
auto upstream = static_cast<Http2Upstream*>(user_data);
auto handler = upstream->get_client_handler();
auto bev = handler->get_bev();
auto output = bufferevent_get_output(bev);
// Check buffer length and return WOULDBLOCK if it is large enough.
if(handler->get_outbuf_length() > OUTBUF_MAX_THRES) {
return NGHTTP2_ERR_WOULDBLOCK;
}
rv = evbuffer_add(output, data, len);
if(rv == -1) {
ULOG(FATAL, upstream) << "evbuffer_add() failed";
return NGHTTP2_ERR_CALLBACK_FAILURE;
} else {
return len;
}
}
} // namespace
namespace {
int on_stream_close_callback
(nghttp2_session *session, int32_t stream_id, nghttp2_error_code error_code,
@ -510,7 +485,6 @@ Http2Upstream::Http2Upstream(ClientHandler *handler)
nghttp2_session_callbacks callbacks;
memset(&callbacks, 0, sizeof(callbacks));
callbacks.send_callback = send_callback;
callbacks.on_stream_close_callback = on_stream_close_callback;
callbacks.on_frame_recv_callback = on_frame_recv_callback;
callbacks.on_data_chunk_recv_callback = on_data_chunk_recv_callback;
@ -580,21 +554,7 @@ int Http2Upstream::on_read()
return -1;
}
evbuffer_drain(input, rv);
rv = nghttp2_session_send(session_);
if(rv < 0) {
ULOG(ERROR, this) << "nghttp2_session_send() returned error: "
<< nghttp2_strerror(rv);
return -1;
}
if(nghttp2_session_want_read(session_) == 0 &&
nghttp2_session_want_write(session_) == 0 &&
handler_->get_outbuf_length() == 0) {
if(LOG_ENABLED(INFO)) {
ULOG(INFO, this) << "No more read/write for this HTTP2 session";
}
rv = -1;
}
return 0;
return send();
}
int Http2Upstream::on_write()
@ -605,22 +565,41 @@ int Http2Upstream::on_write()
// After this function call, downstream may be deleted.
int Http2Upstream::send()
{
int rv = 0;
if((rv = nghttp2_session_send(session_)) < 0) {
ULOG(ERROR, this) << "nghttp2_session_send() returned error: "
<< nghttp2_strerror(rv);
}
if(rv == 0) {
if(nghttp2_session_want_read(session_) == 0 &&
nghttp2_session_want_write(session_) == 0 &&
handler_->get_outbuf_length() == 0) {
if(LOG_ENABLED(INFO)) {
ULOG(INFO, this) << "No more read/write for this HTTP2 session";
}
rv = -1;
int rv;
auto bev = handler_->get_bev();
auto output = bufferevent_get_output(bev);
for(;;) {
// Check buffer length and return WOULDBLOCK if it is large enough.
if(handler_->get_outbuf_length() > OUTBUF_MAX_THRES) {
break;
}
const uint8_t *data;
auto datalen = nghttp2_session_mem_send(session_, &data);
if(datalen < 0) {
ULOG(ERROR, this) << "nghttp2_session_mem_send() returned error: "
<< nghttp2_strerror(datalen);
return -1;
}
if(datalen == 0) {
break;
}
rv = evbuffer_add(output, data, datalen);
if(rv == -1) {
ULOG(FATAL, this) << "evbuffer_add() failed";
return -1;
}
}
return rv;
if(nghttp2_session_want_read(session_) == 0 &&
nghttp2_session_want_write(session_) == 0 &&
handler_->get_outbuf_length() == 0) {
if(LOG_ENABLED(INFO)) {
ULOG(INFO, this) << "No more read/write for this HTTP2 session";
}
return -1;
}
return 0;
}
int Http2Upstream::on_event()