Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

port kafka-healthcheck to CE2 #437

Open
wants to merge 1 commit into
base: ce2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ object Dependencies {
val `executor-tools` = "com.evolutiongaming" %% "executor-tools" % "1.0.2"
val `config-tools` = "com.evolutiongaming" %% "config-tools" % "1.0.4"
val `future-helper` = "com.evolutiongaming" %% "future-helper" % "1.0.6"
val `cats-helper` = "com.evolutiongaming" %% "cats-helper" % "2.11.0"
val `cats-helper` = "com.evolutiongaming" %% "cats-helper" % "2.15.3-SNAPSHOT" // local release with RandomId
val `testcontainers-kafka` = "com.dimafeng" %% "testcontainers-scala-kafka" % "0.40.17"
val `play-json-jsoniter` = "com.evolutiongaming" %% "play-json-jsoniter" % "0.10.0"
val `scala-java8-compat` = "org.scala-lang.modules" %% "scala-java8-compat" % "1.0.2"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
package com.evolutiongaming.skafka

import cats.effect._
import cats.effect.concurrent.Ref
import cats.data.{NonEmptySet => Nes}
import cats.effect.syntax.all._
import cats.syntax.all._
import cats.{Applicative, Functor, Monad}
import com.evolutiongaming.catshelper.{FromTry, Log, LogOf, RandomIdOf}
import com.evolutiongaming.skafka.consumer.{AutoOffsetReset, ConsumerConfig, ConsumerOf, Consumer => SKafkaConsumer}
import com.evolutiongaming.skafka.producer.{ProducerConfig, ProducerRecord, ProducerOf, Producer => SKafkaProducer}

import scala.concurrent.CancellationException
import scala.concurrent.duration._

/**
* Provides a health check mechanism that repeatedly sends and consumes messages to/from Kafka.
*/
trait KafkaHealthCheck[F[_]] {

/**
* Returns the last error that occurred during the health check.
*/
def error: F[Option[Throwable]]

/**
* Blocks a fiber until the health check is done.
*/
def done: F[Unit]
}

object KafkaHealthCheck {

def empty[F[_]: Applicative]: KafkaHealthCheck[F] = new KafkaHealthCheck[F] {

def error = none[Throwable].pure[F]

def done = ().pure[F]
}

def of[F[_]: Concurrent: Timer: LogOf: ConsumerOf: ProducerOf: RandomIdOf: FromTry](
config: Config,
consumerConfig: ConsumerConfig,
producerConfig: ProducerConfig
): Resource[F, KafkaHealthCheck[F]] = {

val result = for {
log <- LogOf[F].apply(KafkaHealthCheck.getClass)
randomId <- RandomIdOf[F].apply
} yield {
val key = randomId.value

val consumer = Consumer.of[F](key, consumerConfig)

val producer = Producer.of[F](config.topic, producerConfig)

of(key = key, config = config, stop = false.pure[F], producer = producer, consumer = consumer, log = log)
}

Resource
.eval(result)
.flatten
}

def of[F[_]: Concurrent: Timer](
key: String,
config: Config,
stop: F[Boolean],
producer: Resource[F, Producer[F]],
consumer: Resource[F, Consumer[F]],
log: Log[F]
): Resource[F, KafkaHealthCheck[F]] = {

val result = for {
ref <- Ref.of[F, Option[Throwable]](None)
fiber <- (producer, consumer)
.tupled
.use { case (producer, consumer) => run(key, config, stop, producer, consumer, ref.set, log) }
.start
} yield {
val result = new KafkaHealthCheck[F] {
def error = ref.get
def done = fiber.join.guaranteeCase {
case ExitCase.Completed => Concurrent[F].unit
case ExitCase.Error(e) => Concurrent[F].raiseError(e)
case ExitCase.Canceled => Concurrent[F].raiseError(new CancellationException("HealthCheck cancelled"))
}
}
(result, fiber.cancel)
}

Resource(result)
}

def run[F[_]: Concurrent: Timer](
key: String,
config: Config,
stop: F[Boolean],
producer: Producer[F],
consumer: Consumer[F],
set: Option[Throwable] => F[Unit],
log: Log[F]
): F[Unit] = {

val sleep = Timer[F].sleep(config.interval)

def produce(value: String) = {
val record = Record(key = key.some, value = value.some)
for {
_ <- log.debug(s"$key send $value")
_ <- producer.send(record)
} yield {}
}

def produceConsume(n: Long) = {
val value = n.toString

def consume(retry: Long) = {
for {
records <- consumer.poll(config.pollTimeout)
found = records.find { record => record.key.contains_(key) && record.value.contains_(value) }
result <- found.fold {
for {
_ <- sleep
_ <- produce(s"$n:$retry")
} yield {
(retry + 1).asLeft[Unit]
}
} { _ =>
().asRight[Long].pure[F]
}
} yield result
}

val produceConsume = for {
_ <- produce(value)
_ <- 0L.tailRecM(consume)
} yield {}

produceConsume
.timeout(config.timeout)
.redeem(_.some, _ => none[Throwable])
}

def check(n: Long) = {
for {
error <- produceConsume(n)
_ <- error.fold(().pure[F]) { error => log.error(s"$n failed with $error") }
_ <- set(error)
_ <- sleep
stop <- stop
} yield {
if (stop) ().asRight[Long]
else (n + 1).asLeft[Unit]
}
}

for {
_ <- Timer[F].sleep(config.initial)
_ <- consumer.subscribe(config.topic)
_ <- consumer.poll(config.interval)
_ <- produceConsume(0L) // warmup
_ <- 1L.tailRecM(check)
} yield {}
}

trait Producer[F[_]] {
def send(record: Record): F[Unit]
}

object Producer {

def apply[F[_]](implicit F: Producer[F]): Producer[F] = F

def apply[F[_]: Monad: FromTry](topic: Topic, producer: SKafkaProducer[F]): Producer[F] = {
new Producer[F] {
def send(record: Record) = {
val record1 = ProducerRecord[String, String](topic = topic, key = record.key, value = record.value)
producer.send(record1).void
}
}
}

def of[F[_]: Monad: ProducerOf: FromTry](topic: Topic, config: ProducerConfig): Resource[F, Producer[F]] = {
for {
producer <- implicitly[ProducerOf[F]].apply(config)
} yield {
Producer[F](topic = topic, producer = producer)
}
}
}

trait Consumer[F[_]] {

def subscribe(topic: Topic): F[Unit]

def poll(timeout: FiniteDuration): F[Iterable[Record]]
}

object Consumer {

def apply[F[_]](implicit F: Consumer[F]): Consumer[F] = F

def apply[F[_]: Functor](consumer: SKafkaConsumer[F, String, String]): Consumer[F] = {

new Consumer[F] {

def subscribe(topic: Topic) = {
consumer.subscribe(Nes.of(topic))
}

def poll(timeout: FiniteDuration) = {
for {
records <- consumer.poll(timeout)
} yield for {
record <- records.values.values.flatMap(_.toList)
} yield {
Record(key = record.key.map(_.value), value = record.value.map(_.value))
}
}
}
}

def of[F[_]: Monad: ConsumerOf: FromTry](key: String, config: ConsumerConfig): Resource[F, Consumer[F]] = {
val config1 = {
val groupId = config.common.clientId.fold(key) { clientId => s"$clientId-$key" }
config.copy(groupId = groupId.some, autoOffsetReset = AutoOffsetReset.Latest)
}

for {
consumer <- implicitly[ConsumerOf[F]].apply[String, String](config1)
} yield {
Consumer[F](consumer)
}
}
}

final case class Record(key: Option[String], value: Option[String])

final case class Config(
topic: Topic = "healthcheck",
initial: FiniteDuration = 10.seconds,
interval: FiniteDuration = 1.second,
timeout: FiniteDuration = 2.minutes,
pollTimeout: FiniteDuration = 10.millis
)

object Config {
val default: Config = Config()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package com.evolutiongaming.skafka

import cats.effect._
import cats.syntax.all._
import cats.effect.concurrent.Ref
import com.evolutiongaming.catshelper.Log
import com.evolutiongaming.skafka.IOSuite._
import com.evolutiongaming.skafka.KafkaHealthCheck.Record
import com.evolutiongaming.skafka.Topic
import org.scalatest.funsuite.AsyncFunSuite
import org.scalatest.matchers.should.Matchers

import scala.concurrent.duration._
import scala.util.control.NoStackTrace

class KafkaHealthCheckSpec extends AsyncFunSuite with Matchers {
import KafkaHealthCheckSpec._

test("error") {
implicit val log: Log[IO] = Log.empty[IO]

val producer = new KafkaHealthCheck.Producer[IO] {
def send(record: Record) = ().pure[IO]
}

val consumer = new KafkaHealthCheck.Consumer[IO] {

def subscribe(topic: Topic) = ().pure[IO]

def poll(timeout: FiniteDuration) = {
if (timeout == 1.second) List.empty[Record].pure[IO]
else Error.raiseError[IO, List[Record]]
}
}

val healthCheck = KafkaHealthCheck.of[IO](
key = "key",
config = KafkaHealthCheck.Config(topic = "topic", initial = 0.seconds, interval = 1.second),
stop = false.pure[IO],
producer = Resource.pure[IO, KafkaHealthCheck.Producer[IO]](producer),
consumer = Resource.pure[IO, KafkaHealthCheck.Consumer[IO]](consumer),
log = log
)
val result = for {
error <- healthCheck.use(_.error.untilDefinedM)
} yield {
error shouldEqual error
}
result.run()
}

test("periodic healthcheck") {
final case class State(
checks: Int = 0,
subscribed: Option[Topic] = None,
logs: List[String] = List.empty,
records: List[Record] = List.empty
)

def logOf(ref: Ref[IO, State]): Log[IO] = {
def add(log: String): IO[Unit] =
ref.update(state => state.copy(logs = log :: state.logs))

new Log[IO] {
def trace(msg: => String, mdc: Log.Mdc) = add(s"trace $msg")

def debug(msg: => String, mdc: Log.Mdc) = add(s"debug $msg")

def info(msg: => String, mdc: Log.Mdc) = add(s"info $msg")

def warn(msg: => String, mdc: Log.Mdc) = add(s"warn $msg")

def warn(msg: => String, cause: Throwable, mdc: Log.Mdc) = add(s"warn $msg $cause")

def error(msg: => String, mdc: Log.Mdc) = add(s"error $msg")

def error(msg: => String, cause: Throwable, mdc: Log.Mdc) = add(s"error $msg $cause")
}
}

def consumerOf(ref: Ref[IO, State]) = new KafkaHealthCheck.Consumer[IO] {
def subscribe(topic: Topic): IO[Unit] =
ref.update(_.copy(subscribed = topic.some))

def poll(timeout: FiniteDuration): IO[Iterable[Record]] =
ref
.modify(state =>
if (state.records.size >= 2) (state.copy(records = List.empty), state.records)
else (state, List.empty)
)
}

def producerOf(ref: Ref[IO, State]): KafkaHealthCheck.Producer[IO] = new KafkaHealthCheck.Producer[IO] {
def send(record: Record): IO[Unit] =
ref.update(state => state.copy(records = record :: state.records))
}

def stopOf(ref: Ref[IO, State]): IO[Boolean] =
ref.updateAndGet(state => state.copy(checks = state.checks - 1)).map(_.checks <= 0)

val result = for {
ref <- Ref.of[IO, State](State(checks = 2))
healthCheck = KafkaHealthCheck.of[IO](
key = "key",
config =
KafkaHealthCheck.Config(topic = "topic", initial = 0.millis, interval = 0.millis, timeout = 100.millis),
stop = stopOf(ref),
producer = Resource.pure[IO, KafkaHealthCheck.Producer[IO]](producerOf(ref)),
consumer = Resource.pure[IO, KafkaHealthCheck.Consumer[IO]](consumerOf(ref)),
log = logOf(ref)
)
_ <- healthCheck.use(_.done)
state <- ref.get

} yield state shouldEqual State(
checks = 0,
subscribed = "topic".some,
logs = List(
"debug key send 2:0",
"debug key send 2",
"debug key send 1:0",
"debug key send 1",
"debug key send 0:0",
"debug key send 0"
),
records = List()
)

result.run()
}
}

object KafkaHealthCheckSpec {
val Error: Throwable = new RuntimeException with NoStackTrace
}
Loading