Skip to content

Commit

Permalink
Handle non-existent topics in partitionsFor
Browse files Browse the repository at this point in the history
  • Loading branch information
nviliunov-evolution-throwaway committed Nov 25, 2024
1 parent 04edf3e commit a636a39
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -340,11 +340,11 @@ object Consumer {
} yield result
}

def partitions1(f: => ListJ[PartitionInfoJ]) = {
def partitions1(f: => Option[ListJ[PartitionInfoJ]]) = {
for {
result <- serialBlocking { f }
result <- partitionsInfoListF[F](result)
} yield result
result <- result.traverse(partitionsInfoListF[F])
} yield result.getOrElse(List.empty)
}

def topics1(f: => MapJ[Topic, ListJ[PartitionInfoJ]]) = {
Expand Down Expand Up @@ -506,11 +506,11 @@ object Consumer {
}

def partitions(topic: Topic) = {
partitions1 { consumer.partitionsFor(topic) }
partitions1 { Option(consumer.partitionsFor(topic)) }
}

def partitions(topic: Topic, timeout: FiniteDuration) = {
partitions1 { consumer.partitionsFor(topic, timeout.asJava) }
partitions1 { Option(consumer.partitionsFor(topic, timeout.asJava)) }
}

val topics = topics1 { consumer.listTopics() }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import java.time.Instant
import java.util.{Map => MapJ}

import cats.data.{NonEmptyMap => Nem, NonEmptySet => Nes}
import cats.implicits.toTraverseOps
import com.evolutiongaming.skafka.Converters._
import com.evolutiongaming.skafka.consumer.ConsumerConverters._
import com.evolutiongaming.skafka._
Expand Down Expand Up @@ -217,15 +218,15 @@ object RebalanceConsumer {

def partitionsFor(topic: Topic) =
for {
a <- Try { c.partitionsFor(topic) }
a <- partitionsInfoListF[Try](a)
} yield a
a <- Try { Option(c.partitionsFor(topic)) }
a <- a.traverse(partitionsInfoListF[Try])
} yield a.getOrElse(List.empty)

def partitionsFor(topic: Topic, timeout: FiniteDuration) =
for {
a <- Try { c.partitionsFor(topic, timeout.asJava) }
a <- partitionsInfoListF[Try](a)
} yield a
a <- Try { Option(c.partitionsFor(topic, timeout.asJava)) }
a <- a.traverse(partitionsInfoListF[Try])
} yield a.getOrElse(List.empty)

def paused() =
for {
Expand Down

0 comments on commit a636a39

Please sign in to comment.