diff --git a/common.c b/common.c index 45e3a9b..4d6a358 100644 --- a/common.c +++ b/common.c @@ -840,59 +840,88 @@ void tunnel_ss(struct bufferevent *raw, struct bufferevent *tev) { raw->readcb(raw, raw->cbarg); } -void udp_read_cb(struct evbuffer *buf, const struct evbuffer_cb_info *info, void *arg) { - struct bufferevent *raw = arg; - (void) buf; - if (info->n_added > 0) { - raw->readcb(raw, raw->cbarg); - } -} - void bev_context_udp_writecb(evutil_socket_t fd, short event, void *arg) { + int err; size_t size; + ssize_t res; unsigned length; uint16_t payload_length; struct evbuffer *buf; struct bufferevent *raw; struct bev_context_udp *bev_context_udp; struct udp_frame udp_frame; + short what = BEV_EVENT_WRITING; (void) event; raw = arg; bev_context_udp = bufferevent_get_context(raw); buf = raw->output; size = evbuffer_get_length(buf); - while (size > 0) { - if (size < UDP_FRAME_LENGTH_SIZE) { - break; - } - if (evbuffer_copyout(buf, &udp_frame, UDP_FRAME_LENGTH_SIZE) != UDP_FRAME_LENGTH_SIZE) { - LOGE("cannot copy udp to get payload length for %d", get_port(bev_context_udp->sockaddr)); - raw->errorcb(raw, BEV_EVENT_ERROR, raw->cbarg); - break; - } - payload_length = htons(udp_frame.length); - length = payload_length + UDP_FRAME_LENGTH_SIZE; - if (size < length) { - break; - } - if (evbuffer_copyout(buf, &udp_frame, length) != (int) length) { - LOGE("cannot copy udp %d for %d", (int) length, get_port(bev_context_udp->sockaddr)); - raw->errorcb(raw, BEV_EVENT_ERROR, raw->cbarg); - break; - } - if (sendto(fd, udp_frame.buffer, payload_length, 0, bev_context_udp->sockaddr, bev_context_udp->socklen) < 0) { - // is there any chance to sendto later? - int socket_error = evutil_socket_geterror(bev_context_udp->sock); - LOGE("cannot send udp to %d: %s", - get_port(bev_context_udp->sockaddr), evutil_socket_error_to_string(socket_error)); - raw->errorcb(raw, BEV_EVENT_ERROR, raw->cbarg); - break; + + if (size < UDP_FRAME_LENGTH_SIZE) { + goto reschedule; + } + if (evbuffer_copyout(buf, &udp_frame, UDP_FRAME_LENGTH_SIZE) != UDP_FRAME_LENGTH_SIZE) { + LOGE("cannot copy udp to get payload length for %d", get_port(bev_context_udp->sockaddr)); + what |= BEV_EVENT_ERROR; + goto error; + } + payload_length = htons(udp_frame.length); + length = payload_length + UDP_FRAME_LENGTH_SIZE; + if (size < length) { + goto reschedule; + } + if (evbuffer_copyout(buf, &udp_frame, length) != (ssize_t) length) { + LOGE("cannot copy udp %d for %d", (int) length, get_port(bev_context_udp->sockaddr)); + what |= BEV_EVENT_ERROR; + goto error; + } + res = sendto(fd, udp_frame.buffer, payload_length, 0, bev_context_udp->sockaddr, bev_context_udp->socklen); + if (res < 0) { + err = evutil_socket_geterror(fd); + if (EVUTIL_ERR_RW_RETRIABLE(err)) { + goto reschedule; } - LOGD("udp sent %d to peer %d", payload_length, get_port(bev_context_udp->sockaddr)); - evbuffer_drain(buf, length); - size -= length; + LOGW("cannot send udp to %d: %s", get_port(bev_context_udp->sockaddr), evutil_socket_error_to_string(err)); + what |= BEV_EVENT_ERROR; + goto error; } + if (res == 0) { + what |= BEV_EVENT_EOF; + goto error; + } + if (res != payload_length) { + LOGW("cannot send entire udp packet to %d", get_port(bev_context_udp->sockaddr)); + what |= BEV_EVENT_ERROR; + goto error; + } + LOGD("udp sent %d to peer %d", payload_length, get_port(bev_context_udp->sockaddr)); + evbuffer_drain(buf, length); + + if (evbuffer_get_length(buf) == 0) { + event_del(&raw->ev_write); + } + + if (raw->writecb) { + raw->writecb(raw, raw->cbarg); + } + + goto done; + +reschedule: + if (evbuffer_get_length(buf) == 0) { + event_del(&raw->ev_write); + } + goto done; + +error: + bufferevent_disable(raw, EV_WRITE); + if (raw->errorcb) { + raw->errorcb(raw, what, raw->cbarg); + } + +done: + return; } ssize_t udp_read(evutil_socket_t sock, struct udp_frame *udp_frame, struct sockaddr *sockaddr, ev_socklen_t *socklen) { diff --git a/common.h b/common.h index 1783168..4861d5a 100644 --- a/common.h +++ b/common.h @@ -219,8 +219,6 @@ void tunnel_ss(struct bufferevent *raw, struct bufferevent *tev); ssize_t udp_read(evutil_socket_t sock, struct udp_frame *udp_frame, struct sockaddr *sockaddr, ev_socklen_t *socklen); -void udp_read_cb(struct evbuffer *buf, const struct evbuffer_cb_info *info, void *arg); - void bev_context_udp_writecb(evutil_socket_t fd, short event, void *arg); #ifdef HAVE_SSL_CTX_SET_KEYLOG_CALLBACK diff --git a/wss-client.c b/wss-client.c index 63feeef..1c59281 100644 --- a/wss-client.c +++ b/wss-client.c @@ -1112,7 +1112,7 @@ static void http2_readcb(evutil_socket_t sock, short event, void *context) { lh_bufferevent_http_stream_doall(http_streams, evict_http2_stream); lh_bufferevent_http_stream_set_down_load(http_streams, hash_factor); } - if (wss_context->ssl_connected) { + if (wss_context->ssl_connected && evbuffer_get_length(wss_context->output)) { do_http2_write(wss_context, wss_context->output); } } @@ -1258,6 +1258,7 @@ static void bufferevent_writecb(evutil_socket_t fd, short event, void *arg) { short what = BEV_EVENT_WRITING; struct bufferevent *bev = arg; struct bev_context_ssl *bev_context_ssl; + struct wss_context *wss_context; if (event == EV_TIMEOUT) { what |= BEV_EVENT_TIMEOUT; @@ -1274,10 +1275,14 @@ static void bufferevent_writecb(evutil_socket_t fd, short event, void *arg) { default: break; } + if (bev_context_ssl && bev_context_ssl->http == http2) { - res = do_http2_write(bev_context_ssl->wss_context, bev_context_ssl->wss_context->output); - if (res < 0) { - goto write; + wss_context = bev_context_ssl->wss_context; + if (evbuffer_get_length(wss_context->output)) { + res = do_http2_write(wss_context, wss_context->output); + if (res <= 0) { + goto check; + } } } @@ -1288,23 +1293,23 @@ static void bufferevent_writecb(evutil_socket_t fd, short event, void *arg) { goto error; } res = do_write(bev, fd, buffer, size); - if (res > 0) { - evbuffer_drain(bev->output, res); + if (res <= 0) { + goto check; } - goto write; + evbuffer_drain(bev->output, res); } if (evbuffer_get_length(bev->output) == 0) { event_del(&bev->ev_write); } - if (bev->writecb && evbuffer_get_length(bev->output) == 0) { + if (bev->writecb) { bev->writecb(bev, bev->cbarg); } goto done; -write: +check: if (res == WSS_AGAIN) { goto reschedule; } else if (res == WSS_ERROR) {