Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract Committer and RebalanceCoordinator classes from Runloop + unit tests #1375

Open
wants to merge 46 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
fd0c49d
Refactor stream completion status in preparation of additional logging
svroonland Nov 3, 2024
0d6258b
Add logging + fix condition
svroonland Nov 3, 2024
399dcac
Also some debug logging while waiting
svroonland Nov 3, 2024
c56dc69
Fix lint
svroonland Nov 3, 2024
11b3238
Increase timeout
svroonland Nov 3, 2024
dc38739
Correct race condition
svroonland Nov 5, 2024
2ee212c
Remove sequential
svroonland Nov 5, 2024
9caee50
Update zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.s…
svroonland Nov 6, 2024
e034b77
Update zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.s…
svroonland Nov 6, 2024
5b2d13e
Merge branch 'master' into more-rebalance-safe-commits-logging
svroonland Nov 6, 2024
221b283
Update zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.s…
svroonland Nov 6, 2024
bad1def
Stricter comparison of pending commit offset to last pulled offset
svroonland Nov 9, 2024
a7f7cd2
Merge remote-tracking branch 'origin/master' into more-rebalance-safe…
svroonland Nov 9, 2024
cc377dd
More rebalance safe commits logging alt (#1367)
erikvanoosten Nov 10, 2024
7994954
Remove withFilter usage
svroonland Nov 10, 2024
d3e0270
Fix timeToDeadlineMillis
svroonland Nov 10, 2024
54494f3
Extract a Committer and RunloopRebalanceListener from the Runloop
svroonland Nov 10, 2024
1e47de2
Fix unstable test, seems unrelated
svroonland Nov 10, 2024
ac4e978
Fix timeToDeadlineMillis
svroonland Nov 10, 2024
5917c1d
Small convenience method
svroonland Nov 10, 2024
dc614da
Adjust unrepresentative test
svroonland Nov 10, 2024
02532da
Document parameter
svroonland Nov 10, 2024
6ad6eaf
Inline RebalanceEvent modifications
svroonland Nov 10, 2024
9c36a5d
Use startTime in Commit
svroonland Nov 10, 2024
07189e2
Merge branch 'master' into separate-rebalance-listener-file
svroonland Nov 10, 2024
d6db595
Merge commit '727e7d7acce4cb0a6f301451e1b25df3dd887e2b' into separate…
svroonland Nov 10, 2024
02a36ee
Merge remote-tracking branch 'origin/master' into separate-rebalance-…
svroonland Nov 10, 2024
d1957d8
Do not depend on entire ConsumerSettings in Committer
svroonland Nov 12, 2024
807fdf9
Make Committer better testable
svroonland Nov 12, 2024
98a1031
Tests for Committer
svroonland Nov 12, 2024
810362b
Let RunloopRebalanceListener own its last event
svroonland Nov 12, 2024
2310ea1
Extract Committer trait
svroonland Nov 13, 2024
51d3a06
Add last committed offset to stream completion status
svroonland Nov 13, 2024
3439e10
Some tests for the rebalance listener
svroonland Nov 13, 2024
8fb3622
Renames
svroonland Nov 13, 2024
b60bd8e
More tests + cleanup
svroonland Nov 13, 2024
c995799
Merge commit '1f8d5904' into separate-rebalance-listener-file
svroonland Nov 13, 2024
7744d66
Merge commit '18aa941d' into separate-rebalance-listener-file
svroonland Nov 13, 2024
05d710c
Merge commit 'e1a41448' into separate-rebalance-listener-file
svroonland Nov 13, 2024
ae15221
Merge commit '504074f6' into separate-rebalance-listener-file
svroonland Nov 13, 2024
4acefdf
Move CommittOffsets to Committer as it is part of its interface
svroonland Nov 13, 2024
0fd4114
Cleanup
svroonland Nov 13, 2024
3915b08
cFix doc
svroonland Nov 13, 2024
e86d757
Formatting
svroonland Nov 13, 2024
ea59926
Merge branch 'master' into separate-rebalance-listener-file
svroonland Nov 13, 2024
4072dae
Merge branch 'master' into separate-rebalance-listener-file
svroonland Nov 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import zio.test.Assertion._
import zio.test.TestAspect._
import zio.test._

import java.util.concurrent.ExecutionException
import scala.reflect.ClassTag

//noinspection SimplifyAssertInspection
Expand Down Expand Up @@ -769,7 +770,8 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
clientId = clientId,
groupId = Some(groupId),
`max.poll.records` = 1,
rebalanceSafeCommits = rebalanceSafeCommits
rebalanceSafeCommits = rebalanceSafeCommits,
maxRebalanceDuration = 60.seconds
)
consumer <- Consumer.make(settings)
} yield consumer
Expand Down Expand Up @@ -1376,9 +1378,13 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
}: _*) @@ TestAspect.nonFlaky(2),
test("running streams don't stall after a poll timeout") {
for {
topic <- randomTopic
clientId <- randomClient
_ <- ZIO.fromTry(EmbeddedKafka.createCustomTopic(topic))
topic <- randomTopic
clientId <- randomClient
_ <- ZIO.attempt(EmbeddedKafka.createCustomTopic(topic)).flatMap(ZIO.fromTry(_)).retryN(3).catchSome {
case e: ExecutionException
if e.getCause.isInstanceOf[org.apache.kafka.common.errors.TopicExistsException] =>
ZIO.unit
}
settings <- consumerSettings(clientId)
consumer <- Consumer.make(settings.withPollTimeout(50.millis))
recordsOut <- Queue.unbounded[Unit]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package zio.kafka.consumer.internal

import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition
import zio._
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import zio.kafka.consumer.internal.Committer.{ Commit, CommitOffsets }
import zio.test._

object RunloopCommitOffsetsSpec extends ZIOSpecDefault {
object CommitOffsetsSpec extends ZIOSpecDefault {

private val tp10 = new TopicPartition("t1", 0)
private val tp11 = new TopicPartition("t1", 1)
Expand All @@ -14,49 +15,49 @@ object RunloopCommitOffsetsSpec extends ZIOSpecDefault {
private val tp22 = new TopicPartition("t2", 2)

override def spec: Spec[TestEnvironment with Scope, Any] =
suite("Runloop.CommitOffsets spec")(
suite("CommitOffsets spec")(
test("addCommits adds to empty CommitOffsets") {
val s1 = Runloop.CommitOffsets(Map.empty)
val s1 = CommitOffsets(Map.empty)
val (inc, s2) = s1.addCommits(Chunk(makeCommit(Map(tp10 -> 10))))
assertTrue(
inc == 0,
s2.offsets == Map(tp10 -> 10L)
)
},
test("addCommits updates offset when it is higher") {
val s1 = Runloop.CommitOffsets(Map(tp10 -> 4L))
val s1 = CommitOffsets(Map(tp10 -> 4L))
val (inc, s2) = s1.addCommits(Chunk(makeCommit(Map(tp10 -> 10))))
assertTrue(
inc == 10 - 4,
s2.offsets == Map(tp10 -> 10L)
)
},
test("addCommits ignores an offset when it is lower") {
val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L))
val s1 = CommitOffsets(Map(tp10 -> 10L))
val (inc, s2) = s1.addCommits(Chunk(makeCommit(Map(tp10 -> 5))))
assertTrue(
inc == 0,
s2.offsets == Map(tp10 -> 10L)
)
},
test("addCommits keeps unrelated partitions") {
val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L))
val s1 = CommitOffsets(Map(tp10 -> 10L))
val (inc, s2) = s1.addCommits(Chunk(makeCommit(Map(tp11 -> 11))))
assertTrue(
inc == 0,
s2.offsets == Map(tp10 -> 10L, tp11 -> 11L)
)
},
test("addCommits does it all at once") {
val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 205L, tp21 -> 210L, tp22 -> 220L))
val s1 = CommitOffsets(Map(tp10 -> 10L, tp20 -> 205L, tp21 -> 210L, tp22 -> 220L))
val (inc, s2) = s1.addCommits(Chunk(makeCommit(Map(tp11 -> 11, tp20 -> 206L, tp21 -> 209L, tp22 -> 220L))))
assertTrue(
inc == /* tp10 */ 0 + /* tp11 */ 0 + /* tp20 */ 1 + /* tp21 */ 0 + /* tp22 */ 0,
s2.offsets == Map(tp10 -> 10L, tp11 -> 11L, tp20 -> 206L, tp21 -> 210L, tp22 -> 220L)
)
},
test("addCommits adds multiple commits") {
val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 200L, tp21 -> 210L, tp22 -> 220L))
val s1 = CommitOffsets(Map(tp10 -> 10L, tp20 -> 200L, tp21 -> 210L, tp22 -> 220L))
val (inc, s2) = s1.addCommits(
Chunk(
makeCommit(Map(tp11 -> 11, tp20 -> 199L, tp21 -> 211L, tp22 -> 219L)),
Expand All @@ -69,35 +70,35 @@ object RunloopCommitOffsetsSpec extends ZIOSpecDefault {
)
},
test("keepPartitions removes some partitions") {
val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L))
val s1 = CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L))
val s2 = s1.keepPartitions(Set(tp10))
assertTrue(s2.offsets == Map(tp10 -> 10L))
},
test("does not 'contain' offset when tp is not present") {
val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L))
val s1 = CommitOffsets(Map(tp10 -> 10L))
val result = s1.contains(tp20, 10)
assertTrue(!result)
},
test("does not 'contain' a higher offset") {
val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L))
val s1 = CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L))
val result = s1.contains(tp10, 11)
assertTrue(!result)
},
test("does 'contain' equal offset") {
val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L))
val s1 = CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L))
val result = s1.contains(tp10, 10)
assertTrue(result)
},
test("does 'contain' lower offset") {
val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L))
val s1 = CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L))
val result = s1.contains(tp20, 19)
assertTrue(result)
}
)

private def makeCommit(offsets: Map[TopicPartition, Long]): Runloop.Commit = {
private def makeCommit(offsets: Map[TopicPartition, Long]): Commit = {
val o = offsets.map { case (tp, offset) => tp -> new OffsetAndMetadata(offset) }
val p = Unsafe.unsafe(implicit unsafe => Promise.unsafe.make[Throwable, Unit](FiberId.None))
Runloop.Commit(0L, o, p)
Commit(0L, o, p)
}
}
207 changes: 207 additions & 0 deletions zio-kafka/src/main/scala/zio/kafka/consumer/internal/Committer.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
package zio.kafka.consumer.internal
import org.apache.kafka.clients.consumer.{ OffsetAndMetadata, OffsetCommitCallback }
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.RebalanceInProgressException
import zio.kafka.consumer.Consumer.CommitTimeout
import zio.kafka.consumer.ConsumerSettings
import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics }
import zio.kafka.consumer.internal.Committer.{ Commit, CommitOffsets }
import zio.kafka.consumer.internal.ConsumerAccess.ByteArrayKafkaConsumer
import zio.{ durationLong, Chunk, Exit, Promise, Queue, Ref, Runtime, Scope, Task, UIO, Unsafe, ZIO }

import java.lang.Math.max
import java.util
import java.util.{ Map => JavaMap }
import scala.collection.mutable
import scala.jdk.CollectionConverters._

private[consumer] final class Committer(
commitQueue: Queue[Commit],
settings: ConsumerSettings,
diagnostics: Diagnostics,
consumerMetrics: ConsumerMetrics,
onCommitAvailable: UIO[Unit],
committedOffsetsRef: Ref[CommitOffsets],
sameThreadRuntime: Runtime[Any],
pendingCommits: Ref[Chunk[Commit]] // TODO make Commit internal
) {
private val commitTimeout = settings.commitTimeout

/** This is the implementation behind the user facing api `Offset.commit`. */
val commit: Map[TopicPartition, OffsetAndMetadata] => Task[Unit] =
offsets =>
for {
p <- Promise.make[Throwable, Unit]
startTime = java.lang.System.nanoTime()
_ <- commitQueue.offer(Commit(java.lang.System.nanoTime(), offsets, p))
erikvanoosten marked this conversation as resolved.
Show resolved Hide resolved
_ <- onCommitAvailable
_ <- diagnostics.emit(DiagnosticEvent.Commit.Started(offsets))
_ <- p.await.timeoutFail(CommitTimeout)(commitTimeout)
endTime = java.lang.System.nanoTime()
latency = (endTime - startTime).nanoseconds
_ <- consumerMetrics.observeCommit(latency)
} yield ()

/**
* Takes commits from the queue, commits them and adds them to pending commits
*
* If the queue is empty, nothing happens, unless executeOnEmpty is true.
*
* @param consumer
* Consumer with exclusive access
*/
def handleNewCommits(consumer: ByteArrayKafkaConsumer, executeOnEmpty: Boolean = false): Task[Unit] = for {
commits <- commitQueue.takeAll
_ <- ZIO.logDebug(s"Processing ${commits.size} commits")
_ <- ZIO.unless(commits.isEmpty && !executeOnEmpty) {
val (offsets, callback, onFailure) = asyncCommitParameters(commits)
pendingCommits.update(_ ++ commits) *>
// We don't wait for the completion of the commit here, because it
// will only complete once we poll again.
ZIO
.attempt(consumer.commitAsync(offsets, callback))
.catchAll(onFailure)
}
} yield ()

/** Merge commits and prepare parameters for calling `consumer.commitAsync`. */
private def asyncCommitParameters(
commits: Chunk[Commit]
): (JavaMap[TopicPartition, OffsetAndMetadata], OffsetCommitCallback, Throwable => UIO[Unit]) = {
val offsets = commits
.foldLeft(mutable.Map.empty[TopicPartition, OffsetAndMetadata]) { case (acc, commit) =>
commit.offsets.foreach { case (tp, offset) =>
acc += (tp -> acc
.get(tp)
.map(current => if (current.offset() > offset.offset()) current else offset)
.getOrElse(offset))
}
acc
}
.toMap
val offsetsWithMetaData = offsets.map { case (tp, offset) =>
tp -> new OffsetAndMetadata(offset.offset + 1, offset.leaderEpoch, offset.metadata)
}
val cont = (e: Exit[Throwable, Unit]) => ZIO.foreachDiscard(commits)(_.cont.done(e))
// We assume the commit is started immediately after returning from this method.
val startTime = java.lang.System.nanoTime()
val onSuccess = {
val endTime = java.lang.System.nanoTime()
val latency = (endTime - startTime).nanoseconds
for {
offsetIncrease <- committedOffsetsRef.modify(_.addCommits(commits))
_ <- consumerMetrics.observeAggregatedCommit(latency, offsetIncrease).when(commits.nonEmpty)
result <- cont(Exit.unit)
_ <- diagnostics.emit(DiagnosticEvent.Commit.Success(offsetsWithMetaData))
} yield result
}
val onFailure: Throwable => UIO[Unit] = {
case _: RebalanceInProgressException =>
for {
_ <- ZIO.logDebug(s"Rebalance in progress, commit for offsets $offsets will be retried")
_ <- commitQueue.offerAll(commits)
_ <- onCommitAvailable //
} yield ()
case err: Throwable =>
cont(Exit.fail(err)) <* diagnostics.emit(DiagnosticEvent.Commit.Failure(offsetsWithMetaData, err))
}
val callback =
new OffsetCommitCallback {
override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit =
Unsafe.unsafe { implicit u =>
sameThreadRuntime.unsafe.run {
if (exception eq null) onSuccess else onFailure(exception)
}
.getOrThrowFiberFailure()
}
}
(offsetsWithMetaData.asJava, callback, onFailure)
}

def pendingCommitCount: UIO[Int] = pendingCommits.get.map(_.size)

def queueSize: UIO[Int] = commitQueue.size

def updatePendingCommitsAfterPoll: UIO[Unit] =
pendingCommits.get.flatMap(ZIO.filter(_)(_.isPending)).flatMap(pendingCommits.set)

def pruneCommittedOffsets(assignedPartitions: Set[TopicPartition]): UIO[Unit] =
committedOffsetsRef.update(_.keepPartitions(assignedPartitions))

def getCommittedOffsets: UIO[CommitOffsets] = committedOffsetsRef.get

def getPendingCommits: UIO[CommitOffsets] =
pendingCommits.get.map(CommitOffsets.empty.addCommits(_)._2)
}

private[internal] object Committer {
def make(
settings: ConsumerSettings,
diagnostics: Diagnostics,
consumerMetrics: ConsumerMetrics,
onCommitAvailable: UIO[Unit],
sameThreadRuntime: Runtime[Any]
): ZIO[Any with Scope, Nothing, Committer] = for {
pendingCommits <- Ref.make(Chunk.empty[Commit])
commitQueue <- ZIO.acquireRelease(Queue.unbounded[Commit])(_.shutdown)
committedOffsetsRef <- Ref.make(CommitOffsets.empty)
} yield new Committer(
commitQueue,
settings,
diagnostics,
consumerMetrics,
onCommitAvailable,
committedOffsetsRef,
sameThreadRuntime,
pendingCommits
)

private[internal] final case class Commit(
createdAt: NanoTime,
offsets: Map[TopicPartition, OffsetAndMetadata],
cont: Promise[Throwable, Unit]
) {
@inline def isDone: UIO[Boolean] = cont.isDone
@inline def isPending: UIO[Boolean] = isDone.negate
}

// package private for unit testing
private[internal] final case class CommitOffsets(offsets: Map[TopicPartition, Long]) {

/** Returns an estimate of the total offset increase, and a new `CommitOffsets` with the given offsets added. */
def addCommits(c: Chunk[Commit]): (Long, CommitOffsets) = {
val updatedOffsets = mutable.Map.empty[TopicPartition, Long]
updatedOffsets.sizeHint(offsets.size)
updatedOffsets ++= offsets
var offsetIncrease = 0L
c.foreach { commit =>
commit.offsets.foreach { case (tp, offsetAndMeta) =>
val offset = offsetAndMeta.offset()
val maxOffset = updatedOffsets.get(tp) match {
case Some(existingOffset) =>
offsetIncrease += max(0L, offset - existingOffset)
max(existingOffset, offset)
case None =>
// This partition was not committed to from this consumer yet. Therefore we do not know the offset
// increase. A good estimate would be the poll size for this consumer, another okayish estimate is 0.
// Lets go with the simplest for now: ```offsetIncrease += 0```
offset
}
updatedOffsets += tp -> maxOffset
}
}
(offsetIncrease, CommitOffsets(offsets = updatedOffsets.toMap))
}

def keepPartitions(tps: Set[TopicPartition]): CommitOffsets =
CommitOffsets(offsets.filter { case (tp, _) => tps.contains(tp) })

def contains(tp: TopicPartition, offset: Long): Boolean =
offsets.get(tp).exists(_ >= offset)
}

private[internal] object CommitOffsets {
val empty: CommitOffsets = CommitOffsets(Map.empty)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@ private[internal] trait ConsumerMetrics {
def observeCommit(latency: Duration): UIO[Unit]
def observeAggregatedCommit(latency: Duration, commitSize: Long): UIO[Unit]
def observeRebalance(currentlyAssignedCount: Int, assignedCount: Int, revokedCount: Int, lostCount: Int): UIO[Unit]
def observeRunloopMetrics(state: Runloop.State, commandQueueSize: Int, commitQueueSize: Int): UIO[Unit]
def observeRunloopMetrics(
state: Runloop.State,
commandQueueSize: Int,
commitQueueSize: Int,
pendingCommits: Int
): UIO[Unit]
def observePollAuthError(): UIO[Unit]
}

Expand Down Expand Up @@ -330,14 +335,19 @@ private[internal] class ZioConsumerMetrics(metricLabels: Set[MetricLabel]) exten
.contramap[Int](_.toDouble)
.tagged(metricLabels)

override def observeRunloopMetrics(state: Runloop.State, commandQueueSize: Int, commitQueueSize: Int): UIO[Unit] =
override def observeRunloopMetrics(
state: Runloop.State,
commandQueueSize: Int,
commitQueueSize: Int,
pendingCommits: Int
): UIO[Unit] =
for {
_ <- ZIO.foreachDiscard(state.assignedStreams)(_.outstandingPolls @@ queuePollsHistogram)
queueSizes <- ZIO.foreach(state.assignedStreams)(_.queueSize)
_ <- ZIO.foreachDiscard(queueSizes)(qs => queueSizeHistogram.update(qs))
_ <- allQueueSizeHistogram.update(queueSizes.sum)
_ <- pendingRequestsHistogram.update(state.pendingRequests.size)
_ <- pendingCommitsHistogram.update(state.pendingCommits.size)
_ <- pendingCommitsHistogram.update(pendingCommits)
_ <- subscriptionStateGauge.update(state.subscriptionState)
_ <- commandQueueSizeHistogram.update(commandQueueSize)
_ <- commitQueueSizeHistogram.update(commitQueueSize)
Expand Down
Loading
Loading