From c9927901478a809cf81ef3cf8ce4b7e669b36045 Mon Sep 17 00:00:00 2001
From: Eq Zhan <35362437+pingxingshikong@users.noreply.github.com>
Date: Wed, 12 Jul 2023 16:38:53 +0800
Subject: [PATCH] fix the issue of Redis using ACL connection failure
---
pom.xml | 1 -
.../main/java/syncer/jedis/BinaryClient.java | 11 ++++++-
.../main/java/syncer/jedis/BinaryJedis.java | 8 +++++
.../syncer/jedis/commands/BasicCommands.java | 1 +
.../client/RedisClientFactory.java | 12 ++++----
.../impl/JedisMultiExecPipeLineClient.java | 28 ++++++++++--------
.../client/impl/JedisPipeLineClient.java | 27 ++++++++++-------
.../impl/JedisPipeLineMultiRetryRunner.java | 2 +-
.../client/impl/JedisPipeLineRetryRunner.java | 2 +-
...JedisPipelineSubmitCommandRetryRunner.java | 2 +-
...PipelineSubmitMultiCommandRetryRunner.java | 2 +-
.../sentinel/FastRedisSentinelClient.java | 4 +--
.../client/sentinel/RedisSentinel.java | 5 ++--
.../syncer/transmission/model/TaskModel.java | 26 ++++++++++++-----
.../queue/SendCommandWithOutQueue.java | 12 ++++++--
.../task/RedisDataSyncTransmissionTask.java | 2 +-
...disSyncFilterByAuxKeyTransmissionTask.java | 2 +-
.../transmission/util/CompensatorUtils.java | 5 +++-
.../util/redis/KeyCountUtils.java | 5 +++-
.../util/redis/RedisReplIdCheck.java | 2 +-
.../util/redis/RedisUrlCheck.java | 4 ++-
.../util/redis/RedisVersionUtil.java | 4 ++-
.../webapp/util/DtoToTaskModelUtils.java | 29 +++++++++++++++++--
23 files changed, 139 insertions(+), 57 deletions(-)
diff --git a/pom.xml b/pom.xml
index 6a13214a..156ad159 100644
--- a/pom.xml
+++ b/pom.xml
@@ -119,7 +119,6 @@
org.xerial
sqlite-jdbc
- 3.21.0.1
3.39.3.0
diff --git a/syncer-jedis/src/main/java/syncer/jedis/BinaryClient.java b/syncer-jedis/src/main/java/syncer/jedis/BinaryClient.java
index fe654b61..5c650481 100644
--- a/syncer-jedis/src/main/java/syncer/jedis/BinaryClient.java
+++ b/syncer-jedis/src/main/java/syncer/jedis/BinaryClient.java
@@ -593,7 +593,16 @@ public void brpop(final int timeout, final byte[]... keys) {
public void auth(final String password) {
setPassword(password);
- sendCommand(AUTH, password);
+ if(password.contains(" ")){
+ auth(password.split(" ")[0],password.split(" ")[1]);
+ }else{
+ sendCommand(AUTH, password);
+ }
+ }
+
+ public void auth(final String user,final String password) {
+ setPassword(user+" "+password);
+ sendCommand(AUTH, user,password);
}
public void subscribe(final byte[]... channels) {
diff --git a/syncer-jedis/src/main/java/syncer/jedis/BinaryJedis.java b/syncer-jedis/src/main/java/syncer/jedis/BinaryJedis.java
index 7fbabd3a..de82ee8b 100644
--- a/syncer-jedis/src/main/java/syncer/jedis/BinaryJedis.java
+++ b/syncer-jedis/src/main/java/syncer/jedis/BinaryJedis.java
@@ -2239,6 +2239,14 @@ public String auth(final String password) {
return client.getStatusCodeReply();
}
+ @Override
+ public String auth(final String user,final String password) {
+ checkIsInMultiOrPipeline();
+ client.auth(user,password);
+ return client.getStatusCodeReply();
+ }
+
+
public Pipeline pipelined() {
pipeline = new Pipeline();
pipeline.setClient(client);
diff --git a/syncer-jedis/src/main/java/syncer/jedis/commands/BasicCommands.java b/syncer-jedis/src/main/java/syncer/jedis/commands/BasicCommands.java
index 6c1fd498..4775967b 100644
--- a/syncer-jedis/src/main/java/syncer/jedis/commands/BasicCommands.java
+++ b/syncer-jedis/src/main/java/syncer/jedis/commands/BasicCommands.java
@@ -60,6 +60,7 @@ public interface BasicCommands {
*/
String auth(String password);
+ String auth(String user,String password);
/**
* The SAVE commands performs a synchronous save of the dataset producing a point in time snapshot of all the data inside the Redis instance, in the form of an RDB file.
You almost never want to call SAVE in production environments where it will block all the other clients. Instead usually BGSAVE is used. However in case of issues preventing Redis to create the background saving child (for instance errors in the fork(2) system call), the SAVE command can be a good last resort to perform the dump of the latest dataset.
diff --git a/syncer-transmission/src/main/java/syncer/transmission/client/RedisClientFactory.java b/syncer-transmission/src/main/java/syncer/transmission/client/RedisClientFactory.java
index 9c5327e4..3436e69c 100644
--- a/syncer-transmission/src/main/java/syncer/transmission/client/RedisClientFactory.java
+++ b/syncer-transmission/src/main/java/syncer/transmission/client/RedisClientFactory.java
@@ -28,14 +28,14 @@
*/
@Slf4j
public class RedisClientFactory {
- public static RedisClient createRedisClient(RedisType redisType, String host, Integer port, String password, String sourceHost, Integer sourcePort, int count, long errorCount, String taskId, String jimUrl, String cfsUrl,String masterName,String address) {
+ public static RedisClient createRedisClient(RedisType redisType, String host, Integer port, String user,String password, String sourceHost, Integer sourcePort, int count, long errorCount, String taskId, String jimUrl, String cfsUrl,String masterName,String address) {
RedisClient redisClient = null;
switch (redisType) {
case SINGLE:
if(BreakPointConfig.getBreakpointContinuationType().equals(BreakpointContinuationType.v1)){
- redisClient = new JedisPipeLineClient(host,port,password,count,errorCount,taskId);
+ redisClient = new JedisPipeLineClient(host,port,user,password,count,errorCount,taskId);
}else{
- redisClient = new JedisMultiExecPipeLineClient(host,port,password,sourceHost,sourcePort,count,errorCount,taskId);
+ redisClient = new JedisMultiExecPipeLineClient(host,port,user,password,sourceHost,sourcePort,count,errorCount,taskId);
}
log.info("host[{}],port[{}] , {} client init success",host,port,BreakPointConfig.getBreakpointContinuationType());
break;
@@ -56,14 +56,14 @@ public static RedisClient createRedisClient(RedisType redisType, String host, In
return redisClient;
}
- public static RedisClient createRedisClient(RedisType redisType, String host, Integer port, String password, String sourceHost, Integer sourcePort, int count, long errorCount, String taskId, String jimUrl, String cfsUrl) {
+ public static RedisClient createRedisClient(RedisType redisType, String host, Integer port, String user,String password, String sourceHost, Integer sourcePort, int count, long errorCount, String taskId, String jimUrl, String cfsUrl) {
RedisClient redisClient = null;
switch (redisType) {
case SINGLE:
if(BreakPointConfig.getBreakpointContinuationType().equals(BreakpointContinuationType.v1)){
- redisClient = new JedisPipeLineClient(host,port,password,count,errorCount,taskId);
+ redisClient = new JedisPipeLineClient(host,port,user,password,count,errorCount,taskId);
}else{
- redisClient = new JedisMultiExecPipeLineClient(host,port,password,sourceHost,sourcePort,count,errorCount,taskId);
+ redisClient = new JedisMultiExecPipeLineClient(host,port,user,password,sourceHost,sourcePort,count,errorCount,taskId);
}
log.info("host[{}],port[{}] , {} client init success",host,port,BreakPointConfig.getBreakpointContinuationType());
break;
diff --git a/syncer-transmission/src/main/java/syncer/transmission/client/impl/JedisMultiExecPipeLineClient.java b/syncer-transmission/src/main/java/syncer/transmission/client/impl/JedisMultiExecPipeLineClient.java
index d040221d..0f88e0b3 100644
--- a/syncer-transmission/src/main/java/syncer/transmission/client/impl/JedisMultiExecPipeLineClient.java
+++ b/syncer-transmission/src/main/java/syncer/transmission/client/impl/JedisMultiExecPipeLineClient.java
@@ -65,6 +65,7 @@ public class JedisMultiExecPipeLineClient implements RedisClient {
protected String sourceHost;
protected Integer sourcePort;
protected String password;
+ protected String user;
protected Jedis targetClient;
protected Pipeline pipelined;
private Integer currentDbNum = 0;
@@ -131,11 +132,12 @@ public class JedisMultiExecPipeLineClient implements RedisClient {
private AtomicBoolean hasMulti=new AtomicBoolean(false);
- public JedisMultiExecPipeLineClient(String host, Integer port, String password,String sourceHost, Integer sourcePort,int count, long errorCount, String taskId) {
+ public JedisMultiExecPipeLineClient(String host, Integer port, String user, String password,String sourceHost, Integer sourcePort,int count, long errorCount, String taskId) {
this.host = host;
this.port = port;
this.taskId = taskId;
this.password=password;
+ this.user=user;
this.sourceHost=sourceHost;
this.sourcePort=sourcePort;
if (count != 0) {
@@ -146,7 +148,7 @@ public JedisMultiExecPipeLineClient(String host, Integer port, String password,S
this.errorCount = errorCount;
}
- targetClient=createJedis(this.host,this.port,password);
+ targetClient=createJedis(this.host,this.port,user,password);
pipelined = targetClient.pipelined();
retry=new ConnectErrorRetry(taskId);
@@ -158,7 +160,7 @@ public JedisMultiExecPipeLineClient(String host, Integer port, String password,S
protected void resizeClient(String host,Integer port,String password){
pipelined.close();
- targetClient=createJedis(this.host,this.port,password);
+ targetClient=createJedis(this.host,this.port,user,password);
pipelined = targetClient.pipelined();
retry=new ConnectErrorRetry(taskId);
}
@@ -1075,9 +1077,11 @@ void selectDb(Long dbNum) {
* @param password
* @return
*/
- protected Jedis createJedis(String host,int port,String password){
+ protected Jedis createJedis(String host,int port,String user,String password){
Jedis jedis=new Jedis(host,port);
- if (!StringUtils.isEmpty(password)) {
+ if (!StringUtils.isEmpty(user)&&!StringUtils.isEmpty(password)) {
+ jedis.auth(user,password);
+ }else if (!StringUtils.isEmpty(password)) {
jedis.auth(password);
}
if(CMD.PONG.equalsIgnoreCase(jedis.ping())){
@@ -1254,7 +1258,7 @@ void compensator(EventEntity eventEntity) {
try {
Jedis client = null;
try {
- client = createJedis(this.host,this.port,password);
+ client = createJedis(this.host,this.port,user,password);
Object result = null;
String command = null;
String key = null;
@@ -1494,7 +1498,7 @@ void compensatorMap(EventEntity eventEntity) {
if (appendMap.containsKey(eventEntity.getStringKey())) {
appendMap.get(eventEntity.getStringKey()).getValue().append(Strings.byteToString(eventEntity.getValue()));
} else {
- client = createJedis(host,port,password);
+ client = createJedis(host,port,user,password);
String oldValue = client.get(eventEntity.getStringKey());
StringBuilder stringBuilder = new StringBuilder();
if (org.springframework.util.StringUtils.isEmpty(oldValue)) {
@@ -1514,7 +1518,7 @@ void compensatorMap(EventEntity eventEntity) {
} else if (eventEntity.getPipeLineCompensatorEnum().equals(PipeLineCompensatorEnum.INCR)) {
submitCommandNumNow();
- client = createJedis(host,port,password);
+ client = createJedis(host,port,user,password);
String oldValue = client.get(eventEntity.getStringKey());
Integer newValue = 0;
if (org.springframework.util.StringUtils.isEmpty(oldValue) || oldValue.equalsIgnoreCase("null")) {
@@ -1526,7 +1530,7 @@ void compensatorMap(EventEntity eventEntity) {
incrMap.get(eventEntity.getStringKey());
} else if (eventEntity.getPipeLineCompensatorEnum().equals(PipeLineCompensatorEnum.INCRBY)) {
submitCommandNumNow();
- client = createJedis(host,port,password);
+ client = createJedis(host,port,user,password);
String oldValue = client.get(eventEntity.getStringKey());
int newValue = 0;
String numData = Strings.byteToString(eventEntity.getValueList()[1]);
@@ -1541,7 +1545,7 @@ void compensatorMap(EventEntity eventEntity) {
} else if (eventEntity.getPipeLineCompensatorEnum().equals(PipeLineCompensatorEnum.INCRBYFLOAT)) {
submitCommandNumNow();
- client = createJedis(host,port,password);
+ client = createJedis(host,port,user,password);
String oldValue = client.get(eventEntity.getStringKey());
float newValue = 0;
if (org.springframework.util.StringUtils.isEmpty(oldValue) || oldValue.equalsIgnoreCase("null")) {
@@ -1552,7 +1556,7 @@ void compensatorMap(EventEntity eventEntity) {
incrDoubleMap.put(eventEntity.getStringKey(), newValue);
} else if (eventEntity.getPipeLineCompensatorEnum().equals(PipeLineCompensatorEnum.DECR)) {
submitCommandNumNow();
- client = createJedis(host,port,password);
+ client = createJedis(host,port,user,password);
String oldValue = client.get(eventEntity.getStringKey());
Integer newValue = 0;
if (org.springframework.util.StringUtils.isEmpty(oldValue)) {
@@ -1565,7 +1569,7 @@ void compensatorMap(EventEntity eventEntity) {
} else if (eventEntity.getPipeLineCompensatorEnum().equals(PipeLineCompensatorEnum.DECRBY)) {
submitCommandNumNow();
- client =createJedis(host,port,password);
+ client =createJedis(host,port,user,password);
String oldValue = client.get(eventEntity.getStringKey());
Integer newValue = 0;
if (org.springframework.util.StringUtils.isEmpty(oldValue)) {
diff --git a/syncer-transmission/src/main/java/syncer/transmission/client/impl/JedisPipeLineClient.java b/syncer-transmission/src/main/java/syncer/transmission/client/impl/JedisPipeLineClient.java
index efde815e..7c6bbad8 100644
--- a/syncer-transmission/src/main/java/syncer/transmission/client/impl/JedisPipeLineClient.java
+++ b/syncer-transmission/src/main/java/syncer/transmission/client/impl/JedisPipeLineClient.java
@@ -48,6 +48,7 @@
public class JedisPipeLineClient implements RedisClient {
protected String host;
protected Integer port;
+ protected String user;
protected String password;
protected Jedis targetClient;
protected Pipeline pipelined;
@@ -91,12 +92,13 @@ public class JedisPipeLineClient implements RedisClient {
public JedisPipeLineClient() {
}
- public JedisPipeLineClient(String host, Integer port, String password, int count, long errorCount, String taskId) {
+ public JedisPipeLineClient(String host, Integer port, String user, String password, int count, long errorCount, String taskId) {
this.host = host;
this.port = port;
this.taskId = taskId;
this.password=password;
+ this.user=user;
if (count != 0) {
this.count = count;
}
@@ -105,7 +107,7 @@ public JedisPipeLineClient(String host, Integer port, String password, int count
this.errorCount = errorCount;
}
- targetClient=createJedis(this.host,this.port,password);
+ targetClient=createJedis(this.host,this.port,user,password);
pipelined = targetClient.pipelined();
retry=new ConnectErrorRetry(taskId);
@@ -889,11 +891,14 @@ void selectDb(Long dbNum) {
* @param password
* @return
*/
- protected Jedis createJedis(String host,int port,String password){
+ protected Jedis createJedis(String host,int port,String user,String password){
Jedis jedis=new Jedis(host,port);
- if (!StringUtils.isEmpty(password)) {
+ if(!StringUtils.isEmpty(user)){
+ jedis.auth(user,password);
+ }else if (!StringUtils.isEmpty(password)) {
jedis.auth(password);
}
+
if(CMD.PONG.equalsIgnoreCase(jedis.ping())){
return jedis;
}
@@ -1054,7 +1059,7 @@ protected void compensator(EventEntity eventEntity) {
try {
Jedis client = null;
try {
- client = createJedis(this.host,this.port,password);
+ client = createJedis(this.host,this.port,user,password);
Object result = null;
String command = null;
String key = null;
@@ -1217,7 +1222,7 @@ void compensatorMap(EventEntity eventEntity) {
if (appendMap.containsKey(eventEntity.getStringKey())) {
appendMap.get(eventEntity.getStringKey()).getValue().append(Strings.byteToString(eventEntity.getValue()));
} else {
- client = createJedis(host,port,password);
+ client = createJedis(host,port,user,password);
String oldValue = client.get(eventEntity.getStringKey());
StringBuilder stringBuilder = new StringBuilder();
if (org.springframework.util.StringUtils.isEmpty(oldValue)) {
@@ -1237,7 +1242,7 @@ void compensatorMap(EventEntity eventEntity) {
} else if (eventEntity.getPipeLineCompensatorEnum().equals(PipeLineCompensatorEnum.INCR)) {
submitCommandNumNow();
- client = createJedis(host,port,password);
+ client = createJedis(host,port,user,password);
String oldValue = client.get(eventEntity.getStringKey());
Integer newValue = 0;
if (org.springframework.util.StringUtils.isEmpty(oldValue) || oldValue.equalsIgnoreCase("null")) {
@@ -1249,7 +1254,7 @@ void compensatorMap(EventEntity eventEntity) {
incrMap.get(eventEntity.getStringKey());
} else if (eventEntity.getPipeLineCompensatorEnum().equals(PipeLineCompensatorEnum.INCRBY)) {
submitCommandNumNow();
- client = createJedis(host,port,password);
+ client = createJedis(host,port,user,password);
String oldValue = client.get(eventEntity.getStringKey());
int newValue = 0;
String numData = Strings.byteToString(eventEntity.getValueList()[1]);
@@ -1264,7 +1269,7 @@ void compensatorMap(EventEntity eventEntity) {
} else if (eventEntity.getPipeLineCompensatorEnum().equals(PipeLineCompensatorEnum.INCRBYFLOAT)) {
submitCommandNumNow();
- client = createJedis(host,port,password);
+ client = createJedis(host,port,user,password);
String oldValue = client.get(eventEntity.getStringKey());
float newValue = 0;
if (org.springframework.util.StringUtils.isEmpty(oldValue) || oldValue.equalsIgnoreCase("null")) {
@@ -1275,7 +1280,7 @@ void compensatorMap(EventEntity eventEntity) {
incrDoubleMap.put(eventEntity.getStringKey(), newValue);
} else if (eventEntity.getPipeLineCompensatorEnum().equals(PipeLineCompensatorEnum.DECR)) {
submitCommandNumNow();
- client = createJedis(host,port,password);
+ client = createJedis(host,port,user,password);
String oldValue = client.get(eventEntity.getStringKey());
Integer newValue = 0;
if (org.springframework.util.StringUtils.isEmpty(oldValue)) {
@@ -1288,7 +1293,7 @@ void compensatorMap(EventEntity eventEntity) {
} else if (eventEntity.getPipeLineCompensatorEnum().equals(PipeLineCompensatorEnum.DECRBY)) {
submitCommandNumNow();
- client =createJedis(host,port,password);
+ client =createJedis(host,port,user,password);
String oldValue = client.get(eventEntity.getStringKey());
Integer newValue = 0;
if (org.springframework.util.StringUtils.isEmpty(oldValue)) {
diff --git a/syncer-transmission/src/main/java/syncer/transmission/client/impl/JedisPipeLineMultiRetryRunner.java b/syncer-transmission/src/main/java/syncer/transmission/client/impl/JedisPipeLineMultiRetryRunner.java
index bc16d2ef..62fca3a8 100644
--- a/syncer-transmission/src/main/java/syncer/transmission/client/impl/JedisPipeLineMultiRetryRunner.java
+++ b/syncer-transmission/src/main/java/syncer/transmission/client/impl/JedisPipeLineMultiRetryRunner.java
@@ -17,7 +17,7 @@ public JedisPipeLineMultiRetryRunner(JedisMultiExecPipeLineClient client) {
@Override
public void run() throws JedisConnectionException {
//runner
- client.targetClient=client.createJedis(client.host,client.port,client.password);
+ client.targetClient=client.createJedis(client.host,client.port,client.user,client.password);
client.pipelined = client.targetClient.pipelined();
List dataList=new ArrayList<>();
diff --git a/syncer-transmission/src/main/java/syncer/transmission/client/impl/JedisPipeLineRetryRunner.java b/syncer-transmission/src/main/java/syncer/transmission/client/impl/JedisPipeLineRetryRunner.java
index aa531fa3..9c05c308 100644
--- a/syncer-transmission/src/main/java/syncer/transmission/client/impl/JedisPipeLineRetryRunner.java
+++ b/syncer-transmission/src/main/java/syncer/transmission/client/impl/JedisPipeLineRetryRunner.java
@@ -17,7 +17,7 @@ public JedisPipeLineRetryRunner(JedisPipeLineClient client) {
@Override
public void run() throws JedisConnectionException {
//runner
- client.targetClient=client.createJedis(client.host,client.port,client.password);
+ client.targetClient=client.createJedis(client.host,client.port,client.user,client.password);
client.pipelined = client.targetClient.pipelined();
List dataList=new ArrayList<>();
diff --git a/syncer-transmission/src/main/java/syncer/transmission/client/impl/JedisPipelineSubmitCommandRetryRunner.java b/syncer-transmission/src/main/java/syncer/transmission/client/impl/JedisPipelineSubmitCommandRetryRunner.java
index 92f16b83..5c56ea62 100644
--- a/syncer-transmission/src/main/java/syncer/transmission/client/impl/JedisPipelineSubmitCommandRetryRunner.java
+++ b/syncer-transmission/src/main/java/syncer/transmission/client/impl/JedisPipelineSubmitCommandRetryRunner.java
@@ -24,7 +24,7 @@ public JedisPipelineSubmitCommandRetryRunner(JedisPipeLineClient client) {
@Override
public void run() throws JedisConnectionException {
- client.targetClient=client.createJedis(client.host,client.port,client.password);
+ client.targetClient=client.createJedis(client.host,client.port,client.user,client.password);
client.pipelined = client.targetClient.pipelined();
List dataList=new ArrayList<>();
diff --git a/syncer-transmission/src/main/java/syncer/transmission/client/impl/JedisPipelineSubmitMultiCommandRetryRunner.java b/syncer-transmission/src/main/java/syncer/transmission/client/impl/JedisPipelineSubmitMultiCommandRetryRunner.java
index 2215991a..682e00f7 100644
--- a/syncer-transmission/src/main/java/syncer/transmission/client/impl/JedisPipelineSubmitMultiCommandRetryRunner.java
+++ b/syncer-transmission/src/main/java/syncer/transmission/client/impl/JedisPipelineSubmitMultiCommandRetryRunner.java
@@ -24,7 +24,7 @@ public JedisPipelineSubmitMultiCommandRetryRunner(JedisMultiExecPipeLineClient c
@Override
public void run() throws JedisConnectionException {
- client.targetClient=client.createJedis(client.host,client.port,client.password);
+ client.targetClient=client.createJedis(client.host,client.port,client.user,client.password);
client.pipelined = client.targetClient.pipelined();
List dataList=new ArrayList<>();
diff --git a/syncer-transmission/src/main/java/syncer/transmission/client/impl/sentinel/FastRedisSentinelClient.java b/syncer-transmission/src/main/java/syncer/transmission/client/impl/sentinel/FastRedisSentinelClient.java
index 33e8c551..8af70fa5 100644
--- a/syncer-transmission/src/main/java/syncer/transmission/client/impl/sentinel/FastRedisSentinelClient.java
+++ b/syncer-transmission/src/main/java/syncer/transmission/client/impl/sentinel/FastRedisSentinelClient.java
@@ -61,7 +61,7 @@ protected void initClient(HostAndPort master) {
currentHostMaster = master;
log.info("Created Sentinel to master at " + master);
if(Objects.isNull(pipelined)){
- targetClient=createJedis(master.getHost(), master.getPort(),password);
+ targetClient=createJedis(master.getHost(), master.getPort(),user,password);
pipelined = targetClient.pipelined();
log.warn("[{}] connected to [{}:{}]",taskId,master.getHost(),master.getPort());
currentMaster=master.getHost()+":"+master.getPort();
@@ -74,7 +74,7 @@ protected void initClient(HostAndPort master) {
submitCommandNumNow();
pipelined.close();
targetClient.close();
- targetClient=createJedis(master.getHost(), master.getPort(),password);
+ targetClient=createJedis(master.getHost(), master.getPort(),user,password);
pipelined = targetClient.pipelined();
host=master.getHost();
port=master.getPort();
diff --git a/syncer-transmission/src/main/java/syncer/transmission/client/sentinel/RedisSentinel.java b/syncer-transmission/src/main/java/syncer/transmission/client/sentinel/RedisSentinel.java
index d61872ee..654b6216 100644
--- a/syncer-transmission/src/main/java/syncer/transmission/client/sentinel/RedisSentinel.java
+++ b/syncer-transmission/src/main/java/syncer/transmission/client/sentinel/RedisSentinel.java
@@ -36,6 +36,7 @@ public class RedisSentinel {
protected final String channel = "+switch-master";
protected final ScheduledExecutorService schedule = newSingleThreadScheduledExecutor();
protected final AtomicInteger hostsPulseSize = new AtomicInteger(0);
+ protected String user=null;
protected String password=null;
protected String sentinelPassword=null;
private Sentinel sentinel;
@@ -100,9 +101,9 @@ void doSwitchListener(HostAndPort host) {
}
if(BreakpointContinuationType.v1.equals(breakpointContinuationType)){
- client = new JedisPipeLineClient(host.getHost(),host.getPort(),password,count,errorCount,taskId);
+ client = new JedisPipeLineClient(host.getHost(),host.getPort(),user,password,count,errorCount,taskId);
}else {
- client = new JedisMultiExecPipeLineClient(host.getHost(),host.getPort(),password,sourceHost,sourcePort,count,errorCount,taskId);
+ client = new JedisMultiExecPipeLineClient(host.getHost(),host.getPort(),user,password,sourceHost,sourcePort,count,errorCount,taskId);
}
}
diff --git a/syncer-transmission/src/main/java/syncer/transmission/model/TaskModel.java b/syncer-transmission/src/main/java/syncer/transmission/model/TaskModel.java
index d684bff0..13945f23 100644
--- a/syncer-transmission/src/main/java/syncer/transmission/model/TaskModel.java
+++ b/syncer-transmission/src/main/java/syncer/transmission/model/TaskModel.java
@@ -426,15 +426,15 @@ public Map getDataAnalysis(){
}
public String getSourceUri(){
- return getUri(getSourceRedisAddress(),getSourcePassword(),sourceRedisMasterName,sourceSentinelAuthPassword);
+ return getUri(getSourceRedisAddress(),getSourceUserName(),getSourcePassword(),sourceRedisMasterName,sourceSentinelAuthPassword);
}
public Set getTargetUri(){
- return getUrlList(getTargetRedisAddress(),getTargetPassword(),targetRedisMasterName,targetSentinelAuthPassword);
+ return getUrlList(getTargetRedisAddress(),getTargetUserName(),getTargetPassword(),targetRedisMasterName,targetSentinelAuthPassword);
}
- private Set getUrlList(String sourceUrls, String password,String redisMasterName,String sentinelAuthPassword) {
+ private Set getUrlList(String sourceUrls, String authUser,String password,String redisMasterName,String sentinelAuthPassword) {
Set urlList = new HashSet<>();
if (StringUtils.isEmpty(sourceUrls)){
return new HashSet<>();
@@ -442,7 +442,7 @@ private Set getUrlList(String sourceUrls, String password,String redis
String[] sourceUrlsList = sourceUrls.split(";");
//循环遍历所有的url
for (String url : sourceUrlsList) {
- String uri=getUri(url,password,redisMasterName,sentinelAuthPassword);
+ String uri=getUri(url,authUser,password,redisMasterName,sentinelAuthPassword);
if(!StringUtils.isEmpty(uri)) {
urlList.add(uri);
}
@@ -450,7 +450,7 @@ private Set getUrlList(String sourceUrls, String password,String redis
return urlList;
}
- public String getUri(String address,String password,String masterName,String sentinelPassword){
+ public String getUri(String address,String authUser,String password,String masterName,String sentinelPassword){
StringBuilder stringHead = new StringBuilder("redis://");
int index=0;
//如果截取出空字符串直接跳过
@@ -463,6 +463,18 @@ public String getUri(String address,String password,String masterName,String se
stringHead.append(password);
}
+ if(authUser!=null&&authUser.length()>0){
+ if(index>0){
+ stringHead.append("&authUser=");
+ stringHead.append(authUser);
+ index++;
+ }else{
+ stringHead.append("?authUser=");
+ stringHead.append(authUser);
+ index++;
+ }
+ }
+
if(masterName!=null&&masterName.length()>0){
if(index>0){
stringHead.append("&masterRedisName=");
@@ -500,7 +512,7 @@ public String[] getSourceHostUris(){
try {
String[]host=sourceHost.split(";");
for (int i = 0; i < host.length; i++) {
- host[i]=getUri(host[i],sourcePassword,sourceRedisMasterName,sourceSentinelAuthPassword);
+ host[i]=getUri(host[i],sourceUserName,sourcePassword,sourceRedisMasterName,sourceSentinelAuthPassword);
}
return host;
}catch (Exception e){
@@ -517,7 +529,7 @@ public String[] getSourceHostUris(){
public String[] getTargetHostUris(){
String[]host=targetHost.split(";");
for (int i = 0; i < host.length; i++) {
- host[i]=getUri(host[i],targetPassword,targetRedisMasterName,targetSentinelAuthPassword);
+ host[i]=getUri(host[i],targetUserName,targetPassword,targetRedisMasterName,targetSentinelAuthPassword);
}
return host;
}
diff --git a/syncer-transmission/src/main/java/syncer/transmission/queue/SendCommandWithOutQueue.java b/syncer-transmission/src/main/java/syncer/transmission/queue/SendCommandWithOutQueue.java
index 30e7f543..68a11618 100644
--- a/syncer-transmission/src/main/java/syncer/transmission/queue/SendCommandWithOutQueue.java
+++ b/syncer-transmission/src/main/java/syncer/transmission/queue/SendCommandWithOutQueue.java
@@ -11,6 +11,7 @@
package syncer.transmission.queue;
+import com.alibaba.fastjson.JSON;
import lombok.Builder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;
@@ -81,7 +82,7 @@ public void run(KeyValueEventEntity keyValueEventEntity){
Event event=keyValueEventEntity.getEvent();
String keyName=null;
String command=null;
- String value=null;
+ String value="";
int dataType=12;
long ttl=-1L;
if(event instanceof DefaultCommand){
@@ -89,12 +90,18 @@ public void run(KeyValueEventEntity keyValueEventEntity){
command= Strings.byteToString(defaultCommand.getCommand());
if(defaultCommand.getArgs().length>0){
keyName= Strings.byteToString(((DefaultCommand) event).getCommand())+Strings.byteToString(((DefaultCommand) event).getArgs()[0]);
+ String []values=Strings.byteToString(((DefaultCommand) event).getArgs());
+ for (int i=0 ;i= 0) {
- System.out.println("String :" + data);
+ log.info("ERROR String :{}", data);
return false;
}
diff --git a/syncer-transmission/src/main/java/syncer/transmission/util/redis/KeyCountUtils.java b/syncer-transmission/src/main/java/syncer/transmission/util/redis/KeyCountUtils.java
index 632a8f80..8fa033cc 100644
--- a/syncer-transmission/src/main/java/syncer/transmission/util/redis/KeyCountUtils.java
+++ b/syncer-transmission/src/main/java/syncer/transmission/util/redis/KeyCountUtils.java
@@ -41,10 +41,13 @@ public synchronized static void updateKeyCount(String taskId, RedisURI suri){
target = new Jedis(suri.getHost(), suri.getPort());
//获取password
- if (!StringUtils.isEmpty(tconfig.getAuthPassword())) {
+ if(!StringUtils.isEmpty(tconfig.getAuthUser())&&!StringUtils.isEmpty(tconfig.getAuthPassword())){
+ Object auth = target.auth(tconfig.getAuthUser(),tconfig.getAuthPassword());
+ }else if (!StringUtils.isEmpty(tconfig.getAuthPassword())) {
Object auth = target.auth( tconfig.getAuthPassword());
}
+
//重试三次
int i = 3;
while (i > 0) {
diff --git a/syncer-transmission/src/main/java/syncer/transmission/util/redis/RedisReplIdCheck.java b/syncer-transmission/src/main/java/syncer/transmission/util/redis/RedisReplIdCheck.java
index 756a2330..5480386b 100644
--- a/syncer-transmission/src/main/java/syncer/transmission/util/redis/RedisReplIdCheck.java
+++ b/syncer-transmission/src/main/java/syncer/transmission/util/redis/RedisReplIdCheck.java
@@ -59,7 +59,7 @@ public String[] selectSyncerBuffer(String targetUri,String type) throws URISynt
ReplicConfig targetConfig = ReplicConfig.valueOf(targetUriplus);
if(!StringUtils.isEmpty(targetConfig.getAuthUser())&&!StringUtils.isEmpty(targetConfig.getAuthPassword())){
- Object targetAuth = target.auth(targetConfig.getAuthUser()+" "+targetConfig.getAuthPassword());
+ Object targetAuth = target.auth(targetConfig.getAuthUser(),targetConfig.getAuthPassword());
}else if (!StringUtils.isEmpty(targetConfig.getAuthPassword())) {
Object targetAuth = target.auth(targetConfig.getAuthPassword());
}
diff --git a/syncer-transmission/src/main/java/syncer/transmission/util/redis/RedisUrlCheck.java b/syncer-transmission/src/main/java/syncer/transmission/util/redis/RedisUrlCheck.java
index 827849db..2875bc41 100644
--- a/syncer-transmission/src/main/java/syncer/transmission/util/redis/RedisUrlCheck.java
+++ b/syncer-transmission/src/main/java/syncer/transmission/util/redis/RedisUrlCheck.java
@@ -130,7 +130,9 @@ public boolean checkClientConnectState(String url, String clientName,boolean sen
String auth = target.auth(tconfig.getSentinelAuthPassword());
}
}else {
- if (!StringUtils.isEmpty(tconfig.getAuthPassword())) {
+ if(!StringUtils.isEmpty(tconfig.getAuthUser())){
+ String auth = target.auth(tconfig.getAuthUser(),tconfig.getAuthPassword());
+ }else if (!StringUtils.isEmpty(tconfig.getAuthPassword())) {
String auth = target.auth(tconfig.getAuthPassword());
}
}
diff --git a/syncer-transmission/src/main/java/syncer/transmission/util/redis/RedisVersionUtil.java b/syncer-transmission/src/main/java/syncer/transmission/util/redis/RedisVersionUtil.java
index 398f47b8..8819434c 100644
--- a/syncer-transmission/src/main/java/syncer/transmission/util/redis/RedisVersionUtil.java
+++ b/syncer-transmission/src/main/java/syncer/transmission/util/redis/RedisVersionUtil.java
@@ -82,7 +82,9 @@ public String selectSyncerVersion(String targetUri) throws URISyntaxException, T
ReplicConfig targetConfig = ReplicConfig.valueOf(targetUriplus);
//获取password
- if (!StringUtils.isEmpty(targetConfig.getAuthPassword())) {
+ if(!StringUtils.isEmpty(targetConfig.getAuthUser())){
+ Object targetAuth = target.auth(targetConfig.getAuthUser(),targetConfig.getAuthPassword());
+ }else if (!StringUtils.isEmpty(targetConfig.getAuthPassword())) {
Object targetAuth = target.auth(targetConfig.getAuthPassword());
}
String info = target.info();
diff --git a/syncer-webapp/src/main/java/syncer/webapp/util/DtoToTaskModelUtils.java b/syncer-webapp/src/main/java/syncer/webapp/util/DtoToTaskModelUtils.java
index 8a0435c2..05e2fe48 100644
--- a/syncer-webapp/src/main/java/syncer/webapp/util/DtoToTaskModelUtils.java
+++ b/syncer-webapp/src/main/java/syncer/webapp/util/DtoToTaskModelUtils.java
@@ -3,6 +3,7 @@
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import org.apache.logging.log4j.util.Strings;
import org.springframework.util.StringUtils;
import syncer.common.constant.ResultCodeAndMessage;
import syncer.common.exception.TaskMsgException;
@@ -131,9 +132,9 @@ public class DtoToTaskModelUtils {
.fileAddress("")
//Redis 6.0 ACL相关
.sourceAcl(param.isSourceAcl())
-// .sourceUserName(param.getSourceUserName())
+ .sourceUserName(param.getSourceUserName())
.targetAcl(param.isTargetAcl())
-// .targetUserName(param.getTargetUserName())
+ .targetUserName(param.getTargetUserName())
.syncType(SyncTypeUtils.getSyncType(syncerType).getCode())
.errorCount(param.getErrorCount())
.timeDeviation(param.getTimeDeviation())
@@ -153,6 +154,22 @@ public class DtoToTaskModelUtils {
}catch (Exception e){
throw new TaskMsgException(CodeUtils.codeMessages("100","请确保targetRedisVersion正确(请保留小数点后一位)"));
}
+
+
+ if(Strings.isEmpty(param.getTargetUserName())){
+ if(param.getTargetPassword().contains(" ")){
+ taskModel.setTargetUserName(param.getTargetPassword().split(" ")[0]);
+ taskModel.setTargetPassword(param.getTargetPassword().split(" ")[1]);
+ }
+ }
+
+ if(Strings.isEmpty(param.getSourceUserName())){
+ if(param.getSourcePassword().contains(" ")){
+ taskModel.setSourceUserName(param.getSourcePassword().split(" ")[0]);
+ taskModel.setSourcePassword(param.getSourcePassword().split(" ")[1]);
+ }
+ }
+
if(param.getDbMapper()!=null){
taskModel.setDbMapper(JSON.toJSONString(param.getDbMapper()));
}else {
@@ -382,6 +399,14 @@ public synchronized static List getTaskModelList(CreateDumpUpParam pa
.targetUserName(param.getTargetUserName())
.errorCount(param.getErrorCount())
.build();
+
+ if(Strings.isEmpty(param.getSourceUserName())){
+ if(param.getSourcePassword().contains(" ")){
+ taskModel.setSourceUserName(param.getSourcePassword().split(" ")[0]);
+ taskModel.setSourcePassword(param.getSourcePassword().split(" ")[1]);
+ }
+ }
+
if (param.getDbMapper() != null) {
taskModel.setDbMapper(JSON.toJSONString(param.getDbMapper()));
} else {