diff --git a/README.md b/README.md index 9b9622cc..8ebb7b96 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ rdb version 7 com.moilioncircle redis-replicator - 1.0.2 + 1.0.3 ``` @@ -41,7 +41,7 @@ rdb version 7 ##File ```java - RedisReplicator replicator = new RedisReplicator(new File("dump.rdb")); + RedisReplicator replicator = new RedisReplicator(new File("dump.rdb"), Configuration.defaultSetting()); replicator.addRdbFilter(new RdbFilter() { @Override public boolean accept(KeyValuePair kv) { diff --git a/pom.xml b/pom.xml index 433159f0..83b1af25 100644 --- a/pom.xml +++ b/pom.xml @@ -22,7 +22,7 @@ com.moilioncircle redis-replicator - 1.0.2 + 1.0.3 redis-replicator Redis Replicator is a redis RDB and Command parser written in java. diff --git a/src/main/java/com/moilioncircle/redis/replicator/AbstractReplicator.java b/src/main/java/com/moilioncircle/redis/replicator/AbstractReplicator.java index 115b67bb..cc7c18f3 100644 --- a/src/main/java/com/moilioncircle/redis/replicator/AbstractReplicator.java +++ b/src/main/java/com/moilioncircle/redis/replicator/AbstractReplicator.java @@ -24,6 +24,7 @@ import com.moilioncircle.redis.replicator.rdb.datatype.KeyValuePair; import java.util.List; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -32,11 +33,13 @@ */ public abstract class AbstractReplicator implements Replicator { protected RedisInputStream inputStream; + protected BlockingQueue eventQueue; protected final ConcurrentHashMap> commands = new ConcurrentHashMap<>(); protected final List filters = new CopyOnWriteArrayList<>(); protected final List listeners = new CopyOnWriteArrayList<>(); protected final List rdbFilters = new CopyOnWriteArrayList<>(); protected final List rdbListeners = new CopyOnWriteArrayList<>(); + protected final EventHandlerWorker worker = new EventHandlerWorker(this); @Override public void doCommandHandler(Command command) { diff --git a/src/main/java/com/moilioncircle/redis/replicator/Configuration.java b/src/main/java/com/moilioncircle/redis/replicator/Configuration.java index 3d161623..e48c9596 100644 --- a/src/main/java/com/moilioncircle/redis/replicator/Configuration.java +++ b/src/main/java/com/moilioncircle/redis/replicator/Configuration.java @@ -80,6 +80,11 @@ public static Configuration defaultSetting() { */ private String masterRunId = "?"; + /** + * blocking queue size + */ + private int eventQueueSize = 1000; + /** * psync offset */ @@ -179,4 +184,13 @@ public Configuration addOffset(long offset) { this.offset.addAndGet(offset); return this; } + + public int getEventQueueSize() { + return eventQueueSize; + } + + public Configuration setEventQueueSize(int eventQueueSize) { + this.eventQueueSize = eventQueueSize; + return this; + } } diff --git a/src/main/java/com/moilioncircle/redis/replicator/EventHandlerWorker.java b/src/main/java/com/moilioncircle/redis/replicator/EventHandlerWorker.java new file mode 100644 index 00000000..d5d4bc7a --- /dev/null +++ b/src/main/java/com/moilioncircle/redis/replicator/EventHandlerWorker.java @@ -0,0 +1,79 @@ +/* + * Copyright 2016 leon chen + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.moilioncircle.redis.replicator; + +import com.moilioncircle.redis.replicator.cmd.Command; +import com.moilioncircle.redis.replicator.rdb.datatype.KeyValuePair; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Created by leon on 8/25/16. + */ +class EventHandlerWorker extends Thread implements Closeable { + private static final Log logger = LogFactory.getLog(EventHandlerWorker.class); + + private final AbstractReplicator replicator; + private AtomicBoolean isClosed = new AtomicBoolean(false); + + public EventHandlerWorker(AbstractReplicator replicator) { + this.replicator = replicator; + setDaemon(true); + setName("event-handler-worker"); + } + + @Override + public void run() { + while (!isClosed.get()) { + try { + Object object = replicator.eventQueue.take(); + if (object instanceof KeyValuePair) { + KeyValuePair kv = (KeyValuePair) object; + if (!replicator.doRdbFilter(kv)) continue; + replicator.doRdbHandler(kv); + } else if (object instanceof Command) { + Command command = (Command) object; + if (!replicator.doCommandFilter(command)) continue; + replicator.doCommandHandler(command); + } else { + throw new AssertionError(object); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (Throwable e) { + exceptionHandler(e); + } + } + } + + @Override + public void close() throws IOException { + isClosed.compareAndSet(false, true); + } + + public boolean isClosed() { + return isClosed.get(); + } + + protected void exceptionHandler(Throwable e) { + logger.error(e); + } +} diff --git a/src/main/java/com/moilioncircle/redis/replicator/RedisFileReplicator.java b/src/main/java/com/moilioncircle/redis/replicator/RedisFileReplicator.java index 948e9f99..41174657 100644 --- a/src/main/java/com/moilioncircle/redis/replicator/RedisFileReplicator.java +++ b/src/main/java/com/moilioncircle/redis/replicator/RedisFileReplicator.java @@ -20,28 +20,32 @@ import com.moilioncircle.redis.replicator.rdb.RdbParser; import java.io.*; +import java.util.concurrent.ArrayBlockingQueue; /** * Created by leon on 8/13/16. */ -public class RedisFileReplicator extends AbstractReplicator { +class RedisFileReplicator extends AbstractReplicator { - public RedisFileReplicator(File file) throws FileNotFoundException { - this.inputStream = new RedisInputStream(new FileInputStream(file)); + public RedisFileReplicator(File file, Configuration configuration) throws FileNotFoundException { + this(new RedisInputStream(new FileInputStream(file)), configuration); } - public RedisFileReplicator(InputStream in) { - this.inputStream = new RedisInputStream(in); + public RedisFileReplicator(InputStream in, Configuration configuration) { + this.inputStream = new RedisInputStream(in, configuration.getBufferSize()); + this.eventQueue = new ArrayBlockingQueue<>(configuration.getEventQueueSize()); } @Override public void open() throws IOException { - RdbParser parser = new RdbParser(inputStream, this); + worker.start(); + RdbParser parser = new RdbParser(inputStream, this, this.eventQueue); parser.parse(); } @Override public void close() throws IOException { inputStream.close(); + if (worker != null && !worker.isClosed()) worker.close(); } } diff --git a/src/main/java/com/moilioncircle/redis/replicator/RedisReplicator.java b/src/main/java/com/moilioncircle/redis/replicator/RedisReplicator.java index 13bddebd..52eca9a5 100644 --- a/src/main/java/com/moilioncircle/redis/replicator/RedisReplicator.java +++ b/src/main/java/com/moilioncircle/redis/replicator/RedisReplicator.java @@ -32,12 +32,12 @@ public class RedisReplicator implements Replicator { private final Replicator replicator; - public RedisReplicator(File file) throws FileNotFoundException { - replicator = new RedisFileReplicator(file); + public RedisReplicator(File file, Configuration configuration) throws FileNotFoundException { + replicator = new RedisFileReplicator(file, configuration); } - public RedisReplicator(InputStream in) { - replicator = new RedisFileReplicator(in); + public RedisReplicator(InputStream in, Configuration configuration) { + replicator = new RedisFileReplicator(in, configuration); } public RedisReplicator(String host, int port, Configuration configuration) throws IOException { diff --git a/src/main/java/com/moilioncircle/redis/replicator/RedisSocketReplicator.java b/src/main/java/com/moilioncircle/redis/replicator/RedisSocketReplicator.java index 1247ae1d..a73c8e4e 100644 --- a/src/main/java/com/moilioncircle/redis/replicator/RedisSocketReplicator.java +++ b/src/main/java/com/moilioncircle/redis/replicator/RedisSocketReplicator.java @@ -31,6 +31,8 @@ import java.util.Arrays; import java.util.Timer; import java.util.TimerTask; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import static com.moilioncircle.redis.replicator.Constants.DOLLAR; @@ -39,7 +41,7 @@ /** * Created by leon on 8/9/16. */ -public class RedisSocketReplicator extends AbstractReplicator { +class RedisSocketReplicator extends AbstractReplicator { private static final Log logger = LogFactory.getLog(RedisSocketReplicator.class); @@ -57,6 +59,7 @@ public RedisSocketReplicator(String host, int port, Configuration configuration) this.host = host; this.port = port; this.configuration = configuration; + this.eventQueue = new ArrayBlockingQueue<>(configuration.getEventQueueSize()); buildInCommandParserRegister(); } @@ -67,6 +70,7 @@ public RedisSocketReplicator(String host, int port, Configuration configuration) */ @Override public void open() throws IOException { + worker.start(); for (int i = 0; i < configuration.getRetries(); i++) { try { @@ -126,22 +130,21 @@ public void handle(long len) { CommandParser operations = commands.get(cmdName); Command parsedCommand = operations.parse(cmdName, params); - //do command filter - if (!doCommandFilter(parsedCommand)) continue; - - //do command handler - doCommandHandler(parsedCommand); + //submit event + this.eventQueue.put(parsedCommand); } else { if (logger.isInfoEnabled()) logger.info("Redis reply:" + obj); } } //connected = false break; - } catch (SocketException | SocketTimeoutException e) { + } catch (SocketException | SocketTimeoutException | InterruptedException e) { + logger.error(e); //when close socket manual if (!connected.get()) { break; } + //connect refused //connect timeout //read timeout //connect abort @@ -150,13 +153,15 @@ public void handle(long len) { logger.info("retry connect to redis."); } } + // + if (worker != null && !worker.isClosed()) worker.close(); } private SyncMode trySync(final String reply) throws IOException { logger.info(reply); if (reply.startsWith("FULLRESYNC")) { //sync dump - parseDump(this); + parseDump(this, this.eventQueue); //after parsed dump file,cache master run id and offset so that next psync. String[] ary = reply.split(" "); configuration.setMasterRunId(ary[1]); @@ -169,12 +174,12 @@ private SyncMode trySync(final String reply) throws IOException { //server don't support psync logger.info("SYNC"); send("SYNC".getBytes()); - parseDump(this); + parseDump(this, this.eventQueue); return SyncMode.SYNC; } } - private void parseDump(final Replicator replicator) throws IOException { + private void parseDump(final AbstractReplicator replicator, final BlockingQueue eventQueue) throws IOException { //sync dump String reply = (String) replyParser.parse(new BulkReplyHandler() { @Override @@ -184,7 +189,7 @@ public String handle(long len, RedisInputStream in) throws IOException { logger.info("Discard " + len + " bytes"); in.skip(len); } else { - RdbParser parser = new RdbParser(in, replicator); + RdbParser parser = new RdbParser(in, replicator, eventQueue); parser.parse(); } return "OK"; @@ -291,6 +296,10 @@ private void connect() throws IOException { replyParser = new ReplyParser(inputStream); } + private void close0() throws IOException { + + } + @Override public void close() throws IOException { if (!connected.compareAndSet(true, false)) return; diff --git a/src/main/java/com/moilioncircle/redis/replicator/rdb/AbstractRdbParser.java b/src/main/java/com/moilioncircle/redis/replicator/rdb/AbstractRdbParser.java index c5b18cd2..1744dc93 100644 --- a/src/main/java/com/moilioncircle/redis/replicator/rdb/AbstractRdbParser.java +++ b/src/main/java/com/moilioncircle/redis/replicator/rdb/AbstractRdbParser.java @@ -1,12 +1,13 @@ package com.moilioncircle.redis.replicator.rdb; -import com.moilioncircle.redis.replicator.Replicator; +import com.moilioncircle.redis.replicator.AbstractReplicator; import com.moilioncircle.redis.replicator.io.RedisInputStream; import com.moilioncircle.redis.replicator.util.Lzf; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import java.io.IOException; +import java.util.concurrent.BlockingQueue; import static com.moilioncircle.redis.replicator.Constants.*; @@ -18,14 +19,17 @@ public abstract class AbstractRdbParser { protected final RedisInputStream in; - protected final Replicator replicator; + protected final AbstractReplicator replicator; - public AbstractRdbParser(RedisInputStream in, Replicator replicator) { + protected final BlockingQueue eventQueue; + + public AbstractRdbParser(RedisInputStream in, AbstractReplicator replicator, BlockingQueue eventQueue) { this.in = in; this.replicator = replicator; + this.eventQueue = eventQueue; } - protected long rdbLoad() throws IOException { + protected long rdbLoad() throws IOException, InterruptedException { throw new UnsupportedOperationException("rdbLoad()"); } diff --git a/src/main/java/com/moilioncircle/redis/replicator/rdb/Rdb6Parser.java b/src/main/java/com/moilioncircle/redis/replicator/rdb/Rdb6Parser.java index 69253ead..c166f205 100644 --- a/src/main/java/com/moilioncircle/redis/replicator/rdb/Rdb6Parser.java +++ b/src/main/java/com/moilioncircle/redis/replicator/rdb/Rdb6Parser.java @@ -1,12 +1,13 @@ package com.moilioncircle.redis.replicator.rdb; -import com.moilioncircle.redis.replicator.Replicator; +import com.moilioncircle.redis.replicator.AbstractReplicator; import com.moilioncircle.redis.replicator.io.RedisInputStream; import com.moilioncircle.redis.replicator.rdb.datatype.*; import java.io.ByteArrayInputStream; import java.io.IOException; import java.util.*; +import java.util.concurrent.BlockingQueue; import static com.moilioncircle.redis.replicator.Constants.*; @@ -15,11 +16,11 @@ */ public class Rdb6Parser extends AbstractRdbParser { - public Rdb6Parser(RedisInputStream in, Replicator replicator) { - super(in, replicator); + public Rdb6Parser(RedisInputStream in, AbstractReplicator replicator, BlockingQueue eventQueue) { + super(in, replicator, eventQueue); } - protected long rdbLoad() throws IOException { + protected long rdbLoad() throws IOException, InterruptedException { Db db = null; /** * rdb @@ -110,8 +111,8 @@ protected long rdbLoad() throws IOException { } if (kv == null) continue; if (logger.isDebugEnabled()) logger.debug(kv); - if (!replicator.doRdbFilter(kv)) continue; - replicator.doRdbHandler(kv); + //submit event + this.eventQueue.put(kv); } return in.total(); } diff --git a/src/main/java/com/moilioncircle/redis/replicator/rdb/Rdb7Parser.java b/src/main/java/com/moilioncircle/redis/replicator/rdb/Rdb7Parser.java index 3079339e..4f30af7d 100644 --- a/src/main/java/com/moilioncircle/redis/replicator/rdb/Rdb7Parser.java +++ b/src/main/java/com/moilioncircle/redis/replicator/rdb/Rdb7Parser.java @@ -1,12 +1,13 @@ package com.moilioncircle.redis.replicator.rdb; -import com.moilioncircle.redis.replicator.Replicator; +import com.moilioncircle.redis.replicator.AbstractReplicator; import com.moilioncircle.redis.replicator.io.RedisInputStream; import com.moilioncircle.redis.replicator.rdb.datatype.*; import java.io.ByteArrayInputStream; import java.io.IOException; import java.util.*; +import java.util.concurrent.BlockingQueue; import static com.moilioncircle.redis.replicator.Constants.*; @@ -15,11 +16,11 @@ */ public class Rdb7Parser extends AbstractRdbParser { - public Rdb7Parser(RedisInputStream in, Replicator replicator) { - super(in, replicator); + public Rdb7Parser(RedisInputStream in, AbstractReplicator replicator, BlockingQueue eventQueue) { + super(in, replicator, eventQueue); } - protected long rdbLoad() throws IOException { + protected long rdbLoad() throws IOException, InterruptedException { Db db = null; /** * rdb @@ -126,8 +127,8 @@ protected long rdbLoad() throws IOException { } if (kv == null) continue; if (logger.isDebugEnabled()) logger.debug(kv); - if (!replicator.doRdbFilter(kv)) continue; - replicator.doRdbHandler(kv); + //submit event + this.eventQueue.put(kv); } return in.total(); } diff --git a/src/main/java/com/moilioncircle/redis/replicator/rdb/RdbParser.java b/src/main/java/com/moilioncircle/redis/replicator/rdb/RdbParser.java index a48a17e6..f68c4e34 100644 --- a/src/main/java/com/moilioncircle/redis/replicator/rdb/RdbParser.java +++ b/src/main/java/com/moilioncircle/redis/replicator/rdb/RdbParser.java @@ -16,10 +16,11 @@ package com.moilioncircle.redis.replicator.rdb; -import com.moilioncircle.redis.replicator.Replicator; +import com.moilioncircle.redis.replicator.AbstractReplicator; import com.moilioncircle.redis.replicator.io.RedisInputStream; import java.io.IOException; +import java.util.concurrent.BlockingQueue; /** * Redis RDB format @@ -27,14 +28,14 @@ * rdb version 7 * * @author leon.chen - * [https://github.com/antirez/redis/blob/3.0/src/rdb.c] - * [https://github.com/sripathikrishnan/redis-rdb-tools/wiki/Redis-RDB-Dump-File-Format] + * [https://github.com/antirez/redis/blob/3.0/src/rdb.c] + * [https://github.com/sripathikrishnan/redis-rdb-tools/wiki/Redis-RDB-Dump-File-Format] * @since 2016/8/11 */ public class RdbParser extends AbstractRdbParser { - public RdbParser(RedisInputStream in, Replicator replicator) { - super(in, replicator); + public RdbParser(RedisInputStream in, AbstractReplicator replicator, BlockingQueue eventQueue) { + super(in, replicator, eventQueue); } /** @@ -68,39 +69,45 @@ public RdbParser(RedisInputStream in, Replicator replicator) { * @throws IOException when read timeout */ public long parse() throws IOException { - /* + try { + /* * ---------------------------- * 52 45 44 49 53 # Magic String "REDIS" * 30 30 30 33 # RDB Version Number in big endian. In this case, version = 0003 = 3 * ---------------------------- */ - String magicString = StringHelper.str(in, 5);//REDIS - if (!magicString.equals("REDIS")) { - logger.error("Can't read MAGIC STRING [REDIS] ,value:" + magicString); - return in.total(); - } - int version = Integer.parseInt(StringHelper.str(in, 4));//0006 or 0007 - AbstractRdbParser rdbParser; - switch (version) { - case 1: - case 2: - case 3: - case 4: - case 5: - case 6: - rdbParser = new Rdb6Parser(in, replicator); - break; - case 7: - rdbParser = new Rdb7Parser(in, replicator); - break; - default: - logger.error("Can't handle RDB format version " + version); + String magicString = StringHelper.str(in, 5);//REDIS + if (!magicString.equals("REDIS")) { + logger.error("Can't read MAGIC STRING [REDIS] ,value:" + magicString); return in.total(); + } + int version = Integer.parseInt(StringHelper.str(in, 4));//0006 or 0007 + AbstractRdbParser rdbParser; + switch (version) { + case 1: + case 2: + case 3: + case 4: + case 5: + case 6: + rdbParser = new Rdb6Parser(in, replicator, eventQueue); + break; + case 7: + rdbParser = new Rdb7Parser(in, replicator, eventQueue); + break; + default: + logger.error("Can't handle RDB format version " + version); + return in.total(); + } + replicator.doPreFullSync(); + long rs = rdbParser.rdbLoad(); + replicator.doPostFullSync(); + return rs; + } catch (InterruptedException e) { + logger.error(e); + Thread.currentThread().interrupt(); + return -1; } - replicator.doPreFullSync(); - long rs = rdbParser.rdbLoad(); - replicator.doPostFullSync(); - return rs; } }