Skip to content

Commit

Permalink
log topic-partition and key in KeyDatabase/Keys logging when deleting…
Browse files Browse the repository at this point in the history
… a key (#670)
  • Loading branch information
Z1kkurat authored Jan 21, 2025
1 parent c86104f commit d3dfc30
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import cats.mtl.Stateful
import cats.syntax.all._
import cats.{Applicative, Monad}
import com.evolutiongaming.catshelper.LogOf
import com.evolutiongaming.kafka.flow.LogPrefix
import com.evolutiongaming.kafka.flow.effect.CatsEffectMtlInstances._
import com.evolutiongaming.skafka.TopicPartition
import com.evolutiongaming.sstream.Stream
Expand All @@ -19,7 +20,7 @@ trait KeyDatabase[F[_], K] {

def all(applicationId: String, groupId: String, topicPartition: TopicPartition): Stream[F, K]

def keysOf(implicit F: Monad[F], logOf: LogOf[F]): F[KeysOf[F, K]] =
def keysOf(implicit F: Monad[F], logOf: LogOf[F], logPrefix: LogPrefix[K]): F[KeysOf[F, K]] =
logOf(KeyDatabase.getClass) map { implicit log => KeysOf(this) }

}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package com.evolutiongaming.kafka.flow.key

import cats.Applicative
import cats.Monad
import cats.syntax.all._
import cats.{Applicative, Monad}
import com.evolutiongaming.catshelper.Log
import com.evolutiongaming.kafka.flow.LogPrefix

trait Keys[F[_]] extends KeyWriter[F]

Expand All @@ -24,17 +24,18 @@ trait KeyWriter[F[_]] {
object Keys {

/** Creates a buffer for a given writer */
private[key] def apply[F[_]: Monad: Log, K](
private[key] def apply[F[_]: Monad: Log, K: LogPrefix](
key: K,
database: KeyDatabase[F, K]
): Keys[F] = new Keys[F] {

private val prefixedLog = Log[F].prefixed(s"[${LogPrefix[K].extract(key)}]")

def flush: F[Unit] = database.persist(key)

def delete(persist: Boolean): F[Unit] =
if (persist) {
database.delete(key) *>
Log[F].info("deleted key")
database.delete(key) *> prefixedLog.info("deleted key")
} else {
().pure[F]
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package com.evolutiongaming.kafka.flow.key

import cats.Monad
import cats.effect.Sync
import cats.syntax.all._
import cats.Monad
import com.evolutiongaming.catshelper.Log
import com.evolutiongaming.kafka.flow.LogPrefix
import com.evolutiongaming.skafka.TopicPartition
import com.evolutiongaming.sstream.Stream

Expand All @@ -16,13 +17,13 @@ trait KeysOf[F[_], K] {
}
object KeysOf {

def memory[F[_]: Sync: Log, K]: F[KeysOf[F, K]] =
def memory[F[_]: Sync: Log, K: LogPrefix]: F[KeysOf[F, K]] =
KeyDatabase.memory[F, K] map { database =>
KeysOf(database)
}

/** Creates `KeysOf` with a passed logger */
def apply[F[_]: Monad: Log, K](
def apply[F[_]: Monad: Log, K: LogPrefix](
database: KeyDatabase[F, K]
): KeysOf[F, K] = new KeysOf[F, K] {
def apply(key: K) = Keys(key, database)
Expand Down

0 comments on commit d3dfc30

Please sign in to comment.