diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index 679a0d582..a798abd41 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -28,6 +28,7 @@ import zio.test.Assertion._ import zio.test.TestAspect._ import zio.test._ +import java.util.concurrent.ExecutionException import scala.reflect.ClassTag //noinspection SimplifyAssertInspection @@ -323,21 +324,20 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { } yield assert(offset.map(_.offset))(isSome(equalTo(9L))) }, test("process outstanding commits after a graceful shutdown with aggregateAsync using `maxRebalanceDuration`") { - val kvs = (1 to 100).toList.map(i => (s"key$i", s"msg$i")) for { - topic <- randomTopic - group <- randomGroup - client <- randomClient - _ <- produceMany(topic, kvs) - messagesReceived <- Ref.make[Int](0) + topic <- randomTopic + group <- randomGroup + client <- randomClient + _ <- scheduledProduce(topic, Schedule.fixed(50.millis).jittered).runDrain.forkScoped + lastProcessedOffset <- Ref.make(0L) offset <- ( Consumer .plainStream(Subscription.topics(topic), Serde.string, Serde.string) - .mapConcatZIO { record => + .mapZIO { record => for { - nr <- messagesReceived.updateAndGet(_ + 1) + nr <- lastProcessedOffset.updateAndGet(_ + 1) _ <- Consumer.stopConsumption.when(nr == 10) - } yield if (nr < 10) Seq(record.offset) else Seq.empty + } yield record.offset } .aggregateAsync(Consumer.offsetBatches) .mapZIO(_.commit) @@ -353,7 +353,8 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { maxRebalanceDuration = 6.seconds ) ) - } yield assertTrue(offset.map(_.offset).contains(9L)) + lastOffset <- lastProcessedOffset.get + } yield assertTrue(offset.map(_.offset).contains(lastOffset)) } @@ TestAspect.nonFlaky(2), test("a consumer timeout interrupts the stream and shuts down the consumer") { // Setup of this test: @@ -1377,9 +1378,13 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { }: _*) @@ TestAspect.nonFlaky(2), test("running streams don't stall after a poll timeout") { for { - topic <- randomTopic - clientId <- randomClient - _ <- ZIO.fromTry(EmbeddedKafka.createCustomTopic(topic)) + topic <- randomTopic + clientId <- randomClient + _ <- ZIO.attempt(EmbeddedKafka.createCustomTopic(topic)).flatMap(ZIO.fromTry(_)).retryN(3).catchSome { + case e: ExecutionException + if e.getCause.isInstanceOf[org.apache.kafka.common.errors.TopicExistsException] => + ZIO.unit + } settings <- consumerSettings(clientId) consumer <- Consumer.make(settings.withPollTimeout(50.millis)) recordsOut <- Queue.unbounded[Unit] diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopCommitOffsetsSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/CommitOffsetsSpec.scala similarity index 75% rename from zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopCommitOffsetsSpec.scala rename to zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/CommitOffsetsSpec.scala index 77dbc77b0..ffc79101e 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopCommitOffsetsSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/CommitOffsetsSpec.scala @@ -1,11 +1,13 @@ package zio.kafka.consumer.internal +import org.apache.kafka.clients.consumer.OffsetAndMetadata import org.apache.kafka.common.TopicPartition import 
zio._ -import org.apache.kafka.clients.consumer.OffsetAndMetadata +import zio.kafka.consumer.internal.Committer.CommitOffsets +import zio.kafka.consumer.internal.LiveCommitter.Commit import zio.test._ -object RunloopCommitOffsetsSpec extends ZIOSpecDefault { +object CommitOffsetsSpec extends ZIOSpecDefault { private val tp10 = new TopicPartition("t1", 0) private val tp11 = new TopicPartition("t1", 1) @@ -14,9 +16,9 @@ object RunloopCommitOffsetsSpec extends ZIOSpecDefault { private val tp22 = new TopicPartition("t2", 2) override def spec: Spec[TestEnvironment with Scope, Any] = - suite("Runloop.CommitOffsets spec")( + suite("CommitOffsets spec")( test("addCommits adds to empty CommitOffsets") { - val s1 = Runloop.CommitOffsets(Map.empty) + val s1 = CommitOffsets(Map.empty) val (inc, s2) = s1.addCommits(Chunk(makeCommit(Map(tp10 -> 10)))) assertTrue( inc == 0, @@ -24,7 +26,7 @@ object RunloopCommitOffsetsSpec extends ZIOSpecDefault { ) }, test("addCommits updates offset when it is higher") { - val s1 = Runloop.CommitOffsets(Map(tp10 -> 4L)) + val s1 = CommitOffsets(Map(tp10 -> 4L)) val (inc, s2) = s1.addCommits(Chunk(makeCommit(Map(tp10 -> 10)))) assertTrue( inc == 10 - 4, @@ -32,7 +34,7 @@ object RunloopCommitOffsetsSpec extends ZIOSpecDefault { ) }, test("addCommits ignores an offset when it is lower") { - val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L)) + val s1 = CommitOffsets(Map(tp10 -> 10L)) val (inc, s2) = s1.addCommits(Chunk(makeCommit(Map(tp10 -> 5)))) assertTrue( inc == 0, @@ -40,7 +42,7 @@ object RunloopCommitOffsetsSpec extends ZIOSpecDefault { ) }, test("addCommits keeps unrelated partitions") { - val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L)) + val s1 = CommitOffsets(Map(tp10 -> 10L)) val (inc, s2) = s1.addCommits(Chunk(makeCommit(Map(tp11 -> 11)))) assertTrue( inc == 0, @@ -48,7 +50,7 @@ object RunloopCommitOffsetsSpec extends ZIOSpecDefault { ) }, test("addCommits does it all at once") { - val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 205L, tp21 -> 210L, tp22 -> 220L)) + val s1 = CommitOffsets(Map(tp10 -> 10L, tp20 -> 205L, tp21 -> 210L, tp22 -> 220L)) val (inc, s2) = s1.addCommits(Chunk(makeCommit(Map(tp11 -> 11, tp20 -> 206L, tp21 -> 209L, tp22 -> 220L)))) assertTrue( inc == /* tp10 */ 0 + /* tp11 */ 0 + /* tp20 */ 1 + /* tp21 */ 0 + /* tp22 */ 0, @@ -56,7 +58,7 @@ object RunloopCommitOffsetsSpec extends ZIOSpecDefault { ) }, test("addCommits adds multiple commits") { - val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 200L, tp21 -> 210L, tp22 -> 220L)) + val s1 = CommitOffsets(Map(tp10 -> 10L, tp20 -> 200L, tp21 -> 210L, tp22 -> 220L)) val (inc, s2) = s1.addCommits( Chunk( makeCommit(Map(tp11 -> 11, tp20 -> 199L, tp21 -> 211L, tp22 -> 219L)), @@ -69,35 +71,35 @@ object RunloopCommitOffsetsSpec extends ZIOSpecDefault { ) }, test("keepPartitions removes some partitions") { - val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L)) + val s1 = CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L)) val s2 = s1.keepPartitions(Set(tp10)) assertTrue(s2.offsets == Map(tp10 -> 10L)) }, test("does not 'contain' offset when tp is not present") { - val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L)) + val s1 = CommitOffsets(Map(tp10 -> 10L)) val result = s1.contains(tp20, 10) assertTrue(!result) }, test("does not 'contain' a higher offset") { - val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L)) + val s1 = CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L)) val result = s1.contains(tp10, 11) assertTrue(!result) }, test("does 'contain' equal offset") { - val s1 = 
Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L)) + val s1 = CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L)) val result = s1.contains(tp10, 10) assertTrue(result) }, test("does 'contain' lower offset") { - val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L)) + val s1 = CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L)) val result = s1.contains(tp20, 19) assertTrue(result) } ) - private def makeCommit(offsets: Map[TopicPartition, Long]): Runloop.Commit = { + private def makeCommit(offsets: Map[TopicPartition, Long]): Commit = { val o = offsets.map { case (tp, offset) => tp -> new OffsetAndMetadata(offset) } val p = Unsafe.unsafe(implicit unsafe => Promise.unsafe.make[Throwable, Unit](FiberId.None)) - Runloop.Commit(0L, o, p) + Commit(0L, o, p) } } diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/CommitterSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/CommitterSpec.scala new file mode 100644 index 000000000..b855febb8 --- /dev/null +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/CommitterSpec.scala @@ -0,0 +1,223 @@ +package zio.kafka.consumer.internal + +import org.apache.kafka.clients.consumer.OffsetAndMetadata +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.RebalanceInProgressException +import zio.kafka.consumer.diagnostics.Diagnostics +import zio.test._ +import zio.{ durationInt, Promise, UIO, ZIO } + +import java.util.{ Map => JavaMap } +import scala.jdk.CollectionConverters.MapHasAsJava + +object CommitterSpec extends ZIOSpecDefault { + private val mockMetrics = new ConsumerMetrics { + override def observePoll(resumedCount: Int, pausedCount: Int, latency: zio.Duration, pollSize: Int): UIO[Unit] = + ZIO.unit + + override def observeCommit(latency: zio.Duration): UIO[Unit] = ZIO.unit + override def observeAggregatedCommit(latency: zio.Duration, commitSize: NanoTime): UIO[Unit] = ZIO.unit + override def observeRebalance( + currentlyAssignedCount: Int, + assignedCount: Int, + revokedCount: Int, + lostCount: Int + ): UIO[Unit] = ZIO.unit + override def observeRunloopMetrics( + state: Runloop.State, + commandQueueSize: Int, + commitQueueSize: Int, + pendingCommits: Int + ): UIO[Unit] = ZIO.unit + override def observePollAuthError(): UIO[Unit] = ZIO.unit + } + + override def spec = suite("Committer")( + test("signals that a new commit is available") { + for { + runtime <- ZIO.runtime[Any] + commitAvailable <- Promise.make[Nothing, Unit] + committer <- LiveCommitter + .make( + 10.seconds, + Diagnostics.NoOp, + mockMetrics, + onCommitAvailable = commitAvailable.succeed(()).unit, + sameThreadRuntime = runtime + ) + tp = new TopicPartition("topic", 0) + _ <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped + _ <- commitAvailable.await + } yield assertCompletes + }, + test("handles a successful commit by completing the commit effect") { + for { + runtime <- ZIO.runtime[Any] + commitAvailable <- Promise.make[Nothing, Unit] + committer <- LiveCommitter.make( + 10.seconds, + Diagnostics.NoOp, + mockMetrics, + onCommitAvailable = commitAvailable.succeed(()).unit, + sameThreadRuntime = runtime + ) + tp = new TopicPartition("topic", 0) + commitFiber <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped + _ <- commitAvailable.await + _ <- committer.processQueuedCommits((offsets, callback) => ZIO.attempt(callback.onComplete(offsets, null))) + _ <- commitFiber.join + } yield assertCompletes + }, + test("handles a failed commit by completing the commit effect with a failure") 
{ + for { + runtime <- ZIO.runtime[Any] + commitAvailable <- Promise.make[Nothing, Unit] + committer <- LiveCommitter.make( + 10.seconds, + Diagnostics.NoOp, + mockMetrics, + onCommitAvailable = commitAvailable.succeed(()).unit, + sameThreadRuntime = runtime + ) + tp = new TopicPartition("topic", 0) + commitFiber <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped + _ <- commitAvailable.await + _ <- committer.processQueuedCommits((offsets, callback) => + ZIO.attempt(callback.onComplete(offsets, new RuntimeException("Commit failed"))) + ) + result <- commitFiber.await + } yield assertTrue(result.isFailure) + }, + test("retries when rebalancing") { + for { + runtime <- ZIO.runtime[Any] + commitAvailable <- Promise.make[Nothing, Unit] + committer <- LiveCommitter.make( + 10.seconds, + Diagnostics.NoOp, + mockMetrics, + onCommitAvailable = commitAvailable.succeed(()).unit, + sameThreadRuntime = runtime + ) + tp = new TopicPartition("topic", 0) + commitFiber <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped + _ <- commitAvailable.await + _ <- committer.processQueuedCommits((offsets, callback) => + ZIO.attempt(callback.onComplete(offsets, new RebalanceInProgressException("Rebalance in progress"))) + ) + _ <- committer.processQueuedCommits((offsets, callback) => ZIO.attempt(callback.onComplete(offsets, null))) + result <- commitFiber.await + } yield assertTrue(result.isSuccess) + }, + test("adds 1 to the committed last offset") { + for { + runtime <- ZIO.runtime[Any] + commitAvailable <- Promise.make[Nothing, Unit] + committer <- LiveCommitter.make( + 10.seconds, + Diagnostics.NoOp, + mockMetrics, + onCommitAvailable = commitAvailable.succeed(()).unit, + sameThreadRuntime = runtime + ) + tp = new TopicPartition("topic", 0) + _ <- committer.commit(Map(tp -> new OffsetAndMetadata(1))).forkScoped + _ <- commitAvailable.await + committedOffsets <- Promise.make[Nothing, JavaMap[TopicPartition, OffsetAndMetadata]] + _ <- committer.processQueuedCommits((offsets, callback) => + committedOffsets.succeed(offsets) *> ZIO.attempt(callback.onComplete(offsets, null)) + ) + offsetsCommitted <- committedOffsets.await + } yield assertTrue( + offsetsCommitted == Map(tp -> new OffsetAndMetadata(2)).asJava + ) + }, + test("batches commits from multiple partitions and offsets") { + for { + runtime <- ZIO.runtime[Any] + commitAvailable <- Promise.make[Nothing, Unit] + committer <- LiveCommitter.make( + 10.seconds, + Diagnostics.NoOp, + mockMetrics, + onCommitAvailable = commitAvailable.succeed(()).unit, + sameThreadRuntime = runtime + ) + tp = new TopicPartition("topic", 0) + tp2 = new TopicPartition("topic", 1) + commitFiber1 <- committer.commit(Map(tp -> new OffsetAndMetadata(1))).forkScoped + commitFiber2 <- committer.commit(Map(tp -> new OffsetAndMetadata(2))).forkScoped + commitFiber3 <- committer.commit(Map(tp2 -> new OffsetAndMetadata(3))).forkScoped + _ <- commitAvailable.await + committedOffsets <- Promise.make[Nothing, JavaMap[TopicPartition, OffsetAndMetadata]] + _ <- committer.processQueuedCommits((offsets, callback) => + committedOffsets.succeed(offsets) *> ZIO.attempt(callback.onComplete(offsets, null)) + ) + _ <- commitFiber1.join zip commitFiber2.join zip commitFiber3.join + offsetsCommitted <- committedOffsets.await + } yield assertTrue( + offsetsCommitted == Map(tp -> new OffsetAndMetadata(3), tp2 -> new OffsetAndMetadata(4)).asJava + ) + }, + test("keeps track of pending commits") { + for { + runtime <- ZIO.runtime[Any] + commitAvailable <- Promise.make[Nothing, 
Unit] + committer <- LiveCommitter.make( + 10.seconds, + Diagnostics.NoOp, + mockMetrics, + onCommitAvailable = commitAvailable.succeed(()).unit, + sameThreadRuntime = runtime + ) + tp = new TopicPartition("topic", 0) + commitFiber <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped + _ <- commitAvailable.await + _ <- committer.processQueuedCommits((offsets, callback) => ZIO.attempt(callback.onComplete(offsets, null))) + pendingCommitsDuringCommit <- committer.pendingCommitCount + _ <- committer.updatePendingCommitsAfterPoll + pendingCommitsAfterCommit <- committer.pendingCommitCount + _ <- commitFiber.join + } yield assertTrue(pendingCommitsDuringCommit == 1 && pendingCommitsAfterCommit == 0) + }, + test("keep track of committed offsets") { + for { + runtime <- ZIO.runtime[Any] + commitAvailable <- Promise.make[Nothing, Unit] + committer <- LiveCommitter.make( + 10.seconds, + Diagnostics.NoOp, + mockMetrics, + onCommitAvailable = commitAvailable.succeed(()).unit, + sameThreadRuntime = runtime + ) + tp = new TopicPartition("topic", 0) + commitFiber <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped + _ <- commitAvailable.await + _ <- committer.processQueuedCommits((offsets, callback) => ZIO.attempt(callback.onComplete(offsets, null))) + committedOffsets <- committer.getCommittedOffsets + _ <- commitFiber.join + } yield assertTrue(committedOffsets.offsets == Map(tp -> 0L)) + }, + test("clean committed offsets of no-longer assigned partitions") { + for { + runtime <- ZIO.runtime[Any] + commitAvailable <- Promise.make[Nothing, Unit] + committer <- LiveCommitter.make( + 10.seconds, + Diagnostics.NoOp, + mockMetrics, + onCommitAvailable = commitAvailable.succeed(()).unit, + sameThreadRuntime = runtime + ) + tp = new TopicPartition("topic", 0) + commitFiber <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped + _ <- commitAvailable.await + _ <- committer.processQueuedCommits((offsets, callback) => ZIO.attempt(callback.onComplete(offsets, null))) + _ <- committer.pruneCommittedOffsets(Set.empty) + committedOffsets <- committer.getCommittedOffsets + _ <- commitFiber.join + } yield assertTrue(committedOffsets.offsets.isEmpty) + } + ) @@ TestAspect.withLiveClock +} diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RebalanceCoordinatorSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RebalanceCoordinatorSpec.scala new file mode 100644 index 000000000..78da09a45 --- /dev/null +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RebalanceCoordinatorSpec.scala @@ -0,0 +1,218 @@ +package zio.kafka.consumer.internal + +import org.apache.kafka.clients.consumer._ +import org.apache.kafka.common.TopicPartition +import zio.kafka.ZIOSpecDefaultSlf4j +import zio.kafka.consumer.diagnostics.Diagnostics +import zio.kafka.consumer.internal.Committer.CommitOffsets +import zio.kafka.consumer.internal.LiveCommitter.Commit +import zio.kafka.consumer.internal.RebalanceCoordinator.RebalanceEvent +import zio.kafka.consumer.internal.Runloop.ByteArrayCommittableRecord +import zio.kafka.consumer.{ CommittableRecord, ConsumerSettings } +import zio.test._ +import zio.{ durationInt, Chunk, Promise, Ref, Scope, Semaphore, Task, UIO, ZIO } + +import java.util + +object RebalanceCoordinatorSpec extends ZIOSpecDefaultSlf4j { + type BinaryMockConsumer = MockConsumer[Array[Byte], Array[Byte]] + + private val mockMetrics = new ConsumerMetrics { + override def observePoll(resumedCount: Int, pausedCount: Int, latency: zio.Duration, 
pollSize: Int): UIO[Unit] = + ZIO.unit + + override def observeCommit(latency: zio.Duration): UIO[Unit] = ZIO.unit + override def observeAggregatedCommit(latency: zio.Duration, commitSize: NanoTime): UIO[Unit] = ZIO.unit + override def observeRebalance( + currentlyAssignedCount: Int, + assignedCount: Int, + revokedCount: Int, + lostCount: Int + ): UIO[Unit] = ZIO.unit + override def observeRunloopMetrics( + state: Runloop.State, + commandQueueSize: Int, + commitQueueSize: Int, + pendingCommits: Int + ): UIO[Unit] = ZIO.unit + override def observePollAuthError(): UIO[Unit] = ZIO.unit + } + + def spec = suite("RebalanceCoordinator")( + test("should track assigned, revoked and lost partitions") { + for { + lastEvent <- Ref.make(RebalanceCoordinator.RebalanceEvent.None) + consumer = new BinaryMockConsumer(OffsetResetStrategy.LATEST) {} + tp = new TopicPartition("topic", 0) + tp2 = new TopicPartition("topic", 1) + tp3 = new TopicPartition("topic", 2) + tp4 = new TopicPartition("topic", 3) + listener <- makeCoordinator(lastEvent, consumer) + _ <- listener.toRebalanceListener.onAssigned(Set(tp)) + _ <- listener.toRebalanceListener.onAssigned(Set(tp4)) + _ <- listener.toRebalanceListener.onRevoked(Set(tp2)) + _ <- listener.toRebalanceListener.onLost(Set(tp3)) + event <- lastEvent.get + } yield assertTrue( + event.wasInvoked && event.assignedTps == Set(tp, tp4) && event.revokedTps == Set(tp2) && event.lostTps == Set( + tp3 + ) + ) + }, + test("should end streams for revoked and lost partitions") { + for { + lastEvent <- Ref.make(RebalanceCoordinator.RebalanceEvent.None) + consumer = new BinaryMockConsumer(OffsetResetStrategy.LATEST) {} + tp = new TopicPartition("topic", 0) + tp2 = new TopicPartition("topic", 1) + tp3 = new TopicPartition("topic", 2) + assignedStreams <- ZIO.foreach(Chunk(tp, tp2, tp3))(makeStreamControl) + listener <- makeCoordinator(lastEvent, consumer, assignedStreams = assignedStreams) + _ <- listener.toRebalanceListener.onAssigned(Set(tp)) + _ <- listener.toRebalanceListener.onRevoked(Set(tp2)) + _ <- listener.toRebalanceListener.onLost(Set(tp3)) + event <- lastEvent.get + // The streams of the lost and revoked partitions should be ended + _ <- assignedStreams(1).stream.runDrain + _ <- assignedStreams(2).stream.runDrain + } yield assertTrue(event.endedStreams.map(_.tp).toSet == Set(tp2, tp3)) + }, + suite("rebalanceSafeCommits")( + test("should wait for the last pulled offset to commit") { + for { + lastEvent <- Ref.make(RebalanceCoordinator.RebalanceEvent.None) + consumer = new BinaryMockConsumer(OffsetResetStrategy.LATEST) { + override def commitAsync( + offsets: util.Map[TopicPartition, OffsetAndMetadata], + callback: OffsetCommitCallback + ): Unit = + // Do nothing during rebalancing + if (callback != null) callback.onComplete(offsets, null) + + override def commitSync(): Unit = () + } + tp = new TopicPartition("topic", 0) + streamControl <- makeStreamControl(tp) + records = createTestRecords(3) + recordsPulled <- Promise.make[Nothing, Unit] + _ <- streamControl.offerRecords(records) + runtime <- ZIO.runtime[Any] + committer <- LiveCommitter.make(10.seconds, Diagnostics.NoOp, mockMetrics, ZIO.unit, runtime) + + streamDrain <- + streamControl.stream + .tap(_ => recordsPulled.succeed(())) + .tap(record => + committer + .commit( + Map( + new TopicPartition("topic", record.partition) -> new OffsetAndMetadata(record.offset.offset, null) + ) + ) + ) + .runDrain + .forkScoped + listener <- + makeCoordinator( + lastEvent, + consumer, + assignedStreams = Chunk(streamControl), 
rebalanceSafeCommits = true, + committer = committer + ) + _ <- listener.toRebalanceListener.onRevoked(Set(tp)) + _ <- streamDrain.join + } yield assertCompletes + }, + test("should continue if waiting for the stream to continue has timed out") { + for { + lastEvent <- Ref.make(RebalanceCoordinator.RebalanceEvent.None) + consumer = new BinaryMockConsumer(OffsetResetStrategy.LATEST) {} + tp = new TopicPartition("topic", 0) + streamControl <- makeStreamControl(tp) + records = createTestRecords(3) + recordsPulled <- Promise.make[Nothing, Unit] + _ <- streamControl.offerRecords(records) + committedOffsets <- Ref.make(CommitOffsets.empty) + done <- Promise.make[Throwable, Unit] + committer = new MockCommitter { + override val commit = + offsets => + committedOffsets + .update(_.addCommits(Chunk(Commit(0L, offsets, done)))._2) + override def getCommittedOffsets = committedOffsets.get + } + streamDrain <- + streamControl.stream + .tap(_ => recordsPulled.succeed(())) + .runDrain + .forkScoped + listener <- + makeCoordinator( + lastEvent, + consumer, + assignedStreams = Chunk(streamControl), + rebalanceSafeCommits = true, + committer = committer + ) + _ <- listener.toRebalanceListener.onRevoked(Set(tp)) + _ <- streamDrain.join + } yield assertCompletes + } + ) + ) @@ TestAspect.withLiveClock + + private def makeStreamControl(tp: TopicPartition): UIO[PartitionStreamControl] = + PartitionStreamControl.newPartitionStream(tp, ZIO.unit, Diagnostics.NoOp, 30.seconds) + + private def makeCoordinator( + lastEvent: Ref[RebalanceEvent], + mockConsumer: BinaryMockConsumer, + assignedStreams: Chunk[PartitionStreamControl] = Chunk.empty, + committer: Committer = new MockCommitter {}, + settings: ConsumerSettings = ConsumerSettings(List("")).withCommitTimeout(1.second), + rebalanceSafeCommits: Boolean = false + ): ZIO[Scope, Throwable, RebalanceCoordinator] = + Semaphore.make(1).map(new ConsumerAccess(mockConsumer, _)).map { consumerAccess => + new RebalanceCoordinator( + lastEvent, + settings.withRebalanceSafeCommits(rebalanceSafeCommits), + consumerAccess, + 5.seconds, + ZIO.succeed(assignedStreams), + committer + ) + } + + private def createTestRecords(count: Int): Chunk[ByteArrayCommittableRecord] = + Chunk.fromIterable( + (1 to count).map(i => + CommittableRecord( + record = new ConsumerRecord[Array[Byte], Array[Byte]]( + "test-topic", + 0, + i.toLong, + Array[Byte](), + Array[Byte]() + ), + commitHandle = _ => ZIO.unit, + consumerGroupMetadata = None + ) + ) + ) +} + +abstract class MockCommitter extends Committer { + override val commit: Map[TopicPartition, OffsetAndMetadata] => Task[Unit] = _ => ZIO.unit + + override def processQueuedCommits( + commitAsync: (java.util.Map[TopicPartition, OffsetAndMetadata], OffsetCommitCallback) => zio.Task[Unit], + executeOnEmpty: Boolean + ): zio.Task[Unit] = ZIO.unit + override def queueSize: UIO[Int] = ZIO.succeed(0) + override def pendingCommitCount: UIO[Int] = ZIO.succeed(0) + override def getPendingCommits: UIO[CommitOffsets] = ZIO.succeed(CommitOffsets.empty) + override def updatePendingCommitsAfterPoll: UIO[Unit] = ZIO.unit + override def pruneCommittedOffsets(assignedPartitions: Set[TopicPartition]): UIO[Unit] = ZIO.unit + override def getCommittedOffsets: UIO[CommitOffsets] = ZIO.succeed(CommitOffsets.empty) +} diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Committer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Committer.scala new file mode 100644 index 000000000..dc2864fde --- /dev/null +++ 
b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Committer.scala @@ -0,0 +1,86 @@ +package zio.kafka.consumer.internal + +import org.apache.kafka.clients.consumer.{ OffsetAndMetadata, OffsetCommitCallback } +import org.apache.kafka.common.TopicPartition +import zio.kafka.consumer.internal.Committer.CommitOffsets +import zio.kafka.consumer.internal.LiveCommitter.Commit +import zio.{ Chunk, Task, UIO } + +import java.lang.Math.max +import java.util.{ Map => JavaMap } +import scala.collection.mutable + +private[internal] trait Committer { + val commit: Map[TopicPartition, OffsetAndMetadata] => Task[Unit] + + /** + * Takes commits from the queue, commits them, and adds them to the pending commits. + * + * If the queue is empty, nothing happens unless `executeOnEmpty` is true. + * + * @param commitAsync + * Function `commitAsync` on the KafkaConsumer. This function is isolated from the rest of the KafkaConsumer for testing purposes. + * The caller should ensure exclusive access to the KafkaConsumer. + * @param executeOnEmpty + * Execute `commitAsync()` even if there are no queued commits + */ + def processQueuedCommits( + commitAsync: (JavaMap[TopicPartition, OffsetAndMetadata], OffsetCommitCallback) => Task[Unit], + executeOnEmpty: Boolean = false + ): Task[Unit] + + def queueSize: UIO[Int] + + def pendingCommitCount: UIO[Int] + + def getPendingCommits: UIO[CommitOffsets] + + def updatePendingCommitsAfterPoll: UIO[Unit] + + def pruneCommittedOffsets(assignedPartitions: Set[TopicPartition]): UIO[Unit] + + def getCommittedOffsets: UIO[CommitOffsets] +} + +private[internal] object Committer { + final case class CommitOffsets(offsets: Map[TopicPartition, Long]) { + + /** Returns an estimate of the total offset increase, and a new `CommitOffsets` with the given offsets added. */ + def addCommits(c: Chunk[Commit]): (Long, CommitOffsets) = { + val updatedOffsets = mutable.Map.empty[TopicPartition, Long] + updatedOffsets.sizeHint(offsets.size) + updatedOffsets ++= offsets + var offsetIncrease = 0L + c.foreach { commit => + commit.offsets.foreach { case (tp, offsetAndMeta) => + val offset = offsetAndMeta.offset() + val maxOffset = updatedOffsets.get(tp) match { + case Some(existingOffset) => + offsetIncrease += max(0L, offset - existingOffset) + max(existingOffset, offset) + case None => + // This partition was not committed to from this consumer yet. Therefore we do not know the offset + // increase. A good estimate would be the poll size for this consumer, another okayish estimate is 0. 
+ // Lets go with the simplest for now: ```offsetIncrease += 0``` + offset + } + updatedOffsets += tp -> maxOffset + } + } + (offsetIncrease, CommitOffsets(offsets = updatedOffsets.toMap)) + } + + def keepPartitions(tps: Set[TopicPartition]): CommitOffsets = + CommitOffsets(offsets.filter { case (tp, _) => tps.contains(tp) }) + + def contains(tp: TopicPartition, offset: Long): Boolean = + offsets.get(tp).exists(_ >= offset) + + def get(tp: TopicPartition): Option[Long] = offsets.get(tp) + } + + private[internal] object CommitOffsets { + val empty: CommitOffsets = CommitOffsets(Map.empty) + } + +} diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerMetrics.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerMetrics.scala index f43329a4f..aff76f7cd 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerMetrics.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerMetrics.scala @@ -16,7 +16,12 @@ private[internal] trait ConsumerMetrics { def observeCommit(latency: Duration): UIO[Unit] def observeAggregatedCommit(latency: Duration, commitSize: Long): UIO[Unit] def observeRebalance(currentlyAssignedCount: Int, assignedCount: Int, revokedCount: Int, lostCount: Int): UIO[Unit] - def observeRunloopMetrics(state: Runloop.State, commandQueueSize: Int, commitQueueSize: Int): UIO[Unit] + def observeRunloopMetrics( + state: Runloop.State, + commandQueueSize: Int, + commitQueueSize: Int, + pendingCommits: Int + ): UIO[Unit] def observePollAuthError(): UIO[Unit] } @@ -330,14 +335,19 @@ private[internal] class ZioConsumerMetrics(metricLabels: Set[MetricLabel]) exten .contramap[Int](_.toDouble) .tagged(metricLabels) - override def observeRunloopMetrics(state: Runloop.State, commandQueueSize: Int, commitQueueSize: Int): UIO[Unit] = + override def observeRunloopMetrics( + state: Runloop.State, + commandQueueSize: Int, + commitQueueSize: Int, + pendingCommits: Int + ): UIO[Unit] = for { _ <- ZIO.foreachDiscard(state.assignedStreams)(_.outstandingPolls @@ queuePollsHistogram) queueSizes <- ZIO.foreach(state.assignedStreams)(_.queueSize) _ <- ZIO.foreachDiscard(queueSizes)(qs => queueSizeHistogram.update(qs)) _ <- allQueueSizeHistogram.update(queueSizes.sum) _ <- pendingRequestsHistogram.update(state.pendingRequests.size) - _ <- pendingCommitsHistogram.update(state.pendingCommits.size) + _ <- pendingCommitsHistogram.update(pendingCommits) _ <- subscriptionStateGauge.update(state.subscriptionState) _ <- commandQueueSizeHistogram.update(commandQueueSize) _ <- commitQueueSizeHistogram.update(commitQueueSize) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/LiveCommitter.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/LiveCommitter.scala new file mode 100644 index 000000000..9b4ddc53c --- /dev/null +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/LiveCommitter.scala @@ -0,0 +1,159 @@ +package zio.kafka.consumer.internal +import org.apache.kafka.clients.consumer.{ OffsetAndMetadata, OffsetCommitCallback } +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.RebalanceInProgressException +import zio.kafka.consumer.Consumer.CommitTimeout +import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics } +import zio.kafka.consumer.internal.Committer.CommitOffsets +import zio.kafka.consumer.internal.LiveCommitter.Commit +import zio.{ durationLong, Chunk, Duration, Exit, Promise, Queue, Ref, Runtime, Scope, Task, UIO, Unsafe, ZIO } + +import java.util +import 
java.util.{ Map => JavaMap } +import scala.collection.mutable +import scala.jdk.CollectionConverters._ + +private[consumer] final class LiveCommitter( + commitQueue: Queue[Commit], + commitTimeout: Duration, + diagnostics: Diagnostics, + consumerMetrics: ConsumerMetrics, + onCommitAvailable: UIO[Unit], + committedOffsetsRef: Ref[CommitOffsets], + sameThreadRuntime: Runtime[Any], + pendingCommits: Ref[Chunk[Commit]] +) extends Committer { + + /** This is the implementation behind the user facing api `Offset.commit`. */ + override val commit: Map[TopicPartition, OffsetAndMetadata] => Task[Unit] = + offsets => + for { + p <- Promise.make[Throwable, Unit] + startTime = java.lang.System.nanoTime() + _ <- commitQueue.offer(Commit(startTime, offsets, p)) + _ <- onCommitAvailable + _ <- diagnostics.emit(DiagnosticEvent.Commit.Started(offsets)) + _ <- p.await.timeoutFail(CommitTimeout)(commitTimeout) + endTime = java.lang.System.nanoTime() + latency = (endTime - startTime).nanoseconds + _ <- consumerMetrics.observeCommit(latency) + } yield () + + override def processQueuedCommits( + commitAsync: (JavaMap[TopicPartition, OffsetAndMetadata], OffsetCommitCallback) => Task[Unit], + executeOnEmpty: Boolean = false + ): Task[Unit] = for { + commits <- commitQueue.takeAll + _ <- ZIO.logDebug(s"Processing ${commits.size} commits") + _ <- ZIO.unless(commits.isEmpty && !executeOnEmpty) { + val (offsets, callback, onFailure) = asyncCommitParameters(commits) + pendingCommits.update(_ ++ commits) *> + // We don't wait for the completion of the commit here, because it + // will only complete once we poll again. + commitAsync(offsets, callback) + .catchAll(onFailure) + } + } yield () + + /** Merge commits and prepare parameters for calling `consumer.commitAsync`. */ + private def asyncCommitParameters( + commits: Chunk[Commit] + ): (JavaMap[TopicPartition, OffsetAndMetadata], OffsetCommitCallback, Throwable => UIO[Unit]) = { + val offsets = commits + .foldLeft(mutable.Map.empty[TopicPartition, OffsetAndMetadata]) { case (acc, commit) => + commit.offsets.foreach { case (tp, offset) => + acc += (tp -> acc + .get(tp) + .map(current => if (current.offset() > offset.offset()) current else offset) + .getOrElse(offset)) + } + acc + } + .toMap + val offsetsWithMetaData = offsets.map { case (tp, offset) => + tp -> new OffsetAndMetadata(offset.offset + 1, offset.leaderEpoch, offset.metadata) + } + val cont = (e: Exit[Throwable, Unit]) => ZIO.foreachDiscard(commits)(_.cont.done(e)) + // We assume the commit is started immediately after returning from this method. 
+ val startTime = java.lang.System.nanoTime() + val onSuccess = { + val endTime = java.lang.System.nanoTime() + val latency = (endTime - startTime).nanoseconds + for { + offsetIncrease <- committedOffsetsRef.modify(_.addCommits(commits)) + _ <- consumerMetrics.observeAggregatedCommit(latency, offsetIncrease).when(commits.nonEmpty) + result <- cont(Exit.unit) + _ <- diagnostics.emit(DiagnosticEvent.Commit.Success(offsetsWithMetaData)) + } yield result + } + val onFailure: Throwable => UIO[Unit] = { + case _: RebalanceInProgressException => + for { + _ <- ZIO.logDebug(s"Rebalance in progress, commit for offsets $offsets will be retried") + _ <- commitQueue.offerAll(commits) + _ <- onCommitAvailable + } yield () + case err: Throwable => + cont(Exit.fail(err)) <* diagnostics.emit(DiagnosticEvent.Commit.Failure(offsetsWithMetaData, err)) + } + val callback = + new OffsetCommitCallback { + override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = + Unsafe.unsafe { implicit u => + sameThreadRuntime.unsafe.run { + if (exception eq null) onSuccess else onFailure(exception) + } + .getOrThrowFiberFailure() + } + } + (offsetsWithMetaData.asJava, callback, onFailure) + } + + override def queueSize: UIO[Int] = commitQueue.size + + override def pendingCommitCount: UIO[Int] = pendingCommits.get.map(_.size) + + override def getPendingCommits: UIO[CommitOffsets] = + pendingCommits.get.map(CommitOffsets.empty.addCommits(_)._2) + + override def updatePendingCommitsAfterPoll: UIO[Unit] = + pendingCommits.get.flatMap(ZIO.filter(_)(_.isPending)).flatMap(pendingCommits.set) + + override def pruneCommittedOffsets(assignedPartitions: Set[TopicPartition]): UIO[Unit] = + committedOffsetsRef.update(_.keepPartitions(assignedPartitions)) + + override def getCommittedOffsets: UIO[CommitOffsets] = committedOffsetsRef.get +} + +private[internal] object LiveCommitter { + def make( + commitTimeout: Duration, + diagnostics: Diagnostics, + consumerMetrics: ConsumerMetrics, + onCommitAvailable: UIO[Unit], + sameThreadRuntime: Runtime[Any] + ): ZIO[Scope, Nothing, LiveCommitter] = for { + pendingCommits <- Ref.make(Chunk.empty[Commit]) + commitQueue <- ZIO.acquireRelease(Queue.unbounded[Commit])(_.shutdown) + committedOffsetsRef <- Ref.make(CommitOffsets.empty) + } yield new LiveCommitter( + commitQueue, + commitTimeout, + diagnostics, + consumerMetrics, + onCommitAvailable, + committedOffsetsRef, + sameThreadRuntime, + pendingCommits + ) + + private[internal] final case class Commit( + createdAt: NanoTime, + offsets: Map[TopicPartition, OffsetAndMetadata], + cont: Promise[Throwable, Unit] + ) { + @inline def isDone: UIO[Boolean] = cont.isDone + @inline def isPending: UIO[Boolean] = isDone.negate + } + +} diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RebalanceCoordinator.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RebalanceCoordinator.scala new file mode 100644 index 000000000..066bfdf15 --- /dev/null +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RebalanceCoordinator.scala @@ -0,0 +1,292 @@ +package zio.kafka.consumer.internal +import org.apache.kafka.common.TopicPartition +import zio.kafka.consumer.internal.ConsumerAccess.ByteArrayKafkaConsumer +import zio.kafka.consumer.internal.RebalanceCoordinator.{ + EndOffsetCommitPending, + EndOffsetCommitted, + EndOffsetNotCommitted, + RebalanceEvent, + StreamCompletionStatus +} +import zio.kafka.consumer.{ ConsumerSettings, RebalanceListener } +import zio.stream.ZStream +import zio.{ 
durationInt, Chunk, Duration, Ref, Task, UIO, ZIO } + +/** + * The Runloop's RebalanceListener is notified of partitions that are assigned, revoked, and lost. + * + * Because this happens during the call to `poll()`, we communicate any results to the Runloop via a `Ref`. + * + * When `rebalanceSafeCommits` is enabled, we await completion of all revoked partitions' streams and their commits before + * continuing. + */ +private[internal] class RebalanceCoordinator( + lastRebalanceEvent: Ref[RebalanceEvent], + settings: ConsumerSettings, + consumer: ConsumerAccess, + maxRebalanceDuration: Duration, + getCurrentAssignedStreams: UIO[Chunk[PartitionStreamControl]], + committer: Committer +) { + private val commitTimeoutNanos = settings.commitTimeout.toNanos + + private val restartStreamsOnRebalancing = settings.restartStreamOnRebalancing + private val rebalanceSafeCommits = settings.rebalanceSafeCommits + private val commitTimeout = settings.commitTimeout + + // All code in this block is called from the rebalance listener and therefore runs on the same-thread-runtime. This + // is because the Java kafka client requires us to invoke the consumer from the same thread that invoked the + // rebalance listener. + // Unfortunately the same-thread-runtime does not work for all ZIO operations. For example, `ZIO.timeout`, + // `ZStream.repeat`, `Promise.await` on non-completed promises, and any other ZIO operation that shifts the work to + // another thread cannot be used. + + // Time between polling the commit queue from the rebalance listener when `rebalanceSafeCommits` is enabled. + private val commitQueuePollInterval = 100.millis + + def getAndResetLastEvent: UIO[RebalanceEvent] = + lastRebalanceEvent.getAndSet(RebalanceEvent.None) + + // End streams from the rebalance listener. + // When `rebalanceSafeCommits` is enabled, wait for consumed offsets to be committed. 
+ private def endStreams(streamsToEnd: Chunk[PartitionStreamControl]): Task[Any] = + ZIO.unless(streamsToEnd.isEmpty) { + for { + _ <- ZIO.foreachDiscard(streamsToEnd)(_.end) + _ <- consumer.rebalanceListenerAccess(doAwaitStreamCommits(_, streamsToEnd)).when(rebalanceSafeCommits) + } yield () + } + + private def doAwaitStreamCommits( + consumer: ByteArrayKafkaConsumer, + streamsToEnd: Chunk[PartitionStreamControl] + ): Task[Unit] = { + val deadline = java.lang.System.nanoTime() + maxRebalanceDuration.toNanos - commitTimeoutNanos + + def timeToDeadlineMillis(): Long = (deadline - java.lang.System.nanoTime()) / 1000000L + + def completionStatusesAsString(completionStatuses: Chunk[StreamCompletionStatus]): String = + "Revoked partitions: " + completionStatuses.map(_.toString).mkString("; ") + + def getStreamCompletionStatuses: UIO[Chunk[StreamCompletionStatus]] = + for { + committedOffsets <- committer.getCommittedOffsets + latestPendingCommitOffsets <- committer.getPendingCommits.map(_.offsets) + streamResults <- + ZIO.foreach(streamsToEnd) { stream => + for { + isDone <- stream.completedPromise.isDone + lastPulledOffset <- stream.lastPulledOffset + endOffset <- if (isDone) stream.completedPromise.await else ZIO.none + + endOffsetCommitStatus = + endOffset match { + case Some(endOffset) if committedOffsets.contains(stream.tp, endOffset.offset) => + EndOffsetCommitted + case Some(endOffset) if latestPendingCommitOffsets.get(stream.tp).contains(endOffset.offset) => + EndOffsetCommitPending + case _ => EndOffsetNotCommitted + } + } yield StreamCompletionStatus( + stream.tp, + isDone, + lastPulledOffset.map(_.offset), + committedOffsets.get(stream.tp), + endOffsetCommitStatus + ) + } + } yield streamResults + + @inline + def logStreamCompletionStatuses(completionStatuses: Chunk[StreamCompletionStatus]): UIO[Unit] = { + val statusStrings = completionStatusesAsString(completionStatuses) + ZIO.logDebug( + s"Delaying rebalance until ${streamsToEnd.size} streams (of revoked partitions) have committed " + + s"the offsets of the records they consumed. Deadline in ${timeToDeadlineMillis()}ms. $statusStrings" + ) + } + + def logInitialStreamCompletionStatuses: UIO[Unit] = + for { + completionStatuses <- getStreamCompletionStatuses + _ <- logStreamCompletionStatuses(completionStatuses) + } yield () + + def endingStreamsCompletedAndCommitsExist: UIO[Boolean] = + for { + completionStatuses <- getStreamCompletionStatuses + _ <- logStreamCompletionStatuses(completionStatuses) + } yield completionStatuses.forall { status => + // A stream is complete when it never got any records, or when it committed the offset of the last consumed record + status.lastPulledOffset.isEmpty || (status.streamEnded && status.endOffsetCommitStatus != EndOffsetNotCommitted) + } + + def logFinalStreamCompletionStatuses(completed: Boolean): UIO[Unit] = + if (completed) + ZIO.logInfo("Continuing rebalance, all offsets of consumed records in the revoked partitions were committed.") + else + for { + completionStatuses <- getStreamCompletionStatuses + statusStrings = completionStatusesAsString(completionStatuses) + _ <- + ZIO.logWarning( + s"Exceeded deadline waiting for streams (of revoked partitions) to commit the offsets of " + + s"the records they consumed; the rebalance will continue. " + + s"This might cause another consumer to process some records again. 
$statusStrings" + ) + } yield () + + def commitSync: Task[Unit] = + ZIO.attempt(consumer.commitSync(java.util.Collections.emptyMap(), commitTimeout)) + + // Outline: + // - Every `commitQueuePollInterval` until the deadline has been reached: + // - Get all commits from the commit queue. + // - Start an async commit for these commits. + // - Collect all these new (pending) commits. + // - repeat the above until: + // - All streams that were ended have completed their work, and + // - we have seen a completed or pending commit for all end-offsets. + // An end-offset of a stream is the offset of the last record given to that stream. + // - Do a single sync commit without any offsets, this has the side-effect of blocking until all + // preceding async commits are complete (this requires kafka-client 3.6.0 or later). + // Because all commits created here (including those from non-ending streams) are now complete, we do not + // have to add them to the pending commits of the runloop state. + // + // Note, we cannot use ZStream.fromQueue because that will emit nothing when the queue is empty. + // Instead, we poll the queue in a loop. + for { + _ <- logInitialStreamCompletionStatuses + completed <- + ZStream + .fromZIO(blockingSleep(commitQueuePollInterval)) + .forever + // Even if there is nothing to commit, continue to drive communication with the broker + // so that commits can complete and the streams can make progress, by setting + // executeOnEmpty = true + .tap(_ => + committer.processQueuedCommits( + (offsets, callback) => ZIO.attempt(consumer.commitAsync(offsets, callback)), + executeOnEmpty = true + ) + ) + .takeWhile(_ => java.lang.System.nanoTime() <= deadline) + .mapZIO(_ => endingStreamsCompletedAndCommitsExist) + .takeUntil(completed => completed) + .runLast + .map(_.getOrElse(false)) + _ <- logFinalStreamCompletionStatuses(completed) + _ <- commitSync + _ <- ZIO.logDebug(s"Done waiting for ${streamsToEnd.size} streams to end") + } yield () + } + + // During a poll, the java kafka client might call each method of the rebalance listener 0 or 1 times. + // We do not know the order in which the call-back methods are invoked. + // + // Ref `lastRebalanceEvent` is used to track what happens during the poll. Just before the poll the + // `RebalanceEvent.None` is stored. Then during the poll, inside each method of the rebalance listener, + // the ref is updated. 
+ // + // Each method: + // - emits a diagnostic event + // - determines if this is the first method invoked during this poll (`rebalanceEvent.wasInvoked`) to + // make sure that the `restartStreamsOnRebalancing` feature is applied only once per poll + // - ends streams that need to be ended + // - updates `lastRebalanceEvent` + // + def toRebalanceListener: RebalanceListener = RebalanceListener( + onAssigned = assignedTps => + withLastRebalanceEvent { rebalanceEvent => + for { + _ <- ZIO.logDebug { + val sameRebalance = if (rebalanceEvent.wasInvoked) " in same rebalance" else "" + s"${assignedTps.size} partitions are assigned$sameRebalance" + } + assignedStreams <- getCurrentAssignedStreams + streamsToEnd = if (restartStreamsOnRebalancing && !rebalanceEvent.wasInvoked) assignedStreams + else Chunk.empty + _ <- endStreams(streamsToEnd) + _ <- ZIO.logTrace("onAssigned done") + } yield rebalanceEvent.copy( + wasInvoked = true, + assignedTps = rebalanceEvent.assignedTps ++ assignedTps, + endedStreams = rebalanceEvent.endedStreams ++ streamsToEnd + ) + }, + onRevoked = revokedTps => + withLastRebalanceEvent { rebalanceEvent => + for { + _ <- ZIO.logDebug { + val sameRebalance = if (rebalanceEvent.wasInvoked) " in same rebalance" else "" + s"${revokedTps.size} partitions are revoked$sameRebalance" + } + assignedStreams <- getCurrentAssignedStreams + streamsToEnd = if (restartStreamsOnRebalancing && !rebalanceEvent.wasInvoked) assignedStreams + else assignedStreams.filter(control => revokedTps.contains(control.tp)) + _ <- endStreams(streamsToEnd) + _ <- ZIO.logTrace("onRevoked done") + } yield rebalanceEvent.copy( + wasInvoked = true, + assignedTps = rebalanceEvent.assignedTps -- revokedTps, + revokedTps = rebalanceEvent.revokedTps ++ revokedTps, + endedStreams = rebalanceEvent.endedStreams ++ streamsToEnd + ) + }, + onLost = lostTps => + withLastRebalanceEvent { rebalanceEvent => + for { + _ <- ZIO.logDebug(s"${lostTps.size} partitions are lost") + assignedStreams <- getCurrentAssignedStreams + lostStreams = assignedStreams.filter(control => lostTps.contains(control.tp)) + _ <- ZIO.foreachDiscard(lostStreams)(_.lost) + _ <- ZIO.logTrace(s"onLost done") + } yield rebalanceEvent.copy( + wasInvoked = true, + assignedTps = rebalanceEvent.assignedTps -- lostTps, + lostTps = rebalanceEvent.lostTps ++ lostTps, + endedStreams = rebalanceEvent.endedStreams ++ lostStreams + ) + } + ) + + private def withLastRebalanceEvent(f: RebalanceEvent => Task[RebalanceEvent]): Task[Unit] = + lastRebalanceEvent.get.flatMap(f).flatMap(lastRebalanceEvent.set) + +} + +private[internal] object RebalanceCoordinator { + + sealed trait EndOffsetCommitStatus + case object EndOffsetNotCommitted extends EndOffsetCommitStatus { override def toString = "not committed" } + case object EndOffsetCommitPending extends EndOffsetCommitStatus { override def toString = "commit pending" } + case object EndOffsetCommitted extends EndOffsetCommitStatus { override def toString = "committed" } + + final case class StreamCompletionStatus( + tp: TopicPartition, + streamEnded: Boolean, + lastPulledOffset: Option[Long], + lastCommittedOffset: Option[Long], + endOffsetCommitStatus: EndOffsetCommitStatus + ) { + override def toString: String = + s"$tp: " + + s"${if (streamEnded) "stream ended" else "stream is running"}, " + + s"last pulled offset=${lastPulledOffset.getOrElse("none")}, " + + s"last committed offset=${lastCommittedOffset.getOrElse("none")}, " + + endOffsetCommitStatus + } + + final case class RebalanceEvent( + wasInvoked: Boolean, + 
assignedTps: Set[TopicPartition], + revokedTps: Set[TopicPartition], + lostTps: Set[TopicPartition], + endedStreams: Chunk[PartitionStreamControl] + ) + + object RebalanceEvent { + val None: RebalanceEvent = + RebalanceEvent(wasInvoked = false, Set.empty, Set.empty, Set.empty, Chunk.empty) + } +} diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 8d45f8e55..192d1b651 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -2,21 +2,18 @@ package zio.kafka.consumer.internal import org.apache.kafka.clients.consumer._ import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.errors.{ AuthenticationException, AuthorizationException, RebalanceInProgressException } +import org.apache.kafka.common.errors.{ AuthenticationException, AuthorizationException } import zio._ -import zio.kafka.consumer.Consumer.{ CommitTimeout, OffsetRetrieval } +import zio.kafka.consumer.Consumer.OffsetRetrieval import zio.kafka.consumer._ import zio.kafka.consumer.diagnostics.DiagnosticEvent.{ Finalization, Rebalance } import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics } import zio.kafka.consumer.internal.ConsumerAccess.ByteArrayKafkaConsumer import zio.kafka.consumer.internal.Runloop._ import zio.kafka.consumer.internal.RunloopAccess.PartitionAssignment +import zio.kafka.consumer.internal.RebalanceCoordinator.RebalanceEvent import zio.stream._ -import java.lang.Math.max -import java.util -import java.util.{ Map => JavaMap } -import scala.collection.mutable import scala.jdk.CollectionConverters._ //noinspection SimplifyWhenInspection,SimplifyUnlessInspection @@ -25,24 +22,15 @@ private[consumer] final class Runloop private ( topLevelExecutor: Executor, sameThreadRuntime: Runtime[Any], consumer: ConsumerAccess, - commitQueue: Queue[Commit], commandQueue: Queue[RunloopCommand], - lastRebalanceEvent: Ref.Synchronized[Runloop.RebalanceEvent], partitionsHub: Hub[Take[Throwable, PartitionAssignment]], diagnostics: Diagnostics, maxStreamPullInterval: Duration, - maxRebalanceDuration: Duration, currentStateRef: Ref[State], - committedOffsetsRef: Ref[CommitOffsets] + rebalanceCoordinator: RebalanceCoordinator, + consumerMetrics: ConsumerMetrics, + committer: Committer ) { - private val commitTimeout = settings.commitTimeout - private val commitTimeoutNanos = settings.commitTimeout.toNanos - - private val restartStreamsOnRebalancing = settings.restartStreamOnRebalancing - private val rebalanceSafeCommits = settings.rebalanceSafeCommits - - private val consumerMetrics = new ZioConsumerMetrics(settings.metricLabels) - private def newPartitionStream(tp: TopicPartition): UIO[PartitionStreamControl] = PartitionStreamControl.newPartitionStream( tp, @@ -81,236 +69,6 @@ private[consumer] final class Runloop private ( commandQueue.offer(RunloopCommand.RemoveSubscription(subscription)).unit private def makeRebalanceListener: ConsumerRebalanceListener = { - // All code in this block is called from the rebalance listener and therefore runs on the same-thread-runtime. This - // is because the Java kafka client requires us to invoke the consumer from the same thread that invoked the - // rebalance listener. - // Unfortunately the same-thread-runtime does not work for all ZIO operations. 
For example, `ZIO.timeout`, - // `ZStream.repeat`, `Promise.await` on non-completed promises, and any other ZIO operation that shifts the work to - // another thread cannot be used. - - // Time between polling the commit queue from the rebalance listener when `rebalanceSafeCommits` is enabled. - val commitQueuePollInterval = 100.millis - - // End streams from the rebalance listener. - // When `rebalanceSafeCommits` is enabled, wait for consumed offsets to be committed. - def endStreams(state: State, streamsToEnd: Chunk[PartitionStreamControl]): Task[Unit] = - if (streamsToEnd.isEmpty) ZIO.unit - else { - for { - _ <- ZIO.foreachDiscard(streamsToEnd)(_.end) - _ <- if (rebalanceSafeCommits) - consumer.rebalanceListenerAccess(doAwaitStreamCommits(_, state, streamsToEnd)) - else ZIO.unit - } yield () - } - - def doAwaitStreamCommits( - consumer: ByteArrayKafkaConsumer, - state: State, - streamsToEnd: Chunk[PartitionStreamControl] - ): Task[Unit] = { - val deadline = java.lang.System.nanoTime() + maxRebalanceDuration.toNanos - commitTimeoutNanos - - def timeToDeadlineMillis(): Long = (deadline - java.lang.System.nanoTime()) / 1000000L - - val endingTps = streamsToEnd.map(_.tp).toSet - - def commitsOfEndingStreams(commits: Chunk[Runloop.Commit]): Chunk[Runloop.Commit] = - commits.filter(commit => (commit.offsets.keySet intersect endingTps).nonEmpty) - - lazy val previousPendingCommits: Chunk[Commit] = - commitsOfEndingStreams(state.pendingCommits) - - def commitAsync(commits: Chunk[Commit]): UIO[Unit] = - if (commits.nonEmpty) { - val (offsets, callback, onFailure) = asyncCommitParameters(commits) - ZIO.logDebug(s"Async commit of ${offsets.size} offsets for ${commits.size} commits") *> - ZIO.attempt(consumer.commitAsync(offsets, callback)).catchAll(onFailure) - } else { - // Continue to drive communication with the broker so that commits can complete and the streams can - // make progress. 
- ZIO.attempt(consumer.commitAsync(java.util.Collections.emptyMap(), null)).orDie - } - - sealed trait EndOffsetCommitStatus - case object EndOffsetNotCommitted extends EndOffsetCommitStatus { override def toString = "not committed" } - case object EndOffsetCommitPending extends EndOffsetCommitStatus { override def toString = "commit pending" } - case object EndOffsetCommitted extends EndOffsetCommitStatus { override def toString = "committed" } - - final case class StreamCompletionStatus( - tp: TopicPartition, - streamEnded: Boolean, - lastPulledOffset: Option[Long], - endOffsetCommitStatus: EndOffsetCommitStatus - ) { - override def toString: String = - s"${tp}: " + - s"${if (streamEnded) "stream ended" else "stream is running"}, " + - s"last pulled offset=${lastPulledOffset.getOrElse("none")}, " + - endOffsetCommitStatus - } - - def completionStatusesAsString(completionStatuses: Chunk[StreamCompletionStatus]): String = - "Revoked partitions: " + completionStatuses.map(_.toString).mkString("; ") - - def getStreamCompletionStatuses(newCommits: Chunk[Commit]): UIO[Chunk[StreamCompletionStatus]] = - for { - committedOffsets <- committedOffsetsRef.get - allPendingCommitOffsets = - (previousPendingCommits ++ commitsOfEndingStreams(newCommits)).flatMap(_.offsets).map { - case (tp, offsetAndMetadata) => (tp, offsetAndMetadata.offset()) - } - streamResults <- - ZIO.foreach(streamsToEnd) { stream => - for { - isDone <- stream.completedPromise.isDone - lastPulledOffset <- stream.lastPulledOffset - endOffset <- if (isDone) stream.completedPromise.await else ZIO.none - - endOffsetCommitStatus = - endOffset match { - case Some(endOffset) if committedOffsets.contains(stream.tp, endOffset.offset) => - EndOffsetCommitted - case Some(endOffset) if allPendingCommitOffsets.contains((stream.tp, endOffset.offset)) => - EndOffsetCommitPending - case _ => EndOffsetNotCommitted - } - } yield StreamCompletionStatus(stream.tp, isDone, lastPulledOffset.map(_.offset), endOffsetCommitStatus) - } - } yield streamResults - - @inline - def logStreamCompletionStatuses(completionStatuses: Chunk[StreamCompletionStatus]): UIO[Unit] = { - val statusStrings = completionStatusesAsString(completionStatuses) - ZIO.logInfo( - s"Delaying rebalance until ${streamsToEnd.size} streams (of revoked partitions) have committed " + - s"the offsets of the records they consumed. Deadline in ${timeToDeadlineMillis()}ms. 
$statusStrings" - ) - } - - def logInitialStreamCompletionStatuses: UIO[Unit] = - for { - completionStatuses <- getStreamCompletionStatuses(newCommits = Chunk.empty) - _ <- logStreamCompletionStatuses(completionStatuses) - } yield () - - def endingStreamsCompletedAndCommitsExist(newCommits: Chunk[Commit]): UIO[Boolean] = - for { - completionStatuses <- getStreamCompletionStatuses(newCommits) - _ <- logStreamCompletionStatuses(completionStatuses) - } yield completionStatuses.forall { status => - // A stream is complete when it never got any records, or when it committed the offset of the last consumed record - status.lastPulledOffset.isEmpty || (status.streamEnded && status.endOffsetCommitStatus != EndOffsetNotCommitted) - } - - def logFinalStreamCompletionStatuses(completed: Boolean, newCommits: Chunk[Commit]): UIO[Unit] = - if (completed) - ZIO.logInfo("Continuing rebalance, all offsets of consumed records in the revoked partitions were committed.") - else - for { - completionStatuses <- getStreamCompletionStatuses(newCommits) - statusStrings = completionStatusesAsString(completionStatuses) - _ <- - ZIO.logWarning( - s"Exceeded deadline waiting for streams (of revoked partitions) to commit the offsets of " + - s"the records they consumed; the rebalance will continue. " + - s"This might cause another consumer to process some records again. $statusStrings" - ) - } yield () - - def commitSync: Task[Unit] = - ZIO.attempt(consumer.commitSync(java.util.Collections.emptyMap(), commitTimeout)) - - // Outline: - // - Every `commitQueuePollInterval` until the deadline has been reached: - // - Get all commits from the commit queue. - // - Start an async commit for these commits. - // - Collect all these new (pending) commits. - // - repeat the above until: - // - All streams that were ended have completed their work, and - // - we have seen a completed or pending commit for all end-offsets. - // An end-offset of a stream is the offset of the last record given to that stream. - // - Do a single sync commit without any offsets, this has the side-effect of blocking until all - // preceding async commits are complete (this requires kafka-client 3.6.0 or later). - // Because all commits created here (including those from non-ending streams) are now complete, we do not - // have to add them to the pending commits of the runloop state. - // - // Note, we cannot use ZStream.fromQueue because that will emit nothing when the queue is empty. - // Instead, we poll the queue in a loop. - for { - _ <- logInitialStreamCompletionStatuses - completedAndCommits <- - ZStream - .fromZIO(blockingSleep(commitQueuePollInterval) *> commitQueue.takeAll) - .tap(commitAsync) - .forever - .takeWhile(_ => java.lang.System.nanoTime() <= deadline) - .scan(Chunk.empty[Runloop.Commit])(_ ++ _) - .mapZIO(commits => endingStreamsCompletedAndCommitsExist(commits).map((_, commits))) - .takeUntil { case (completed, _) => completed } - .runLast - .map(_.getOrElse((false, Chunk.empty))) - _ <- logFinalStreamCompletionStatuses(completedAndCommits._1, completedAndCommits._2) - _ <- commitSync - _ <- ZIO.logDebug(s"Done waiting for ${streamsToEnd.size} streams to end") - } yield () - } - - // During a poll, the java kafka client might call each method of the rebalance listener 0 or 1 times. - // We do not know the order in which the call-back methods are invoked. - // - // Ref `lastRebalanceEvent` is used to track what happens during the poll. Just before the poll the - // `RebalanceEvent.None` is stored. 
Then during the poll, inside each method of the rebalance listener, - // the ref is updated. - // - // Each method: - // - emits a diagnostic event - // - determines if this is the first method invoked during this poll (`rebalanceEvent.wasInvoked`) to - // make sure that the `restartStreamsOnRebalancing` feature is applied only once per poll - // - ends streams that need to be ended - // - updates `lastRebalanceEvent` - // - val recordRebalanceRebalancingListener = RebalanceListener( - onAssigned = assignedTps => - for { - rebalanceEvent <- lastRebalanceEvent.get - _ <- ZIO.logDebug { - val sameRebalance = if (rebalanceEvent.wasInvoked) " in same rebalance" else "" - s"${assignedTps.size} partitions are assigned$sameRebalance" - } - state <- currentStateRef.get - streamsToEnd = if (restartStreamsOnRebalancing && !rebalanceEvent.wasInvoked) state.assignedStreams - else Chunk.empty - _ <- endStreams(state, streamsToEnd) - _ <- lastRebalanceEvent.set(rebalanceEvent.onAssigned(assignedTps, endedStreams = streamsToEnd)) - _ <- ZIO.logTrace("onAssigned done") - } yield (), - onRevoked = revokedTps => - for { - rebalanceEvent <- lastRebalanceEvent.get - _ <- ZIO.logDebug { - val sameRebalance = if (rebalanceEvent.wasInvoked) " in same rebalance" else "" - s"${revokedTps.size} partitions are revoked$sameRebalance" - } - state <- currentStateRef.get - streamsToEnd = if (restartStreamsOnRebalancing && !rebalanceEvent.wasInvoked) state.assignedStreams - else state.assignedStreams.filter(control => revokedTps.contains(control.tp)) - _ <- endStreams(state, streamsToEnd) - _ <- lastRebalanceEvent.set(rebalanceEvent.onRevoked(revokedTps, endedStreams = streamsToEnd)) - _ <- ZIO.logTrace("onRevoked done") - } yield (), - onLost = lostTps => - for { - _ <- ZIO.logDebug(s"${lostTps.size} partitions are lost") - rebalanceEvent <- lastRebalanceEvent.get - state <- currentStateRef.get - lostStreams = state.assignedStreams.filter(control => lostTps.contains(control.tp)) - _ <- ZIO.foreachDiscard(lostStreams)(_.lost) - _ <- lastRebalanceEvent.set(rebalanceEvent.onLost(lostTps, lostStreams)) - _ <- ZIO.logTrace(s"onLost done") - } yield () - ) - // Here we just want to avoid any executor shift if the user provided listener is the noop listener. val userRebalanceListener = settings.rebalanceListener match { @@ -318,93 +76,9 @@ private[consumer] final class Runloop private ( case _ => settings.rebalanceListener.runOnExecutor(topLevelExecutor) } - RebalanceListener.toKafka(recordRebalanceRebalancingListener ++ userRebalanceListener, sameThreadRuntime) + RebalanceListener.toKafka(rebalanceCoordinator.toRebalanceListener ++ userRebalanceListener, sameThreadRuntime) } - /** This is the implementation behind the user facing api `Offset.commit`. */ - private val commit: Map[TopicPartition, OffsetAndMetadata] => Task[Unit] = - offsets => - for { - p <- Promise.make[Throwable, Unit] - startTime = java.lang.System.nanoTime() - _ <- commitQueue.offer(Runloop.Commit(java.lang.System.nanoTime(), offsets, p)) - _ <- commandQueue.offer(RunloopCommand.CommitAvailable) - _ <- diagnostics.emit(DiagnosticEvent.Commit.Started(offsets)) - _ <- p.await.timeoutFail(CommitTimeout)(commitTimeout) - endTime = java.lang.System.nanoTime() - latency = (endTime - startTime).nanoseconds - _ <- consumerMetrics.observeCommit(latency) - } yield () - - /** Merge commits and prepare parameters for calling `consumer.commitAsync`. 
*/ - private def asyncCommitParameters( - commits: Chunk[Runloop.Commit] - ): (JavaMap[TopicPartition, OffsetAndMetadata], OffsetCommitCallback, Throwable => UIO[Unit]) = { - val offsets = commits - .foldLeft(mutable.Map.empty[TopicPartition, OffsetAndMetadata]) { case (acc, commit) => - commit.offsets.foreach { case (tp, offset) => - acc += (tp -> acc - .get(tp) - .map(current => if (current.offset() > offset.offset()) current else offset) - .getOrElse(offset)) - } - acc - } - .toMap - val offsetsWithMetaData = offsets.map { case (tp, offset) => - tp -> new OffsetAndMetadata(offset.offset + 1, offset.leaderEpoch, offset.metadata) - } - val cont = (e: Exit[Throwable, Unit]) => ZIO.foreachDiscard(commits)(_.cont.done(e)) - // We assume the commit is started immediately after returning from this method. - val startTime = java.lang.System.nanoTime() - val onSuccess = { - val endTime = java.lang.System.nanoTime() - val latency = (endTime - startTime).nanoseconds - for { - offsetIncrease <- committedOffsetsRef.modify(_.addCommits(commits)) - _ <- consumerMetrics.observeAggregatedCommit(latency, offsetIncrease).when(commits.nonEmpty) - result <- cont(Exit.unit) - _ <- diagnostics.emit(DiagnosticEvent.Commit.Success(offsetsWithMetaData)) - } yield result - } - val onFailure: Throwable => UIO[Unit] = { - case _: RebalanceInProgressException => - for { - _ <- ZIO.logDebug(s"Rebalance in progress, commit for offsets $offsets will be retried") - _ <- commitQueue.offerAll(commits) - _ <- commandQueue.offer(RunloopCommand.CommitAvailable) - } yield () - case err: Throwable => - cont(Exit.fail(err)) <* diagnostics.emit(DiagnosticEvent.Commit.Failure(offsetsWithMetaData, err)) - } - val callback = - new OffsetCommitCallback { - override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = - Unsafe.unsafe { implicit u => - sameThreadRuntime.unsafe.run { - if (exception eq null) onSuccess else onFailure(exception) - } - .getOrThrowFiberFailure() - } - } - (offsetsWithMetaData.asJava, callback, onFailure) - } - - private def handleCommits(state: State, commits: Chunk[Runloop.Commit]): UIO[State] = - if (commits.isEmpty) { - ZIO.succeed(state) - } else { - val (offsets, callback, onFailure) = asyncCommitParameters(commits) - val newState = state.addPendingCommits(commits) - consumer.runloopAccess { c => - // We don't wait for the completion of the commit here, because it - // will only complete once we poll again. - ZIO.attempt(c.commitAsync(offsets, callback)) - } - .catchAll(onFailure) - .as(newState) - } - /** * Does all needed to end revoked partitions: * 1. Complete the revoked assigned streams 2. 
Remove from the list of pending requests @@ -468,7 +142,7 @@ private[consumer] final class Runloop private ( builder += CommittableRecord[Array[Byte], Array[Byte]]( record = consumerRecord, - commitHandle = commit, + commitHandle = committer.commit, consumerGroupMetadata = consumerGroupMetadata ) } @@ -535,7 +209,7 @@ private[consumer] final class Runloop private ( partitionsToFetch <- settings.fetchStrategy.selectPartitionsToFetch(state.assignedStreams) _ <- ZIO.logDebug( s"Starting poll with ${state.pendingRequests.size} pending requests and" + - s" ${state.pendingCommits.size} pending commits," + + s" ${committer.pendingCommitCount} pending commits," + s" resuming $partitionsToFetch partitions" ) _ <- currentStateRef.set(state) @@ -559,7 +233,7 @@ private[consumer] final class Runloop private ( tpWithoutData = requestedPartitions -- providedTps ) } - pollresult <- lastRebalanceEvent.getAndSet(RebalanceEvent.None).flatMap { + pollresult <- rebalanceCoordinator.getAndResetLastEvent.flatMap { case RebalanceEvent(false, _, _, _, _) => // The fast track, rebalance listener was not invoked: // no assignment changes, no new commits, only new records. @@ -612,9 +286,7 @@ private[consumer] final class Runloop private ( // Remove committed offsets for partitions that are no longer assigned: // NOTE: the type annotation is needed to keep the IntelliJ compiler happy. - _ <- - committedOffsetsRef - .update(_.keepPartitions(updatedAssignedStreams.map(_.tp).toSet)): Task[Unit] + _ <- committer.pruneCommittedOffsets(updatedAssignedStreams.map(_.tp).toSet): Task[Unit] _ <- consumerMetrics.observeRebalance( currentAssigned.size, @@ -656,11 +328,10 @@ private[consumer] final class Runloop private ( pollResult.ignoreRecordsForTps, pollResult.records ) - updatedPendingCommits <- ZIO.filter(state.pendingCommits)(_.isPending) - _ <- checkStreamPullInterval(pollResult.assignedStreams) + _ <- committer.updatePendingCommitsAfterPoll + _ <- checkStreamPullInterval(pollResult.assignedStreams) } yield state.copy( pendingRequests = fulfillResult.pendingRequests, - pendingCommits = updatedPendingCommits, assignedStreams = pollResult.assignedStreams ) } @@ -819,19 +490,21 @@ private[consumer] final class Runloop private ( .takeWhile(_ != RunloopCommand.StopRunloop) .runFoldChunksDiscardZIO(initialState) { (state, commands) => for { - commitCommands <- commitQueue.takeAll - _ <- ZIO.logDebug( - s"Processing ${commitCommands.size} commits," + - s" ${commands.size} commands: ${commands.mkString(",")}" - ) - stateAfterCommits <- handleCommits(state, commitCommands) + _ <- ZIO.logDebug(s"Processing ${commands.size} commands: ${commands.mkString(",")}") + _ <- consumer.runloopAccess { consumer => + committer.processQueuedCommits((offsets, callback) => + ZIO.attempt(consumer.commitAsync(offsets, callback)) + ) + } streamCommands = commands.collect { case cmd: RunloopCommand.StreamCommand => cmd } - stateAfterCommands <- ZIO.foldLeft(streamCommands)(stateAfterCommits)(handleCommand) + stateAfterCommands <- ZIO.foldLeft(streamCommands)(state)(handleCommand) - updatedStateAfterPoll <- if (stateAfterCommands.shouldPoll) handlePoll(stateAfterCommands) - else ZIO.succeed(stateAfterCommands) + updatedStateAfterPoll <- shouldPoll(stateAfterCommands).flatMap { + case true => handlePoll(stateAfterCommands) + case false => ZIO.succeed(stateAfterCommands) + } // Immediately poll again, after processing all new queued commands - _ <- if (updatedStateAfterPoll.shouldPoll) commandQueue.offer(RunloopCommand.Poll) else ZIO.unit + _ <- 
ZIO.whenZIO(shouldPoll(updatedStateAfterPoll))(commandQueue.offer(RunloopCommand.Poll)) // Save the current state for other parts of Runloop (read-only, for metrics only) _ <- currentStateRef.set(updatedStateAfterPoll) } yield updatedStateAfterPoll @@ -840,13 +513,19 @@ private[consumer] final class Runloop private ( .onError(cause => partitionsHub.offer(Take.failCause(cause))) } + def shouldPoll(state: State): UIO[Boolean] = + committer.pendingCommitCount.map { pendingCommitCount => + state.subscriptionState.isSubscribed && (state.pendingRequests.nonEmpty || pendingCommitCount > 0 || state.assignedStreams.isEmpty) + } + private def observeRunloopMetrics(runloopMetricsSchedule: Schedule[Any, Unit, Long]): ZIO[Any, Nothing, Unit] = { val observe = for { currentState <- currentStateRef.get commandQueueSize <- commandQueue.size - commitQueueSize <- commitQueue.size + commitQueueSize <- committer.queueSize + pendingCommits <- committer.pendingCommitCount _ <- consumerMetrics - .observeRunloopMetrics(currentState, commandQueueSize, commitQueueSize) + .observeRunloopMetrics(currentState, commandQueueSize, commitQueueSize, pendingCommits) } yield () observe @@ -892,57 +571,6 @@ object Runloop { pendingRequests: Chunk[RunloopCommand.Request] ) - private final case class RebalanceEvent( - wasInvoked: Boolean, - assignedTps: Set[TopicPartition], - revokedTps: Set[TopicPartition], - lostTps: Set[TopicPartition], - endedStreams: Chunk[PartitionStreamControl] - ) { - def onAssigned( - assigned: Set[TopicPartition], - endedStreams: Chunk[PartitionStreamControl] - ): RebalanceEvent = - copy( - wasInvoked = true, - assignedTps = assignedTps ++ assigned, - endedStreams = this.endedStreams ++ endedStreams - ) - - def onRevoked( - revoked: Set[TopicPartition], - endedStreams: Chunk[PartitionStreamControl] - ): RebalanceEvent = - copy( - wasInvoked = true, - assignedTps = assignedTps -- revoked, - revokedTps = revokedTps ++ revoked, - endedStreams = this.endedStreams ++ endedStreams - ) - - def onLost(lost: Set[TopicPartition], endedStreams: Chunk[PartitionStreamControl]): RebalanceEvent = - copy( - wasInvoked = true, - assignedTps = assignedTps -- lost, - lostTps = lostTps ++ lost, - endedStreams = this.endedStreams ++ endedStreams - ) - } - - private object RebalanceEvent { - val None: RebalanceEvent = - RebalanceEvent(wasInvoked = false, Set.empty, Set.empty, Set.empty, Chunk.empty) - } - - private[internal] final case class Commit( - createdAt: NanoTime, - offsets: Map[TopicPartition, OffsetAndMetadata], - cont: Promise[Throwable, Unit] - ) { - @inline def isDone: UIO[Boolean] = cont.isDone - @inline def isPending: UIO[Boolean] = isDone.negate - } - private[consumer] def make( settings: ConsumerSettings, maxStreamPullInterval: Duration, @@ -953,28 +581,42 @@ object Runloop { ): URIO[Scope, Runloop] = for { _ <- ZIO.addFinalizer(diagnostics.emit(Finalization.RunloopFinalized)) - commitQueue <- ZIO.acquireRelease(Queue.unbounded[Runloop.Commit])(_.shutdown) commandQueue <- ZIO.acquireRelease(Queue.unbounded[RunloopCommand])(_.shutdown) - lastRebalanceEvent <- Ref.Synchronized.make[Runloop.RebalanceEvent](Runloop.RebalanceEvent.None) + lastRebalanceEvent <- Ref.make[RebalanceEvent](RebalanceEvent.None) initialState = State.initial - currentStateRef <- Ref.make(initialState) - committedOffsetsRef <- Ref.make(CommitOffsets.empty) - sameThreadRuntime <- ZIO.runtime[Any].provideLayer(SameThreadRuntimeLayer) - executor <- ZIO.executor + currentStateRef <- Ref.make(initialState) + sameThreadRuntime <- 
ZIO.runtime[Any].provideLayer(SameThreadRuntimeLayer) + executor <- ZIO.executor + metrics = new ZioConsumerMetrics(settings.metricLabels) + committer <- LiveCommitter + .make( + settings.commitTimeout, + diagnostics, + metrics, + commandQueue.offer(RunloopCommand.CommitAvailable).unit, + sameThreadRuntime + ) + rebalanceCoordinator = new RebalanceCoordinator( + lastRebalanceEvent, + settings, + consumer, + maxRebalanceDuration, + currentStateRef.get.map(_.assignedStreams), + committer + ) runloop = new Runloop( settings = settings, topLevelExecutor = executor, sameThreadRuntime = sameThreadRuntime, consumer = consumer, - commitQueue = commitQueue, commandQueue = commandQueue, - lastRebalanceEvent = lastRebalanceEvent, partitionsHub = partitionsHub, diagnostics = diagnostics, maxStreamPullInterval = maxStreamPullInterval, - maxRebalanceDuration = maxRebalanceDuration, currentStateRef = currentStateRef, - committedOffsetsRef = committedOffsetsRef + consumerMetrics = metrics, + rebalanceCoordinator = rebalanceCoordinator, + committer = committer ) _ <- ZIO.logDebug("Starting Runloop") @@ -995,62 +637,18 @@ object Runloop { private[internal] final case class State( pendingRequests: Chunk[RunloopCommand.Request], - pendingCommits: Chunk[Runloop.Commit], assignedStreams: Chunk[PartitionStreamControl], subscriptionState: SubscriptionState ) { - def addPendingCommits(c: Chunk[Runloop.Commit]): State = copy(pendingCommits = pendingCommits ++ c) - def addRequest(r: RunloopCommand.Request): State = copy(pendingRequests = pendingRequests :+ r) - - def shouldPoll: Boolean = - subscriptionState.isSubscribed && (pendingRequests.nonEmpty || pendingCommits.nonEmpty || assignedStreams.isEmpty) + def addRequest(r: RunloopCommand.Request): State = copy(pendingRequests = pendingRequests :+ r) } private object State { val initial: State = State( pendingRequests = Chunk.empty, - pendingCommits = Chunk.empty, assignedStreams = Chunk.empty, subscriptionState = SubscriptionState.NotSubscribed ) } - // package private for unit testing - private[internal] final case class CommitOffsets(offsets: Map[TopicPartition, Long]) { - - /** Returns an estimate of the total offset increase, and a new `CommitOffsets` with the given offsets added. */ - def addCommits(c: Chunk[Runloop.Commit]): (Long, CommitOffsets) = { - val updatedOffsets = mutable.Map.empty[TopicPartition, Long] - updatedOffsets.sizeHint(offsets.size) - updatedOffsets ++= offsets - var offsetIncrease = 0L - c.foreach { commit => - commit.offsets.foreach { case (tp, offsetAndMeta) => - val offset = offsetAndMeta.offset() - val maxOffset = updatedOffsets.get(tp) match { - case Some(existingOffset) => - offsetIncrease += max(0L, offset - existingOffset) - max(existingOffset, offset) - case None => - // This partition was not committed to from this consumer yet. Therefore we do not know the offset - // increase. A good estimate would be the poll size for this consumer, another okayish estimate is 0. - // Lets go with the simplest for now: ```offsetIncrease += 0``` - offset - } - updatedOffsets += tp -> maxOffset - } - } - (offsetIncrease, CommitOffsets(offsets = updatedOffsets.toMap)) - } - - def keepPartitions(tps: Set[TopicPartition]): CommitOffsets = - CommitOffsets(offsets.filter { case (tp, _) => tps.contains(tp) }) - - def contains(tp: TopicPartition, offset: Long): Boolean = - offsets.get(tp).exists(_ >= offset) - } - - private[internal] object CommitOffsets { - val empty: CommitOffsets = CommitOffsets(Map.empty) - } }
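
Illustrative sketch, not part of the patch: the outline comment in the removed rebalance-end logic notes that `ZStream.fromQueue` cannot be used because it emits nothing while the queue is empty, so the commit queue is polled on an interval until either the deadline passes or all ending streams have committed their end offsets. Below is a standalone version of that poll-accumulate-until pattern; `pollUntil` is a hypothetical helper, not zio-kafka API.

import zio._
import zio.stream.ZStream

object AwaitCommitsSketch {

  // Poll `queue` every `interval`, accumulating everything taken so far, until either
  // `isComplete` holds for the accumulated items or the System.nanoTime-based `deadline`
  // has passed. Returns whether completion was reached plus everything that was collected.
  def pollUntil[A](
    queue: Queue[A],
    interval: Duration,
    deadline: Long,
    isComplete: Chunk[A] => UIO[Boolean]
  ): UIO[(Boolean, Chunk[A])] =
    ZStream
      .fromZIO(ZIO.sleep(interval) *> queue.takeAll)
      .forever
      .takeWhile(_ => java.lang.System.nanoTime() <= deadline)
      .scan(Chunk.empty[A])(_ ++ _)
      .mapZIO(collected => isComplete(collected).map(done => (done, collected)))
      .takeUntil { case (done, _) => done }
      .runLast
      .map(_.getOrElse((false, Chunk.empty)))
}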
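
The user-facing commit path removed here (the implementation behind `Offset.commit`) enqueues the offsets together with a `Promise` and awaits it under the commit timeout; the async-commit callback later completes that promise. A simplified sketch of the handoff pattern follows, with hypothetical `Pending`/`submit` names rather than the actual `Committer` interface.

import zio._

object CommitHandoffSketch {

  // A queued request paired with the promise its caller is awaiting.
  final case class Pending[A](payload: A, done: Promise[Throwable, Unit])

  // Caller side: enqueue the payload and wait (bounded by `timeout`) until the processing
  // fiber completes the promise, e.g. from the commitAsync callback.
  def submit[A](queue: Queue[Pending[A]], payload: A, timeout: Duration): Task[Unit] =
    for {
      done <- Promise.make[Throwable, Unit]
      _    <- queue.offer(Pending(payload, done))
      _    <- done.await.timeoutFail(new RuntimeException("commit timed out"))(timeout)
    } yield ()
}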
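
The commit-merging step removed from `asyncCommitParameters` keeps only the highest offset per partition and commits offset + 1, since Kafka expects the offset of the next record to consume. A minimal pure sketch of that merge; `PendingCommit` and `mergeCommits` are illustrative names, not the new `Committer` API.

import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition

object MergeCommitsSketch {

  // Hypothetical stand-in for a queued commit request.
  final case class PendingCommit(offsets: Map[TopicPartition, Long])

  // Merge a batch of commit requests into the parameters for a single commitAsync call:
  // keep the highest offset seen per partition, then commit offset + 1.
  def mergeCommits(commits: Seq[PendingCommit]): Map[TopicPartition, OffsetAndMetadata] = {
    val maxPerPartition =
      commits.foldLeft(Map.empty[TopicPartition, Long]) { (acc, commit) =>
        commit.offsets.foldLeft(acc) { case (acc2, (tp, offset)) =>
          acc2.updated(tp, acc2.get(tp).fold(offset)(math.max(_, offset)))
        }
      }
    maxPerPartition.map { case (tp, maxOffset) => tp -> new OffsetAndMetadata(maxOffset + 1) }
  }
}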
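
`shouldPoll` becomes effectful because the pending-commit count now has to be read from the `committer` instead of from `State.pendingCommits`; the decision itself is unchanged. A sketch of the same predicate with its inputs spelled out as plain values (hypothetical signature):

import zio._

object ShouldPollSketch {

  // Poll while subscribed and there is outstanding work: pending fetch requests, pending
  // commits, or no assigned streams yet (so a poll can pick up a new assignment).
  def shouldPoll(
    isSubscribed: Boolean,
    pendingRequests: Int,
    assignedStreams: Int,
    pendingCommitCount: UIO[Int]
  ): UIO[Boolean] =
    pendingCommitCount.map { pendingCommits =>
      isSubscribed && (pendingRequests > 0 || pendingCommits > 0 || assignedStreams == 0)
    }
}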
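
`CommitOffsets`, deleted here from the `Runloop` companion, tracks the highest committed offset per partition and estimates the aggregated offset increase reported to metrics; its new owner is not shown in this hunk. A pure sketch of those responsibilities, simplified to take plain offset maps instead of `Commit` values:

import org.apache.kafka.common.TopicPartition

object CommitOffsetsSketch {

  // Tracks the highest committed offset per partition. `addCommits` returns an estimate of
  // the total offset increase together with the updated state; partitions seen for the first
  // time contribute 0 to the estimate because their previous position is unknown.
  final case class CommitOffsets(offsets: Map[TopicPartition, Long]) {

    def addCommits(newOffsets: Map[TopicPartition, Long]): (Long, CommitOffsets) = {
      val (increase, updated) =
        newOffsets.foldLeft((0L, offsets)) { case ((inc, acc), (tp, offset)) =>
          acc.get(tp) match {
            case Some(existing) =>
              (inc + math.max(0L, offset - existing), acc.updated(tp, math.max(existing, offset)))
            case None =>
              (inc, acc.updated(tp, offset))
          }
        }
      (increase, CommitOffsets(updated))
    }

    // Drop bookkeeping for partitions that are no longer assigned.
    def keepPartitions(tps: Set[TopicPartition]): CommitOffsets =
      CommitOffsets(offsets.filter { case (tp, _) => tps.contains(tp) })

    // True when `offset` (or a later one) has already been committed for `tp`.
    def contains(tp: TopicPartition, offset: Long): Boolean =
      offsets.get(tp).exists(_ >= offset)
  }
}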