Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add non-const connect callback #205

Merged
merged 10 commits into from
Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,21 @@ aliased to `redisConnectCallback`:
void(const redisAsyncContext *ac, int status);
```

Alternatively, if `hiredis` >= v1.1.0 is used, you set a connect callback
that will be passed a non-const `redisAsyncContext*` on invocation (e.g.
to be able to set a push callback on it).

```c
int redisClusterAsyncSetConnectCallbackNC(redisClusterAsyncContext *acc,
redisConnectCallbackNC *fn);
```

The callback function should have the following prototype,
aliased to `redisConnectCallbackNC`:
```c
void(redisAsyncContext *ac, int status);
```

On a connection attempt, the `status` argument is set to `REDIS_OK`
when the connection was successful.
The file description of the connection socket can be retrieved
Expand Down
5 changes: 5 additions & 0 deletions examples/src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ target_link_libraries(example_async
hiredis_cluster::hiredis_cluster
${EVENT_LIBRARY})

add_executable(clientside_caching_async clientside_caching_async.c)
target_link_libraries(clientside_caching_async
hiredis_cluster::hiredis_cluster
${EVENT_LIBRARY})

# Executable: tls
if(ENABLE_SSL)
find_package(hiredis_ssl REQUIRED)
Expand Down
167 changes: 167 additions & 0 deletions examples/src/clientside_caching_async.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* Simple example how to enable client tracking to implement client side caching.
* Tracking can be enabled via a registered connect callback and invalidation
* messages are received via the registered push callback.
* The disconnect callback should also be used as an indication of invalidation.
*/
#include <hiredis_cluster/adapters/libevent.h>
#include <hiredis_cluster/hircluster.h>

#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define CLUSTER_NODE "127.0.0.1:7000"
#define KEY "key:1"

void pushCallback(redisAsyncContext *ac, void *r);
void setCallback(redisClusterAsyncContext *acc, void *r, void *privdata);
void getCallback1(redisClusterAsyncContext *acc, void *r, void *privdata);
void getCallback2(redisClusterAsyncContext *acc, void *r, void *privdata);
void modifyKey(const char *key, const char *value);

/* The connect callback enables RESP3 and client tracking.
The non-const connect callback is used since we want to
set the push callback in the hiredis context. */
void connectCallbackNC(redisAsyncContext *ac, int status) {
assert(status == REDIS_OK);
redisAsyncSetPushCallback(ac, pushCallback);
redisAsyncCommand(ac, NULL, NULL, "HELLO 3");
redisAsyncCommand(ac, NULL, NULL, "CLIENT TRACKING ON");
printf("Connected to %s:%d\n", ac->c.tcp.host, ac->c.tcp.port);
}

/* The event callback issues a 'SET' command when the client is ready to accept
commands. A reply is expected via a call to 'setCallback()' */
void eventCallback(const redisClusterContext *cc, int event, void *privdata) {
(void)cc;
redisClusterAsyncContext *acc = (redisClusterAsyncContext *)privdata;

/* We send our commands when the client is ready to accept commands. */
if (event == HIRCLUSTER_EVENT_READY) {
printf("Client is ready to accept commands\n");

int status =
redisClusterAsyncCommand(acc, setCallback, NULL, "SET %s 1", KEY);
assert(status == REDIS_OK);
}
}

/* Message callback for 'SET' commands. Issues a 'GET' command and a reply is
expected as a call to 'getCallback1()' */
void setCallback(redisClusterAsyncContext *acc, void *r, void *privdata) {
(void)privdata;
redisReply *reply = (redisReply *)r;
assert(reply != NULL);
printf("Callback for 'SET', reply: %s\n", reply->str);

int status =
redisClusterAsyncCommand(acc, getCallback1, NULL, "GET %s", KEY);
assert(status == REDIS_OK);
}

/* Message callback for the first 'GET' command. Modifies the key to
trigger Redis to send a key invalidation message and then sends another
'GET' command. The invalidation message is received via the registered
push callback. */
void getCallback1(redisClusterAsyncContext *acc, void *r, void *privdata) {
(void)privdata;
redisReply *reply = (redisReply *)r;
assert(reply != NULL);

printf("Callback for first 'GET', reply: %s\n", reply->str);

/* Modify the key from another client which will invalidate a cached value.
Redis will send an invalidation message via a push message. */
modifyKey(KEY, "99");

int status =
redisClusterAsyncCommand(acc, getCallback2, NULL, "GET %s", KEY);
assert(status == REDIS_OK);
}

/* Push message callback handling invalidation messages. */
void pushCallback(redisAsyncContext *ac, void *r) {
redisReply *reply = r;
if (!(reply->type == REDIS_REPLY_PUSH && reply->elements == 2 &&
reply->element[0]->type == REDIS_REPLY_STRING &&
!strncmp(reply->element[0]->str, "invalidate", 10) &&
reply->element[1]->type == REDIS_REPLY_ARRAY)) {
/* Not an 'invalidate' message. Ignore. */
return;
}
redisReply *payload = reply->element[1];
size_t i;
for (i = 0; i < payload->elements; i++) {
redisReply *key = payload->element[i];
if (key->type == REDIS_REPLY_STRING)
printf("Invalidate key '%.*s'\n", (int)key->len, key->str);
else if (key->type == REDIS_REPLY_NIL)
printf("Invalidate all\n");
}
}

/* Message callback for 'GET' commands. Exits program. */
void getCallback2(redisClusterAsyncContext *acc, void *r, void *privdata) {
(void)privdata;
redisReply *reply = (redisReply *)r;
assert(reply != NULL);

printf("Callback for second 'GET', reply: %s\n", reply->str);

/* Exit the eventloop after a couple of sent commands. */
redisClusterAsyncDisconnect(acc);
}

/* A disconnect callback should invalidate all cached keys. */
void disconnectCallback(const redisAsyncContext *ac, int status) {
assert(status == REDIS_OK);
printf("Disconnected from %s:%d\n", ac->c.tcp.host, ac->c.tcp.port);

printf("Invalidate all\n");
}

/* Helper to modify keys using a separate client. */
void modifyKey(const char *key, const char *value) {
printf("Modify key: '%s'\n", key);
redisClusterContext *cc = redisClusterContextInit();
int status = redisClusterSetOptionAddNodes(cc, CLUSTER_NODE);
assert(status == REDIS_OK);
status = redisClusterConnect2(cc);
assert(status == REDIS_OK);

redisReply *reply = redisClusterCommand(cc, "SET %s %s", key, value);
assert(reply != NULL);
freeReplyObject(reply);

redisClusterFree(cc);
}

int main(int argc, char **argv) {
redisClusterAsyncContext *acc = redisClusterAsyncContextInit();
assert(acc);

int status;
status = redisClusterAsyncSetConnectCallbackNC(acc, connectCallbackNC);
assert(status == REDIS_OK);
status = redisClusterAsyncSetDisconnectCallback(acc, disconnectCallback);
assert(status == REDIS_OK);
status = redisClusterSetEventCallback(acc->cc, eventCallback, acc);
assert(status == REDIS_OK);
status = redisClusterSetOptionAddNodes(acc->cc, CLUSTER_NODE);
assert(status == REDIS_OK);

struct event_base *base = event_base_new();
status = redisClusterLibeventAttach(acc, base);
assert(status == REDIS_OK);

status = redisClusterAsyncConnect2(acc);
assert(status == REDIS_OK);

event_base_dispatch(base);

redisClusterAsyncFree(acc);
event_base_free(base);
return 0;
}
2 changes: 1 addition & 1 deletion examples/using_cmake_separate/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ script_dir=$(realpath "${0%/*}")
repo_dir=$(git rev-parse --show-toplevel)

# Download hiredis
hiredis_version=1.0.2
hiredis_version=1.1.0
curl -L https://github.com/redis/hiredis/archive/v${hiredis_version}.tar.gz | tar -xz -C ${script_dir}

# Build and install downloaded hiredis using CMake
Expand Down
27 changes: 23 additions & 4 deletions hircluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -3715,6 +3715,11 @@ redisAsyncContext *actx_get_by_node(redisClusterAsyncContext *acc,
if (acc->onConnect) {
redisAsyncSetConnectCallback(ac, acc->onConnect);
}
#ifndef HIRCLUSTER_NO_NONCONST_CONNECT_CB
else if (acc->onConnectNC) {
redisAsyncSetConnectCallbackNC(ac, acc->onConnectNC);
}
#endif

if (acc->onDisconnect) {
redisAsyncSetDisconnectCallback(ac, acc->onDisconnect);
Expand Down Expand Up @@ -3775,12 +3780,26 @@ int redisClusterAsyncConnect2(redisClusterAsyncContext *acc) {

int redisClusterAsyncSetConnectCallback(redisClusterAsyncContext *acc,
redisConnectCallback *fn) {
if (acc->onConnect == NULL) {
acc->onConnect = fn;
return REDIS_OK;
if (acc->onConnect != NULL)
return REDIS_ERR;
#ifndef HIRCLUSTER_NO_NONCONST_CONNECT_CB
if (acc->onConnectNC != NULL)
return REDIS_ERR;
#endif
acc->onConnect = fn;
return REDIS_OK;
}

#ifndef HIRCLUSTER_NO_NONCONST_CONNECT_CB
int redisClusterAsyncSetConnectCallbackNC(redisClusterAsyncContext *acc,
redisConnectCallbackNC *fn) {
if (acc->onConnectNC != NULL || acc->onConnect != NULL) {
return REDIS_ERR;
}
return REDIS_ERR;
acc->onConnectNC = fn;
return REDIS_OK;
}
#endif

int redisClusterAsyncSetDisconnectCallback(redisClusterAsyncContext *acc,
redisDisconnectCallback *fn) {
Expand Down
14 changes: 14 additions & 0 deletions hircluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,13 @@
#define HIRCLUSTER_EVENT_READY 2
#define HIRCLUSTER_EVENT_FREE_CONTEXT 3

/* The non-const connect callback API is not available when:
* - using hiredis prior v.1.1.0; or
* - built on Windows since hiredis_cluster.def can't have conditional definitions. */
#if !(HIREDIS_MAJOR >= 1 && HIREDIS_MINOR >= 1) || _WIN32
#define HIRCLUSTER_NO_NONCONST_CONNECT_CB
#endif

#ifdef __cplusplus
extern "C" {
#endif
Expand Down Expand Up @@ -159,6 +166,9 @@ typedef struct redisClusterAsyncContext {

/* Called when the first write event was received. */
redisConnectCallback *onConnect;
#ifndef HIRCLUSTER_NO_NONCONST_CONNECT_CB
redisConnectCallbackNC *onConnectNC;
#endif

} redisClusterAsyncContext;

Expand Down Expand Up @@ -286,6 +296,10 @@ void redisClusterAsyncFree(redisClusterAsyncContext *acc);

int redisClusterAsyncSetConnectCallback(redisClusterAsyncContext *acc,
redisConnectCallback *fn);
#ifndef HIRCLUSTER_NO_NONCONST_CONNECT_CB
int redisClusterAsyncSetConnectCallbackNC(redisClusterAsyncContext *acc,
redisConnectCallbackNC *fn);
#endif
int redisClusterAsyncSetDisconnectCallback(redisClusterAsyncContext *acc,
redisDisconnectCallback *fn);

Expand Down
18 changes: 18 additions & 0 deletions tests/ct_async.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@ void connectCallback(const redisAsyncContext *ac, int status) {
printf("Connected to %s:%d\n", ac->c.tcp.host, ac->c.tcp.port);
}

#ifndef HIRCLUSTER_NO_NONCONST_CONNECT_CB
void connectCallbackNC(redisAsyncContext *ac, int status) {
UNUSED(ac);
UNUSED(status);
/* The testcase expects a failure during registration of this
non-const connect callback and it should never be called. */
assert(0);
}
#endif

void disconnectCallback(const redisAsyncContext *ac, int status) {
ASSERT_MSG(status == REDIS_OK, ac->errstr);
printf("Disconnected from %s:%d\n", ac->c.tcp.host, ac->c.tcp.port);
Expand Down Expand Up @@ -66,6 +76,14 @@ int main(void) {
int status;
status = redisClusterAsyncSetConnectCallback(acc, connectCallback);
assert(status == REDIS_OK);
status = redisClusterAsyncSetConnectCallback(acc, connectCallback);
assert(status == REDIS_ERR); /* Re-registration not accepted */

#ifndef HIRCLUSTER_NO_NONCONST_CONNECT_CB
status = redisClusterAsyncSetConnectCallbackNC(acc, connectCallbackNC);
assert(status == REDIS_ERR); /* Re-registration not accepted */
#endif

status = redisClusterAsyncSetDisconnectCallback(acc, disconnectCallback);
assert(status == REDIS_OK);
status = redisClusterSetEventCallback(acc->cc, eventCallback, acc);
Expand Down
Loading