Skip to content

Commit

Permalink
New option: Blocking slotmap updates after initial async connect
Browse files Browse the repository at this point in the history
Signed-off-by: Björn Svensson <[email protected]>
  • Loading branch information
bjosv committed Jan 2, 2025
1 parent 0531a26 commit 7a63913
Show file tree
Hide file tree
Showing 11 changed files with 55 additions and 39 deletions.
3 changes: 2 additions & 1 deletion include/valkey/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ typedef struct valkeyClusterNodeIterator {
/* Enable parsing of replica nodes. Currently not used, but the
* information is added to its primary node structure. */
#define VALKEY_OPT_USE_REPLICAS 0x2000
/* Use a blocking slotmap update after an initial async connect. */
#define VALKEY_OPT_BLOCKING_INITIAL_UPDATE 0x4000

typedef struct {
const char *initial_nodes; /* Initial cluster node address(es). */
Expand Down Expand Up @@ -204,7 +206,6 @@ int valkeyClusterOptionsSetConnectCallback(valkeyClusterOptions *options,
valkeyClusterContext *valkeyClusterConnectWithOptions(const valkeyClusterOptions *options);
valkeyClusterContext *valkeyClusterConnect(const char *addrs);
valkeyClusterContext *valkeyClusterConnectWithTimeout(const char *addrs, const struct timeval tv);
int valkeyClusterConnect2(valkeyClusterContext *cc);
void valkeyClusterFree(valkeyClusterContext *cc);

/* Options configurable in runtime. */
Expand Down
42 changes: 19 additions & 23 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ vk_static_assert(VALKEY_OPT_USE_CLUSTER_SLOTS > VALKEY_OPT_LAST_SA_OPTION);
#define VALKEY_FLAG_USE_CLUSTER_SLOTS 0x1
#define VALKEY_FLAG_PARSE_REPLICAS 0x2
#define VALKEY_FLAG_DISCONNECTING 0x4
#define VALKEY_FLAG_BLOCKING_INITIAL_UPDATE 0x8

// Cluster errors are offset by 100 to be sufficiently out of range of
// standard Valkey errors
Expand Down Expand Up @@ -1268,6 +1269,9 @@ static valkeyClusterContext *valkeyClusterContextInit(const valkeyClusterOptions
if (options->options & VALKEY_OPT_USE_REPLICAS) {
cc->flags |= VALKEY_FLAG_PARSE_REPLICAS;
}
if (options->options & VALKEY_OPT_BLOCKING_INITIAL_UPDATE) {
cc->flags |= VALKEY_FLAG_BLOCKING_INITIAL_UPDATE;
}
if (options->max_retry > 0) {
cc->max_retry_count = options->max_retry;
} else {
Expand Down Expand Up @@ -1625,24 +1629,6 @@ int valkeyClusterSetOptionTimeout(valkeyClusterContext *cc,
return VALKEY_OK;
}

int valkeyClusterConnect2(valkeyClusterContext *cc) {

if (cc == NULL) {
return VALKEY_ERR;
}

if (dictSize(cc->nodes) == 0) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER,
"server address not configured");
return VALKEY_ERR;
}
/* Clear a previously set shutdown flag since we allow a
* reconnection of an async context using this API (legacy). */
cc->flags &= ~VALKEY_FLAG_DISCONNECTING;

return valkeyClusterUpdateSlotmap(cc);
}

valkeyContext *valkeyClusterGetValkeyContext(valkeyClusterContext *cc,
valkeyClusterNode *node) {
valkeyContext *c = NULL;
Expand Down Expand Up @@ -2817,10 +2803,7 @@ valkeyClusterAsyncContext *valkeyClusterAsyncConnectWithOptions(const valkeyClus
return NULL;
}

//TODO: valkeyClusterAsyncConnect(acc);
if (valkeyClusterUpdateSlotmap(acc->cc) != VALKEY_OK) {
valkeyClusterAsyncSetError(acc, acc->cc->err, acc->cc->errstr);
}
valkeyClusterAsyncConnect(acc);
return acc;
}

Expand All @@ -2829,7 +2812,20 @@ int valkeyClusterAsyncConnect(valkeyClusterAsyncContext *acc) {
if (acc->attach_fn == NULL) {
return VALKEY_ERR;
}
/* TODO: add options to use: valkeyClusterUpdateSlotmap(acc->cc); */

/* Clear a previously set shutdown flag to allow a
* reconnection of an async context using this API. */
acc->cc->flags &= ~VALKEY_FLAG_DISCONNECTING;

/* Use blocking initial slotmap update when configured. */
if (acc->cc->flags & VALKEY_FLAG_BLOCKING_INITIAL_UPDATE) {
if (valkeyClusterUpdateSlotmap(acc->cc) != VALKEY_OK) {
valkeyClusterAsyncSetError(acc, acc->cc->err, acc->cc->errstr);
return VALKEY_ERR;
}
return VALKEY_OK;
}
/* Use non-blocking initial slotmap update. */
return updateSlotMapAsync(acc, NULL /*any node*/);
}

Expand Down
3 changes: 2 additions & 1 deletion tests/clusterclient_async.c
Original file line number Diff line number Diff line change
Expand Up @@ -251,11 +251,12 @@ int main(int argc, char **argv) {

valkeyClusterOptions options = {0};
options.initial_nodes = initnode;
options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE;
options.connect_timeout = &timeout;
options.command_timeout = &timeout;
options.max_retry = 1;
if (use_cluster_slots) {
options.options = VALKEY_OPT_USE_CLUSTER_SLOTS;
options.options |= VALKEY_OPT_USE_CLUSTER_SLOTS;
}
if (show_events) {
options.event_callback = eventCallback;
Expand Down
16 changes: 8 additions & 8 deletions tests/clusterclient_reconnect_async.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,15 @@ void connectToValkey(valkeyClusterAsyncContext *acc) {
/* reset context in case of reconnect */
valkeyClusterAsyncDisconnect(acc);

int status = valkeyClusterConnect2(acc->cc);
if (status == VALKEY_OK) {
if (valkeyClusterAsyncConnect(acc) == VALKEY_OK) {
// cluster mode
} else if (acc->cc->err &&
strcmp(acc->cc->errstr, VALKEY_ENOCLUSTER) == 0) {
} else if (acc->err &&
strcmp(acc->errstr, VALKEY_ENOCLUSTER) == 0) {
printf("[no cluster]\n");
acc->cc->err = 0;
memset(acc->cc->errstr, '\0', strlen(acc->cc->errstr));
acc->err = 0;
memset(acc->errstr, '\0', strlen(acc->errstr));
} else {
printf("Connect error: %s\n", acc->cc->errstr);
printf("Connect error: %s\n", acc->errstr);
exit(-1);
}
}
Expand Down Expand Up @@ -99,7 +98,8 @@ int main(int argc, char **argv) {

valkeyClusterOptions options = {0};
options.initial_nodes = initnode;
options.options = VALKEY_OPT_USE_CLUSTER_SLOTS;
options.options = VALKEY_OPT_USE_CLUSTER_SLOTS |
VALKEY_OPT_BLOCKING_INITIAL_UPDATE;
valkeyClusterOptionsUseLibevent(&options, base);

valkeyClusterAsyncContext *acc = valkeyClusterAsyncContextInit(&options);
Expand Down
1 change: 1 addition & 0 deletions tests/ct_async_glib.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ int main(int argc, char **argv) {

valkeyClusterOptions options = {0};
options.initial_nodes = CLUSTER_NODE;
options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE;
options.async_connect_cb = connectCallback;
options.async_disconnect_cb = disconnectCallback;
valkeyClusterOptionsUseGlib(&options, context);
Expand Down
1 change: 1 addition & 0 deletions tests/ct_async_libev.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ int main(int argc, char **argv) {

valkeyClusterOptions options = {0};
options.initial_nodes = CLUSTER_NODE;
options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE;
options.async_connect_cb = connectCallback;
options.async_disconnect_cb = disconnectCallback;
valkeyClusterOptionsUseLibev(&options, EV_DEFAULT);
Expand Down
1 change: 1 addition & 0 deletions tests/ct_async_libuv.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ int main(int argc, char **argv) {

valkeyClusterOptions options = {0};
options.initial_nodes = CLUSTER_NODE;
options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE;
options.async_connect_cb = connectCallback;
options.async_disconnect_cb = disconnectCallback;
valkeyClusterOptionsUseLibuv(&options, loop);
Expand Down
8 changes: 8 additions & 0 deletions tests/ct_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ void test_async_password_ok(void) {

valkeyClusterOptions options = {0};
options.initial_nodes = CLUSTER_NODE_WITH_PASSWORD;
options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE;
options.password = CLUSTER_PASSWORD;
options.async_connect_cb = connectCallback;
options.async_disconnect_cb = disconnectCallback;
Expand Down Expand Up @@ -324,6 +325,7 @@ void test_async_password_wrong(void) {

valkeyClusterOptions options = {0};
options.initial_nodes = CLUSTER_NODE_WITH_PASSWORD;
options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE;
options.password = "faultypass";
valkeyClusterOptionsUseLibevent(&options, base);

Expand Down Expand Up @@ -353,6 +355,7 @@ void test_async_password_missing(void) {

valkeyClusterOptions options = {0};
options.initial_nodes = CLUSTER_NODE_WITH_PASSWORD;
options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE;
options.async_connect_cb = connectCallback;
options.async_disconnect_cb = disconnectCallback;
valkeyClusterOptionsUseLibevent(&options, base);
Expand Down Expand Up @@ -383,6 +386,7 @@ void test_async_username_ok(void) {
// Connect to the cluster using username and password
valkeyClusterOptions options = {0};
options.initial_nodes = CLUSTER_NODE_WITH_PASSWORD;
options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE;
options.async_connect_cb = connectCallback;
options.async_disconnect_cb = disconnectCallback;
options.username = "missing-user";
Expand Down Expand Up @@ -423,6 +427,7 @@ void test_async_multicluster(void) {

valkeyClusterOptions options1 = {0};
options1.initial_nodes = CLUSTER_NODE;
options1.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE;
options1.async_connect_cb = connectCallback;
options1.async_disconnect_cb = disconnectCallback;
valkeyClusterOptionsUseLibevent(&options1, base);
Expand All @@ -433,6 +438,7 @@ void test_async_multicluster(void) {

valkeyClusterOptions options2 = {0};
options2.initial_nodes = CLUSTER_NODE_WITH_PASSWORD;
options2.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE;
options2.password = CLUSTER_PASSWORD;
options2.async_connect_cb = connectCallback;
options2.async_disconnect_cb = disconnectCallback;
Expand Down Expand Up @@ -483,6 +489,7 @@ void test_async_connect_timeout(void) {
valkeyClusterOptions options = {0};
/* Configure a non-routable IP address and a timeout */
options.initial_nodes = "192.168.0.0:7000";
options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE;
options.connect_timeout = &timeout;
valkeyClusterOptionsUseLibevent(&options, base);

Expand All @@ -504,6 +511,7 @@ void test_async_command_timeout(void) {

valkeyClusterOptions options = {0};
options.initial_nodes = CLUSTER_NODE;
options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE;
options.command_timeout = &timeout;
valkeyClusterOptionsUseLibevent(&options, base);

Expand Down
3 changes: 2 additions & 1 deletion tests/ct_out_of_memory_handling.c
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,8 @@ void test_alloc_failure_handling_async(void) {

valkeyClusterOptions options = {0};
options.initial_nodes = CLUSTER_NODE;
options.options = VALKEY_OPT_USE_REPLICAS;
options.options = VALKEY_OPT_USE_REPLICAS |
VALKEY_OPT_BLOCKING_INITIAL_UPDATE;
options.async_connect_cb = connectCallback;
options.async_disconnect_cb = disconnectCallback;
valkeyClusterOptionsUseLibevent(&options, base);
Expand Down
1 change: 1 addition & 0 deletions tests/ct_pipeline.c
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ void test_async_pipeline(void) {

valkeyClusterOptions options = {0};
options.initial_nodes = CLUSTER_NODE;
options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE;
options.async_connect_cb = connectCallback;
options.async_disconnect_cb = disconnectCallback;
valkeyClusterOptionsUseLibevent(&options, base);
Expand Down
15 changes: 10 additions & 5 deletions tests/ct_specific_nodes.c
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,8 @@ void test_async_to_single_node(void) {

valkeyClusterOptions options = {0};
options.initial_nodes = CLUSTER_NODE;
options.options = VALKEY_OPT_USE_CLUSTER_SLOTS;
options.options = VALKEY_OPT_USE_CLUSTER_SLOTS |
VALKEY_OPT_BLOCKING_INITIAL_UPDATE;
options.max_retry = 1;
options.async_connect_cb = connectCallback;
options.async_disconnect_cb = disconnectCallback;
Expand Down Expand Up @@ -353,7 +354,8 @@ void test_async_formatted_to_single_node(void) {

valkeyClusterOptions options = {0};
options.initial_nodes = CLUSTER_NODE;
options.options = VALKEY_OPT_USE_CLUSTER_SLOTS;
options.options = VALKEY_OPT_USE_CLUSTER_SLOTS |
VALKEY_OPT_BLOCKING_INITIAL_UPDATE;
options.max_retry = 1;
options.async_connect_cb = connectCallback;
options.async_disconnect_cb = disconnectCallback;
Expand Down Expand Up @@ -385,7 +387,8 @@ void test_async_command_argv_to_single_node(void) {

valkeyClusterOptions options = {0};
options.initial_nodes = CLUSTER_NODE;
options.options = VALKEY_OPT_USE_CLUSTER_SLOTS;
options.options = VALKEY_OPT_USE_CLUSTER_SLOTS |
VALKEY_OPT_BLOCKING_INITIAL_UPDATE;
options.max_retry = 1;
options.async_connect_cb = connectCallback;
options.async_disconnect_cb = disconnectCallback;
Expand Down Expand Up @@ -417,7 +420,8 @@ void test_async_to_all_nodes(void) {

valkeyClusterOptions options = {0};
options.initial_nodes = CLUSTER_NODE;
options.options = VALKEY_OPT_USE_CLUSTER_SLOTS;
options.options = VALKEY_OPT_USE_CLUSTER_SLOTS |
VALKEY_OPT_BLOCKING_INITIAL_UPDATE;
options.max_retry = 1;
options.async_connect_cb = connectCallback;
options.async_disconnect_cb = disconnectCallback;
Expand Down Expand Up @@ -457,7 +461,8 @@ void test_async_transaction(void) {

valkeyClusterOptions options = {0};
options.initial_nodes = CLUSTER_NODE;
options.options = VALKEY_OPT_USE_CLUSTER_SLOTS;
options.options = VALKEY_OPT_USE_CLUSTER_SLOTS |
VALKEY_OPT_BLOCKING_INITIAL_UPDATE;
options.max_retry = 1;
options.async_connect_cb = connectCallback;
options.async_disconnect_cb = disconnectCallback;
Expand Down

0 comments on commit 7a63913

Please sign in to comment.