diff --git a/examples/cluster-async-tls.c b/examples/cluster-async-tls.c index 00200f68..3daa81c9 100644 --- a/examples/cluster-async-tls.c +++ b/examples/cluster-async-tls.c @@ -66,26 +66,24 @@ int main(int argc, char **argv) { exit(1); } - valkeyClusterAsyncContext *acc = valkeyClusterAsyncContextInit(); - assert(acc); - valkeyClusterAsyncSetConnectCallback(acc, connectCallback); - valkeyClusterAsyncSetDisconnectCallback(acc, disconnectCallback); - valkeyClusterSetOptionAddNodes(acc->cc, CLUSTER_NODE_TLS); - valkeyClusterSetOptionRouteUseSlots(acc->cc); - valkeyClusterSetOptionParseSlaves(acc->cc); - valkeyClusterSetOptionEnableTLS(acc->cc, tls); - - if (valkeyClusterConnect2(acc->cc) != VALKEY_OK) { - printf("Error: %s\n", acc->cc->errstr); - exit(-1); - } + valkeyClusterOptions options = {0}; + options.initial_nodes = CLUSTER_NODE_TLS; + options.async_connect_cb = connectCallback; + options.async_disconnect_cb = disconnectCallback; + options.options = VALKEY_OPT_USE_CLUSTER_SLOTS; + valkeyClusterSetOptionEnableTLS(&options, tls); struct event_base *base = event_base_new(); - valkeyClusterLibeventAttach(acc, base); + valkeyClusterSetOptionUseLibevent(&options, base); + + valkeyClusterAsyncContext *acc = valkeyClusterAsyncConnectWithOptions(&options); + if (acc == NULL || acc->err != 0) { + printf("Error: %s\n", acc ? acc->errstr : "OOM"); + exit(-1); + } - int status; - status = valkeyClusterAsyncCommand(acc, setCallback, (char *)"THE_ID", - "SET %s %s", "key", "value"); + int status = valkeyClusterAsyncCommand(acc, setCallback, (char *)"THE_ID", + "SET %s %s", "key", "value"); if (status != VALKEY_OK) { printf("error: err=%d errstr=%s\n", acc->err, acc->errstr); } diff --git a/examples/cluster-async.c b/examples/cluster-async.c index f2d9c282..72fc5e7f 100644 --- a/examples/cluster-async.c +++ b/examples/cluster-async.c @@ -49,53 +49,55 @@ void disconnectCallback(const valkeyAsyncContext *ac, int status) { int main(int argc, char **argv) { (void)argc; (void)argv; + struct event_base *base = event_base_new(); + + valkeyClusterOptions options = {0}; + options.initial_nodes = "127.0.0.1:7000"; + options.async_connect_cb = connectCallback; + options.async_disconnect_cb = disconnectCallback; + valkeyClusterSetOptionUseLibevent(&options, base); + printf("Connecting...\n"); - valkeyClusterAsyncContext *cc = - valkeyClusterAsyncConnect("127.0.0.1:7000", VALKEYCLUSTER_FLAG_NULL); - if (!cc) { + valkeyClusterAsyncContext *acc = valkeyClusterAsyncConnectWithOptions(&options); + if (!acc) { printf("Error: Allocation failure\n"); exit(-1); - } else if (cc->err) { - printf("Error: %s\n", cc->errstr); + } else if (acc->err) { + printf("Error: %s\n", acc->errstr); // handle error exit(-1); } - struct event_base *base = event_base_new(); - valkeyClusterLibeventAttach(cc, base); - valkeyClusterAsyncSetConnectCallback(cc, connectCallback); - valkeyClusterAsyncSetDisconnectCallback(cc, disconnectCallback); - int status; - status = valkeyClusterAsyncCommand(cc, setCallback, (char *)"THE_ID", + status = valkeyClusterAsyncCommand(acc, setCallback, (char *)"THE_ID", "SET %s %s", "key", "value"); if (status != VALKEY_OK) { - printf("error: err=%d errstr=%s\n", cc->err, cc->errstr); + printf("error: err=%d errstr=%s\n", acc->err, acc->errstr); } - status = valkeyClusterAsyncCommand(cc, getCallback, (char *)"THE_ID", + status = valkeyClusterAsyncCommand(acc, getCallback, (char *)"THE_ID", "GET %s", "key"); if (status != VALKEY_OK) { - printf("error: err=%d errstr=%s\n", cc->err, cc->errstr); + printf("error: err=%d errstr=%s\n", acc->err, acc->errstr); } - status = valkeyClusterAsyncCommand(cc, setCallback, (char *)"THE_ID", + status = valkeyClusterAsyncCommand(acc, setCallback, (char *)"THE_ID", "SET %s %s", "key2", "value2"); if (status != VALKEY_OK) { - printf("error: err=%d errstr=%s\n", cc->err, cc->errstr); + printf("error: err=%d errstr=%s\n", acc->err, acc->errstr); } - status = valkeyClusterAsyncCommand(cc, getCallback, (char *)"THE_ID", + status = valkeyClusterAsyncCommand(acc, getCallback, (char *)"THE_ID", "GET %s", "key2"); if (status != VALKEY_OK) { - printf("error: err=%d errstr=%s\n", cc->err, cc->errstr); + printf("error: err=%d errstr=%s\n", acc->err, acc->errstr); } printf("Dispatch..\n"); event_base_dispatch(base); printf("Done..\n"); - valkeyClusterAsyncFree(cc); + valkeyClusterAsyncFree(acc); event_base_free(base); return 0; } diff --git a/examples/cluster-clientside-caching-async.c b/examples/cluster-clientside-caching-async.c index f1c11181..865dcb53 100644 --- a/examples/cluster-clientside-caching-async.c +++ b/examples/cluster-clientside-caching-async.c @@ -127,11 +127,8 @@ void disconnectCallback(const valkeyAsyncContext *ac, int status) { /* Helper to modify keys using a separate client. */ void modifyKey(const char *key, const char *value) { printf("Modify key: '%s'\n", key); - valkeyClusterContext *cc = valkeyClusterContextInit(); - int status = valkeyClusterSetOptionAddNodes(cc, CLUSTER_NODE); - assert(status == VALKEY_OK); - status = valkeyClusterConnect2(cc); - assert(status == VALKEY_OK); + valkeyClusterContext *cc = valkeyClusterConnect(CLUSTER_NODE); + assert(cc); valkeyReply *reply = valkeyClusterCommand(cc, "SET %s %s", key, value); assert(reply != NULL); @@ -143,24 +140,22 @@ void modifyKey(const char *key, const char *value) { int main(int argc, char **argv) { (void)argc; (void)argv; - valkeyClusterAsyncContext *acc = valkeyClusterAsyncContextInit(); + struct event_base *base = event_base_new(); + + valkeyClusterOptions options = {0}; + options.initial_nodes = CLUSTER_NODE; + options.async_connect_nc_cb = connectCallbackNC; + options.async_disconnect_cb = disconnectCallback; + valkeyClusterSetOptionUseLibevent(&options, base); + + valkeyClusterAsyncContext *acc = valkeyClusterAsyncContextInit(&options); assert(acc); int status; - status = valkeyClusterAsyncSetConnectCallbackNC(acc, connectCallbackNC); - assert(status == VALKEY_OK); - status = valkeyClusterAsyncSetDisconnectCallback(acc, disconnectCallback); - assert(status == VALKEY_OK); - status = valkeyClusterSetEventCallback(acc->cc, eventCallback, acc); - assert(status == VALKEY_OK); - status = valkeyClusterSetOptionAddNodes(acc->cc, CLUSTER_NODE); - assert(status == VALKEY_OK); - - struct event_base *base = event_base_new(); - status = valkeyClusterLibeventAttach(acc, base); + status = valkeyClusterAsyncSetEventCallback(acc, eventCallback, acc); assert(status == VALKEY_OK); - status = valkeyClusterAsyncConnect2(acc); + status = valkeyClusterAsyncConnect(acc); assert(status == VALKEY_OK); event_base_dispatch(base); diff --git a/examples/cluster-simple.c b/examples/cluster-simple.c index 25bfd361..56274992 100644 --- a/examples/cluster-simple.c +++ b/examples/cluster-simple.c @@ -8,11 +8,12 @@ int main(int argc, char **argv) { UNUSED(argv); struct timeval timeout = {1, 500000}; // 1.5s - valkeyClusterContext *cc = valkeyClusterContextInit(); - valkeyClusterSetOptionAddNodes(cc, "127.0.0.1:7000"); - valkeyClusterSetOptionConnectTimeout(cc, timeout); - valkeyClusterSetOptionRouteUseSlots(cc); - valkeyClusterConnect2(cc); + valkeyClusterOptions options = {0}; + options.initial_nodes = "127.0.0.1:7000"; + options.connect_timeout = &timeout; + options.options = VALKEY_OPT_USE_CLUSTER_SLOTS; + + valkeyClusterContext *cc = valkeyClusterConnectWithOptions(&options); if (!cc) { printf("Error: Allocation failure\n"); exit(-1); diff --git a/examples/cluster-tls.c b/examples/cluster-tls.c index 526dadfe..ee8cab82 100644 --- a/examples/cluster-tls.c +++ b/examples/cluster-tls.c @@ -25,13 +25,13 @@ int main(int argc, char **argv) { struct timeval timeout = {1, 500000}; // 1.5s - valkeyClusterContext *cc = valkeyClusterContextInit(); - valkeyClusterSetOptionAddNodes(cc, CLUSTER_NODE_TLS); - valkeyClusterSetOptionConnectTimeout(cc, timeout); - valkeyClusterSetOptionRouteUseSlots(cc); - valkeyClusterSetOptionParseSlaves(cc); - valkeyClusterSetOptionEnableTLS(cc, tls); - valkeyClusterConnect2(cc); + valkeyClusterOptions options = {0}; + options.initial_nodes = CLUSTER_NODE_TLS; + options.connect_timeout = &timeout; + options.options = VALKEY_OPT_USE_CLUSTER_SLOTS; + valkeyClusterSetOptionEnableTLS(&options, tls); + + valkeyClusterContext *cc = valkeyClusterConnectWithOptions(&options); if (!cc) { printf("Error: Allocation failure\n"); exit(-1); diff --git a/include/valkey/adapters/ae.h b/include/valkey/adapters/ae.h index 5d8849d9..143680e2 100644 --- a/include/valkey/adapters/ae.h +++ b/include/valkey/adapters/ae.h @@ -140,14 +140,15 @@ static int valkeyAeAttachAdapter(valkeyAsyncContext *ac, void *loop) { } VALKEY_UNUSED -static int valkeyClusterAeAttach(valkeyClusterAsyncContext *acc, - aeEventLoop *loop) { - if (acc == NULL || loop == NULL) { +static int valkeyClusterSetOptionUseAe(valkeyClusterOptions *options, + aeEventLoop *loop) { + if (options == NULL || loop == NULL) { return VALKEY_ERR; } - acc->attach_fn = valkeyAeAttachAdapter; - acc->attach_data = loop; + options->attach_fn = valkeyAeAttachAdapter; + options->attach_data = loop; return VALKEY_OK; } + #endif /* VALKEY_ADAPTERS_AE_H */ diff --git a/include/valkey/adapters/glib.h b/include/valkey/adapters/glib.h index acd04120..23d52b0f 100644 --- a/include/valkey/adapters/glib.h +++ b/include/valkey/adapters/glib.h @@ -153,14 +153,14 @@ static int valkeyGlibAttachAdapter(valkeyAsyncContext *ac, void *context) { } VALKEY_UNUSED -static int valkeyClusterGlibAttach(valkeyClusterAsyncContext *acc, - GMainContext *context) { - if (acc == NULL) { // A NULL context is accepted. +static int valkeyClusterSetOptionUseGlib(valkeyClusterOptions *options, + GMainContext *context) { + if (options == NULL) { // A NULL context is accepted. return VALKEY_ERR; } - acc->attach_fn = valkeyGlibAttachAdapter; - acc->attach_data = context; + options->attach_fn = valkeyGlibAttachAdapter; + options->attach_data = context; return VALKEY_OK; } diff --git a/include/valkey/adapters/ivykis.h b/include/valkey/adapters/ivykis.h index cccdb228..bbd16295 100644 --- a/include/valkey/adapters/ivykis.h +++ b/include/valkey/adapters/ivykis.h @@ -85,17 +85,17 @@ static int valkeyIvykisAttach(valkeyAsyncContext *ac) { } /* Internal adapter function with correct function signature. */ -static int valkeyClusterIvykisAttachAdapter(valkeyAsyncContext *ac, VALKEY_UNUSED void *) { +static int valkeyIvykisAttachAdapter(valkeyAsyncContext *ac, VALKEY_UNUSED void *) { return valkeyIvykisAttach(ac); } VALKEY_UNUSED -static int valkeyClusterIvykisAttach(valkeyClusterAsyncContext *acc) { - if (acc == NULL) { +static int valkeyClusterSetOptionUseIvykis(valkeyClusterOptions *options) { + if (options == NULL) { return VALKEY_ERR; } - acc->attach_fn = valkeyClusterIvykisAttachAdapter; + options->attach_fn = valkeyIvykisAttachAdapter; return VALKEY_OK; } diff --git a/include/valkey/adapters/libev.h b/include/valkey/adapters/libev.h index 1a6ee282..5c86d3c4 100644 --- a/include/valkey/adapters/libev.h +++ b/include/valkey/adapters/libev.h @@ -193,14 +193,14 @@ static int valkeyLibevAttachAdapter(valkeyAsyncContext *ac, void *loop) { } VALKEY_UNUSED -static int valkeyClusterLibevAttach(valkeyClusterAsyncContext *acc, - struct ev_loop *loop) { - if (acc == NULL || loop == NULL) { +static int valkeyClusterSetOptionUseLibev(valkeyClusterOptions *options, + struct ev_loop *loop) { + if (options == NULL || loop == NULL) { return VALKEY_ERR; } - acc->attach_fn = valkeyLibevAttachAdapter; - acc->attach_data = loop; + options->attach_fn = valkeyLibevAttachAdapter; + options->attach_data = loop; return VALKEY_OK; } diff --git a/include/valkey/adapters/libevent.h b/include/valkey/adapters/libevent.h index d2ab4fdf..9496ae43 100644 --- a/include/valkey/adapters/libevent.h +++ b/include/valkey/adapters/libevent.h @@ -182,14 +182,15 @@ static int valkeyLibeventAttachAdapter(valkeyAsyncContext *ac, void *base) { } VALKEY_UNUSED -static int valkeyClusterLibeventAttach(valkeyClusterAsyncContext *acc, - struct event_base *base) { - if (acc == NULL || base == NULL) { +static int valkeyClusterSetOptionUseLibevent(valkeyClusterOptions *options, + struct event_base *base) { + if (options == NULL || base == NULL) { return VALKEY_ERR; } - acc->attach_fn = valkeyLibeventAttachAdapter; - acc->attach_data = base; + options->attach_fn = valkeyLibeventAttachAdapter; + options->attach_data = base; return VALKEY_OK; } + #endif /* VALKEY_ADAPTERS_LIBEVENT_H */ diff --git a/include/valkey/adapters/libhv.h b/include/valkey/adapters/libhv.h index 5a13aab3..027f80cc 100644 --- a/include/valkey/adapters/libhv.h +++ b/include/valkey/adapters/libhv.h @@ -129,14 +129,14 @@ static int valkeyLibhvAttachAdapter(valkeyAsyncContext *ac, void *loop) { } VALKEY_UNUSED -static int valkeyClusterLibhvAttach(valkeyClusterAsyncContext *acc, - hloop_t *loop) { - if (acc == NULL || loop == NULL) { +static int valkeyClusterSetOptionUseLibhv(valkeyClusterOptions *options, + hloop_t *loop) { + if (options == NULL || loop == NULL) { return VALKEY_ERR; } - acc->attach_fn = valkeyLibhvAttachAdapter; - acc->attach_data = loop; + options->attach_fn = valkeyLibhvAttachAdapter; + options->attach_data = loop; return VALKEY_OK; } diff --git a/include/valkey/adapters/libsdevent.h b/include/valkey/adapters/libsdevent.h index 25fb29db..bc676b1d 100644 --- a/include/valkey/adapters/libsdevent.h +++ b/include/valkey/adapters/libsdevent.h @@ -184,14 +184,14 @@ static int valkeyLibsdeventAttachAdapter(valkeyAsyncContext *ac, void *event) { } VALKEY_UNUSED -static int valkeyClusterLibsdeventAttach(valkeyClusterAsyncContext *acc, - struct sd_event *event) { - if (acc == NULL || event == NULL) { +static int valkeyClusterSetOptionUseLibsdevent(valkeyClusterOptions *options, + struct sd_event *event) { + if (options == NULL || event == NULL) { return VALKEY_ERR; } - acc->attach_fn = valkeyLibsdeventAttachAdapter; - acc->attach_data = event; + options->attach_fn = valkeyLibsdeventAttachAdapter; + options->attach_data = event; return VALKEY_OK; } diff --git a/include/valkey/adapters/libuv.h b/include/valkey/adapters/libuv.h index d443e19c..d94a33b0 100644 --- a/include/valkey/adapters/libuv.h +++ b/include/valkey/adapters/libuv.h @@ -203,14 +203,15 @@ static int valkeyLibuvAttachAdapter(valkeyAsyncContext *ac, void *loop) { } VALKEY_UNUSED -static int valkeyClusterLibuvAttach(valkeyClusterAsyncContext *acc, - uv_loop_t *loop) { - if (acc == NULL || loop == NULL) { +static int valkeyClusterSetOptionUseLibuv(valkeyClusterOptions *options, + uv_loop_t *loop) { + if (options == NULL || loop == NULL) { return VALKEY_ERR; } - acc->attach_fn = valkeyLibuvAttachAdapter; - acc->attach_data = loop; + options->attach_fn = valkeyLibuvAttachAdapter; + options->attach_data = loop; return VALKEY_OK; } + #endif /* VALKEY_ADAPTERS_LIBUV_H */ diff --git a/include/valkey/adapters/macosx.h b/include/valkey/adapters/macosx.h index 962e2b71..130021b1 100644 --- a/include/valkey/adapters/macosx.h +++ b/include/valkey/adapters/macosx.h @@ -149,14 +149,14 @@ static int valkeyMacOSAttachAdapter(valkeyAsyncContext *ac, void *loop) { } VALKEY_UNUSED -static int valkeyClusterMacOSAttach(valkeyClusterAsyncContext *acc, - CFRunLoopRef loop) { - if (acc == NULL || loop == NULL) { +static int valkeyClusterSetOptionUseMacOS(valkeyClusterOptions *options, + CFRunLoopRef loop) { + if (options == NULL || loop == NULL) { return VALKEY_ERR; } - acc->attach_fn = valkeyMacOSAttachAdapter; - acc->attach_data = loop; + options->attach_fn = valkeyMacOSAttachAdapter; + options->attach_data = loop; return VALKEY_OK; } diff --git a/include/valkey/adapters/poll.h b/include/valkey/adapters/poll.h index 544a2532..cf32a3a7 100644 --- a/include/valkey/adapters/poll.h +++ b/include/valkey/adapters/poll.h @@ -202,12 +202,12 @@ static int valkeyPollAttachAdapter(valkeyAsyncContext *ac, VALKEY_UNUSED void *u } VALKEY_UNUSED -static int valkeyClusterPollAttach(valkeyClusterAsyncContext *acc) { - if (acc == NULL) { +static int valkeyClusterSetOptionUsePoll(valkeyClusterOptions *options) { + if (options == NULL) { return VALKEY_ERR; } - acc->attach_fn = valkeyPollAttachAdapter; + options->attach_fn = valkeyPollAttachAdapter; return VALKEY_OK; } diff --git a/include/valkey/cluster.h b/include/valkey/cluster.h index 9a7d283d..f560736e 100644 --- a/include/valkey/cluster.h +++ b/include/valkey/cluster.h @@ -44,18 +44,6 @@ #define VALKEY_ROLE_MASTER 1 #define VALKEY_ROLE_SLAVE 2 -/* Configuration flags */ -#define VALKEYCLUSTER_FLAG_NULL 0x0 -/* Flag to enable parsing of slave nodes. Currently not used, but the - information is added to its master node structure. */ -#define VALKEYCLUSTER_FLAG_ADD_SLAVE 0x1000 -/* Flag to enable routing table updates using the command 'cluster slots'. - * Default is the 'cluster nodes' command. */ -#define VALKEYCLUSTER_FLAG_ROUTE_USE_SLOTS 0x4000 -/* Flag specific to the async API which means that the user requested a - * client disconnect or free. */ -#define VALKEYCLUSTER_FLAG_DISCONNECTING 0x8000 - /* Events, for valkeyClusterSetEventCallback() */ #define VALKEYCLUSTER_EVENT_SLOTMAP_UPDATED 1 #define VALKEYCLUSTER_EVENT_READY 2 @@ -157,33 +145,58 @@ typedef struct valkeyClusterNodeIterator { char opaque_data[VALKEY_NODE_ITERATOR_SIZE]; } valkeyClusterNodeIterator; +/* Configuration options: + * Enable slotmap updates using the command CLUSTER SLOTS. + * Default is the CLUSTER NODES command. */ +#define VALKEY_OPT_USE_CLUSTER_SLOTS 0x1000 +/* Enable parsing of replica nodes. Currently not used, but the + * information is added to its primary node structure. */ +#define VALKEY_OPT_USE_REPLICAS 0x2000 +/* Use a blocking slotmap update after an initial async connect. */ +#define VALKEY_OPT_BLOCKING_INITIAL_UPDATE 0x4000 + +typedef struct { + const char *initial_nodes; + int options; /* Bit field of VALKEY_OPT_xxx */ + const struct timeval *connect_timeout; /* Timeout value for connect, no timeout if NULL. */ + const struct timeval *command_timeout; /* Timeout value for commands, no timeout if NULL. */ + const char *username; /* Authentication username. */ + const char *password; /* Authentication password. */ + int max_retry_count; /* Allowed retry attempts. */ + + /* TLS set by valkeyClusterSetOptionEnableTLS. */ + void *tls; + int (*tls_init_fn)(struct valkeyContext *, struct valkeyTLSContext *); + + /* Common callbacks. */ + void (*event_callback)(const struct valkeyClusterContext *cc, int event, + void *privdata); + void *event_privdata; + + /* Synchronous API callbacks */ + void (*connect_callback)(const valkeyContext *c, + int status); + + /* Async API event engine adapter. */ + int (*attach_fn)(valkeyAsyncContext *ac, void *attach_data); + void *attach_data; + + /* Async API callbacks. */ + valkeyConnectCallback *async_connect_cb; + valkeyConnectCallbackNC *async_connect_nc_cb; /* non-const callback */ + valkeyDisconnectCallback *async_disconnect_cb; +} valkeyClusterOptions; + /* * Synchronous API */ -valkeyClusterContext *valkeyClusterConnect(const char *addrs, int flags); +valkeyClusterContext *valkeyClusterConnectWithOptions(const valkeyClusterOptions *options); +valkeyClusterContext *valkeyClusterConnect(const char *addrs); valkeyClusterContext *valkeyClusterConnectWithTimeout(const char *addrs, - const struct timeval tv, - int flags); -int valkeyClusterConnect2(valkeyClusterContext *cc); - -valkeyClusterContext *valkeyClusterContextInit(void); + const struct timeval tv); void valkeyClusterFree(valkeyClusterContext *cc); -/* Configuration options */ -int valkeyClusterSetOptionAddNodes(valkeyClusterContext *cc, const char *addrs); -int valkeyClusterSetOptionUsername(valkeyClusterContext *cc, - const char *username); -int valkeyClusterSetOptionPassword(valkeyClusterContext *cc, - const char *password); -int valkeyClusterSetOptionParseSlaves(valkeyClusterContext *cc); -int valkeyClusterSetOptionRouteUseSlots(valkeyClusterContext *cc); -int valkeyClusterSetOptionConnectTimeout(valkeyClusterContext *cc, - const struct timeval tv); -int valkeyClusterSetOptionTimeout(valkeyClusterContext *cc, - const struct timeval tv); -int valkeyClusterSetOptionMaxRetry(valkeyClusterContext *cc, - int max_retry_count); /* A hook for connect and reconnect attempts, e.g. for applying additional * socket options. This is called just after connect, before TLS handshake and * Valkey authentication. @@ -194,15 +207,18 @@ int valkeyClusterSetOptionMaxRetry(valkeyClusterContext *cc, * On failed connection attempt, this callback is called with `status` set to * `VALKEY_ERR`. The `err` field in the `valkeyContext` can be used to find out * the cause of the error. */ -int valkeyClusterSetConnectCallback(valkeyClusterContext *cc, - void(fn)(const valkeyContext *c, - int status)); +int valkeyClusterSetOptionConnectCallback(valkeyClusterOptions *options, + void(fn)(const valkeyContext *c, + int status)); /* A hook for events. */ -int valkeyClusterSetEventCallback(valkeyClusterContext *cc, - void(fn)(const valkeyClusterContext *cc, - int event, void *privdata), - void *privdata); +int valkeyClusterSetOptionEventCallback(valkeyClusterOptions *options, + void(fn)(const valkeyClusterContext *cc, + int event, void *privdata), + void *privdata); + +/* Options configurable in runtime. */ +int valkeyClusterSetOptionTimeout(valkeyClusterContext *cc, const struct timeval tv); /* Blocking * The following functions will block for a reply, or return NULL if there was @@ -269,22 +285,23 @@ valkeyContext *valkeyClusterGetValkeyContext(valkeyClusterContext *cc, * Asynchronous API */ -valkeyClusterAsyncContext *valkeyClusterAsyncContextInit(void); +valkeyClusterAsyncContext *valkeyClusterAsyncConnectWithOptions(const valkeyClusterOptions *options); +void valkeyClusterAsyncDisconnect(valkeyClusterAsyncContext *acc); void valkeyClusterAsyncFree(valkeyClusterAsyncContext *acc); +valkeyClusterAsyncContext *valkeyClusterAsyncContextInit(const valkeyClusterOptions *options); +int valkeyClusterAsyncConnect(valkeyClusterAsyncContext *acc); /* Connect an initiated context. */ + int valkeyClusterAsyncSetConnectCallback(valkeyClusterAsyncContext *acc, valkeyConnectCallback *fn); int valkeyClusterAsyncSetConnectCallbackNC(valkeyClusterAsyncContext *acc, valkeyConnectCallbackNC *fn); int valkeyClusterAsyncSetDisconnectCallback(valkeyClusterAsyncContext *acc, valkeyDisconnectCallback *fn); - -/* Connect and update slotmap, will block until complete. */ -valkeyClusterAsyncContext *valkeyClusterAsyncConnect(const char *addrs, - int flags); -/* Connect and update slotmap asynchronously using configured event engine. */ -int valkeyClusterAsyncConnect2(valkeyClusterAsyncContext *acc); -void valkeyClusterAsyncDisconnect(valkeyClusterAsyncContext *acc); +int valkeyClusterAsyncSetEventCallback(valkeyClusterAsyncContext *acc, + void(fn)(const valkeyClusterContext *cc, + int event, void *privdata), + void *privdata); /* Commands */ int valkeyClusterAsyncCommand(valkeyClusterAsyncContext *acc, diff --git a/include/valkey/cluster_tls.h b/include/valkey/cluster_tls.h index a081f98f..96e04adf 100644 --- a/include/valkey/cluster_tls.h +++ b/include/valkey/cluster_tls.h @@ -40,7 +40,7 @@ extern "C" { /** * Configuration option to enable TLS negotiation on a context. */ -int valkeyClusterSetOptionEnableTLS(valkeyClusterContext *cc, +int valkeyClusterSetOptionEnableTLS(valkeyClusterOptions *options, valkeyTLSContext *tls); #ifdef __cplusplus diff --git a/include/valkey/valkey.h b/include/valkey/valkey.h index d6e4fc9c..e9e0e00b 100644 --- a/include/valkey/valkey.h +++ b/include/valkey/valkey.h @@ -169,6 +169,7 @@ enum valkeyConnectionType { #define VALKEY_OPT_PREFER_IPV4 0x20 /* Prefer IPv4 in DNS lookups. */ #define VALKEY_OPT_PREFER_IPV6 0x40 /* Prefer IPv6 in DNS lookups. */ #define VALKEY_OPT_PREFER_IP_UNSPEC (VALKEY_OPT_PREFER_IPV4 | VALKEY_OPT_PREFER_IPV6) +#define VALKEY_OPT_LAST_SA_OPTION 0x40 /* Last defined standalone option. */ /* In Unix systems a file descriptor is a regular signed int, with -1 * representing an invalid descriptor. In Windows it is a SOCKET diff --git a/src/cluster.c b/src/cluster.c index 0cefd10d..93818130 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -49,6 +49,14 @@ #include #include +/* Make sure standalone and cluster options don't overlap. */ +vk_static_assert(VALKEY_OPT_USE_CLUSTER_SLOTS > VALKEY_OPT_LAST_SA_OPTION); + +#define VALKEY_FLAG_USE_CLUSTER_SLOTS 0x1 +#define VALKEY_FLAG_PARSE_REPLICAS 0x2 +#define VALKEY_FLAG_DISCONNECTING 0x4 +#define VALKEY_FLAG_BLOCKING_INITIAL_UPDATE 0x8 + // Cluster errors are offset by 100 to be sufficiently out of range of // standard Valkey errors #define VALKEY_ERR_CLUSTER_TOO_MANY_RETRIES 100 @@ -101,6 +109,13 @@ static void cluster_slot_destroy(cluster_slot *slot); static int updateNodesAndSlotmap(valkeyClusterContext *cc, dict *nodes); static int updateSlotMapAsync(valkeyClusterAsyncContext *acc, valkeyAsyncContext *ac); +static int valkeyClusterSetOptionAddNodes(valkeyClusterContext *cc, const char *addrs); +static int valkeyClusterSetOptionUsername(valkeyClusterContext *cc, + const char *username); +static int valkeyClusterSetOptionPassword(valkeyClusterContext *cc, + const char *password); +static int valkeyClusterSetOptionConnectTimeout(valkeyClusterContext *cc, + const struct timeval tv); void listClusterNodeDestructor(void *val) { freeValkeyClusterNode(val); } @@ -654,7 +669,7 @@ static dict *parse_cluster_slots(valkeyClusterContext *cc, valkeyReply *reply) { } slot = NULL; - } else if (cc->flags & VALKEYCLUSTER_FLAG_ADD_SLAVE) { + } else if (cc->flags & VALKEY_FLAG_PARSE_REPLICAS) { slave = node_get_with_slots(cc, elem_ip, elem_port, VALKEY_ROLE_SLAVE); if (slave == NULL) { @@ -915,7 +930,7 @@ static int parse_cluster_nodes_line(valkeyClusterContext *cc, char *line, static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyReply *reply) { dict *nodes = NULL; int slot_ranges_found = 0; - int add_replicas = cc->flags & VALKEYCLUSTER_FLAG_ADD_SLAVE; + int add_replicas = cc->flags & VALKEY_FLAG_PARSE_REPLICAS; dict *replicas = NULL; if (reply->type != VALKEY_REPLY_STRING) { @@ -1003,7 +1018,7 @@ static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyReply *reply) { /* Sends CLUSTER SLOTS or CLUSTER NODES to the node with context c. */ static int clusterUpdateRouteSendCommand(valkeyClusterContext *cc, valkeyContext *c) { - const char *cmd = (cc->flags & VALKEYCLUSTER_FLAG_ROUTE_USE_SLOTS ? + const char *cmd = (cc->flags & VALKEY_FLAG_USE_CLUSTER_SLOTS ? VALKEY_COMMAND_CLUSTER_SLOTS : VALKEY_COMMAND_CLUSTER_NODES); if (valkeyAppendCommand(c, cmd) != VALKEY_OK) { @@ -1035,7 +1050,7 @@ static int clusterUpdateRouteHandleReply(valkeyClusterContext *cc, } dict *nodes; - if (cc->flags & VALKEYCLUSTER_FLAG_ROUTE_USE_SLOTS) { + if (cc->flags & VALKEY_FLAG_USE_CLUSTER_SLOTS) { nodes = parse_cluster_slots(cc, reply); } else { nodes = parse_cluster_nodes(cc, reply); @@ -1232,7 +1247,7 @@ int valkeyClusterUpdateSlotmap(valkeyClusterContext *cc) { return VALKEY_ERR; } -valkeyClusterContext *valkeyClusterContextInit(void) { +static valkeyClusterContext *valkeyClusterContextInit(const valkeyClusterOptions *options) { valkeyClusterContext *cc; cc = vk_calloc(1, sizeof(valkeyClusterContext)); @@ -1252,6 +1267,50 @@ valkeyClusterContext *valkeyClusterContextInit(void) { cc->requests->free = listCommandFree; cc->max_retry_count = CLUSTER_DEFAULT_MAX_RETRY_COUNT; + if (options->options & VALKEY_OPT_USE_CLUSTER_SLOTS) { + cc->flags |= VALKEY_FLAG_USE_CLUSTER_SLOTS; + } + if (options->options & VALKEY_OPT_USE_REPLICAS) { + cc->flags |= VALKEY_FLAG_PARSE_REPLICAS; + } + if (options->options & VALKEY_OPT_BLOCKING_INITIAL_UPDATE) { + cc->flags |= VALKEY_FLAG_BLOCKING_INITIAL_UPDATE; + } + if (options->max_retry_count > 0) { + cc->max_retry_count = options->max_retry_count; + } + if (options->initial_nodes != NULL && + valkeyClusterSetOptionAddNodes(cc, options->initial_nodes) != VALKEY_OK) { + return cc; /* err and errstr already set. */ + } + if (options->connect_timeout != NULL && + valkeyClusterSetOptionConnectTimeout(cc, *options->connect_timeout) != VALKEY_OK) { + return cc; /* err and errstr already set. */ + } + if (options->command_timeout != NULL && + valkeyClusterSetOptionTimeout(cc, *options->command_timeout) != VALKEY_OK) { + return cc; /* err and errstr already set. */ + } + if (options->username != NULL && + valkeyClusterSetOptionUsername(cc, options->username) != VALKEY_OK) { + return cc; /* err and errstr already set. */ + } + if (options->password != NULL && + valkeyClusterSetOptionPassword(cc, options->password) != VALKEY_OK) { + return cc; /* err and errstr already set. */ + } + if (options->connect_callback) { + cc->on_connect = options->connect_callback; + } + if (options->event_callback) { + cc->event_callback = options->event_callback; + cc->event_privdata = options->event_privdata; + } + if (options->tls) { + cc->tls = options->tls; + cc->tls_init_fn = options->tls_init_fn; + } + return cc; } @@ -1276,52 +1335,32 @@ void valkeyClusterFree(valkeyClusterContext *cc) { vk_free(cc); } -static valkeyClusterContext * -valkeyClusterConnectInternal(valkeyClusterContext *cc, const char *addrs) { - if (valkeyClusterSetOptionAddNodes(cc, addrs) != VALKEY_OK) { - return cc; - } - valkeyClusterUpdateSlotmap(cc); - return cc; -} - -valkeyClusterContext *valkeyClusterConnect(const char *addrs, int flags) { - valkeyClusterContext *cc; - - cc = valkeyClusterContextInit(); - +valkeyClusterContext *valkeyClusterConnectWithOptions(const valkeyClusterOptions *options) { + valkeyClusterContext *cc = valkeyClusterContextInit(options); if (cc == NULL) { return NULL; } + /* Only connect if options are ok. */ + if (cc->err == 0) { + valkeyClusterUpdateSlotmap(cc); + } + return cc; +} - cc->flags = flags; +valkeyClusterContext *valkeyClusterConnect(const char *addrs) { + valkeyClusterOptions options = {0}; + options.initial_nodes = addrs; - return valkeyClusterConnectInternal(cc, addrs); + return valkeyClusterConnectWithOptions(&options); } valkeyClusterContext *valkeyClusterConnectWithTimeout(const char *addrs, - const struct timeval tv, - int flags) { - valkeyClusterContext *cc; + const struct timeval tv) { + valkeyClusterOptions options = {0}; + options.initial_nodes = addrs; + options.connect_timeout = &tv; - cc = valkeyClusterContextInit(); - - if (cc == NULL) { - return NULL; - } - - cc->flags = flags; - - if (cc->connect_timeout == NULL) { - cc->connect_timeout = vk_malloc(sizeof(struct timeval)); - if (cc->connect_timeout == NULL) { - return NULL; - } - } - - memcpy(cc->connect_timeout, &tv, sizeof(struct timeval)); - - return valkeyClusterConnectInternal(cc, addrs); + return valkeyClusterConnectWithOptions(&options); } static int valkeyClusterSetOptionAddNode(valkeyClusterContext *cc, const char *addr) { @@ -1417,8 +1456,8 @@ static int valkeyClusterSetOptionAddNode(valkeyClusterContext *cc, const char *a return VALKEY_ERR; } -int valkeyClusterSetOptionAddNodes(valkeyClusterContext *cc, - const char *addrs) { +static int valkeyClusterSetOptionAddNodes(valkeyClusterContext *cc, + const char *addrs) { int ret; sds *address = NULL; int address_count = 0; @@ -1462,8 +1501,8 @@ int valkeyClusterSetOptionAddNodes(valkeyClusterContext *cc, * Disabled by default. Can be disabled again by providing an * empty string or a null pointer. */ -int valkeyClusterSetOptionUsername(valkeyClusterContext *cc, - const char *username) { +static int valkeyClusterSetOptionUsername(valkeyClusterContext *cc, + const char *username) { if (cc == NULL) { return VALKEY_ERR; } @@ -1488,8 +1527,8 @@ int valkeyClusterSetOptionUsername(valkeyClusterContext *cc, * Configure a password used when connecting to password-protected * Valkey instances. (See Valkey AUTH command) */ -int valkeyClusterSetOptionPassword(valkeyClusterContext *cc, - const char *password) { +static int valkeyClusterSetOptionPassword(valkeyClusterContext *cc, + const char *password) { if (cc == NULL) { return VALKEY_ERR; @@ -1511,30 +1550,8 @@ int valkeyClusterSetOptionPassword(valkeyClusterContext *cc, return VALKEY_OK; } -int valkeyClusterSetOptionParseSlaves(valkeyClusterContext *cc) { - - if (cc == NULL) { - return VALKEY_ERR; - } - - cc->flags |= VALKEYCLUSTER_FLAG_ADD_SLAVE; - - return VALKEY_OK; -} - -int valkeyClusterSetOptionRouteUseSlots(valkeyClusterContext *cc) { - - if (cc == NULL) { - return VALKEY_ERR; - } - - cc->flags |= VALKEYCLUSTER_FLAG_ROUTE_USE_SLOTS; - - return VALKEY_OK; -} - -int valkeyClusterSetOptionConnectTimeout(valkeyClusterContext *cc, - const struct timeval tv) { +static int valkeyClusterSetOptionConnectTimeout(valkeyClusterContext *cc, + const struct timeval tv) { if (cc == NULL) { return VALKEY_ERR; @@ -1614,35 +1631,6 @@ int valkeyClusterSetOptionTimeout(valkeyClusterContext *cc, return VALKEY_OK; } -int valkeyClusterSetOptionMaxRetry(valkeyClusterContext *cc, - int max_retry_count) { - if (cc == NULL || max_retry_count <= 0) { - return VALKEY_ERR; - } - - cc->max_retry_count = max_retry_count; - - return VALKEY_OK; -} - -int valkeyClusterConnect2(valkeyClusterContext *cc) { - - if (cc == NULL) { - return VALKEY_ERR; - } - - if (dictSize(cc->nodes) == 0) { - valkeyClusterSetError(cc, VALKEY_ERR_OTHER, - "server address not configured"); - return VALKEY_ERR; - } - /* Clear a previously set shutdown flag since we allow a - * reconnection of an async context using this API (legacy). */ - cc->flags &= ~VALKEYCLUSTER_FLAG_DISCONNECTING; - - return valkeyClusterUpdateSlotmap(cc); -} - valkeyContext *valkeyClusterGetValkeyContext(valkeyClusterContext *cc, valkeyClusterNode *node) { valkeyContext *c = NULL; @@ -2112,23 +2100,23 @@ static int prepareCommand(valkeyClusterContext *cc, struct cmd *command) { return VALKEY_OK; } -int valkeyClusterSetConnectCallback(valkeyClusterContext *cc, - void(fn)(const valkeyContext *c, - int status)) { - if (cc->on_connect == NULL) { - cc->on_connect = fn; +int valkeyClusterSetOptionConnectCallback(valkeyClusterOptions *options, + void(fn)(const valkeyContext *c, + int status)) { + if (options->connect_callback == NULL) { + options->connect_callback = fn; return VALKEY_OK; } return VALKEY_ERR; } -int valkeyClusterSetEventCallback(valkeyClusterContext *cc, - void(fn)(const valkeyClusterContext *cc, - int event, void *privdata), - void *privdata) { - if (cc->event_callback == NULL) { - cc->event_callback = fn; - cc->event_privdata = privdata; +int valkeyClusterSetOptionEventCallback(valkeyClusterOptions *options, + void(fn)(const valkeyClusterContext *cc, + int event, void *privdata), + void *privdata) { + if (options->event_callback == NULL) { + options->event_callback = fn; + options->event_privdata = privdata; return VALKEY_OK; } return VALKEY_ERR; @@ -2800,11 +2788,11 @@ valkeyClusterGetValkeyAsyncContext(valkeyClusterAsyncContext *acc, return ac; } -valkeyClusterAsyncContext *valkeyClusterAsyncContextInit(void) { +valkeyClusterAsyncContext *valkeyClusterAsyncContextInit(const valkeyClusterOptions *options) { valkeyClusterContext *cc; valkeyClusterAsyncContext *acc; - cc = valkeyClusterContextInit(); + cc = valkeyClusterContextInit(options); if (cc == NULL) { return NULL; } @@ -2815,34 +2803,51 @@ valkeyClusterAsyncContext *valkeyClusterAsyncContextInit(void) { return NULL; } + if (options->async_connect_cb != NULL) { + acc->onConnect = options->async_connect_cb; + } + if (options->async_connect_nc_cb != NULL) { + acc->onConnectNC = options->async_connect_nc_cb; + } + if (options->async_disconnect_cb != NULL) { + acc->onDisconnect = options->async_disconnect_cb; + } + if (options->attach_fn != NULL) { + acc->attach_fn = options->attach_fn; + acc->attach_data = options->attach_data; + } + return acc; } -valkeyClusterAsyncContext *valkeyClusterAsyncConnect(const char *addrs, - int flags) { - - valkeyClusterContext *cc; - valkeyClusterAsyncContext *acc; - - cc = valkeyClusterConnect(addrs, flags); - if (cc == NULL) { - return NULL; - } - - acc = valkeyClusterAsyncInitialize(cc); +valkeyClusterAsyncContext *valkeyClusterAsyncConnectWithOptions(const valkeyClusterOptions *options) { + valkeyClusterAsyncContext *acc = valkeyClusterAsyncContextInit(options); if (acc == NULL) { - valkeyClusterFree(cc); return NULL; } + valkeyClusterAsyncConnect(acc); return acc; } -int valkeyClusterAsyncConnect2(valkeyClusterAsyncContext *acc) { +int valkeyClusterAsyncConnect(valkeyClusterAsyncContext *acc) { /* An attach function for an async event library is required. */ if (acc->attach_fn == NULL) { return VALKEY_ERR; } + + /* Clear a previously set shutdown flag to allow a + * reconnection of an async context using this API. */ + acc->cc->flags &= ~VALKEY_FLAG_DISCONNECTING; + + /* Blocking or non-blocking initial slotmap update. */ + if (acc->cc->flags & VALKEY_FLAG_BLOCKING_INITIAL_UPDATE) { + if (valkeyClusterUpdateSlotmap(acc->cc) != VALKEY_OK) { + valkeyClusterAsyncSetError(acc, acc->cc->err, acc->cc->errstr); + return VALKEY_ERR; + } + return VALKEY_OK; + } return updateSlotMapAsync(acc, NULL /*any node*/); } @@ -2874,6 +2879,18 @@ int valkeyClusterAsyncSetDisconnectCallback(valkeyClusterAsyncContext *acc, return VALKEY_ERR; } +int valkeyClusterAsyncSetEventCallback(valkeyClusterAsyncContext *acc, + void(fn)(const valkeyClusterContext *cc, + int event, void *privdata), + void *privdata) { + if (acc->cc->event_callback == NULL) { + acc->cc->event_callback = fn; + acc->cc->event_privdata = privdata; + return VALKEY_OK; + } + return VALKEY_ERR; +} + /* Reply callback function for CLUSTER SLOTS */ void clusterSlotsReplyCallback(valkeyAsyncContext *ac, void *r, void *privdata) { @@ -2969,7 +2986,7 @@ static int updateSlotMapAsync(valkeyClusterAsyncContext *acc, /* Don't allow concurrent slot map updates. */ return VALKEY_ERR; } - if (acc->cc->flags & VALKEYCLUSTER_FLAG_DISCONNECTING) { + if (acc->cc->flags & VALKEY_FLAG_DISCONNECTING) { /* No slot map updates during a cluster client disconnect. */ return VALKEY_ERR; } @@ -2988,7 +3005,7 @@ static int updateSlotMapAsync(valkeyClusterAsyncContext *acc, /* Send a command depending of config */ int status; - if (acc->cc->flags & VALKEYCLUSTER_FLAG_ROUTE_USE_SLOTS) { + if (acc->cc->flags & VALKEY_FLAG_USE_CLUSTER_SLOTS) { status = valkeyAsyncCommand(ac, clusterSlotsReplyCallback, acc, VALKEY_COMMAND_CLUSTER_SLOTS); } else { @@ -3061,7 +3078,7 @@ static void valkeyClusterAsyncCallback(valkeyAsyncContext *ac, void *r, } /* Skip retry handling when not expected, or during a client disconnect. */ - if (cad->retry_count == NO_RETRY || cc->flags & VALKEYCLUSTER_FLAG_DISCONNECTING) + if (cad->retry_count == NO_RETRY || cc->flags & VALKEY_FLAG_DISCONNECTING) goto done; error_type = cluster_reply_error_type(reply); @@ -3174,7 +3191,7 @@ int valkeyClusterAsyncFormattedCommand(valkeyClusterAsyncContext *acc, cc = acc->cc; /* Don't accept new commands when the client is about to disconnect. */ - if (cc->flags & VALKEYCLUSTER_FLAG_DISCONNECTING) { + if (cc->flags & VALKEY_FLAG_DISCONNECTING) { valkeyClusterAsyncSetError(acc, VALKEY_ERR_OTHER, "disconnecting"); return VALKEY_ERR; } @@ -3255,7 +3272,7 @@ int valkeyClusterAsyncFormattedCommandToNode(valkeyClusterAsyncContext *acc, struct cmd *command = NULL; /* Don't accept new commands when the client is about to disconnect. */ - if (cc->flags & VALKEYCLUSTER_FLAG_DISCONNECTING) { + if (cc->flags & VALKEY_FLAG_DISCONNECTING) { valkeyClusterAsyncSetError(acc, VALKEY_ERR_OTHER, "disconnecting"); return VALKEY_ERR; } @@ -3437,7 +3454,7 @@ void valkeyClusterAsyncDisconnect(valkeyClusterAsyncContext *acc) { } cc = acc->cc; - cc->flags |= VALKEYCLUSTER_FLAG_DISCONNECTING; + cc->flags |= VALKEY_FLAG_DISCONNECTING; dictIterator di; dictInitIterator(&di, cc->nodes); @@ -3460,7 +3477,7 @@ void valkeyClusterAsyncFree(valkeyClusterAsyncContext *acc) { return; valkeyClusterContext *cc = acc->cc; - cc->flags |= VALKEYCLUSTER_FLAG_DISCONNECTING; + cc->flags |= VALKEY_FLAG_DISCONNECTING; valkeyClusterFree(cc); vk_free(acc); diff --git a/src/cluster_tls.c b/src/cluster_tls.c index 7a0df24b..47a0f01b 100644 --- a/src/cluster_tls.c +++ b/src/cluster_tls.c @@ -28,14 +28,12 @@ */ #include "cluster_tls.h" -int valkeyClusterSetOptionEnableTLS(valkeyClusterContext *cc, +int valkeyClusterSetOptionEnableTLS(valkeyClusterOptions *options, valkeyTLSContext *tls) { - if (cc == NULL || tls == NULL) { + if (options == NULL || tls == NULL) { return VALKEY_ERR; } - - cc->tls = tls; - cc->tls_init_fn = &valkeyInitiateTLSWithContext; - + options->tls = tls; + options->tls_init_fn = &valkeyInitiateTLSWithContext; return VALKEY_OK; } diff --git a/tests/clusterclient.c b/tests/clusterclient.c index 29b9bbf8..b70a4f25 100644 --- a/tests/clusterclient.c +++ b/tests/clusterclient.c @@ -88,18 +88,19 @@ int main(int argc, char **argv) { struct timeval timeout = {1, 500000}; // 1.5s - valkeyClusterContext *cc = valkeyClusterContextInit(); - valkeyClusterSetOptionAddNodes(cc, initnode); - valkeyClusterSetOptionConnectTimeout(cc, timeout); + valkeyClusterOptions options = {0}; + options.initial_nodes = initnode; + options.connect_timeout = &timeout; if (use_cluster_slots) { - valkeyClusterSetOptionRouteUseSlots(cc); + options.options = VALKEY_OPT_USE_CLUSTER_SLOTS; } if (show_events) { - valkeyClusterSetEventCallback(cc, eventCallback, NULL); + options.event_callback = eventCallback; } - if (valkeyClusterConnect2(cc) != VALKEY_OK) { - printf("Connect error: %s\n", cc->errstr); + valkeyClusterContext *cc = valkeyClusterConnectWithOptions(&options); + if (cc == NULL || cc->err) { + printf("Connect error: %s\n", cc ? cc->errstr : "OOM"); exit(2); } diff --git a/tests/clusterclient_async.c b/tests/clusterclient_async.c index f1449004..ef139d9f 100644 --- a/tests/clusterclient_async.c +++ b/tests/clusterclient_async.c @@ -247,33 +247,32 @@ int main(int argc, char **argv) { } const char *initnode = argv[optind]; struct timeval timeout = {0, 500000}; + struct event_base *base = event_base_new(); - valkeyClusterAsyncContext *acc = valkeyClusterAsyncContextInit(); - assert(acc); - valkeyClusterSetOptionAddNodes(acc->cc, initnode); - valkeyClusterSetOptionTimeout(acc->cc, timeout); - valkeyClusterSetOptionConnectTimeout(acc->cc, timeout); - valkeyClusterSetOptionMaxRetry(acc->cc, 1); + valkeyClusterOptions options = {0}; + options.initial_nodes = initnode; + options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE; + options.connect_timeout = &timeout; + options.command_timeout = &timeout; + options.max_retry_count = 1; if (use_cluster_slots) { - valkeyClusterSetOptionRouteUseSlots(acc->cc); + options.options |= VALKEY_OPT_USE_CLUSTER_SLOTS; } if (show_events) { - valkeyClusterSetEventCallback(acc->cc, eventCallback, NULL); + options.event_callback = eventCallback; } if (show_connection_events) { - valkeyClusterAsyncSetConnectCallback(acc, connectCallback); - valkeyClusterAsyncSetDisconnectCallback(acc, disconnectCallback); + options.async_connect_cb = connectCallback; + options.async_disconnect_cb = disconnectCallback; } + valkeyClusterSetOptionUseLibevent(&options, base); - if (valkeyClusterConnect2(acc->cc) != VALKEY_OK) { - printf("Connect error: %s\n", acc->cc->errstr); + valkeyClusterAsyncContext *acc = valkeyClusterAsyncConnectWithOptions(&options); + if (acc == NULL || acc->err != 0) { + printf("Connect error: %s\n", acc ? acc->errstr : "OOM"); exit(2); } - struct event_base *base = event_base_new(); - int status = valkeyClusterLibeventAttach(acc, base); - assert(status == VALKEY_OK); - /* Schedule a read from stdin and send next command */ event_base_once(base, -1, EV_TIMEOUT, sendNextCommand, acc, NULL); diff --git a/tests/clusterclient_reconnect_async.c b/tests/clusterclient_reconnect_async.c index 05e973bf..1ed43869 100644 --- a/tests/clusterclient_reconnect_async.c +++ b/tests/clusterclient_reconnect_async.c @@ -27,16 +27,15 @@ void connectToValkey(valkeyClusterAsyncContext *acc) { /* reset context in case of reconnect */ valkeyClusterAsyncDisconnect(acc); - int status = valkeyClusterConnect2(acc->cc); - if (status == VALKEY_OK) { + if (valkeyClusterAsyncConnect(acc) == VALKEY_OK) { // cluster mode - } else if (acc->cc->err && - strcmp(acc->cc->errstr, VALKEY_ENOCLUSTER) == 0) { + } else if (acc->err && + strcmp(acc->errstr, VALKEY_ENOCLUSTER) == 0) { printf("[no cluster]\n"); - acc->cc->err = 0; - memset(acc->cc->errstr, '\0', strlen(acc->cc->errstr)); + acc->err = 0; + memset(acc->errstr, '\0', strlen(acc->errstr)); } else { - printf("Connect error: %s\n", acc->cc->errstr); + printf("Connect error: %s\n", acc->errstr); exit(-1); } } @@ -95,15 +94,16 @@ int main(int argc, char **argv) { exit(1); } const char *initnode = argv[1]; + struct event_base *base = event_base_new(); - valkeyClusterAsyncContext *acc = valkeyClusterAsyncContextInit(); - assert(acc); - valkeyClusterSetOptionAddNodes(acc->cc, initnode); - valkeyClusterSetOptionRouteUseSlots(acc->cc); + valkeyClusterOptions options = {0}; + options.initial_nodes = initnode; + options.options = VALKEY_OPT_USE_CLUSTER_SLOTS | + VALKEY_OPT_BLOCKING_INITIAL_UPDATE; + valkeyClusterSetOptionUseLibevent(&options, base); - struct event_base *base = event_base_new(); - int status = valkeyClusterLibeventAttach(acc, base); - assert(status == VALKEY_OK); + valkeyClusterAsyncContext *acc = valkeyClusterAsyncContextInit(&options); + assert(acc); connectToValkey(acc); // schedule reading from stdin and sending next command diff --git a/tests/ct_async.c b/tests/ct_async.c index 99b4d438..45511682 100644 --- a/tests/ct_async.c +++ b/tests/ct_async.c @@ -68,34 +68,22 @@ void eventCallback(const valkeyClusterContext *cc, int event, void *privdata) { } int main(void) { + struct event_base *base = event_base_new(); - valkeyClusterAsyncContext *acc = valkeyClusterAsyncContextInit(); - assert(acc); - - int status; - status = valkeyClusterAsyncSetConnectCallback(acc, connectCallback); - assert(status == VALKEY_OK); - status = valkeyClusterAsyncSetConnectCallback(acc, connectCallback); - assert(status == VALKEY_ERR); /* Re-registration not accepted */ - status = valkeyClusterAsyncSetConnectCallbackNC(acc, connectCallbackNC); - assert(status == VALKEY_ERR); /* Re-registration not accepted */ - - status = valkeyClusterAsyncSetDisconnectCallback(acc, disconnectCallback); - assert(status == VALKEY_OK); - status = valkeyClusterSetEventCallback(acc->cc, eventCallback, acc); - assert(status == VALKEY_OK); - status = valkeyClusterSetOptionAddNodes(acc->cc, CLUSTER_NODE); - assert(status == VALKEY_OK); + valkeyClusterOptions options = {0}; + options.initial_nodes = CLUSTER_NODE; + options.async_connect_cb = connectCallback; + options.async_disconnect_cb = disconnectCallback; + valkeyClusterSetOptionUseLibevent(&options, base); - /* Expect error when connecting without an attached event library. */ - status = valkeyClusterAsyncConnect2(acc); - assert(status == VALKEY_ERR); + valkeyClusterAsyncContext *acc = valkeyClusterAsyncContextInit(&options); + assert(acc); - struct event_base *base = event_base_new(); - status = valkeyClusterLibeventAttach(acc, base); + /* Set an event callback that uses acc as privdata */ + int status = valkeyClusterAsyncSetEventCallback(acc, eventCallback, acc); assert(status == VALKEY_OK); - status = valkeyClusterAsyncConnect2(acc); + status = valkeyClusterAsyncConnect(acc); assert(status == VALKEY_OK); event_base_dispatch(base); diff --git a/tests/ct_async_glib.c b/tests/ct_async_glib.c index 8aafca4f..a8f9c124 100644 --- a/tests/ct_async_glib.c +++ b/tests/ct_async_glib.c @@ -41,17 +41,18 @@ int main(int argc, char **argv) { GMainContext *context = NULL; mainloop = g_main_loop_new(context, FALSE); - valkeyClusterAsyncContext *acc = - valkeyClusterAsyncConnect(CLUSTER_NODE, VALKEYCLUSTER_FLAG_NULL); + valkeyClusterOptions options = {0}; + options.initial_nodes = CLUSTER_NODE; + options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE; + options.async_connect_cb = connectCallback; + options.async_disconnect_cb = disconnectCallback; + valkeyClusterSetOptionUseGlib(&options, context); + + valkeyClusterAsyncContext *acc = valkeyClusterAsyncConnectWithOptions(&options); assert(acc); ASSERT_MSG(acc->err == 0, acc->errstr); - int status = valkeyClusterGlibAttach(acc, context); - assert(status == VALKEY_OK); - - valkeyClusterAsyncSetConnectCallback(acc, connectCallback); - valkeyClusterAsyncSetDisconnectCallback(acc, disconnectCallback); - + int status; status = valkeyClusterAsyncCommand(acc, setCallback, (char *)"id", "SET key value"); ASSERT_MSG(status == VALKEY_OK, acc->errstr); diff --git a/tests/ct_async_libev.c b/tests/ct_async_libev.c index f6522475..47a055d9 100644 --- a/tests/ct_async_libev.c +++ b/tests/ct_async_libev.c @@ -35,18 +35,18 @@ int main(int argc, char **argv) { UNUSED(argc); UNUSED(argv); - valkeyClusterAsyncContext *acc = - valkeyClusterAsyncConnect(CLUSTER_NODE, VALKEYCLUSTER_FLAG_NULL); + valkeyClusterOptions options = {0}; + options.initial_nodes = CLUSTER_NODE; + options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE; + options.async_connect_cb = connectCallback; + options.async_disconnect_cb = disconnectCallback; + valkeyClusterSetOptionUseLibev(&options, EV_DEFAULT); + + valkeyClusterAsyncContext *acc = valkeyClusterAsyncConnectWithOptions(&options); assert(acc); ASSERT_MSG(acc->err == 0, acc->errstr); int status; - status = valkeyClusterLibevAttach(acc, EV_DEFAULT); - assert(status == VALKEY_OK); - - valkeyClusterAsyncSetConnectCallback(acc, connectCallback); - valkeyClusterAsyncSetDisconnectCallback(acc, disconnectCallback); - status = valkeyClusterAsyncCommand(acc, setCallback, (char *)"ID", "SET key value"); ASSERT_MSG(status == VALKEY_OK, acc->errstr); diff --git a/tests/ct_async_libuv.c b/tests/ct_async_libuv.c index 4bc0b5bb..668fae74 100644 --- a/tests/ct_async_libuv.c +++ b/tests/ct_async_libuv.c @@ -36,19 +36,20 @@ int main(int argc, char **argv) { UNUSED(argc); UNUSED(argv); - valkeyClusterAsyncContext *acc = - valkeyClusterAsyncConnect(CLUSTER_NODE, VALKEYCLUSTER_FLAG_NULL); + uv_loop_t *loop = uv_default_loop(); + + valkeyClusterOptions options = {0}; + options.initial_nodes = CLUSTER_NODE; + options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE; + options.async_connect_cb = connectCallback; + options.async_disconnect_cb = disconnectCallback; + valkeyClusterSetOptionUseLibuv(&options, loop); + + valkeyClusterAsyncContext *acc = valkeyClusterAsyncConnectWithOptions(&options); assert(acc); ASSERT_MSG(acc->err == 0, acc->errstr); int status; - uv_loop_t *loop = uv_default_loop(); - status = valkeyClusterLibuvAttach(acc, loop); - assert(status == VALKEY_OK); - - valkeyClusterAsyncSetConnectCallback(acc, connectCallback); - valkeyClusterAsyncSetDisconnectCallback(acc, disconnectCallback); - status = valkeyClusterAsyncCommand(acc, setCallback, (char *)"ID", "SET key value"); ASSERT_MSG(status == VALKEY_OK, acc->errstr); diff --git a/tests/ct_commands.c b/tests/ct_commands.c index 2d462bf7..cf3e5e35 100644 --- a/tests/ct_commands.c +++ b/tests/ct_commands.c @@ -462,15 +462,12 @@ void test_multi(valkeyClusterContext *cc) { int main(void) { struct timeval timeout = {0, 500000}; - valkeyClusterContext *cc = valkeyClusterContextInit(); - assert(cc); + valkeyClusterOptions options = {0}; + options.initial_nodes = CLUSTER_NODE; + options.connect_timeout = &timeout; - valkeyClusterSetOptionAddNodes(cc, CLUSTER_NODE); - valkeyClusterSetOptionConnectTimeout(cc, timeout); - - int status; - status = valkeyClusterConnect2(cc); - ASSERT_MSG(status == VALKEY_OK, cc->errstr); + valkeyClusterContext *cc = valkeyClusterConnectWithOptions(&options); + ASSERT_MSG(cc && cc->err == 0, cc ? cc->errstr : "OOM"); load_valkey_version(cc); test_bitfield(cc); diff --git a/tests/ct_connection.c b/tests/ct_connection.c index ef8f35dc..ac7020a2 100644 --- a/tests/ct_connection.c +++ b/tests/ct_connection.c @@ -29,15 +29,13 @@ void reset_counters(void) { // Connecting to a password protected cluster and // providing a correct password. void test_password_ok(void) { - valkeyClusterContext *cc = valkeyClusterContextInit(); - assert(cc); - valkeyClusterSetOptionAddNodes(cc, CLUSTER_NODE_WITH_PASSWORD); - valkeyClusterSetOptionPassword(cc, CLUSTER_PASSWORD); - valkeyClusterSetConnectCallback(cc, connect_callback); + valkeyClusterOptions options = {0}; + options.initial_nodes = CLUSTER_NODE_WITH_PASSWORD; + options.password = CLUSTER_PASSWORD; + options.connect_callback = connect_callback; - int status; - status = valkeyClusterConnect2(cc); - ASSERT_MSG(status == VALKEY_OK, cc->errstr); + valkeyClusterContext *cc = valkeyClusterConnectWithOptions(&options); + ASSERT_MSG(cc && cc->err == 0, cc ? cc->errstr : "OOM"); assert(connect_success_counter == 1); // for CLUSTER NODES load_valkey_version(cc); assert(connect_success_counter == 2); // for checking valkey version @@ -58,14 +56,12 @@ void test_password_ok(void) { // Connecting to a password protected cluster and // providing wrong password. void test_password_wrong(void) { - valkeyClusterContext *cc = valkeyClusterContextInit(); - assert(cc); - valkeyClusterSetOptionAddNodes(cc, CLUSTER_NODE_WITH_PASSWORD); - valkeyClusterSetOptionPassword(cc, "faultypass"); + valkeyClusterOptions options = {0}; + options.initial_nodes = CLUSTER_NODE_WITH_PASSWORD; + options.password = "faultypass"; - int status; - status = valkeyClusterConnect2(cc); - assert(status == VALKEY_ERR); + valkeyClusterContext *cc = valkeyClusterConnectWithOptions(&options); + assert(cc); assert(cc->err == VALKEY_ERR_OTHER); if (valkey_version_less_than(6, 0)) @@ -79,14 +75,12 @@ void test_password_wrong(void) { // Connecting to a password protected cluster and // not providing any password. void test_password_missing(void) { - valkeyClusterContext *cc = valkeyClusterContextInit(); - assert(cc); - valkeyClusterSetOptionAddNodes(cc, CLUSTER_NODE_WITH_PASSWORD); + valkeyClusterOptions options = {0}; + options.initial_nodes = CLUSTER_NODE_WITH_PASSWORD; + // No password set. - // A password is not configured.. - int status; - status = valkeyClusterConnect2(cc); - assert(status == VALKEY_ERR); + valkeyClusterContext *cc = valkeyClusterConnectWithOptions(&options); + assert(cc); assert(cc->err == VALKEY_ERR_OTHER); assert(strncmp(cc->errstr, "NOAUTH", 6) == 0); @@ -101,50 +95,13 @@ void test_username_ok(void) { return; // Connect to the cluster using username and password - valkeyClusterContext *cc = valkeyClusterContextInit(); - assert(cc); - valkeyClusterSetOptionAddNodes(cc, CLUSTER_NODE_WITH_PASSWORD); - valkeyClusterSetOptionUsername(cc, CLUSTER_USERNAME); - valkeyClusterSetOptionPassword(cc, CLUSTER_PASSWORD); + valkeyClusterOptions options = {0}; + options.initial_nodes = CLUSTER_NODE_WITH_PASSWORD; + options.username = CLUSTER_USERNAME; + options.password = CLUSTER_PASSWORD; - int ret = valkeyClusterConnect2(cc); - ASSERT_MSG(ret == VALKEY_OK, cc->errstr); - - // Test connection - valkeyReply *reply = valkeyClusterCommand(cc, "SET key1 Hello"); - CHECK_REPLY_OK(cc, reply); - freeReplyObject(reply); - - valkeyClusterFree(cc); -} - -// Test of disabling the use of username after it was enabled. -void test_username_disabled(void) { - if (valkey_version_less_than(6, 0)) - return; - - valkeyClusterContext *cc = valkeyClusterContextInit(); - assert(cc); - valkeyClusterSetOptionAddNodes(cc, CLUSTER_NODE_WITH_PASSWORD); - valkeyClusterSetOptionUsername(cc, "missing-user"); - valkeyClusterSetOptionPassword(cc, CLUSTER_PASSWORD); - - // Connect using 'AUTH ' should fail - int ret = valkeyClusterConnect2(cc); - assert(ret == VALKEY_ERR); - assert(cc->err == VALKEY_ERR_OTHER); - assert(strncmp(cc->errstr, "WRONGPASS invalid username-password pair", - 40) == 0); - - // Disable use of username (2 alternatives) - ret = valkeyClusterSetOptionUsername(cc, NULL); - ASSERT_MSG(ret == VALKEY_OK, cc->errstr); - ret = valkeyClusterSetOptionUsername(cc, ""); - ASSERT_MSG(ret == VALKEY_OK, cc->errstr); - - // Connect using 'AUTH ' should pass - ret = valkeyClusterConnect2(cc); - ASSERT_MSG(ret == VALKEY_OK, cc->errstr); + valkeyClusterContext *cc = valkeyClusterConnectWithOptions(&options); + ASSERT_MSG(cc && cc->err == 0, cc ? cc->errstr : "OOM"); // Test connection valkeyReply *reply = valkeyClusterCommand(cc, "SET key1 Hello"); @@ -156,23 +113,20 @@ void test_username_disabled(void) { // Connect and handle two clusters simultaneously void test_multicluster(void) { - int ret; valkeyReply *reply; // Connect to first cluster - valkeyClusterContext *cc1 = valkeyClusterContextInit(); + valkeyClusterContext *cc1 = valkeyClusterConnect(CLUSTER_NODE); assert(cc1); - valkeyClusterSetOptionAddNodes(cc1, CLUSTER_NODE); - ret = valkeyClusterConnect2(cc1); - ASSERT_MSG(ret == VALKEY_OK, cc1->errstr); + ASSERT_MSG(cc1->err == 0, cc1->errstr); // Connect to second cluster - valkeyClusterContext *cc2 = valkeyClusterContextInit(); + valkeyClusterOptions options = {0}; + options.initial_nodes = CLUSTER_NODE_WITH_PASSWORD; + options.password = CLUSTER_PASSWORD; + valkeyClusterContext *cc2 = valkeyClusterConnectWithOptions(&options); assert(cc2); - valkeyClusterSetOptionAddNodes(cc2, CLUSTER_NODE_WITH_PASSWORD); - valkeyClusterSetOptionPassword(cc2, CLUSTER_PASSWORD); - ret = valkeyClusterConnect2(cc2); - ASSERT_MSG(ret == VALKEY_OK, cc2->errstr); + ASSERT_MSG(cc2->err == 0, cc2->errstr); // Set keys differently in clusters reply = valkeyClusterCommand(cc1, "SET key Hello1"); @@ -207,16 +161,14 @@ void test_multicluster(void) { void test_connect_timeout(void) { struct timeval timeout = {0, 200000}; - valkeyClusterContext *cc = valkeyClusterContextInit(); - assert(cc); - /* Configure a non-routable IP address and a timeout */ - valkeyClusterSetOptionAddNodes(cc, "192.168.0.0:7000"); - valkeyClusterSetOptionConnectTimeout(cc, timeout); - valkeyClusterSetConnectCallback(cc, connect_callback); + valkeyClusterOptions options = {0}; + options.initial_nodes = "192.168.0.0:7000"; + options.connect_timeout = &timeout; + options.connect_callback = connect_callback; - int status = valkeyClusterConnect2(cc); - assert(status == VALKEY_ERR); + valkeyClusterContext *cc = valkeyClusterConnectWithOptions(&options); + assert(cc); assert(cc->err == VALKEY_ERR_IO); assert(strcmp(cc->errstr, "Connection timed out") == 0); assert(connect_success_counter == 0); @@ -230,13 +182,12 @@ void test_connect_timeout(void) { void test_command_timeout(void) { struct timeval timeout = {0, 10000}; - valkeyClusterContext *cc = valkeyClusterContextInit(); - assert(cc); - valkeyClusterSetOptionAddNodes(cc, CLUSTER_NODE); - valkeyClusterSetOptionTimeout(cc, timeout); + valkeyClusterOptions options = {0}; + options.initial_nodes = CLUSTER_NODE; + options.command_timeout = &timeout; - int status = valkeyClusterConnect2(cc); - ASSERT_MSG(status == VALKEY_OK, cc->errstr); + valkeyClusterContext *cc = valkeyClusterConnectWithOptions(&options); + ASSERT_MSG(cc && cc->err == 0, cc ? cc->errstr : "OOM"); valkeyClusterNodeIterator ni; valkeyClusterInitNodeIterator(&ni, cc); @@ -263,14 +214,8 @@ void test_command_timeout(void) { /* Connect and configure a command timeout while connected. */ void test_command_timeout_set_while_connected(void) { - struct timeval timeout = {0, 10000}; - - valkeyClusterContext *cc = valkeyClusterContextInit(); - assert(cc); - valkeyClusterSetOptionAddNodes(cc, CLUSTER_NODE); - - int status = valkeyClusterConnect2(cc); - ASSERT_MSG(status == VALKEY_OK, cc->errstr); + valkeyClusterContext *cc = valkeyClusterConnect(CLUSTER_NODE); + ASSERT_MSG(cc && cc->err == 0, cc ? cc->errstr : "OOM"); valkeyClusterNodeIterator ni; valkeyClusterInitNodeIterator(&ni, cc); @@ -283,6 +228,7 @@ void test_command_timeout_set_while_connected(void) { freeReplyObject(reply); /* Set command timeout while connected */ + struct timeval timeout = {0, 10000}; valkeyClusterSetOptionTimeout(cc, timeout); reply = valkeyClusterCommandToNode(cc, node, "DEBUG SLEEP 0.2"); @@ -343,26 +289,23 @@ void commandCallback(valkeyClusterAsyncContext *cc, void *r, void *privdata) { // Connecting to a password protected cluster using // the async API, providing correct password. void test_async_password_ok(void) { - valkeyClusterAsyncContext *acc = valkeyClusterAsyncContextInit(); - assert(acc); - valkeyClusterAsyncSetConnectCallback(acc, callbackExpectOk); - valkeyClusterAsyncSetDisconnectCallback(acc, callbackExpectOk); - valkeyClusterSetOptionAddNodes(acc->cc, CLUSTER_NODE_WITH_PASSWORD); - valkeyClusterSetOptionPassword(acc->cc, CLUSTER_PASSWORD); - struct event_base *base = event_base_new(); - valkeyClusterLibeventAttach(acc, base); - int ret; - ret = valkeyClusterConnect2(acc->cc); - assert(ret == VALKEY_OK); - assert(acc->err == 0); - assert(acc->cc->err == 0); + valkeyClusterOptions options = {0}; + options.initial_nodes = CLUSTER_NODE_WITH_PASSWORD; + options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE; + options.password = CLUSTER_PASSWORD; + options.async_connect_cb = callbackExpectOk; + options.async_disconnect_cb = callbackExpectOk; + valkeyClusterSetOptionUseLibevent(&options, base); + + valkeyClusterAsyncContext *acc = valkeyClusterAsyncConnectWithOptions(&options); + ASSERT_MSG(acc && acc->err == 0, acc ? acc->errstr : "OOM"); // Test connection ExpectedResult r = { .type = VALKEY_REPLY_STATUS, .str = "OK", .disconnect = true}; - ret = valkeyClusterAsyncCommand(acc, commandCallback, &r, "SET key1 Hello"); + int ret = valkeyClusterAsyncCommand(acc, commandCallback, &r, "SET key1 Hello"); assert(ret == VALKEY_OK); event_base_dispatch(base); @@ -374,16 +317,17 @@ void test_async_password_ok(void) { /* Connect to a password protected cluster using the wrong password. An eventloop is not attached since it is not needed is this case. */ void test_async_password_wrong(void) { - valkeyClusterAsyncContext *acc = valkeyClusterAsyncContextInit(); - assert(acc); - valkeyClusterSetOptionAddNodes(acc->cc, CLUSTER_NODE_WITH_PASSWORD); - valkeyClusterSetOptionPassword(acc->cc, "faultypass"); + struct event_base *base = event_base_new(); - int ret; - ret = valkeyClusterConnect2(acc->cc); - assert(ret == VALKEY_ERR); - assert(acc->err == VALKEY_OK); // TODO: This must be wrong! - assert(acc->cc->err == VALKEY_ERR_OTHER); + valkeyClusterOptions options = {0}; + options.initial_nodes = CLUSTER_NODE_WITH_PASSWORD; + options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE; + options.password = "faultypass"; + valkeyClusterSetOptionUseLibevent(&options, base); + + valkeyClusterAsyncContext *acc = valkeyClusterAsyncConnectWithOptions(&options); + assert(acc); + assert(acc->err == VALKEY_ERR_OTHER); if (valkey_version_less_than(6, 0)) assert(strcmp(acc->cc->errstr, "ERR invalid password") == 0); else @@ -391,79 +335,80 @@ void test_async_password_wrong(void) { // No connection ExpectedResult r; - ret = valkeyClusterAsyncCommand(acc, commandCallback, &r, "SET key1 Hello"); + int ret = valkeyClusterAsyncCommand(acc, commandCallback, &r, "SET key1 Hello"); assert(ret == VALKEY_ERR); assert(acc->err == VALKEY_ERR_OTHER); assert(strcmp(acc->errstr, "slotmap not available") == 0); valkeyClusterAsyncFree(acc); + event_base_free(base); } /* Connect to a password protected cluster without providing a password. An eventloop is not attached since it is not needed is this case. */ void test_async_password_missing(void) { - valkeyClusterAsyncContext *acc = valkeyClusterAsyncContextInit(); - assert(acc); - valkeyClusterAsyncSetConnectCallback(acc, callbackExpectOk); - valkeyClusterAsyncSetDisconnectCallback(acc, callbackExpectOk); - valkeyClusterSetOptionAddNodes(acc->cc, CLUSTER_NODE_WITH_PASSWORD); + struct event_base *base = event_base_new(); + + valkeyClusterOptions options = {0}; + options.initial_nodes = CLUSTER_NODE_WITH_PASSWORD; + options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE; + options.async_connect_cb = callbackExpectOk; + options.async_disconnect_cb = callbackExpectOk; + valkeyClusterSetOptionUseLibevent(&options, base); // Password not configured - int ret; - ret = valkeyClusterConnect2(acc->cc); - assert(ret == VALKEY_ERR); - assert(acc->err == VALKEY_OK); // TODO: This must be wrong! - assert(acc->cc->err == VALKEY_ERR_OTHER); + valkeyClusterAsyncContext *acc = valkeyClusterAsyncConnectWithOptions(&options); + assert(acc); + assert(acc->err == VALKEY_ERR_OTHER); assert(strncmp(acc->cc->errstr, "NOAUTH", 6) == 0); // No connection ExpectedResult r; - ret = valkeyClusterAsyncCommand(acc, commandCallback, &r, "SET key1 Hello"); + int ret = valkeyClusterAsyncCommand(acc, commandCallback, &r, "SET key1 Hello"); assert(ret == VALKEY_ERR); assert(acc->err == VALKEY_ERR_OTHER); assert(strcmp(acc->errstr, "slotmap not available") == 0); valkeyClusterAsyncFree(acc); + event_base_free(base); } // Connect to a cluster and authenticate using username and password void test_async_username_ok(void) { if (valkey_version_less_than(6, 0)) return; + struct event_base *base = event_base_new(); // Connect to the cluster using username and password - valkeyClusterAsyncContext *acc = valkeyClusterAsyncContextInit(); - assert(acc); - valkeyClusterAsyncSetConnectCallback(acc, callbackExpectOk); - valkeyClusterAsyncSetDisconnectCallback(acc, callbackExpectOk); - valkeyClusterSetOptionAddNodes(acc->cc, CLUSTER_NODE_WITH_PASSWORD); - valkeyClusterSetOptionUsername(acc->cc, "missing-user"); - valkeyClusterSetOptionPassword(acc->cc, CLUSTER_PASSWORD); - - struct event_base *base = event_base_new(); - valkeyClusterLibeventAttach(acc, base); + valkeyClusterOptions options = {0}; + options.initial_nodes = CLUSTER_NODE_WITH_PASSWORD; + options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE; + options.async_connect_cb = callbackExpectOk; + options.async_disconnect_cb = callbackExpectOk; + options.username = "missing-user"; + options.password = CLUSTER_PASSWORD; + valkeyClusterSetOptionUseLibevent(&options, base); // Connect using wrong username should fail - int ret = valkeyClusterConnect2(acc->cc); - assert(ret == VALKEY_ERR); - assert(acc->cc->err == VALKEY_ERR_OTHER); - assert(strncmp(acc->cc->errstr, "WRONGPASS invalid username-password pair", + valkeyClusterAsyncContext *acc = valkeyClusterAsyncConnectWithOptions(&options); + assert(acc); + assert(acc->err == VALKEY_ERR_OTHER); + assert(strncmp(acc->errstr, "WRONGPASS invalid username-password pair", 40) == 0); + valkeyClusterAsyncFree(acc); // Set correct username - ret = valkeyClusterSetOptionUsername(acc->cc, CLUSTER_USERNAME); - ASSERT_MSG(ret == VALKEY_OK, acc->cc->errstr); + options.username = CLUSTER_USERNAME; // Connect using correct username should pass - ret = valkeyClusterConnect2(acc->cc); - assert(ret == VALKEY_OK); + acc = valkeyClusterAsyncConnectWithOptions(&options); + assert(acc); assert(acc->err == 0); - assert(acc->cc->err == 0); // Test connection ExpectedResult r = { .type = VALKEY_REPLY_STATUS, .str = "OK", .disconnect = true}; - ret = valkeyClusterAsyncCommand(acc, commandCallback, &r, "SET key1 Hello"); + int ret = valkeyClusterAsyncCommand(acc, commandCallback, &r, "SET key1 Hello"); assert(ret == VALKEY_OK); event_base_dispatch(base); @@ -474,40 +419,34 @@ void test_async_username_ok(void) { // Connect and handle two clusters simultaneously using the async API void test_async_multicluster(void) { - int ret; - - valkeyClusterAsyncContext *acc1 = valkeyClusterAsyncContextInit(); - assert(acc1); - valkeyClusterAsyncSetConnectCallback(acc1, callbackExpectOk); - valkeyClusterAsyncSetDisconnectCallback(acc1, callbackExpectOk); - valkeyClusterSetOptionAddNodes(acc1->cc, CLUSTER_NODE); - - valkeyClusterAsyncContext *acc2 = valkeyClusterAsyncContextInit(); - assert(acc2); - valkeyClusterAsyncSetConnectCallback(acc2, callbackExpectOk); - valkeyClusterAsyncSetDisconnectCallback(acc2, callbackExpectOk); - valkeyClusterSetOptionAddNodes(acc2->cc, CLUSTER_NODE_WITH_PASSWORD); - valkeyClusterSetOptionPassword(acc2->cc, CLUSTER_PASSWORD); - struct event_base *base = event_base_new(); - valkeyClusterLibeventAttach(acc1, base); - valkeyClusterLibeventAttach(acc2, base); + + valkeyClusterOptions options1 = {0}; + options1.initial_nodes = CLUSTER_NODE; + options1.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE; + options1.async_connect_cb = callbackExpectOk; + options1.async_disconnect_cb = callbackExpectOk; + valkeyClusterSetOptionUseLibevent(&options1, base); // Connect to first cluster - ret = valkeyClusterConnect2(acc1->cc); - assert(ret == VALKEY_OK); - assert(acc1->err == 0); - assert(acc1->cc->err == 0); + valkeyClusterAsyncContext *acc1 = valkeyClusterAsyncConnectWithOptions(&options1); + ASSERT_MSG(acc1 && acc1->err == 0, acc1 ? acc1->errstr : "OOM"); + + valkeyClusterOptions options2 = {0}; + options2.initial_nodes = CLUSTER_NODE_WITH_PASSWORD; + options2.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE; + options2.password = CLUSTER_PASSWORD; + options2.async_connect_cb = callbackExpectOk; + options2.async_disconnect_cb = callbackExpectOk; + valkeyClusterSetOptionUseLibevent(&options2, base); // Connect to second cluster - ret = valkeyClusterConnect2(acc2->cc); - assert(ret == VALKEY_OK); - assert(acc2->err == 0); - assert(acc2->cc->err == 0); + valkeyClusterAsyncContext *acc2 = valkeyClusterAsyncConnectWithOptions(&options2); + ASSERT_MSG(acc2 && acc2->err == 0, acc2 ? acc2->errstr : "OOM"); // Set keys differently in clusters ExpectedResult r1 = {.type = VALKEY_REPLY_STATUS, .str = "OK"}; - ret = valkeyClusterAsyncCommand(acc1, commandCallback, &r1, "SET key A"); + int ret = valkeyClusterAsyncCommand(acc1, commandCallback, &r1, "SET key A"); assert(ret == VALKEY_OK); ExpectedResult r2 = {.type = VALKEY_REPLY_STATUS, .str = "OK"}; @@ -540,22 +479,20 @@ void test_async_multicluster(void) { /* Connect to a non-routable address which results in a connection timeout. */ void test_async_connect_timeout(void) { + struct event_base *base = event_base_new(); struct timeval timeout = {0, 200000}; - valkeyClusterAsyncContext *acc = valkeyClusterAsyncContextInit(); - assert(acc); - + valkeyClusterOptions options = {0}; /* Configure a non-routable IP address and a timeout */ - valkeyClusterSetOptionAddNodes(acc->cc, "192.168.0.0:7000"); - valkeyClusterSetOptionConnectTimeout(acc->cc, timeout); + options.initial_nodes = "192.168.0.0:7000"; + options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE; + options.connect_timeout = &timeout; + valkeyClusterSetOptionUseLibevent(&options, base); - struct event_base *base = event_base_new(); - valkeyClusterLibeventAttach(acc, base); - - int status = valkeyClusterConnect2(acc->cc); - assert(status == VALKEY_ERR); - assert(acc->cc->err == VALKEY_ERR_IO); - assert(strcmp(acc->cc->errstr, "Connection timed out") == 0); + valkeyClusterAsyncContext *acc = valkeyClusterAsyncConnectWithOptions(&options); + assert(acc); + assert(acc->err == VALKEY_ERR_IO); + assert(strcmp(acc->errstr, "Connection timed out") == 0); event_base_dispatch(base); @@ -565,19 +502,17 @@ void test_async_connect_timeout(void) { /* Connect using a pre-configured command timeout */ void test_async_command_timeout(void) { + struct event_base *base = event_base_new(); struct timeval timeout = {0, 10000}; - valkeyClusterAsyncContext *acc = valkeyClusterAsyncContextInit(); - assert(acc); - valkeyClusterSetOptionAddNodes(acc->cc, CLUSTER_NODE); - valkeyClusterSetOptionTimeout(acc->cc, timeout); + valkeyClusterOptions options = {0}; + options.initial_nodes = CLUSTER_NODE; + options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE; + options.command_timeout = &timeout; + valkeyClusterSetOptionUseLibevent(&options, base); - struct event_base *base = event_base_new(); - valkeyClusterLibeventAttach(acc, base); - - int status = valkeyClusterConnect2(acc->cc); - assert(status == VALKEY_OK); - assert(acc->cc->err == 0); + valkeyClusterAsyncContext *acc = valkeyClusterAsyncConnectWithOptions(&options); + ASSERT_MSG(acc && acc->err == 0, acc ? acc->errstr : "OOM"); valkeyClusterNodeIterator ni; valkeyClusterInitNodeIterator(&ni, acc->cc); @@ -587,8 +522,8 @@ void test_async_command_timeout(void) { /* Simulate a command timeout and expect a timeout error */ ExpectedResult r = { .noreply = true, .errstr = "Timeout", .disconnect = true}; - status = valkeyClusterAsyncCommandToNode(acc, node, commandCallback, &r, - "DEBUG SLEEP 0.2"); + int status = valkeyClusterAsyncCommandToNode(acc, node, commandCallback, &r, + "DEBUG SLEEP 0.2"); assert(status == VALKEY_OK); event_base_dispatch(base); @@ -603,7 +538,6 @@ int main(void) { test_password_wrong(); test_password_missing(); test_username_ok(); - test_username_disabled(); test_multicluster(); test_connect_timeout(); test_command_timeout(); diff --git a/tests/ct_connection_ipv6.c b/tests/ct_connection_ipv6.c index aeee635b..39711f3d 100644 --- a/tests/ct_connection_ipv6.c +++ b/tests/ct_connection_ipv6.c @@ -10,23 +10,15 @@ // Successful connection an IPv6 cluster void test_successful_ipv6_connection(void) { - - valkeyClusterContext *cc = valkeyClusterContextInit(); - assert(cc); - - int status; struct timeval timeout = {0, 500000}; // 0.5s - status = valkeyClusterSetOptionConnectTimeout(cc, timeout); - ASSERT_MSG(status == VALKEY_OK, cc->errstr); - - status = valkeyClusterSetOptionAddNodes(cc, CLUSTER_NODE_IPV6); - ASSERT_MSG(status == VALKEY_OK, cc->errstr); - status = valkeyClusterSetOptionRouteUseSlots(cc); - ASSERT_MSG(status == VALKEY_OK, cc->errstr); + valkeyClusterOptions options = {0}; + options.initial_nodes = CLUSTER_NODE_IPV6; + options.options = VALKEY_OPT_USE_CLUSTER_SLOTS; + options.connect_timeout = &timeout; - status = valkeyClusterConnect2(cc); - ASSERT_MSG(status == VALKEY_OK, cc->errstr); + valkeyClusterContext *cc = valkeyClusterConnectWithOptions(&options); + ASSERT_MSG(cc && cc->err == 0, cc ? cc->errstr : "OOM"); valkeyReply *reply; reply = (valkeyReply *)valkeyClusterCommand(cc, "SET key_ipv6 value"); diff --git a/tests/ct_out_of_memory_handling.c b/tests/ct_out_of_memory_handling.c index 53b8a718..277a0a4e 100644 --- a/tests/ct_out_of_memory_handling.c +++ b/tests/ct_out_of_memory_handling.c @@ -114,75 +114,34 @@ void test_alloc_failure_handling(void) { // Override allocators valkeySetAllocators(&ha); - // Context init + struct timeval timeout = {0, 500000}; + + valkeyClusterOptions options = {0}; + options.initial_nodes = CLUSTER_NODE; + options.connect_timeout = &timeout; + options.command_timeout = &timeout; + options.options = VALKEY_OPT_USE_REPLICAS; + + // Connect valkeyClusterContext *cc; { for (int i = 0; i < 3; ++i) { successfulAllocations = i; - cc = valkeyClusterContextInit(); + cc = valkeyClusterConnectWithOptions(&options); assert(cc == NULL); } - successfulAllocations = 3; - cc = valkeyClusterContextInit(); - assert(cc); - } - cc->flags |= VALKEYCLUSTER_FLAG_ADD_SLAVE; - - // Add nodes - { - for (int i = 0; i < 9; ++i) { - prepare_allocation_test(cc, i); - result = valkeyClusterSetOptionAddNodes(cc, CLUSTER_NODE); - assert(result == VALKEY_ERR); - ASSERT_STR_EQ(cc->errstr, "Out of memory"); - } - - prepare_allocation_test(cc, 9); - result = valkeyClusterSetOptionAddNodes(cc, CLUSTER_NODE); - assert(result == VALKEY_OK); - } - - // Set connect timeout - { - struct timeval timeout = {0, 500000}; - - prepare_allocation_test(cc, 0); - result = valkeyClusterSetOptionConnectTimeout(cc, timeout); - assert(result == VALKEY_ERR); - ASSERT_STR_EQ(cc->errstr, "Out of memory"); - - prepare_allocation_test(cc, 1); - result = valkeyClusterSetOptionConnectTimeout(cc, timeout); - assert(result == VALKEY_OK); - } - - // Set request timeout - { - struct timeval timeout = {0, 500000}; - - prepare_allocation_test(cc, 0); - result = valkeyClusterSetOptionTimeout(cc, timeout); - assert(result == VALKEY_ERR); - ASSERT_STR_EQ(cc->errstr, "Out of memory"); - - prepare_allocation_test(cc, 1); - result = valkeyClusterSetOptionTimeout(cc, timeout); - assert(result == VALKEY_OK); - } - - // Connect - { - for (int i = 0; i < 88; ++i) { - prepare_allocation_test(cc, i); - result = valkeyClusterConnect2(cc); - assert(result == VALKEY_ERR); + for (int i = 3; i < 102; ++i) { + successfulAllocations = i; + cc = valkeyClusterConnectWithOptions(&options); + assert(cc); ASSERT_STR_EQ(cc->errstr, "Out of memory"); + valkeyClusterFree(cc); } - prepare_allocation_test(cc, 88); - result = valkeyClusterConnect2(cc); - assert(result == VALKEY_OK); + successfulAllocations = 102; + cc = valkeyClusterConnectWithOptions(&options); + assert(cc && cc->err == 0); } // Command @@ -484,64 +443,39 @@ void test_alloc_failure_handling_async(void) { // Override allocators valkeySetAllocators(&ha); - // Context init - valkeyClusterAsyncContext *acc; - { - for (int i = 0; i < 4; ++i) { - successfulAllocations = 0; - acc = valkeyClusterAsyncContextInit(); - assert(acc == NULL); - } - successfulAllocations = 4; - acc = valkeyClusterAsyncContextInit(); - assert(acc); - } - acc->cc->flags |= VALKEYCLUSTER_FLAG_ADD_SLAVE; + struct event_base *base = event_base_new(); + assert(base); - // Set callbacks - { - prepare_allocation_test_async(acc, 0); - result = valkeyClusterAsyncSetConnectCallback(acc, callbackExpectOk); - assert(result == VALKEY_OK); - result = valkeyClusterAsyncSetDisconnectCallback(acc, callbackExpectOk); - assert(result == VALKEY_OK); - } + valkeyClusterOptions options = {0}; + options.initial_nodes = CLUSTER_NODE; + options.async_connect_cb = callbackExpectOk; + options.async_disconnect_cb = callbackExpectOk; + options.options = VALKEY_OPT_USE_REPLICAS | + VALKEY_OPT_BLOCKING_INITIAL_UPDATE; + valkeyClusterSetOptionUseLibevent(&options, base); - // Add nodes + // Connect + valkeyClusterAsyncContext *acc; { - for (int i = 0; i < 9; ++i) { - prepare_allocation_test(acc->cc, i); - result = valkeyClusterSetOptionAddNodes(acc->cc, CLUSTER_NODE); - assert(result == VALKEY_ERR); - ASSERT_STR_EQ(acc->cc->errstr, "Out of memory"); + for (int i = 0; i < 13; ++i) { + successfulAllocations = i; + acc = valkeyClusterAsyncConnectWithOptions(&options); + assert(acc == NULL); } - prepare_allocation_test(acc->cc, 9); - result = valkeyClusterSetOptionAddNodes(acc->cc, CLUSTER_NODE); - assert(result == VALKEY_OK); - } - - // Connect - { - for (int i = 0; i < 86; ++i) { - prepare_allocation_test(acc->cc, i); - result = valkeyClusterConnect2(acc->cc); - assert(result == VALKEY_ERR); - ASSERT_STR_EQ(acc->cc->errstr, "Out of memory"); + for (int i = 13; i < 99; ++i) { + successfulAllocations = i; + acc = valkeyClusterAsyncConnectWithOptions(&options); + ASSERT_STR_EQ(acc->errstr, "Out of memory"); + assert(acc != NULL); + valkeyClusterAsyncFree(acc); } - prepare_allocation_test(acc->cc, 86); - result = valkeyClusterConnect2(acc->cc); - assert(result == VALKEY_OK); + successfulAllocations = 99; + acc = valkeyClusterAsyncConnectWithOptions(&options); + assert(acc && acc->err == 0); } - struct event_base *base = event_base_new(); - assert(base); - - successfulAllocations = 0; - result = valkeyClusterLibeventAttach(acc, base); - assert(result == VALKEY_OK); - // Async command 1 ExpectedResult r1 = {.type = VALKEY_REPLY_STATUS, .str = "OK"}; { diff --git a/tests/ct_pipeline.c b/tests/ct_pipeline.c index d5a062dd..c1a3b122 100644 --- a/tests/ct_pipeline.c +++ b/tests/ct_pipeline.c @@ -12,16 +12,13 @@ // Test of two pipelines using sync API void test_pipeline(void) { - valkeyClusterContext *cc = valkeyClusterContextInit(); - assert(cc); + valkeyClusterOptions options = {0}; + options.initial_nodes = CLUSTER_NODE; - int status; - status = valkeyClusterSetOptionAddNodes(cc, CLUSTER_NODE); - ASSERT_MSG(status == VALKEY_OK, cc->errstr); - - status = valkeyClusterConnect2(cc); - ASSERT_MSG(status == VALKEY_OK, cc->errstr); + valkeyClusterContext *cc = valkeyClusterConnectWithOptions(&options); + ASSERT_MSG(cc && cc->err == 0, cc ? cc->errstr : "OOM"); + int status; status = valkeyClusterAppendCommand(cc, "SET foo one"); ASSERT_MSG(status == VALKEY_OK, cc->errstr); status = valkeyClusterAppendCommand(cc, "SET bar two"); @@ -91,20 +88,19 @@ void commandCallback(valkeyClusterAsyncContext *cc, void *r, void *privdata) { // nature of an event loop. Therefore, unlike the synchronous API, there is only // a single way to send commands. void test_async_pipeline(void) { - valkeyClusterAsyncContext *acc = valkeyClusterAsyncContextInit(); - assert(acc); - valkeyClusterAsyncSetConnectCallback(acc, callbackExpectOk); - valkeyClusterAsyncSetDisconnectCallback(acc, callbackExpectOk); - valkeyClusterSetOptionAddNodes(acc->cc, CLUSTER_NODE); + struct event_base *base = event_base_new(); - int status; - status = valkeyClusterConnect2(acc->cc); - ASSERT_MSG(status == VALKEY_OK, acc->errstr); + valkeyClusterOptions options = {0}; + options.initial_nodes = CLUSTER_NODE; + options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE; + options.async_connect_cb = callbackExpectOk; + options.async_disconnect_cb = callbackExpectOk; + valkeyClusterSetOptionUseLibevent(&options, base); - struct event_base *base = event_base_new(); - status = valkeyClusterLibeventAttach(acc, base); - assert(status == VALKEY_OK); + valkeyClusterAsyncContext *acc = valkeyClusterAsyncConnectWithOptions(&options); + ASSERT_MSG(acc && acc->err == 0, acc ? acc->errstr : "OOM"); + int status; ExpectedResult r1 = {.type = VALKEY_REPLY_STATUS, .str = "OK"}; status = valkeyClusterAsyncCommand(acc, commandCallback, &r1, "SET foo six"); diff --git a/tests/ct_specific_nodes.c b/tests/ct_specific_nodes.c index 22223be7..5fb4ca14 100644 --- a/tests/ct_specific_nodes.c +++ b/tests/ct_specific_nodes.c @@ -314,21 +314,19 @@ void commandCallback(valkeyClusterAsyncContext *cc, void *r, void *privdata) { } void test_async_to_single_node(void) { - int status; + struct event_base *base = event_base_new(); - valkeyClusterAsyncContext *acc = valkeyClusterAsyncContextInit(); - assert(acc); - valkeyClusterAsyncSetConnectCallback(acc, callbackExpectOk); - valkeyClusterAsyncSetDisconnectCallback(acc, callbackExpectOk); - valkeyClusterSetOptionAddNodes(acc->cc, CLUSTER_NODE); - valkeyClusterSetOptionMaxRetry(acc->cc, 1); - valkeyClusterSetOptionRouteUseSlots(acc->cc); - status = valkeyClusterConnect2(acc->cc); - ASSERT_MSG(status == VALKEY_OK, acc->errstr); + valkeyClusterOptions options = {0}; + options.initial_nodes = CLUSTER_NODE; + options.options = VALKEY_OPT_USE_CLUSTER_SLOTS | + VALKEY_OPT_BLOCKING_INITIAL_UPDATE; + options.max_retry_count = 1; + options.async_connect_cb = callbackExpectOk; + options.async_disconnect_cb = callbackExpectOk; + valkeyClusterSetOptionUseLibevent(&options, base); - struct event_base *base = event_base_new(); - status = valkeyClusterLibeventAttach(acc, base); - assert(status == VALKEY_OK); + valkeyClusterAsyncContext *acc = valkeyClusterAsyncConnectWithOptions(&options); + ASSERT_MSG(acc && acc->err == 0, acc ? acc->errstr : "OOM"); valkeyClusterNodeIterator ni; valkeyClusterInitNodeIterator(&ni, acc->cc); @@ -336,8 +334,8 @@ void test_async_to_single_node(void) { assert(node); ExpectedResult r1 = {.type = VALKEY_REPLY_INTEGER, .disconnect = true}; - status = valkeyClusterAsyncCommandToNode(acc, node, commandCallback, &r1, - "DBSIZE"); + int status = valkeyClusterAsyncCommandToNode(acc, node, commandCallback, &r1, + "DBSIZE"); ASSERT_MSG(status == VALKEY_OK, acc->errstr); event_base_dispatch(base); @@ -347,21 +345,19 @@ void test_async_to_single_node(void) { } void test_async_formatted_to_single_node(void) { - int status; + struct event_base *base = event_base_new(); - valkeyClusterAsyncContext *acc = valkeyClusterAsyncContextInit(); - assert(acc); - valkeyClusterAsyncSetConnectCallback(acc, callbackExpectOk); - valkeyClusterAsyncSetDisconnectCallback(acc, callbackExpectOk); - valkeyClusterSetOptionAddNodes(acc->cc, CLUSTER_NODE); - valkeyClusterSetOptionMaxRetry(acc->cc, 1); - valkeyClusterSetOptionRouteUseSlots(acc->cc); - status = valkeyClusterConnect2(acc->cc); - ASSERT_MSG(status == VALKEY_OK, acc->errstr); + valkeyClusterOptions options = {0}; + options.initial_nodes = CLUSTER_NODE; + options.options = VALKEY_OPT_USE_CLUSTER_SLOTS | + VALKEY_OPT_BLOCKING_INITIAL_UPDATE; + options.max_retry_count = 1; + options.async_connect_cb = callbackExpectOk; + options.async_disconnect_cb = callbackExpectOk; + valkeyClusterSetOptionUseLibevent(&options, base); - struct event_base *base = event_base_new(); - status = valkeyClusterLibeventAttach(acc, base); - assert(status == VALKEY_OK); + valkeyClusterAsyncContext *acc = valkeyClusterAsyncConnectWithOptions(&options); + ASSERT_MSG(acc && acc->err == 0, acc ? acc->errstr : "OOM"); valkeyClusterNodeIterator ni; valkeyClusterInitNodeIterator(&ni, acc->cc); @@ -370,7 +366,7 @@ void test_async_formatted_to_single_node(void) { ExpectedResult r1 = {.type = VALKEY_REPLY_INTEGER, .disconnect = true}; char command[] = "*1\r\n$6\r\nDBSIZE\r\n"; - status = valkeyClusterAsyncFormattedCommandToNode( + int status = valkeyClusterAsyncFormattedCommandToNode( acc, node, commandCallback, &r1, command, strlen(command)); ASSERT_MSG(status == VALKEY_OK, acc->errstr); @@ -381,21 +377,19 @@ void test_async_formatted_to_single_node(void) { } void test_async_command_argv_to_single_node(void) { - int status; + struct event_base *base = event_base_new(); - valkeyClusterAsyncContext *acc = valkeyClusterAsyncContextInit(); - assert(acc); - valkeyClusterAsyncSetConnectCallback(acc, callbackExpectOk); - valkeyClusterAsyncSetDisconnectCallback(acc, callbackExpectOk); - valkeyClusterSetOptionAddNodes(acc->cc, CLUSTER_NODE); - valkeyClusterSetOptionMaxRetry(acc->cc, 1); - valkeyClusterSetOptionRouteUseSlots(acc->cc); - status = valkeyClusterConnect2(acc->cc); - ASSERT_MSG(status == VALKEY_OK, acc->errstr); + valkeyClusterOptions options = {0}; + options.initial_nodes = CLUSTER_NODE; + options.options = VALKEY_OPT_USE_CLUSTER_SLOTS | + VALKEY_OPT_BLOCKING_INITIAL_UPDATE; + options.max_retry_count = 1; + options.async_connect_cb = callbackExpectOk; + options.async_disconnect_cb = callbackExpectOk; + valkeyClusterSetOptionUseLibevent(&options, base); - struct event_base *base = event_base_new(); - status = valkeyClusterLibeventAttach(acc, base); - assert(status == VALKEY_OK); + valkeyClusterAsyncContext *acc = valkeyClusterAsyncConnectWithOptions(&options); + ASSERT_MSG(acc && acc->err == 0, acc ? acc->errstr : "OOM"); valkeyClusterNodeIterator ni; valkeyClusterInitNodeIterator(&ni, acc->cc); @@ -403,7 +397,7 @@ void test_async_command_argv_to_single_node(void) { assert(node); ExpectedResult r1 = {.type = VALKEY_REPLY_INTEGER, .disconnect = true}; - status = valkeyClusterAsyncCommandArgvToNode( + int status = valkeyClusterAsyncCommandArgvToNode( acc, node, commandCallback, &r1, 1, (const char *[]){"DBSIZE"}, (size_t[]){6}); ASSERT_MSG(status == VALKEY_OK, acc->errstr); @@ -415,25 +409,24 @@ void test_async_command_argv_to_single_node(void) { } void test_async_to_all_nodes(void) { - int status; + struct event_base *base = event_base_new(); - valkeyClusterAsyncContext *acc = valkeyClusterAsyncContextInit(); - assert(acc); - valkeyClusterAsyncSetConnectCallback(acc, callbackExpectOk); - valkeyClusterAsyncSetDisconnectCallback(acc, callbackExpectOk); - valkeyClusterSetOptionAddNodes(acc->cc, CLUSTER_NODE); - valkeyClusterSetOptionMaxRetry(acc->cc, 1); - valkeyClusterSetOptionRouteUseSlots(acc->cc); - status = valkeyClusterConnect2(acc->cc); - ASSERT_MSG(status == VALKEY_OK, acc->errstr); + valkeyClusterOptions options = {0}; + options.initial_nodes = CLUSTER_NODE; + options.options = VALKEY_OPT_USE_CLUSTER_SLOTS | + VALKEY_OPT_BLOCKING_INITIAL_UPDATE; + options.max_retry_count = 1; + options.async_connect_cb = callbackExpectOk; + options.async_disconnect_cb = callbackExpectOk; + valkeyClusterSetOptionUseLibevent(&options, base); - struct event_base *base = event_base_new(); - status = valkeyClusterLibeventAttach(acc, base); - assert(status == VALKEY_OK); + valkeyClusterAsyncContext *acc = valkeyClusterAsyncConnectWithOptions(&options); + ASSERT_MSG(acc && acc->err == 0, acc ? acc->errstr : "OOM"); valkeyClusterNodeIterator ni; valkeyClusterInitNodeIterator(&ni, acc->cc); + int status; ExpectedResult r1 = {.type = VALKEY_REPLY_INTEGER}; valkeyClusterNode *node; @@ -457,28 +450,26 @@ void test_async_to_all_nodes(void) { } void test_async_transaction(void) { - int status; + struct event_base *base = event_base_new(); - valkeyClusterAsyncContext *acc = valkeyClusterAsyncContextInit(); - assert(acc); - valkeyClusterAsyncSetConnectCallback(acc, callbackExpectOk); - valkeyClusterAsyncSetDisconnectCallback(acc, callbackExpectOk); - valkeyClusterSetOptionAddNodes(acc->cc, CLUSTER_NODE); - valkeyClusterSetOptionMaxRetry(acc->cc, 1); - valkeyClusterSetOptionRouteUseSlots(acc->cc); - status = valkeyClusterConnect2(acc->cc); - ASSERT_MSG(status == VALKEY_OK, acc->errstr); + valkeyClusterOptions options = {0}; + options.initial_nodes = CLUSTER_NODE; + options.options = VALKEY_OPT_USE_CLUSTER_SLOTS | + VALKEY_OPT_BLOCKING_INITIAL_UPDATE; + options.max_retry_count = 1; + options.async_connect_cb = callbackExpectOk; + options.async_disconnect_cb = callbackExpectOk; + valkeyClusterSetOptionUseLibevent(&options, base); - struct event_base *base = event_base_new(); - status = valkeyClusterLibeventAttach(acc, base); - assert(status == VALKEY_OK); + valkeyClusterAsyncContext *acc = valkeyClusterAsyncConnectWithOptions(&options); + ASSERT_MSG(acc && acc->err == 0, acc ? acc->errstr : "OOM"); valkeyClusterNode *node = valkeyClusterGetNodeByKey(acc->cc, (char *)"foo"); assert(node); ExpectedResult r1 = {.type = VALKEY_REPLY_STATUS, .str = "OK"}; - status = valkeyClusterAsyncCommandToNode(acc, node, commandCallback, &r1, - "MULTI"); + int status = valkeyClusterAsyncCommandToNode(acc, node, commandCallback, &r1, + "MULTI"); ASSERT_MSG(status == VALKEY_OK, acc->errstr); ExpectedResult r2 = {.type = VALKEY_REPLY_STATUS, .str = "QUEUED"}; @@ -505,15 +496,13 @@ void test_async_transaction(void) { } int main(void) { - int status; + valkeyClusterOptions options = {0}; + options.initial_nodes = CLUSTER_NODE; + options.options = VALKEY_OPT_USE_CLUSTER_SLOTS; + options.max_retry_count = 1; - valkeyClusterContext *cc = valkeyClusterContextInit(); - assert(cc); - valkeyClusterSetOptionAddNodes(cc, CLUSTER_NODE); - valkeyClusterSetOptionRouteUseSlots(cc); - valkeyClusterSetOptionMaxRetry(cc, 1); - status = valkeyClusterConnect2(cc); - ASSERT_MSG(status == VALKEY_OK, cc->errstr); + valkeyClusterContext *cc = valkeyClusterConnectWithOptions(&options); + ASSERT_MSG(cc && cc->err == 0, cc ? cc->errstr : "OOM"); load_valkey_version(cc); // Synchronous API diff --git a/tests/ut_slotmap_update.c b/tests/ut_slotmap_update.c index 5d80f12c..66c996fd 100644 --- a/tests/ut_slotmap_update.c +++ b/tests/ut_slotmap_update.c @@ -36,14 +36,15 @@ valkeyReply *create_cluster_nodes_reply(const char *bulkstr) { /* Parse a cluster nodes reply from a basic deployment. */ void test_parse_cluster_nodes(bool parse_replicas) { - valkeyClusterContext *cc = valkeyClusterContextInit(); + valkeyClusterOptions options = {0}; + if (parse_replicas) + options.options |= VALKEY_OPT_USE_REPLICAS; + + valkeyClusterContext *cc = valkeyClusterContextInit(&options); valkeyClusterNode *node; cluster_slot *slot; dictIterator di; - if (parse_replicas) - cc->flags |= VALKEYCLUSTER_FLAG_ADD_SLAVE; - valkeyReply *reply = create_cluster_nodes_reply( "07c37dfeb235213a872192d90877d0cd55635b91 127.0.0.1:30004@31004,hostname4 slave e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 0 1426238317239 4 connected\n" "67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1 127.0.0.1:30002@31002,hostname2 master - 0 1426238316232 2 connected 5461-10922\n" @@ -120,7 +121,8 @@ void test_parse_cluster_nodes(bool parse_replicas) { } void test_parse_cluster_nodes_during_failover(void) { - valkeyClusterContext *cc = valkeyClusterContextInit(); + valkeyClusterOptions options = {0}; + valkeyClusterContext *cc = valkeyClusterContextInit(&options); valkeyClusterNode *node; cluster_slot *slot; dictIterator di; @@ -183,7 +185,8 @@ void test_parse_cluster_nodes_during_failover(void) { /* Skip nodes with no address, i.e with address :0 */ void test_parse_cluster_nodes_with_noaddr(void) { - valkeyClusterContext *cc = valkeyClusterContextInit(); + valkeyClusterOptions options = {0}; + valkeyClusterContext *cc = valkeyClusterContextInit(&options); valkeyClusterNode *node; dictIterator di; @@ -210,7 +213,8 @@ void test_parse_cluster_nodes_with_noaddr(void) { /* Parse replies with additional importing and migrating information. */ void test_parse_cluster_nodes_with_special_slot_entries(void) { - valkeyClusterContext *cc = valkeyClusterContextInit(); + valkeyClusterOptions options = {0}; + valkeyClusterContext *cc = valkeyClusterContextInit(&options); valkeyClusterNode *node; cluster_slot *slot; dictIterator di; @@ -248,13 +252,14 @@ void test_parse_cluster_nodes_with_special_slot_entries(void) { /* Parse a cluster nodes reply containing a primary with multiple replicas. */ void test_parse_cluster_nodes_with_multiple_replicas(void) { - valkeyClusterContext *cc = valkeyClusterContextInit(); + valkeyClusterOptions options = {0}; + valkeyClusterContext *cc = valkeyClusterContextInit(&options); valkeyClusterNode *node; cluster_slot *slot; dictIterator di; listIter li; - cc->flags |= VALKEYCLUSTER_FLAG_ADD_SLAVE; + cc->flags |= VALKEY_FLAG_PARSE_REPLICAS; valkeyReply *reply = create_cluster_nodes_reply( "07c37dfeb235213a872192d90877d0cd55635b91 127.0.0.1:30004@31004,hostname4 slave e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 0 1426238317239 4 connected\n" @@ -311,7 +316,8 @@ void test_parse_cluster_nodes_with_multiple_replicas(void) { /* Give error when parsing erroneous data. */ void test_parse_cluster_nodes_with_parse_error(void) { - valkeyClusterContext *cc = valkeyClusterContextInit(); + valkeyClusterOptions options = {0}; + valkeyClusterContext *cc = valkeyClusterContextInit(&options); valkeyReply *reply; dict *nodes; @@ -348,7 +354,8 @@ void test_parse_cluster_nodes_with_parse_error(void) { /* Redis pre-v4.0 returned node addresses without the clusterbus port, * i.e. `ip:port` instead of `ip:port@cport` */ void test_parse_cluster_nodes_with_legacy_format(void) { - valkeyClusterContext *cc = valkeyClusterContextInit(); + valkeyClusterOptions options = {0}; + valkeyClusterContext *cc = valkeyClusterContextInit(&options); valkeyClusterNode *node; dictIterator di;