Skip to content

Commit

Permalink
fix the issue of Redis using ACL connection failure
Browse files Browse the repository at this point in the history
  • Loading branch information
pingxingshikong committed Jul 12, 2023
1 parent 24bbb7b commit c992790
Show file tree
Hide file tree
Showing 23 changed files with 139 additions and 57 deletions.
1 change: 0 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@
<dependency>
<groupId>org.xerial</groupId>
<artifactId>sqlite-jdbc</artifactId>
<version>3.21.0.1</version>
<version>3.39.3.0</version>
</dependency>
<dependency>
Expand Down
11 changes: 10 additions & 1 deletion syncer-jedis/src/main/java/syncer/jedis/BinaryClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,16 @@ public void brpop(final int timeout, final byte[]... keys) {

public void auth(final String password) {
setPassword(password);
sendCommand(AUTH, password);
if(password.contains(" ")){
auth(password.split(" ")[0],password.split(" ")[1]);
}else{
sendCommand(AUTH, password);
}
}

public void auth(final String user,final String password) {
setPassword(user+" "+password);
sendCommand(AUTH, user,password);
}

public void subscribe(final byte[]... channels) {
Expand Down
8 changes: 8 additions & 0 deletions syncer-jedis/src/main/java/syncer/jedis/BinaryJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -2239,6 +2239,14 @@ public String auth(final String password) {
return client.getStatusCodeReply();
}

@Override
public String auth(final String user,final String password) {
checkIsInMultiOrPipeline();
client.auth(user,password);
return client.getStatusCodeReply();
}


public Pipeline pipelined() {
pipeline = new Pipeline();
pipeline.setClient(client);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public interface BasicCommands {
*/
String auth(String password);

String auth(String user,String password);
/**
* The SAVE commands performs a synchronous save of the dataset producing a point in time snapshot of all the data inside the Redis instance, in the form of an RDB file.
You almost never want to call SAVE in production environments where it will block all the other clients. Instead usually BGSAVE is used. However in case of issues preventing Redis to create the background saving child (for instance errors in the fork(2) system call), the SAVE command can be a good last resort to perform the dump of the latest dataset.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@
*/
@Slf4j
public class RedisClientFactory {
public static RedisClient createRedisClient(RedisType redisType, String host, Integer port, String password, String sourceHost, Integer sourcePort, int count, long errorCount, String taskId, String jimUrl, String cfsUrl,String masterName,String address) {
public static RedisClient createRedisClient(RedisType redisType, String host, Integer port, String user,String password, String sourceHost, Integer sourcePort, int count, long errorCount, String taskId, String jimUrl, String cfsUrl,String masterName,String address) {
RedisClient redisClient = null;
switch (redisType) {
case SINGLE:
if(BreakPointConfig.getBreakpointContinuationType().equals(BreakpointContinuationType.v1)){
redisClient = new JedisPipeLineClient(host,port,password,count,errorCount,taskId);
redisClient = new JedisPipeLineClient(host,port,user,password,count,errorCount,taskId);
}else{
redisClient = new JedisMultiExecPipeLineClient(host,port,password,sourceHost,sourcePort,count,errorCount,taskId);
redisClient = new JedisMultiExecPipeLineClient(host,port,user,password,sourceHost,sourcePort,count,errorCount,taskId);
}
log.info("host[{}],port[{}] , {} client init success",host,port,BreakPointConfig.getBreakpointContinuationType());
break;
Expand All @@ -56,14 +56,14 @@ public static RedisClient createRedisClient(RedisType redisType, String host, In
return redisClient;
}

public static RedisClient createRedisClient(RedisType redisType, String host, Integer port, String password, String sourceHost, Integer sourcePort, int count, long errorCount, String taskId, String jimUrl, String cfsUrl) {
public static RedisClient createRedisClient(RedisType redisType, String host, Integer port, String user,String password, String sourceHost, Integer sourcePort, int count, long errorCount, String taskId, String jimUrl, String cfsUrl) {
RedisClient redisClient = null;
switch (redisType) {
case SINGLE:
if(BreakPointConfig.getBreakpointContinuationType().equals(BreakpointContinuationType.v1)){
redisClient = new JedisPipeLineClient(host,port,password,count,errorCount,taskId);
redisClient = new JedisPipeLineClient(host,port,user,password,count,errorCount,taskId);
}else{
redisClient = new JedisMultiExecPipeLineClient(host,port,password,sourceHost,sourcePort,count,errorCount,taskId);
redisClient = new JedisMultiExecPipeLineClient(host,port,user,password,sourceHost,sourcePort,count,errorCount,taskId);
}
log.info("host[{}],port[{}] , {} client init success",host,port,BreakPointConfig.getBreakpointContinuationType());
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public class JedisMultiExecPipeLineClient implements RedisClient {
protected String sourceHost;
protected Integer sourcePort;
protected String password;
protected String user;
protected Jedis targetClient;
protected Pipeline pipelined;
private Integer currentDbNum = 0;
Expand Down Expand Up @@ -131,11 +132,12 @@ public class JedisMultiExecPipeLineClient implements RedisClient {
private AtomicBoolean hasMulti=new AtomicBoolean(false);


public JedisMultiExecPipeLineClient(String host, Integer port, String password,String sourceHost, Integer sourcePort,int count, long errorCount, String taskId) {
public JedisMultiExecPipeLineClient(String host, Integer port, String user, String password,String sourceHost, Integer sourcePort,int count, long errorCount, String taskId) {
this.host = host;
this.port = port;
this.taskId = taskId;
this.password=password;
this.user=user;
this.sourceHost=sourceHost;
this.sourcePort=sourcePort;
if (count != 0) {
Expand All @@ -146,7 +148,7 @@ public JedisMultiExecPipeLineClient(String host, Integer port, String password,S
this.errorCount = errorCount;
}

targetClient=createJedis(this.host,this.port,password);
targetClient=createJedis(this.host,this.port,user,password);

pipelined = targetClient.pipelined();
retry=new ConnectErrorRetry(taskId);
Expand All @@ -158,7 +160,7 @@ public JedisMultiExecPipeLineClient(String host, Integer port, String password,S

protected void resizeClient(String host,Integer port,String password){
pipelined.close();
targetClient=createJedis(this.host,this.port,password);
targetClient=createJedis(this.host,this.port,user,password);
pipelined = targetClient.pipelined();
retry=new ConnectErrorRetry(taskId);
}
Expand Down Expand Up @@ -1075,9 +1077,11 @@ void selectDb(Long dbNum) {
* @param password
* @return
*/
protected Jedis createJedis(String host,int port,String password){
protected Jedis createJedis(String host,int port,String user,String password){
Jedis jedis=new Jedis(host,port);
if (!StringUtils.isEmpty(password)) {
if (!StringUtils.isEmpty(user)&&!StringUtils.isEmpty(password)) {
jedis.auth(user,password);
}else if (!StringUtils.isEmpty(password)) {
jedis.auth(password);
}
if(CMD.PONG.equalsIgnoreCase(jedis.ping())){
Expand Down Expand Up @@ -1254,7 +1258,7 @@ void compensator(EventEntity eventEntity) {
try {
Jedis client = null;
try {
client = createJedis(this.host,this.port,password);
client = createJedis(this.host,this.port,user,password);
Object result = null;
String command = null;
String key = null;
Expand Down Expand Up @@ -1494,7 +1498,7 @@ void compensatorMap(EventEntity eventEntity) {
if (appendMap.containsKey(eventEntity.getStringKey())) {
appendMap.get(eventEntity.getStringKey()).getValue().append(Strings.byteToString(eventEntity.getValue()));
} else {
client = createJedis(host,port,password);
client = createJedis(host,port,user,password);
String oldValue = client.get(eventEntity.getStringKey());
StringBuilder stringBuilder = new StringBuilder();
if (org.springframework.util.StringUtils.isEmpty(oldValue)) {
Expand All @@ -1514,7 +1518,7 @@ void compensatorMap(EventEntity eventEntity) {

} else if (eventEntity.getPipeLineCompensatorEnum().equals(PipeLineCompensatorEnum.INCR)) {
submitCommandNumNow();
client = createJedis(host,port,password);
client = createJedis(host,port,user,password);
String oldValue = client.get(eventEntity.getStringKey());
Integer newValue = 0;
if (org.springframework.util.StringUtils.isEmpty(oldValue) || oldValue.equalsIgnoreCase("null")) {
Expand All @@ -1526,7 +1530,7 @@ void compensatorMap(EventEntity eventEntity) {
incrMap.get(eventEntity.getStringKey());
} else if (eventEntity.getPipeLineCompensatorEnum().equals(PipeLineCompensatorEnum.INCRBY)) {
submitCommandNumNow();
client = createJedis(host,port,password);
client = createJedis(host,port,user,password);
String oldValue = client.get(eventEntity.getStringKey());
int newValue = 0;
String numData = Strings.byteToString(eventEntity.getValueList()[1]);
Expand All @@ -1541,7 +1545,7 @@ void compensatorMap(EventEntity eventEntity) {

} else if (eventEntity.getPipeLineCompensatorEnum().equals(PipeLineCompensatorEnum.INCRBYFLOAT)) {
submitCommandNumNow();
client = createJedis(host,port,password);
client = createJedis(host,port,user,password);
String oldValue = client.get(eventEntity.getStringKey());
float newValue = 0;
if (org.springframework.util.StringUtils.isEmpty(oldValue) || oldValue.equalsIgnoreCase("null")) {
Expand All @@ -1552,7 +1556,7 @@ void compensatorMap(EventEntity eventEntity) {
incrDoubleMap.put(eventEntity.getStringKey(), newValue);
} else if (eventEntity.getPipeLineCompensatorEnum().equals(PipeLineCompensatorEnum.DECR)) {
submitCommandNumNow();
client = createJedis(host,port,password);
client = createJedis(host,port,user,password);
String oldValue = client.get(eventEntity.getStringKey());
Integer newValue = 0;
if (org.springframework.util.StringUtils.isEmpty(oldValue)) {
Expand All @@ -1565,7 +1569,7 @@ void compensatorMap(EventEntity eventEntity) {

} else if (eventEntity.getPipeLineCompensatorEnum().equals(PipeLineCompensatorEnum.DECRBY)) {
submitCommandNumNow();
client =createJedis(host,port,password);
client =createJedis(host,port,user,password);
String oldValue = client.get(eventEntity.getStringKey());
Integer newValue = 0;
if (org.springframework.util.StringUtils.isEmpty(oldValue)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
public class JedisPipeLineClient implements RedisClient {
protected String host;
protected Integer port;
protected String user;
protected String password;
protected Jedis targetClient;
protected Pipeline pipelined;
Expand Down Expand Up @@ -91,12 +92,13 @@ public class JedisPipeLineClient implements RedisClient {
public JedisPipeLineClient() {
}

public JedisPipeLineClient(String host, Integer port, String password, int count, long errorCount, String taskId) {
public JedisPipeLineClient(String host, Integer port, String user, String password, int count, long errorCount, String taskId) {

this.host = host;
this.port = port;
this.taskId = taskId;
this.password=password;
this.user=user;
if (count != 0) {
this.count = count;
}
Expand All @@ -105,7 +107,7 @@ public JedisPipeLineClient(String host, Integer port, String password, int count
this.errorCount = errorCount;
}

targetClient=createJedis(this.host,this.port,password);
targetClient=createJedis(this.host,this.port,user,password);

pipelined = targetClient.pipelined();
retry=new ConnectErrorRetry(taskId);
Expand Down Expand Up @@ -889,11 +891,14 @@ void selectDb(Long dbNum) {
* @param password
* @return
*/
protected Jedis createJedis(String host,int port,String password){
protected Jedis createJedis(String host,int port,String user,String password){
Jedis jedis=new Jedis(host,port);
if (!StringUtils.isEmpty(password)) {
if(!StringUtils.isEmpty(user)){
jedis.auth(user,password);
}else if (!StringUtils.isEmpty(password)) {
jedis.auth(password);
}

if(CMD.PONG.equalsIgnoreCase(jedis.ping())){
return jedis;
}
Expand Down Expand Up @@ -1054,7 +1059,7 @@ protected void compensator(EventEntity eventEntity) {
try {
Jedis client = null;
try {
client = createJedis(this.host,this.port,password);
client = createJedis(this.host,this.port,user,password);
Object result = null;
String command = null;
String key = null;
Expand Down Expand Up @@ -1217,7 +1222,7 @@ void compensatorMap(EventEntity eventEntity) {
if (appendMap.containsKey(eventEntity.getStringKey())) {
appendMap.get(eventEntity.getStringKey()).getValue().append(Strings.byteToString(eventEntity.getValue()));
} else {
client = createJedis(host,port,password);
client = createJedis(host,port,user,password);
String oldValue = client.get(eventEntity.getStringKey());
StringBuilder stringBuilder = new StringBuilder();
if (org.springframework.util.StringUtils.isEmpty(oldValue)) {
Expand All @@ -1237,7 +1242,7 @@ void compensatorMap(EventEntity eventEntity) {

} else if (eventEntity.getPipeLineCompensatorEnum().equals(PipeLineCompensatorEnum.INCR)) {
submitCommandNumNow();
client = createJedis(host,port,password);
client = createJedis(host,port,user,password);
String oldValue = client.get(eventEntity.getStringKey());
Integer newValue = 0;
if (org.springframework.util.StringUtils.isEmpty(oldValue) || oldValue.equalsIgnoreCase("null")) {
Expand All @@ -1249,7 +1254,7 @@ void compensatorMap(EventEntity eventEntity) {
incrMap.get(eventEntity.getStringKey());
} else if (eventEntity.getPipeLineCompensatorEnum().equals(PipeLineCompensatorEnum.INCRBY)) {
submitCommandNumNow();
client = createJedis(host,port,password);
client = createJedis(host,port,user,password);
String oldValue = client.get(eventEntity.getStringKey());
int newValue = 0;
String numData = Strings.byteToString(eventEntity.getValueList()[1]);
Expand All @@ -1264,7 +1269,7 @@ void compensatorMap(EventEntity eventEntity) {

} else if (eventEntity.getPipeLineCompensatorEnum().equals(PipeLineCompensatorEnum.INCRBYFLOAT)) {
submitCommandNumNow();
client = createJedis(host,port,password);
client = createJedis(host,port,user,password);
String oldValue = client.get(eventEntity.getStringKey());
float newValue = 0;
if (org.springframework.util.StringUtils.isEmpty(oldValue) || oldValue.equalsIgnoreCase("null")) {
Expand All @@ -1275,7 +1280,7 @@ void compensatorMap(EventEntity eventEntity) {
incrDoubleMap.put(eventEntity.getStringKey(), newValue);
} else if (eventEntity.getPipeLineCompensatorEnum().equals(PipeLineCompensatorEnum.DECR)) {
submitCommandNumNow();
client = createJedis(host,port,password);
client = createJedis(host,port,user,password);
String oldValue = client.get(eventEntity.getStringKey());
Integer newValue = 0;
if (org.springframework.util.StringUtils.isEmpty(oldValue)) {
Expand All @@ -1288,7 +1293,7 @@ void compensatorMap(EventEntity eventEntity) {

} else if (eventEntity.getPipeLineCompensatorEnum().equals(PipeLineCompensatorEnum.DECRBY)) {
submitCommandNumNow();
client =createJedis(host,port,password);
client =createJedis(host,port,user,password);
String oldValue = client.get(eventEntity.getStringKey());
Integer newValue = 0;
if (org.springframework.util.StringUtils.isEmpty(oldValue)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public JedisPipeLineMultiRetryRunner(JedisMultiExecPipeLineClient client) {
@Override
public void run() throws JedisConnectionException {
//runner
client.targetClient=client.createJedis(client.host,client.port,client.password);
client.targetClient=client.createJedis(client.host,client.port,client.user,client.password);
client.pipelined = client.targetClient.pipelined();

List<EventEntity> dataList=new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public JedisPipeLineRetryRunner(JedisPipeLineClient client) {
@Override
public void run() throws JedisConnectionException {
//runner
client.targetClient=client.createJedis(client.host,client.port,client.password);
client.targetClient=client.createJedis(client.host,client.port,client.user,client.password);
client.pipelined = client.targetClient.pipelined();

List<EventEntity> dataList=new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public JedisPipelineSubmitCommandRetryRunner(JedisPipeLineClient client) {
@Override
public void run() throws JedisConnectionException {

client.targetClient=client.createJedis(client.host,client.port,client.password);
client.targetClient=client.createJedis(client.host,client.port,client.user,client.password);
client.pipelined = client.targetClient.pipelined();

List<EventEntity> dataList=new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public JedisPipelineSubmitMultiCommandRetryRunner(JedisMultiExecPipeLineClient c
@Override
public void run() throws JedisConnectionException {

client.targetClient=client.createJedis(client.host,client.port,client.password);
client.targetClient=client.createJedis(client.host,client.port,client.user,client.password);
client.pipelined = client.targetClient.pipelined();

List<EventEntity> dataList=new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ protected void initClient(HostAndPort master) {
currentHostMaster = master;
log.info("Created Sentinel to master at " + master);
if(Objects.isNull(pipelined)){
targetClient=createJedis(master.getHost(), master.getPort(),password);
targetClient=createJedis(master.getHost(), master.getPort(),user,password);
pipelined = targetClient.pipelined();
log.warn("[{}] connected to [{}:{}]",taskId,master.getHost(),master.getPort());
currentMaster=master.getHost()+":"+master.getPort();
Expand All @@ -74,7 +74,7 @@ protected void initClient(HostAndPort master) {
submitCommandNumNow();
pipelined.close();
targetClient.close();
targetClient=createJedis(master.getHost(), master.getPort(),password);
targetClient=createJedis(master.getHost(), master.getPort(),user,password);
pipelined = targetClient.pipelined();
host=master.getHost();
port=master.getPort();
Expand Down
Loading

0 comments on commit c992790

Please sign in to comment.