Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cross-compile for Scala 3 #388

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ jobs:
strategy:
matrix:
scala:
- 2.13.8
- 2.12.15
- 2.13.12
- 2.12.18
- 3.3.1

steps:
- uses: actions/checkout@v3
Expand Down
28 changes: 25 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,48 @@ ThisBuild / versionScheme := Some("early-semver")

ThisBuild / evictionErrorLevel := Level.Warn

def crossSettings[T](scalaVersion: String, if3: List[T], if2: List[T]) =
CrossVersion.partialVersion(scalaVersion) match {
case Some((3, _)) => if3
case Some((2, 12 | 13)) => if2
case _ => Nil
}

lazy val commonSettings = Seq(
organization := "com.evolutiongaming",
homepage := Some(new URL("https://github.com/evolution-gaming/skafka")),
startYear := Some(2018),
organizationName := "Evolution",
organizationHomepage := Some(url("https://evolution.com")),
scalaVersion := crossScalaVersions.value.head,
crossScalaVersions := Seq("2.13.8", "2.12.15"),
crossScalaVersions := Seq("2.13.12", "3.3.1", "2.12.18"),
licenses := Seq(("MIT", url("https://opensource.org/licenses/MIT"))),
releaseCrossBuild := true,
Compile / doc / scalacOptions += "-no-link-warnings",
libraryDependencies += compilerPlugin(`kind-projector` cross CrossVersion.full),
scalacOptions ++= crossSettings(
scalaVersion.value,
if3 = List("-Ykind-projector", "-language:implicitConversions", "-explain", "-deprecation"),
if2 = List("-Xsource:3"),
),
libraryDependencies ++= crossSettings(
scalaVersion.value,
if3 = Nil,
if2 = List(compilerPlugin(`kind-projector` cross CrossVersion.full))
),
scalacOptsFailOnWarn := Some(false),
publishTo := Some(Resolver.evolutionReleases),
// KeyRanks.Invisible to suppress sbt warning about key not being used in root/tests where MiMa plugin is disabled
mimaPreviousArtifacts.withRank(KeyRanks.Invisible) := {
val versions = List(
"11.0.0",
)
versions.map(organization.value %% moduleName.value % _).toSet

// check against all versions once Scala 3 lib version is published
crossSettings(
scalaVersion.value,
if3 = Nil,
if2 = versions.map(organization.value %% moduleName.value % _)
).toSet
},
mimaBinaryIssueFilters ++= Seq(
ProblemFilters.exclude[ReversedMissingMethodProblem]("com.evolutiongaming.skafka.consumer.Consumer.subscribe"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package com.evolutiongaming.skafka.producer

import cats.Applicative
import com.evolutiongaming.catshelper.FromTry
import com.evolutiongaming.jsonitertool.PlayJsonJsoniter
import com.evolution.playjson.jsoniter.PlayJsonJsoniter
import com.evolutiongaming.skafka.{ToBytes, Topic}
import play.api.libs.json.{JsValue, Json}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers

class JsonProducerSpec extends AnyFunSuite with Matchers {

test("apply") {
val metadata = RecordMetadata(TopicPartition("topic", Partition.min))
var actual = Option.empty[(Option[Bytes], Option[Bytes])]
Expand All @@ -18,7 +17,7 @@ class JsonProducerSpec extends AnyFunSuite with Matchers {
record: ProducerRecord[K, V])(implicit
toBytesK: ToBytes[Id, K],
toBytesV: ToBytes[Id, V]
) = {
): RecordMetadata = {
val topic = record.topic
val value = record.value.map(toBytesV(_, topic))
val key = record.key.map(toBytesK(_, topic))
Expand Down
23 changes: 11 additions & 12 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@ import sbt._

object Dependencies {

val `executor-tools` = "com.evolutiongaming" %% "executor-tools" % "1.0.2"
val `config-tools` = "com.evolutiongaming" %% "config-tools" % "1.0.4"
val `future-helper` = "com.evolutiongaming" %% "future-helper" % "1.0.6"
val `cats-helper` = "com.evolutiongaming" %% "cats-helper" % "3.5.0"
val `testcontainers-kafka` = "com.dimafeng" %% "testcontainers-scala-kafka" % "0.40.17"
val `play-json-jsoniter` = "com.evolutiongaming" %% "play-json-jsoniter" % "0.10.0"
val `config-tools` = "com.evolutiongaming" %% "config-tools" % "1.0.5"
val `future-helper` = "com.evolutiongaming" %% "future-helper" % "1.0.7"
val `cats-helper` = "com.evolutiongaming" %% "cats-helper" % "3.9.0"
val `testcontainers-kafka` = "com.dimafeng" %% "testcontainers-scala-kafka" % "0.41.0"
val `play-json-jsoniter` = "com.evolution" %% "play-json-jsoniter" % "1.0.0"
val `scala-java8-compat` = "org.scala-lang.modules" %% "scala-java8-compat" % "1.0.2"
val `collection-compat` = "org.scala-lang.modules" %% "scala-collection-compat" % "2.8.1"
val scalatest = "org.scalatest" %% "scalatest" % "3.2.13"
val `collection-compat` = "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0"
val scalatest = "org.scalatest" %% "scalatest" % "3.2.17"
val `kind-projector` = "org.typelevel" % "kind-projector" % "0.13.2"
val discipline = "org.typelevel" %% "discipline-scalatest" % "2.2.0"

Expand All @@ -20,19 +19,19 @@ object Dependencies {
}

object Logback {
private val version = "1.2.11"
private val version = "1.4.11"
val core = "ch.qos.logback" % "logback-core" % version
val classic = "ch.qos.logback" % "logback-classic" % version
}

object Slf4j {
private val version = "1.7.36"
private val version = "2.0.9"
val api = "org.slf4j" % "slf4j-api" % version
val `log4j-over-slf4j` = "org.slf4j" % "log4j-over-slf4j" % version
}

object Cats {
private val version = "2.8.0"
private val version = "2.10.0"
val core = "org.typelevel" %% "cats-core" % version
val laws = "org.typelevel" %% "cats-laws" % version
}
Expand All @@ -44,7 +43,7 @@ object Dependencies {
}

object Smetrics {
private val version = "2.0.0"
private val version = "2.1.0"
val smetrics = "com.evolutiongaming" %% "smetrics" % version
val `smetrics-prometheus` = "com.evolutiongaming" %% "smetrics-prometheus" % version
}
Expand Down
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=1.7.1
sbt.version=1.9.7
6 changes: 3 additions & 3 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.9.3")
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "2.0.9")

addSbtPlugin("org.scoverage" % "sbt-coveralls" % "1.3.2")
addSbtPlugin("org.scoverage" % "sbt-coveralls" % "1.3.11")

addSbtPlugin("com.github.sbt" % "sbt-release" % "1.1.0")

addSbtPlugin("com.evolution" % "sbt-scalac-opts-plugin" % "0.0.9")

addSbtPlugin("com.evolution" % "sbt-artifactory-plugin" % "0.0.2")

addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "1.0.1")
addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "1.1.3")
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,15 @@ object CommonConfig {

val Default: CommonConfig = CommonConfig()

private implicit val SecurityProtocolFromConf = FromConf[SecurityProtocol] { (conf, path) =>
private implicit val SecurityProtocolFromConf: FromConf[SecurityProtocol] = FromConf[SecurityProtocol] { (conf, path) =>
val str = conf.getString(path)
val value = SecurityProtocol.Values.find { _.name equalsIgnoreCase str }
value getOrElse {
throw new ConfigException.BadValue(conf.origin(), path, s"Cannot parse SecurityProtocol from $str")
}
}

private implicit val ClientDnsLookupFromConf = FromConf[ClientDnsLookup] { (conf, path) =>
private implicit val ClientDnsLookupFromConf: FromConf[ClientDnsLookup] = FromConf[ClientDnsLookup] { (conf, path) =>
val str = conf.getString(path)
val value = ClientDnsLookup.Values.find { _.name.equalsIgnoreCase(str) }
value.getOrElse(throw new ConfigException.BadValue(conf.origin(), path, s"Cannot parse ClientDnsLookup from $str"))
Expand Down
16 changes: 6 additions & 10 deletions skafka/src/main/scala/com/evolutiongaming/skafka/Converters.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import java.util.{Optional, Collection => CollectionJ, Map => MapJ, Set => SetJ,

import cats.Monad
import cats.data.{NonEmptyList => Nel, NonEmptySet => Nes, NonEmptyMap => Nem}
import cats.implicits._
import cats.syntax.all.*
import com.evolutiongaming.catshelper.CatsHelper._
import com.evolutiongaming.catshelper.{ApplicativeThrowable, FromTry, MonadThrowable, ToTry}
import org.apache.kafka.clients.consumer.{OffsetAndMetadata => OffsetAndMetadataJ}
Expand Down Expand Up @@ -95,20 +95,16 @@ object Converters {
}

implicit class MapJOps[K, V](val self: MapJ[K, V]) extends AnyVal {

def asScalaMap[F[_]: Monad, A, B](ka: K => F[A], vb: V => F[B], keepNullValues: Boolean): F[Map[A, B]] = {
self
.asScala
.toList
.collect {
case (k, v) if k != null && (keepNullValues || v != null) =>
for {
a <- ka(k)
b <- vb(v)
} yield (a, b)
// at the moment we cannot use partial functions inside `AnyVal`, see: https://github.com/lampepfl/dotty/issues/18769
.traverseFilter { case (k, v) =>
if (k != null && (keepNullValues || v != null)) (ka(k), vb(v)).mapN((_, _).some)
else none[(A, B)].pure[F]
Copy link
Contributor Author

@grzegorz-bielski grzegorz-bielski Oct 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Due to Scala 3 bug compiler crashed here, so I rewrote it without partial function.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add comment about this bug.

}
.sequence
.map { _.toMap }
.map(_.toMap)
}

def asScalaMap[F[_]: Monad, A, B](ka: K => F[A], vb: V => F[B]): F[Map[A, B]] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ object FromBytes {

implicit def functorFromBytes[F[_]: Functor]: Functor[FromBytes[F, *]] = new Functor[FromBytes[F, *]] {

def map[A, B](fa: FromBytes[F, A])(f: A => B) = new FromBytes[F, B] {
def map[A, B](fa: FromBytes[F, A])(f: A => B): FromBytes[F, B] = new FromBytes[F, B] {

def apply(bytes: Bytes, topic: Topic) = fa(bytes, topic).map(f)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ object OffsetAndMetadata {
implicit val showOffset: Show[OffsetAndMetadata] = Show.fromToString

implicit val orderOffset: Order[OffsetAndMetadata] =
Order.whenEqual(Order.by { a: OffsetAndMetadata => a.offset }, Order.by { a: OffsetAndMetadata => a.metadata })
Order.whenEqual(Order.by(_.offset), Order.by(_.metadata))
t3hnar marked this conversation as resolved.
Show resolved Hide resolved

implicit val orderingOffset: Ordering[OffsetAndMetadata] = orderOffset.toOrdering
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ object TopicPartition {
val empty: TopicPartition = TopicPartition("", Partition.min)

implicit val orderTopicPartition: Order[TopicPartition] =
Order.whenEqual(Order.by { a: TopicPartition => a.topic }, Order.by { a: TopicPartition => a.partition })
Order.whenEqual(Order.by(_.topic), Order.by(_.partition))

implicit val showTopicPartition: Show[TopicPartition] = Show.fromToString

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import cats.effect._
import cats.effect.implicits._
import cats.effect.std.Semaphore
import cats.implicits._
import cats.{Applicative, Monad, MonadError, ~>}
import cats.{Applicative, Monad, Monoid, MonadError, ~>}
import com.evolutiongaming.catshelper.CatsHelper._
import com.evolutiongaming.catshelper._
import com.evolutiongaming.skafka.Converters._
Expand Down Expand Up @@ -590,7 +590,7 @@ object Consumer {
metrics: ConsumerMetrics[F]
)(implicit F: MonadError[F, E], measureDuration: MeasureDuration[F]): Consumer[F, K, V] = {

implicit val monoidUnit = Applicative.monoid[F, Unit]
implicit val monoidUnit: Monoid[F[Unit]] = Applicative.monoid[F, Unit]

val topics = for {
topicPartitions <- self.assignment
Expand Down Expand Up @@ -953,7 +953,7 @@ object Consumer {
metrics: ConsumerMetrics[F]
)(implicit F: MonadError[F, E], measureDuration: MeasureDuration[F], clock: Clock[F]): Consumer[F, K, V] = {

implicit val monoidUnit = Applicative.monoid[F, Unit]
implicit val monoidUnit: Monoid[F[Unit]] = Applicative.monoid[F, Unit]

val topics = for {
topicPartitions <- self.assignment
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,15 @@ object ConsumerConfig {

val Default: ConsumerConfig = ConsumerConfig()

private implicit val AutoOffsetResetFromConf = FromConf[AutoOffsetReset] { (conf, path) =>
private implicit val AutoOffsetResetFromConf: FromConf[AutoOffsetReset] = FromConf[AutoOffsetReset] { (conf, path) =>
val str = conf.getString(path)
val value = AutoOffsetReset.Values.find { _.toString equalsIgnoreCase str }
value getOrElse {
throw new ConfigException.BadValue(conf.origin(), path, s"Cannot parse AutoOffsetReset from $str")
}
}

private implicit val IsolationLevelFromConf = FromConf[IsolationLevel] { (conf, path) =>
private implicit val IsolationLevelFromConf: FromConf[IsolationLevel] = FromConf[IsolationLevel] { (conf, path) =>
val str = conf.getString(path)
val value = IsolationLevel.Values.find { _.name equalsIgnoreCase str }
value getOrElse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ object ConsumerMetrics {
quantiles = Quantiles.Default,
labels = LabelNames("client", "topic")
)
} yield { clientId: ClientId =>
} yield { (clientId: ClientId) =>
new ConsumerMetrics[F] {

def call(name: String, topic: Topic, latency: FiniteDuration, success: Boolean) = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ object ConsumerRecord {
implicit def orderConsumerRecord[K: Order, V]: Order[ConsumerRecord[K, V]] = {
Order.whenEqual(
Order.whenEqual(
Order.by { a: ConsumerRecord[K, V] => a.topicPartition },
Order.by { a: ConsumerRecord[K, V] => a.key }
Order.by(_.topicPartition),
Order.by(_.key)
),
Order.by { a: ConsumerRecord[K, V] => a.offset }
Order.by(_.offset)
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,6 @@ object RebalanceCallback extends RebalanceCallbackInstances with RebalanceCallba
case HandleErrorWith(source, fe) => HandleErrorWith(() => source().mapK(fg), fe andThen (_.mapK(fg)))
}
}

}

implicit class RebalanceCallbackNothingOps[A](val self: RebalanceCallback[Nothing, A]) extends AnyVal {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,15 @@ object ProducerConfig {

val Default: ProducerConfig = ProducerConfig()

private implicit val CompressionTypeFromConf = FromConf[CompressionType] { (conf, path) =>
private implicit val CompressionTypeFromConf: FromConf[CompressionType] = FromConf[CompressionType] { (conf, path) =>
val str = conf.getString(path)
val value = CompressionType.Values.find { _.toString equalsIgnoreCase str }
value getOrElse {
throw new ConfigException.BadValue(conf.origin(), path, s"Cannot parse CompressionType from $str")
}
}

private implicit val AcksFromConf = FromConf[Acks] { (conf, path) =>
private implicit val AcksFromConf: FromConf[Acks] = FromConf[Acks] { (conf, path) =>
val str = conf.getString(path)

val values = Acks.Values.filter(_.names.exists(str.equalsIgnoreCase))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ object ProducerMetrics {
resultCounter <- resultCounter
callLatency <- callLatency
callCount <- callCount
} yield { clientId: ClientId =>
} yield { (clientId: ClientId) =>
{

def sendMeasure(result: String, topic: Topic, latency: FiniteDuration) = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,15 @@ final case class ProducerRecord[+K, +V](

object ProducerRecord {

def apply[K, V](topic: Topic, value: V, key: K): ProducerRecord[K, V] = {
ProducerRecord(topic = topic, value = Some(value), key = Some(key))
}
def apply[K, V](topic: Topic, value: V, key: K): ProducerRecord[K, V] =
ProducerRecord(
topic = topic,
value = Some(value),
key = Some(key),
partition = None,
Z1kkurat marked this conversation as resolved.
Show resolved Hide resolved
timestamp = None,
headers = Nil
)

implicit class ProducerRecordOps[K, V](val self: ProducerRecord[K, V]) extends AnyVal {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import com.evolutiongaming.skafka.IOSuite._
import com.evolutiongaming.skafka._
import com.evolutiongaming.skafka.consumer.ConsumerConverters._
import org.apache.kafka.clients.consumer.{Consumer => ConsumerJ, ConsumerGroupMetadata => ConsumerGroupMetadataJ, ConsumerRebalanceListener => ConsumerRebalanceListenerJ, ConsumerRecords => ConsumerRecordsJ, OffsetAndMetadata => OffsetAndMetadataJ, OffsetCommitCallback => OffsetCommitCallbackJ}
import org.apache.kafka.common.{Node, TopicPartition => TopicPartitionJ}
import org.apache.kafka.common.{Node, TopicPartition => TopicPartitionJ, MetricName, Metric}
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

Expand Down Expand Up @@ -399,7 +399,7 @@ class ConsumerSpec extends AnyWordSpec with Matchers {
else
Map(new TopicPartitionJ(topic, partition.value) -> null).asJavaMap(identity, identity)

def metrics() = new java.util.HashMap()
def metrics(): MapJ[MetricName, _ <: Metric] = new java.util.HashMap()

def partitionsFor(topic: Topic) = {
List(partitionInfo.asJava).asJava
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ class ExplodingConsumer extends ConsumerJ[String, String] {
def metrics(): MapJ[MetricName, _ <: Metric] = notImplemented
def pause(partitions: CollectionJ[TopicPartitionJ]): Unit = notImplemented
def resume(partitions: CollectionJ[TopicPartitionJ]): Unit = notImplemented
def enforceRebalance() = notImplemented
def enforceRebalance(): Unit = notImplemented
def close(): Unit = notImplemented
def close(timeout: Long, unit: TimeUnit): Unit = notImplemented
def close(timeout: DurationJ): Unit = notImplemented
Expand Down
Loading
Loading