diff --git a/include/valkey/cluster.h b/include/valkey/cluster.h index 10c2442..29db89f 100644 --- a/include/valkey/cluster.h +++ b/include/valkey/cluster.h @@ -152,6 +152,8 @@ typedef struct valkeyClusterNodeIterator { /* Enable parsing of replica nodes. Currently not used, but the * information is added to its primary node structure. */ #define VALKEY_OPT_USE_REPLICAS 0x2000 +/* Use a blocking slotmap update after an initial async connect. */ +#define VALKEY_OPT_BLOCKING_INITIAL_UPDATE 0x4000 typedef struct { const char *initial_nodes; /* Initial cluster node address(es). */ @@ -204,7 +206,6 @@ int valkeyClusterOptionsSetConnectCallback(valkeyClusterOptions *options, valkeyClusterContext *valkeyClusterConnectWithOptions(const valkeyClusterOptions *options); valkeyClusterContext *valkeyClusterConnect(const char *addrs); valkeyClusterContext *valkeyClusterConnectWithTimeout(const char *addrs, const struct timeval tv); -int valkeyClusterConnect2(valkeyClusterContext *cc); void valkeyClusterFree(valkeyClusterContext *cc); /* Options configurable in runtime. */ diff --git a/src/cluster.c b/src/cluster.c index ec990af..9597261 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -56,6 +56,7 @@ vk_static_assert(VALKEY_OPT_USE_CLUSTER_SLOTS > VALKEY_OPT_LAST_SA_OPTION); #define VALKEY_FLAG_USE_CLUSTER_SLOTS 0x1 #define VALKEY_FLAG_PARSE_REPLICAS 0x2 #define VALKEY_FLAG_DISCONNECTING 0x4 +#define VALKEY_FLAG_BLOCKING_INITIAL_UPDATE 0x8 // Cluster errors are offset by 100 to be sufficiently out of range of // standard Valkey errors @@ -1268,6 +1269,9 @@ static valkeyClusterContext *valkeyClusterContextInit(const valkeyClusterOptions if (options->options & VALKEY_OPT_USE_REPLICAS) { cc->flags |= VALKEY_FLAG_PARSE_REPLICAS; } + if (options->options & VALKEY_OPT_BLOCKING_INITIAL_UPDATE) { + cc->flags |= VALKEY_FLAG_BLOCKING_INITIAL_UPDATE; + } if (options->max_retry > 0) { cc->max_retry_count = options->max_retry; } else { @@ -1625,24 +1629,6 @@ int valkeyClusterSetOptionTimeout(valkeyClusterContext *cc, return VALKEY_OK; } -int valkeyClusterConnect2(valkeyClusterContext *cc) { - - if (cc == NULL) { - return VALKEY_ERR; - } - - if (dictSize(cc->nodes) == 0) { - valkeyClusterSetError(cc, VALKEY_ERR_OTHER, - "server address not configured"); - return VALKEY_ERR; - } - /* Clear a previously set shutdown flag since we allow a - * reconnection of an async context using this API (legacy). */ - cc->flags &= ~VALKEY_FLAG_DISCONNECTING; - - return valkeyClusterUpdateSlotmap(cc); -} - valkeyContext *valkeyClusterGetValkeyContext(valkeyClusterContext *cc, valkeyClusterNode *node) { valkeyContext *c = NULL; @@ -2817,10 +2803,7 @@ valkeyClusterAsyncContext *valkeyClusterAsyncConnectWithOptions(const valkeyClus return NULL; } - //TODO: valkeyClusterAsyncConnect(acc); - if (valkeyClusterUpdateSlotmap(acc->cc) != VALKEY_OK) { - valkeyClusterAsyncSetError(acc, acc->cc->err, acc->cc->errstr); - } + valkeyClusterAsyncConnect(acc); return acc; } @@ -2829,7 +2812,20 @@ int valkeyClusterAsyncConnect(valkeyClusterAsyncContext *acc) { if (acc->attach_fn == NULL) { return VALKEY_ERR; } - /* TODO: add options to use: valkeyClusterUpdateSlotmap(acc->cc); */ + + /* Clear a previously set shutdown flag to allow a + * reconnection of an async context using this API. */ + acc->cc->flags &= ~VALKEY_FLAG_DISCONNECTING; + + /* Use blocking initial slotmap update when configured. */ + if (acc->cc->flags & VALKEY_FLAG_BLOCKING_INITIAL_UPDATE) { + if (valkeyClusterUpdateSlotmap(acc->cc) != VALKEY_OK) { + valkeyClusterAsyncSetError(acc, acc->cc->err, acc->cc->errstr); + return VALKEY_ERR; + } + return VALKEY_OK; + } + /* Use non-blocking initial slotmap update. */ return updateSlotMapAsync(acc, NULL /*any node*/); } diff --git a/tests/clusterclient_async.c b/tests/clusterclient_async.c index 80f620f..fd1cb3f 100644 --- a/tests/clusterclient_async.c +++ b/tests/clusterclient_async.c @@ -251,11 +251,12 @@ int main(int argc, char **argv) { valkeyClusterOptions options = {0}; options.initial_nodes = initnode; + options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE; options.connect_timeout = &timeout; options.command_timeout = &timeout; options.max_retry = 1; if (use_cluster_slots) { - options.options = VALKEY_OPT_USE_CLUSTER_SLOTS; + options.options |= VALKEY_OPT_USE_CLUSTER_SLOTS; } if (show_events) { options.event_callback = eventCallback; diff --git a/tests/clusterclient_reconnect_async.c b/tests/clusterclient_reconnect_async.c index ddde2bc..ebb0755 100644 --- a/tests/clusterclient_reconnect_async.c +++ b/tests/clusterclient_reconnect_async.c @@ -27,16 +27,15 @@ void connectToValkey(valkeyClusterAsyncContext *acc) { /* reset context in case of reconnect */ valkeyClusterAsyncDisconnect(acc); - int status = valkeyClusterConnect2(acc->cc); - if (status == VALKEY_OK) { + if (valkeyClusterAsyncConnect(acc) == VALKEY_OK) { // cluster mode - } else if (acc->cc->err && - strcmp(acc->cc->errstr, VALKEY_ENOCLUSTER) == 0) { + } else if (acc->err && + strcmp(acc->errstr, VALKEY_ENOCLUSTER) == 0) { printf("[no cluster]\n"); - acc->cc->err = 0; - memset(acc->cc->errstr, '\0', strlen(acc->cc->errstr)); + acc->err = 0; + memset(acc->errstr, '\0', strlen(acc->errstr)); } else { - printf("Connect error: %s\n", acc->cc->errstr); + printf("Connect error: %s\n", acc->errstr); exit(-1); } } @@ -99,7 +98,8 @@ int main(int argc, char **argv) { valkeyClusterOptions options = {0}; options.initial_nodes = initnode; - options.options = VALKEY_OPT_USE_CLUSTER_SLOTS; + options.options = VALKEY_OPT_USE_CLUSTER_SLOTS | + VALKEY_OPT_BLOCKING_INITIAL_UPDATE; valkeyClusterOptionsUseLibevent(&options, base); valkeyClusterAsyncContext *acc = valkeyClusterAsyncContextInit(&options); diff --git a/tests/ct_async_glib.c b/tests/ct_async_glib.c index 94cbb76..750b469 100644 --- a/tests/ct_async_glib.c +++ b/tests/ct_async_glib.c @@ -43,6 +43,7 @@ int main(int argc, char **argv) { valkeyClusterOptions options = {0}; options.initial_nodes = CLUSTER_NODE; + options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE; options.async_connect_cb = connectCallback; options.async_disconnect_cb = disconnectCallback; valkeyClusterOptionsUseGlib(&options, context); diff --git a/tests/ct_async_libev.c b/tests/ct_async_libev.c index 17fd930..22f517a 100644 --- a/tests/ct_async_libev.c +++ b/tests/ct_async_libev.c @@ -37,6 +37,7 @@ int main(int argc, char **argv) { valkeyClusterOptions options = {0}; options.initial_nodes = CLUSTER_NODE; + options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE; options.async_connect_cb = connectCallback; options.async_disconnect_cb = disconnectCallback; valkeyClusterOptionsUseLibev(&options, EV_DEFAULT); diff --git a/tests/ct_async_libuv.c b/tests/ct_async_libuv.c index 2fd38b3..481f235 100644 --- a/tests/ct_async_libuv.c +++ b/tests/ct_async_libuv.c @@ -40,6 +40,7 @@ int main(int argc, char **argv) { valkeyClusterOptions options = {0}; options.initial_nodes = CLUSTER_NODE; + options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE; options.async_connect_cb = connectCallback; options.async_disconnect_cb = disconnectCallback; valkeyClusterOptionsUseLibuv(&options, loop); diff --git a/tests/ct_connection.c b/tests/ct_connection.c index 6ca2d78..720be54 100644 --- a/tests/ct_connection.c +++ b/tests/ct_connection.c @@ -297,6 +297,7 @@ void test_async_password_ok(void) { valkeyClusterOptions options = {0}; options.initial_nodes = CLUSTER_NODE_WITH_PASSWORD; + options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE; options.password = CLUSTER_PASSWORD; options.async_connect_cb = connectCallback; options.async_disconnect_cb = disconnectCallback; @@ -324,6 +325,7 @@ void test_async_password_wrong(void) { valkeyClusterOptions options = {0}; options.initial_nodes = CLUSTER_NODE_WITH_PASSWORD; + options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE; options.password = "faultypass"; valkeyClusterOptionsUseLibevent(&options, base); @@ -353,6 +355,7 @@ void test_async_password_missing(void) { valkeyClusterOptions options = {0}; options.initial_nodes = CLUSTER_NODE_WITH_PASSWORD; + options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE; options.async_connect_cb = connectCallback; options.async_disconnect_cb = disconnectCallback; valkeyClusterOptionsUseLibevent(&options, base); @@ -383,6 +386,7 @@ void test_async_username_ok(void) { // Connect to the cluster using username and password valkeyClusterOptions options = {0}; options.initial_nodes = CLUSTER_NODE_WITH_PASSWORD; + options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE; options.async_connect_cb = connectCallback; options.async_disconnect_cb = disconnectCallback; options.username = "missing-user"; @@ -423,6 +427,7 @@ void test_async_multicluster(void) { valkeyClusterOptions options1 = {0}; options1.initial_nodes = CLUSTER_NODE; + options1.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE; options1.async_connect_cb = connectCallback; options1.async_disconnect_cb = disconnectCallback; valkeyClusterOptionsUseLibevent(&options1, base); @@ -433,6 +438,7 @@ void test_async_multicluster(void) { valkeyClusterOptions options2 = {0}; options2.initial_nodes = CLUSTER_NODE_WITH_PASSWORD; + options2.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE; options2.password = CLUSTER_PASSWORD; options2.async_connect_cb = connectCallback; options2.async_disconnect_cb = disconnectCallback; @@ -483,6 +489,7 @@ void test_async_connect_timeout(void) { valkeyClusterOptions options = {0}; /* Configure a non-routable IP address and a timeout */ options.initial_nodes = "192.168.0.0:7000"; + options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE; options.connect_timeout = &timeout; valkeyClusterOptionsUseLibevent(&options, base); @@ -504,6 +511,7 @@ void test_async_command_timeout(void) { valkeyClusterOptions options = {0}; options.initial_nodes = CLUSTER_NODE; + options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE; options.command_timeout = &timeout; valkeyClusterOptionsUseLibevent(&options, base); diff --git a/tests/ct_out_of_memory_handling.c b/tests/ct_out_of_memory_handling.c index 68228a5..33143d2 100644 --- a/tests/ct_out_of_memory_handling.c +++ b/tests/ct_out_of_memory_handling.c @@ -452,7 +452,8 @@ void test_alloc_failure_handling_async(void) { valkeyClusterOptions options = {0}; options.initial_nodes = CLUSTER_NODE; - options.options = VALKEY_OPT_USE_REPLICAS; + options.options = VALKEY_OPT_USE_REPLICAS | + VALKEY_OPT_BLOCKING_INITIAL_UPDATE; options.async_connect_cb = connectCallback; options.async_disconnect_cb = disconnectCallback; valkeyClusterOptionsUseLibevent(&options, base); diff --git a/tests/ct_pipeline.c b/tests/ct_pipeline.c index 6dd1ed2..5d14b6b 100644 --- a/tests/ct_pipeline.c +++ b/tests/ct_pipeline.c @@ -96,6 +96,7 @@ void test_async_pipeline(void) { valkeyClusterOptions options = {0}; options.initial_nodes = CLUSTER_NODE; + options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE; options.async_connect_cb = connectCallback; options.async_disconnect_cb = disconnectCallback; valkeyClusterOptionsUseLibevent(&options, base); diff --git a/tests/ct_specific_nodes.c b/tests/ct_specific_nodes.c index eefb09a..8008f69 100644 --- a/tests/ct_specific_nodes.c +++ b/tests/ct_specific_nodes.c @@ -322,7 +322,8 @@ void test_async_to_single_node(void) { valkeyClusterOptions options = {0}; options.initial_nodes = CLUSTER_NODE; - options.options = VALKEY_OPT_USE_CLUSTER_SLOTS; + options.options = VALKEY_OPT_USE_CLUSTER_SLOTS | + VALKEY_OPT_BLOCKING_INITIAL_UPDATE; options.max_retry = 1; options.async_connect_cb = connectCallback; options.async_disconnect_cb = disconnectCallback; @@ -353,7 +354,8 @@ void test_async_formatted_to_single_node(void) { valkeyClusterOptions options = {0}; options.initial_nodes = CLUSTER_NODE; - options.options = VALKEY_OPT_USE_CLUSTER_SLOTS; + options.options = VALKEY_OPT_USE_CLUSTER_SLOTS | + VALKEY_OPT_BLOCKING_INITIAL_UPDATE; options.max_retry = 1; options.async_connect_cb = connectCallback; options.async_disconnect_cb = disconnectCallback; @@ -385,7 +387,8 @@ void test_async_command_argv_to_single_node(void) { valkeyClusterOptions options = {0}; options.initial_nodes = CLUSTER_NODE; - options.options = VALKEY_OPT_USE_CLUSTER_SLOTS; + options.options = VALKEY_OPT_USE_CLUSTER_SLOTS | + VALKEY_OPT_BLOCKING_INITIAL_UPDATE; options.max_retry = 1; options.async_connect_cb = connectCallback; options.async_disconnect_cb = disconnectCallback; @@ -417,7 +420,8 @@ void test_async_to_all_nodes(void) { valkeyClusterOptions options = {0}; options.initial_nodes = CLUSTER_NODE; - options.options = VALKEY_OPT_USE_CLUSTER_SLOTS; + options.options = VALKEY_OPT_USE_CLUSTER_SLOTS | + VALKEY_OPT_BLOCKING_INITIAL_UPDATE; options.max_retry = 1; options.async_connect_cb = connectCallback; options.async_disconnect_cb = disconnectCallback; @@ -457,7 +461,8 @@ void test_async_transaction(void) { valkeyClusterOptions options = {0}; options.initial_nodes = CLUSTER_NODE; - options.options = VALKEY_OPT_USE_CLUSTER_SLOTS; + options.options = VALKEY_OPT_USE_CLUSTER_SLOTS | + VALKEY_OPT_BLOCKING_INITIAL_UPDATE; options.max_retry = 1; options.async_connect_cb = connectCallback; options.async_disconnect_cb = disconnectCallback;