Skip to content

Commit

Permalink
Refined implementation of #1022 (comment)
Browse files — browse the repository at this point in the history
  • Loading branch information
guizmaii committed Sep 9, 2023
1 parent fc7632c commit 30cdd09
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@ package zio.kafka.consumer

import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, ConsumerRecord }
import org.apache.kafka.common.TopicPartition
import zio.RIO
import zio.kafka.serde.Deserializer
import zio.{ RIO, Task }

final case class CommittableRecord[K, V](
record: ConsumerRecord[K, V],
private val commitHandle: Map[TopicPartition, Long] => Task[Unit],
private val consumerGroupMetadata: Option[ConsumerGroupMetadata]
) {
def deserializeWith[R, K1, V1](
Expand Down Expand Up @@ -39,26 +38,15 @@ final case class CommittableRecord[K, V](
def timestamp: Long = record.timestamp()

lazy val topicPartition: TopicPartition = new TopicPartition(record.topic(), record.partition())

def offset: Offset =
OffsetImpl(
topic = record.topic(),
partition = record.partition(),
offset = record.offset(),
commitHandle = commitHandle,
consumerGroupMetadata = consumerGroupMetadata
)
}

object CommittableRecord {

  /**
   * Builds a [[CommittableRecord]] wrapping the given Kafka `record`.
   *
   * @param record                the underlying Kafka consumer record
   * @param commitHandle          callback used to commit a map of partition -> offset
   * @param consumerGroupMetadata metadata of the consumer group, when available
   */
  def apply[K, V](
    record: ConsumerRecord[K, V],
    commitHandle: Map[TopicPartition, Long] => Task[Unit],
    consumerGroupMetadata: Option[ConsumerGroupMetadata]
  ): CommittableRecord[K, V] =
    new CommittableRecord(record, commitHandle, consumerGroupMetadata)
}
8 changes: 7 additions & 1 deletion zio-kafka/src/main/scala/zio/kafka/consumer/Committer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,16 @@ package zio.kafka.consumer

import org.apache.kafka.common.TopicPartition
import zio.kafka.utils.PendingCommit
import zio.{Chunk, Promise, Ref, Schedule, Scope, Task, UIO, URIO, ZIO}
import zio.{ Chunk, Promise, Ref, Schedule, Scope, Task, UIO, URIO, ZIO }

/** Commits the offsets of consumed records, either awaiting completion or fire-and-forget. */
trait Committer {

  /** Registers the offsets of `records` for committing and returns a handle to await the commit. */
  def commit(records: Chunk[CommittableRecord[_, _]]): UIO[PendingCommit[Throwable, Unit]]

  /** Commits the offsets of `records` and suspends until the commit completes (or fails). */
  final def commitAndAwait(records: Chunk[CommittableRecord[_, _]]): Task[Unit] =
    for {
      pending <- commit(records)
      _       <- pending.awaitCommit
    } yield ()

  /** Commits the offsets of `records` without waiting for the commit to complete. */
  final def commitAndForget(records: Chunk[CommittableRecord[_, _]]): UIO[Unit] =
    commit(records).map(_ => ())
}

//noinspection ConvertExpressionToSAM
Expand Down
30 changes: 13 additions & 17 deletions zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
Original file line number Diff line number Diff line change
@@ -1,17 +1,12 @@
package zio.kafka.consumer

import org.apache.kafka.clients.consumer.{
Consumer => JConsumer,
ConsumerRecord,
OffsetAndMetadata,
OffsetAndTimestamp
}
import org.apache.kafka.clients.consumer.{ConsumerRecord, OffsetAndMetadata, OffsetAndTimestamp, Consumer => JConsumer}
import org.apache.kafka.common._
import zio._
import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization
import zio.kafka.consumer.diagnostics.Diagnostics
import zio.kafka.consumer.internal.{ ConsumerAccess, RunloopAccess }
import zio.kafka.serde.{ Deserializer, Serde }
import zio.kafka.consumer.internal.{ConsumerAccess, RunloopAccess}
import zio.kafka.serde.{Deserializer, Serde}
import zio.kafka.utils.SslHelper
import zio.stream._

Expand Down Expand Up @@ -169,9 +164,6 @@ object Consumer {
case object RunloopTimeout extends RuntimeException("Timeout in Runloop") with NoStackTrace
case object CommitTimeout extends RuntimeException("Commit timeout") with NoStackTrace

val offsetBatches: ZSink[Any, Nothing, Offset, Nothing, OffsetBatch] =
ZSink.foldLeft[Offset, OffsetBatch](OffsetBatch.empty)(_ add _)

def live: RLayer[ConsumerSettings & Diagnostics, Consumer] =
ZLayer.scoped {
for {
Expand Down Expand Up @@ -232,6 +224,9 @@ object Consumer {
): RIO[Consumer, Map[TopicPartition, Option[OffsetAndMetadata]]] =
ZIO.serviceWithZIO(_.committed(partitions, timeout))

/**
 * Accessor method: obtains a [[Committer]] from the [[Consumer]] in the environment.
 *
 * NOTE(review): judging by the name, the returned committer accumulates offsets and commits them
 * in batches driven by `commitschedule` — confirm against the `Consumer.commitAccumBatch`
 * implementation, which is not visible here.
 */
def commitAccumBatch[R](commitschedule: Schedule[R, Any, Any]): URIO[R & Consumer, Committer] =
  ZIO.serviceWithZIO[Consumer](_.commitAccumBatch(commitschedule))

/**
* Accessor method
*/
Expand Down Expand Up @@ -423,7 +418,6 @@ private[consumer] final class ConsumerLive private[consumer] (
consumer: ConsumerAccess,
runloopAccess: RunloopAccess
) extends Consumer {
import Consumer._

override def assignment: Task[Set[TopicPartition]] =
consumer.withConsumer(_.assignment().asScala.toSet)
Expand Down Expand Up @@ -525,15 +519,17 @@ private[consumer] final class ConsumerLive private[consumer] (
f: ConsumerRecord[K, V] => URIO[R1, Unit]
): ZIO[R & R1, Throwable, Unit] =
for {
r <- ZIO.environment[R & R1]
r <- ZIO.environment[R & R1]
committer <- commitAccumBatch(commitRetryPolicy)
_ <- partitionedStream(subscription, keyDeserializer, valueDeserializer)
.flatMapPar(Int.MaxValue) { case (_, partitionStream) =>
partitionStream.mapChunksZIO(_.mapZIO((c: CommittableRecord[K, V]) => f(c.record).as(c.offset)))
partitionStream.mapChunksZIO(records =>
records.mapZIO((r: CommittableRecord[K, V]) => f(r.record)).as(records)
)
}
.provideEnvironment(r)
.aggregateAsync(offsetBatches)
.mapZIO(_.commitOrRetry(commitRetryPolicy))
.mapChunksZIO(committer.commitAndAwait(_).as(Chunk.empty))
.runDrain
.provideEnvironment(r)
} yield ()

override def offsetsForTimes(
Expand Down
47 changes: 0 additions & 47 deletions zio-kafka/src/main/scala/zio/kafka/consumer/Offset.scala

This file was deleted.

65 changes: 0 additions & 65 deletions zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala

This file was deleted.

0 comments on commit 30cdd09

Please sign in to comment.