Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't initiate new connections during a cluster client disconnect (async API) #73

Merged
merged 3 commits into from
Aug 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions include/valkey/valkeycluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@
/* Flag to enable routing table updates using the command 'cluster slots'.
* Default is the 'cluster nodes' command. */
#define VALKEYCLUSTER_FLAG_ROUTE_USE_SLOTS 0x4000
/* Flag specific to the async API which means that the user requested a
* client disconnect or free. */
#define VALKEYCLUSTER_FLAG_DISCONNECTING 0x8000

/* Events, for valkeyClusterSetEventCallback() */
#define VALKEYCLUSTER_EVENT_SLOTMAP_UPDATED 1
Expand Down
29 changes: 25 additions & 4 deletions src/valkeycluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -1795,6 +1795,10 @@ int valkeyClusterConnect2(valkeyClusterContext *cc) {
"server address not configured");
return VALKEY_ERR;
}
/* Clear a previously set shutdown flag since we allow a
* reconnection of an async context using this API (legacy). */
cc->flags &= ~VALKEYCLUSTER_FLAG_DISCONNECTING;

return valkeyClusterUpdateSlotmap(cc);
}

Expand Down Expand Up @@ -3171,6 +3175,10 @@ static int updateSlotMapAsync(valkeyClusterAsyncContext *acc,
/* Don't allow concurrent slot map updates. */
return VALKEY_ERR;
}
if (acc->cc->flags & VALKEYCLUSTER_FLAG_DISCONNECTING) {
/* No slot map updates during a cluster client disconnect. */
return VALKEY_ERR;
}

if (ac == NULL) {
if (acc->cc->nodes == NULL) {
Expand Down Expand Up @@ -3263,7 +3271,8 @@ static void valkeyClusterAsyncCallback(valkeyAsyncContext *ac, void *r,
goto done;
}

if (cad->retry_count == NO_RETRY) /* Skip retry handling */
/* Skip retry handling when not expected, or during a client disconnect. */
if (cad->retry_count == NO_RETRY || cc->flags & VALKEYCLUSTER_FLAG_DISCONNECTING)
goto done;

error_type = cluster_reply_error_type(reply);
Expand Down Expand Up @@ -3383,6 +3392,12 @@ int valkeyClusterAsyncFormattedCommand(valkeyClusterAsyncContext *acc,

cc = acc->cc;

/* Don't accept new commands when the client is about to disconnect. */
if (cc->flags & VALKEYCLUSTER_FLAG_DISCONNECTING) {
valkeyClusterAsyncSetError(acc, VALKEY_ERR_OTHER, "disconnecting");
return VALKEY_ERR;
}

if (cc->err) {
cc->err = 0;
memset(cc->errstr, '\0', strlen(cc->errstr));
Expand Down Expand Up @@ -3457,20 +3472,24 @@ int valkeyClusterAsyncFormattedCommandToNode(valkeyClusterAsyncContext *acc,
valkeyClusterCallbackFn *fn,
void *privdata, char *cmd,
int len) {
valkeyClusterContext *cc;
valkeyClusterContext *cc = acc->cc;
valkeyAsyncContext *ac;
int status;
cluster_async_data *cad = NULL;
struct cmd *command = NULL;

/* Don't accept new commands when the client is about to disconnect. */
if (cc->flags & VALKEYCLUSTER_FLAG_DISCONNECTING) {
valkeyClusterAsyncSetError(acc, VALKEY_ERR_OTHER, "disconnecting");
return VALKEY_ERR;
}

ac = actx_get_by_node(acc, node);
if (ac == NULL) {
/* Specific error already set */
return VALKEY_ERR;
}

cc = acc->cc;

if (cc->err) {
cc->err = 0;
memset(cc->errstr, '\0', strlen(cc->errstr));
Expand Down Expand Up @@ -3646,6 +3665,7 @@ void valkeyClusterAsyncDisconnect(valkeyClusterAsyncContext *acc) {
}

cc = acc->cc;
cc->flags |= VALKEYCLUSTER_FLAG_DISCONNECTING;

if (cc->nodes == NULL) {
return;
Expand Down Expand Up @@ -3675,6 +3695,7 @@ void valkeyClusterAsyncFree(valkeyClusterAsyncContext *acc) {
}

cc = acc->cc;
cc->flags |= VALKEYCLUSTER_FLAG_DISCONNECTING;

valkeyClusterFree(cc);

Expand Down
8 changes: 8 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -264,4 +264,12 @@ if (LIBEVENT_LIBRARY)
COMMAND "${CMAKE_SOURCE_DIR}/tests/scripts/slots-not-served-test-async.sh"
"$<TARGET_FILE:clusterclient_async>"
WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}/tests/scripts/")
add_test(NAME client-disconnect-test-async
COMMAND "${CMAKE_SOURCE_DIR}/tests/scripts/client-disconnect-test.sh"
"$<TARGET_FILE:clusterclient_async>"
WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}/tests/scripts/")
add_test(NAME client-disconnect-without-slotmap-update-test-async
COMMAND "${CMAKE_SOURCE_DIR}/tests/scripts/client-disconnect-without-slotmap-update-test.sh"
"$<TARGET_FILE:clusterclient_async>"
WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}/tests/scripts/")
endif()
26 changes: 26 additions & 0 deletions tests/clusterclient_async.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
* Will send following commands using the `..ToNode()` API and a
* cluster node iterator to send each command to all known nodes.
*
* !disconnect - Disconnect the client.
*
* An example input of first sending 2 commands and waiting for their responses,
* before sending a single command and waiting for its response:
*
Expand Down Expand Up @@ -139,6 +141,8 @@ void sendNextCommand(evutil_socket_t fd, short kind, void *arg) {
"!all in !resend not supported");
send_to_all = 1;
}
if (strcmp(cmd, "!disconnect") == 0)
valkeyClusterAsyncDisconnect(acc);
continue; /* Skip line */
}

Expand Down Expand Up @@ -202,16 +206,34 @@ void eventCallback(const valkeyClusterContext *cc, int event, void *privdata) {
printf("Event: %s\n", e);
}

void connectCallback(const valkeyAsyncContext *ac, int status) {
const char *s = "";
if (status != VALKEY_OK)
s = "failed to ";
printf("Event: %sconnect to %s:%d\n", s, ac->c.tcp.host, ac->c.tcp.port);
}

void disconnectCallback(const valkeyAsyncContext *ac, int status) {
const char *s = "";
if (status != VALKEY_OK)
s = "failed to ";
printf("Event: %sdisconnect from %s:%d\n", s, ac->c.tcp.host,
ac->c.tcp.port);
}

int main(int argc, char **argv) {
int use_cluster_slots = 1; // Get topology via CLUSTER SLOTS
int show_events = 0;
int show_connection_events = 0;

int optind;
for (optind = 1; optind < argc && argv[optind][0] == '-'; optind++) {
if (strcmp(argv[optind], "--use-cluster-nodes") == 0) {
use_cluster_slots = 0; // Use the default CLUSTER NODES instead
} else if (strcmp(argv[optind], "--events") == 0) {
show_events = 1;
} else if (strcmp(argv[optind], "--connection-events") == 0) {
show_connection_events = 1;
} else {
fprintf(stderr, "Unknown argument: '%s'\n", argv[optind]);
}
Expand All @@ -237,6 +259,10 @@ int main(int argc, char **argv) {
if (show_events) {
valkeyClusterSetEventCallback(acc->cc, eventCallback, NULL);
}
if (show_connection_events) {
valkeyClusterAsyncSetConnectCallback(acc, connectCallback);
valkeyClusterAsyncSetDisconnectCallback(acc, disconnectCallback);
}

if (valkeyClusterConnect2(acc->cc) != VALKEY_OK) {
printf("Connect error: %s\n", acc->cc->errstr);
Expand Down
75 changes: 75 additions & 0 deletions tests/scripts/client-disconnect-test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
#!/bin/bash
#
# Verify that a client disconnects from known nodes without following
# redirects, this to avoid reconnecting to already disconnected nodes.
# The client will not accept new commands thereafter.
#
# Usage: $0 /path/to/clusterclient-binary

clientprog=${1:-./clusterclient_async}
testname=client-disconnect-test

# Sync processes waiting for CONT signals.
perl -we 'use sigtrap "handler", sub{exit}, "CONT"; sleep 1; die "timeout"' &
syncpid1=$!;

# Start simulated valkey node
timeout 5s ./simulated-valkey.pl -p 7401 -d --sigcont $syncpid1 <<'EOF' &
EXPECT CONNECT
EXPECT ["CLUSTER", "SLOTS"]
SEND [[0, 16383, ["127.0.0.1", 7401, "nodeid1"]]]
EXPECT CLOSE

EXPECT CONNECT
EXPECT ["SET", "foo", "initial"]
SEND +OK

EXPECT ["SET", "foo", "redirect"]
SEND -MOVED 12182 127.0.0.1:7402

EXPECT CLOSE
EOF
server1=$!

# Wait until node is ready to accept client connections
wait $syncpid1;

# Run client
timeout 4s "$clientprog" --connection-events 127.0.0.1:7401 > "$testname.out" <<'EOF'
SET foo initial

# Send a command that is expected to be redirected just before
# requesting a client disconnect.
!async
SET foo redirect
!disconnect
!sync

# Commands are not accepted after a disconnect.
SET foo not-accepted
EOF
clientexit=$?

# Wait for server to exit
wait $server1; server1exit=$?

# Check exit statuses
if [ $server1exit -ne 0 ]; then
echo "Simulated server #1 exited with status $server1exit"
exit $server1exit
fi
if [ $clientexit -ne 0 ]; then
echo "$clientprog exited with status $clientexit"
exit $clientexit
fi

expected="Event: connect to 127.0.0.1:7401
OK
MOVED 12182 127.0.0.1:7402
Event: disconnect from 127.0.0.1:7401
error: disconnecting"

echo "$expected" | diff -u - "$testname.out" || exit 99

# Clean up
rm "$testname.out"
104 changes: 104 additions & 0 deletions tests/scripts/client-disconnect-without-slotmap-update-test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
#!/bin/bash
#
# Verify that a client disconnects from known nodes without starting new
# slot map updates. A client shouldn't reconnect or connect to new nodes
# while shutting down.
#
# This testcase starts 2 cluster nodes and the client connects to both.
# The client triggers a disconnect right after a command has been sent,
# which will invoke its callback with a NULL reply. This NULL reply
# should not trigger a slot map update.
#
# Usage: $0 /path/to/clusterclient-binary

clientprog=${1:-./clusterclient_async}
testname=client-disconnect-without-slotmap-update-test

# Sync process just waiting for server to be ready to accept connection.
perl -we 'use sigtrap "handler", sub{exit}, "CONT"; sleep 1; die "timeout"' &
syncpid1=$!
perl -we 'use sigtrap "handler", sub{exit}, "CONT"; sleep 1; die "timeout"' &
syncpid2=$!

# Start simulated valkey node #1
timeout 5s ./simulated-valkey.pl -p 7401 -d --sigcont $syncpid1 <<'EOF' &
EXPECT CONNECT
EXPECT ["CLUSTER", "SLOTS"]
SEND [[0, 6000, ["127.0.0.1", 7401, "nodeid1"]],[6001, 16383, ["127.0.0.1", 7402, "nodeid2"]]]
EXPECT CLOSE

EXPECT CONNECT
EXPECT ["SET", "bar", "initial"]
SEND +OK

# Normally a slot map update is expected here, but not during disconnects.

EXPECT CLOSE
EOF
server1=$!

# Start simulated valkey node #2
timeout 5s ./simulated-valkey.pl -p 7402 -d --sigcont $syncpid2 <<'EOF' &
EXPECT CONNECT
EXPECT ["SET", "foo", "initial"]
SEND +OK

EXPECT ["SET", "foo", "null-reply"]
# The client will invoke callbacks for outstanding requests with a NULL reply.
# Normally this would trigger a slot map update, but not during disconnects.

EXPECT CLOSE
EOF
server2=$!

# Wait until both nodes are ready to accept client connections
wait $syncpid1 $syncpid2;

# Run client
timeout 4s "$clientprog" --connection-events 127.0.0.1:7401 > "$testname.out" <<'EOF'
SET foo initial
SET bar initial

# Make sure a slot map update is not throttled.
!sleep

# Send a command just before requesting a client disconnect.
# A NULL reply should not trigger a slot map update after a disconnect.
!async
SET foo null-reply
!disconnect
!sync

EOF
clientexit=$?

# Wait for servers to exit
wait $server1; server1exit=$?
wait $server2; server2exit=$?

# Check exit statuses
if [ $server1exit -ne 0 ]; then
echo "Simulated server #1 exited with status $server1exit"
exit $server1exit
fi
if [ $server2exit -ne 0 ]; then
echo "Simulated server #2 exited with status $server2exit"
exit $server2exit
fi
if [ $clientexit -ne 0 ]; then
echo "$clientprog exited with status $clientexit"
exit $clientexit
fi

expected="Event: connect to 127.0.0.1:7402
OK
Event: connect to 127.0.0.1:7401
OK
Event: disconnect from 127.0.0.1:7401
error: Timeout
Event: failed to disconnect from 127.0.0.1:7402"

echo "$expected" | diff -u - "$testname.out" || exit 99

# Clean up
rm "$testname.out"
Loading