diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 60c4dd1de..f2daad29f 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -23,7 +23,7 @@ jobs:
     continue-on-error: true
     steps:
       - name: Git Checkout
-        uses: actions/checkout@v3.6.0
+        uses: actions/checkout@v4.0.0
        with:
          fetch-depth: '0'
      - name: Install libuv
@@ -48,7 +48,7 @@ jobs:
     continue-on-error: false
     steps:
       - name: Git Checkout
-        uses: actions/checkout@v3.6.0
+        uses: actions/checkout@v4.0.0
        with:
          fetch-depth: '0'
      - name: Install libuv
@@ -88,7 +88,7 @@ jobs:
       - name: Cache Dependencies
         uses: coursier/cache-action@v6
       - name: Git Checkout
-        uses: actions/checkout@v3.6.0
+        uses: actions/checkout@v4.0.0
        with:
          fetch-depth: '0'
      - name: Test
@@ -100,7 +100,7 @@ jobs:
     if: ${{ github.event_name == 'push' }}
     steps:
       - name: Git Checkout
-        uses: actions/checkout@v3.6.0
+        uses: actions/checkout@v4.0.0
        with:
          fetch-depth: '0'
      - name: Install libuv
@@ -129,7 +129,7 @@ jobs:
           app_private_key: ${{ secrets.APP_PRIVATE_KEY }}
       - name: Create Pull Request
         id: cpr
-        uses: peter-evans/create-pull-request@v5.0.0
+        uses: peter-evans/create-pull-request@v5.0.2
        with:
          body: |-
            Autogenerated changes after running the `sbt docs/generateReadme` command of the [zio-sbt-website](https://zio.dev/zio-sbt) plugin.
@@ -174,7 +174,7 @@ jobs:
     if: ${{ github.event_name != 'pull_request' }}
     steps:
       - name: Git Checkout
-        uses: actions/checkout@v3.6.0
+        uses: actions/checkout@v4.0.0
        with:
          fetch-depth: '0'
      - name: Install libuv
@@ -203,7 +203,7 @@ jobs:
     if: ${{ ((github.event_name == 'release') && (github.event.action == 'published')) || (github.event_name == 'workflow_dispatch') }}
     steps:
       - name: Git Checkout
-        uses: actions/checkout@v3.6.0
+        uses: actions/checkout@v4.0.0
        with:
          fetch-depth: '0'
      - name: Install libuv
@@ -234,7 +234,7 @@ jobs:
     if: ${{ (github.event_name == 'release') && (github.event.action == 'published') }}
     steps:
       - name: Git Checkout
-        uses: actions/checkout@v3.6.0
+        uses: actions/checkout@v4.0.0
        with:
          fetch-depth: '0'
      - name: notify the main repo about the new release of docs package
diff --git a/build.sbt b/build.sbt
index 049d5433e..e717a33d1 100644
--- a/build.sbt
+++ b/build.sbt
@@ -7,6 +7,7 @@ lazy val kafkaClients = "org.apache.kafka" % "kafka-clients"
 lazy val scalaCollectionCompat = "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0"
 lazy val jacksonDatabind       = "com.fasterxml.jackson.core" % "jackson-databind" % "2.15.2"
 lazy val logback               = "ch.qos.logback" % "logback-classic" % "1.3.11"
+lazy val zioPrelude            = "dev.zio" %% "zio-prelude" % "1.0.0-RC20"
 
 enablePlugins(ZioSbtEcosystemPlugin, ZioSbtCiPlugin)
 
@@ -77,8 +78,8 @@ lazy val root = project
 def stdSettings(prjName: String) = Seq(
   name := s"$prjName",
   scalafmtOnCompile := !insideCI.value,
-  Compile / compile / scalacOptions ++=
-    optionsOn("2.13")("-Wconf:cat=unused-nowarn:s").value,
+  scalacOptions ++= optionsOn("2.13")("-Wconf:cat=unused-nowarn:s").value,
+  scalacOptions ++= optionsOn("2.12")("-Xfuture", "-Xsource:2.13").value,
   scalacOptions -= "-Xlint:infer-any",
   // workaround for bad constant pool issue
   (Compile / doc) := Def.taskDyn {
@@ -102,6 +103,7 @@ lazy val zioKafka
   .settings(enableZIO(enableStreaming = true))
   .settings(
     libraryDependencies ++= Seq(
+      zioPrelude,
       kafkaClients,
      jacksonDatabind,
      scalaCollectionCompat
diff --git a/project/plugins.sbt b/project/plugins.sbt
index a6c9152fb..f984f070d 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -1,4 +1,4 @@
-val zioSbtVersion = "0.4.0-alpha.17"
+val zioSbtVersion = "0.4.0-alpha.18"
 
 addSbtPlugin("dev.zio" % "zio-sbt-ecosystem" % zioSbtVersion)
 addSbtPlugin("dev.zio" % "zio-sbt-website"   % zioSbtVersion)
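The new `zio-prelude` dependency exists to back the `Offset` and `OffsetBatch` subtypes introduced in `types.scala` further down. For readers unfamiliar with it, here is a minimal sketch of zio-prelude's `Subtype` mechanism (standard zio-prelude usage; the `Age` example itself is hypothetical):

```scala
import zio.prelude.Subtype

object SubtypeDemo {
  // A Subtype is a zero-overhead newtype that remains a subtype of its base
  // type, so values can be used wherever the base type is expected.
  object Age extends Subtype[Int]
  type Age = Age.Type

  val age: Age = Age(42) // construct via the companion's apply
  val n: Int   = age     // Age <: Int: no unwrapping needed
}
```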
"0.4.0-alpha.18" addSbtPlugin("dev.zio" % "zio-sbt-ecosystem" % zioSbtVersion) addSbtPlugin("dev.zio" % "zio-sbt-website" % zioSbtVersion) diff --git a/zio-kafka-bench/src/main/scala/zio/kafka/bench/ConsumerBenchmark.scala b/zio-kafka-bench/src/main/scala/zio/kafka/bench/ConsumerBenchmark.scala index b85b413fd..f75357349 100644 --- a/zio-kafka-bench/src/main/scala/zio/kafka/bench/ConsumerBenchmark.scala +++ b/zio-kafka-bench/src/main/scala/zio/kafka/bench/ConsumerBenchmark.scala @@ -5,13 +5,12 @@ import org.apache.kafka.clients.consumer.ConsumerConfig import org.openjdk.jmh.annotations._ import zio.kafka.bench.ZioBenchmark.randomThing import zio.kafka.consumer.diagnostics.Diagnostics -import zio.kafka.consumer.{ Consumer, Offset, OffsetBatch, Subscription } +import zio.kafka.consumer.{ Consumer, Subscription } import zio.kafka.producer.Producer import zio.kafka.serde.Serde import zio.kafka.testkit.Kafka import zio.kafka.testkit.KafkaTestUtils.{ consumerSettings, produceMany, producer } -import zio.stream.ZSink -import zio.{ durationInt, Ref, Schedule, ZIO, ZLayer } +import zio.{ Ref, ZIO, ZLayer } import java.util.concurrent.TimeUnit @@ -60,18 +59,15 @@ class ConsumerBenchmark extends ZioBenchmark[Kafka with Producer] { def throughputWithCommits(): Any = runZIO { for { counter <- Ref.make(0) - _ <- ZIO.logAnnotate("consumer", "1") { - Consumer - .plainStream(Subscription.topics(topic1), Serde.byteArray, Serde.byteArray) - .map(_.offset) - .aggregateAsyncWithin(ZSink.collectAll[Offset], Schedule.fixed(100.millis)) - .tap(batch => counter.update(_ + batch.size)) - .map(OffsetBatch.apply) - .mapZIO(_.commit) - .takeUntilZIO(_ => counter.get.map(_ >= nrMessages)) - .runDrain - .provideSome[Kafka](env) - } + _ <- ZIO + .logAnnotate("consumer", "1") { + Consumer + .plainStream(Subscription.topics(topic1), Serde.byteArray, Serde.byteArray) + .mapChunksZIO(records => counter.update(_ + records.size) *> Consumer.commit(records).as(records)) + .takeUntilZIO(_ => counter.get.map(_ >= nrMessages)) + .runDrain + .provideSome[Kafka](env) + } } yield () } } diff --git a/zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala index 33416e7d0..9a103acf7 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala @@ -23,7 +23,7 @@ import zio.kafka.admin.AdminClient.{ } import zio.kafka.admin.acl._ import zio.kafka.admin.resource.{ PatternType, ResourcePattern, ResourcePatternFilter, ResourceType } -import zio.kafka.consumer.{ CommittableRecord, Consumer, OffsetBatch, Subscription } +import zio.kafka.consumer.{ Consumer, Subscription } import zio.kafka.serde.Serde import zio.kafka.testkit.KafkaTestUtils._ import zio.kafka.testkit._ @@ -228,13 +228,8 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .partitionedStream[Kafka, String, String](Subscription.Topics(Set(topic)), Serde.string, Serde.string) .flatMapPar(partitionCount)(_._2) .take(count) - .transduce(ZSink.collectAllN[CommittableRecord[String, String]](20)) - .mapConcatZIO { committableRecords => - val records = committableRecords.map(_.record) - val offsetBatch = OffsetBatch(committableRecords.map(_.offset)) - - offsetBatch.commit.as(records) - } + .transduce(ZSink.collectAllN[ConsumerRecord[String, String]](20)) + .mapConcatZIO(records => Consumer.commit(records).as(records)) .runCollect .provideSomeLayer[Kafka](consumer("adminspec-topic10", Some(consumerGroupID))) @@ -301,7 +296,7 @@ object AdminSpec 
diff --git a/zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala
index 33416e7d0..9a103acf7 100644
--- a/zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala
+++ b/zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala
@@ -23,7 +23,7 @@ import zio.kafka.admin.AdminClient.{
 }
 import zio.kafka.admin.acl._
 import zio.kafka.admin.resource.{ PatternType, ResourcePattern, ResourcePatternFilter, ResourceType }
-import zio.kafka.consumer.{ CommittableRecord, Consumer, OffsetBatch, Subscription }
+import zio.kafka.consumer.{ Consumer, Subscription }
 import zio.kafka.serde.Serde
 import zio.kafka.testkit.KafkaTestUtils._
 import zio.kafka.testkit._
@@ -228,13 +228,8 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
             .partitionedStream[Kafka, String, String](Subscription.Topics(Set(topic)), Serde.string, Serde.string)
             .flatMapPar(partitionCount)(_._2)
             .take(count)
-            .transduce(ZSink.collectAllN[CommittableRecord[String, String]](20))
-            .mapConcatZIO { committableRecords =>
-              val records     = committableRecords.map(_.record)
-              val offsetBatch = OffsetBatch(committableRecords.map(_.offset))
-
-              offsetBatch.commit.as(records)
-            }
+            .transduce(ZSink.collectAllN[ConsumerRecord[String, String]](20))
+            .mapConcatZIO(records => Consumer.commit(records).as(records))
             .runCollect
             .provideSomeLayer[Kafka](consumer("adminspec-topic10", Some(consumerGroupID)))
 
@@ -301,7 +296,7 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
           Consumer
             .plainStream[Kafka, String, String](Subscription.Topics(Set(topic)), Serde.string, Serde.string)
             .take(count)
-            .foreach(_.offset.commit)
+            .foreach(Consumer.commit)
             .provideSomeLayer[Kafka](consumer(topic, Some(groupId)))
 
         KafkaTestUtils.withAdmin { client =>
@@ -344,7 +339,7 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
           Consumer
             .plainStream[Kafka, String, String](Subscription.Topics(Set(topic)), Serde.string, Serde.string)
             .take(count)
-            .foreach(_.offset.commit)
+            .foreach(Consumer.commit)
             .provideSomeLayer[Kafka](consumer(topic, Some(groupId)))
 
         KafkaTestUtils.withAdmin { client =>
@@ -645,7 +640,7 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
     groupInstanceId: Option[String] = None
   ): ZIO[Kafka, Throwable, Unit] = Consumer
     .plainStream(Subscription.topics(topicName), Serde.string, Serde.string)
-    .foreach(_.offset.commit)
+    .foreach(Consumer.commit)
     .provideSomeLayer(consumer(clientId, Some(groupId), groupInstanceId))
 
   private def getStableConsumerGroupDescription(
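When per-chunk commits are too frequent, the tests above first batch records into fixed-size groups with `ZSink.collectAllN` and commit each group in one call. The same idea as a stand-alone sketch (topic name is a placeholder):

```scala
import org.apache.kafka.clients.consumer.ConsumerRecord
import zio.kafka.consumer.{ Consumer, Subscription }
import zio.kafka.serde.Serde
import zio.stream.ZSink

object BatchedCommit {
  // Group records into batches of 20, commit each batch with a single
  // Consumer.commit call, then re-emit the records downstream.
  val stream =
    Consumer
      .plainStream(Subscription.topics("example-topic"), Serde.string, Serde.string)
      .transduce(ZSink.collectAllN[ConsumerRecord[String, String]](20))
      .mapConcatZIO(batch => Consumer.commit(batch).as(batch))
}
```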
diff --git a/zio-kafka-test/src/test/scala/zio/kafka/ProducerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/ProducerSpec.scala
index dc870f573..07ae088e5 100644
--- a/zio-kafka-test/src/test/scala/zio/kafka/ProducerSpec.scala
+++ b/zio-kafka-test/src/test/scala/zio/kafka/ProducerSpec.scala
@@ -1,11 +1,13 @@
 package zio.kafka
 
+import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.config.TopicConfig
 import zio._
 import zio.kafka.admin.AdminClient.NewTopic
 import zio.kafka.consumer._
+import zio.kafka.consumer.types.{ Offset, OffsetBatch }
 import zio.kafka.producer.TransactionalProducer.{ TransactionLeaked, UserInitiatedAbort }
 import zio.kafka.producer.{ ByteRecord, Producer, Transaction, TransactionalProducer }
 import zio.kafka.serde.Serde
@@ -26,7 +28,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
   def withConsumerInt(
     subscription: Subscription,
     settings: ConsumerSettings
-  ): ZIO[Any with Scope, Throwable, Dequeue[Take[Throwable, CommittableRecord[String, Int]]]] =
+  ): ZIO[Any with Scope, Throwable, Dequeue[Take[Throwable, ConsumerRecord[String, Int]]]] =
     Consumer.make(settings).flatMap { c =>
       c.plainStream(subscription, Serde.string, Serde.int).toQueue()
     }
@@ -192,8 +194,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
               withConsumer(Topics(Set(topic1)), settings).flatMap { consumer =>
                 for {
                   messages <- consumer.take.flatMap(_.done).mapError(_.getOrElse(new NoSuchElementException))
-                  record = messages
-                             .filter(rec => rec.record.key == key1 && rec.record.value == value1)
+                  record = messages.filter(rec => rec.key == key1 && rec.value == value1)
                 } yield record
               }
             }
@@ -201,7 +202,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
               withConsumer(Topics(Set(topic2)), settings).flatMap { consumer =>
                 for {
                   messages <- consumer.take.flatMap(_.done).mapError(_.getOrElse(new NoSuchElementException))
-                  record = messages.filter(rec => rec.record.key == key2 && rec.record.value == value2)
+                  record = messages.filter(rec => rec.key == key2 && rec.value == value2)
                 } yield record
               }
             }
@@ -289,7 +290,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
                 messages <- consumer.take
                               .flatMap(_.done)
                               .mapError(_.getOrElse(new NoSuchElementException))
-                record = messages.filter(rec => rec.record.key == "bob")
+                record = messages.filter(_.key == "bob")
               } yield record
             }
           }
@@ -329,7 +330,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
                 messages <- consumer.take
                               .flatMap(_.done)
                               .mapError(_.getOrElse(new NoSuchElementException))
-                record = messages.filter(rec => rec.record.key == "bob")
+                record = messages.filter(_.key == "bob")
               } yield record
             }
           }
@@ -417,7 +418,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
                 messages <- consumer.take
                               .flatMap(_.done)
                               .mapError(_.getOrElse(new NoSuchElementException))
-                record = messages.filter(rec => rec.record.key == "no one")
+                record = messages.filter(_.key == "no one")
               } yield record
             }
@@ -458,7 +459,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
                 messages <- consumer.take
                               .flatMap(_.done)
                               .mapError(_.getOrElse(new NoSuchElementException))
-                record = messages.filter(rec => rec.record.key == "no one")
+                record = messages.filter(_.key == "no one")
               } yield record
             }
           }
@@ -504,7 +505,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
                 messages <- consumer.take
                               .flatMap(_.done)
                               .mapError(_.getOrElse(new NoSuchElementException))
-                record = messages.filter(rec => rec.record.key == "no one")
+                record = messages.filter(_.key == "no one")
               } yield record
             }
           }
@@ -544,7 +545,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
                          aliceAccountFeesPaid,
                          Serde.string,
                          Serde.int,
-                         Some(aliceHadMoneyCommittableMessage.offset)
+                         Some(Offset.from(aliceHadMoneyCommittableMessage))
                        )
                 }
               }
@@ -591,7 +592,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
                            aliceAccountFeesPaid,
                            Serde.string,
                            Serde.int,
-                           Some(aliceHadMoneyCommittableMessage.offset)
+                           Some(Offset.from(aliceHadMoneyCommittableMessage))
                          ) *>
                   t.abort
                 }
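The transactional tests now build the offset to attach to a produced message with `Offset.from(record)` instead of reading `.offset` off a `CommittableRecord`. A sketch of the call shape, mirroring the test usage above (the topic name and `rec` parameter are illustrative):

```scala
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer.ProducerRecord
import zio.kafka.consumer.types.Offset
import zio.kafka.producer.Transaction
import zio.kafka.serde.Serde

object TransactionalCopy {
  // Produce within a transaction and attach the consumed record's offset,
  // so the offset commit is atomic with the produced message.
  def copy(t: Transaction, rec: ConsumerRecord[String, Int]) =
    t.produce(
      new ProducerRecord("output-topic", rec.key, rec.value),
      Serde.string,
      Serde.int,
      Some(Offset.from(rec))
    )
}
```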
diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
index d68b93a8a..02b6cc28c 100644
--- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
+++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
@@ -4,6 +4,7 @@ import io.github.embeddedkafka.EmbeddedKafka
 import org.apache.kafka.clients.consumer.{
   ConsumerConfig,
   ConsumerPartitionAssignor,
+  ConsumerRecord,
   CooperativeStickyAssignor,
   RangeAssignor
 }
@@ -19,6 +20,7 @@ import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization.{
   SubscriptionFinalized
 }
 import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics }
+import zio.kafka.consumer.types.OffsetBatch
 import zio.kafka.producer.{ Producer, TransactionalProducer }
 import zio.kafka.serde.Serde
 import zio.kafka.testkit.KafkaTestUtils._
@@ -58,7 +60,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
                      .take(5)
                      .runCollect
                      .provideSomeLayer[Kafka](consumer(client, Some(group)))
-          kvOut = records.map(r => (r.record.key, r.record.value)).toList
+          kvOut = records.map(r => (r.key, r.value)).toList
         } yield assert(kvOut)(equalTo(kvs))
       },
       test("chunk sizes") {
@@ -96,7 +98,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
                      .take(5)
                      .runCollect
                      .provideSomeLayer[Kafka](consumer(clientId = client))
-          kvOut = records.map(r => (r.record.key, r.record.value)).toList
+          kvOut = records.map(r => (r.key, r.value)).toList
         } yield assert(kvOut)(equalTo(kvs))
       },
       test("Consuming+provideCustomLayer") {
@@ -113,7 +115,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
                      .take(100)
                      .runCollect
                      .provideSomeLayer[Kafka](consumer(client, Some(group)))
-          kvOut = records.map(r => (r.record.key, r.record.value)).toList
+          kvOut = records.map(r => (r.key, r.value)).toList
         } yield assert(kvOut)(equalTo(kvs))
       },
       test("plainStream emits messages for a pattern subscription") {
@@ -128,7 +130,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
                      .take(5)
                      .runCollect
                      .provideSomeLayer[Kafka](consumer(client, Some(group)))
-          kvOut = records.map(r => (r.record.key, r.record.value)).toList
+          kvOut = records.map(r => (r.key, r.value)).toList
         } yield assert(kvOut)(equalTo(kvs))
       },
       test("receive only messages from the subscribed topic-partition when creating a manual subscription") {
@@ -148,7 +150,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
                     .take(1)
                     .runHead
                     .provideSomeLayer[Kafka](consumer(client, Some(group)))
-          kvOut = record.map(r => (r.record.key, r.record.value))
+          kvOut = record.map(r => (r.key, r.value))
         } yield assert(kvOut)(isSome(equalTo("key2" -> "msg2")))
       },
       test("receive from the right offset when creating a manual subscription with manual seeking") {
@@ -173,7 +175,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
                     .provideSomeLayer[Kafka](
                       consumer(client, Some(group), offsetRetrieval = offsetRetrieval)
                     )
-          kvOut = record.map(r => (r.record.key, r.record.value))
+          kvOut = record.map(r => (r.key, r.value))
         } yield assert(kvOut)(isSome(equalTo("key2-3" -> "msg2-3")))
       },
       test("restart from the committed position") {
@@ -191,13 +193,8 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
                        .filter(_._1 == new TopicPartition(topic, 0))
                        .flatMap(_._2)
                        .take(5)
-                       .transduce(ZSink.collectAllN[CommittableRecord[String, String]](5))
-                       .mapConcatZIO { committableRecords =>
-                         val records     = committableRecords.map(_.record)
-                         val offsetBatch = OffsetBatch(committableRecords.map(_.offset))
-
-                         offsetBatch.commit.as(records)
-                       }
+                       .transduce(ZSink.collectAllN[ConsumerRecord[String, String]](5))
+                       .mapConcatZIO(records => Consumer.commit(records).as(records))
                        .runCollect
                        .provideSomeLayer[Kafka](
                          consumer(first, Some(group))
@@ -209,13 +206,8 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
                        .partitionedStream(Subscription.Topics(Set(topic)), Serde.string, Serde.string)
                        .flatMap(_._2)
                        .take(5)
-                       .transduce(ZSink.collectAllN[CommittableRecord[String, String]](20))
-                       .mapConcatZIO { committableRecords =>
-                         val records     = committableRecords.map(_.record)
-                         val offsetBatch = OffsetBatch(committableRecords.map(_.offset))
-
-                         offsetBatch.commit.as(records)
-                       }
+                       .transduce(ZSink.collectAllN[ConsumerRecord[String, String]](20))
+                       .mapConcatZIO(records => Consumer.commit(records).as(records))
                        .runCollect
                        .provideSomeLayer[Kafka](
                          consumer(second, Some(group))
@@ -287,7 +279,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
             .zipWithIndex
             .tap { case (record, idx) =>
               (Consumer.stopConsumption <* ZIO.logDebug("Stopped consumption")).when(idx == 3) *>
-                record.offset.commit <* ZIO.logDebug(s"Committed $idx")
+                Consumer.commit(record) <* ZIO.logDebug(s"Committed $idx")
             }
             .tap { case (_, idx) => ZIO.logDebug(s"Consumed $idx") }
             .runDrain
@@ -312,10 +304,9 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
                          for {
                            nr <- messagesReceived.updateAndGet(_ + 1)
                            _  <- Consumer.stopConsumption.when(nr == 10)
-                          } yield if (nr < 10) Seq(record.offset) else Seq.empty
+                          } yield if (nr < 10) Seq(record) else Seq.empty
                        }
-                       .transduce(Consumer.offsetBatches)
-                       .mapZIO(_.commit)
+                       .mapChunksZIO(records => Consumer.commit(records).as(records))
                        .runDrain *>
                        Consumer.committed(Set(new TopicPartition(topic, 0))).map(_.values.head))
                        .provideSomeLayer[Kafka](consumer(client, Some(group)))
@@ -339,11 +330,10 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
             subscription = Subscription.topics(topic)
             offsets <- (Consumer
                          .partitionedStream(subscription, Serde.string, Serde.string)
-                         .flatMapPar(nrPartitions)(_._2.map(_.offset))
+                         .flatMapPar(nrPartitions)(_._2)
                          .take(nrMessages.toLong)
-                         .transduce(Consumer.offsetBatches)
+                         .mapChunksZIO(records => Consumer.commit(records).as(records))
                          .take(1)
-                         .mapZIO(_.commit)
                          .runDrain *>
                          Consumer.committed((0 until nrPartitions).map(new TopicPartition(topic, _)).toSet))
                          .provideSomeLayer[Kafka](consumer(client, Some(group)))
@@ -460,13 +450,8 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
             _ <- Consumer
                    .plainStream(Subscription.topics(topic), Serde.string, Serde.string)
                    .take(5)
-                   .transduce(ZSink.collectAllN[CommittableRecord[String, String]](5))
-                   .mapConcatZIO { committableRecords =>
-                     val records     = committableRecords.map(_.record)
-                     val offsetBatch = OffsetBatch(committableRecords.map(_.offset))
-
-                     offsetBatch.commit.as(records)
-                   }
+                   .transduce(ZSink.collectAllN[ConsumerRecord[String, String]](5))
+                   .mapConcatZIO(records => Consumer.commit(records).as(records))
                    .runCollect
                    .provideSomeLayer[Kafka](consumer(client1, Some(group)))
             // Start a new consumer with manual offset before the committed offset
@@ -474,7 +459,6 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
             secondResults <- Consumer
                                .plainStream(Subscription.topics(topic), Serde.string, Serde.string)
                                .take(nrRecords.toLong - manualOffsetSeek)
-                               .map(_.record)
                                .runCollect
                                .provideSomeLayer[Kafka](
                                  consumer(client2, Some(group), offsetRetrieval = offsetRetrieval)
                                )
@@ -522,7 +506,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
             newMessage <- Consumer
                             .plainStream(subscription, Serde.string, Serde.string)
                             .take(1)
-                            .map(r => (r.record.key(), r.record.value()))
+                            .map(r => (r.key(), r.value()))
                             .run(ZSink.collectAll[(String, String)])
                             .map(_.head)
                             .orDie
@@ -675,7 +659,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
                        .flatMapPar(Int.MaxValue) { case (tp, partitionStream) =>
                          ZStream.finalizer(ZIO.logDebug(s"TP ${tp.toString} finalizer")) *>
                            partitionStream.mapChunksZIO { records =>
-                             OffsetBatch(records.map(_.offset)).commit *> messagesReceived(tp.partition)
+                             Consumer.commit(records) *> messagesReceived(tp.partition)
                                .update(_ + records.size)
                                .as(records)
                            }
@@ -812,9 +796,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
                             .fromIterable(partitions.map(_._2))
                             .flatMapPar(Int.MaxValue)(s => s)
                             .mapZIO(record => messagesReceivedConsumer1.update(_ + 1).as(record))
-                            .map(_.offset)
-                            .aggregateAsync(Consumer.offsetBatches)
-                            .mapZIO(offsetBatch => offsetBatch.commit)
+                            .mapChunksZIO(Consumer.commit(_).as(Chunk.empty))
                             .runDrain
                         }
                         .mapZIO(_ => drainCount.updateAndGet(_ + 1))
@@ -837,9 +819,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
                       Consumer
                         .plainStream(subscription, Serde.string, Serde.string)
                         .mapZIO(record => messagesReceivedConsumer2.update(_ + 1).as(record))
-                        .map(_.offset)
-                        .aggregateAsync(Consumer.offsetBatches)
-                        .mapZIO(offsetBatch => offsetBatch.commit)
+                        .mapChunksZIO(Consumer.commit(_).as(Chunk.empty))
                        .runDrain
                        .provideSomeLayer[Kafka](
                          customConsumer("consumer2", Some(group))
@@ -926,7 +906,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
                       records.map(r => new ProducerRecord(toTopic, r.key, r.value)),
                       Serde.string,
                       Serde.string,
-                      OffsetBatch(records.map(_.offset))
+                      OffsetBatch.from(records)
                     )
                _ <- consumedMessagesCounter.update(_ + records.size)
              } yield Chunk.empty
@@ -1205,9 +1185,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
             result <- Consumer
                         .plainStream(Subscription.Topics(Set(topic)), Serde.string, Serde.string)
                         .take(11)
-                        .map(_.offset)
-                        .aggregateAsync(Consumer.offsetBatches)
-                        .mapZIO(_.commit) // Hangs without timeout
+                        .mapChunksZIO(Consumer.commit(_).as(Chunk.empty))
                         .runDrain
                         .exit
                         .provideSomeLayer[Kafka](consumer(client, Some(group), commitTimeout = 2.seconds))
diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala
index d33bdab52..bc4f3bede 100644
--- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala
+++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala
@@ -1,14 +1,14 @@
 package zio.kafka.consumer
 
 import io.github.embeddedkafka.EmbeddedKafka
-import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord }
 import zio._
 import zio.kafka.ZIOSpecDefaultSlf4j
 import zio.kafka.producer.Producer
 import zio.kafka.serde.Serde
 import zio.kafka.testkit.KafkaTestUtils._
 import zio.kafka.testkit.{ Kafka, KafkaRandom }
-import zio.stream.{ ZSink, ZStream }
+import zio.stream.ZStream
 import zio.test.Assertion._
 import zio.test.TestAspect._
 import zio.test._
@@ -42,12 +42,12 @@ object SubscriptionsSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
                      .runCollect)
                      .provideSomeLayer[Kafka with Scope](consumer(client, Some(group)))
         (records1, records2) = records
-        kvOut1               = records1.map(r => (r.record.key, r.record.value)).toList
-        kvOut2               = records2.map(r => (r.record.key, r.record.value)).toList
+        kvOut1               = records1.map(r => (r.key, r.value)).toList
+        kvOut2               = records2.map(r => (r.key, r.value)).toList
       } yield assertTrue(kvOut1 == kvs) &&
         assertTrue(kvOut2 == kvs) &&
-        assertTrue(records1.map(_.record.topic()).forall(_ == topic1)) &&
-        assertTrue(records2.map(_.record.topic()).forall(_ == topic2))
+        assertTrue(records1.map(_.topic()).forall(_ == topic1)) &&
+        assertTrue(records2.map(_.topic()).forall(_ == topic2))
     },
     test("consumes from two pattern subscriptions") {
       val kvs = (1 to 5).toList.map(i => (s"key$i", s"msg$i"))
@@ -71,12 +71,12 @@ object SubscriptionsSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
                      .runCollect)
                      .provideSomeLayer[Kafka with Scope](consumer(client, Some(group)))
         (records1, records2) = records
-        kvOut1               = records1.map(r => (r.record.key, r.record.value)).toList
-        kvOut2               = records2.map(r => (r.record.key, r.record.value)).toList
+        kvOut1               = records1.map(r => (r.key, r.value)).toList
+        kvOut2               = records2.map(r => (r.key, r.value)).toList
       } yield assertTrue(kvOut1 == kvs) &&
        assertTrue(kvOut2 == kvs) &&
-        assertTrue(records1.map(_.record.topic()).forall(_ == topic1)) &&
-        assertTrue(records2.map(_.record.topic()).forall(_ == topic2))
+        assertTrue(records1.map(_.topic()).forall(_ == topic1)) &&
+        assertTrue(records2.map(_.topic()).forall(_ == topic2))
    },
    test(
      "gives an error when attempting to subscribe using a manual subscription when there is already a topic subscription"
@@ -261,23 +261,18 @@ object SubscriptionsSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
 
           _ <- produceMany(topic1, kvs)
 
-          recordsConsumed <- Ref.make(Chunk.empty[CommittableRecord[String, String]])
+          recordsConsumed <- Ref.make(Chunk.empty[ConsumerRecord[String, String]])
           _ <- Consumer
                  .plainStream(Subscription.topics(topic1), Serde.string, Serde.string)
                  .take(40)
-                 .transduce(
-                   Consumer.offsetBatches.contramap[CommittableRecord[String, String]](_.offset) <&> ZSink
-                     .collectAll[CommittableRecord[String, String]]
-                 )
-                 .mapZIO { case (offsetBatch, records) => offsetBatch.commit.as(records) }
-                 .flattenChunks
+                 .mapChunksZIO(records => Consumer.commit(records).as(records))
                  .runCollect
                  .tap(records => recordsConsumed.update(_ ++ records))
                  .repeatN(24)
                  .provideSomeLayer[Kafka with Scope](consumer(client, Some(group)))
 
           consumed <- recordsConsumed.get
-        } yield assert(consumed.map(r => r.value))(hasSameElements(Chunk.fromIterable(kvs.map(_._2))))
+        } yield assert(consumed.map(_.value))(hasSameElements(Chunk.fromIterable(kvs.map(_._2))))
       } @@ TestAspect.nonFlaky(3)
   )
     .provideSome[Scope & Kafka](producer)
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala
deleted file mode 100644
index f9583f5a7..000000000
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-package zio.kafka.consumer
-
-import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, ConsumerRecord }
-import org.apache.kafka.common.TopicPartition
-import zio.kafka.serde.Deserializer
-import zio.{ RIO, Task }
-
-final case class CommittableRecord[K, V](
-  record: ConsumerRecord[K, V],
-  private val commitHandle: Map[TopicPartition, Long] => Task[Unit],
-  private val consumerGroupMetadata: Option[ConsumerGroupMetadata]
-) {
-  def deserializeWith[R, K1, V1](
-    keyDeserializer: Deserializer[R, K1],
-    valueDeserializer: Deserializer[R, V1]
-  )(implicit ev1: K <:< Array[Byte], ev2: V <:< Array[Byte]): RIO[R, CommittableRecord[K1, V1]] =
-    for {
-      key   <- keyDeserializer.deserialize(record.topic(), record.headers(), record.key())
-      value <- valueDeserializer.deserialize(record.topic(), record.headers(), record.value())
-    } yield copy(
-      record = new ConsumerRecord[K1, V1](
-        record.topic(),
-        record.partition(),
-        record.offset(),
-        record.timestamp(),
-        record.timestampType(),
-        record.serializedKeySize(),
-        record.serializedValueSize(),
-        key,
-        value,
-        record.headers(),
-        record.leaderEpoch()
-      )
-    )
-
-  def key: K          = record.key
-  def value: V        = record.value()
-  def partition: Int  = record.partition()
-  def timestamp: Long = record.timestamp()
-
-  def offset: Offset =
-    OffsetImpl(
-      topic = record.topic(),
-      partition = record.partition(),
-      offset = record.offset(),
-      commitHandle = commitHandle,
-      consumerGroupMetadata = consumerGroupMetadata
-    )
-}
-
-object CommittableRecord {
-  def apply[K, V](
-    record: ConsumerRecord[K, V],
-    commitHandle: Map[TopicPartition, Long] => Task[Unit],
-    consumerGroupMetadata: Option[ConsumerGroupMetadata]
-  ): CommittableRecord[K, V] =
-    new CommittableRecord(
-      record = record,
-      commitHandle = commitHandle,
-      consumerGroupMetadata = consumerGroupMetadata
-    )
-}
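With `CommittableRecord` gone, call sites change along the lines the test diffs above show; an informal before/after summary of the migration (comment-only sketch):

```scala
// Before (CommittableRecord[K, V])             After (plain ConsumerRecord[K, V])
// record.record.key / record.key               record.key
// record.record.topic()                        record.topic()
// record.offset.commit                         Consumer.commit(record)
// OffsetBatch(records.map(_.offset)).commit    Consumer.commit(records)
// offset.commitOrRetry(policy)                 Consumer.commitOrRetry(policy, record)
```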
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
index 3ab626cb9..90643afdd 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
@@ -11,6 +11,7 @@ import zio._
 import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization
 import zio.kafka.consumer.diagnostics.Diagnostics
 import zio.kafka.consumer.internal.{ ConsumerAccess, RunloopAccess }
+import zio.kafka.consumer.types.{ deserializeWith, OffsetBatch }
 import zio.kafka.serde.{ Deserializer, Serde }
 import zio.kafka.utils.SslHelper
 import zio.stream._
@@ -37,6 +38,14 @@ trait Consumer {
     timeout: Duration = Duration.Infinity
   ): Task[Map[TopicPartition, Long]]
 
+  def commit(record: ConsumerRecord[_, _]): Task[Unit]
+  def commit(records: Chunk[ConsumerRecord[_, _]]): Task[Unit]
+  def commit(offsetBatch: OffsetBatch): Task[Unit]
+
+  def commitOrRetry[R](retryPolicy: Schedule[R, Throwable, Any], record: ConsumerRecord[_, _]): RIO[R, Unit]
+  def commitOrRetry[R](retryPolicy: Schedule[R, Throwable, Any], records: Chunk[ConsumerRecord[_, _]]): RIO[R, Unit]
+  def commitOrRetry[R](retryPolicy: Schedule[R, Throwable, Any], offsetBatch: OffsetBatch): RIO[R, Unit]
+
   /**
    * Retrieve the last committed offset for the given topic-partitions
    */
@@ -67,7 +76,7 @@ trait Consumer {
     subscription: Subscription,
     keyDeserializer: Deserializer[R, K],
     valueDeserializer: Deserializer[R, V]
-  ): Stream[Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]]
+  ): Stream[Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, ConsumerRecord[K, V]])]]
 
   /**
    * Create a stream with messages on the subscribed topic-partitions by topic-partition
@@ -90,7 +99,7 @@ trait Consumer {
     subscription: Subscription,
     keyDeserializer: Deserializer[R, K],
     valueDeserializer: Deserializer[R, V]
-  ): Stream[Throwable, (TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]
+  ): Stream[Throwable, (TopicPartition, ZStream[R, Throwable, ConsumerRecord[K, V]])]
 
   /**
    * Create a stream with all messages on the subscribed topic-partitions
@@ -115,7 +124,7 @@ trait Consumer {
     keyDeserializer: Deserializer[R, K],
     valueDeserializer: Deserializer[R, V],
     bufferSize: Int = 4
-  ): ZStream[R, Throwable, CommittableRecord[K, V]]
+  ): ZStream[R, Throwable, ConsumerRecord[K, V]]
 
   /**
    * Stops consumption of data, drains buffered records, and ends the attached streams while still serving commit
@@ -163,9 +172,6 @@ object Consumer {
   case object RunloopTimeout extends RuntimeException("Timeout in Runloop") with NoStackTrace
   case object CommitTimeout  extends RuntimeException("Commit timeout") with NoStackTrace
 
-  val offsetBatches: ZSink[Any, Nothing, Offset, Nothing, OffsetBatch] =
-    ZSink.foldLeft[Offset, OffsetBatch](OffsetBatch.empty)(_ add _)
-
   def live: RLayer[ConsumerSettings & Diagnostics, Consumer] =
     ZLayer.scoped {
       for {
@@ -217,6 +223,27 @@ object Consumer {
   ): RIO[Consumer, Map[TopicPartition, Long]] =
     ZIO.serviceWithZIO(_.beginningOffsets(partitions, timeout))
 
+  def commit(record: ConsumerRecord[_, _]): RIO[Consumer, Unit] =
+    ZIO.serviceWithZIO[Consumer](_.commit(record))
+
+  def commit(records: Chunk[ConsumerRecord[_, _]]): RIO[Consumer, Unit] =
+    ZIO.serviceWithZIO[Consumer](_.commit(records))
+
+  def commit(offsetBatch: OffsetBatch): RIO[Consumer, Unit] =
+    ZIO.serviceWithZIO[Consumer](_.commit(offsetBatch))
+
+  def commitOrRetry[R](policy: Schedule[R, Throwable, Any], record: ConsumerRecord[_, _]): RIO[R & Consumer, Unit] =
+    ZIO.serviceWithZIO[Consumer](_.commitOrRetry(policy, record))
+
+  def commitOrRetry[R](
+    policy: Schedule[R, Throwable, Any],
+    records: Chunk[ConsumerRecord[_, _]]
+  ): RIO[R & Consumer, Unit] =
+    ZIO.serviceWithZIO[Consumer](_.commitOrRetry(policy, records))
+
+  def commitOrRetry[R](policy: Schedule[R, Throwable, Any], offsetBatch: OffsetBatch): RIO[R & Consumer, Unit] =
+    ZIO.serviceWithZIO[Consumer](_.commitOrRetry(policy, offsetBatch))
+
   /**
    * Accessor method
    */
@@ -250,7 +277,7 @@ object Consumer {
     subscription: Subscription,
     keyDeserializer: Deserializer[R, K],
     valueDeserializer: Deserializer[R, V]
-  ): ZStream[Consumer, Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]] =
+  ): ZStream[Consumer, Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, ConsumerRecord[K, V]])]] =
     ZStream.serviceWithStream[Consumer](_.partitionedAssignmentStream(subscription, keyDeserializer, valueDeserializer))
 
   /**
@@ -263,7 +290,7 @@ object Consumer {
   ): ZStream[
     Consumer,
     Throwable,
-    (TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])
+    (TopicPartition, ZStream[R, Throwable, ConsumerRecord[K, V]])
   ] =
     ZStream.serviceWithStream[Consumer](_.partitionedStream(subscription, keyDeserializer, valueDeserializer))
 
@@ -275,7 +302,7 @@ object Consumer {
     keyDeserializer: Deserializer[R, K],
     valueDeserializer: Deserializer[R, V],
     bufferSize: Int = 4
-  ): ZStream[R & Consumer, Throwable, CommittableRecord[K, V]] =
+  ): ZStream[R & Consumer, Throwable, ConsumerRecord[K, V]] =
     ZStream.serviceWithStream[Consumer](
       _.plainStream(subscription, keyDeserializer, valueDeserializer, bufferSize)
     )
@@ -417,7 +444,6 @@ private[consumer] final class ConsumerLive private[consumer] (
   consumer: ConsumerAccess,
   runloopAccess: RunloopAccess
 ) extends Consumer {
-  import Consumer._
 
   override def assignment: Task[Set[TopicPartition]] =
     consumer.withConsumer(_.assignment().asScala.toSet)
@@ -441,6 +467,27 @@ private[consumer] final class ConsumerLive private[consumer] (
       offs.asScala.map { case (k, v) => k -> v.longValue() }.toMap
     }
 
+  override def commit(record: ConsumerRecord[_, _]): Task[Unit] =
+    commit(OffsetBatch.from(record))
+
+  override def commit(records: Chunk[ConsumerRecord[_, _]]): Task[Unit] =
+    commit(OffsetBatch.from(records))
+
+  override def commit(offsetBatch: OffsetBatch): Task[Unit] =
+    runloopAccess.commit(offsetBatch)
+
+  override def commitOrRetry[R](retryPolicy: Schedule[R, Throwable, Any], record: ConsumerRecord[_, _]): RIO[R, Unit] =
+    commitOrRetry(retryPolicy, OffsetBatch.from(record))
+
+  override def commitOrRetry[R](
+    retryPolicy: Schedule[R, Throwable, Any],
+    records: Chunk[ConsumerRecord[_, _]]
+  ): RIO[R, Unit] =
+    commitOrRetry(retryPolicy, OffsetBatch.from(records))
+
+  override def commitOrRetry[R](retryPolicy: Schedule[R, Throwable, Any], offsetBatch: OffsetBatch): RIO[R, Unit] =
+    runloopAccess.commitOrRetry(retryPolicy, offsetBatch)
+
   override def committed(
     partitions: Set[TopicPartition],
     timeout: Duration = Duration.Infinity
@@ -456,7 +503,7 @@ private[consumer] final class ConsumerLive private[consumer] (
     subscription: Subscription,
     keyDeserializer: Deserializer[R, K],
     valueDeserializer: Deserializer[R, V]
-  ): Stream[Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]] = {
+  ): Stream[Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, ConsumerRecord[K, V]])]] = {
     val onlyByteArraySerdes: Boolean = (keyDeserializer eq Serde.byteArray) && (valueDeserializer eq Serde.byteArray)
 
     ZStream.unwrapScoped {
@@ -468,10 +515,10 @@ private[consumer] final class ConsumerLive private[consumer] (
           .map {
             _.collect {
               case (tp, partitionStream) if Subscription.subscriptionMatches(subscription, tp) =>
-                val stream: ZStream[R, Throwable, CommittableRecord[K, V]] =
+                val stream: ZStream[R, Throwable, ConsumerRecord[K, V]] =
                   if (onlyByteArraySerdes)
-                    partitionStream.asInstanceOf[ZStream[R, Throwable, CommittableRecord[K, V]]]
-                  else partitionStream.mapChunksZIO(_.mapZIO(_.deserializeWith(keyDeserializer, valueDeserializer)))
+                    partitionStream.asInstanceOf[ZStream[R, Throwable, ConsumerRecord[K, V]]]
+                  else partitionStream.mapChunksZIO(_.mapZIO(deserializeWith(keyDeserializer, valueDeserializer)))
 
                 tp -> stream
             }
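The new `commitOrRetry` variants move the retry-on-`RetriableCommitFailedException` behaviour from `Offset`/`OffsetBatch` onto the consumer itself. A usage sketch against the accessors added above (retry policy and topic chosen arbitrarily):

```scala
import zio._
import zio.kafka.consumer.{ Consumer, Subscription }
import zio.kafka.serde.Serde

object CommitWithRetry {
  // Retriable commit failures are retried per this schedule; other failures
  // still fail the stream immediately.
  val retryPolicy: Schedule[Any, Throwable, Any] =
    Schedule.exponential(100.millis) && Schedule.recurs(5)

  val run: ZIO[Consumer, Throwable, Unit] =
    Consumer
      .plainStream(Subscription.topics("example-topic"), Serde.string, Serde.string)
      .mapChunksZIO(records => Consumer.commitOrRetry(retryPolicy, records).as(records))
      .runDrain
}
```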
@@ -483,7 +530,7 @@ private[consumer] final class ConsumerLive private[consumer] (
     subscription: Subscription,
     keyDeserializer: Deserializer[R, K],
     valueDeserializer: Deserializer[R, V]
-  ): ZStream[Any, Throwable, (TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])] =
+  ): ZStream[Any, Throwable, (TopicPartition, ZStream[R, Throwable, ConsumerRecord[K, V]])] =
     partitionedAssignmentStream(subscription, keyDeserializer, valueDeserializer).flattenChunks
 
   override def plainStream[R, K, V](
@@ -491,7 +538,7 @@ private[consumer] final class ConsumerLive private[consumer] (
     keyDeserializer: Deserializer[R, K],
     valueDeserializer: Deserializer[R, V],
     bufferSize: Int
-  ): ZStream[R, Throwable, CommittableRecord[K, V]] =
+  ): ZStream[R, Throwable, ConsumerRecord[K, V]] =
     partitionedStream(subscription, keyDeserializer, valueDeserializer).flatMapPar(
       n = Int.MaxValue,
       bufferSize = bufferSize
@@ -513,12 +560,11 @@ private[consumer] final class ConsumerLive private[consumer] (
       r <- ZIO.environment[R & R1]
       _ <- partitionedStream(subscription, keyDeserializer, valueDeserializer)
             .flatMapPar(Int.MaxValue) { case (_, partitionStream) =>
-              partitionStream.mapChunksZIO(_.mapZIO((c: CommittableRecord[K, V]) => f(c.record).as(c.offset)))
+              partitionStream.mapChunksZIO(records => records.mapZIO(f).as(records))
             }
-            .provideEnvironment(r)
-            .aggregateAsync(offsetBatches)
-            .mapZIO(_.commitOrRetry(commitRetryPolicy))
+            .mapChunksZIO(commitOrRetry(commitRetryPolicy, _).as(Chunk.empty))
             .runDrain
+            .provideEnvironment(r)
     } yield ()
 
   override def offsetsForTimes(
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Offset.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Offset.scala
deleted file mode 100644
index 69f8b9842..000000000
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/Offset.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-package zio.kafka.consumer
-
-import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, RetriableCommitFailedException }
-import org.apache.kafka.common.TopicPartition
-import zio.{ RIO, Schedule, Task }
-
-sealed trait Offset {
-  def topic: String
-  def partition: Int
-  def offset: Long
-  def commit: Task[Unit]
-  def batch: OffsetBatch
-  def consumerGroupMetadata: Option[ConsumerGroupMetadata]
-
-  /**
-   * Attempts to commit and retries according to the given policy when the commit fails with a
-   * RetriableCommitFailedException
-   */
-  final def commitOrRetry[R](policy: Schedule[R, Throwable, Any]): RIO[R, Unit] =
-    Offset.commitOrRetry(commit, policy)
-
-  final lazy val topicPartition: TopicPartition = new TopicPartition(topic, partition)
-}
-
-object Offset {
-  private[consumer] def commitOrRetry[R, B](
-    commit: Task[Unit],
-    policy: Schedule[R, Throwable, B]
-  ): RIO[R, Unit] =
-    commit.retry(
-      Schedule.recurWhile[Throwable] {
-        case _: RetriableCommitFailedException => true
-        case _                                 => false
-      } && policy
-    )
-}
-
-private final case class OffsetImpl(
-  topic: String,
-  partition: Int,
-  offset: Long,
-  commitHandle: Map[TopicPartition, Long] => Task[Unit],
-  consumerGroupMetadata: Option[ConsumerGroupMetadata]
-) extends Offset {
-  def commit: Task[Unit] = commitHandle(Map(topicPartition -> offset))
-  def batch: OffsetBatch = OffsetBatchImpl(Map(topicPartition -> offset), commitHandle, consumerGroupMetadata)
-}
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala
deleted file mode 100644
index 3c0c0f6cc..000000000
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-package zio.kafka.consumer
-
-import org.apache.kafka.clients.consumer.ConsumerGroupMetadata
-import org.apache.kafka.common.TopicPartition
-import zio.{ RIO, Schedule, Task, ZIO }
-
-sealed trait OffsetBatch {
-  def offsets: Map[TopicPartition, Long]
-  def commit: Task[Unit]
-  def add(offset: Offset): OffsetBatch
-  @deprecated("Use add(Offset) instead", "2.1.4")
-  def merge(offset: Offset): OffsetBatch
-  def merge(offsets: OffsetBatch): OffsetBatch
-  def consumerGroupMetadata: Option[ConsumerGroupMetadata]
-
-  /**
-   * Attempts to commit and retries according to the given policy when the commit fails with a
-   * RetriableCommitFailedException
-   */
-  def commitOrRetry[R](policy: Schedule[R, Throwable, Any]): RIO[R, Unit] =
-    Offset.commitOrRetry(commit, policy)
-}
-
-object OffsetBatch {
-  val empty: OffsetBatch = EmptyOffsetBatch
-
-  def apply(offsets: Iterable[Offset]): OffsetBatch = offsets.foldLeft(empty)(_ add _)
-}
-
-private final case class OffsetBatchImpl(
-  offsets: Map[TopicPartition, Long],
-  commitHandle: Map[TopicPartition, Long] => Task[Unit],
-  consumerGroupMetadata: Option[ConsumerGroupMetadata]
-) extends OffsetBatch {
-  override def commit: Task[Unit] = commitHandle(offsets)
-
-  override def add(offset: Offset): OffsetBatch =
-    copy(
-      offsets = offsets + (offset.topicPartition -> (offsets
-        .getOrElse(offset.topicPartition, -1L) max offset.offset))
-    )
-
-  override def merge(offset: Offset): OffsetBatch = add(offset)
-
-  override def merge(otherOffsets: OffsetBatch): OffsetBatch = {
-    val newOffsets = Map.newBuilder[TopicPartition, Long]
-    newOffsets ++= offsets
-    otherOffsets.offsets.foreach { case (tp, offset) =>
-      val existing = offsets.getOrElse(tp, -1L)
-      if (existing < offset)
-        newOffsets += tp -> offset
-    }
-
-    copy(offsets = newOffsets.result())
-  }
-}
-
-case object EmptyOffsetBatch extends OffsetBatch {
-  override val offsets: Map[TopicPartition, Long]                    = Map.empty
-  override val commit: Task[Unit]                                    = ZIO.unit
-  override def add(offset: Offset): OffsetBatch                      = offset.batch
-  override def merge(offset: Offset): OffsetBatch                    = add(offset)
-  override def merge(offsets: OffsetBatch): OffsetBatch              = offsets
-  override def consumerGroupMetadata: Option[ConsumerGroupMetadata]  = None
-}
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala
index 4928f7d68..1319423c2 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala
@@ -2,14 +2,14 @@ package zio.kafka.consumer.internal
 
 import org.apache.kafka.common.TopicPartition
 import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics }
-import zio.kafka.consumer.internal.Runloop.ByteArrayCommittableRecord
+import zio.kafka.consumer.internal.Runloop.ByteArrayConsumerRecord
 import zio.stream.{ Take, ZStream }
 import zio.{ Chunk, LogAnnotation, Promise, Queue, Ref, UIO, ZIO }
 
 final class PartitionStreamControl private (
   val tp: TopicPartition,
-  stream: ZStream[Any, Throwable, ByteArrayCommittableRecord],
-  dataQueue: Queue[Take[Throwable, ByteArrayCommittableRecord]],
+  stream: ZStream[Any, Throwable, ByteArrayConsumerRecord],
+  dataQueue: Queue[Take[Throwable, ByteArrayConsumerRecord]],
   interruptionPromise: Promise[Throwable, Unit],
   completedPromise: Promise[Nothing, Unit],
   queueSizeRef: Ref[Int]
@@ -21,7 +21,7 @@ final class PartitionStreamControl private (
   )
 
   /** Offer new data for the stream to process. */
-  private[internal] def offerRecords(data: Chunk[ByteArrayCommittableRecord]): ZIO[Any, Nothing, Unit] =
+  private[internal] def offerRecords(data: Chunk[ByteArrayConsumerRecord]): ZIO[Any, Nothing, Unit] =
     queueSizeRef.update(_ + data.size) *> dataQueue.offer(Take.chunk(data)).unit
 
   def queueSize: UIO[Int] = queueSizeRef.get
@@ -45,7 +45,7 @@ final class PartitionStreamControl private (
   private[internal] def isRunning: ZIO[Any, Nothing, Boolean] =
     isCompleted.negate
 
-  private[internal] val tpStream: (TopicPartition, ZStream[Any, Throwable, ByteArrayCommittableRecord]) =
+  private[internal] val tpStream: (TopicPartition, ZStream[Any, Throwable, ByteArrayConsumerRecord]) =
     (tp, stream)
 }
 
@@ -60,7 +60,7 @@ object PartitionStreamControl {
       _                   <- ZIO.logDebug(s"Creating partition stream ${tp.toString}")
       interruptionPromise <- Promise.make[Throwable, Unit]
       completedPromise    <- Promise.make[Nothing, Unit]
-      dataQueue           <- Queue.unbounded[Take[Throwable, ByteArrayCommittableRecord]]
+      dataQueue           <- Queue.unbounded[Take[Throwable, ByteArrayConsumerRecord]]
       queueSize           <- Ref.make(0)
 
       requestAndAwaitData =
         for {
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
index 6d6c8a92d..e97f4d929 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
@@ -11,6 +11,7 @@ import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics }
 import zio.kafka.consumer.fetch.FetchStrategy
 import zio.kafka.consumer.internal.ConsumerAccess.ByteArrayKafkaConsumer
 import zio.kafka.consumer.internal.RunloopAccess.PartitionAssignment
+import zio.kafka.consumer.types.OffsetBatch
 import zio.stream._
 
 import java.util
@@ -115,14 +116,24 @@ private[consumer] final class Runloop private (
     }
   }
 
-  private val commit: Map[TopicPartition, Long] => Task[Unit] =
-    offsets =>
-      for {
-        p <- Promise.make[Throwable, Unit]
-        _ <- commandQueue.offer(RunloopCommand.Commit(offsets, p)).unit
-        _ <- diagnostics.emit(DiagnosticEvent.Commit.Started(offsets))
-        _ <- p.await.timeoutFail(CommitTimeout)(commitTimeout)
-      } yield ()
+  private[internal] def commitOrRetry[R](
+    retryPolicy: Schedule[R, Throwable, Any],
+    offsetBatch: OffsetBatch
+  ): RIO[R, Unit] =
+    commit(offsetBatch).retry(
+      Schedule.recurWhile[Throwable] {
+        case _: RetriableCommitFailedException => true
+        case _                                 => false
+      } && retryPolicy
+    )
+
+  private[internal] def commit(offsets: OffsetBatch): Task[Unit] =
+    for {
+      p <- Promise.make[Throwable, Unit]
+      _ <- commandQueue.offer(RunloopCommand.Commit(offsets, p)).unit
+      _ <- diagnostics.emit(DiagnosticEvent.Commit.Started(offsets))
+      _ <- p.await.timeoutFail(CommitTimeout)(commitTimeout)
+    } yield ()
 
   private def doCommit(cmd: RunloopCommand.Commit): UIO[Unit] = {
     val offsets = cmd.offsets.map { case (tp, offset) => tp -> new OffsetAndMetadata(offset + 1) }
@@ -188,8 +199,6 @@ private[consumer] final class Runloop private (
     ignoreRecordsForTps: Set[TopicPartition],
     polledRecords: ConsumerRecords[Array[Byte], Array[Byte]]
   ): UIO[Runloop.FulfillResult] = {
-    type Record = CommittableRecord[Array[Byte], Array[Byte]]
-
     // The most efficient way to get the records from [[ConsumerRecords]] per
     // topic-partition, is by first getting the set of topic-partitions, and
     // then requesting the records per topic-partition.
@@ -201,25 +210,13 @@ private[consumer] final class Runloop private (
     if (streams.isEmpty) ZIO.succeed(fulfillResult)
     else {
       for {
-        consumerGroupMetadata <- getConsumerGroupMetadataIfAny
+        /*consumerGroupMetadata*/ _ <- getConsumerGroupMetadataIfAny // TODO Jules: Use: how?
         _ <- ZIO.foreachParDiscard(streams) { streamControl =>
                val tp      = streamControl.tp
                val records = polledRecords.records(tp)
+
                if (records.isEmpty) ZIO.unit
-               else {
-                 val builder  = ChunkBuilder.make[Record](records.size())
-                 val iterator = records.iterator()
-                 while (iterator.hasNext) {
-                   val consumerRecord = iterator.next()
-                   builder +=
-                     CommittableRecord[Array[Byte], Array[Byte]](
-                       record = consumerRecord,
-                       commitHandle = commit,
-                       consumerGroupMetadata = consumerGroupMetadata
-                     )
-                 }
-                 streamControl.offerRecords(builder.result())
-               }
+               else streamControl.offerRecords(Chunk.fromJavaIterable(records))
             }
       } yield fulfillResult
     }
@@ -527,7 +524,7 @@ private[consumer] object Runloop {
     }
   }
 
-  type ByteArrayCommittableRecord = CommittableRecord[Array[Byte], Array[Byte]]
+  type ByteArrayConsumerRecord = ConsumerRecord[Array[Byte], Array[Byte]]
 
   private final case class PollResult(
     startingTps: Set[TopicPartition],
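`Runloop.commitOrRetry` intersects a predicate schedule with the caller's policy, so a retry happens only while the error is retriable *and* the policy allows another attempt. The `&&` combinator on `Schedule` enforces that conjunction; a tiny stand-alone illustration (ZIO 2 semantics, the error type chosen arbitrarily):

```scala
import zio._
import java.io.IOException

object ScheduleIntersection {
  // Recur only while the error is an IOException...
  val whileRetriable: Schedule[Any, Throwable, Throwable] =
    Schedule.recurWhile[Throwable](_.isInstanceOf[IOException])

  // ...and at most 3 times: both sides must want to continue.
  val policy: Schedule[Any, Any, Long] = Schedule.recurs(3)

  val combined: Schedule[Any, Throwable, (Throwable, Long)] =
    whileRetriable && policy
}
```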
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala
index 938029890..db76039f5 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala
@@ -1,13 +1,14 @@
 package zio.kafka.consumer.internal
 
 import org.apache.kafka.common.TopicPartition
+import zio._
 import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization
 import zio.kafka.consumer.diagnostics.Diagnostics
-import zio.kafka.consumer.internal.Runloop.ByteArrayCommittableRecord
+import zio.kafka.consumer.internal.Runloop.ByteArrayConsumerRecord
 import zio.kafka.consumer.internal.RunloopAccess.PartitionAssignment
+import zio.kafka.consumer.types.OffsetBatch
 import zio.kafka.consumer.{ ConsumerSettings, InvalidSubscriptionUnion, Subscription }
 import zio.stream.{ Stream, Take, UStream, ZStream }
-import zio.{ Hub, IO, Ref, Scope, UIO, ZIO, ZLayer }
 
 private[internal] sealed trait RunloopState
 private[internal] object RunloopState {
@@ -30,11 +31,11 @@ private[consumer] final class RunloopAccess private (
   diagnostics: Diagnostics
 ) {
 
-  private def withRunloopZIO[E](
-    requireRunning: Boolean
-  )(whenRunning: Runloop => IO[E, Unit]): IO[E, Unit] =
+  private def withRunloopZIO[R, E, A](
+    shouldStartIfNot: Boolean
+  )(whenRunning: Runloop => ZIO[R, E, Unit]): ZIO[R, E, Unit] =
     runloopStateRef.updateSomeAndGetZIO {
-      case RunloopState.NotStarted if requireRunning => makeRunloop.map(RunloopState.Started.apply)
+      case RunloopState.NotStarted if shouldStartIfNot => makeRunloop.map(RunloopState.Started.apply)
     }.flatMap {
       case RunloopState.NotStarted       => ZIO.unit
       case RunloopState.Started(runloop) => whenRunning(runloop)
@@ -44,7 +45,7 @@ private[consumer] final class RunloopAccess private (
   /**
    * No need to call `Runloop::stopConsumption` if the Runloop has not been started or has been stopped.
    */
-  def stopConsumption: UIO[Unit] = withRunloopZIO(requireRunning = false)(_.stopConsumption)
+  def stopConsumption: UIO[Unit] = withRunloopZIO(shouldStartIfNot = false)(_.stopConsumption)
 
   /**
    * We're doing all of these things in this method so that the interface of this class is as simple as possible and
@@ -58,17 +59,23 @@ private[consumer] final class RunloopAccess private (
     for {
       stream <- ZStream.fromHubScoped(partitionHub)
       // starts the Runloop if not already started
-      _ <- withRunloopZIO(requireRunning = true)(_.addSubscription(subscription))
+      _ <- withRunloopZIO(shouldStartIfNot = true)(_.addSubscription(subscription))
       _ <- ZIO.addFinalizer {
-             withRunloopZIO(requireRunning = false)(_.removeSubscription(subscription)) <*
+             withRunloopZIO(shouldStartIfNot = false)(_.removeSubscription(subscription)) <*
               diagnostics.emit(Finalization.SubscriptionFinalized)
           }
     } yield stream
 
+  def commit(offsetBatch: OffsetBatch): Task[Unit] =
+    withRunloopZIO(shouldStartIfNot = false)(_.commit(offsetBatch))
+
+  def commitOrRetry[R](retryPolicy: Schedule[R, Throwable, Any], offsetBatch: OffsetBatch): RIO[R, Unit] =
+    withRunloopZIO(shouldStartIfNot = false)(_.commitOrRetry(retryPolicy, offsetBatch))
+
 }
 
 private[consumer] object RunloopAccess {
-  type PartitionAssignment = (TopicPartition, Stream[Throwable, ByteArrayCommittableRecord])
+  type PartitionAssignment = (TopicPartition, Stream[Throwable, ByteArrayConsumerRecord])
 
   def make(
     settings: ConsumerSettings,
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/types.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/types.scala
new file mode 100644
index 000000000..b7770cf33
--- /dev/null
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/types.scala
@@ -0,0 +1,77 @@
+package zio.kafka.consumer
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.TopicPartition
+import zio.kafka.serde.Deserializer
+import zio.prelude.Subtype
+import zio.{ Chunk, RIO }
+
+object types {
+
+  @inline
+  private[zio] def topicPartition(record: ConsumerRecord[_, _]): TopicPartition =
+    new TopicPartition(record.topic(), record.partition())
+
+  private[zio] def deserializeWith[R, K, V](
+    keyDeserializer: Deserializer[R, K],
+    valueDeserializer: Deserializer[R, V]
+  )(record: ConsumerRecord[Array[Byte], Array[Byte]]): RIO[R, ConsumerRecord[K, V]] =
+    for {
+      key   <- keyDeserializer.deserialize(record.topic(), record.headers(), record.key())
+      value <- valueDeserializer.deserialize(record.topic(), record.headers(), record.value())
+    } yield new ConsumerRecord[K, V](
+      record.topic(),
+      record.partition(),
+      record.offset(),
+      record.timestamp(),
+      record.timestampType(),
+      record.serializedKeySize(),
+      record.serializedValueSize(),
+      key,
+      value,
+      record.headers(),
+      record.leaderEpoch()
+    )
+
+  type Offset = Offset.Type
+  object Offset extends Subtype[(TopicPartition, Long)] {
+    def from(record: ConsumerRecord[_, _]): Offset = Offset((topicPartition(record), record.offset()))
+  }
+
+  type OffsetBatch = OffsetBatch.Type
+  object OffsetBatch extends Subtype[Map[TopicPartition, Long]] {
+    def empty: OffsetBatch = OffsetBatch(Map.empty)
+
+    def from(record: ConsumerRecord[_, _]): OffsetBatch =
+      OffsetBatch(Map(topicPartition(record) -> record.offset()))
+
+    def from(records: Chunk[ConsumerRecord[_, _]]): OffsetBatch = {
+      val newOffsets = Map.newBuilder[TopicPartition, Long]
+      records.foreach { record =>
+        newOffsets += topicPartition(record) -> record.offset()
+      }
+      OffsetBatch(newOffsets.result())
+    }
+
+    implicit final class OffsetBatchOps(private val self: OffsetBatch) extends AnyVal {
+      def merge(other: OffsetBatch): OffsetBatch = {
+        val newOffsets = Map.newBuilder[TopicPartition, Long]
+        newOffsets ++= self
+        other.foreach { case (tp, offset) =>
+          val existing = self.getOrElse(tp, -1L)
+          if (existing < offset) {
+            newOffsets += tp -> offset
+          }
+        }
+        OffsetBatch(newOffsets.result())
+      }
+
+      def add(offset: Offset): OffsetBatch = {
+        val (tp, offsetValue) = offset
+        val newOffset         = self.getOrElse(tp, -1L) max offsetValue
+        OffsetBatch(self + (tp -> newOffset))
+      }
+    }
+  }
+
+}
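Because the new `Offset` and `OffsetBatch` are prelude subtypes of a tuple and a `Map` respectively, ordinary `Map` operations remain available on a batch, and the extension methods above add the offset-aware ones. A small usage sketch (topic name chosen arbitrarily):

```scala
import org.apache.kafka.common.TopicPartition
import zio.kafka.consumer.types.{ Offset, OffsetBatch }

object OffsetBatchDemo {
  val tp = new TopicPartition("example-topic", 0)

  val a = OffsetBatch(Map(tp -> 41L))
  val b = OffsetBatch(Map(tp -> 42L))

  // merge keeps the highest offset per partition: Map(tp -> 42L)
  val merged = a.merge(b)

  // add bumps a single partition's offset: Map(tp -> 43L)
  val bumped = merged.add(Offset((tp, 43L)))
}
```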
diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/Transaction.scala b/zio-kafka/src/main/scala/zio/kafka/producer/Transaction.scala
index 428575818..22cc09584 100644
--- a/zio-kafka/src/main/scala/zio/kafka/producer/Transaction.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/producer/Transaction.scala
@@ -1,7 +1,7 @@
 package zio.kafka.producer
 
 import org.apache.kafka.clients.producer.{ ProducerRecord, RecordMetadata }
-import zio.kafka.consumer.{ Offset, OffsetBatch }
+import zio.kafka.consumer.types.{ Offset, OffsetBatch }
 import zio.kafka.producer.TransactionalProducer.{ TransactionLeaked, UserInitiatedAbort }
 import zio.kafka.serde.Serializer
 import zio.{ Chunk, IO, RIO, Ref, UIO, ZIO }
diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala b/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala
index 848acc542..cde923210 100644
--- a/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala
@@ -1,13 +1,13 @@
 package zio.kafka.producer
 
-import org.apache.kafka.clients.consumer.OffsetAndMetadata
+import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, OffsetAndMetadata }
 import org.apache.kafka.clients.producer.{ KafkaProducer, RecordMetadata }
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.InvalidGroupIdException
 import org.apache.kafka.common.serialization.ByteArraySerializer
 import zio.Cause.Fail
 import zio._
-import zio.kafka.consumer.OffsetBatch
+import zio.kafka.consumer.types.OffsetBatch
 
 import java.util
 import scala.jdk.CollectionConverters._
@@ -26,7 +26,10 @@ object TransactionalProducer {
   ) extends TransactionalProducer {
     private val abortTransaction: Task[Unit] = ZIO.attemptBlocking(live.p.abortTransaction())
 
-    private def commitTransactionWithOffsets(offsetBatch: OffsetBatch): Task[Unit] = {
+    private def commitTransactionWithOffsets(
+      offsetBatch: OffsetBatch,
+      consumerGroupMetadata: Option[ConsumerGroupMetadata]
+    ): Task[Unit] = {
       val sendOffsetsToTransaction: Task[Unit] =
         ZIO.suspendSucceed {
           @inline def invalidGroupIdException: IO[InvalidGroupIdException, Nothing] =
@@ -36,11 +39,11 @@ object TransactionalProducer {
               )
             )
 
-          offsetBatch.consumerGroupMetadata match {
+          consumerGroupMetadata match {
             case None => invalidGroupIdException
             case Some(consumerGroupMetadata) =>
               val offsets: util.Map[TopicPartition, OffsetAndMetadata] =
-                offsetBatch.offsets.map { case (topicPartition, offset) =>
+                offsetBatch.map { case (topicPartition, offset) =>
                   topicPartition -> new OffsetAndMetadata(offset + 1)
                 }.asJava
 
@@ -48,14 +51,19 @@ object TransactionalProducer {
         }
       }
 
-      sendOffsetsToTransaction.when(offsetBatch.offsets.nonEmpty) *> ZIO.attemptBlocking(live.p.commitTransaction())
+      sendOffsetsToTransaction.when(offsetBatch.nonEmpty) *> ZIO.attemptBlocking(live.p.commitTransaction())
     }
 
     private def commitOrAbort(transaction: TransactionImpl, exit: Exit[Any, Any]): UIO[Unit] =
       exit match {
        case Exit.Success(_) =>
          transaction.offsetBatchRef.get
-            .flatMap(offsetBatch => commitTransactionWithOffsets(offsetBatch).retryN(5).orDie)
+            .flatMap(offsetBatch =>
+              commitTransactionWithOffsets(
+                offsetBatch,
+                consumerGroupMetadata = None // TODO Jules
+              ).retryN(5).orDie
+            )
        case Exit.Failure(Fail(UserInitiatedAbort, _)) => abortTransaction.retryN(5).orDie
        case Exit.Failure(_)                           => abortTransaction.retryN(5).orDie
      }