Skip to content

Commit

Permalink
fast and slow sync
Browse files Browse the repository at this point in the history
  • Loading branch information
qifanwang committed Dec 12, 2024
1 parent a195c95 commit 5b3b03e
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 7 deletions.
24 changes: 18 additions & 6 deletions core/src/main/java/com/ctrip/xpipe/utils/OffsetNotifier.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,34 @@ protected boolean tryReleaseShared(long newOffset) {
}
}

private final Sync sync;
private final Sync syncFast;
private final Sync syncSlow;

public OffsetNotifier(long offset) {
this.sync = new Sync(offset);
this.syncFast = new Sync(offset);
this.syncSlow = new Sync(offset);
}

public void await(long startOffset) throws InterruptedException {
sync.acquireSharedInterruptibly(startOffset);
syncFast.acquireSharedInterruptibly(startOffset);
}


public void awaitSlowQueue(long startOffset) throws InterruptedException {
syncFast.acquireSharedInterruptibly(startOffset);
}

public boolean await(long startOffset, long miliSeconds) throws InterruptedException{
return sync.tryAcquireSharedNanos(startOffset, miliSeconds * (1000*1000));
return syncFast.tryAcquireSharedNanos(startOffset, miliSeconds * (1000*1000));

}

public boolean awaitSlowQueue(long startOffset, long miliSeconds) throws InterruptedException{
return syncSlow.tryAcquireSharedNanos(startOffset, miliSeconds * (1000*1000));

}

public void offsetIncreased(long newOffset) {
sync.releaseShared(newOffset);
syncFast.releaseShared(newOffset);
syncSlow.releaseShared(newOffset);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public ReferenceFileRegion doRead(long milliSeconds) throws IOException {
if (null != replDelayConfig
&& (delayBytes = replDelayConfig.getDelayBytes()) > 0 && (delayMilli = replDelayConfig.getDelayMilli()) > 0) {
logger.debug("[readDelay]{}:{}", delayBytes, delayMilli);
offsetNotifier.await(curPosition + delayBytes, delayMilli);
offsetNotifier.awaitSlowQueue(curPosition + delayBytes, delayMilli);
} else if (milliSeconds >= 0) {
offsetNotifier.await(curPosition, milliSeconds);
} else {
Expand Down

0 comments on commit 5b3b03e

Please sign in to comment.