Skip to content

Commit

Permalink
both: review writecb
Browse files Browse the repository at this point in the history
  • Loading branch information
liudongmiao committed Jun 12, 2024
1 parent dd58b82 commit 414c9b5
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 48 deletions.
103 changes: 66 additions & 37 deletions common.c
Original file line number Diff line number Diff line change
Expand Up @@ -840,59 +840,88 @@ void tunnel_ss(struct bufferevent *raw, struct bufferevent *tev) {
raw->readcb(raw, raw->cbarg);
}

void udp_read_cb(struct evbuffer *buf, const struct evbuffer_cb_info *info, void *arg) {
struct bufferevent *raw = arg;
(void) buf;
if (info->n_added > 0) {
raw->readcb(raw, raw->cbarg);
}
}

void bev_context_udp_writecb(evutil_socket_t fd, short event, void *arg) {
int err;
size_t size;
ssize_t res;
unsigned length;
uint16_t payload_length;
struct evbuffer *buf;
struct bufferevent *raw;
struct bev_context_udp *bev_context_udp;
struct udp_frame udp_frame;
short what = BEV_EVENT_WRITING;

(void) event;
raw = arg;
bev_context_udp = bufferevent_get_context(raw);
buf = raw->output;
size = evbuffer_get_length(buf);
while (size > 0) {
if (size < UDP_FRAME_LENGTH_SIZE) {
break;
}
if (evbuffer_copyout(buf, &udp_frame, UDP_FRAME_LENGTH_SIZE) != UDP_FRAME_LENGTH_SIZE) {
LOGE("cannot copy udp to get payload length for %d", get_port(bev_context_udp->sockaddr));
raw->errorcb(raw, BEV_EVENT_ERROR, raw->cbarg);
break;
}
payload_length = htons(udp_frame.length);
length = payload_length + UDP_FRAME_LENGTH_SIZE;
if (size < length) {
break;
}
if (evbuffer_copyout(buf, &udp_frame, length) != (int) length) {
LOGE("cannot copy udp %d for %d", (int) length, get_port(bev_context_udp->sockaddr));
raw->errorcb(raw, BEV_EVENT_ERROR, raw->cbarg);
break;
}
if (sendto(fd, udp_frame.buffer, payload_length, 0, bev_context_udp->sockaddr, bev_context_udp->socklen) < 0) {
// is there any chance to sendto later?
int socket_error = evutil_socket_geterror(bev_context_udp->sock);
LOGE("cannot send udp to %d: %s",
get_port(bev_context_udp->sockaddr), evutil_socket_error_to_string(socket_error));
raw->errorcb(raw, BEV_EVENT_ERROR, raw->cbarg);
break;

if (size < UDP_FRAME_LENGTH_SIZE) {
goto reschedule;
}
if (evbuffer_copyout(buf, &udp_frame, UDP_FRAME_LENGTH_SIZE) != UDP_FRAME_LENGTH_SIZE) {
LOGE("cannot copy udp to get payload length for %d", get_port(bev_context_udp->sockaddr));
what |= BEV_EVENT_ERROR;
goto error;
}
payload_length = htons(udp_frame.length);
length = payload_length + UDP_FRAME_LENGTH_SIZE;
if (size < length) {
goto reschedule;
}
if (evbuffer_copyout(buf, &udp_frame, length) != (ssize_t) length) {
LOGE("cannot copy udp %d for %d", (int) length, get_port(bev_context_udp->sockaddr));
what |= BEV_EVENT_ERROR;
goto error;
}
res = sendto(fd, udp_frame.buffer, payload_length, 0, bev_context_udp->sockaddr, bev_context_udp->socklen);
if (res < 0) {
err = evutil_socket_geterror(fd);
if (EVUTIL_ERR_RW_RETRIABLE(err)) {
goto reschedule;
}
LOGD("udp sent %d to peer %d", payload_length, get_port(bev_context_udp->sockaddr));
evbuffer_drain(buf, length);
size -= length;
LOGW("cannot send udp to %d: %s", get_port(bev_context_udp->sockaddr), evutil_socket_error_to_string(err));
what |= BEV_EVENT_ERROR;
goto error;
}
if (res == 0) {
what |= BEV_EVENT_EOF;
goto error;
}
if (res != payload_length) {
LOGW("cannot send entire udp packet to %d", get_port(bev_context_udp->sockaddr));
what |= BEV_EVENT_ERROR;
goto error;
}
LOGD("udp sent %d to peer %d", payload_length, get_port(bev_context_udp->sockaddr));
evbuffer_drain(buf, length);

if (evbuffer_get_length(buf) == 0) {
event_del(&raw->ev_write);
}

if (raw->writecb) {
raw->writecb(raw, raw->cbarg);
}

goto done;

reschedule:
if (evbuffer_get_length(buf) == 0) {
event_del(&raw->ev_write);
}
goto done;

error:
bufferevent_disable(raw, EV_WRITE);
if (raw->errorcb) {
raw->errorcb(raw, what, raw->cbarg);
}

done:
return;
}

ssize_t udp_read(evutil_socket_t sock, struct udp_frame *udp_frame, struct sockaddr *sockaddr, ev_socklen_t *socklen) {
Expand Down
2 changes: 0 additions & 2 deletions common.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,6 @@ void tunnel_ss(struct bufferevent *raw, struct bufferevent *tev);

ssize_t udp_read(evutil_socket_t sock, struct udp_frame *udp_frame, struct sockaddr *sockaddr, ev_socklen_t *socklen);

void udp_read_cb(struct evbuffer *buf, const struct evbuffer_cb_info *info, void *arg);

void bev_context_udp_writecb(evutil_socket_t fd, short event, void *arg);

#ifdef HAVE_SSL_CTX_SET_KEYLOG_CALLBACK
Expand Down
23 changes: 14 additions & 9 deletions wss-client.c
Original file line number Diff line number Diff line change
Expand Up @@ -1112,7 +1112,7 @@ static void http2_readcb(evutil_socket_t sock, short event, void *context) {
lh_bufferevent_http_stream_doall(http_streams, evict_http2_stream);
lh_bufferevent_http_stream_set_down_load(http_streams, hash_factor);
}
if (wss_context->ssl_connected) {
if (wss_context->ssl_connected && evbuffer_get_length(wss_context->output)) {
do_http2_write(wss_context, wss_context->output);
}
}
Expand Down Expand Up @@ -1258,6 +1258,7 @@ static void bufferevent_writecb(evutil_socket_t fd, short event, void *arg) {
short what = BEV_EVENT_WRITING;
struct bufferevent *bev = arg;
struct bev_context_ssl *bev_context_ssl;
struct wss_context *wss_context;

if (event == EV_TIMEOUT) {
what |= BEV_EVENT_TIMEOUT;
Expand All @@ -1274,10 +1275,14 @@ static void bufferevent_writecb(evutil_socket_t fd, short event, void *arg) {
default:
break;
}

if (bev_context_ssl && bev_context_ssl->http == http2) {
res = do_http2_write(bev_context_ssl->wss_context, bev_context_ssl->wss_context->output);
if (res < 0) {
goto write;
wss_context = bev_context_ssl->wss_context;
if (evbuffer_get_length(wss_context->output)) {
res = do_http2_write(wss_context, wss_context->output);
if (res <= 0) {
goto check;
}
}
}

Expand All @@ -1288,23 +1293,23 @@ static void bufferevent_writecb(evutil_socket_t fd, short event, void *arg) {
goto error;
}
res = do_write(bev, fd, buffer, size);
if (res > 0) {
evbuffer_drain(bev->output, res);
if (res <= 0) {
goto check;
}
goto write;
evbuffer_drain(bev->output, res);
}

if (evbuffer_get_length(bev->output) == 0) {
event_del(&bev->ev_write);
}

if (bev->writecb && evbuffer_get_length(bev->output) == 0) {
if (bev->writecb) {
bev->writecb(bev, bev->cbarg);
}

goto done;

write:
check:
if (res == WSS_AGAIN) {
goto reschedule;
} else if (res == WSS_ERROR) {
Expand Down

0 comments on commit 414c9b5

Please sign in to comment.