From 5b3b03e731ee9a76e8fea4881a6efcf887884647 Mon Sep 17 00:00:00 2001 From: qifanwang Date: Thu, 12 Dec 2024 15:10:13 +0800 Subject: [PATCH] fast and slow sync --- .../com/ctrip/xpipe/utils/OffsetNotifier.java | 24 ++++++++++++++----- .../keeper/store/cmd/OffsetCommandReader.java | 2 +- 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/com/ctrip/xpipe/utils/OffsetNotifier.java b/core/src/main/java/com/ctrip/xpipe/utils/OffsetNotifier.java index 0310018ad3..57511c1058 100644 --- a/core/src/main/java/com/ctrip/xpipe/utils/OffsetNotifier.java +++ b/core/src/main/java/com/ctrip/xpipe/utils/OffsetNotifier.java @@ -27,22 +27,34 @@ protected boolean tryReleaseShared(long newOffset) { } } - private final Sync sync; + private final Sync syncFast; + private final Sync syncSlow; public OffsetNotifier(long offset) { - this.sync = new Sync(offset); + this.syncFast = new Sync(offset); + this.syncSlow = new Sync(offset); } public void await(long startOffset) throws InterruptedException { - sync.acquireSharedInterruptibly(startOffset); + syncFast.acquireSharedInterruptibly(startOffset); } - + + public void awaitSlowQueue(long startOffset) throws InterruptedException { + syncFast.acquireSharedInterruptibly(startOffset); + } + public boolean await(long startOffset, long miliSeconds) throws InterruptedException{ - return sync.tryAcquireSharedNanos(startOffset, miliSeconds * (1000*1000)); + return syncFast.tryAcquireSharedNanos(startOffset, miliSeconds * (1000*1000)); + + } + + public boolean awaitSlowQueue(long startOffset, long miliSeconds) throws InterruptedException{ + return syncSlow.tryAcquireSharedNanos(startOffset, miliSeconds * (1000*1000)); } public void offsetIncreased(long newOffset) { - sync.releaseShared(newOffset); + syncFast.releaseShared(newOffset); + syncSlow.releaseShared(newOffset); } } diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/cmd/OffsetCommandReader.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/cmd/OffsetCommandReader.java index 3e3d96e20b..f8f48d878c 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/cmd/OffsetCommandReader.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/cmd/OffsetCommandReader.java @@ -52,7 +52,7 @@ public ReferenceFileRegion doRead(long milliSeconds) throws IOException { if (null != replDelayConfig && (delayBytes = replDelayConfig.getDelayBytes()) > 0 && (delayMilli = replDelayConfig.getDelayMilli()) > 0) { logger.debug("[readDelay]{}:{}", delayBytes, delayMilli); - offsetNotifier.await(curPosition + delayBytes, delayMilli); + offsetNotifier.awaitSlowQueue(curPosition + delayBytes, delayMilli); } else if (milliSeconds >= 0) { offsetNotifier.await(curPosition, milliSeconds); } else {