Skip to content

Commit

Permalink
Cleaned implementation of the new commit interface
Browse files Browse the repository at this point in the history
  • Loading branch information
guizmaii committed Sep 9, 2023
1 parent 30cdd09 commit 0fb7fcb
Show file tree
Hide file tree
Showing 15 changed files with 246 additions and 236 deletions.
2 changes: 2 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ lazy val kafkaClients = "org.apache.kafka" % "kafka-clients"
lazy val scalaCollectionCompat = "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0"
lazy val jacksonDatabind = "com.fasterxml.jackson.core" % "jackson-databind" % "2.15.2"
lazy val logback = "ch.qos.logback" % "logback-classic" % "1.3.11"
lazy val zioPrelude = "dev.zio" %% "zio-prelude" % "1.0.0-RC20"

enablePlugins(ZioSbtEcosystemPlugin, ZioSbtCiPlugin)

Expand Down Expand Up @@ -102,6 +103,7 @@ lazy val zioKafka =
.settings(enableZIO(enableStreaming = true))
.settings(
libraryDependencies ++= Seq(
zioPrelude,
kafkaClients,
jacksonDatabind,
scalaCollectionCompat
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@ import org.apache.kafka.clients.consumer.ConsumerConfig
import org.openjdk.jmh.annotations._
import zio.kafka.bench.ZioBenchmark.randomThing
import zio.kafka.consumer.diagnostics.Diagnostics
import zio.kafka.consumer.{ Consumer, Offset, OffsetBatch, Subscription }
import zio.kafka.consumer.{ Consumer, Subscription }
import zio.kafka.producer.Producer
import zio.kafka.serde.Serde
import zio.kafka.testkit.Kafka
import zio.kafka.testkit.KafkaTestUtils.{ consumerSettings, produceMany, producer }
import zio.stream.ZSink
import zio.{ durationInt, Ref, Schedule, ZIO, ZLayer }
import zio.{ Chunk, Ref, ZIO, ZLayer }

import java.util.concurrent.TimeUnit

Expand Down Expand Up @@ -60,18 +59,20 @@ class ConsumerBenchmark extends ZioBenchmark[Kafka with Producer] {
def throughputWithCommits(): Any = runZIO {
for {
counter <- Ref.make(0)
_ <- ZIO.logAnnotate("consumer", "1") {
Consumer
.plainStream(Subscription.topics(topic1), Serde.byteArray, Serde.byteArray)
.map(_.offset)
.aggregateAsyncWithin(ZSink.collectAll[Offset], Schedule.fixed(100.millis))
.tap(batch => counter.update(_ + batch.size))
.map(OffsetBatch.apply)
.mapZIO(_.commit)
.takeUntilZIO(_ => counter.get.map(_ >= nrMessages))
.runDrain
.provideSome[Kafka](env)
}
_ <- ZIO
.logAnnotate("consumer", "1") {
Consumer
.plainStream(Subscription.topics(topic1), Serde.byteArray, Serde.byteArray)
.tap { _ =>
counter.updateAndGet(_ + 1).flatMap(count => Consumer.stopConsumption.when(count == nrMessages))
}
.mapChunksZIO(records =>
counter.update(_ + records.size) *> Consumer.commitRecords(records).as(Chunk.empty)
)
.takeUntilZIO((_: Chunk[_]) => counter.get.map(_ >= nrMessages))
.runDrain
}
.provideSome[Kafka](env)
} yield ()
}
}
17 changes: 6 additions & 11 deletions zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import zio.kafka.admin.AdminClient.{
}
import zio.kafka.admin.acl._
import zio.kafka.admin.resource.{ PatternType, ResourcePattern, ResourcePatternFilter, ResourceType }
import zio.kafka.consumer.{ CommittableRecord, Consumer, OffsetBatch, Subscription }
import zio.kafka.consumer.{ Consumer, Subscription }
import zio.kafka.serde.Serde
import zio.kafka.testkit.KafkaTestUtils._
import zio.kafka.testkit._
Expand Down Expand Up @@ -228,13 +228,8 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
.partitionedStream[Kafka, String, String](Subscription.Topics(Set(topic)), Serde.string, Serde.string)
.flatMapPar(partitionCount)(_._2)
.take(count)
.transduce(ZSink.collectAllN[CommittableRecord[String, String]](20))
.mapConcatZIO { committableRecords =>
val records = committableRecords.map(_.record)
val offsetBatch = OffsetBatch(committableRecords.map(_.offset))

offsetBatch.commit.as(records)
}
.transduce(ZSink.collectAllN[ConsumerRecord[String, String]](20))
.mapConcatZIO(records => Consumer.commitRecords(records).as(records))
.runCollect
.provideSomeLayer[Kafka](consumer("adminspec-topic10", Some(consumerGroupID)))

Expand Down Expand Up @@ -301,7 +296,7 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
Consumer
.plainStream[Kafka, String, String](Subscription.Topics(Set(topic)), Serde.string, Serde.string)
.take(count)
.foreach(_.offset.commit)
.foreach(Consumer.commitRecord)
.provideSomeLayer[Kafka](consumer(topic, Some(groupId)))

KafkaTestUtils.withAdmin { client =>
Expand Down Expand Up @@ -344,7 +339,7 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
Consumer
.plainStream[Kafka, String, String](Subscription.Topics(Set(topic)), Serde.string, Serde.string)
.take(count)
.foreach(_.offset.commit)
.foreach(Consumer.commitRecord)
.provideSomeLayer[Kafka](consumer(topic, Some(groupId)))

KafkaTestUtils.withAdmin { client =>
Expand Down Expand Up @@ -645,7 +640,7 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
groupInstanceId: Option[String] = None
): ZIO[Kafka, Throwable, Unit] = Consumer
.plainStream(Subscription.topics(topicName), Serde.string, Serde.string)
.foreach(_.offset.commit)
.foreach(Consumer.commitRecord)
.provideSomeLayer(consumer(clientId, Some(groupId), groupInstanceId))

private def getStableConsumerGroupDescription(
Expand Down
22 changes: 12 additions & 10 deletions zio-kafka-test/src/test/scala/zio/kafka/ProducerSpec.scala
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package zio.kafka

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.TopicConfig
import zio._
import zio.kafka.admin.AdminClient.NewTopic
import zio.kafka.consumer._
import zio.kafka.consumer.types.{ Offset, OffsetBatch }
import zio.kafka.producer.TransactionalProducer.{ TransactionLeaked, UserInitiatedAbort }
import zio.kafka.producer.{ ByteRecord, Producer, Transaction, TransactionalProducer }
import zio.kafka.serde.Serde
Expand All @@ -26,7 +28,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
def withConsumerInt(
subscription: Subscription,
settings: ConsumerSettings
): ZIO[Any with Scope, Throwable, Dequeue[Take[Throwable, CommittableRecord[String, Int]]]] =
): ZIO[Any with Scope, Throwable, Dequeue[Take[Throwable, ConsumerRecord[String, Int]]]] =
Consumer.make(settings).flatMap { c =>
c.plainStream(subscription, Serde.string, Serde.int).toQueue()
}
Expand Down Expand Up @@ -193,15 +195,15 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
for {
messages <- consumer.take.flatMap(_.done).mapError(_.getOrElse(new NoSuchElementException))
record = messages
.filter(rec => rec.record.key == key1 && rec.record.value == value1)
.filter(rec => rec.key == key1 && rec.value == value1)
} yield record
}
}
record2 <- ZIO.scoped {
withConsumer(Topics(Set(topic2)), settings).flatMap { consumer =>
for {
messages <- consumer.take.flatMap(_.done).mapError(_.getOrElse(new NoSuchElementException))
record = messages.filter(rec => rec.record.key == key2 && rec.record.value == value2)
record = messages.filter(rec => rec.key == key2 && rec.value == value2)
} yield record
}
}
Expand Down Expand Up @@ -289,7 +291,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
messages <- consumer.take
.flatMap(_.done)
.mapError(_.getOrElse(new NoSuchElementException))
record = messages.filter(rec => rec.record.key == "bob")
record = messages.filter(_.key == "bob")
} yield record
}
}
Expand Down Expand Up @@ -329,7 +331,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
messages <- consumer.take
.flatMap(_.done)
.mapError(_.getOrElse(new NoSuchElementException))
record = messages.filter(rec => rec.record.key == "bob")
record = messages.filter(_.key == "bob")
} yield record
}
}
Expand Down Expand Up @@ -417,7 +419,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
messages <- consumer.take
.flatMap(_.done)
.mapError(_.getOrElse(new NoSuchElementException))
record = messages.filter(rec => rec.record.key == "no one")
record = messages.filter(_.key == "no one")
} yield record

}
Expand Down Expand Up @@ -458,7 +460,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
messages <- consumer.take
.flatMap(_.done)
.mapError(_.getOrElse(new NoSuchElementException))
record = messages.filter(rec => rec.record.key == "no one")
record = messages.filter(_.key == "no one")
} yield record
}
}
Expand Down Expand Up @@ -504,7 +506,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
messages <- consumer.take
.flatMap(_.done)
.mapError(_.getOrElse(new NoSuchElementException))
record = messages.filter(rec => rec.record.key == "no one")
record = messages.filter(_.key == "no one")
} yield record
}
}
Expand Down Expand Up @@ -544,7 +546,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
aliceAccountFeesPaid,
Serde.string,
Serde.int,
Some(aliceHadMoneyCommittableMessage.offset)
Some(Offset.from(aliceHadMoneyCommittableMessage))
)
}
}
Expand Down Expand Up @@ -591,7 +593,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
aliceAccountFeesPaid,
Serde.string,
Serde.int,
Some(aliceHadMoneyCommittableMessage.offset)
Some(Offset.from(aliceHadMoneyCommittableMessage))
) *>
t.abort
}
Expand Down
Loading

0 comments on commit 0fb7fcb

Please sign in to comment.