Skip to content

Commit

Permalink
valkey-cli auto-exit from subscribed mode (#1432)
Browse files Browse the repository at this point in the history
Resolves issue with valkey-cli not auto exiting from subscribed mode on
reaching zero pub/sub subscription (previously filed on Redis)
redis/redis#12592

---------

Signed-off-by: Nikhil Manglore <[email protected]>
  • Loading branch information
Nikhil-Manglore authored Jan 8, 2025
1 parent 0a89571 commit 9e02049
Show file tree
Hide file tree
Showing 2 changed files with 267 additions and 4 deletions.
45 changes: 41 additions & 4 deletions src/valkey-cli.c
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,8 @@ static struct config {
int shutdown;
int monitor_mode;
int pubsub_mode;
int pubsub_unsharded_count; /* channels and patterns */
int pubsub_sharded_count; /* shard channels */
int blocking_state_aborted; /* used to abort monitor_mode and pubsub_mode. */
int latency_mode;
int latency_dist_mode;
Expand Down Expand Up @@ -2229,6 +2231,28 @@ static int cliReadReply(int output_raw_strings) {
return REDIS_OK;
}

/* Helper method to handle pubsub subscription/unsubscription. */
static void handlePubSubMode(redisReply *reply) {
char *cmd = reply->element[0]->str;
int count = reply->element[2]->integer;

/* Update counts based on the command type */
if (strcmp(cmd, "subscribe") == 0 || strcmp(cmd, "psubscribe") == 0 || strcmp(cmd, "unsubscribe") == 0 || strcmp(cmd, "punsubscribe") == 0) {
config.pubsub_unsharded_count = count;
} else if (strcmp(cmd, "ssubscribe") == 0 || strcmp(cmd, "sunsubscribe") == 0) {
config.pubsub_sharded_count = count;
}

/* Update pubsub mode based on the current counts */
if (config.pubsub_unsharded_count + config.pubsub_sharded_count == 0 && config.pubsub_mode) {
config.pubsub_mode = 0;
cliRefreshPrompt();
} else if (config.pubsub_unsharded_count + config.pubsub_sharded_count > 0 && !config.pubsub_mode) {
config.pubsub_mode = 1;
cliRefreshPrompt();
}
}

/* Simultaneously wait for pubsub messages from the server and input on stdin. */
static void cliWaitForMessagesOrStdin(void) {
int show_info = config.output != OUTPUT_RAW && (isatty(STDOUT_FILENO) || getenv("FAKETTY"));
Expand All @@ -2246,7 +2270,13 @@ static void cliWaitForMessagesOrStdin(void) {
sds out = cliFormatReply(reply, config.output, 0);
fwrite(out, sdslen(out), 1, stdout);
fflush(stdout);

if (isPubsubPush(reply)) {
handlePubSubMode(reply);
}

sdsfree(out);
freeReplyObject(reply);
}
} while (reply);

Expand Down Expand Up @@ -2397,13 +2427,11 @@ static int cliSendCommand(int argc, char **argv, long repeat) {
fflush(stdout);
if (config.pubsub_mode || num_expected_pubsub_push > 0) {
if (isPubsubPush(config.last_reply)) {
handlePubSubMode(config.last_reply);

if (num_expected_pubsub_push > 0 && !strcasecmp(config.last_reply->element[0]->str, command)) {
/* This pushed message confirms the
* [p|s][un]subscribe command. */
if (is_subscribe && !config.pubsub_mode) {
config.pubsub_mode = 1;
cliRefreshPrompt();
}
if (--num_expected_pubsub_push > 0) {
continue; /* We need more of these. */
}
Expand Down Expand Up @@ -3117,6 +3145,13 @@ void cliSetPreferences(char **argv, int argc, int interactive) {
else {
printf("%sunknown valkey-cli preference '%s'\n", interactive ? "" : ".valkeyclirc: ", argv[1]);
}
} else if (!strcasecmp(argv[0], ":get") && argc >= 2) {
if (!strcasecmp(argv[1], "pubsub")) {
printf("%d\n", config.pubsub_mode);
} else {
printf("%sunknown valkey-cli get option '%s'\n", interactive ? "" : ".valkeyclirc: ", argv[1]);
}
fflush(stdout);
} else {
printf("%sunknown valkey-cli internal command '%s'\n", interactive ? "" : ".valkeyclirc: ", argv[0]);
}
Expand Down Expand Up @@ -9495,6 +9530,8 @@ int main(int argc, char **argv) {
config.shutdown = 0;
config.monitor_mode = 0;
config.pubsub_mode = 0;
config.pubsub_unsharded_count = 0;
config.pubsub_sharded_count = 0;
config.blocking_state_aborted = 0;
config.latency_mode = 0;
config.latency_dist_mode = 0;
Expand Down
226 changes: 226 additions & 0 deletions tests/integration/valkey-cli.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,232 @@ if {!$::tls} { ;# fake_redis_node doesn't support TLS
assert_equal "a\n1\nb\n2\nc\n3" [exec {*}$cmdline ZRANGE new_zset 0 -1 WITHSCORES]
}
test "valkey-cli pubsub mode with single standard channel subscription" {
set fd [open_cli]
write_cli $fd ":get pubsub"
set pubsub_status [string trim [read_cli $fd]]
assert_equal "0" $pubsub_status
write_cli $fd "SUBSCRIBE ch1"
set response [read_cli $fd]
write_cli $fd ":get pubsub"
set pubsub_status [string trim [read_cli $fd]]
assert_equal "1" $pubsub_status
write_cli $fd "UNSUBSCRIBE ch1"
set response [read_cli $fd]
write_cli $fd ":get pubsub"
set pubsub_status [string trim [read_cli $fd]]
assert_equal "0" $pubsub_status
close_cli $fd
}
test "valkey-cli pubsub mode with multiple standard channel subscriptions" {
set fd [open_cli]
write_cli $fd ":get pubsub"
set pubsub_status [string trim [read_cli $fd]]
assert_equal "0" $pubsub_status
write_cli $fd "SUBSCRIBE ch1 ch2 ch3"
set response [read_cli $fd]
write_cli $fd ":get pubsub"
set pubsub_status [string trim [read_cli $fd]]
assert_equal "1" $pubsub_status
write_cli $fd "UNSUBSCRIBE"
set response [read_cli $fd]
write_cli $fd ":get pubsub"
set pubsub_status [string trim [read_cli $fd]]
assert_equal "0" $pubsub_status
close_cli $fd
}
test "valkey-cli pubsub mode with single shard channel subscription" {
set fd [open_cli]
write_cli $fd ":get pubsub"
set pubsub_status [string trim [read_cli $fd]]
assert_equal "0" $pubsub_status
write_cli $fd "SSUBSCRIBE schannel1"
set response [read_cli $fd]
write_cli $fd ":get pubsub"
set pubsub_status [string trim [read_cli $fd]]
assert_equal "1" $pubsub_status
write_cli $fd "SUNSUBSCRIBE schannel1"
set response [read_cli $fd]
write_cli $fd ":get pubsub"
set pubsub_status [string trim [read_cli $fd]]
assert_equal "0" $pubsub_status
close_cli $fd
}
test "valkey-cli pubsub mode with multiple shard channel subscriptions" {
set fd [open_cli]
write_cli $fd ":get pubsub"
set pubsub_status [string trim [read_cli $fd]]
assert_equal "0" $pubsub_status
write_cli $fd "SSUBSCRIBE schannel1 schannel2 schannel3"
set response [read_cli $fd]
write_cli $fd ":get pubsub"
set pubsub_status [string trim [read_cli $fd]]
assert_equal "1" $pubsub_status
write_cli $fd "SUNSUBSCRIBE"
set response [read_cli $fd]
write_cli $fd ":get pubsub"
set pubsub_status [string trim [read_cli $fd]]
assert_equal "0" $pubsub_status
close_cli $fd
}
test "valkey-cli pubsub mode with single pattern channel subscription" {
set fd [open_cli]
write_cli $fd ":get pubsub"
set pubsub_status [string trim [read_cli $fd]]
assert_equal "0" $pubsub_status
write_cli $fd "PSUBSCRIBE pattern1*"
set response [read_cli $fd]
write_cli $fd ":get pubsub"
set pubsub_status [string trim [read_cli $fd]]
assert_equal "1" $pubsub_status
write_cli $fd "PUNSUBSCRIBE pattern1*"
set response [read_cli $fd]
write_cli $fd ":get pubsub"
set pubsub_status [string trim [read_cli $fd]]
assert_equal "0" $pubsub_status
close_cli $fd
}
test "valkey-cli pubsub mode with multiple pattern channel subscriptions" {
set fd [open_cli]
write_cli $fd "PSUBSCRIBE pattern1* pattern2* pattern3*"
set response [read_cli $fd]
write_cli $fd ":get pubsub"
set pubsub_status [string trim [read_cli $fd]]
assert_equal "1" $pubsub_status
write_cli $fd "PUNSUBSCRIBE"
set response [read_cli $fd]
write_cli $fd ":get pubsub"
set pubsub_status [string trim [read_cli $fd]]
assert_equal "0" $pubsub_status
close_cli $fd
}
test "valkey-cli pubsub mode when subscribing to the same channel" {
set fd [open_cli]
write_cli $fd ":get pubsub"
set pubsub_status [string trim [read_cli $fd]]
assert_equal "0" $pubsub_status
write_cli $fd "SUBSCRIBE ch1 ch1"
set response [read_cli $fd]
write_cli $fd ":get pubsub"
set pubsub_status [string trim [read_cli $fd]]
assert_equal "1" $pubsub_status
write_cli $fd "UNSUBSCRIBE"
set response [read_cli $fd]
write_cli $fd ":get pubsub"
set pubsub_status [string trim [read_cli $fd]]
assert_equal "0" $pubsub_status
close_cli $fd
}
test "valkey-cli pubsub mode with multiple subscription types" {
set fd [open_cli]
write_cli $fd ":get pubsub"
set pubsub_status [string trim [read_cli $fd]]
assert_equal "0" $pubsub_status
write_cli $fd "SUBSCRIBE ch1 ch2 ch3"
set response [read_cli $fd]
write_cli $fd ":get pubsub"
set pubsub_status [string trim [read_cli $fd]]
assert_equal "1" $pubsub_status
write_cli $fd "PSUBSCRIBE pattern*"
set response [read_cli $fd]
set lines [split $response "\n"]
assert_equal "psubscribe" [lindex $lines 0]
assert_equal "pattern*" [lindex $lines 1]
assert_equal "4" [lindex $lines 2]
write_cli $fd "SSUBSCRIBE schannel"
set response [read_cli $fd]
set lines [split $response "\n"]
assert_equal "ssubscribe" [lindex $lines 0]
assert_equal "schannel" [lindex $lines 1]
assert_equal "1" [lindex $lines 2]
write_cli $fd "PUNSUBSCRIBE pattern*"
set response [read_cli $fd]
set lines [split $response "\n"]
assert_equal "punsubscribe" [lindex $lines 0]
assert_equal "pattern*" [lindex $lines 1]
assert_equal "3" [lindex $lines 2]
write_cli $fd ":get pubsub"
set pubsub_status [string trim [read_cli $fd]]
assert_equal "1" $pubsub_status
write_cli $fd "SUNSUBSCRIBE schannel"
set response [read_cli $fd]
set lines [split $response "\n"]
assert_equal "sunsubscribe" [lindex $lines 0]
assert_equal "schannel" [lindex $lines 1]
assert_equal "0" [lindex $lines 2]
write_cli $fd ":get pubsub"
set pubsub_status [string trim [read_cli $fd]]
assert_equal "1" $pubsub_status
write_cli $fd "UNSUBSCRIBE"
set response [read_cli $fd]
# Verify pubsub mode is no longer active
write_cli $fd ":get pubsub"
set pubsub_status [string trim [read_cli $fd]]
assert_equal "0" $pubsub_status
close_cli $fd
}
test "Valid Connection Scheme: redis://" {
set cmdline [valkeycliuri "redis://" [srv host] [srv port]]
assert_equal {PONG} [exec {*}$cmdline PING]
Expand Down

0 comments on commit 9e02049

Please sign in to comment.