Skip to content

Commit

Permalink
[fix] add swap info propagate mode
Browse files Browse the repository at this point in the history
1. original mode of swap info propagating only includes swap.info mode, which depends on X-pipe.
2. this commit extends ping argv for propagating swap info, which do not depend on any other component.
  • Loading branch information
VCgege committed Dec 24, 2024
1 parent fc9c9d2 commit 0f89592
Show file tree
Hide file tree
Showing 9 changed files with 177 additions and 31 deletions.
7 changes: 7 additions & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,12 @@ configEnum swap_info_supported_enum[] = {
{NULL, 0}
};

configEnum swap_info_propagate_mode_enum[] = {
{"ping", SWAP_INFO_PROPAGATE_BY_PING},
{"swap.info", SWAP_INFO_PROPAGATE_BY_SWAP_INFO},
{NULL, 0}
};

/* Output buffer limits presets. */
clientBufferLimitsConfig clientBufferLimitsDefaults[CLIENT_TYPE_OBUF_COUNT] = {
{0, 0, 0}, /* normal */
Expand Down Expand Up @@ -2947,6 +2953,7 @@ standardConfig configs[] = {
createEnumConfig("swap-cuckoo-filter-bit-per-key", NULL, IMMUTABLE_CONFIG, cuckoo_filter_bit_type_enum, server.swap_cuckoo_filter_bit_type, CUCKOO_FILTER_BITS_PER_TAG_8, NULL, NULL),
createEnumConfig("swap-ratelimit-policy", NULL, MODIFIABLE_CONFIG, swap_ratelimit_policy_enum, server.swap_ratelimit_policy, SWAP_RATELIMIT_POLICY_PAUSE, NULL, NULL),
createEnumConfig("swap-swap-info-supported", NULL, MODIFIABLE_CONFIG, swap_info_supported_enum, server.swap_swap_info_supported, SWAP_INFO_SUPPORTED_AUTO, NULL, NULL),
createEnumConfig("swap-swap-info-propagate-mode", NULL, MODIFIABLE_CONFIG, swap_info_propagate_mode_enum, server.swap_swap_info_propagate_mode, SWAP_INFO_PROPAGATE_BY_PING, NULL, NULL),

/* Integer configs */
createIntConfig("databases", NULL, IMMUTABLE_CONFIG, 1, INT_MAX, server.dbnum, 16, INTEGER_CONFIG, NULL, NULL),
Expand Down
1 change: 1 addition & 0 deletions src/ctrip_swap.c
Original file line number Diff line number Diff line change
Expand Up @@ -788,6 +788,7 @@ int swapTest(int argc, char **argv, int accurate) {
result += swapRordbTest(argc, argv, accurate);
result += swapDataBitmapTest(argc, argv, accurate);
result += wtdigestTest(argc, argv, accurate);
result += swapReplTest(argc, argv, accurate);
return result;
}
#endif
11 changes: 10 additions & 1 deletion src/ctrip_swap.h
Original file line number Diff line number Diff line change
Expand Up @@ -1994,10 +1994,18 @@ sds genSwapTtlCompactInfoString(sds info);

void swapInfoCommand(client *c);

/* swap info propagate mode */
#define SWAP_INFO_PROPAGATE_BY_PING 0
#define SWAP_INFO_PROPAGATE_BY_SWAP_INFO 1

void swapBuildSwapInfoSstAgeLimitCmd(robj *argv[3], long long sst_age_limit);
void swapDestorySwapInfoSstAgeLimitCmd(robj *argv[3]);

void swapPropagateSwapInfo(int argc, robj **argv);
void swapPropagateSwapInfoCmd(int argc, robj **argv);

sds swapEncodeSwapInfo(int swap_info_argc, sds *swap_info_argv);
sds *swapDecodeSwapInfo(sds argv, int *swap_info_argc);
void swapApplySwapInfo(int swap_info_argc, sds *swap_info_argv);

/* Repl */
int submitReplClientRequests(client *c);
Expand Down Expand Up @@ -2717,6 +2725,7 @@ int swapRordbTest(int argc, char *argv[], int accurate);
int roaringBitmapTest(int argc, char *argv[], int accurate);
int swapDataBitmapTest(int argc, char **argv, int accurate);
int wtdigestTest(int argc, char **argv, int accurate);
int swapReplTest(int argc, char **argv, int accurate);

int swapTest(int argc, char **argv, int accurate);

Expand Down
14 changes: 6 additions & 8 deletions src/ctrip_swap_cmd.c
Original file line number Diff line number Diff line change
Expand Up @@ -1313,15 +1313,13 @@ void swapInfoCommand(client *c) {
NULL};
addReplyHelp(c, help);
return;
} else if (c->argc == 3 && !strcasecmp(c->argv[1]->ptr,"SST-AGE-LIMIT")) {
/* SWAP.INFO SST-AGE-LIMIT <sst age limit> */
long long sst_age_limit = 0;
int res = isObjectRepresentableAsLongLong(c->argv[2], &sst_age_limit);
if (res == C_OK) {
server.swap_ttl_compact_ctx->expire_stats->sst_age_limit = sst_age_limit;
} else {
sds *swap_info_argv = (sds*)zmalloc(sizeof(sds)*c->argc);
for (int i = 0; i < c->argc; i++) {
swap_info_argv[i] = c->argv[i]->ptr;
}
addReply(c,shared.ok);
return;
swapApplySwapInfo(c->argc, swap_info_argv);
zfree(swap_info_argv);
}

addReply(c,shared.ok);
Expand Down
104 changes: 87 additions & 17 deletions src/ctrip_swap_repl.c
Original file line number Diff line number Diff line change
Expand Up @@ -415,36 +415,106 @@ bool isSwapInfoSupported(void) {
return true;
}

/* SWAP.INFO SST-AGE-LIMIT <sst age limit> */
/* The swap.info command, propagate system info to slave.
* SWAP.INFO <subcommand> [<arg> [value] [opt] ...]
*
* subcommand supported:
* SWAP.INFO SST-AGE-LIMIT <sst age limit> */
void swapBuildSwapInfoSstAgeLimitCmd(robj *argv[3], long long sst_age_limit) {

argv[0] = shared.swap_info;
argv[1] = shared.sst_age_limit;
argv[2] = createStringObjectFromLongLong(sst_age_limit);
argv[2] = createObject(OBJ_STRING, sdsfromlonglong(sst_age_limit));
}

void swapDestorySwapInfoSstAgeLimitCmd(robj *argv[3]) {
decrRefCount(argv[2]);
}

/* The swap.info command, propagate system info to slave.
* SWAP.INFO <subcommand> [<arg> [value] [opt] ...]
*
* subcommand supported:
* SWAP.INFO SST-AGE-LIMIT <sst age limit> */
void swapPropagateSwapInfo(int argc, robj **argv) {
void swapPropagateSwapInfoCmd(int argc, robj **argv) {

if (server.swap_swap_info_propagate_mode == SWAP_INFO_PROPAGATE_BY_SWAP_INFO) {
if (!isSwapInfoSupported()) return;
replicationFeedSlaves(server.slaves, 0, argv, argc);
return;
}

/* SWAP_INFO_PROPAGATE_BY_PING */
sds *argv_str = zmalloc(sizeof(sds) * argc);
for (int i = 0; i < argc; i++) {
argv_str[i] = argv[i]->ptr;
}

sds ping_argv_str = swapEncodeSwapInfo(argc, argv_str);

robj *ping_argv[2];
ping_argv[0] = shared.ping;
ping_argv[1] = createObject(OBJ_STRING, ping_argv_str);

replicationFeedSlaves(server.slaves,0,ping_argv,2);
decrRefCount(ping_argv[1]);
zfree(argv_str);
return;
}

if (!isSwapInfoSupported()) return;
sds swapEncodeSwapInfo(int swap_info_argc, sds *swap_info_argv) {
return sdsjoinsds(swap_info_argv, swap_info_argc, " ", 1);
}

sds *swapDecodeSwapInfo(sds argv, int *swap_info_argc) {
return sdssplitlen(argv, sdslen(argv), " ", 1, swap_info_argc);
}

if (argc < 2) {
void swapApplySwapInfo(int swap_info_argc, sds *swap_info_argv) {

if (strcasecmp(swap_info_argv[0],"swap.info")) {
return;
} else if (argc == 3 && !strcasecmp(argv[1]->ptr,"SST-AGE-LIMIT")) {
}

if (swap_info_argc == 3 && !strcasecmp(swap_info_argv[1],"SST-AGE-LIMIT")) {
/* SWAP.INFO SST-AGE-LIMIT <sst age limit> */
goto propagate;
long long sst_age_limit = 0;
if (isSdsRepresentableAsLongLong(swap_info_argv[2],&sst_age_limit) == C_OK) {
server.swap_ttl_compact_ctx->expire_stats->sst_age_limit = sst_age_limit;
}
return;
}
return;
}

propagate:
replicationFeedSlaves(server.slaves,0,argv,argc);
return;
#ifdef REDIS_TEST

int swapReplTest(int argc, char *argv[], int accurate) {
UNUSED(argc);
UNUSED(argv);
UNUSED(accurate);

int error = 0;

TEST("swap info: sst-limit-age encode decode") {
sds *swap_info_argv = zmalloc(sizeof(sds) * 3);
swap_info_argv[0] = sdsnew("SWAP.INFO");
swap_info_argv[1] = sdsnew("SST-AGE-LIMIT");
swap_info_argv[2] = sdsfromlonglong(1111);
sds swap_info = swapEncodeSwapInfo(3, swap_info_argv);

int argc = 0;
sds *argv = swapDecodeSwapInfo(swap_info, &argc);

test_assert(argc == 3);
test_assert(!sdscmp(argv[2], swap_info_argv[2]));
test_assert(!sdscmp(argv[1], swap_info_argv[1]));
test_assert(!sdscmp(argv[0], swap_info_argv[0]));

sdsfree(swap_info_argv[0]);
sdsfree(swap_info_argv[1]);
sdsfree(swap_info_argv[2]);
zfree(swap_info_argv);

sdsfree(swap_info);

sdsfreesplitres(argv, argc);
}

return error;
}

#endif
16 changes: 13 additions & 3 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -2488,9 +2488,10 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
if (server.swap_ttl_compact_enabled) {
robj *argv[3];
swapBuildSwapInfoSstAgeLimitCmd(argv, server.swap_ttl_compact_ctx->expire_stats->sst_age_limit);
swapPropagateSwapInfo(3, argv);
swapPropagateSwapInfoCmd(3, argv);
swapDestorySwapInfoSstAgeLimitCmd(argv);
}

}

/* Fire the cron loop modules event. */
Expand Down Expand Up @@ -4900,10 +4901,19 @@ void pingCommand(client *c) {
else
addReplyBulk(c,c->argv[1]);
} else {
if (c->argc == 1)
if (c->argc == 1) {
addReply(c,shared.pong);
else
} else {
/* extend ping argv for swap.info */
if (!strncasecmp(c->argv[1]->ptr, "swap.info", 9)) {
int swap_info_argc = 0;
sds *swap_info_argv = swapDecodeSwapInfo(c->argv[1]->ptr, &swap_info_argc);
swapApplySwapInfo(swap_info_argc, swap_info_argv);
sdsfreesplitres(swap_info_argv, swap_info_argc);
}

addReplyBulk(c,c->argv[1]);
}
}
}

Expand Down
1 change: 1 addition & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2010,6 +2010,7 @@ struct redisServer {

/* for swap.info command, which propagate system info to replica */
int swap_swap_info_supported;
int swap_swap_info_propagate_mode;
unsigned long long swap_swap_info_slave_period; /* Master send cmd swap.info to the slave every N seconds */
};

Expand Down
43 changes: 42 additions & 1 deletion tests/swap/unit/swap_info.tcl
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
start_server {tags {"swap.info"}} {
start_server {tags {"swap.info"} overrides {swap-swap-info-propagate-mode {swap.info} }} {
test "SST-AGE-LIMIT" {
r swap.info SST-AGE-LIMIT 1111
set sst_age_limit [get_info_property r Swap swap_ttl_compact sst_age_limit]
Expand All @@ -19,4 +19,45 @@ start_server {tags {"swap.info"}} {
assert_equal OK [r swap.info SST-AGE-LIMIT 50 50]
assert_equal OK [r swap.info SST-AGE-LIMIT 50 50 50 err]
}
}

start_server {tags {"swap.info"} overrides {swap-swap-info-propagate-mode {ping} }} {
test "SST-AGE-LIMIT with ping" {
r ping "swap.info SST-AGE-LIMIT 1111"
set sst_age_limit [get_info_property r Swap swap_ttl_compact sst_age_limit]
assert_equal $sst_age_limit 1111

r ping "swap.info SST-AGE-LIMIT 0"
set sst_age_limit [get_info_property r Swap swap_ttl_compact sst_age_limit]
assert_equal $sst_age_limit 0

r ping "swap.info SST-AGE-LIMIT -2222"
set sst_age_limit [get_info_property r Swap swap_ttl_compact sst_age_limit]
assert_equal $sst_age_limit -2222
}

test "no error for illegal ping argv of swap.info" {

r ping "swap.info SST-AGE-LIMIT 666"

assert_equal "swap.info SST-AGE 111" [r ping "swap.info SST-AGE 111"]
set sst_age_limit [get_info_property r Swap swap_ttl_compact sst_age_limit]
assert_equal $sst_age_limit 666

assert_equal "swap.info SST-AGE" [r ping "swap.info SST-AGE"]
set sst_age_limit [get_info_property r Swap swap_ttl_compact sst_age_limit]
assert_equal $sst_age_limit 666

assert_equal "swap.info" [r ping "swap.info"]
set sst_age_limit [get_info_property r Swap swap_ttl_compact sst_age_limit]
assert_equal $sst_age_limit 666

assert_equal "swap.info ss ss ss ss" [r ping "swap.info ss ss ss ss"]
set sst_age_limit [get_info_property r Swap swap_ttl_compact sst_age_limit]
assert_equal $sst_age_limit 666

assert_equal "swap.info " [r ping "swap.info "]
set sst_age_limit [get_info_property r Swap swap_ttl_compact sst_age_limit]
assert_equal $sst_age_limit 666
}
}
11 changes: 10 additions & 1 deletion tests/swap/unit/ttl_compact.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,8 @@ start_server {tags {"ttl-compact master propagate"} overrides {save ""}} {

start_server {overrides {swap-repl-rordb-sync {no}
swap-debug-evict-keys {0}
swap-ttl-compact-enabled {no} }} {
swap-ttl-compact-enabled {no}
swap-swap-info-propagate-mode {ping} }} {

set master_host [srv 0 host]
set master_port [srv 0 port]
Expand Down Expand Up @@ -312,6 +313,14 @@ start_server {tags {"ttl-compact master propagate"} overrides {save ""}} {

set sst_age_limit2 [get_info_property $slave Swap swap_ttl_compact sst_age_limit]
assert_equal $sst_age_limit1 $sst_age_limit2

$master config set swap-swap-info-propagate-mode swap.info
$master swap.info SST-AGE-LIMIT 2222

after 1200

set sst_age_limit2 [get_info_property $slave Swap swap_ttl_compact sst_age_limit]
assert_equal $sst_age_limit2 2222
}
}
}

0 comments on commit 0f89592

Please sign in to comment.