From 25d584eb5b11ca5d79e5b7146253790ecdaa97fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Svensson?= Date: Fri, 4 Oct 2024 11:43:16 +0200 Subject: [PATCH 1/9] Refactor internal function parse_cluster_nodes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Parse the string without using sdssplitlen which allocates memory for each element. Parse each line once to get a valkeyClusterNode. Signed-off-by: Björn Svensson --- src/cluster.c | 407 ++++++++++++++---------------- tests/ct_out_of_memory_handling.c | 8 +- 2 files changed, 200 insertions(+), 215 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index f3dad34b..2ae2476d 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -461,74 +461,6 @@ static valkeyClusterNode *node_get_with_slots(valkeyClusterContext *cc, return NULL; } -/** - * Return a new node with the "cluster nodes" command reply. - */ -static valkeyClusterNode *node_get_with_nodes(valkeyClusterContext *cc, - sds *node_infos, int info_count, - uint8_t role) { - char *p = NULL; - valkeyClusterNode *node = NULL; - - if (info_count < 8) { - return NULL; - } - - node = createValkeyClusterNode(); - if (node == NULL) { - goto oom; - } - - if (role == VALKEY_ROLE_MASTER) { - node->slots = listCreate(); - if (node->slots == NULL) { - goto oom; - } - - node->slots->free = listClusterSlotDestructor; - } - - /* Handle field */ - node->name = node_infos[0]; - node_infos[0] = NULL; /* Ownership moved */ - - /* Handle field - * Remove @cport... 
since addr is used as a dict key which should be : */ - if ((p = strchr(node_infos[1], PORT_CPORT_SEPARATOR)) != NULL) { - sdsrange(node_infos[1], 0, p - node_infos[1] - 1 /* skip @ */); - } - node->addr = node_infos[1]; - node_infos[1] = NULL; /* Ownership moved */ - - node->role = role; - - /* Get the ip part */ - if ((p = strrchr(node->addr, IP_PORT_SEPARATOR)) == NULL) { - valkeyClusterSetError( - cc, VALKEY_ERR_OTHER, - "server address is incorrect, port separator missing."); - goto error; - } - node->host = sdsnewlen(node->addr, p - node->addr); - if (node->host == NULL) { - goto oom; - } - p++; // remove found separator character - - /* Get the port part */ - node->port = vk_atoi(p, strlen(p)); - - return node; - -oom: - valkeyClusterSetError(cc, VALKEY_ERR_OOM, "Out of memory"); - // passthrough - -error: - freeValkeyClusterNode(node); - return NULL; -} - static void cluster_nodes_swap_ctx(dict *nodes_f, dict *nodes_t) { dictEntry *de_f, *de_t; valkeyClusterNode *node_f, *node_t; @@ -852,23 +784,169 @@ static dict *parse_cluster_slots(valkeyClusterContext *cc, valkeyReply *reply) { return NULL; } +/* Parse a node from a single CLUSTER NODES line. Only parse primary nodes if + * the 'replica_master_id' argument is NULL, otherwise replicas are parsed and + * its master_id is given via 'replica_master_id'. */ +static int parse_cluster_nodes_line(valkeyClusterContext *cc, char *line, + valkeyClusterNode **parsed_node, char **replica_master_id) { + char *p, *id = NULL, *addr = NULL, *flags = NULL, *master_id = NULL, + *link_state = NULL, *slots = NULL; + // clang-format off + /* Find required fields. */ + int i = 0; + while ((p = strchr(line, ' ')) != NULL) { + *p = '\0'; + switch(i++){ + case 0: id = line; break; + case 1: addr = line; break; + case 2: flags = line; break; + case 3: master_id = line; break; + case 7: link_state = line; break; + } + line = p + 1; /* Start of next field. 
*/ + if (i == 8) { slots = line; break; } + } + if (i == 7 && line[0] != '\0') link_state = line; + // clang-format on + + if (link_state == NULL) { + valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Mandatory fields missing"); + return VALKEY_ERR; + } + + /* Parse flags. */ + uint8_t role = VALKEY_ROLE_NULL; + while (*flags != '\0') { + if ((p = strchr(flags, ',')) != NULL) + *p = '\0'; + if (memcmp(flags, "master", 6) == 0) { + role = VALKEY_ROLE_MASTER; + break; + } + if (memcmp(flags, "slave", 5) == 0) { + role = VALKEY_ROLE_SLAVE; + break; + } + if (p == NULL) /* No more flags. */ + break; + flags = p + 1; /* Start of next flag. */ + } + if (role == VALKEY_ROLE_NULL) { + valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Unknown role"); + return VALKEY_ERR; + } + + /* Only parse replicas when requested. */ + if (role == VALKEY_ROLE_SLAVE && replica_master_id == NULL) { + *parsed_node = NULL; + return VALKEY_OK; + } + + valkeyClusterNode *node = createValkeyClusterNode(); + if (node == NULL) { + goto oom; + } + node->role = role; + node->name = sdsnew(id); + if (node->name == NULL) + goto oom; + + /* Handle address field + * Remove @cport... since addr is used as a dict key which should be : */ + if ((p = strchr(addr, PORT_CPORT_SEPARATOR)) != NULL) { + *p = '\0'; + } + node->addr = sdsnew(addr); + if (node->addr == NULL) + goto oom; + + /* Get the host part */ + if ((p = strrchr(addr, IP_PORT_SEPARATOR)) == NULL) { + valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Invalid node address"); + freeValkeyClusterNode(node); + return VALKEY_ERR; + } + *p = '\0'; + + /* Skip nodes where address starts with ":0", i.e. 'noaddr'. */ + if (strlen(addr) == 0) { + freeValkeyClusterNode(node); + *parsed_node = NULL; + return VALKEY_OK; + } + node->host = sdsnew(addr); + if (node->host == NULL) + goto oom; + + /* Get the port. */ + p++; // Skip separator character. + node->port = vk_atoi(p, strlen(p)); + + /* No slot parsing needed for replicas, but return master id. 
*/ + if (node->role == VALKEY_ROLE_SLAVE) { + *replica_master_id = master_id; + *parsed_node = node; + return VALKEY_OK; + } + + node->slots = listCreate(); + if (node->slots == NULL) + goto oom; + node->slots->free = listClusterSlotDestructor; + + /* Parse slots when available. */ + if (slots == NULL) { + *parsed_node = node; + return VALKEY_OK; + } + /* Parse each slot element. */ + while (*slots != '\0') { + if ((p = strchr(slots, ' ')) != NULL) + *p = '\0'; + char *entry = slots; + if (entry[0] == '[') + break; /* Skip importing/migrating slots at string end. */ + + int slot_start, slot_end; + char *sp = strchr(entry, '-'); + if (sp == NULL) { + slot_start = vk_atoi(entry, strlen(entry)); + slot_end = slot_start; + } else { + *sp = '\0'; + slot_start = vk_atoi(entry, strlen(entry)); + entry = sp + 1; // Skip '-' + slot_end = vk_atoi(entry, strlen(entry)); + } + + /* Create a slot entry owned by the node. */ + cluster_slot *slot = cluster_slot_create(node); + if (slot == NULL) + goto oom; + slot->start = (uint32_t)slot_start; + slot->end = (uint32_t)slot_end; + + if (p == NULL) /* Check if this was the last entry. */ + break; + slots = p + 1; /* Start of next entry. */ + } + *parsed_node = node; + return VALKEY_OK; + +oom: + freeValkeyClusterNode(node); + valkeyClusterSetError(cc, VALKEY_ERR_OOM, "Out of memory"); + return VALKEY_ERR; +} + /** * Parse the "cluster nodes" command reply to nodes dict. 
*/ static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyReply *reply) { - int ret; dict *nodes = NULL; dict *nodes_name = NULL; - valkeyClusterNode *master, *slave; - cluster_slot *slot; - char *pos, *start, *end, *line_start, *line_end; - char *role; - int role_len; - int slot_start, slot_end, slot_ranges_found = 0; - sds *part = NULL, *slot_start_end = NULL; - int count_part = 0, count_slot_start_end = 0; - int k; - int len; + int slot_ranges_found = 0; + int add_replicas = cc->flags & VALKEYCLUSTER_FLAG_ADD_SLAVE; if (reply->type != VALKEY_REPLY_STRING) { valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Unexpected reply type"); @@ -880,148 +958,57 @@ static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyReply *reply) { goto oom; } - start = reply->str; - end = start + reply->len; - - line_start = start; + char *lines = reply->str; /* NULL terminated string. */ + char *p, *line; + while ((p = strchr(lines, '\n')) != NULL) { + *p = '\0'; + line = lines; + lines = p + 1; /* Start of next line. */ - for (pos = start; pos < end; pos++) { - if (*pos == '\n') { - line_end = pos - 1; - len = line_end - line_start; - - part = sdssplitlen(line_start, len + 1, " ", 1, &count_part); - if (part == NULL) { + char *master_id; + valkeyClusterNode *node; + if (parse_cluster_nodes_line(cc, line, &node, add_replicas ? &master_id : NULL) != VALKEY_OK) + goto error; + if (node == NULL) + continue; /* Line skipped. */ + if (node->role == VALKEY_ROLE_MASTER) { + sds key = sdsnew(node->addr); + if (key == NULL) { + freeValkeyClusterNode(node); goto oom; } - - if (count_part < 8) { + if (dictFind(nodes, key) != NULL) { valkeyClusterSetError(cc, VALKEY_ERR_OTHER, - "split cluster nodes error"); + "Duplicate addresses in cluster nodes response"); + sdsfree(key); + freeValkeyClusterNode(node); goto error; } - - // if the address string starts with ":0", skip this node. 
- if (sdslen(part[1]) >= 2 && memcmp(part[1], ":0", 2) == 0) { - sdsfreesplitres(part, count_part); - count_part = 0; - part = NULL; - - start = pos + 1; - line_start = start; - pos = start; - - continue; - } - - if (sdslen(part[2]) >= 7 && memcmp(part[2], "myself,", 7) == 0) { - role_len = sdslen(part[2]) - 7; - role = part[2] + 7; - } else { - role_len = sdslen(part[2]); - role = part[2]; + if (dictAdd(nodes, key, node) != DICT_OK) { + sdsfree(key); + freeValkeyClusterNode(node); + goto oom; } + slot_ranges_found += listLength(node->slots); - // add master node - if (role_len >= 6 && memcmp(role, "master", 6) == 0) { - master = node_get_with_nodes(cc, part, count_part, - VALKEY_ROLE_MASTER); - if (master == NULL) { + if (add_replicas) { + if (cluster_master_slave_mapping_with_name(cc, &nodes_name, node, node->name) != VALKEY_OK) { goto error; } - - sds key = sdsnewlen(master->addr, sdslen(master->addr)); - if (key == NULL) { - freeValkeyClusterNode(master); - goto oom; - } - if (dictFind(nodes, key) != NULL) { - valkeyClusterSetError(cc, VALKEY_ERR_OTHER, - "Duplicate addresses in cluster nodes response"); - sdsfree(key); - freeValkeyClusterNode(master); - goto error; - } - if (dictAdd(nodes, key, master) != DICT_OK) { - sdsfree(key); - freeValkeyClusterNode(master); - goto oom; - } - - if (cc->flags & VALKEYCLUSTER_FLAG_ADD_SLAVE) { - ret = cluster_master_slave_mapping_with_name( - cc, &nodes_name, master, master->name); - if (ret != VALKEY_OK) { - goto error; - } - } - - for (k = 8; k < count_part; k++) { - slot_start_end = sdssplitlen(part[k], sdslen(part[k]), "-", - 1, &count_slot_start_end); - if (slot_start_end == NULL) { - goto oom; - } - - if (count_slot_start_end == 1) { - slot_start = vk_atoi(slot_start_end[0], - sdslen(slot_start_end[0])); - slot_end = slot_start; - } else if (count_slot_start_end == 2) { - slot_start = vk_atoi(slot_start_end[0], - sdslen(slot_start_end[0])); - slot_end = vk_atoi(slot_start_end[1], - sdslen(slot_start_end[1])); - } 
else { - slot_start = -1; - slot_end = -1; - } - - sdsfreesplitres(slot_start_end, count_slot_start_end); - count_slot_start_end = 0; - slot_start_end = NULL; - - if (slot_start < 0 || slot_end < 0 || - slot_start > slot_end || - slot_end >= VALKEYCLUSTER_SLOTS) { - continue; - } - slot_ranges_found += 1; - - slot = cluster_slot_create(master); - if (slot == NULL) { - goto oom; - } - - slot->start = (uint32_t)slot_start; - slot->end = (uint32_t)slot_end; - } - } - // add slave node - else if ((cc->flags & VALKEYCLUSTER_FLAG_ADD_SLAVE) && - (role_len >= 5 && memcmp(role, "slave", 5) == 0)) { - slave = node_get_with_nodes(cc, part, count_part, - VALKEY_ROLE_SLAVE); - if (slave == NULL) { - goto error; - } - - ret = cluster_master_slave_mapping_with_name(cc, &nodes_name, - slave, part[3]); - if (ret != VALKEY_OK) { - freeValkeyClusterNode(slave); - goto error; - } + } else { + assert(node->role == VALKEY_ROLE_SLAVE); + sds id = sdsnew(master_id); + if (id == NULL) { + freeValkeyClusterNode(node); + goto oom; } - - sdsfreesplitres(part, count_part); - count_part = 0; - part = NULL; - - start = pos + 1; - line_start = start; - pos = start; + if (cluster_master_slave_mapping_with_name(cc, &nodes_name, node, id) != VALKEY_OK) { + freeValkeyClusterNode(node); + sdsfree(id); + goto error; + } + sdsfree(id); } } @@ -1039,8 +1026,6 @@ static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyReply *reply) { // passthrough error: - sdsfreesplitres(part, count_part); - sdsfreesplitres(slot_start_end, count_slot_start_end); if (nodes_name != NULL) { /* Only free parsed replicas since the `nodes` dict owns primary nodes. 
*/ dictIterator di; diff --git a/tests/ct_out_of_memory_handling.c b/tests/ct_out_of_memory_handling.c index 0a56a4f8..6877c8d8 100644 --- a/tests/ct_out_of_memory_handling.c +++ b/tests/ct_out_of_memory_handling.c @@ -171,14 +171,14 @@ void test_alloc_failure_handling(void) { // Connect { - for (int i = 0; i < 148; ++i) { + for (int i = 0; i < 91; ++i) { prepare_allocation_test(cc, i); result = valkeyClusterConnect2(cc); assert(result == VALKEY_ERR); ASSERT_STR_EQ(cc->errstr, "Out of memory"); } - prepare_allocation_test(cc, 148); + prepare_allocation_test(cc, 91); result = valkeyClusterConnect2(cc); assert(result == VALKEY_OK); } @@ -521,14 +521,14 @@ void test_alloc_failure_handling_async(void) { // Connect { - for (int i = 0; i < 146; ++i) { + for (int i = 0; i < 89; ++i) { prepare_allocation_test(acc->cc, i); result = valkeyClusterConnect2(acc->cc); assert(result == VALKEY_ERR); ASSERT_STR_EQ(acc->cc->errstr, "Out of memory"); } - prepare_allocation_test(acc->cc, 146); + prepare_allocation_test(acc->cc, 89); result = valkeyClusterConnect2(acc->cc); assert(result == VALKEY_OK); } From a41a55fa28445f2e3ec23e3e92dbe38f5818419e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Svensson?= Date: Tue, 8 Oct 2024 19:04:03 +0200 Subject: [PATCH 2/9] Refactor replica handling in CLUSTER NODES parsing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Store parsed replicas in a separate dict during parsing, and move them to their primary when all lines are parsed. This dict owns the memory for added nodes until moved. Previously the dict contained references to both replicas and primaries, which made it a bit harder to handle.
Signed-off-by: Björn Svensson --- src/cluster.c | 217 +++++++++++++----------------- tests/ct_out_of_memory_handling.c | 8 +- tests/ut_slotmap_update.c | 8 +- 3 files changed, 103 insertions(+), 130 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index 2ae2476d..e4a17843 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -504,101 +504,6 @@ static void cluster_nodes_swap_ctx(dict *nodes_f, dict *nodes_t) { } } -static int cluster_master_slave_mapping_with_name(valkeyClusterContext *cc, - dict **nodes, - valkeyClusterNode *node, - sds master_name) { - int ret; - dictEntry *di; - valkeyClusterNode *node_old; - listNode *lnode; - - if (node == NULL || master_name == NULL) { - return VALKEY_ERR; - } - - if (*nodes == NULL) { - *nodes = dictCreate(&clusterNodesRefDictType, NULL); - if (*nodes == NULL) { - goto oom; - } - } - - di = dictFind(*nodes, master_name); - if (di == NULL) { - sds key = sdsnewlen(master_name, sdslen(master_name)); - if (key == NULL) { - goto oom; - } - ret = dictAdd(*nodes, key, node); - if (ret != DICT_OK) { - sdsfree(key); - goto oom; - } - - } else { - node_old = dictGetEntryVal(di); - if (node_old == NULL) { - valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "dict get value null"); - return VALKEY_ERR; - } - - if (node->role == VALKEY_ROLE_MASTER && - node_old->role == VALKEY_ROLE_MASTER) { - valkeyClusterSetError(cc, VALKEY_ERR_OTHER, - "two masters have the same name"); - return VALKEY_ERR; - } else if (node->role == VALKEY_ROLE_MASTER && - node_old->role == VALKEY_ROLE_SLAVE) { - if (node->slaves == NULL) { - node->slaves = listCreate(); - if (node->slaves == NULL) { - goto oom; - } - - node->slaves->free = listClusterNodeDestructor; - } - - if (node_old->slaves != NULL) { - while (listLength(node_old->slaves) > 0) { - lnode = listFirst(node_old->slaves); - if (listAddNodeHead(node->slaves, lnode->value) == NULL) { - goto oom; - } - node_old->slaves->free = NULL; - listDelNode(node_old->slaves, lnode); - } - listRelease(node_old->slaves); 
- node_old->slaves = NULL; - } - - if (listAddNodeHead(node->slaves, node_old) == NULL) { - goto oom; - } - dictSetHashVal(*nodes, di, node); - - } else if (node->role == VALKEY_ROLE_SLAVE) { - if (node_old->slaves == NULL) { - node_old->slaves = listCreate(); - if (node_old->slaves == NULL) { - goto oom; - } - - node_old->slaves->free = listClusterNodeDestructor; - } - if (listAddNodeTail(node_old->slaves, node) == NULL) { - goto oom; - } - } - } - - return VALKEY_OK; - -oom: - valkeyClusterSetError(cc, VALKEY_ERR_OOM, "Out of memory"); - return VALKEY_ERR; -} - /** * Parse the "cluster slots" command reply to nodes dict. */ @@ -784,6 +689,93 @@ static dict *parse_cluster_slots(valkeyClusterContext *cc, valkeyReply *reply) { return NULL; } +/* Store a parsed replica node in given dict using the primary_id as key. + * Additional replicas for a primary are stored within the first replica. */ +static int store_replica_node(dict **replicas, char *primary_id, valkeyClusterNode *node) { + if (*replicas == NULL) { + *replicas = dictCreate(&clusterNodesDictType, NULL); + if (replicas == NULL) + return VALKEY_ERR; + } + + sds key = sdsnew(primary_id); + if (key == NULL) + return VALKEY_ERR; + + dictEntry *de = dictFind(*replicas, key); + if (de == NULL) { + if (dictAdd(*replicas, key, node) != DICT_OK) { + sdsfree(key); + return VALKEY_ERR; + } + return VALKEY_OK; + } + + /* Store replica node in the existing replica node. */ + sdsfree(key); + valkeyClusterNode *n = dictGetEntryVal(de); + if (n->slaves == NULL) { + n->slaves = listCreate(); + if (n->slaves == NULL) + return VALKEY_ERR; + n->slaves->free = listClusterNodeDestructor; + } + if (listAddNodeTail(n->slaves, node) == NULL) + return VALKEY_ERR; + + return VALKEY_OK; +} + +/* Move parsed replica nodes from the collection to related primary. 
*/ +static int move_replica_nodes(dict *replicas, dict *nodes) { + if (replicas == NULL) + return VALKEY_OK; + + dictIterator di; + dictInitIterator(&di, nodes); + dictEntry *de; + while ((de = dictNext(&di))) { + valkeyClusterNode *primary = dictGetEntryVal(de); + + /* Move all replica nodes related to this primary. */ + dictEntry *der = dictFind(replicas, primary->name); + if (der != NULL) { + if (primary->slaves == NULL) { + primary->slaves = listCreate(); + if (primary->slaves == NULL) { + return VALKEY_ERR; + } + primary->slaves->free = listClusterNodeDestructor; + } + + /* Move all replicas stored in the first parsed replica. */ + valkeyClusterNode *replica = dictGetEntryVal(der); + if (replica->slaves != NULL) { + while (listLength(replica->slaves) > 0) { + listNode *node = listFirst(replica->slaves); + if (listAddNodeTail(primary->slaves, node->value) == NULL) { + return VALKEY_ERR; + } + /* Delete element without freeing the moved cluster node. */ + replica->slaves->free = NULL; + listDelNode(replica->slaves, node); + replica->slaves->free = listClusterNodeDestructor; + } + listRelease(replica->slaves); + replica->slaves = NULL; + } + /* Move replica that was parsed first. */ + if (listAddNodeHead(primary->slaves, replica) == NULL) { + return VALKEY_ERR; + } + /* All replicas for this primary moved, set dict value + * to NULL avoiding freeing the moved memory. */ + dictSetHashVal(replicas, der, NULL); + } + } + return VALKEY_OK; +} + /* Parse a node from a single CLUSTER NODES line. Only parse primary nodes if * the 'replica_master_id' argument is NULL, otherwise replicas are parsed and * its master_id is given via 'replica_master_id'. 
*/ @@ -944,9 +936,9 @@ static int parse_cluster_nodes_line(valkeyClusterContext *cc, char *line, */ static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyReply *reply) { dict *nodes = NULL; - dict *nodes_name = NULL; int slot_ranges_found = 0; int add_replicas = cc->flags & VALKEYCLUSTER_FLAG_ADD_SLAVE; + dict *replicas = NULL; if (reply->type != VALKEY_REPLY_STRING) { valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Unexpected reply type"); @@ -991,24 +983,12 @@ static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyReply *reply) { } slot_ranges_found += listLength(node->slots); - if (add_replicas) { - if (cluster_master_slave_mapping_with_name(cc, &nodes_name, node, node->name) != VALKEY_OK) { - goto error; - } - } } else { assert(node->role == VALKEY_ROLE_SLAVE); - sds id = sdsnew(master_id); - if (id == NULL) { + if (store_replica_node(&replicas, master_id, node) != VALKEY_OK) { freeValkeyClusterNode(node); goto oom; } - if (cluster_master_slave_mapping_with_name(cc, &nodes_name, node, id) != VALKEY_OK) { - freeValkeyClusterNode(node); - sdsfree(id); - goto error; - } - sdsfree(id); } } @@ -1017,7 +997,11 @@ static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyReply *reply) { goto error; } - dictRelease(nodes_name); + /* Move parsed replica nodes to related primary nodes. */ + if (move_replica_nodes(replicas, nodes) != VALKEY_OK) { + goto oom; + } + dictRelease(replicas); return nodes; @@ -1026,18 +1010,7 @@ static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyReply *reply) { // passthrough error: - if (nodes_name != NULL) { - /* Only free parsed replicas since the `nodes` dict owns primary nodes. 
*/ - dictIterator di; - dictInitIterator(&di, nodes_name); - dictEntry *de; - while ((de = dictNext(&di))) { - valkeyClusterNode *node = dictGetEntryVal(de); - if (node->role == VALKEY_ROLE_SLAVE) - freeValkeyClusterNode(node); - } - dictRelease(nodes_name); - } + dictRelease(replicas); dictRelease(nodes); return NULL; } diff --git a/tests/ct_out_of_memory_handling.c b/tests/ct_out_of_memory_handling.c index 6877c8d8..201599fa 100644 --- a/tests/ct_out_of_memory_handling.c +++ b/tests/ct_out_of_memory_handling.c @@ -171,14 +171,14 @@ void test_alloc_failure_handling(void) { // Connect { - for (int i = 0; i < 91; ++i) { + for (int i = 0; i < 88; ++i) { prepare_allocation_test(cc, i); result = valkeyClusterConnect2(cc); assert(result == VALKEY_ERR); ASSERT_STR_EQ(cc->errstr, "Out of memory"); } - prepare_allocation_test(cc, 91); + prepare_allocation_test(cc, 88); result = valkeyClusterConnect2(cc); assert(result == VALKEY_OK); } @@ -521,14 +521,14 @@ void test_alloc_failure_handling_async(void) { // Connect { - for (int i = 0; i < 89; ++i) { + for (int i = 0; i < 86; ++i) { prepare_allocation_test(acc->cc, i); result = valkeyClusterConnect2(acc->cc); assert(result == VALKEY_ERR); ASSERT_STR_EQ(acc->cc->errstr, "Out of memory"); } - prepare_allocation_test(acc->cc, 89); + prepare_allocation_test(acc->cc, 86); result = valkeyClusterConnect2(acc->cc); assert(result == VALKEY_OK); } diff --git a/tests/ut_slotmap_update.c b/tests/ut_slotmap_update.c index b1cc848b..5d80f12c 100644 --- a/tests/ut_slotmap_update.c +++ b/tests/ut_slotmap_update.c @@ -289,14 +289,14 @@ void test_parse_cluster_nodes_with_multiple_replicas(void) { assert(strcmp(node->addr, "127.0.0.1:30004") == 0); assert(node->role == VALKEY_ROLE_SLAVE); node = listNodeValue(listNext(&li)); - assert(strcmp(node->name, "824fe116063bc5fcf9f4ffd895bc17aee7731ac3") == 0); - assert(strcmp(node->addr, "127.0.0.1:30006") == 0); - assert(node->role == VALKEY_ROLE_SLAVE); - node = listNodeValue(listNext(&li)); 
assert(strcmp(node->name, "6ec23923021cf3ffec47632106199cb7f496ce01") == 0); assert(strcmp(node->addr, "127.0.0.1:30005") == 0); assert(node->role == VALKEY_ROLE_SLAVE); node = listNodeValue(listNext(&li)); + assert(strcmp(node->name, "824fe116063bc5fcf9f4ffd895bc17aee7731ac3") == 0); + assert(strcmp(node->addr, "127.0.0.1:30006") == 0); + assert(node->role == VALKEY_ROLE_SLAVE); + node = listNodeValue(listNext(&li)); assert(strcmp(node->name, "67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1") == 0); assert(strcmp(node->addr, "127.0.0.1:30002") == 0); assert(node->role == VALKEY_ROLE_SLAVE); From 1205064508b2633b2dc9b1c3d9eb82a8efbf1a47 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Svensson?= Date: Tue, 15 Oct 2024 11:21:54 +0200 Subject: [PATCH 3/9] fixup: inconsistent naming MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Use primary and replica in added code. Signed-off-by: Björn Svensson --- src/cluster.c | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index e4a17843..a42627c6 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -780,8 +780,8 @@ static int move_replica_nodes(dict *replicas, dict *nodes) { * the 'replica_master_id' argument is NULL, otherwise replicas are parsed and * its master_id is given via 'replica_master_id'. */ static int parse_cluster_nodes_line(valkeyClusterContext *cc, char *line, - valkeyClusterNode **parsed_node, char **replica_master_id) { - char *p, *id = NULL, *addr = NULL, *flags = NULL, *master_id = NULL, + valkeyClusterNode **parsed_node, char **parsed_primary_id) { + char *p, *id = NULL, *addr = NULL, *flags = NULL, *primary_id = NULL, *link_state = NULL, *slots = NULL; // clang-format off /* Find required fields. 
*/ @@ -792,7 +792,7 @@ static int parse_cluster_nodes_line(valkeyClusterContext *cc, char *line, case 0: id = line; break; case 1: addr = line; break; case 2: flags = line; break; - case 3: master_id = line; break; + case 3: primary_id = line; break; case 7: link_state = line; break; } line = p + 1; /* Start of next field. */ @@ -829,7 +829,7 @@ static int parse_cluster_nodes_line(valkeyClusterContext *cc, char *line, } /* Only parse replicas when requested. */ - if (role == VALKEY_ROLE_SLAVE && replica_master_id == NULL) { + if (role == VALKEY_ROLE_SLAVE && parsed_primary_id == NULL) { *parsed_node = NULL; return VALKEY_OK; } @@ -876,7 +876,7 @@ static int parse_cluster_nodes_line(valkeyClusterContext *cc, char *line, /* No slot parsing needed for replicas, but return master id. */ if (node->role == VALKEY_ROLE_SLAVE) { - *replica_master_id = master_id; + *parsed_primary_id = primary_id; *parsed_node = node; return VALKEY_OK; } @@ -957,9 +957,9 @@ static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyReply *reply) { line = lines; lines = p + 1; /* Start of next line. */ - char *master_id; + char *primary_id; valkeyClusterNode *node; - if (parse_cluster_nodes_line(cc, line, &node, add_replicas ? &master_id : NULL) != VALKEY_OK) + if (parse_cluster_nodes_line(cc, line, &node, add_replicas ? &primary_id : NULL) != VALKEY_OK) goto error; if (node == NULL) continue; /* Line skipped. 
*/ @@ -985,7 +985,7 @@ static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyReply *reply) { } else { assert(node->role == VALKEY_ROLE_SLAVE); - if (store_replica_node(&replicas, master_id, node) != VALKEY_OK) { + if (store_replica_node(&replicas, primary_id, node) != VALKEY_OK) { freeValkeyClusterNode(node); goto oom; } From 430502452dad3a36b8d261c0cd34ba96c5f535fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Svensson?= Date: Tue, 15 Oct 2024 11:39:46 +0200 Subject: [PATCH 4/9] fixup: change store_replica_node() api MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Let the caller create the replicas dict and pass it as a plain dict pointer, instead of having store_replica_node() create it lazily through a pointer-to-pointer argument. This is probably less odd. Signed-off-by: Björn Svensson --- src/cluster.c | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index a42627c6..737b48c1 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -691,20 +691,14 @@ static dict *parse_cluster_slots(valkeyClusterContext *cc, valkeyReply *reply) { /* Store a parsed replica node in given dict using the primary_id as key. * Additional replicas for a primary are stored within the first replica.
*/ -static int store_replica_node(dict **replicas, char *primary_id, valkeyClusterNode *node) { - if (*replicas == NULL) { - *replicas = dictCreate(&clusterNodesDictType, NULL); - if (replicas == NULL) - return VALKEY_ERR; - } - +static int store_replica_node(dict *replicas, char *primary_id, valkeyClusterNode *node) { sds key = sdsnew(primary_id); if (key == NULL) return VALKEY_ERR; - dictEntry *de = dictFind(*replicas, key); + dictEntry *de = dictFind(replicas, key); if (de == NULL) { - if (dictAdd(*replicas, key, node) != DICT_OK) { + if (dictAdd(replicas, key, node) != DICT_OK) { sdsfree(key); return VALKEY_ERR; } @@ -985,7 +979,13 @@ static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyReply *reply) { } else { assert(node->role == VALKEY_ROLE_SLAVE); - if (store_replica_node(&replicas, primary_id, node) != VALKEY_OK) { + if (replicas == NULL) { + if ((replicas = dictCreate(&clusterNodesDictType, NULL)) == NULL) { + freeValkeyClusterNode(node); + goto oom; + } + } + if (store_replica_node(replicas, primary_id, node) != VALKEY_OK) { freeValkeyClusterNode(node); goto oom; } From 40ecf3ffa5a54442f95938a3b56a1c54cd531b22 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Svensson?= Date: Tue, 15 Oct 2024 15:06:48 +0200 Subject: [PATCH 5/9] fixup: update comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Björn Svensson --- src/cluster.c | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index 737b48c1..53c1031c 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -720,7 +720,8 @@ static int store_replica_node(dict *replicas, char *primary_id, valkeyClusterNod return VALKEY_OK; } -/* Move parsed replica nodes from the collection to related primary. */ +/* Move parsed replica nodes to its primary node, which holds a list of replica + * nodes. The `replicas` dict shall contain nodes with primary_id as key. 
*/ static int move_replica_nodes(dict *replicas, dict *nodes) { if (replicas == NULL) return VALKEY_OK; @@ -770,15 +771,19 @@ static int move_replica_nodes(dict *replicas, dict *nodes) { return VALKEY_OK; } -/* Parse a node from a single CLUSTER NODES line. Only parse primary nodes if - * the 'replica_master_id' argument is NULL, otherwise replicas are parsed and - * its master_id is given via 'replica_master_id'. */ +/* Parse a node from a single CLUSTER NODES line. Returns an allocated + * valkeyClusterNode as a pointer in `parsed_node`. + * Only parse primary nodes if the `parsed_primary_id` argument is NULL, + * otherwise replicas are also parsed and its primary_id is returned by pointer + * via 'parsed_primary_id'. */ static int parse_cluster_nodes_line(valkeyClusterContext *cc, char *line, valkeyClusterNode **parsed_node, char **parsed_primary_id) { char *p, *id = NULL, *addr = NULL, *flags = NULL, *primary_id = NULL, *link_state = NULL, *slots = NULL; + /* Find required fields and keep a pointer to each field: + * [ ...] + */ // clang-format off - /* Find required fields. */ int i = 0; while ((p = strchr(line, ' ')) != NULL) { *p = '\0'; @@ -800,7 +805,8 @@ static int parse_cluster_nodes_line(valkeyClusterContext *cc, char *line, return VALKEY_ERR; } - /* Parse flags. */ + /* Parse flags, a comma separated list of following flags: + * myself, master, slave, fail?, fail, handshake, noaddr, nofailover, noflags. */ uint8_t role = VALKEY_ROLE_NULL; while (*flags != '\0') { if ((p = strchr(flags, ',')) != NULL) @@ -837,8 +843,8 @@ static int parse_cluster_nodes_line(valkeyClusterContext *cc, char *line, if (node->name == NULL) goto oom; - /* Handle address field - * Remove @cport... since addr is used as a dict key which should be : */ + /* Parse the address field: + * Remove @cport.. to get : which is our dict key. 
*/ if ((p = strchr(addr, PORT_CPORT_SEPARATOR)) != NULL) { *p = '\0'; } From 463611ff863260550f2e34685f63ecb9450038c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Svensson?= Date: Mon, 28 Oct 2024 10:04:34 +0100 Subject: [PATCH 6/9] fixup: remove unused dictType MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Björn Svensson --- src/cluster.c | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index 53c1031c..485f68b5 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -145,19 +145,6 @@ dictType clusterNodesDictType = { dictClusterNodeDestructor /* val destructor */ }; -/* Referenced cluster node hash table - * maps node id (437c719f5.....) to a valkeyClusterNode - * No ownership of valkeyClusterNode memory - */ -dictType clusterNodesRefDictType = { - dictSdsHash, /* hash function */ - NULL, /* key dup */ - NULL, /* val dup */ - dictSdsKeyCompare, /* key compare */ - dictSdsDestructor, /* key destructor */ - NULL /* val destructor */ -}; - void listCommandFree(void *command) { struct cmd *cmd = command; command_destroy(cmd); From 9b9245f4f18d68278bdcb5c36e1c47fc40625642 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Svensson?= Date: Mon, 28 Oct 2024 12:08:11 +0100 Subject: [PATCH 7/9] fixup: cleanup how replicas are kept while parsing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Change old way of storing replicas while parsing cluster nodes. Simply keep lists of cluster nodes in the replica dict, which then can be moved to the nodes dict. 
Signed-off-by: Björn Svensson --- src/cluster.c | 98 +++++++++++++++++++++++---------------------------- 1 file changed, 45 insertions(+), 53 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index 485f68b5..4c1371e8 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -132,6 +132,12 @@ void dictClusterNodeDestructor(void *privdata, void *val) { freeValkeyClusterNode(val); } +/* Destructor function for clusterNodeListDictType. */ +void dictClusterNodeListDestructor(void *privdata, void *val) { + DICT_NOTUSED(privdata); + listRelease(val); +} + /* Cluster node hash table * maps node address (1.2.3.4:6379) to a valkeyClusterNode * Has ownership of valkeyClusterNode memory @@ -145,6 +151,16 @@ dictType clusterNodesDictType = { dictClusterNodeDestructor /* val destructor */ }; +/* Hash table dictType to map node address to a list of valkeyClusterNodes. */ +dictType clusterNodeListDictType = { + dictSdsHash, /* hashFunction */ + NULL, /* keyDup */ + NULL, /* valDup */ + dictSdsKeyCompare, /* keyCompare */ + dictSdsDestructor, /* keyDestructor */ + dictClusterNodeListDestructor /* valDestructor */ +}; + void listCommandFree(void *command) { struct cmd *cmd = command; command_destroy(cmd); @@ -676,40 +692,42 @@ static dict *parse_cluster_slots(valkeyClusterContext *cc, valkeyReply *reply) { return NULL; } -/* Store a parsed replica node in given dict using the primary_id as key. - * Additional replicas for a primary are stored within the first replica. */ -static int store_replica_node(dict *replicas, char *primary_id, valkeyClusterNode *node) { +/* Keep lists of parsed replica nodes in a dict using the primary_id as key. */ +static int retain_replica_node(dict *replicas, char *primary_id, valkeyClusterNode *node) { sds key = sdsnew(primary_id); if (key == NULL) return VALKEY_ERR; + struct hilist *replicaList; + dictEntry *de = dictFind(replicas, key); if (de == NULL) { - if (dictAdd(replicas, key, node) != DICT_OK) { + /* Create list to hold replicas for a primary. 
*/ + replicaList = listCreate(); + if (replicaList == NULL) { sdsfree(key); return VALKEY_ERR; } - return VALKEY_OK; - } - - /* Store replica node in the existing replica node. */ - sdsfree(key); - valkeyClusterNode *n = dictGetEntryVal(de); - if (n->slaves == NULL) { - n->slaves = listCreate(); - if (n->slaves == NULL) + replicaList->free = listClusterNodeDestructor; + if (dictAdd(replicas, key, replicaList) != DICT_OK) { + sdsfree(key); + listRelease(replicaList); return VALKEY_ERR; - n->slaves->free = listClusterNodeDestructor; + } + } else { + sdsfree(key); + replicaList = dictGetEntryVal(de); } - if (listAddNodeTail(n->slaves, node) == NULL) + + if (listAddNodeTail(replicaList, node) == NULL) return VALKEY_ERR; return VALKEY_OK; } -/* Move parsed replica nodes to its primary node, which holds a list of replica - * nodes. The `replicas` dict shall contain nodes with primary_id as key. */ -static int move_replica_nodes(dict *replicas, dict *nodes) { +/* Store parsed replica nodes in the primary nodes, which holds a list of replica + * nodes. The `replicas` dict shall contain lists of nodes with primary_id as key. */ +static int store_replica_nodes(dict *nodes, dict *replicas) { if (replicas == NULL) return VALKEY_OK; @@ -719,39 +737,12 @@ static int move_replica_nodes(dict *replicas, dict *nodes) { while ((de = dictNext(&di))) { valkeyClusterNode *primary = dictGetEntryVal(de); - /* Move all replica nodes related to this primary. */ + /* Move replica nodes related to this primary. */ dictEntry *der = dictFind(replicas, primary->name); if (der != NULL) { - if (primary->slaves == NULL) { - primary->slaves = listCreate(); - if (primary->slaves == NULL) { - return VALKEY_ERR; - } - primary->slaves->free = listClusterNodeDestructor; - } - - /* Move all replicas stored in the first parsed replica. 
*/ - valkeyClusterNode *replica = dictGetEntryVal(der); - if (replica->slaves != NULL) { - while (listLength(replica->slaves) > 0) { - listNode *node = listFirst(replica->slaves); - if (listAddNodeTail(primary->slaves, node->value) == NULL) { - return VALKEY_ERR; - } - /* Delete element without freeing the moved cluster node. */ - replica->slaves->free = NULL; - listDelNode(replica->slaves, node); - replica->slaves->free = listClusterNodeDestructor; - } - listRelease(replica->slaves); - replica->slaves = NULL; - } - /* Move replica that was parsed first. */ - if (listAddNodeHead(primary->slaves, replica) == NULL) { - return VALKEY_ERR; - } - /* All replicas for this primary moved, set dict value - * to NULL avoiding freeing the moved memory. */ + assert(primary->slaves == NULL); + /* Move replica list from replicas dict to nodes dict. */ + primary->slaves = dictGetEntryVal(der); dictSetHashVal(replicas, der, NULL); } } @@ -973,12 +964,13 @@ static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyReply *reply) { } else { assert(node->role == VALKEY_ROLE_SLAVE); if (replicas == NULL) { - if ((replicas = dictCreate(&clusterNodesDictType, NULL)) == NULL) { + if ((replicas = dictCreate(&clusterNodeListDictType, NULL)) == NULL) { freeValkeyClusterNode(node); goto oom; } } - if (store_replica_node(replicas, primary_id, node) != VALKEY_OK) { + /* Retain parsed replica nodes until all primaries are parsed. */ + if (retain_replica_node(replicas, primary_id, node) != VALKEY_OK) { freeValkeyClusterNode(node); goto oom; } @@ -990,8 +982,8 @@ static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyReply *reply) { goto error; } - /* Move parsed replica nodes to related primary nodes. */ - if (move_replica_nodes(replicas, nodes) != VALKEY_OK) { + /* Store the retained replica nodes in primary nodes. 
*/ + if (store_replica_nodes(nodes, replicas) != VALKEY_OK) { goto oom; } dictRelease(replicas); From a0917229a1a24dedc3b9bb8beeac865a3765cf14 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Svensson?= Date: Mon, 28 Oct 2024 13:18:29 +0100 Subject: [PATCH 8/9] Update src/cluster.c MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Viktor Söderqvist Signed-off-by: Björn Svensson --- src/cluster.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cluster.c b/src/cluster.c index 4c1371e8..59668c6a 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -765,7 +765,7 @@ static int parse_cluster_nodes_line(valkeyClusterContext *cc, char *line, int i = 0; while ((p = strchr(line, ' ')) != NULL) { *p = '\0'; - switch(i++){ + switch (i++){ case 0: id = line; break; case 1: addr = line; break; case 2: flags = line; break; From 4437c8881af9604459c4944518695d4557c6e175 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Svensson?= Date: Mon, 28 Oct 2024 13:26:15 +0100 Subject: [PATCH 9/9] Update src/cluster.c MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Björn Svensson --- src/cluster.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cluster.c b/src/cluster.c index 59668c6a..f5959e6e 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -765,7 +765,7 @@ static int parse_cluster_nodes_line(valkeyClusterContext *cc, char *line, int i = 0; while ((p = strchr(line, ' ')) != NULL) { *p = '\0'; - switch (i++){ + switch (i++) { case 0: id = line; break; case 1: addr = line; break; case 2: flags = line; break;