Skip to content

Commit

Permalink
[WIP] ping with swap_info
Browse files Browse the repository at this point in the history
  • Loading branch information
VCgege committed Dec 20, 2024
1 parent d07ee94 commit 50e8cee
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 45 deletions.
4 changes: 3 additions & 1 deletion src/ctrip_swap.h
Original file line number Diff line number Diff line change
Expand Up @@ -1997,7 +1997,9 @@ void swapInfoCommand(client *c);
void swapBuildSwapInfoSstAgeLimitCmd(robj *argv[3], long long sst_age_limit);
void swapDestorySwapInfoSstAgeLimitCmd(robj *argv[3]);

void swapPropagateSwapInfo(int argc, robj **argv);
void swapPropagateSwapInfo(sds ping_argv);
sds swapEncodeSwapInfo(int argc, sds *argv);
sds *swapDecodeSwapInfo(sds argv);

/* Repl */
int submitReplClientRequests(client *c);
Expand Down
72 changes: 32 additions & 40 deletions src/ctrip_swap_repl.c
Original file line number Diff line number Diff line change
Expand Up @@ -392,59 +392,51 @@ sds genSwapReplInfoString(sds info) {
return info;
}

bool isSwapInfoSupported(void) {
/* SWAP.INFO SST-AGE-LIMIT <sst-age-limit> */
void swapBuildSwapInfoSstAgeLimitCmd(sds *argv[3], long long sst_age_limit) {

if (server.swap_swap_info_supported == SWAP_INFO_SUPPORTED_YES) return true;
if (server.swap_swap_info_supported == SWAP_INFO_SUPPORTED_NO) return false;
argv[0] = shared.swap_info->ptr;
argv[1] = shared.sst_age_limit->ptr;
argv[2] = sdsfromlonglong(sst_age_limit);
}

/* SWAP_INFO_SUPPORTED_AUTO */
/* depends on capa of all slaves,
* once there is one slave without capa of swap.info, return false. */
void swapDestorySwapInfoSstAgeLimitCmd(sds *argv[3]) {
sdsfree(argv[2]);
}

listNode *ln;
listIter li;
/* The swap.info argv, propagate system info to slave with ping command.
* SWAP.INFO <subcommand> [<arg> [value] [opt] ...]
*
* subcommand supported:
* SWAP.INFO SST-AGE-LIMIT <sst-age-limit> */
sds swapEncodeSwapInfo(int argc, sds *argv) {

listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
sds ping_argv = sdsempty();

if (!(slave->slave_capa & SLAVE_CAPA_SWAP_INFO)) {
return false;
}

for (int i = 0; i< argc; i++) {
ping_argv = sdscat(ping_argv, argv);
ping_argv = sdscat(ping_argv, " "); /* spilt by space */
}
return true;

return ping_argv;
}

/* SWAP.INFO SST-AGE-LIMIT <sst age limit> */
void swapBuildSwapInfoSstAgeLimitCmd(robj *argv[3], long long sst_age_limit) {
sds *swapDecodeSwapInfo(sds argv) {

argv[0] = shared.swap_info;
argv[1] = shared.sst_age_limit;
argv[2] = createStringObjectFromLongLong(sst_age_limit);
}

void swapDestorySwapInfoSstAgeLimitCmd(robj *argv[3]) {
decrRefCount(argv[2]);
}
/* SWAP.INFO SST-AGE-LIMIT <sst-age-limit> */

/* The swap.info command, propagate system info to slave.
* SWAP.INFO <subcommand> [<arg> [value] [opt] ...]
*
* subcommand supported:
* SWAP.INFO SST-AGE-LIMIT <sst age limit> */
void swapPropagateSwapInfo(int argc, robj **argv) {
return NULL ;
}

if (!isSwapInfoSupported()) return;
void swapPropagateSwapInfo(sds ping_argv) {

if (argc < 2) {
return;
} else if (argc == 3 && !strcasecmp(argv[1]->ptr,"SST-AGE-LIMIT")) {
/* SWAP.INFO SST-AGE-LIMIT <sst age limit> */
goto propagate;
}
return;
robj *argv[2];

propagate:
replicationFeedSlaves(server.slaves,0,argv,argc);
argv[0] = shared.ping;
argv[1] = createStringObject(ping_argv, sdslen(ping_argv));

replicationFeedSlaves(server.slaves,0,argv,2);
return;
}
13 changes: 9 additions & 4 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -2486,9 +2486,10 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
run_with_period(1000*(int)server.swap_swap_info_slave_period) {
/* propagate sst age limit */
if (server.swap_ttl_compact_enabled) {
robj *argv[3];
sds *argv[3];
swapBuildSwapInfoSstAgeLimitCmd(argv, server.swap_ttl_compact_ctx->expire_stats->sst_age_limit);
swapPropagateSwapInfo(3, argv);
sds swap_info = swapEncodeSwapInfo(3, argv);
swapPropagateSwapInfo(swap_info);
swapDestorySwapInfoSstAgeLimitCmd(argv);
}
}
Expand Down Expand Up @@ -4900,10 +4901,14 @@ void pingCommand(client *c) {
else
addReplyBulk(c,c->argv[1]);
} else {
if (c->argc == 1)
if (c->argc == 1) {
addReply(c,shared.pong);
else
} else {
sds *swap_info = swapDecodeSwapInfo(c->argv[1]->ptr);
/* apply swap info ... */
/* ... */
addReplyBulk(c,c->argv[1]);
}
}
}

Expand Down

0 comments on commit 50e8cee

Please sign in to comment.