Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry when an async slotmap update fails #252

Merged
merged 15 commits into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 89 additions & 81 deletions hircluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -437,37 +437,9 @@ static int authenticate(redisClusterContext *cc, redisContext *c) {
* Return a new node with the "cluster slots" command reply.
*/
static redisClusterNode *node_get_with_slots(redisClusterContext *cc,
redisReply *host_elem,
redisReply *port_elem,
char *host, int port,
uint8_t role) {
redisClusterNode *node = NULL;

if (host_elem == NULL || port_elem == NULL) {
return NULL;
}

if (host_elem->type != REDIS_REPLY_STRING || host_elem->len <= 0) {
__redisClusterSetError(cc, REDIS_ERR_OTHER,
"Command(cluster slots) reply error: "
"node ip is not string.");
goto error;
}

if (port_elem->type != REDIS_REPLY_INTEGER || port_elem->integer <= 0) {
__redisClusterSetError(cc, REDIS_ERR_OTHER,
"Command(cluster slots) reply error: "
"node port is not integer.");
goto error;
}

if (!hi_valid_port((int)port_elem->integer)) {
__redisClusterSetError(cc, REDIS_ERR_OTHER,
"Command(cluster slots) reply error: "
"node port is not valid.");
goto error;
}

node = createRedisClusterNode();
redisClusterNode *node = createRedisClusterNode();
if (node == NULL) {
goto oom;
}
Expand All @@ -481,29 +453,26 @@ static redisClusterNode *node_get_with_slots(redisClusterContext *cc,
node->slots->free = listClusterSlotDestructor;
}

node->addr = sdsnewlen(host_elem->str, host_elem->len);
node->addr = sdsnew(host);
if (node->addr == NULL) {
goto oom;
}
node->addr = sdscatfmt(node->addr, ":%i", port_elem->integer);
node->addr = sdscatfmt(node->addr, ":%i", port);
if (node->addr == NULL) {
goto oom;
}
node->host = sdsnewlen(host_elem->str, host_elem->len);
node->host = sdsnew(host);
if (node->host == NULL) {
goto oom;
}
node->name = NULL;
node->port = (int)port_elem->integer;
node->port = port;
node->role = role;

return node;

oom:
__redisClusterSetError(cc, REDIS_ERR_OOM, "Out of memory");
// passthrough

error:
if (node != NULL) {
sdsfree(node->addr);
sdsfree(node->host);
Expand All @@ -516,8 +485,8 @@ static redisClusterNode *node_get_with_slots(redisClusterContext *cc,
* Return a new node with the "cluster nodes" command reply.
*/
static redisClusterNode *node_get_with_nodes(redisClusterContext *cc,
sds *node_infos, int info_count,
uint8_t role) {
redisContext *c, sds *node_infos,
int info_count, uint8_t role) {
char *p = NULL;
redisClusterNode *node = NULL;

Expand All @@ -530,12 +499,12 @@ static redisClusterNode *node_get_with_nodes(redisClusterContext *cc,
goto oom;
}

node->role = role;
if (role == REDIS_ROLE_MASTER) {
node->slots = listCreate();
if (node->slots == NULL) {
goto oom;
}

node->slots->free = listClusterSlotDestructor;
}

Expand All @@ -548,27 +517,50 @@ static redisClusterNode *node_get_with_nodes(redisClusterContext *cc,
if ((p = strchr(node_infos[1], PORT_CPORT_SEPARATOR)) != NULL) {
sdsrange(node_infos[1], 0, p - node_infos[1] - 1 /* skip @ */);
}
node->addr = node_infos[1];
node_infos[1] = NULL; /* Ownership moved */

node->role = role;

/* Get the ip part */
if ((p = strrchr(node->addr, IP_PORT_SEPARATOR)) == NULL) {
/* Find the port separator. */
if ((p = strrchr(node_infos[1], IP_PORT_SEPARATOR)) == NULL) {
__redisClusterSetError(
cc, REDIS_ERR_OTHER,
"server address is incorrect, port separator missing.");
goto error;
}
node->host = sdsnewlen(node->addr, p - node->addr);
if (node->host == NULL) {
goto oom;

/* Get the port (skip the found port separator). */
int port = hi_atoi(p + 1, strlen(p + 1));
if (port < 1 || port > UINT16_MAX) {
__redisClusterSetError(cc, REDIS_ERR_OTHER, "Invalid port");
goto error;
}
p++; // remove found separator character
node->port = port;

/* Get the port part */
node->port = hi_atoi(p, strlen(p));
/* Check that we received an ip/host address, i.e. the field does not
* start with the found port separator. */
if (node_infos[1] != p) {
node->addr = node_infos[1];
node_infos[1] = NULL; /* Ownership moved */

node->host = sdsnewlen(node->addr, p - node->addr);
if (node->host == NULL) {
goto oom;
}
} else {
/* We received an ip/host that is an empty string. According to the docs
* we can treat this as meaning the same address we sent this command to. */
node->host = sdsnew(c->tcp.host);
if (node->host == NULL) {
goto oom;
}
/* Create a new addr field using correct host:port */
node->addr = sdsnew(node->host);
if (node->addr == NULL) {
goto oom;
}
node->addr = sdscatfmt(node->addr, ":%i", node->port);
if (node->addr == NULL) {
goto oom;
}
}
return node;

oom:
Expand Down Expand Up @@ -724,8 +716,8 @@ static int cluster_master_slave_mapping_with_name(redisClusterContext *cc,
/**
* Parse the "cluster slots" command reply to nodes dict.
*/
dict *parse_cluster_slots(redisClusterContext *cc, redisReply *reply,
int flags) {
static dict *parse_cluster_slots(redisClusterContext *cc, redisContext *c,
redisReply *reply) {
int ret;
cluster_slot *slot = NULL;
dict *nodes = NULL;
Expand Down Expand Up @@ -812,23 +804,38 @@ dict *parse_cluster_slots(redisClusterContext *cc, redisReply *reply,
elem_ip = elem_nodes->element[0];
elem_port = elem_nodes->element[1];

/* Validate ip and port elements. Accept a NULL value ip (NIL type)
* since we will handle the unknown endpoint specially. */
if (elem_ip == NULL || elem_port == NULL ||
elem_ip->type != REDIS_REPLY_STRING ||
elem_port->type != REDIS_REPLY_INTEGER) {
(elem_ip->type != REDIS_REPLY_STRING &&
elem_ip->type != REDIS_REPLY_NIL) ||
elem_port->type != REDIS_REPLY_INTEGER ||
!hi_valid_port((int)elem_port->integer)) {
__redisClusterSetError(
cc, REDIS_ERR_OTHER,
"Command(cluster slots) reply error: "
"master ip or port is not correct.");
"node ip or port is not correct.");
goto error;
}

/* Get the received ip/host. According to the docs an unknown
* endpoint or an empty string can be treated as meaning
* the same address that we sent this command to.
* An unknown endpoint has the type REDIS_REPLY_NIL and its
* length is initialized to zero. */
char *host = (elem_ip->len > 0) ? elem_ip->str : c->tcp.host;
if (host == NULL) {
goto oom;
}
int port = elem_port->integer;

// this is master.
if (idx == 2) {
sds address = sdsnewlen(elem_ip->str, elem_ip->len);
sds address = sdsnew(host);
if (address == NULL) {
goto oom;
}
address = sdscatfmt(address, ":%i", elem_port->integer);
address = sdscatfmt(address, ":%i", port);
if (address == NULL) {
goto oom;
}
Expand All @@ -848,8 +855,8 @@ dict *parse_cluster_slots(redisClusterContext *cc, redisReply *reply,
break;
}

master = node_get_with_slots(cc, elem_ip, elem_port,
REDIS_ROLE_MASTER);
master =
node_get_with_slots(cc, host, port, REDIS_ROLE_MASTER);
if (master == NULL) {
goto error;
}
Expand All @@ -873,9 +880,9 @@ dict *parse_cluster_slots(redisClusterContext *cc, redisReply *reply,
}

slot = NULL;
} else if (flags & HIRCLUSTER_FLAG_ADD_SLAVE) {
slave = node_get_with_slots(cc, elem_ip, elem_port,
REDIS_ROLE_SLAVE);
} else if (cc->flags & HIRCLUSTER_FLAG_ADD_SLAVE) {
slave =
node_get_with_slots(cc, host, port, REDIS_ROLE_SLAVE);
if (slave == NULL) {
goto error;
}
Expand Down Expand Up @@ -918,8 +925,8 @@ dict *parse_cluster_slots(redisClusterContext *cc, redisReply *reply,
/**
* Parse the "cluster nodes" command reply to nodes dict.
*/
dict *parse_cluster_nodes(redisClusterContext *cc, char *str, int str_len,
int flags) {
static dict *parse_cluster_nodes(redisClusterContext *cc, redisContext *c,
redisReply *reply) {
int ret;
dict *nodes = NULL;
dict *nodes_name = NULL;
Expand All @@ -939,8 +946,8 @@ dict *parse_cluster_nodes(redisClusterContext *cc, char *str, int str_len,
goto oom;
}

start = str;
end = start + str_len;
start = reply->str;
end = start + reply->len;

line_start = start;

Expand Down Expand Up @@ -983,7 +990,7 @@ dict *parse_cluster_nodes(redisClusterContext *cc, char *str, int str_len,

// add master node
if (role_len >= 6 && memcmp(role, "master", 6) == 0) {
master = node_get_with_nodes(cc, part, count_part,
master = node_get_with_nodes(cc, c, part, count_part,
REDIS_ROLE_MASTER);
if (master == NULL) {
goto error;
Expand All @@ -1006,7 +1013,7 @@ dict *parse_cluster_nodes(redisClusterContext *cc, char *str, int str_len,
goto error;
}

if (flags & HIRCLUSTER_FLAG_ADD_SLAVE) {
if (cc->flags & HIRCLUSTER_FLAG_ADD_SLAVE) {
ret = cluster_master_slave_mapping_with_name(
cc, &nodes_name, master, master->name);
if (ret != REDIS_OK) {
Expand Down Expand Up @@ -1035,7 +1042,7 @@ dict *parse_cluster_nodes(redisClusterContext *cc, char *str, int str_len,
;
} else {
// add open slot for master
if (flags & HIRCLUSTER_FLAG_ADD_OPENSLOT &&
if (cc->flags & HIRCLUSTER_FLAG_ADD_OPENSLOT &&
count_slot_start_end == 3 &&
sdslen(slot_start_end[0]) > 1 &&
sdslen(slot_start_end[1]) == 1 &&
Expand Down Expand Up @@ -1134,10 +1141,10 @@ dict *parse_cluster_nodes(redisClusterContext *cc, char *str, int str_len,

}
// add slave node
else if ((flags & HIRCLUSTER_FLAG_ADD_SLAVE) &&
else if ((cc->flags & HIRCLUSTER_FLAG_ADD_SLAVE) &&
(role_len >= 5 && memcmp(role, "slave", 5) == 0)) {
slave =
node_get_with_nodes(cc, part, count_part, REDIS_ROLE_SLAVE);
slave = node_get_with_nodes(cc, c, part, count_part,
REDIS_ROLE_SLAVE);
if (slave == NULL) {
goto error;
}
Expand Down Expand Up @@ -1234,7 +1241,7 @@ static int handleClusterSlotsReply(redisClusterContext *cc, redisContext *c) {
return REDIS_ERR;
}

dict *nodes = parse_cluster_slots(cc, reply, cc->flags);
dict *nodes = parse_cluster_slots(cc, c, reply);
freeReplyObject(reply);
return updateNodesAndSlotmap(cc, nodes);
}
Expand Down Expand Up @@ -1266,7 +1273,7 @@ static int handleClusterNodesReply(redisClusterContext *cc, redisContext *c) {
return REDIS_ERR;
}

dict *nodes = parse_cluster_nodes(cc, reply->str, reply->len, cc->flags);
dict *nodes = parse_cluster_nodes(cc, c, reply);
freeReplyObject(reply);
return updateNodesAndSlotmap(cc, nodes);
}
Expand Down Expand Up @@ -3849,15 +3856,15 @@ void clusterSlotsReplyCallback(redisAsyncContext *ac, void *r, void *privdata) {
}

redisClusterContext *cc = acc->cc;
dict *nodes = parse_cluster_slots(cc, reply, cc->flags);
dict *nodes = parse_cluster_slots(cc, &ac->c, reply);
if (updateNodesAndSlotmap(cc, nodes) != REDIS_OK) {
/* Ignore failures for now */
/* Retry using available nodes */
updateSlotMapAsync(acc, NULL);
}
}

/* Reply callback function for CLUSTER NODES */
void clusterNodesReplyCallback(redisAsyncContext *ac, void *r, void *privdata) {
UNUSED(ac);
redisReply *reply = (redisReply *)r;
redisClusterAsyncContext *acc = (redisClusterAsyncContext *)privdata;
acc->lastSlotmapUpdateAttempt = hi_usec_now();
Expand All @@ -3869,9 +3876,10 @@ void clusterNodesReplyCallback(redisAsyncContext *ac, void *r, void *privdata) {
}

redisClusterContext *cc = acc->cc;
dict *nodes = parse_cluster_nodes(cc, reply->str, reply->len, cc->flags);
dict *nodes = parse_cluster_nodes(cc, &ac->c, reply);
if (updateNodesAndSlotmap(cc, nodes) != REDIS_OK) {
/* Ignore failures for now */
/* Retry using available nodes */
updateSlotMapAsync(acc, NULL);
}
}

Expand Down
4 changes: 0 additions & 4 deletions hircluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -291,10 +291,6 @@ int redisClusterUpdateSlotmap(redisClusterContext *cc);

/* Internal functions */
redisContext *ctx_get_by_node(redisClusterContext *cc, redisClusterNode *node);
struct dict *parse_cluster_nodes(redisClusterContext *cc, char *str,
int str_len, int flags);
struct dict *parse_cluster_slots(redisClusterContext *cc, redisReply *reply,
int flags);

/*
* Asynchronous API
Expand Down
8 changes: 8 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -282,3 +282,11 @@ add_test(NAME client-disconnect-without-slotmap-update-test-async
COMMAND "${CMAKE_SOURCE_DIR}/tests/scripts/client-disconnect-without-slotmap-update-test.sh"
"$<TARGET_FILE:clusterclient_async>"
WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}/tests/scripts/")
add_test(NAME connect-during-cluster-startup-test-async
COMMAND "${CMAKE_SOURCE_DIR}/tests/scripts/connect-during-cluster-startup-test.sh"
"$<TARGET_FILE:clusterclient_async>"
WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}/tests/scripts/")
add_test(NAME connect-during-cluster-startup-using-cluster-nodes-test-async
COMMAND "${CMAKE_SOURCE_DIR}/tests/scripts/connect-during-cluster-startup-using-cluster-nodes-test.sh"
"$<TARGET_FILE:clusterclient_async>"
WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}/tests/scripts/")
Loading
Loading