From 24bbb7bce4943bb46423d7ba00e14672a52bf95f Mon Sep 17 00:00:00 2001 From: Eq Zhan <35362437+pingxingshikong@users.noreply.github.com> Date: Wed, 31 May 2023 15:40:16 +0800 Subject: [PATCH] fixed:fix the issue of Redis 'SET key value nx ex 10' format parsing failure --- pom.xml | 7 ++- .../client/impl/JedisPipeLineClient.java | 55 +++++++++++++++---- .../queue/SendCommandWithOutQueue.java | 22 +++++++- ...mmandProcessingAofCommandSendStrategy.java | 1 + 4 files changed, 73 insertions(+), 12 deletions(-) diff --git a/pom.xml b/pom.xml index e985f1b4..6a13214a 100644 --- a/pom.xml +++ b/pom.xml @@ -111,12 +111,17 @@ + + + + + org.xerial sqlite-jdbc 3.21.0.1 + 3.39.3.0 - org.mybatis.spring.boot mybatis-spring-boot-starter diff --git a/syncer-transmission/src/main/java/syncer/transmission/client/impl/JedisPipeLineClient.java b/syncer-transmission/src/main/java/syncer/transmission/client/impl/JedisPipeLineClient.java index 34d63b30..efde815e 100644 --- a/syncer-transmission/src/main/java/syncer/transmission/client/impl/JedisPipeLineClient.java +++ b/syncer-transmission/src/main/java/syncer/transmission/client/impl/JedisPipeLineClient.java @@ -672,19 +672,54 @@ public Object send(byte[] cmd, byte[]... args) { if (Strings.byteToString(cmd).toUpperCase().indexOf("SET") >= 0 || Strings.byteToString(cmd).toUpperCase().equalsIgnoreCase("RESTORE") || Strings.byteToString(cmd).toUpperCase().equalsIgnoreCase("RESTOREREPLACE") || Strings.byteToString(cmd).toUpperCase().equalsIgnoreCase("DEL")) { cleanData(Strings.byteToString(args[0])); } + + if (isSetNxWithTime(cmd, args)) { String byte3 = Strings.byteToString(args[3]); + String byte4 = Strings.byteToString(args[4]); + try { + long setExData=Long.parseLong(byte3); + }catch (NumberFormatException e){ + byte3="ex"; + } - kvPersistence.addKey(EventEntity - .builder() - .key(args[0]) - .value(args[1]) - .stringKey(Strings.byteToString(args[0])) - .pipeLineCompensatorEnum(PipeLineCompensatorEnum.SET_WITH_TIME) - .dbNum(Long.valueOf(currentDbNum)) - .cmd("SET".getBytes()) - .ms(Long.parseLong(byte3)) - .build()); + if("ex".equalsIgnoreCase(byte3)||"nx".equalsIgnoreCase(byte3)||"xx".equalsIgnoreCase(byte3)||"px".equalsIgnoreCase(byte3)){ + kvPersistence.addKey(EventEntity + .builder() + .key(args[0]) + .value(args[1]) + .stringKey(Strings.byteToString(args[0])) + .pipeLineCompensatorEnum(PipeLineCompensatorEnum.SET_WITH_TIME) + .dbNum(Long.valueOf(currentDbNum)) + .cmd("SET".getBytes()) + .ms(Long.parseLong(byte4)) + .build()); + }else { + kvPersistence.addKey(EventEntity + .builder() + .key(args[0]) + .value(args[1]) + .stringKey(Strings.byteToString(args[0])) + .pipeLineCompensatorEnum(PipeLineCompensatorEnum.SET_WITH_TIME) + .dbNum(Long.valueOf(currentDbNum)) + .cmd("SET".getBytes()) + .ms(Long.parseLong(byte3)) + .build()); + } + +// if (isSetNxWithTime(cmd, args)) { +// String byte3 = Strings.byteToString(args[3]); +// +// kvPersistence.addKey(EventEntity +// .builder() +// .key(args[0]) +// .value(args[1]) +// .stringKey(Strings.byteToString(args[0])) +// .pipeLineCompensatorEnum(PipeLineCompensatorEnum.SET_WITH_TIME) +// .dbNum(Long.valueOf(currentDbNum)) +// .cmd("SET".getBytes()) +// .ms(Long.parseLong(byte3)) +// .build()); addCommandNum(); } else { if (args == null || args.length <= 0) { diff --git a/syncer-transmission/src/main/java/syncer/transmission/queue/SendCommandWithOutQueue.java b/syncer-transmission/src/main/java/syncer/transmission/queue/SendCommandWithOutQueue.java index 25714e39..30e7f543 100644 --- a/syncer-transmission/src/main/java/syncer/transmission/queue/SendCommandWithOutQueue.java +++ b/syncer-transmission/src/main/java/syncer/transmission/queue/SendCommandWithOutQueue.java @@ -158,7 +158,27 @@ public void run(KeyValueEventEntity keyValueEventEntity){ SingleTaskDataManagerUtils.brokenStatusAndLog("被抛弃key数量到达阈值[" + errorCount + "],exception reason["+e.getMessage()+"]", this.getClass(), taskId); } } - log.error("[{}]抛弃key:{} ,class:[{}]:原因[{}]",taskId, keyName,event.getClass().toString(),e.getMessage()); + String stringCommand=command; + if(event instanceof DefaultCommand){ + try { + DefaultCommand defaultCommand= (DefaultCommand) event; + command= Strings.byteToString(defaultCommand.getCommand()); + String [] args=Strings.byteToString(defaultCommand.getArgs()); + StringBuilder commands=new StringBuilder(); + commands.append(" ").append(command); + for (int i=0;iAofCommandSendStrategy",e.getCause()); } }