Skip to content

Commit

Permalink
fix: ensure hasOngoingSendLoop.exitSafe()
Browse files Browse the repository at this point in the history
  • Loading branch information
okg-cxf committed Aug 7, 2024
1 parent 9f2edeb commit 2da6443
Showing 1 changed file with 9 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ private void scheduleSendJobOnConnected(final ContextualChannel chan) {
LettuceAssert.assertState(chan.eventLoop().inEventLoop(), "must be called in event loop thread");

// Schedule directly
scheduleSendJobInEventLoopIfNeeded(chan);
scheduleSendJobInEventLoopIfNeeded(chan, false);
}

private void scheduleSendJobIfNeeded(final ContextualChannel chan) {
Expand All @@ -618,11 +618,6 @@ private void scheduleSendJobIfNeeded(final ContextualChannel chan) {
}

final EventLoop eventLoop = chan.eventLoop();
if (eventLoop.inEventLoop()) {
scheduleSendJobInEventLoopIfNeeded(chan);
return;
}

if (chan.context.batchFlushEndPointContext.hasOngoingSendLoop.tryEnterSafeGetVolatile()) {
// Benchmark result of using tryEnterSafeGetVolatile() or not (1 thread, async get):
// 1. uses tryEnterSafeGetVolatile() to avoid unnecessary eventLoop.execute() calls
Expand All @@ -631,7 +626,7 @@ private void scheduleSendJobIfNeeded(final ContextualChannel chan) {
// 2. uses eventLoop.execute() directly
// Avg latency: 3.2677197021496998s
// Avg QPS: 476925.0751855796/s
eventLoop.execute(() -> scheduleSendJobInEventLoopIfNeeded(chan));
eventLoop.execute(() -> scheduleSendJobInEventLoopIfNeeded(chan, true));
}

// Otherwise:
Expand All @@ -642,10 +637,13 @@ private void scheduleSendJobIfNeeded(final ContextualChannel chan) {
// second loopSend0(), which will call poll()
}

private void scheduleSendJobInEventLoopIfNeeded(final ContextualChannel chan) {
private void scheduleSendJobInEventLoopIfNeeded(final ContextualChannel chan, boolean enteredSafe) {
// Guarantee only 1 send loop.
if (chan.context.batchFlushEndPointContext.hasOngoingSendLoop.tryEnterUnsafe()) {
BatchFlushEndPointContext.HasOngoingSendLoop hasOngoingSendLoop = chan.context.batchFlushEndPointContext.hasOngoingSendLoop;
if (hasOngoingSendLoop.tryEnterUnsafe()) {
loopSend(chan);
} else if (enteredSafe) {
hasOngoingSendLoop.exitSafe();
}
}

Expand Down Expand Up @@ -691,6 +689,8 @@ private void loopSend0(final BatchFlushEndPointContext batchFlushEndPointContext
batchFlushEndPointContext.hasOngoingSendLoop.exitSafe();
// // Guarantee thread-safety: no dangling tasks in the queue.
loopSend0(batchFlushEndPointContext, chan, remainingSpinnCount, false);
// chan.eventLoop().schedule(() -> loopSend0(batchFlushEndPointContext, chan, writeSpinCount, false), 100,
// TimeUnit.NANOSECONDS);
} else {
// The send loop will be triggered later when a new task is added,
batchFlushEndPointContext.hasOngoingSendLoop.exitUnsafe();
Expand Down

0 comments on commit 2da6443

Please sign in to comment.