From 30cdd0982b5a4aab243ee1b2bc010bca23f59176 Mon Sep 17 00:00:00 2001
From: jules Ivanic
Date: Sat, 9 Sep 2023 20:38:05 +0400
Subject: [PATCH] Refined implementation of
 https://github.com/zio/zio-kafka/pull/1022#issuecomment-1712502924

---
 .../kafka/consumer/CommittableRecord.scala    | 14 +---
 .../scala/zio/kafka/consumer/Committer.scala  |  8 ++-
 .../scala/zio/kafka/consumer/Consumer.scala   | 30 ++++-----
 .../scala/zio/kafka/consumer/Offset.scala     | 47 --------------
 .../zio/kafka/consumer/OffsetBatch.scala      | 65 -------------------
 5 files changed, 21 insertions(+), 143 deletions(-)
 delete mode 100644 zio-kafka/src/main/scala/zio/kafka/consumer/Offset.scala
 delete mode 100644 zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala

diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala
index 6d3a5e494..aabaf426e 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala
@@ -2,12 +2,11 @@ package zio.kafka.consumer
 
 import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, ConsumerRecord }
 import org.apache.kafka.common.TopicPartition
+import zio.RIO
 import zio.kafka.serde.Deserializer
-import zio.{ RIO, Task }
 
 final case class CommittableRecord[K, V](
   record: ConsumerRecord[K, V],
-  private val commitHandle: Map[TopicPartition, Long] => Task[Unit],
   private val consumerGroupMetadata: Option[ConsumerGroupMetadata]
 ) {
   def deserializeWith[R, K1, V1](
@@ -39,26 +38,15 @@ final case class CommittableRecord[K, V](
   def timestamp: Long = record.timestamp()
 
   lazy val topicPartition: TopicPartition = new TopicPartition(record.topic(), record.partition())
-
-  def offset: Offset =
-    OffsetImpl(
-      topic = record.topic(),
-      partition = record.partition(),
-      offset = record.offset(),
-      commitHandle = commitHandle,
-      consumerGroupMetadata = consumerGroupMetadata
-    )
 }
 
 object CommittableRecord {
   def apply[K, V](
     record: ConsumerRecord[K, V],
-    commitHandle: Map[TopicPartition, Long] => Task[Unit],
     consumerGroupMetadata: Option[ConsumerGroupMetadata]
   ): CommittableRecord[K, V] =
     new CommittableRecord(
       record = record,
-      commitHandle = commitHandle,
      consumerGroupMetadata = consumerGroupMetadata
    )
 }
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Committer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Committer.scala
index e2f8ba936..a47c51e5d 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/Committer.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Committer.scala
@@ -2,10 +2,16 @@ package zio.kafka.consumer
 
 import org.apache.kafka.common.TopicPartition
 import zio.kafka.utils.PendingCommit
-import zio.{Chunk, Promise, Ref, Schedule, Scope, Task, UIO, URIO, ZIO}
+import zio.{ Chunk, Promise, Ref, Schedule, Scope, Task, UIO, URIO, ZIO }
 
 trait Committer {
   def commit(records: Chunk[CommittableRecord[_, _]]): UIO[PendingCommit[Throwable, Unit]]
+
+  final def commitAndAwait(records: Chunk[CommittableRecord[_, _]]): Task[Unit] =
+    commit(records).flatMap(_.awaitCommit)
+
+  final def commitAndForget(records: Chunk[CommittableRecord[_, _]]): UIO[Unit] =
+    commit(records).unit
 }
 
 //noinspection ConvertExpressionToSAM
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
index 5ec6e2a2b..bcecab5f6 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
@@ -1,17 +1,12 @@
 package zio.kafka.consumer
 
-import org.apache.kafka.clients.consumer.{
-  Consumer => JConsumer,
-  ConsumerRecord,
-  OffsetAndMetadata,
-  OffsetAndTimestamp
-}
+import org.apache.kafka.clients.consumer.{ConsumerRecord, OffsetAndMetadata, OffsetAndTimestamp, Consumer => JConsumer}
 import org.apache.kafka.common._
 import zio._
 import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization
 import zio.kafka.consumer.diagnostics.Diagnostics
-import zio.kafka.consumer.internal.{ ConsumerAccess, RunloopAccess }
-import zio.kafka.serde.{ Deserializer, Serde }
+import zio.kafka.consumer.internal.{ConsumerAccess, RunloopAccess}
+import zio.kafka.serde.{Deserializer, Serde}
 import zio.kafka.utils.SslHelper
 import zio.stream._
 
@@ -169,9 +164,6 @@ object Consumer {
   case object RunloopTimeout extends RuntimeException("Timeout in Runloop") with NoStackTrace
   case object CommitTimeout  extends RuntimeException("Commit timeout") with NoStackTrace
 
-  val offsetBatches: ZSink[Any, Nothing, Offset, Nothing, OffsetBatch] =
-    ZSink.foldLeft[Offset, OffsetBatch](OffsetBatch.empty)(_ add _)
-
   def live: RLayer[ConsumerSettings & Diagnostics, Consumer] =
     ZLayer.scoped {
       for {
@@ -232,6 +224,9 @@ object Consumer {
   ): RIO[Consumer, Map[TopicPartition, Option[OffsetAndMetadata]]] =
     ZIO.serviceWithZIO(_.committed(partitions, timeout))
 
+  def commitAccumBatch[R](commitSchedule: Schedule[R, Any, Any]): URIO[R & Consumer, Committer] =
+    ZIO.serviceWithZIO[Consumer](_.commitAccumBatch(commitSchedule))
+
   /**
    * Accessor method
    */
@@ -423,7 +418,6 @@ private[consumer] final class ConsumerLive private[consumer] (
   consumer: ConsumerAccess,
   runloopAccess: RunloopAccess
 ) extends Consumer {
-  import Consumer._
 
   override def assignment: Task[Set[TopicPartition]] =
     consumer.withConsumer(_.assignment().asScala.toSet)
@@ -525,15 +519,17 @@ private[consumer] final class ConsumerLive private[consumer] (
     f: ConsumerRecord[K, V] => URIO[R1, Unit]
   ): ZIO[R & R1, Throwable, Unit] =
     for {
-      r <- ZIO.environment[R & R1]
+      r         <- ZIO.environment[R & R1]
+      committer <- commitAccumBatch(commitRetryPolicy)
       _ <- partitionedStream(subscription, keyDeserializer, valueDeserializer)
             .flatMapPar(Int.MaxValue) { case (_, partitionStream) =>
-              partitionStream.mapChunksZIO(_.mapZIO((c: CommittableRecord[K, V]) => f(c.record).as(c.offset)))
+              partitionStream.mapChunksZIO(records =>
+                records.mapZIO((rec: CommittableRecord[K, V]) => f(rec.record)).as(records)
+              )
             }
-            .provideEnvironment(r)
-            .aggregateAsync(offsetBatches)
-            .mapZIO(_.commitOrRetry(commitRetryPolicy))
+            .mapChunksZIO(committer.commitAndAwait(_).as(Chunk.empty))
             .runDrain
+            .provideEnvironment(r)
     } yield ()
 
   override def offsetsForTimes(
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Offset.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Offset.scala
deleted file mode 100644
index 69f8b9842..000000000
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/Offset.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-package zio.kafka.consumer
-
-import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, RetriableCommitFailedException }
-import org.apache.kafka.common.TopicPartition
-import zio.{ RIO, Schedule, Task }
-
-sealed trait Offset {
-  def topic: String
-  def partition: Int
-  def offset: Long
-  def commit: Task[Unit]
-  def batch: OffsetBatch
-  def consumerGroupMetadata: Option[ConsumerGroupMetadata]
-
-  /**
-   * Attempts to commit and retries according to the given policy when the commit fails with a
-   * RetriableCommitFailedException
-   */
-  final def commitOrRetry[R](policy: Schedule[R, Throwable, Any]): RIO[R, Unit] =
-    Offset.commitOrRetry(commit, policy)
-
-  final lazy val topicPartition: TopicPartition = new TopicPartition(topic, partition)
-}
-
-object Offset {
-  private[consumer] def commitOrRetry[R, B](
-    commit: Task[Unit],
-    policy: Schedule[R, Throwable, B]
-  ): RIO[R, Unit] =
-    commit.retry(
-      Schedule.recurWhile[Throwable] {
-        case _: RetriableCommitFailedException => true
-        case _                                 => false
-      } && policy
-    )
-}
-
-private final case class OffsetImpl(
-  topic: String,
-  partition: Int,
-  offset: Long,
-  commitHandle: Map[TopicPartition, Long] => Task[Unit],
-  consumerGroupMetadata: Option[ConsumerGroupMetadata]
-) extends Offset {
-  def commit: Task[Unit] = commitHandle(Map(topicPartition -> offset))
-  def batch: OffsetBatch = OffsetBatchImpl(Map(topicPartition -> offset), commitHandle, consumerGroupMetadata)
-}
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala
deleted file mode 100644
index 3c0c0f6cc..000000000
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-package zio.kafka.consumer
-
-import org.apache.kafka.clients.consumer.ConsumerGroupMetadata
-import org.apache.kafka.common.TopicPartition
-import zio.{ RIO, Schedule, Task, ZIO }
-
-sealed trait OffsetBatch {
-  def offsets: Map[TopicPartition, Long]
-  def commit: Task[Unit]
-  def add(offset: Offset): OffsetBatch
-  @deprecated("Use add(Offset) instead", "2.1.4")
-  def merge(offset: Offset): OffsetBatch
-  def merge(offsets: OffsetBatch): OffsetBatch
-  def consumerGroupMetadata: Option[ConsumerGroupMetadata]
-
-  /**
-   * Attempts to commit and retries according to the given policy when the commit fails with a
-   * RetriableCommitFailedException
-   */
-  def commitOrRetry[R](policy: Schedule[R, Throwable, Any]): RIO[R, Unit] =
-    Offset.commitOrRetry(commit, policy)
-}
-
-object OffsetBatch {
-  val empty: OffsetBatch = EmptyOffsetBatch
-
-  def apply(offsets: Iterable[Offset]): OffsetBatch = offsets.foldLeft(empty)(_ add _)
-}
-
-private final case class OffsetBatchImpl(
-  offsets: Map[TopicPartition, Long],
-  commitHandle: Map[TopicPartition, Long] => Task[Unit],
-  consumerGroupMetadata: Option[ConsumerGroupMetadata]
-) extends OffsetBatch {
-  override def commit: Task[Unit] = commitHandle(offsets)
-
-  override def add(offset: Offset): OffsetBatch =
-    copy(
-      offsets = offsets + (offset.topicPartition -> (offsets
-        .getOrElse(offset.topicPartition, -1L) max offset.offset))
-    )
-
-  override def merge(offset: Offset): OffsetBatch = add(offset)
-
-  override def merge(otherOffsets: OffsetBatch): OffsetBatch = {
-    val newOffsets = Map.newBuilder[TopicPartition, Long]
-    newOffsets ++= offsets
-    otherOffsets.offsets.foreach { case (tp, offset) =>
-      val existing = offsets.getOrElse(tp, -1L)
-      if (existing < offset)
-        newOffsets += tp -> offset
-    }
-
-    copy(offsets = newOffsets.result())
-  }
-}
-
-case object EmptyOffsetBatch extends OffsetBatch {
-  override val offsets: Map[TopicPartition, Long]                    = Map.empty
-  override val commit: Task[Unit]                                    = ZIO.unit
-  override def add(offset: Offset): OffsetBatch                      = offset.batch
-  override def merge(offset: Offset): OffsetBatch                    = add(offset)
-  override def merge(offsets: OffsetBatch): OffsetBatch              = offsets
-  override def consumerGroupMetadata: Option[ConsumerGroupMetadata]  = None
-}
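
Note for reviewers, not part of the patch: a minimal sketch of what a caller
could look like after this change. It assumes the `Consumer#commitAccumBatch`
instance method referenced by the new accessor is added elsewhere in this PR,
that `Consumer.plainStream` keeps its current shape, and it wires the returned
Committer the same way the updated `consumeWith` above does. The topic name,
serdes, and schedule are placeholders.

  import zio._
  import zio.kafka.consumer._
  import zio.kafka.serde.Serde

  val app: ZIO[Consumer, Throwable, Unit] =
    for {
      // The schedule goes straight to commitAccumBatch; consumeWith above
      // passes its commit retry policy, so the same is done here.
      committer <- Consumer.commitAccumBatch(Schedule.exponential(1.second) && Schedule.recurs(3))
      _ <- Consumer
             .plainStream(Subscription.topics("my-topic"), Serde.string, Serde.string)
             // Process records here, then commit each chunk and await the
             // broker acknowledgement, as the new consumeWith does.
             .mapChunksZIO(records => committer.commitAndAwait(records).as(Chunk.empty))
             .runDrain
    } yield ()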
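
For user code built on the deleted Offset/OffsetBatch API, a rough migration
sketch (same assumptions as above; `retryPolicy` is a placeholder schedule):

  import zio._
  import zio.kafka.consumer._
  import zio.stream.ZStream

  // Before this patch: fold offsets into a batch, then commit with retries.
  //   stream.map(_.offset).aggregateAsync(Consumer.offsetBatches).mapZIO(_.commitOrRetry(retryPolicy))
  // After this patch: hand the records themselves to the Committer; batching
  // and the retry/commit schedule live behind commitAccumBatch(retryPolicy).
  def commitAll(
    stream: ZStream[Consumer, Throwable, CommittableRecord[String, String]],
    committer: Committer
  ): ZIO[Consumer, Throwable, Unit] =
    stream
      .mapChunksZIO(records => committer.commitAndAwait(records).as(Chunk.empty))
      .runDrain

Where awaiting the broker acknowledgement is not required, `commitAndForget`
submits the commit without blocking the stream on its completion.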