From 0fb7fcbb3fae1b9d6847b9b89de90cb7b413dd0a Mon Sep 17 00:00:00 2001 From: jules Ivanic Date: Sat, 9 Sep 2023 22:02:28 +0400 Subject: [PATCH] Cleaned implementation of the new commit interface --- build.sbt | 2 + .../zio/kafka/bench/ConsumerBenchmark.scala | 31 ++++---- .../src/test/scala/zio/kafka/AdminSpec.scala | 17 ++-- .../test/scala/zio/kafka/ProducerSpec.scala | 22 +++--- .../zio/kafka/consumer/ConsumerSpec.scala | 72 ++++++----------- .../kafka/consumer/SubscriptionsSpec.scala | 31 +++----- .../kafka/consumer/CommittableRecord.scala | 52 ------------ .../scala/zio/kafka/consumer/Committer.scala | 25 +++--- .../scala/zio/kafka/consumer/Consumer.scala | 79 +++++++++++++------ .../internal/PartitionStreamControl.scala | 12 +-- .../zio/kafka/consumer/internal/Runloop.scala | 42 ++++------ .../consumer/internal/RunloopAccess.scala | 15 ++-- .../main/scala/zio/kafka/consumer/types.scala | 58 ++++++++++++++ .../zio/kafka/producer/Transaction.scala | 2 +- .../producer/TransactionalProducer.scala | 22 ++++-- 15 files changed, 246 insertions(+), 236 deletions(-) delete mode 100644 zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala create mode 100644 zio-kafka/src/main/scala/zio/kafka/consumer/types.scala diff --git a/build.sbt b/build.sbt index 049d5433e1..04cfbca5d3 100644 --- a/build.sbt +++ b/build.sbt @@ -7,6 +7,7 @@ lazy val kafkaClients = "org.apache.kafka" % "kafka-clients" lazy val scalaCollectionCompat = "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0" lazy val jacksonDatabind = "com.fasterxml.jackson.core" % "jackson-databind" % "2.15.2" lazy val logback = "ch.qos.logback" % "logback-classic" % "1.3.11" +lazy val zioPrelude = "dev.zio" %% "zio-prelude" % "1.0.0-RC20" enablePlugins(ZioSbtEcosystemPlugin, ZioSbtCiPlugin) @@ -102,6 +103,7 @@ lazy val zioKafka = .settings(enableZIO(enableStreaming = true)) .settings( libraryDependencies ++= Seq( + zioPrelude, kafkaClients, jacksonDatabind, scalaCollectionCompat diff --git a/zio-kafka-bench/src/main/scala/zio/kafka/bench/ConsumerBenchmark.scala b/zio-kafka-bench/src/main/scala/zio/kafka/bench/ConsumerBenchmark.scala index b85b413fd0..aeb8b6d6d2 100644 --- a/zio-kafka-bench/src/main/scala/zio/kafka/bench/ConsumerBenchmark.scala +++ b/zio-kafka-bench/src/main/scala/zio/kafka/bench/ConsumerBenchmark.scala @@ -5,13 +5,12 @@ import org.apache.kafka.clients.consumer.ConsumerConfig import org.openjdk.jmh.annotations._ import zio.kafka.bench.ZioBenchmark.randomThing import zio.kafka.consumer.diagnostics.Diagnostics -import zio.kafka.consumer.{ Consumer, Offset, OffsetBatch, Subscription } +import zio.kafka.consumer.{ Consumer, Subscription } import zio.kafka.producer.Producer import zio.kafka.serde.Serde import zio.kafka.testkit.Kafka import zio.kafka.testkit.KafkaTestUtils.{ consumerSettings, produceMany, producer } -import zio.stream.ZSink -import zio.{ durationInt, Ref, Schedule, ZIO, ZLayer } +import zio.{ Chunk, Ref, ZIO, ZLayer } import java.util.concurrent.TimeUnit @@ -60,18 +59,20 @@ class ConsumerBenchmark extends ZioBenchmark[Kafka with Producer] { def throughputWithCommits(): Any = runZIO { for { counter <- Ref.make(0) - _ <- ZIO.logAnnotate("consumer", "1") { - Consumer - .plainStream(Subscription.topics(topic1), Serde.byteArray, Serde.byteArray) - .map(_.offset) - .aggregateAsyncWithin(ZSink.collectAll[Offset], Schedule.fixed(100.millis)) - .tap(batch => counter.update(_ + batch.size)) - .map(OffsetBatch.apply) - .mapZIO(_.commit) - .takeUntilZIO(_ => counter.get.map(_ >= nrMessages)) - .runDrain 
-                 .provideSome[Kafka](env)
-           }
+      _ <- ZIO
+             .logAnnotate("consumer", "1") {
+               Consumer
+                 .plainStream(Subscription.topics(topic1), Serde.byteArray, Serde.byteArray)
+                 .tap { _ =>
+                   counter.updateAndGet(_ + 1).flatMap(count => Consumer.stopConsumption.when(count >= nrMessages))
+                 }
+                 .mapChunksZIO(records =>
+                   Consumer.commitRecords(records).as(Chunk.empty)
+                 )
+                 .takeUntilZIO((_: Chunk[_]) => counter.get.map(_ >= nrMessages))
+                 .runDrain
+             }
+             .provideSome[Kafka](env)
     } yield ()
   }
 }
diff --git a/zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala
index 33416e7d01..bf0b0da7dd 100644
--- a/zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala
+++ b/zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala
@@ -23,7 +23,7 @@ import zio.kafka.admin.AdminClient.{
 }
 import zio.kafka.admin.acl._
 import zio.kafka.admin.resource.{ PatternType, ResourcePattern, ResourcePatternFilter, ResourceType }
-import zio.kafka.consumer.{ CommittableRecord, Consumer, OffsetBatch, Subscription }
+import zio.kafka.consumer.{ Consumer, Subscription }
 import zio.kafka.serde.Serde
 import zio.kafka.testkit.KafkaTestUtils._
 import zio.kafka.testkit._
@@ -228,13 +228,8 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
         .partitionedStream[Kafka, String, String](Subscription.Topics(Set(topic)), Serde.string, Serde.string)
         .flatMapPar(partitionCount)(_._2)
         .take(count)
-        .transduce(ZSink.collectAllN[CommittableRecord[String, String]](20))
-        .mapConcatZIO { committableRecords =>
-          val records     = committableRecords.map(_.record)
-          val offsetBatch = OffsetBatch(committableRecords.map(_.offset))
-
-          offsetBatch.commit.as(records)
-        }
+        .transduce(ZSink.collectAllN[ConsumerRecord[String, String]](20))
+        .mapConcatZIO(records => Consumer.commitRecords(records).as(records))
         .runCollect
         .provideSomeLayer[Kafka](consumer("adminspec-topic10", Some(consumerGroupID)))
@@ -301,7 +296,7 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
             Consumer
               .plainStream[Kafka, String, String](Subscription.Topics(Set(topic)), Serde.string, Serde.string)
               .take(count)
-              .foreach(_.offset.commit)
+              .foreach(Consumer.commitRecord)
               .provideSomeLayer[Kafka](consumer(topic, Some(groupId)))
 
           KafkaTestUtils.withAdmin { client =>
@@ -344,7 +339,7 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
             Consumer
               .plainStream[Kafka, String, String](Subscription.Topics(Set(topic)), Serde.string, Serde.string)
               .take(count)
-              .foreach(_.offset.commit)
+              .foreach(Consumer.commitRecord)
               .provideSomeLayer[Kafka](consumer(topic, Some(groupId)))
 
           KafkaTestUtils.withAdmin { client =>
@@ -645,7 +640,7 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
     groupInstanceId: Option[String] = None
   ): ZIO[Kafka, Throwable, Unit] = Consumer
     .plainStream(Subscription.topics(topicName), Serde.string, Serde.string)
-    .foreach(_.offset.commit)
+    .foreach(Consumer.commitRecord)
     .provideSomeLayer(consumer(clientId, Some(groupId), groupInstanceId))
 
   private def getStableConsumerGroupDescription(
diff --git a/zio-kafka-test/src/test/scala/zio/kafka/ProducerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/ProducerSpec.scala
index dc870f5731..457a4580fb 100644
--- a/zio-kafka-test/src/test/scala/zio/kafka/ProducerSpec.scala
+++ b/zio-kafka-test/src/test/scala/zio/kafka/ProducerSpec.scala
@@ -1,11 +1,13 @@
 package zio.kafka
 
+import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.config.TopicConfig import zio._ import zio.kafka.admin.AdminClient.NewTopic import zio.kafka.consumer._ +import zio.kafka.consumer.types.{ Offset, OffsetBatch } import zio.kafka.producer.TransactionalProducer.{ TransactionLeaked, UserInitiatedAbort } import zio.kafka.producer.{ ByteRecord, Producer, Transaction, TransactionalProducer } import zio.kafka.serde.Serde @@ -26,7 +28,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { def withConsumerInt( subscription: Subscription, settings: ConsumerSettings - ): ZIO[Any with Scope, Throwable, Dequeue[Take[Throwable, CommittableRecord[String, Int]]]] = + ): ZIO[Any with Scope, Throwable, Dequeue[Take[Throwable, ConsumerRecord[String, Int]]]] = Consumer.make(settings).flatMap { c => c.plainStream(subscription, Serde.string, Serde.int).toQueue() } @@ -193,7 +195,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { for { messages <- consumer.take.flatMap(_.done).mapError(_.getOrElse(new NoSuchElementException)) record = messages - .filter(rec => rec.record.key == key1 && rec.record.value == value1) + .filter(rec => rec.key == key1 && rec.value == value1) } yield record } } @@ -201,7 +203,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { withConsumer(Topics(Set(topic2)), settings).flatMap { consumer => for { messages <- consumer.take.flatMap(_.done).mapError(_.getOrElse(new NoSuchElementException)) - record = messages.filter(rec => rec.record.key == key2 && rec.record.value == value2) + record = messages.filter(rec => rec.key == key2 && rec.value == value2) } yield record } } @@ -289,7 +291,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { messages <- consumer.take .flatMap(_.done) .mapError(_.getOrElse(new NoSuchElementException)) - record = messages.filter(rec => rec.record.key == "bob") + record = messages.filter(_.key == "bob") } yield record } } @@ -329,7 +331,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { messages <- consumer.take .flatMap(_.done) .mapError(_.getOrElse(new NoSuchElementException)) - record = messages.filter(rec => rec.record.key == "bob") + record = messages.filter(_.key == "bob") } yield record } } @@ -417,7 +419,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { messages <- consumer.take .flatMap(_.done) .mapError(_.getOrElse(new NoSuchElementException)) - record = messages.filter(rec => rec.record.key == "no one") + record = messages.filter(_.key == "no one") } yield record } @@ -458,7 +460,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { messages <- consumer.take .flatMap(_.done) .mapError(_.getOrElse(new NoSuchElementException)) - record = messages.filter(rec => rec.record.key == "no one") + record = messages.filter(_.key == "no one") } yield record } } @@ -504,7 +506,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { messages <- consumer.take .flatMap(_.done) .mapError(_.getOrElse(new NoSuchElementException)) - record = messages.filter(rec => rec.record.key == "no one") + record = messages.filter(_.key == "no one") } yield record } } @@ -544,7 +546,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { aliceAccountFeesPaid, Serde.string, Serde.int, - Some(aliceHadMoneyCommittableMessage.offset) + Some(Offset.from(aliceHadMoneyCommittableMessage)) ) } } @@ -591,7 +593,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { aliceAccountFeesPaid, 
Serde.string, Serde.int, - Some(aliceHadMoneyCommittableMessage.offset) + Some(Offset.from(aliceHadMoneyCommittableMessage)) ) *> t.abort } diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index d68b93a8ae..ea2f7e9cdd 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -4,6 +4,7 @@ import io.github.embeddedkafka.EmbeddedKafka import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerPartitionAssignor, + ConsumerRecord, CooperativeStickyAssignor, RangeAssignor } @@ -19,6 +20,7 @@ import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization.{ SubscriptionFinalized } import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics } +import zio.kafka.consumer.types.OffsetBatch import zio.kafka.producer.{ Producer, TransactionalProducer } import zio.kafka.serde.Serde import zio.kafka.testkit.KafkaTestUtils._ @@ -58,7 +60,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .take(5) .runCollect .provideSomeLayer[Kafka](consumer(client, Some(group))) - kvOut = records.map(r => (r.record.key, r.record.value)).toList + kvOut = records.map(r => (r.key, r.value)).toList } yield assert(kvOut)(equalTo(kvs)) }, test("chunk sizes") { @@ -96,7 +98,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .take(5) .runCollect .provideSomeLayer[Kafka](consumer(clientId = client)) - kvOut = records.map(r => (r.record.key, r.record.value)).toList + kvOut = records.map(r => (r.key, r.value)).toList } yield assert(kvOut)(equalTo(kvs)) }, test("Consuming+provideCustomLayer") { @@ -113,7 +115,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .take(100) .runCollect .provideSomeLayer[Kafka](consumer(client, Some(group))) - kvOut = records.map(r => (r.record.key, r.record.value)).toList + kvOut = records.map(r => (r.key, r.value)).toList } yield assert(kvOut)(equalTo(kvs)) }, test("plainStream emits messages for a pattern subscription") { @@ -128,7 +130,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .take(5) .runCollect .provideSomeLayer[Kafka](consumer(client, Some(group))) - kvOut = records.map(r => (r.record.key, r.record.value)).toList + kvOut = records.map(r => (r.key, r.value)).toList } yield assert(kvOut)(equalTo(kvs)) }, test("receive only messages from the subscribed topic-partition when creating a manual subscription") { @@ -148,7 +150,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .take(1) .runHead .provideSomeLayer[Kafka](consumer(client, Some(group))) - kvOut = record.map(r => (r.record.key, r.record.value)) + kvOut = record.map(r => (r.key, r.value)) } yield assert(kvOut)(isSome(equalTo("key2" -> "msg2"))) }, test("receive from the right offset when creating a manual subscription with manual seeking") { @@ -173,7 +175,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .provideSomeLayer[Kafka]( consumer(client, Some(group), offsetRetrieval = offsetRetrieval) ) - kvOut = record.map(r => (r.record.key, r.record.value)) + kvOut = record.map(r => (r.key, r.value)) } yield assert(kvOut)(isSome(equalTo("key2-3" -> "msg2-3"))) }, test("restart from the committed position") { @@ -191,13 +193,8 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .filter(_._1 == new TopicPartition(topic, 0)) .flatMap(_._2) .take(5) - 
.transduce(ZSink.collectAllN[CommittableRecord[String, String]](5)) - .mapConcatZIO { committableRecords => - val records = committableRecords.map(_.record) - val offsetBatch = OffsetBatch(committableRecords.map(_.offset)) - - offsetBatch.commit.as(records) - } + .transduce(ZSink.collectAllN[ConsumerRecord[String, String]](5)) + .mapConcatZIO(records => Consumer.commitRecords(records).as(records)) .runCollect .provideSomeLayer[Kafka]( consumer(first, Some(group)) @@ -209,13 +206,8 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .partitionedStream(Subscription.Topics(Set(topic)), Serde.string, Serde.string) .flatMap(_._2) .take(5) - .transduce(ZSink.collectAllN[CommittableRecord[String, String]](20)) - .mapConcatZIO { committableRecords => - val records = committableRecords.map(_.record) - val offsetBatch = OffsetBatch(committableRecords.map(_.offset)) - - offsetBatch.commit.as(records) - } + .transduce(ZSink.collectAllN[ConsumerRecord[String, String]](20)) + .mapConcatZIO(records => Consumer.commitRecords(records).as(records)) .runCollect .provideSomeLayer[Kafka]( consumer(second, Some(group)) @@ -287,7 +279,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .zipWithIndex .tap { case (record, idx) => (Consumer.stopConsumption <* ZIO.logDebug("Stopped consumption")).when(idx == 3) *> - record.offset.commit <* ZIO.logDebug(s"Committed $idx") + Consumer.commitRecord(record) <* ZIO.logDebug(s"Committed $idx") } .tap { case (_, idx) => ZIO.logDebug(s"Consumed $idx") } .runDrain @@ -312,10 +304,9 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { for { nr <- messagesReceived.updateAndGet(_ + 1) _ <- Consumer.stopConsumption.when(nr == 10) - } yield if (nr < 10) Seq(record.offset) else Seq.empty + } yield if (nr < 10) Seq(record) else Seq.empty } - .transduce(Consumer.offsetBatches) - .mapZIO(_.commit) + .mapChunksZIO(records => Consumer.commitRecords(records).as(records)) .runDrain *> Consumer.committed(Set(new TopicPartition(topic, 0))).map(_.values.head)) .provideSomeLayer[Kafka](consumer(client, Some(group))) @@ -339,11 +330,10 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { subscription = Subscription.topics(topic) offsets <- (Consumer .partitionedStream(subscription, Serde.string, Serde.string) - .flatMapPar(nrPartitions)(_._2.map(_.offset)) + .flatMapPar(nrPartitions)(_._2) .take(nrMessages.toLong) - .transduce(Consumer.offsetBatches) + .mapChunksZIO(records => Consumer.commitRecords(records).as(records)) .take(1) - .mapZIO(_.commit) .runDrain *> Consumer.committed((0 until nrPartitions).map(new TopicPartition(topic, _)).toSet)) .provideSomeLayer[Kafka](consumer(client, Some(group))) @@ -460,13 +450,8 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { _ <- Consumer .plainStream(Subscription.topics(topic), Serde.string, Serde.string) .take(5) - .transduce(ZSink.collectAllN[CommittableRecord[String, String]](5)) - .mapConcatZIO { committableRecords => - val records = committableRecords.map(_.record) - val offsetBatch = OffsetBatch(committableRecords.map(_.offset)) - - offsetBatch.commit.as(records) - } + .transduce(ZSink.collectAllN[ConsumerRecord[String, String]](5)) + .mapConcatZIO(records => Consumer.commitRecords(records).as(records)) .runCollect .provideSomeLayer[Kafka](consumer(client1, Some(group))) // Start a new consumer with manual offset before the committed offset @@ -474,7 +459,6 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { secondResults <- Consumer 
.plainStream(Subscription.topics(topic), Serde.string, Serde.string) .take(nrRecords.toLong - manualOffsetSeek) - .map(_.record) .runCollect .provideSomeLayer[Kafka]( consumer(client2, Some(group), offsetRetrieval = offsetRetrieval) @@ -522,7 +506,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { newMessage <- Consumer .plainStream(subscription, Serde.string, Serde.string) .take(1) - .map(r => (r.record.key(), r.record.value())) + .map(r => (r.key(), r.value())) .run(ZSink.collectAll[(String, String)]) .map(_.head) .orDie @@ -675,7 +659,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .flatMapPar(Int.MaxValue) { case (tp, partitionStream) => ZStream.finalizer(ZIO.logDebug(s"TP ${tp.toString} finalizer")) *> partitionStream.mapChunksZIO { records => - OffsetBatch(records.map(_.offset)).commit *> messagesReceived(tp.partition) + Consumer.commitRecords(records) *> messagesReceived(tp.partition) .update(_ + records.size) .as(records) } @@ -812,9 +796,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .fromIterable(partitions.map(_._2)) .flatMapPar(Int.MaxValue)(s => s) .mapZIO(record => messagesReceivedConsumer1.update(_ + 1).as(record)) - .map(_.offset) - .aggregateAsync(Consumer.offsetBatches) - .mapZIO(offsetBatch => offsetBatch.commit) + .mapChunksZIO(Consumer.commitRecords(_).as(Chunk.empty)) .runDrain } .mapZIO(_ => drainCount.updateAndGet(_ + 1)) @@ -837,9 +819,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { Consumer .plainStream(subscription, Serde.string, Serde.string) .mapZIO(record => messagesReceivedConsumer2.update(_ + 1).as(record)) - .map(_.offset) - .aggregateAsync(Consumer.offsetBatches) - .mapZIO(offsetBatch => offsetBatch.commit) + .mapChunksZIO(Consumer.commitRecords(_).as(Chunk.empty)) .runDrain .provideSomeLayer[Kafka]( customConsumer("consumer2", Some(group)) @@ -926,7 +906,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { records.map(r => new ProducerRecord(toTopic, r.key, r.value)), Serde.string, Serde.string, - OffsetBatch(records.map(_.offset)) + OffsetBatch.from(records) ) _ <- consumedMessagesCounter.update(_ + records.size) } yield Chunk.empty @@ -1205,9 +1185,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { result <- Consumer .plainStream(Subscription.Topics(Set(topic)), Serde.string, Serde.string) .take(11) - .map(_.offset) - .aggregateAsync(Consumer.offsetBatches) - .mapZIO(_.commit) // Hangs without timeout + .mapChunksZIO(Consumer.commitRecords(_).as(Chunk.empty)) .runDrain .exit .provideSomeLayer[Kafka](consumer(client, Some(group), commitTimeout = 2.seconds)) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala index d33bdab523..fb93c7df1d 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala @@ -1,14 +1,14 @@ package zio.kafka.consumer import io.github.embeddedkafka.EmbeddedKafka -import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord } import zio._ import zio.kafka.ZIOSpecDefaultSlf4j import zio.kafka.producer.Producer import zio.kafka.serde.Serde import zio.kafka.testkit.KafkaTestUtils._ import zio.kafka.testkit.{ Kafka, KafkaRandom } -import zio.stream.{ ZSink, ZStream } +import zio.stream.ZStream import zio.test.Assertion._ import 
zio.test.TestAspect._ import zio.test._ @@ -42,12 +42,12 @@ object SubscriptionsSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .runCollect) .provideSomeLayer[Kafka with Scope](consumer(client, Some(group))) (records1, records2) = records - kvOut1 = records1.map(r => (r.record.key, r.record.value)).toList - kvOut2 = records2.map(r => (r.record.key, r.record.value)).toList + kvOut1 = records1.map(r => (r.key, r.value)).toList + kvOut2 = records2.map(r => (r.key, r.value)).toList } yield assertTrue(kvOut1 == kvs) && assertTrue(kvOut2 == kvs) && - assertTrue(records1.map(_.record.topic()).forall(_ == topic1)) && - assertTrue(records2.map(_.record.topic()).forall(_ == topic2)) + assertTrue(records1.map(_.topic()).forall(_ == topic1)) && + assertTrue(records2.map(_.topic()).forall(_ == topic2)) }, test("consumes from two pattern subscriptions") { val kvs = (1 to 5).toList.map(i => (s"key$i", s"msg$i")) @@ -71,12 +71,12 @@ object SubscriptionsSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .runCollect) .provideSomeLayer[Kafka with Scope](consumer(client, Some(group))) (records1, records2) = records - kvOut1 = records1.map(r => (r.record.key, r.record.value)).toList - kvOut2 = records2.map(r => (r.record.key, r.record.value)).toList + kvOut1 = records1.map(r => (r.key, r.value)).toList + kvOut2 = records2.map(r => (r.key, r.value)).toList } yield assertTrue(kvOut1 == kvs) && assertTrue(kvOut2 == kvs) && - assertTrue(records1.map(_.record.topic()).forall(_ == topic1)) && - assertTrue(records2.map(_.record.topic()).forall(_ == topic2)) + assertTrue(records1.map(_.topic()).forall(_ == topic1)) && + assertTrue(records2.map(_.topic()).forall(_ == topic2)) }, test( "gives an error when attempting to subscribe using a manual subscription when there is already a topic subscription" @@ -261,23 +261,18 @@ object SubscriptionsSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { _ <- produceMany(topic1, kvs) - recordsConsumed <- Ref.make(Chunk.empty[CommittableRecord[String, String]]) + recordsConsumed <- Ref.make(Chunk.empty[ConsumerRecord[String, String]]) _ <- Consumer .plainStream(Subscription.topics(topic1), Serde.string, Serde.string) .take(40) - .transduce( - Consumer.offsetBatches.contramap[CommittableRecord[String, String]](_.offset) <&> ZSink - .collectAll[CommittableRecord[String, String]] - ) - .mapZIO { case (offsetBatch, records) => offsetBatch.commit.as(records) } - .flattenChunks + .mapChunksZIO(records => Consumer.commitRecords(records).as(records)) .runCollect .tap(records => recordsConsumed.update(_ ++ records)) .repeatN(24) .provideSomeLayer[Kafka with Scope](consumer(client, Some(group))) consumed <- recordsConsumed.get - } yield assert(consumed.map(r => r.value))(hasSameElements(Chunk.fromIterable(kvs.map(_._2)))) + } yield assert(consumed.map(_.value))(hasSameElements(Chunk.fromIterable(kvs.map(_._2)))) } @@ TestAspect.nonFlaky(3) ) .provideSome[Scope & Kafka](producer) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala deleted file mode 100644 index aabaf426e0..0000000000 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala +++ /dev/null @@ -1,52 +0,0 @@ -package zio.kafka.consumer - -import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, ConsumerRecord } -import org.apache.kafka.common.TopicPartition -import zio.RIO -import zio.kafka.serde.Deserializer - -final case class CommittableRecord[K, V]( - record: ConsumerRecord[K, V], - private val 
consumerGroupMetadata: Option[ConsumerGroupMetadata] -) { - def deserializeWith[R, K1, V1]( - keyDeserializer: Deserializer[R, K1], - valueDeserializer: Deserializer[R, V1] - )(implicit ev1: K <:< Array[Byte], ev2: V <:< Array[Byte]): RIO[R, CommittableRecord[K1, V1]] = - for { - key <- keyDeserializer.deserialize(record.topic(), record.headers(), record.key()) - value <- valueDeserializer.deserialize(record.topic(), record.headers(), record.value()) - } yield copy( - record = new ConsumerRecord[K1, V1]( - record.topic(), - record.partition(), - record.offset(), - record.timestamp(), - record.timestampType(), - record.serializedKeySize(), - record.serializedValueSize(), - key, - value, - record.headers(), - record.leaderEpoch() - ) - ) - - def key: K = record.key - def value: V = record.value() - def partition: Int = record.partition() - def timestamp: Long = record.timestamp() - - lazy val topicPartition: TopicPartition = new TopicPartition(record.topic(), record.partition()) -} - -object CommittableRecord { - def apply[K, V]( - record: ConsumerRecord[K, V], - consumerGroupMetadata: Option[ConsumerGroupMetadata] - ): CommittableRecord[K, V] = - new CommittableRecord( - record = record, - consumerGroupMetadata = consumerGroupMetadata - ) -} diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Committer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Committer.scala index a47c51e5d3..255573f244 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/Committer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Committer.scala @@ -1,33 +1,37 @@ package zio.kafka.consumer -import org.apache.kafka.common.TopicPartition +import org.apache.kafka.clients.consumer.ConsumerRecord +import zio.kafka.consumer.types.OffsetBatch import zio.kafka.utils.PendingCommit import zio.{ Chunk, Promise, Ref, Schedule, Scope, Task, UIO, URIO, ZIO } trait Committer { - def commit(records: Chunk[CommittableRecord[_, _]]): UIO[PendingCommit[Throwable, Unit]] + def commit(offsetBatch: OffsetBatch): UIO[PendingCommit[Throwable, Unit]] - final def commitAndAwait(records: Chunk[CommittableRecord[_, _]]): Task[Unit] = + final def commit(records: Chunk[ConsumerRecord[_, _]]): UIO[PendingCommit[Throwable, Unit]] = + commit(OffsetBatch.from(records)) + + final def commitAndAwait(records: Chunk[ConsumerRecord[_, _]]): Task[Unit] = commit(records).flatMap(_.awaitCommit) - final def commitAndForget(records: Chunk[CommittableRecord[_, _]]): UIO[Unit] = + final def commitAndForget(records: Chunk[ConsumerRecord[_, _]]): UIO[Unit] = commit(records).unit } //noinspection ConvertExpressionToSAM object Committer { - private val emptyState: (Map[TopicPartition, Long], List[Promise[Throwable, Unit]]) = - (Map.empty[TopicPartition, Long], List.empty[Promise[Throwable, Unit]]) + private val emptyState: (OffsetBatch, List[Promise[Throwable, Unit]]) = + (OffsetBatch.empty, List.empty[Promise[Throwable, Unit]]) val unit: Committer = new Committer { - override def commit(records: Chunk[CommittableRecord[_, _]]): UIO[PendingCommit[Throwable, Unit]] = + override def commit(offsetBatch: OffsetBatch): UIO[PendingCommit[Throwable, Unit]] = ZIO.succeed(PendingCommit.unit.asInstanceOf[PendingCommit[Throwable, Unit]]) } private[zio] def fromSchedule[R]( commitSchedule: Schedule[R, Any, Any], - commit: Map[TopicPartition, Long] => Task[Unit], + commit: OffsetBatch => Task[Unit], scope: Scope ): URIO[R, Committer] = for { @@ -45,11 +49,10 @@ object Committer { .schedule(commitSchedule) .forkIn(scope) } yield new Committer { - override def 
commit(records: Chunk[CommittableRecord[_, _]]): UIO[PendingCommit[Throwable, Unit]] = + override def commit(offsetBatch: OffsetBatch): UIO[PendingCommit[Throwable, Unit]] = for { p <- Promise.make[Throwable, Unit] - newOffsets = records.map(record => record.topicPartition -> record.record.offset()) - _ <- acc.update { case (offsets, promises) => (offsets ++ newOffsets, promises :+ p) } + _ <- acc.update { case (offsets, promises) => (offsets merge offsetBatch, promises :+ p) } } yield PendingCommit(p) } } diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala index bcecab5f60..70d9a7668c 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala @@ -1,12 +1,18 @@ package zio.kafka.consumer -import org.apache.kafka.clients.consumer.{ConsumerRecord, OffsetAndMetadata, OffsetAndTimestamp, Consumer => JConsumer} +import org.apache.kafka.clients.consumer.{ + Consumer => JConsumer, + ConsumerRecord, + OffsetAndMetadata, + OffsetAndTimestamp +} import org.apache.kafka.common._ import zio._ import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization import zio.kafka.consumer.diagnostics.Diagnostics -import zio.kafka.consumer.internal.{ConsumerAccess, RunloopAccess} -import zio.kafka.serde.{Deserializer, Serde} +import zio.kafka.consumer.internal.{ ConsumerAccess, RunloopAccess } +import zio.kafka.consumer.types.{ deserializeWith, OffsetBatch } +import zio.kafka.serde.{ Deserializer, Serde } import zio.kafka.utils.SslHelper import zio.stream._ @@ -32,7 +38,9 @@ trait Consumer { timeout: Duration = Duration.Infinity ): Task[Map[TopicPartition, Long]] - def commit(record: CommittableRecord[_, _]): Task[Unit] + def commit(record: ConsumerRecord[_, _]): Task[Unit] + def commit(records: Chunk[ConsumerRecord[_, _]]): Task[Unit] + def commit(offsetBatch: OffsetBatch): Task[Unit] /** * Retrieve the last committed offset for the given topic-partitions @@ -42,7 +50,9 @@ trait Consumer { timeout: Duration = Duration.Infinity ): Task[Map[TopicPartition, Option[OffsetAndMetadata]]] - def commitOrRetry[R](policy: Schedule[R, Throwable, Any])(record: CommittableRecord[_, _]): RIO[R, Unit] + def commitOrRetry[R](policy: Schedule[R, Throwable, Any])(record: ConsumerRecord[_, _]): RIO[R, Unit] + def commitOrRetry[R](policy: Schedule[R, Throwable, Any])(records: Chunk[ConsumerRecord[_, _]]): RIO[R, Unit] + def commitOrRetry[R](policy: Schedule[R, Throwable, Any])(offsetBatch: OffsetBatch): RIO[R, Unit] def commitAccumBatch[R](commitschedule: Schedule[R, Any, Any]): URIO[R, Committer] @@ -68,7 +78,7 @@ trait Consumer { subscription: Subscription, keyDeserializer: Deserializer[R, K], valueDeserializer: Deserializer[R, V] - ): Stream[Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]] + ): Stream[Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, ConsumerRecord[K, V]])]] /** * Create a stream with messages on the subscribed topic-partitions by topic-partition @@ -91,7 +101,7 @@ trait Consumer { subscription: Subscription, keyDeserializer: Deserializer[R, K], valueDeserializer: Deserializer[R, V] - ): Stream[Throwable, (TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])] + ): Stream[Throwable, (TopicPartition, ZStream[R, Throwable, ConsumerRecord[K, V]])] /** * Create a stream with all messages on the subscribed topic-partitions @@ -116,7 +126,7 @@ trait Consumer { keyDeserializer: Deserializer[R, K], 
valueDeserializer: Deserializer[R, V], bufferSize: Int = 4 - ): ZStream[R, Throwable, CommittableRecord[K, V]] + ): ZStream[R, Throwable, ConsumerRecord[K, V]] /** * Stops consumption of data, drains buffered records, and ends the attached streams while still serving commit @@ -215,6 +225,15 @@ object Consumer { ): RIO[Consumer, Map[TopicPartition, Long]] = ZIO.serviceWithZIO(_.beginningOffsets(partitions, timeout)) + def commitRecord(record: ConsumerRecord[_, _]): RIO[Consumer, Unit] = + ZIO.serviceWithZIO[Consumer](_.commit(record)) + + def commitRecords(records: Chunk[ConsumerRecord[_, _]]): RIO[Consumer, Unit] = + ZIO.serviceWithZIO[Consumer](_.commit(records)) + + def commitOffsetBatch(offsetBatch: OffsetBatch): RIO[Consumer, Unit] = + ZIO.serviceWithZIO[Consumer](_.commit(offsetBatch)) + /** * Accessor method */ @@ -251,7 +270,7 @@ object Consumer { subscription: Subscription, keyDeserializer: Deserializer[R, K], valueDeserializer: Deserializer[R, V] - ): ZStream[Consumer, Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]] = + ): ZStream[Consumer, Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, ConsumerRecord[K, V]])]] = ZStream.serviceWithStream[Consumer](_.partitionedAssignmentStream(subscription, keyDeserializer, valueDeserializer)) /** @@ -264,7 +283,7 @@ object Consumer { ): ZStream[ Consumer, Throwable, - (TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]]) + (TopicPartition, ZStream[R, Throwable, ConsumerRecord[K, V]]) ] = ZStream.serviceWithStream[Consumer](_.partitionedStream(subscription, keyDeserializer, valueDeserializer)) @@ -276,7 +295,7 @@ object Consumer { keyDeserializer: Deserializer[R, K], valueDeserializer: Deserializer[R, V], bufferSize: Int = 4 - ): ZStream[R & Consumer, Throwable, CommittableRecord[K, V]] = + ): ZStream[R & Consumer, Throwable, ConsumerRecord[K, V]] = ZStream.serviceWithStream[Consumer]( _.plainStream(subscription, keyDeserializer, valueDeserializer, bufferSize) ) @@ -441,8 +460,14 @@ private[consumer] final class ConsumerLive private[consumer] ( offs.asScala.map { case (k, v) => k -> v.longValue() }.toMap } - override def commit(record: CommittableRecord[_, _]): Task[Unit] = - runloopAccess.commit(record) + override def commit(record: ConsumerRecord[_, _]): Task[Unit] = + runloopAccess.commit(OffsetBatch.from(record)) + + override def commit(records: Chunk[ConsumerRecord[_, _]]): Task[Unit] = + runloopAccess.commit(OffsetBatch.from(records)) + + override def commit(offsetBatch: OffsetBatch): Task[Unit] = + runloopAccess.commit(offsetBatch) override def committed( partitions: Set[TopicPartition], @@ -452,8 +477,16 @@ private[consumer] final class ConsumerLive private[consumer] ( _.committed(partitions.asJava, timeout.asJava).asScala.map { case (k, v) => k -> Option(v) }.toMap ) - override def commitOrRetry[R](policy: Schedule[R, Throwable, Any])(record: CommittableRecord[_, _]): RIO[R, Unit] = - runloopAccess.commitOrRetry(policy)(record) + override def commitOrRetry[R](policy: Schedule[R, Throwable, Any])(record: ConsumerRecord[_, _]): RIO[R, Unit] = + runloopAccess.commitOrRetry(policy)(OffsetBatch.from(record)) + + override def commitOrRetry[R](policy: Schedule[R, Throwable, Any])( + records: Chunk[ConsumerRecord[_, _]] + ): RIO[R, Unit] = + runloopAccess.commitOrRetry(policy)(OffsetBatch.from(records)) + + override def commitOrRetry[R](policy: Schedule[R, Throwable, Any])(offsetBatch: OffsetBatch): RIO[R, Unit] = + runloopAccess.commitOrRetry(policy)(offsetBatch) override def 
commitAccumBatch[R](commitSchedule: Schedule[R, Any, Any]): URIO[R, Committer] = runloopAccess.commitAccumBatch(commitSchedule) @@ -465,7 +498,7 @@ private[consumer] final class ConsumerLive private[consumer] ( subscription: Subscription, keyDeserializer: Deserializer[R, K], valueDeserializer: Deserializer[R, V] - ): Stream[Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]] = { + ): Stream[Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, ConsumerRecord[K, V]])]] = { val onlyByteArraySerdes: Boolean = (keyDeserializer eq Serde.byteArray) && (valueDeserializer eq Serde.byteArray) ZStream.unwrapScoped { @@ -477,10 +510,10 @@ private[consumer] final class ConsumerLive private[consumer] ( .map { _.collect { case (tp, partitionStream) if Subscription.subscriptionMatches(subscription, tp) => - val stream: ZStream[R, Throwable, CommittableRecord[K, V]] = + val stream: ZStream[R, Throwable, ConsumerRecord[K, V]] = if (onlyByteArraySerdes) - partitionStream.asInstanceOf[ZStream[R, Throwable, CommittableRecord[K, V]]] - else partitionStream.mapChunksZIO(_.mapZIO(_.deserializeWith(keyDeserializer, valueDeserializer))) + partitionStream.asInstanceOf[ZStream[R, Throwable, ConsumerRecord[K, V]]] + else partitionStream.mapChunksZIO(_.mapZIO(deserializeWith(keyDeserializer, valueDeserializer))) tp -> stream } @@ -492,7 +525,7 @@ private[consumer] final class ConsumerLive private[consumer] ( subscription: Subscription, keyDeserializer: Deserializer[R, K], valueDeserializer: Deserializer[R, V] - ): ZStream[Any, Throwable, (TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])] = + ): ZStream[Any, Throwable, (TopicPartition, ZStream[R, Throwable, ConsumerRecord[K, V]])] = partitionedAssignmentStream(subscription, keyDeserializer, valueDeserializer).flattenChunks override def plainStream[R, K, V]( @@ -500,7 +533,7 @@ private[consumer] final class ConsumerLive private[consumer] ( keyDeserializer: Deserializer[R, K], valueDeserializer: Deserializer[R, V], bufferSize: Int - ): ZStream[R, Throwable, CommittableRecord[K, V]] = + ): ZStream[R, Throwable, ConsumerRecord[K, V]] = partitionedStream(subscription, keyDeserializer, valueDeserializer).flatMapPar( n = Int.MaxValue, bufferSize = bufferSize @@ -523,9 +556,7 @@ private[consumer] final class ConsumerLive private[consumer] ( committer <- commitAccumBatch(commitRetryPolicy) _ <- partitionedStream(subscription, keyDeserializer, valueDeserializer) .flatMapPar(Int.MaxValue) { case (_, partitionStream) => - partitionStream.mapChunksZIO(records => - records.mapZIO((r: CommittableRecord[K, V]) => f(r.record)).as(records) - ) + partitionStream.mapChunksZIO(records => records.mapZIO(f).as(records)) } .mapChunksZIO(committer.commitAndAwait(_).as(Chunk.empty)) .runDrain diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala index 4928f7d685..1319423c2f 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala @@ -2,14 +2,14 @@ package zio.kafka.consumer.internal import org.apache.kafka.common.TopicPartition import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics } -import zio.kafka.consumer.internal.Runloop.ByteArrayCommittableRecord +import zio.kafka.consumer.internal.Runloop.ByteArrayConsumerRecord import zio.stream.{ Take, ZStream } import zio.{ Chunk, 
LogAnnotation, Promise, Queue, Ref, UIO, ZIO } final class PartitionStreamControl private ( val tp: TopicPartition, - stream: ZStream[Any, Throwable, ByteArrayCommittableRecord], - dataQueue: Queue[Take[Throwable, ByteArrayCommittableRecord]], + stream: ZStream[Any, Throwable, ByteArrayConsumerRecord], + dataQueue: Queue[Take[Throwable, ByteArrayConsumerRecord]], interruptionPromise: Promise[Throwable, Unit], completedPromise: Promise[Nothing, Unit], queueSizeRef: Ref[Int] @@ -21,7 +21,7 @@ final class PartitionStreamControl private ( ) /** Offer new data for the stream to process. */ - private[internal] def offerRecords(data: Chunk[ByteArrayCommittableRecord]): ZIO[Any, Nothing, Unit] = + private[internal] def offerRecords(data: Chunk[ByteArrayConsumerRecord]): ZIO[Any, Nothing, Unit] = queueSizeRef.update(_ + data.size) *> dataQueue.offer(Take.chunk(data)).unit def queueSize: UIO[Int] = queueSizeRef.get @@ -45,7 +45,7 @@ final class PartitionStreamControl private ( private[internal] def isRunning: ZIO[Any, Nothing, Boolean] = isCompleted.negate - private[internal] val tpStream: (TopicPartition, ZStream[Any, Throwable, ByteArrayCommittableRecord]) = + private[internal] val tpStream: (TopicPartition, ZStream[Any, Throwable, ByteArrayConsumerRecord]) = (tp, stream) } @@ -60,7 +60,7 @@ object PartitionStreamControl { _ <- ZIO.logDebug(s"Creating partition stream ${tp.toString}") interruptionPromise <- Promise.make[Throwable, Unit] completedPromise <- Promise.make[Nothing, Unit] - dataQueue <- Queue.unbounded[Take[Throwable, ByteArrayCommittableRecord]] + dataQueue <- Queue.unbounded[Take[Throwable, ByteArrayConsumerRecord]] queueSize <- Ref.make(0) requestAndAwaitData = for { diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 10e7476199..bb36132c8a 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -11,6 +11,7 @@ import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics } import zio.kafka.consumer.fetch.FetchStrategy import zio.kafka.consumer.internal.ConsumerAccess.ByteArrayKafkaConsumer import zio.kafka.consumer.internal.RunloopAccess.PartitionAssignment +import zio.kafka.consumer.types.OffsetBatch import zio.stream._ import java.util @@ -116,13 +117,8 @@ private[consumer] final class Runloop private ( } } - private[internal] def commit(record: CommittableRecord[_, _]): Task[Unit] = - commit.apply(Map(record.topicPartition -> record.record.offset())) - - private[internal] def commitOrRetry[R]( - policy: Schedule[R, Throwable, Any] - )(record: CommittableRecord[_, _]): RIO[R, Unit] = - commit(record).retry( + private[internal] def commitOrRetry[R](policy: Schedule[R, Throwable, Any])(offsetBatch: OffsetBatch): RIO[R, Unit] = + commit(offsetBatch).retry( Schedule.recurWhile[Throwable] { case _: RetriableCommitFailedException => true case _ => false @@ -133,14 +129,13 @@ private[consumer] final class Runloop private ( private[internal] def commitAccumBatch[R](commitSchedule: Schedule[R, Any, Any]): URIO[R, Committer] = Committer.fromSchedule(commitSchedule, commit, runloopScope) - private val commit: Map[TopicPartition, Long] => Task[Unit] = - offsets => - for { - p <- Promise.make[Throwable, Unit] - _ <- commandQueue.offer(RunloopCommand.Commit(offsets, p)).unit - _ <- diagnostics.emit(DiagnosticEvent.Commit.Started(offsets)) - _ <- 
p.await.timeoutFail(CommitTimeout)(commitTimeout) - } yield () + private[internal] def commit(offsets: OffsetBatch): Task[Unit] = + for { + p <- Promise.make[Throwable, Unit] + _ <- commandQueue.offer(RunloopCommand.Commit(offsets, p)).unit + _ <- diagnostics.emit(DiagnosticEvent.Commit.Started(offsets)) + _ <- p.await.timeoutFail(CommitTimeout)(commitTimeout) + } yield () private def doCommit(cmd: RunloopCommand.Commit): UIO[Unit] = { val offsets = cmd.offsets.map { case (tp, offset) => tp -> new OffsetAndMetadata(offset + 1) } @@ -206,7 +201,7 @@ private[consumer] final class Runloop private ( ignoreRecordsForTps: Set[TopicPartition], polledRecords: ConsumerRecords[Array[Byte], Array[Byte]] ): UIO[Runloop.FulfillResult] = { - type Record = CommittableRecord[Array[Byte], Array[Byte]] + type Record = ConsumerRecord[Array[Byte], Array[Byte]] // The most efficient way to get the records from [[ConsumerRecords]] per // topic-partition, is by first getting the set of topic-partitions, and @@ -219,7 +214,7 @@ private[consumer] final class Runloop private ( if (streams.isEmpty) ZIO.succeed(fulfillResult) else { for { - consumerGroupMetadata <- getConsumerGroupMetadataIfAny + /*consumerGroupMetadata*/ _ <- getConsumerGroupMetadataIfAny // TODO Jules: Use: how? _ <- ZIO.foreachParDiscard(streams) { streamControl => val tp = streamControl.tp val records = polledRecords.records(tp) @@ -227,15 +222,8 @@ private[consumer] final class Runloop private ( else { val builder = ChunkBuilder.make[Record](records.size()) val iterator = records.iterator() - while (iterator.hasNext) { - val consumerRecord = iterator.next() - builder += - CommittableRecord[Array[Byte], Array[Byte]]( - record = consumerRecord, - commitHandle = commit, - consumerGroupMetadata = consumerGroupMetadata - ) - } + while (iterator.hasNext) + builder += iterator.next() streamControl.offerRecords(builder.result()) } } @@ -545,7 +533,7 @@ private[consumer] object Runloop { } } - type ByteArrayCommittableRecord = CommittableRecord[Array[Byte], Array[Byte]] + type ByteArrayConsumerRecord = ConsumerRecord[Array[Byte], Array[Byte]] private final case class PollResult( startingTps: Set[TopicPartition], diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala index 2afa2f2c9a..6acf838e49 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala @@ -3,11 +3,12 @@ package zio.kafka.consumer.internal import org.apache.kafka.common.TopicPartition import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization import zio.kafka.consumer.diagnostics.Diagnostics -import zio.kafka.consumer.internal.Runloop.ByteArrayCommittableRecord +import zio.kafka.consumer.internal.Runloop.ByteArrayConsumerRecord import zio.kafka.consumer.internal.RunloopAccess.PartitionAssignment -import zio.kafka.consumer.{ CommittableRecord, Committer, ConsumerSettings, InvalidSubscriptionUnion, Subscription } +import zio.kafka.consumer.{ Committer, ConsumerSettings, InvalidSubscriptionUnion, Subscription } import zio.stream.{ Stream, Take, UStream, ZStream } import zio._ +import zio.kafka.consumer.types.OffsetBatch private[internal] sealed trait RunloopState private[internal] object RunloopState { @@ -70,11 +71,11 @@ private[consumer] final class RunloopAccess private ( } } yield stream - def commit(record: CommittableRecord[_, _]): Task[Unit] = - 
withRunloopZIO(shouldStartIfNot = false)(_.commit(record))
+  def commit(offsetBatch: OffsetBatch): Task[Unit] =
+    withRunloopZIO(shouldStartIfNot = false)(_.commit(offsetBatch))
 
-  def commitOrRetry[R](policy: Schedule[R, Throwable, Any])(record: CommittableRecord[_, _]): RIO[R, Unit] =
-    withRunloopZIO(shouldStartIfNot = false)(_.commitOrRetry(policy)(record))
+  def commitOrRetry[R](policy: Schedule[R, Throwable, Any])(offsetBatch: OffsetBatch): RIO[R, Unit] =
+    withRunloopZIO(shouldStartIfNot = false)(_.commitOrRetry(policy)(offsetBatch))
 
   def commitAccumBatch[R](commitschedule: Schedule[R, Any, Any]): URIO[R, Committer] =
     withRunloopZIO__(shouldStartIfNot = false)(_.commitAccumBatch(commitschedule))(ZIO.succeed(Committer.unit))
@@ -82,7 +83,7 @@
 }
 
 private[consumer] object RunloopAccess {
-  type PartitionAssignment = (TopicPartition, Stream[Throwable, ByteArrayCommittableRecord])
+  type PartitionAssignment = (TopicPartition, Stream[Throwable, ByteArrayConsumerRecord])
 
   def make(
     settings: ConsumerSettings,
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/types.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/types.scala
new file mode 100644
index 0000000000..5eeb37050f
--- /dev/null
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/types.scala
@@ -0,0 +1,65 @@
+package zio.kafka.consumer
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.TopicPartition
+import zio.kafka.serde.Deserializer
+import zio.prelude.Subtype
+import zio.{ Chunk, RIO }
+
+object types {
+
+  @inline
+  private[zio] def topicPartition(record: ConsumerRecord[_, _]): TopicPartition =
+    new TopicPartition(record.topic(), record.partition())
+
+  private[zio] def deserializeWith[R, K, V](
+    keyDeserializer: Deserializer[R, K],
+    valueDeserializer: Deserializer[R, V]
+  )(record: ConsumerRecord[Array[Byte], Array[Byte]]): RIO[R, ConsumerRecord[K, V]] =
+    for {
+      key   <- keyDeserializer.deserialize(record.topic(), record.headers(), record.key())
+      value <- valueDeserializer.deserialize(record.topic(), record.headers(), record.value())
+    } yield new ConsumerRecord[K, V](
+      record.topic(),
+      record.partition(),
+      record.offset(),
+      record.timestamp(),
+      record.timestampType(),
+      record.serializedKeySize(),
+      record.serializedValueSize(),
+      key,
+      value,
+      record.headers(),
+      record.leaderEpoch()
+    )
+
+  type Offset = Offset.Type
+  object Offset extends Subtype[(TopicPartition, Long)] {
+    def from(record: ConsumerRecord[_, _]): Offset =
+      Offset.wrap((topicPartition(record), record.offset()))
+  }
+
+  type OffsetBatch = OffsetBatch.Type
+  object OffsetBatch extends Subtype[Map[TopicPartition, Long]] {
+    def empty: OffsetBatch = OffsetBatch(Map.empty)
+
+    def from(record: ConsumerRecord[_, _]): OffsetBatch =
+      OffsetBatch(Map(topicPartition(record) -> record.offset()))
+
+    def from(records: Chunk[ConsumerRecord[_, _]]): OffsetBatch =
+      OffsetBatch.wrap(records.map(record => topicPartition(record) -> record.offset()).toMap)
+
+    implicit final class OffsetBatchOps(private val self: OffsetBatch) extends AnyVal {
+      // Keep the highest seen offset per topic-partition when batches overlap.
+      def merge(other: OffsetBatch): OffsetBatch =
+        OffsetBatch.wrap(
+          other.foldLeft(self: Map[TopicPartition, Long]) { case (acc, (tp, offset)) =>
+            acc.updated(tp, acc.getOrElse(tp, offset) max offset)
+          }
+        )
+
+      def add(offset: Offset): OffsetBatch = merge(OffsetBatch.wrap(Map(offset)))
+    }
+  }
+
+}
diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/Transaction.scala b/zio-kafka/src/main/scala/zio/kafka/producer/Transaction.scala
index 4285758180..22cc095841 100644
--- a/zio-kafka/src/main/scala/zio/kafka/producer/Transaction.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/producer/Transaction.scala
@@ -1,7 +1,7 @@
 package zio.kafka.producer
 
 import org.apache.kafka.clients.producer.{ ProducerRecord, RecordMetadata }
-import zio.kafka.consumer.{ Offset, OffsetBatch }
+import zio.kafka.consumer.types.{ Offset, OffsetBatch }
 import zio.kafka.producer.TransactionalProducer.{ TransactionLeaked, UserInitiatedAbort }
 import zio.kafka.serde.Serializer
 import zio.{ Chunk, IO, RIO, Ref, UIO, ZIO }
diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala b/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala
index 848acc5425..cde9232109 100644
--- a/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala
@@ -1,13 +1,13 @@
 package zio.kafka.producer
 
-import org.apache.kafka.clients.consumer.OffsetAndMetadata
+import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, OffsetAndMetadata }
 import org.apache.kafka.clients.producer.{ KafkaProducer, RecordMetadata }
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.InvalidGroupIdException
 import org.apache.kafka.common.serialization.ByteArraySerializer
 import zio.Cause.Fail
 import zio._
-import zio.kafka.consumer.OffsetBatch
+import zio.kafka.consumer.types.OffsetBatch
 
 import java.util
 import scala.jdk.CollectionConverters._
@@ -26,7 +26,10 @@ object TransactionalProducer {
   ) extends TransactionalProducer {
     private val abortTransaction: Task[Unit] = ZIO.attemptBlocking(live.p.abortTransaction())
 
-    private def commitTransactionWithOffsets(offsetBatch: OffsetBatch): Task[Unit] = {
+    private def commitTransactionWithOffsets(
+      offsetBatch: OffsetBatch,
+      consumerGroupMetadata: Option[ConsumerGroupMetadata]
+    ): Task[Unit] = {
       val sendOffsetsToTransaction: Task[Unit] =
         ZIO.suspendSucceed {
           @inline def invalidGroupIdException: IO[InvalidGroupIdException, Nothing] =
@@ -36,11 +39,11 @@ object TransactionalProducer {
               )
             )
 
-          offsetBatch.consumerGroupMetadata match {
+          consumerGroupMetadata match {
             case None => invalidGroupIdException
             case Some(consumerGroupMetadata) =>
               val offsets: util.Map[TopicPartition, OffsetAndMetadata] =
-                offsetBatch.offsets.map { case (topicPartition, offset) =>
+                offsetBatch.map { case (topicPartition, offset) =>
                   topicPartition -> new OffsetAndMetadata(offset + 1)
                 }.asJava
 
               live.p.sendOffsetsToTransaction(offsets, consumerGroupMetadata)
@@ -48,14 +51,19 @@ object TransactionalProducer {
           }
         }
 
-      sendOffsetsToTransaction.when(offsetBatch.offsets.nonEmpty) *> ZIO.attemptBlocking(live.p.commitTransaction())
+      sendOffsetsToTransaction.when(offsetBatch.nonEmpty) *> ZIO.attemptBlocking(live.p.commitTransaction())
     }
 
     private def commitOrAbort(transaction: TransactionImpl, exit: Exit[Any, Any]): UIO[Unit] =
       exit match {
         case Exit.Success(_) =>
           transaction.offsetBatchRef.get
-            .flatMap(offsetBatch => commitTransactionWithOffsets(offsetBatch).retryN(5).orDie)
+            .flatMap(offsetBatch =>
+              commitTransactionWithOffsets(
+                offsetBatch,
+                consumerGroupMetadata = None // TODO Jules: thread the real group metadata through; with None, committing non-empty offsets always fails
+              ).retryN(5).orDie
+            )
         case Exit.Failure(Fail(UserInitiatedAbort, _)) => abortTransaction.retryN(5).orDie
         case Exit.Failure(_)                           => abortTransaction.retryN(5).orDie
       }
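
---

Usage sketch of the new commit interface (illustrative only, not part of the
diff): streams now emit plain Kafka ConsumerRecords instead of
CommittableRecords, and commits go through Consumer.commitRecords or a
scheduled Committer from commitAccumBatch. The topic name, serdes, and
schedule below are placeholders; the Consumer layer is assumed to be provided
as in the tests above.

    import zio._
    import zio.kafka.consumer.{ Consumer, Subscription }
    import zio.kafka.serde.Serde

    // Process each chunk, then commit it with the new record-based API.
    val consumeAndCommit: ZIO[Consumer, Throwable, Unit] =
      Consumer
        .plainStream(Subscription.topics("my-topic"), Serde.string, Serde.string)
        .mapChunksZIO { records =>
          ZIO.foreachDiscard(records)(r => ZIO.logInfo(s"${r.key} -> ${r.value}")) *>
            Consumer.commitRecords(records).as(records)
        }
        .runDrain

    // Alternatively, hand offsets to a Committer that flushes them in the
    // background on a schedule (the new commitAccumBatch entry point).
    val withCommitter: ZIO[Consumer, Throwable, Unit] =
      ZIO.serviceWithZIO[Consumer] { consumer =>
        consumer.commitAccumBatch(Schedule.fixed(1.second)).flatMap { committer =>
          consumer
            .plainStream(Subscription.topics("my-topic"), Serde.string, Serde.string)
            .mapChunksZIO(records => committer.commitAndForget(records).as(Chunk.empty))
            .runDrain
        }
      }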