From 1eddd31ebb7a6e9a9fb0ee3ec76b7e1d199b91fd Mon Sep 17 00:00:00 2001 From: Stipe Tolj Date: Thu, 11 Oct 2018 11:17:12 +0200 Subject: [PATCH 1/5] support for Cygwin 2.x --- command.c | 1 + fmacros.h | 3 +++ hiredis.h | 1 + 3 files changed, 5 insertions(+) diff --git a/command.c b/command.c index e32091b..115bc25 100644 --- a/command.c +++ b/command.c @@ -1,5 +1,6 @@ #include #include +#include #include "command.h" #include "hiutil.h" diff --git a/fmacros.h b/fmacros.h index a3b1df0..5f78f07 100644 --- a/fmacros.h +++ b/fmacros.h @@ -12,6 +12,9 @@ #define _POSIX_C_SOURCE 200112L #elif defined(__linux__) || defined(__OpenBSD__) || defined(__NetBSD__) #define _XOPEN_SOURCE 600 +#elif defined(__CYGWIN__) +#define _XOPEN_SOURCE 600 +#define _POSIX_C_SOURCE 200112L #else #define _XOPEN_SOURCE #endif diff --git a/hiredis.h b/hiredis.h index 87f7366..0c2b7e8 100644 --- a/hiredis.h +++ b/hiredis.h @@ -37,6 +37,7 @@ #include /* for va_list */ #include /* for struct timeval */ #include /* uintXX_t, etc */ +#include #include "sds.h" /* for sds */ #define HIREDIS_MAJOR 0 From dea16482782a79dd24bfac5027868315034bd1b6 Mon Sep 17 00:00:00 2001 From: Stipe Tolj Date: Mon, 15 Oct 2018 11:26:29 +0200 Subject: [PATCH 2/5] adding synchronous example --- examples/example-sync.c | 68 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 68 insertions(+) create mode 100644 examples/example-sync.c diff --git a/examples/example-sync.c b/examples/example-sync.c new file mode 100644 index 0000000..5a3345c --- /dev/null +++ b/examples/example-sync.c @@ -0,0 +1,68 @@ +/* + * example-sync.c + * + * Created on: 12.10.2018 + * Author: tolj + */ + +#include +#include +int main() +{ + char *key="key-a"; + char *field="field-1"; + char *key1="key1"; + char *value1="value-1"; + char *key2="key1"; + char *value2="value-1"; + redisClusterContext *cc; + + cc = redisClusterContextInit(); + redisClusterSetOptionAddNodes(cc, "127.0.0.1:7000,127.0.0.1:7001,127.0.0.1:7002"); + redisClusterSetOptionParseSlaves(cc); + redisClusterConnect2(cc); + if(cc == NULL || cc->err) + { + printf("connect error : %s\n", cc == NULL ? "NULL" : cc->errstr); + return -1; + } + + redisReply *reply = redisClusterCommand(cc, "dbsize"); + if(reply == NULL) + { + printf("reply is null[%s]\n", cc->errstr); + redisClusterFree(cc); + return -1; + } + + printf("reply->type:%d\n", reply->type); + printf("reply->str:%s\n", reply->str); + + freeReplyObject(reply); + + redisReply *reply = redisClusterCommand(cc, "hmget %s %s", key, field); + if(reply == NULL) + { + printf("reply is null[%s]\n", cc->errstr); + redisClusterFree(cc); + return -1; + } + + printf("reply->type:%d", reply->type); + + freeReplyObject(reply); + + reply = redisClusterCommand(cc, "mset %s %s %s %s", key1, value1, key2, value2); + if(reply == NULL) + { + printf("reply is null[%s]\n", cc->errstr); + redisClusterFree(cc); + return -1; + } + + printf("reply->str:%s", reply->str); + + freeReplyObject(reply); + redisClusterFree(cc); + return 0; +} From db3d0d7db93e5d4957a5f520bcc80059b55a3cd3 Mon Sep 17 00:00:00 2001 From: stolj Date: Tue, 16 Oct 2018 18:09:06 +0200 Subject: [PATCH 3/5] support for redis 4 and introducing some more API functions --- .gitignore | 3 + Makefile | 5 +- command.c | 16 ++++ command.h | 2 + examples/example-sync.c | 21 ++--- hircluster.c | 196 +++++++++++++++++++++++++++++++++++----- hircluster.h | 16 ++++ 7 files changed, 219 insertions(+), 40 deletions(-) diff --git a/.gitignore b/.gitignore index c44b5c5..b8f96e4 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,6 @@ /*.dylib /*.a /*.pc +/hiredis-test.exe +/.project +/.cproject diff --git a/Makefile b/Makefile index a321bd1..bf36f0b 100644 --- a/Makefile +++ b/Makefile @@ -4,7 +4,7 @@ # This file is released under the BSD license, see the COPYING file OBJ=net.o hiredis.o sds.o async.o read.o hiarray.o hiutil.o command.o crc16.o adlist.o hircluster.o -EXAMPLES=hiredis-example hiredis-example-libevent hiredis-example-libev hiredis-example-glib +EXAMPLES=hiredis-example hiredis-example-libevent hiredis-example-libev hiredis-example-glib hiredis-example-sync TESTS=hiredis-test LIBNAME=libhiredis_vip PKGCONFNAME=hiredis_vip.pc @@ -102,6 +102,9 @@ hiredis-example-libev: examples/example-libev.c adapters/libev.h $(STLIBNAME) hiredis-example-glib: examples/example-glib.c adapters/glib.h $(STLIBNAME) $(CC) -o examples/$@ $(REAL_CFLAGS) $(REAL_LDFLAGS) $(shell pkg-config --cflags --libs glib-2.0) -I. $< $(STLIBNAME) +hiredis-example-sync: examples/example-sync.c + $(CC) -o examples/$@ $(REAL_CFLAGS) $(REAL_LDFLAGS) -I. $< $(STLIBNAME) + ifndef AE_DIR hiredis-example-ae: @echo "Please specify AE_DIR (e.g. /src)" diff --git a/command.c b/command.c index 115bc25..23d3755 100644 --- a/command.c +++ b/command.c @@ -20,6 +20,8 @@ redis_argz(struct cmd *r) switch (r->type) { case CMD_REQ_REDIS_PING: case CMD_REQ_REDIS_QUIT: + case CMD_REQ_REDIS_INFO: + case CMD_REQ_REDIS_DBSIZE: return 1; default: @@ -612,6 +614,12 @@ redis_parse_cmd(struct cmd *r) break; } + if (str4icmp(m, 'i', 'n', 'f', 'o')) { + r->type = CMD_REQ_REDIS_INFO; + r->noforward = 1; + break; + } + if (str4icmp(m, 'a', 'u', 't', 'h')) { r->type = CMD_REQ_REDIS_AUTH; r->noforward = 1; @@ -814,6 +822,12 @@ redis_parse_cmd(struct cmd *r) break; } + if (str6icmp(m, 'd', 'b', 's', 'i', 'z', 'e')) { + r->type = CMD_REQ_REDIS_DBSIZE; + r->noforward = 1; + break; + } + break; case 7: @@ -941,6 +955,8 @@ redis_parse_cmd(struct cmd *r) break; } + break; + case 11: if (str11icmp(m, 'i', 'n', 'c', 'r', 'b', 'y', 'f', 'l', 'o', 'a', 't')) { r->type = CMD_REQ_REDIS_INCRBYFLOAT; diff --git a/command.h b/command.h index b7c388a..baf5299 100644 --- a/command.h +++ b/command.h @@ -119,6 +119,8 @@ typedef enum cmd_parse_result { ACTION( REQ_REDIS_EVALSHA ) \ ACTION( REQ_REDIS_PING ) /* redis requests - ping/quit */ \ ACTION( REQ_REDIS_QUIT) \ + ACTION( REQ_REDIS_INFO ) \ + ACTION( REQ_REDIS_DBSIZE ) \ ACTION( REQ_REDIS_AUTH) \ ACTION( RSP_REDIS_STATUS ) /* redis response */ \ ACTION( RSP_REDIS_ERROR ) \ diff --git a/examples/example-sync.c b/examples/example-sync.c index 5a3345c..571531c 100644 --- a/examples/example-sync.c +++ b/examples/example-sync.c @@ -16,6 +16,7 @@ int main() char *key2="key1"; char *value2="value-1"; redisClusterContext *cc; + redisReply *reply; cc = redisClusterContextInit(); redisClusterSetOptionAddNodes(cc, "127.0.0.1:7000,127.0.0.1:7001,127.0.0.1:7002"); @@ -27,20 +28,11 @@ int main() return -1; } - redisReply *reply = redisClusterCommand(cc, "dbsize"); - if(reply == NULL) - { - printf("reply is null[%s]\n", cc->errstr); - redisClusterFree(cc); - return -1; - } + test_cluster_update_route(cc); - printf("reply->type:%d\n", reply->type); - printf("reply->str:%s\n", reply->str); - - freeReplyObject(reply); + printf("redisClusterDbSize %ld\n", (long)redisClusterDbSize(cc)); - redisReply *reply = redisClusterCommand(cc, "hmget %s %s", key, field); + reply = redisClusterCommand(cc, "hmget %s %s", key, field); if(reply == NULL) { printf("reply is null[%s]\n", cc->errstr); @@ -48,7 +40,7 @@ int main() return -1; } - printf("reply->type:%d", reply->type); + printf("reply->type:%d\n", reply->type); freeReplyObject(reply); @@ -60,9 +52,10 @@ int main() return -1; } - printf("reply->str:%s", reply->str); + printf("reply->str:%s\n", reply->str); freeReplyObject(reply); redisClusterFree(cc); return 0; } + diff --git a/hircluster.c b/hircluster.c index b051936..20903eb 100644 --- a/hircluster.c +++ b/hircluster.c @@ -21,7 +21,8 @@ #define REDIS_PROTOCOL_ASKING "*1\r\n$6\r\nASKING\r\n" -#define IP_PORT_SEPARATOR ":" +#define IP_PORT_SEPARATOR ":" +#define IP_DATA_PORT_SEPARATOR "@" #define CLUSTER_ADDRESS_SEPARATOR "," @@ -515,6 +516,8 @@ static cluster_node *node_get_with_slots( node->port = (int)port_elem->integer; node->role = role; + debug("addr %s, host %s, port %d, role %d", node->addr, node->host, node->port, node->role); + return node; error: @@ -533,7 +536,7 @@ static cluster_node *node_get_with_nodes( redisClusterContext *cc, sds *node_infos, int info_count, uint8_t role) { - sds *ip_port = NULL; + sds *ip_port = NULL, *data_port = NULL; int count_ip_port = 0; cluster_node *node; @@ -569,20 +572,35 @@ static cluster_node *node_get_with_nodes( node->name = node_infos[0]; node->addr = node_infos[1]; + /* addr format: 127.0.0.1:7001@17001 */ ip_port = sdssplitlen(node_infos[1], sdslen(node_infos[1]), IP_PORT_SEPARATOR, strlen(IP_PORT_SEPARATOR), &count_ip_port); if(ip_port == NULL || count_ip_port != 2) { __redisClusterSetError(cc,REDIS_ERR_OTHER, - "split ip port error"); + "split ip:port error"); goto error; } node->host = ip_port[0]; - node->port = hi_atoi(ip_port[1], sdslen(ip_port[1])); + + data_port = sdssplitlen(ip_port[1], sdslen(ip_port[1]), + IP_DATA_PORT_SEPARATOR, strlen(IP_DATA_PORT_SEPARATOR), &count_ip_port); + if(data_port == NULL || count_ip_port != 2) + { + __redisClusterSetError(cc,REDIS_ERR_OTHER, + "split port@data-port error"); + goto error; + } + + node->port = hi_atoi(data_port[0], sdslen(data_port[0])); + node->data_port = hi_atoi(data_port[1], sdslen(data_port[1])); node->role = role; sdsfree(ip_port[1]); free(ip_port); + sdsfree(data_port[0]); + sdsfree(data_port[1]); + free(data_port); node_infos[0] = NULL; node_infos[1] = NULL; @@ -853,7 +871,7 @@ parse_cluster_slots(redisClusterContext *cc, }else{ elem_nodes = elem_slots->element[idx]; if(elem_nodes->type != REDIS_REPLY_ARRAY || - elem_nodes->elements != 3){ + elem_nodes->elements < 2){ __redisClusterSetError(cc, REDIS_ERR_OTHER, "Command(cluster slots) reply error: " "nodes sub_reply is not an correct array."); @@ -1958,6 +1976,8 @@ static void print_cluster_node_list(redisClusterContext *cc) listNode *ln; cluster_node *master, *slave; hilist *slaves; + long size; + redisReply *reply = NULL; if(cc == NULL) { @@ -1966,13 +1986,29 @@ static void print_cluster_node_list(redisClusterContext *cc) di = dictGetIterator(cc->nodes); - printf("name\taddress\trole\tslaves\n"); + printf("name\taddress\trole\tslaves\tdbsize\n"); while((de = dictNext(di)) != NULL) { master = dictGetEntryVal(de); - printf("%s\t%s\t%d\t%s\n",master->name, master->addr, - master->role, master->slaves?"hava":"null"); + size = -1; + + /* establish connection if not already existing */ + if (master->con == NULL) + master->con = ctx_get_by_node(cc, master); + + reply = redisCommand(master->con, "DBSIZE"); + if (reply == NULL || master->con->err) { + debug("REDIS: error: %s", master->con->errstr); + } + else if (reply->type == REDIS_REPLY_INTEGER) { + size = (long) reply->integer; + } + redisFree(master->con); + master->con = NULL; + + printf("%s\t%s\t%d\t%s\t%ld\n",master->name, master->addr, + master->role, master->slaves?"hava":"null", size); slaves = master->slaves; if(slaves == NULL) @@ -2001,7 +2037,7 @@ int test_cluster_update_route(redisClusterContext *cc) ret = cluster_update_route(cc); - //print_cluster_node_list(cc); + print_cluster_node_list(cc); return ret; } @@ -2085,6 +2121,12 @@ void redisClusterFree(redisClusterContext *cc) { * When no set of reply functions is given, the default set will be used. */ static int _redisClusterConnect2(redisClusterContext *cc) { + int ret; + +#ifdef DEBUG + debug("before connect"); + print_cluster_node_list(cc); +#endif if (cc->nodes == NULL || dictSize(cc->nodes) == 0) { @@ -2092,7 +2134,14 @@ static int _redisClusterConnect2(redisClusterContext *cc) return REDIS_ERR; } - return cluster_update_route(cc); + ret = cluster_update_route(cc); + +#ifdef DEBUG + debug("after connect"); + print_cluster_node_list(cc); +#endif + + return ret; } /* Connect to a Redis cluster. On error the field error in the returned @@ -2191,6 +2240,8 @@ int redisClusterSetOptionAddNode(redisClusterContext *cc, const char *addr) int port; sds addr_sds = NULL; + debug("adding %s", addr); + if(cc == NULL) { return REDIS_ERR; @@ -2260,6 +2311,9 @@ int redisClusterSetOptionAddNode(redisClusterContext *cc, const char *addr) node->port = port; dictAdd(cc->nodes, sdsnewlen(node->addr, sdslen(node->addr)), node); + + debug("dictAdd addr %s, host %s, port %d", node->addr, node->host, node->port); + } return REDIS_OK; @@ -2491,6 +2545,7 @@ redisContext *ctx_get_by_node(redisClusterContext *cc, cluster_node *node) if(node->host == NULL || node->port <= 0) { + debug("host %s, port %d", node->host, node->port); return NULL; } @@ -2905,8 +2960,8 @@ static int __redisClusterGetReply(redisClusterContext *cc, int slot_num, void ** static cluster_node *node_get_by_ask_error_reply( redisClusterContext *cc, redisReply *reply) { - sds *part = NULL, *ip_port = NULL; - int part_len = 0, ip_port_len; + sds *part = NULL, *ip_port = NULL, *data_port = NULL; + int part_len = 0, ip_port_len, data_port_len; dictEntry *de; cluster_node *node = NULL; @@ -2946,7 +3001,18 @@ static cluster_node *node_get_by_ask_error_reply( cluster_node_init(node); node->addr = part[1]; node->host = ip_port[0]; - node->port = hi_atoi(ip_port[1], sdslen(ip_port[1])); + + data_port = sdssplitlen(ip_port[1], sdslen(ip_port[1]), + IP_DATA_PORT_SEPARATOR, strlen(IP_DATA_PORT_SEPARATOR), &data_port_len); + if(data_port == NULL || data_port_len != 2) + { + __redisClusterSetError(cc,REDIS_ERR_OTHER, + "split port@data-port error"); + goto done; + } + + node->port = hi_atoi(data_port[0], sdslen(data_port[0])); + node->data_port = hi_atoi(data_port[1], sdslen(data_port[1])); node->role = REDIS_ROLE_MASTER; dictAdd(cc->nodes, sdsnewlen(node->addr, sdslen(node->addr)), node); @@ -2991,6 +3057,11 @@ static cluster_node *node_get_by_ask_error_reply( sdsfreesplitres(ip_port, ip_port_len); ip_port = NULL; } + if(data_port != NULL) + { + sdsfreesplitres(data_port, data_port_len); + data_port = NULL; + } return node; } @@ -3004,8 +3075,11 @@ static void *redis_cluster_command_execute(redisClusterContext *cc, redisContext *c = NULL; int error_type; + retry: + debug("node_get_by_table, slot_num %d", command->slot_num); + node = node_get_by_table(cc, (uint32_t)command->slot_num); if(node == NULL) { @@ -3436,10 +3510,12 @@ static void *command_post_fragment(redisClusterContext *cc, reply = sub_command->reply; if(reply == NULL) { + listReleaseIterator(list_iter); return NULL; } else if(reply->type == REDIS_REPLY_ERROR) { + listReleaseIterator(list_iter); return reply; } @@ -3447,12 +3523,14 @@ static void *command_post_fragment(redisClusterContext *cc, if(reply->type != REDIS_REPLY_ARRAY) { __redisClusterSetError(cc,REDIS_ERR_OTHER,"reply type is error(here only can be array)"); + listReleaseIterator(list_iter); return NULL; } }else if(command->type == CMD_REQ_REDIS_DEL){ if(reply->type != REDIS_REPLY_INTEGER) { __redisClusterSetError(cc,REDIS_ERR_OTHER,"reply type is error(here only can be integer)"); + listReleaseIterator(list_iter); return NULL; } @@ -3462,6 +3540,7 @@ static void *command_post_fragment(redisClusterContext *cc, reply->len != 2 || strcmp(reply->str, REDIS_STATUS_OK) != 0) { __redisClusterSetError(cc,REDIS_ERR_OTHER,"reply type is error(here only can be status and ok)"); + listReleaseIterator(list_iter); return NULL; } }else { @@ -3469,6 +3548,11 @@ static void *command_post_fragment(redisClusterContext *cc, } } + if(list_iter != NULL) + { + listReleaseIterator(list_iter); + } + reply = hi_calloc(1,sizeof(*reply)); if (reply == NULL) @@ -3577,8 +3661,10 @@ static int command_format_by_slot(redisClusterContext *cc, } key_count = hiarray_n(command->keys); + debug("key_count %d", key_count); - if(key_count <= 0) + //if(key_count <= 0) + if(key_count < 0) { __redisClusterSetError(cc, REDIS_ERR_OTHER, "No keys in command(must have keys for redis cluster mode)"); goto done; @@ -3589,6 +3675,8 @@ static int command_format_by_slot(redisClusterContext *cc, slot_num = keyHashSlot(kp->start, kp->end - kp->start); command->slot_num = slot_num; + debug("command->slot_num %d", slot_num); + goto done; } @@ -3618,6 +3706,8 @@ void *redisClusterFormattedCommand(redisClusterContext *cc, char *cmd, int len) listNode *list_node; listIter *list_iter = NULL; + debug("entered"); + if(cc == NULL) { return NULL; @@ -3650,6 +3740,8 @@ void *redisClusterFormattedCommand(redisClusterContext *cc, char *cmd, int len) slot_num = command_format_by_slot(cc, command, commands); + debug("slot_num %d", slot_num); + if(slot_num < 0) { goto error; @@ -3663,6 +3755,7 @@ void *redisClusterFormattedCommand(redisClusterContext *cc, char *cmd, int len) //all keys belong to one slot if(listLength(commands) == 0) { + debug("listLength == 0"); reply = redis_cluster_command_execute(cc, command); goto done; } @@ -3673,20 +3766,29 @@ void *redisClusterFormattedCommand(redisClusterContext *cc, char *cmd, int len) while((list_node = listNext(list_iter)) != NULL) { sub_command = list_node->value; - + + debug("while sub_command %p", (void*) sub_command); + reply = redis_cluster_command_execute(cc, sub_command); if(reply == NULL) { + listReleaseIterator(list_iter); goto error; } else if(reply->type == REDIS_REPLY_ERROR) { + listReleaseIterator(list_iter); goto done; } sub_command->reply = reply; } + if(list_iter != NULL) + { + listReleaseIterator(list_iter); + } + reply = command_post_fragment(cc, command, commands); done: @@ -3699,11 +3801,6 @@ void *redisClusterFormattedCommand(redisClusterContext *cc, char *cmd, int len) listRelease(commands); } - if(list_iter != NULL) - { - listReleaseIterator(list_iter); - } - cc->retry_count = 0; return reply; @@ -3721,11 +3818,6 @@ void *redisClusterFormattedCommand(redisClusterContext *cc, char *cmd, int len) listRelease(commands); } - if(list_iter != NULL) - { - listReleaseIterator(list_iter); - } - cc->retry_count = 0; return NULL; @@ -4135,6 +4227,7 @@ int redisClusterGetReply(redisClusterContext *cc, void **reply) { { __redisClusterSetError(cc,REDIS_ERR_OTHER, "sub_command is null"); + listReleaseIterator(list_iter); goto error; } @@ -4143,17 +4236,24 @@ int redisClusterGetReply(redisClusterContext *cc, void **reply) { { __redisClusterSetError(cc,REDIS_ERR_OTHER, "sub_command slot_num is less then zero"); + listReleaseIterator(list_iter); goto error; } if(__redisClusterGetReply(cc, slot_num, &sub_reply) != REDIS_OK) { + listReleaseIterator(list_iter); goto error; } sub_command->reply = sub_reply; } + if(list_iter != NULL) + { + listReleaseIterator(list_iter); + } + *reply = command_post_fragment(cc, command, commands); if(*reply == NULL) { @@ -4962,3 +5062,49 @@ void redisClusterAsyncFree(redisClusterAsyncContext *acc) hi_free(acc); } +long long redisClusterDbSize(redisClusterContext *cc) +{ + long long size = 0; + dictIterator *di = NULL; + dictEntry *de; + cluster_node *master; + redisReply *reply = NULL; + + if (cc == NULL) + return size; + + di = dictGetIterator(cc->nodes); + + while ((de = dictNext(di)) != NULL) { + master = dictGetEntryVal(de); + + /* establish connection if not already existing */ + if (master->con == NULL) + master->con = ctx_get_by_node(cc, master); + + /* if we can't connect then continue */ + if (master->con == NULL) + continue; + + reply = redisCommand(master->con, "DBSIZE"); + if (reply == NULL || master->con->err) { + redisFree(master->con); + master->con = NULL; + __redisClusterSetError(cc, REDIS_ERR_OTHER, "failed DBSIZE request"); + return REDIS_ERR; + } + else if (reply->type == REDIS_REPLY_INTEGER) { + size += reply->integer; + } + else { + redisFree(master->con); + master->con = NULL; + __redisClusterSetError(cc, REDIS_ERR_OTHER, "unexpected DBSIZE response"); + return REDIS_ERR; + } + redisFree(master->con); + master->con = NULL; + } + + return size; +} diff --git a/hircluster.h b/hircluster.h index 95585c9..61944d1 100644 --- a/hircluster.h +++ b/hircluster.h @@ -30,6 +30,17 @@ * is 'cluster nodes' command.*/ #define HIRCLUSTER_FLAG_ROUTE_USE_SLOTS 0x4000 + +//#define DEBUG 1 + +#ifdef DEBUG +#define debug(fmt, ...) \ + printf("XXX %s: " #fmt "\n", __func__, ## __VA_ARGS__); +#else +#define debug(...) +#endif + + struct dict; struct hilist; @@ -39,6 +50,7 @@ typedef struct cluster_node sds addr; sds host; int port; + int data_port; uint8_t role; uint8_t myself; /* myself ? */ redisContext *con; @@ -187,6 +199,10 @@ void redisClusterAsyncFree(redisClusterAsyncContext *acc); redisAsyncContext *actx_get_by_node(redisClusterAsyncContext *acc, cluster_node *node); +/*#####################################################*/ + +long long redisClusterDbSize(redisClusterContext *cc); + #ifdef __cplusplus } #endif From 8ebd3825b6ea16d15871b6bc8d329d38906a8028 Mon Sep 17 00:00:00 2001 From: Stipe Tolj Date: Tue, 16 Oct 2018 18:18:16 +0200 Subject: [PATCH 4/5] support for redis 4 cluster mode, some APU additions --- Makefile | 5 +- command.c | 16 ++++ command.h | 2 + examples/example-sync.c | 61 +++++++++++++ hircluster.c | 196 +++++++++++++++++++++++++++++++++++----- hircluster.h | 16 ++++ 6 files changed, 270 insertions(+), 26 deletions(-) create mode 100755 examples/example-sync.c diff --git a/Makefile b/Makefile index a321bd1..bf36f0b 100644 --- a/Makefile +++ b/Makefile @@ -4,7 +4,7 @@ # This file is released under the BSD license, see the COPYING file OBJ=net.o hiredis.o sds.o async.o read.o hiarray.o hiutil.o command.o crc16.o adlist.o hircluster.o -EXAMPLES=hiredis-example hiredis-example-libevent hiredis-example-libev hiredis-example-glib +EXAMPLES=hiredis-example hiredis-example-libevent hiredis-example-libev hiredis-example-glib hiredis-example-sync TESTS=hiredis-test LIBNAME=libhiredis_vip PKGCONFNAME=hiredis_vip.pc @@ -102,6 +102,9 @@ hiredis-example-libev: examples/example-libev.c adapters/libev.h $(STLIBNAME) hiredis-example-glib: examples/example-glib.c adapters/glib.h $(STLIBNAME) $(CC) -o examples/$@ $(REAL_CFLAGS) $(REAL_LDFLAGS) $(shell pkg-config --cflags --libs glib-2.0) -I. $< $(STLIBNAME) +hiredis-example-sync: examples/example-sync.c + $(CC) -o examples/$@ $(REAL_CFLAGS) $(REAL_LDFLAGS) -I. $< $(STLIBNAME) + ifndef AE_DIR hiredis-example-ae: @echo "Please specify AE_DIR (e.g. /src)" diff --git a/command.c b/command.c index 115bc25..23d3755 100644 --- a/command.c +++ b/command.c @@ -20,6 +20,8 @@ redis_argz(struct cmd *r) switch (r->type) { case CMD_REQ_REDIS_PING: case CMD_REQ_REDIS_QUIT: + case CMD_REQ_REDIS_INFO: + case CMD_REQ_REDIS_DBSIZE: return 1; default: @@ -612,6 +614,12 @@ redis_parse_cmd(struct cmd *r) break; } + if (str4icmp(m, 'i', 'n', 'f', 'o')) { + r->type = CMD_REQ_REDIS_INFO; + r->noforward = 1; + break; + } + if (str4icmp(m, 'a', 'u', 't', 'h')) { r->type = CMD_REQ_REDIS_AUTH; r->noforward = 1; @@ -814,6 +822,12 @@ redis_parse_cmd(struct cmd *r) break; } + if (str6icmp(m, 'd', 'b', 's', 'i', 'z', 'e')) { + r->type = CMD_REQ_REDIS_DBSIZE; + r->noforward = 1; + break; + } + break; case 7: @@ -941,6 +955,8 @@ redis_parse_cmd(struct cmd *r) break; } + break; + case 11: if (str11icmp(m, 'i', 'n', 'c', 'r', 'b', 'y', 'f', 'l', 'o', 'a', 't')) { r->type = CMD_REQ_REDIS_INCRBYFLOAT; diff --git a/command.h b/command.h index b7c388a..baf5299 100644 --- a/command.h +++ b/command.h @@ -119,6 +119,8 @@ typedef enum cmd_parse_result { ACTION( REQ_REDIS_EVALSHA ) \ ACTION( REQ_REDIS_PING ) /* redis requests - ping/quit */ \ ACTION( REQ_REDIS_QUIT) \ + ACTION( REQ_REDIS_INFO ) \ + ACTION( REQ_REDIS_DBSIZE ) \ ACTION( REQ_REDIS_AUTH) \ ACTION( RSP_REDIS_STATUS ) /* redis response */ \ ACTION( RSP_REDIS_ERROR ) \ diff --git a/examples/example-sync.c b/examples/example-sync.c new file mode 100755 index 0000000..571531c --- /dev/null +++ b/examples/example-sync.c @@ -0,0 +1,61 @@ +/* + * example-sync.c + * + * Created on: 12.10.2018 + * Author: tolj + */ + +#include +#include +int main() +{ + char *key="key-a"; + char *field="field-1"; + char *key1="key1"; + char *value1="value-1"; + char *key2="key1"; + char *value2="value-1"; + redisClusterContext *cc; + redisReply *reply; + + cc = redisClusterContextInit(); + redisClusterSetOptionAddNodes(cc, "127.0.0.1:7000,127.0.0.1:7001,127.0.0.1:7002"); + redisClusterSetOptionParseSlaves(cc); + redisClusterConnect2(cc); + if(cc == NULL || cc->err) + { + printf("connect error : %s\n", cc == NULL ? "NULL" : cc->errstr); + return -1; + } + + test_cluster_update_route(cc); + + printf("redisClusterDbSize %ld\n", (long)redisClusterDbSize(cc)); + + reply = redisClusterCommand(cc, "hmget %s %s", key, field); + if(reply == NULL) + { + printf("reply is null[%s]\n", cc->errstr); + redisClusterFree(cc); + return -1; + } + + printf("reply->type:%d\n", reply->type); + + freeReplyObject(reply); + + reply = redisClusterCommand(cc, "mset %s %s %s %s", key1, value1, key2, value2); + if(reply == NULL) + { + printf("reply is null[%s]\n", cc->errstr); + redisClusterFree(cc); + return -1; + } + + printf("reply->str:%s\n", reply->str); + + freeReplyObject(reply); + redisClusterFree(cc); + return 0; +} + diff --git a/hircluster.c b/hircluster.c index b051936..20903eb 100644 --- a/hircluster.c +++ b/hircluster.c @@ -21,7 +21,8 @@ #define REDIS_PROTOCOL_ASKING "*1\r\n$6\r\nASKING\r\n" -#define IP_PORT_SEPARATOR ":" +#define IP_PORT_SEPARATOR ":" +#define IP_DATA_PORT_SEPARATOR "@" #define CLUSTER_ADDRESS_SEPARATOR "," @@ -515,6 +516,8 @@ static cluster_node *node_get_with_slots( node->port = (int)port_elem->integer; node->role = role; + debug("addr %s, host %s, port %d, role %d", node->addr, node->host, node->port, node->role); + return node; error: @@ -533,7 +536,7 @@ static cluster_node *node_get_with_nodes( redisClusterContext *cc, sds *node_infos, int info_count, uint8_t role) { - sds *ip_port = NULL; + sds *ip_port = NULL, *data_port = NULL; int count_ip_port = 0; cluster_node *node; @@ -569,20 +572,35 @@ static cluster_node *node_get_with_nodes( node->name = node_infos[0]; node->addr = node_infos[1]; + /* addr format: 127.0.0.1:7001@17001 */ ip_port = sdssplitlen(node_infos[1], sdslen(node_infos[1]), IP_PORT_SEPARATOR, strlen(IP_PORT_SEPARATOR), &count_ip_port); if(ip_port == NULL || count_ip_port != 2) { __redisClusterSetError(cc,REDIS_ERR_OTHER, - "split ip port error"); + "split ip:port error"); goto error; } node->host = ip_port[0]; - node->port = hi_atoi(ip_port[1], sdslen(ip_port[1])); + + data_port = sdssplitlen(ip_port[1], sdslen(ip_port[1]), + IP_DATA_PORT_SEPARATOR, strlen(IP_DATA_PORT_SEPARATOR), &count_ip_port); + if(data_port == NULL || count_ip_port != 2) + { + __redisClusterSetError(cc,REDIS_ERR_OTHER, + "split port@data-port error"); + goto error; + } + + node->port = hi_atoi(data_port[0], sdslen(data_port[0])); + node->data_port = hi_atoi(data_port[1], sdslen(data_port[1])); node->role = role; sdsfree(ip_port[1]); free(ip_port); + sdsfree(data_port[0]); + sdsfree(data_port[1]); + free(data_port); node_infos[0] = NULL; node_infos[1] = NULL; @@ -853,7 +871,7 @@ parse_cluster_slots(redisClusterContext *cc, }else{ elem_nodes = elem_slots->element[idx]; if(elem_nodes->type != REDIS_REPLY_ARRAY || - elem_nodes->elements != 3){ + elem_nodes->elements < 2){ __redisClusterSetError(cc, REDIS_ERR_OTHER, "Command(cluster slots) reply error: " "nodes sub_reply is not an correct array."); @@ -1958,6 +1976,8 @@ static void print_cluster_node_list(redisClusterContext *cc) listNode *ln; cluster_node *master, *slave; hilist *slaves; + long size; + redisReply *reply = NULL; if(cc == NULL) { @@ -1966,13 +1986,29 @@ static void print_cluster_node_list(redisClusterContext *cc) di = dictGetIterator(cc->nodes); - printf("name\taddress\trole\tslaves\n"); + printf("name\taddress\trole\tslaves\tdbsize\n"); while((de = dictNext(di)) != NULL) { master = dictGetEntryVal(de); - printf("%s\t%s\t%d\t%s\n",master->name, master->addr, - master->role, master->slaves?"hava":"null"); + size = -1; + + /* establish connection if not already existing */ + if (master->con == NULL) + master->con = ctx_get_by_node(cc, master); + + reply = redisCommand(master->con, "DBSIZE"); + if (reply == NULL || master->con->err) { + debug("REDIS: error: %s", master->con->errstr); + } + else if (reply->type == REDIS_REPLY_INTEGER) { + size = (long) reply->integer; + } + redisFree(master->con); + master->con = NULL; + + printf("%s\t%s\t%d\t%s\t%ld\n",master->name, master->addr, + master->role, master->slaves?"hava":"null", size); slaves = master->slaves; if(slaves == NULL) @@ -2001,7 +2037,7 @@ int test_cluster_update_route(redisClusterContext *cc) ret = cluster_update_route(cc); - //print_cluster_node_list(cc); + print_cluster_node_list(cc); return ret; } @@ -2085,6 +2121,12 @@ void redisClusterFree(redisClusterContext *cc) { * When no set of reply functions is given, the default set will be used. */ static int _redisClusterConnect2(redisClusterContext *cc) { + int ret; + +#ifdef DEBUG + debug("before connect"); + print_cluster_node_list(cc); +#endif if (cc->nodes == NULL || dictSize(cc->nodes) == 0) { @@ -2092,7 +2134,14 @@ static int _redisClusterConnect2(redisClusterContext *cc) return REDIS_ERR; } - return cluster_update_route(cc); + ret = cluster_update_route(cc); + +#ifdef DEBUG + debug("after connect"); + print_cluster_node_list(cc); +#endif + + return ret; } /* Connect to a Redis cluster. On error the field error in the returned @@ -2191,6 +2240,8 @@ int redisClusterSetOptionAddNode(redisClusterContext *cc, const char *addr) int port; sds addr_sds = NULL; + debug("adding %s", addr); + if(cc == NULL) { return REDIS_ERR; @@ -2260,6 +2311,9 @@ int redisClusterSetOptionAddNode(redisClusterContext *cc, const char *addr) node->port = port; dictAdd(cc->nodes, sdsnewlen(node->addr, sdslen(node->addr)), node); + + debug("dictAdd addr %s, host %s, port %d", node->addr, node->host, node->port); + } return REDIS_OK; @@ -2491,6 +2545,7 @@ redisContext *ctx_get_by_node(redisClusterContext *cc, cluster_node *node) if(node->host == NULL || node->port <= 0) { + debug("host %s, port %d", node->host, node->port); return NULL; } @@ -2905,8 +2960,8 @@ static int __redisClusterGetReply(redisClusterContext *cc, int slot_num, void ** static cluster_node *node_get_by_ask_error_reply( redisClusterContext *cc, redisReply *reply) { - sds *part = NULL, *ip_port = NULL; - int part_len = 0, ip_port_len; + sds *part = NULL, *ip_port = NULL, *data_port = NULL; + int part_len = 0, ip_port_len, data_port_len; dictEntry *de; cluster_node *node = NULL; @@ -2946,7 +3001,18 @@ static cluster_node *node_get_by_ask_error_reply( cluster_node_init(node); node->addr = part[1]; node->host = ip_port[0]; - node->port = hi_atoi(ip_port[1], sdslen(ip_port[1])); + + data_port = sdssplitlen(ip_port[1], sdslen(ip_port[1]), + IP_DATA_PORT_SEPARATOR, strlen(IP_DATA_PORT_SEPARATOR), &data_port_len); + if(data_port == NULL || data_port_len != 2) + { + __redisClusterSetError(cc,REDIS_ERR_OTHER, + "split port@data-port error"); + goto done; + } + + node->port = hi_atoi(data_port[0], sdslen(data_port[0])); + node->data_port = hi_atoi(data_port[1], sdslen(data_port[1])); node->role = REDIS_ROLE_MASTER; dictAdd(cc->nodes, sdsnewlen(node->addr, sdslen(node->addr)), node); @@ -2991,6 +3057,11 @@ static cluster_node *node_get_by_ask_error_reply( sdsfreesplitres(ip_port, ip_port_len); ip_port = NULL; } + if(data_port != NULL) + { + sdsfreesplitres(data_port, data_port_len); + data_port = NULL; + } return node; } @@ -3004,8 +3075,11 @@ static void *redis_cluster_command_execute(redisClusterContext *cc, redisContext *c = NULL; int error_type; + retry: + debug("node_get_by_table, slot_num %d", command->slot_num); + node = node_get_by_table(cc, (uint32_t)command->slot_num); if(node == NULL) { @@ -3436,10 +3510,12 @@ static void *command_post_fragment(redisClusterContext *cc, reply = sub_command->reply; if(reply == NULL) { + listReleaseIterator(list_iter); return NULL; } else if(reply->type == REDIS_REPLY_ERROR) { + listReleaseIterator(list_iter); return reply; } @@ -3447,12 +3523,14 @@ static void *command_post_fragment(redisClusterContext *cc, if(reply->type != REDIS_REPLY_ARRAY) { __redisClusterSetError(cc,REDIS_ERR_OTHER,"reply type is error(here only can be array)"); + listReleaseIterator(list_iter); return NULL; } }else if(command->type == CMD_REQ_REDIS_DEL){ if(reply->type != REDIS_REPLY_INTEGER) { __redisClusterSetError(cc,REDIS_ERR_OTHER,"reply type is error(here only can be integer)"); + listReleaseIterator(list_iter); return NULL; } @@ -3462,6 +3540,7 @@ static void *command_post_fragment(redisClusterContext *cc, reply->len != 2 || strcmp(reply->str, REDIS_STATUS_OK) != 0) { __redisClusterSetError(cc,REDIS_ERR_OTHER,"reply type is error(here only can be status and ok)"); + listReleaseIterator(list_iter); return NULL; } }else { @@ -3469,6 +3548,11 @@ static void *command_post_fragment(redisClusterContext *cc, } } + if(list_iter != NULL) + { + listReleaseIterator(list_iter); + } + reply = hi_calloc(1,sizeof(*reply)); if (reply == NULL) @@ -3577,8 +3661,10 @@ static int command_format_by_slot(redisClusterContext *cc, } key_count = hiarray_n(command->keys); + debug("key_count %d", key_count); - if(key_count <= 0) + //if(key_count <= 0) + if(key_count < 0) { __redisClusterSetError(cc, REDIS_ERR_OTHER, "No keys in command(must have keys for redis cluster mode)"); goto done; @@ -3589,6 +3675,8 @@ static int command_format_by_slot(redisClusterContext *cc, slot_num = keyHashSlot(kp->start, kp->end - kp->start); command->slot_num = slot_num; + debug("command->slot_num %d", slot_num); + goto done; } @@ -3618,6 +3706,8 @@ void *redisClusterFormattedCommand(redisClusterContext *cc, char *cmd, int len) listNode *list_node; listIter *list_iter = NULL; + debug("entered"); + if(cc == NULL) { return NULL; @@ -3650,6 +3740,8 @@ void *redisClusterFormattedCommand(redisClusterContext *cc, char *cmd, int len) slot_num = command_format_by_slot(cc, command, commands); + debug("slot_num %d", slot_num); + if(slot_num < 0) { goto error; @@ -3663,6 +3755,7 @@ void *redisClusterFormattedCommand(redisClusterContext *cc, char *cmd, int len) //all keys belong to one slot if(listLength(commands) == 0) { + debug("listLength == 0"); reply = redis_cluster_command_execute(cc, command); goto done; } @@ -3673,20 +3766,29 @@ void *redisClusterFormattedCommand(redisClusterContext *cc, char *cmd, int len) while((list_node = listNext(list_iter)) != NULL) { sub_command = list_node->value; - + + debug("while sub_command %p", (void*) sub_command); + reply = redis_cluster_command_execute(cc, sub_command); if(reply == NULL) { + listReleaseIterator(list_iter); goto error; } else if(reply->type == REDIS_REPLY_ERROR) { + listReleaseIterator(list_iter); goto done; } sub_command->reply = reply; } + if(list_iter != NULL) + { + listReleaseIterator(list_iter); + } + reply = command_post_fragment(cc, command, commands); done: @@ -3699,11 +3801,6 @@ void *redisClusterFormattedCommand(redisClusterContext *cc, char *cmd, int len) listRelease(commands); } - if(list_iter != NULL) - { - listReleaseIterator(list_iter); - } - cc->retry_count = 0; return reply; @@ -3721,11 +3818,6 @@ void *redisClusterFormattedCommand(redisClusterContext *cc, char *cmd, int len) listRelease(commands); } - if(list_iter != NULL) - { - listReleaseIterator(list_iter); - } - cc->retry_count = 0; return NULL; @@ -4135,6 +4227,7 @@ int redisClusterGetReply(redisClusterContext *cc, void **reply) { { __redisClusterSetError(cc,REDIS_ERR_OTHER, "sub_command is null"); + listReleaseIterator(list_iter); goto error; } @@ -4143,17 +4236,24 @@ int redisClusterGetReply(redisClusterContext *cc, void **reply) { { __redisClusterSetError(cc,REDIS_ERR_OTHER, "sub_command slot_num is less then zero"); + listReleaseIterator(list_iter); goto error; } if(__redisClusterGetReply(cc, slot_num, &sub_reply) != REDIS_OK) { + listReleaseIterator(list_iter); goto error; } sub_command->reply = sub_reply; } + if(list_iter != NULL) + { + listReleaseIterator(list_iter); + } + *reply = command_post_fragment(cc, command, commands); if(*reply == NULL) { @@ -4962,3 +5062,49 @@ void redisClusterAsyncFree(redisClusterAsyncContext *acc) hi_free(acc); } +long long redisClusterDbSize(redisClusterContext *cc) +{ + long long size = 0; + dictIterator *di = NULL; + dictEntry *de; + cluster_node *master; + redisReply *reply = NULL; + + if (cc == NULL) + return size; + + di = dictGetIterator(cc->nodes); + + while ((de = dictNext(di)) != NULL) { + master = dictGetEntryVal(de); + + /* establish connection if not already existing */ + if (master->con == NULL) + master->con = ctx_get_by_node(cc, master); + + /* if we can't connect then continue */ + if (master->con == NULL) + continue; + + reply = redisCommand(master->con, "DBSIZE"); + if (reply == NULL || master->con->err) { + redisFree(master->con); + master->con = NULL; + __redisClusterSetError(cc, REDIS_ERR_OTHER, "failed DBSIZE request"); + return REDIS_ERR; + } + else if (reply->type == REDIS_REPLY_INTEGER) { + size += reply->integer; + } + else { + redisFree(master->con); + master->con = NULL; + __redisClusterSetError(cc, REDIS_ERR_OTHER, "unexpected DBSIZE response"); + return REDIS_ERR; + } + redisFree(master->con); + master->con = NULL; + } + + return size; +} diff --git a/hircluster.h b/hircluster.h index 95585c9..61944d1 100644 --- a/hircluster.h +++ b/hircluster.h @@ -30,6 +30,17 @@ * is 'cluster nodes' command.*/ #define HIRCLUSTER_FLAG_ROUTE_USE_SLOTS 0x4000 + +//#define DEBUG 1 + +#ifdef DEBUG +#define debug(fmt, ...) \ + printf("XXX %s: " #fmt "\n", __func__, ## __VA_ARGS__); +#else +#define debug(...) +#endif + + struct dict; struct hilist; @@ -39,6 +50,7 @@ typedef struct cluster_node sds addr; sds host; int port; + int data_port; uint8_t role; uint8_t myself; /* myself ? */ redisContext *con; @@ -187,6 +199,10 @@ void redisClusterAsyncFree(redisClusterAsyncContext *acc); redisAsyncContext *actx_get_by_node(redisClusterAsyncContext *acc, cluster_node *node); +/*#####################################################*/ + +long long redisClusterDbSize(redisClusterContext *cc); + #ifdef __cplusplus } #endif From 32b401ac904c72094e3859e4ddf3e131d40c239c Mon Sep 17 00:00:00 2001 From: Stipe Tolj Date: Tue, 16 Oct 2018 22:13:29 +0200 Subject: [PATCH 5/5] re-add example --- examples/example-sync.c | 61 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) create mode 100644 examples/example-sync.c diff --git a/examples/example-sync.c b/examples/example-sync.c new file mode 100644 index 0000000..571531c --- /dev/null +++ b/examples/example-sync.c @@ -0,0 +1,61 @@ +/* + * example-sync.c + * + * Created on: 12.10.2018 + * Author: tolj + */ + +#include +#include +int main() +{ + char *key="key-a"; + char *field="field-1"; + char *key1="key1"; + char *value1="value-1"; + char *key2="key1"; + char *value2="value-1"; + redisClusterContext *cc; + redisReply *reply; + + cc = redisClusterContextInit(); + redisClusterSetOptionAddNodes(cc, "127.0.0.1:7000,127.0.0.1:7001,127.0.0.1:7002"); + redisClusterSetOptionParseSlaves(cc); + redisClusterConnect2(cc); + if(cc == NULL || cc->err) + { + printf("connect error : %s\n", cc == NULL ? "NULL" : cc->errstr); + return -1; + } + + test_cluster_update_route(cc); + + printf("redisClusterDbSize %ld\n", (long)redisClusterDbSize(cc)); + + reply = redisClusterCommand(cc, "hmget %s %s", key, field); + if(reply == NULL) + { + printf("reply is null[%s]\n", cc->errstr); + redisClusterFree(cc); + return -1; + } + + printf("reply->type:%d\n", reply->type); + + freeReplyObject(reply); + + reply = redisClusterCommand(cc, "mset %s %s %s %s", key1, value1, key2, value2); + if(reply == NULL) + { + printf("reply is null[%s]\n", cc->errstr); + redisClusterFree(cc); + return -1; + } + + printf("reply->str:%s\n", reply->str); + + freeReplyObject(reply); + redisClusterFree(cc); + return 0; +} +