[POC - 3] New commit interface #1042

Draft · wants to merge 14 commits into base: master
Changes from 6 commits
2 changes: 2 additions & 0 deletions build.sbt
@@ -7,6 +7,7 @@ lazy val kafkaClients = "org.apache.kafka" % "kafka-clients"
lazy val scalaCollectionCompat = "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0"
lazy val jacksonDatabind = "com.fasterxml.jackson.core" % "jackson-databind" % "2.15.2"
lazy val logback = "ch.qos.logback" % "logback-classic" % "1.3.11"
+lazy val zioPrelude = "dev.zio" %% "zio-prelude" % "1.0.0-RC20"

enablePlugins(ZioSbtEcosystemPlugin, ZioSbtCiPlugin)

@@ -102,6 +103,7 @@ lazy val zioKafka =
.settings(enableZIO(enableStreaming = true))
.settings(
      libraryDependencies ++= Seq(
+       zioPrelude,
        kafkaClients,
        jacksonDatabind,
        scalaCollectionCompat
ConsumerBenchmark.scala
@@ -5,13 +5,12 @@ import org.apache.kafka.clients.consumer.ConsumerConfig
import org.openjdk.jmh.annotations._
import zio.kafka.bench.ZioBenchmark.randomThing
import zio.kafka.consumer.diagnostics.Diagnostics
-import zio.kafka.consumer.{ Consumer, Offset, OffsetBatch, Subscription }
+import zio.kafka.consumer.{ Consumer, Subscription }
import zio.kafka.producer.Producer
import zio.kafka.serde.Serde
import zio.kafka.testkit.Kafka
import zio.kafka.testkit.KafkaTestUtils.{ consumerSettings, produceMany, producer }
-import zio.stream.ZSink
-import zio.{ durationInt, Ref, Schedule, ZIO, ZLayer }
+import zio.{ Chunk, Ref, ZIO, ZLayer }

import java.util.concurrent.TimeUnit

@@ -60,18 +59,18 @@ class ConsumerBenchmark extends ZioBenchmark[Kafka with Producer] {
def throughputWithCommits(): Any = runZIO {
for {
counter <- Ref.make(0)
_ <- ZIO.logAnnotate("consumer", "1") {
Consumer
.plainStream(Subscription.topics(topic1), Serde.byteArray, Serde.byteArray)
.map(_.offset)
.aggregateAsyncWithin(ZSink.collectAll[Offset], Schedule.fixed(100.millis))
.tap(batch => counter.update(_ + batch.size))
.map(OffsetBatch.apply)
.mapZIO(_.commit)
.takeUntilZIO(_ => counter.get.map(_ >= nrMessages))
.runDrain
.provideSome[Kafka](env)
}
_ <- ZIO
.logAnnotate("consumer", "1") {
Consumer
.plainStream(Subscription.topics(topic1), Serde.byteArray, Serde.byteArray)
.tap { _ =>
counter.updateAndGet(_ + 1).flatMap(count => Consumer.stopConsumption.when(count == nrMessages))
}
.mapChunksZIO(records => counter.update(_ + records.size) *> Consumer.commit(records).as(Chunk.empty))
.takeUntilZIO((_: Chunk[_]) => counter.get.map(_ >= nrMessages))
.runDrain
}
.provideSome[Kafka](env)
} yield ()
}
}
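Note: the hunk above is the heart of the proposal. For reference, a minimal sketch of the new consume-and-commit loop, assuming the Consumer.commit overloads this diff implies (one taking a single record, one taking a chunk of records); the topic name and serdes are illustrative:

    import zio.ZIO
    import zio.kafka.consumer.{ Consumer, Subscription }
    import zio.kafka.serde.Serde

    // Records no longer carry a committable offset: the stream yields plain
    // Kafka ConsumerRecord values, and commits are requested on the Consumer
    // itself, one chunk at a time.
    val consumeAndCommit: ZIO[Consumer, Throwable, Unit] =
      Consumer
        .plainStream(Subscription.topics("my-topic"), Serde.string, Serde.string)
        .mapChunksZIO(records => Consumer.commit(records).as(records))
        .runDrain

The benchmark also drops the aggregateAsyncWithin/OffsetBatch dance: batching now falls out of committing per chunk, and the stream is ended with Consumer.stopConsumption once the target message count is reached.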
17 changes: 6 additions & 11 deletions zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala
@@ -23,7 +23,7 @@ import zio.kafka.admin.AdminClient.{
}
import zio.kafka.admin.acl._
import zio.kafka.admin.resource.{ PatternType, ResourcePattern, ResourcePatternFilter, ResourceType }
-import zio.kafka.consumer.{ CommittableRecord, Consumer, OffsetBatch, Subscription }
+import zio.kafka.consumer.{ Consumer, Subscription }
import zio.kafka.serde.Serde
import zio.kafka.testkit.KafkaTestUtils._
import zio.kafka.testkit._
@@ -228,13 +228,8 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
.partitionedStream[Kafka, String, String](Subscription.Topics(Set(topic)), Serde.string, Serde.string)
.flatMapPar(partitionCount)(_._2)
.take(count)
-        .transduce(ZSink.collectAllN[CommittableRecord[String, String]](20))
-        .mapConcatZIO { committableRecords =>
-          val records     = committableRecords.map(_.record)
-          val offsetBatch = OffsetBatch(committableRecords.map(_.offset))
-
-          offsetBatch.commit.as(records)
-        }
+        .transduce(ZSink.collectAllN[ConsumerRecord[String, String]](20))
+        .mapConcatZIO(records => Consumer.commit(records).as(records))
.runCollect
.provideSomeLayer[Kafka](consumer("adminspec-topic10", Some(consumerGroupID)))

@@ -301,7 +296,7 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
Consumer
.plainStream[Kafka, String, String](Subscription.Topics(Set(topic)), Serde.string, Serde.string)
.take(count)
-        .foreach(_.offset.commit)
+        .foreach(Consumer.commit)
.provideSomeLayer[Kafka](consumer(topic, Some(groupId)))

KafkaTestUtils.withAdmin { client =>
@@ -344,7 +339,7 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
Consumer
.plainStream[Kafka, String, String](Subscription.Topics(Set(topic)), Serde.string, Serde.string)
.take(count)
-        .foreach(_.offset.commit)
+        .foreach(Consumer.commit)
.provideSomeLayer[Kafka](consumer(topic, Some(groupId)))

KafkaTestUtils.withAdmin { client =>
@@ -645,7 +640,7 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
groupInstanceId: Option[String] = None
): ZIO[Kafka, Throwable, Unit] = Consumer
.plainStream(Subscription.topics(topicName), Serde.string, Serde.string)
-    .foreach(_.offset.commit)
+    .foreach(Consumer.commit)
.provideSomeLayer(consumer(clientId, Some(groupId), groupInstanceId))

private def getStableConsumerGroupDescription(
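Note: the AdminSpec changes show the two shapes of the new commit call side by side. A sketch contrasting them (again assuming the single-record and chunk overloads implied by this diff; topic and serdes are illustrative):

    import org.apache.kafka.clients.consumer.ConsumerRecord
    import zio.ZIO
    import zio.kafka.consumer.{ Consumer, Subscription }
    import zio.kafka.serde.Serde
    import zio.stream.ZSink

    // Commit after every record: the simplest form, one commit call per record.
    val perRecord: ZIO[Consumer, Throwable, Unit] =
      Consumer
        .plainStream(Subscription.topics("some-topic"), Serde.string, Serde.string)
        .foreach(Consumer.commit)

    // Collect 20 records, then commit them in a single call: fewer round-trips.
    val batched: ZIO[Consumer, Throwable, Unit] =
      Consumer
        .plainStream(Subscription.topics("some-topic"), Serde.string, Serde.string)
        .transduce(ZSink.collectAllN[ConsumerRecord[String, String]](20))
        .mapConcatZIO(records => Consumer.commit(records).as(records))
        .runDrain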
22 changes: 12 additions & 10 deletions zio-kafka-test/src/test/scala/zio/kafka/ProducerSpec.scala
@@ -1,11 +1,13 @@
package zio.kafka

+import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.TopicConfig
import zio._
import zio.kafka.admin.AdminClient.NewTopic
import zio.kafka.consumer._
+import zio.kafka.consumer.types.{ Offset, OffsetBatch }
import zio.kafka.producer.TransactionalProducer.{ TransactionLeaked, UserInitiatedAbort }
import zio.kafka.producer.{ ByteRecord, Producer, Transaction, TransactionalProducer }
import zio.kafka.serde.Serde
@@ -26,7 +28,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
def withConsumerInt(
subscription: Subscription,
settings: ConsumerSettings
-  ): ZIO[Any with Scope, Throwable, Dequeue[Take[Throwable, CommittableRecord[String, Int]]]] =
+  ): ZIO[Any with Scope, Throwable, Dequeue[Take[Throwable, ConsumerRecord[String, Int]]]] =
Consumer.make(settings).flatMap { c =>
c.plainStream(subscription, Serde.string, Serde.int).toQueue()
}
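Note the signature change above: with the new interface, plainStream (and hence the queue) yields Kafka's own ConsumerRecord instead of zio-kafka's CommittableRecord wrapper, so the .record indirection disappears at every use site in the hunks below. A minimal sketch of the after state (the helper name is illustrative):

    import org.apache.kafka.clients.consumer.ConsumerRecord
    import zio.Chunk

    // Before this PR the element type was CommittableRecord, so the filter
    // had to reach through the wrapper: rec.record.key == "bob".
    def bobsRecords(messages: Chunk[ConsumerRecord[String, Int]]): Chunk[ConsumerRecord[String, Int]] =
      messages.filter(_.key == "bob")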
@@ -193,15 +195,15 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
for {
messages <- consumer.take.flatMap(_.done).mapError(_.getOrElse(new NoSuchElementException))
record = messages
-            .filter(rec => rec.record.key == key1 && rec.record.value == value1)
+            .filter(rec => rec.key == key1 && rec.value == value1)
} yield record
}
}
record2 <- ZIO.scoped {
withConsumer(Topics(Set(topic2)), settings).flatMap { consumer =>
for {
messages <- consumer.take.flatMap(_.done).mapError(_.getOrElse(new NoSuchElementException))
-          record = messages.filter(rec => rec.record.key == key2 && rec.record.value == value2)
+          record = messages.filter(rec => rec.key == key2 && rec.value == value2)
} yield record
}
}
@@ -289,7 +291,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
messages <- consumer.take
.flatMap(_.done)
.mapError(_.getOrElse(new NoSuchElementException))
record = messages.filter(rec => rec.record.key == "bob")
record = messages.filter(_.key == "bob")
} yield record
}
}
@@ -329,7 +331,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
messages <- consumer.take
.flatMap(_.done)
.mapError(_.getOrElse(new NoSuchElementException))
record = messages.filter(rec => rec.record.key == "bob")
record = messages.filter(_.key == "bob")
} yield record
}
}
@@ -417,7 +419,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
messages <- consumer.take
.flatMap(_.done)
.mapError(_.getOrElse(new NoSuchElementException))
record = messages.filter(rec => rec.record.key == "no one")
record = messages.filter(_.key == "no one")
} yield record

}
@@ -458,7 +460,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
messages <- consumer.take
.flatMap(_.done)
.mapError(_.getOrElse(new NoSuchElementException))
record = messages.filter(rec => rec.record.key == "no one")
record = messages.filter(_.key == "no one")
} yield record
}
}
@@ -504,7 +506,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
messages <- consumer.take
.flatMap(_.done)
.mapError(_.getOrElse(new NoSuchElementException))
record = messages.filter(rec => rec.record.key == "no one")
record = messages.filter(_.key == "no one")
} yield record
}
}
@@ -544,7 +546,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
aliceAccountFeesPaid,
Serde.string,
Serde.int,
-              Some(aliceHadMoneyCommittableMessage.offset)
+              Some(Offset.from(aliceHadMoneyCommittableMessage))
)
}
}
@@ -591,7 +593,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
aliceAccountFeesPaid,
Serde.string,
Serde.int,
-              Some(aliceHadMoneyCommittableMessage.offset)
+              Some(Offset.from(aliceHadMoneyCommittableMessage))
) *>
t.abort
}
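Note: since records no longer expose an .offset, the transactional produce-with-offset pattern in the last two hunks builds its Offset explicitly with Offset.from, imported from zio.kafka.consumer.types. A minimal sketch (the t.produce shape follows this spec; the helper name, topic name, and payload arithmetic are illustrative):

    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.clients.producer.ProducerRecord
    import zio.kafka.consumer.types.Offset
    import zio.kafka.producer.Transaction
    import zio.kafka.serde.Serde

    // Inside a transaction: produce a follow-up record and commit the consumed
    // record's offset atomically with the produced record.
    def produceFee(t: Transaction, consumed: ConsumerRecord[String, Int]) =
      t.produce(
        new ProducerRecord("fees-paid", consumed.key, consumed.value - 1),
        Serde.string,
        Serde.int,
        Some(Offset.from(consumed))
      )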