Skip to content

Commit

Permalink
Make it easier to see the steps needed for the two packet paths by re…
Browse files Browse the repository at this point in the history
…factoring into two functions
  • Loading branch information
hamishcoleman committed Oct 26, 2024
1 parent a4d6abd commit ed50bcb
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 94 deletions.
22 changes: 18 additions & 4 deletions apps/n3n-edge.c
Original file line number Diff line number Diff line change
Expand Up @@ -977,10 +977,24 @@ int main (int argc, char* argv[]) {

if(select(eee->sock + 1, &socket_mask, NULL, NULL, &wait_time) > 0) {
if(FD_ISSET(eee->sock, &socket_mask)) {

fetch_and_eventually_process_data(eee, eee->sock,
pktbuf, &expected, &position,
now);
if(!eee->conf.connect_tcp) {
edge_read_proto3_udp(
eee,
eee->sock,
pktbuf,
sizeof(pktbuf),
now
);
} else {
edge_read_proto3_tcp(
eee,
eee->sock,
pktbuf,
&expected,
&position,
now
);
}
}
}
}
Expand Down
15 changes: 12 additions & 3 deletions include/n3n/edge.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,17 @@ void edge_term_conf (n2n_edge_conf_t *conf);
void send_register_super (struct n3n_runtime_data *eee);
void send_query_peer (struct n3n_runtime_data *eee, const n2n_mac_t dst_mac);
int supernode_connect (struct n3n_runtime_data *eee);
int fetch_and_eventually_process_data (struct n3n_runtime_data *eee, SOCKET sock,
uint8_t *pktbuf, uint16_t *expected, uint16_t *position,
time_t now);

void edge_read_proto3_udp (struct n3n_runtime_data *eee,
SOCKET sock,
uint8_t *pktbuf,
ssize_t pktbuf_len,
time_t now);
void edge_read_proto3_tcp (struct n3n_runtime_data *eee,
SOCKET sock,
uint8_t *pktbuf,
uint16_t *expected,
uint16_t *position,
time_t now);

#endif
203 changes: 116 additions & 87 deletions src/edge_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -2878,92 +2878,117 @@ void process_udp (struct n3n_runtime_data *eee, const struct sockaddr *sender_so

/* ************************************** */

void edge_read_proto3_udp (struct n3n_runtime_data *eee,
SOCKET sock,
uint8_t *pktbuf,
ssize_t pktbuf_len,
time_t now) {
struct sockaddr_storage sas;
struct sockaddr *sender_sock = (struct sockaddr*)&sas;
socklen_t ss_size = sizeof(sas);

ssize_t bread = recvfrom(
sock,
pktbuf,
pktbuf_len,
0 /*flags*/,
sender_sock,
&ss_size
);

if(bread < 0) {
#ifdef _WIN32
unsigned int wsaerr = WSAGetLastError();
if(wsaerr == WSAECONNRESET) {
return;
}
traceEvent(TRACE_ERROR, "WSAGetLastError(): %u", wsaerr);
#endif

/* For UDP bread of zero just means no data (unlike TCP). */
/* The fd is no good now. Maybe we lost our interface. */
traceEvent(TRACE_ERROR, "recvfrom() failed %d errno %d (%s)", bread, errno, strerror(errno));
*eee->keep_running = false;
return;
}
if(bread == 0) {
return;
}

int fetch_and_eventually_process_data (struct n3n_runtime_data *eee, SOCKET sock,
uint8_t *pktbuf, uint16_t *expected, uint16_t *position,
time_t now) {
// TODO:
// - detect when pktbuf is too small for the packet and add that to stats
// (could switch to using recvmsg() for that)

ssize_t bread = 0;
// we have a datagram to process...
// ...and the datagram has data (not just a header)
//
process_udp(eee, sender_sock, sock, pktbuf, bread, now);
return;
}

void edge_read_proto3_tcp (struct n3n_runtime_data *eee,
SOCKET sock,
uint8_t *pktbuf,
uint16_t *expected,
uint16_t *position,
time_t now) {
struct sockaddr_storage sas;
struct sockaddr *sender_sock = (struct sockaddr*)&sas;
socklen_t ss_size = sizeof(sas);

if((!eee->conf.connect_tcp)
#ifndef SKIP_MULTICAST_PEERS_DISCOVERY
|| (sock == eee->udp_multicast_sock)
#endif
) {
// udp
bread = recvfrom(sock, (void *)pktbuf, N2N_PKT_BUF_SIZE, 0 /*flags*/,
sender_sock, &ss_size);
// tcp
ssize_t bread = recvfrom(
sock,
(void *)(pktbuf + *position),
*expected - *position,
0 /*flags*/,
sender_sock,
&ss_size
);

if((bread < 0)
#ifdef _WIN32
&& (WSAGetLastError() != WSAECONNRESET)
#endif
) {
/* For UDP bread of zero just means no data (unlike TCP). */
/* The fd is no good now. Maybe we lost our interface. */
traceEvent(TRACE_ERROR, "recvfrom() failed %d errno %d (%s)", bread, errno, strerror(errno));
if((bread <= 0) && (errno)) {
traceEvent(
TRACE_ERROR,
"recvfrom() failed %d errno %d (%s)",
bread,
errno,
strerror(errno)
);
#ifdef _WIN32
traceEvent(TRACE_ERROR, "WSAGetLastError(): %u", WSAGetLastError());
traceEvent(TRACE_ERROR, "WSAGetLastError(): %u", WSAGetLastError());
#endif
return -1;
}
supernode_disconnect(eee);
eee->sn_wait = 1;
return;
}

// TODO: if bread > 64K, something is wrong
// but this case should not happen
*position = *position + bread;

// we have a datagram to process...
if(bread > 0) {
// ...and the datagram has data (not just a header)
process_udp(eee, sender_sock, sock, pktbuf, bread, now);
}
if(*position != *expected) {
// not enough bytes yet to process buffer
return;
}

} else {
// tcp
bread = recvfrom(sock,
(void *)(pktbuf + *position), *expected - *position, 0 /*flags*/,
sender_sock, &ss_size);
if((bread <= 0) && (errno)) {
traceEvent(TRACE_ERROR, "recvfrom() failed %d errno %d (%s)", bread, errno, strerror(errno));
#ifdef _WIN32
traceEvent(TRACE_ERROR, "WSAGetLastError(): %u", WSAGetLastError());
#endif
if(*position == sizeof(uint16_t)) {
// the prepended length has been read, preparing for the packet
*expected = *expected + be16toh(*(uint16_t*)(pktbuf));
if(*expected > N2N_PKT_BUF_SIZE) {
supernode_disconnect(eee);
eee->sn_wait = 1;
goto tcp_done;
}
*position = *position + bread;

if(*position == *expected) {
if(*position == sizeof(uint16_t)) {
// the prepended length has been read, preparing for the packet
*expected = *expected + be16toh(*(uint16_t*)(pktbuf));
if(*expected > N2N_PKT_BUF_SIZE) {
supernode_disconnect(eee);
eee->sn_wait = 1;
traceEvent(TRACE_DEBUG, "too many bytes expected");
goto tcp_done;
}
} else {
// full packet read, handle it
process_udp(eee, sender_sock, sock,
pktbuf + sizeof(uint16_t), *position - sizeof(uint16_t), now);
// reset, await new prepended length
*expected = sizeof(uint16_t);
*position = 0;
}
traceEvent(TRACE_DEBUG, "too many bytes expected");
}
return;
}
tcp_done:
;

return 0;
// full packet read, handle it
process_udp(eee, sender_sock, sock,
pktbuf + sizeof(uint16_t), *position - sizeof(uint16_t), now);
// reset, await new prepended length
*expected = sizeof(uint16_t);
*position = 0;
return;
}


void print_edge_stats (const struct n3n_runtime_data *eee) {

const struct n2n_edge_stats *s = &eee->stats;
Expand Down Expand Up @@ -3030,17 +3055,24 @@ int run_edge_loop (struct n3n_runtime_data *eee) {

// external packets
if((eee->sock != -1) && FD_ISSET(eee->sock, &readers)) {
if(0 != fetch_and_eventually_process_data(
eee,
eee->sock,
pktbuf,
&expected,
&position,
now
)) {
*eee->keep_running = false;
}
if(eee->conf.connect_tcp) {
if(!eee->conf.connect_tcp) {
edge_read_proto3_udp(
eee,
eee->sock,
pktbuf,
sizeof(pktbuf),
now
);
} else {
edge_read_proto3_tcp(
eee,
eee->sock,
pktbuf,
&expected,
&position,
now
);

if((expected >= N2N_PKT_BUF_SIZE) || (position >= N2N_PKT_BUF_SIZE)) {
// something went wrong, possibly even before
// e.g. connection failure/closure in the middle of transmission (between len & data)
Expand All @@ -3055,16 +3087,13 @@ int run_edge_loop (struct n3n_runtime_data *eee) {

#ifndef SKIP_MULTICAST_PEERS_DISCOVERY
if((eee->udp_multicast_sock != -1) && FD_ISSET(eee->udp_multicast_sock, &readers)) {
if(0 != fetch_and_eventually_process_data(
eee,
eee->udp_multicast_sock,
pktbuf,
&expected,
&position,
now
)) {
*eee->keep_running = false;
}
edge_read_proto3_udp(
eee,
eee->udp_multicast_sock,
pktbuf,
sizeof(pktbuf),
now
);
}
#endif

Expand Down

0 comments on commit ed50bcb

Please sign in to comment.