Add pagination to management API
- This splits long-running queries into multiple chunks
- Reducing the size of the memory buffer needed
- Allowing the main loop to block less
- This does open a race condition for the underlying data to change, but
  this is considered a reasonable trade-off
- The implementation is not efficient, as avoiding multiple list scans
  would require implementing persistent cursors into the uthash data.
hamishcoleman committed Jul 12, 2024
1 parent 1a23fad commit a0569b8
Showing 2 changed files with 98 additions and 7 deletions.
11 changes: 11 additions & 0 deletions doc/ManagementAPI.md
@@ -102,3 +102,14 @@ defaults to 'n3n' and can either be set with the config option
If the result of an API call will overrun the size of the internal buffer,
the response will indicate an "overflow" condition by returning a 507 result
and a JsonRPC error object.
When an overflow condition is returned, the error object may contain the
count of the number of items that could be added before the overflow occurred.
The request can then be retried with pagination parameters to avoid the
overflow. (Note that this also opens a window for the internal data to
change during the paginated request.)

To paginate, add `offset` and `limit` values to the params dictionary.
The n3nctl tool shows an example of how to use this in its
JsonRPC.get() method.
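
A retried request with pagination parameters might look like the following
sketch (the get_edges method name here is illustrative; offset and limit
are the fields the server reads):

    {"jsonrpc": "2.0", "id": 1, "method": "get_edges",
     "params": {"offset": 0, "limit": 100}}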
94 changes: 87 additions & 7 deletions src/management.c
@@ -301,6 +301,26 @@ void mgmt_event_post (const enum n3n_event_topic topic, int data0, const void *d
// - if the write returns EWOULDBLOCK, increment a metric and return
}

static void extract_pagination (char *params, int *limit, int *offset) {
char *limitstr = json_find_field(params, "\"limit\"");
char *offsetstr = json_find_field(params, "\"offset\"");

// do all the field finding first, since the value extractor will
// insert nulls at the end of its strings

if(limitstr) {
*limit = atoi(json_extract_val(limitstr));
} else {
*limit = 2147483647; // INT_MAX: default to effectively no limit
}

if(offsetstr) {
*offset = atoi(json_extract_val(offsetstr));
} else {
*offset = 0;
}
}
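// Usage sketch (illustrative, not part of this commit): given a writable
// buffer such as
//     char params[] = "\"offset\":200,\"limit\":100}";
//     int limit;
//     int offset = 0;
// then calling
//     extract_pagination(params, &limit, &offset);
// leaves limit == 100 and offset == 200. When a field is absent, limit
// defaults to 2147483647 and offset to 0. The buffer must be writable
// because the value extraction inserts nulls in place.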

static void jsonrpc_error (char *id, conn_t *conn, int code, char *message, int count) {
// Reuse the request buffer
sb_zero(conn->request);
@@ -382,6 +402,7 @@ static void jsonrpc_set_verbose (char *id, struct n3n_runtime_data *eee, conn_t
}

// Avoid discarding the const attribute
// TODO: avoid malloc()
char *params = strdup(params_in+1);

char *arg1 = json_extract_val(params);
@@ -438,9 +459,21 @@ static void jsonrpc_get_mac (char *id, struct n3n_runtime_data *eee, conn_t *con
struct sn_community *tmp_community;
struct node_supernode_association *assoc;
struct node_supernode_association *tmp_assoc;

int limit; // max number of items to add to this packet
int offset = 0; // Number of items to skip before adding
extract_pagination((char *)params, &limit, &offset);
int count = 0; // Number of items in this reply packet
int index = 0; // Track the current item number

HASH_ITER(hh, eee->communities, community, tmp_community) {
HASH_ITER(hh, community->assoc, assoc, tmp_assoc) {
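// skip (but still count) items before the requested offset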
if(index < offset) {
index++;
continue;
}
index++;

char buf[1000];
char port[10];
@@ -473,6 +506,9 @@ static void jsonrpc_get_mac (char *id, struct n3n_runtime_data *eee, conn_t *con
return;
}
count++;
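// stop adding items once this reply packet holds "limit" entries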
if(count >= limit) {
break;
}
}
}

@@ -505,9 +541,18 @@ static void jsonrpc_get_communities (char *id, struct n3n_runtime_data *eee, con
jsonrpc_result_head(id, conn);
sb_reprintf(&conn->request, "[");

int limit; // max number of items to add to this packet
int offset = 0; // Number of items to skip before adding
extract_pagination((char *)params, &limit, &offset);
int count = 0; // Number of items in this reply packet
int index = 0; // Track the current item number

HASH_ITER(hh, eee->communities, community, tmp) {
if(index < offset) {
index++;
continue;
}
index++;

sb_reprintf(&conn->request,
"{"
@@ -524,6 +569,9 @@
return;
}
count++;
if(count >= limit) {
break;
}
}

jsonrpc_listend_hack(conn, "]");
@@ -581,9 +629,20 @@ static void jsonrpc_get_edges (char *id, struct n3n_runtime_data *eee, conn_t *c
jsonrpc_result_head(id, conn);
sb_reprintf(&conn->request, "[");

int limit; // max number of items to add to this packet
int offset = 0; // Number of items to skip before adding
extract_pagination((char *)params, &limit, &offset);
int count = 0; // Number of items in this reply packet
int index = 0; // Track the current item number

// dump nodes with forwarding through supernodes
HASH_ITER(hh, eee->pending_peers, peer, tmpPeer) {
if(index < offset) {
index++;
continue;
}
index++;

jsonrpc_get_edges_row(
&conn->request,
peer,
@@ -595,10 +654,19 @@
return;
}
count++;
if(count >= limit) {
break;
}
}

// dump peer-to-peer nodes
HASH_ITER(hh, eee->known_peers, peer, tmpPeer) {
if(index < offset) {
index++;
continue;
}
index++;

jsonrpc_get_edges_row(
&conn->request,
peer,
@@ -610,23 +678,35 @@
return;
}
count++;
if(count >= limit) {
break;
}
}

struct sn_community *community, *tmp;
HASH_ITER(hh, eee->communities, community, tmp) {
HASH_ITER(hh, community->edges, peer, tmpPeer) {
if(index < offset) {
index++;
continue;
}
index++;

jsonrpc_get_edges_row(
&conn->request,
peer,
"sn",
(community->is_federation) ? "-/-" : community->community
);
if(jsonrpc_error_overflow(id, conn, count)) {
return;
}
count++;
if(count >= limit) {
break;
}
}
}

