Update beam to 2.59 (#5471)

RustedBones authored Sep 16, 2024
1 parent 515120c · commit c306b34
Showing 3 changed files with 14 additions and 19 deletions.
build.sbt — 6 changes: 3 additions & 3 deletions

@@ -29,10 +29,10 @@ import org.typelevel.scalacoptions.JavaMajorVersion.javaMajorVersion
 // To test release candidates, find the beam repo and add it as a resolver
 // ThisBuild / resolvers += "apache-beam-staging" at "https://repository.apache.org/content/repositories/"
 val beamVendorVersion = "0.1"
-val beamVersion = "2.58.1"
+val beamVersion = "2.59.0"
 
 // check version used by beam
-// https://github.com/apache/beam/blob/v2.58.1/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+// https://github.com/apache/beam/blob/v2.59.0/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
 val autoServiceVersion = "1.0.1"
 val autoValueVersion = "1.9"
 val bigdataossVersion = "2.2.16"
@@ -61,7 +61,7 @@ val googleApiServicesStorageVersion = s"v1-rev20240319-$googleClientsVersion"
 // beam tested versions
 val zetasketchVersion = "0.1.0" // sdks/java/extensions/zetasketch/build.gradle
 val avroVersion = avroCompilerVersion // sdks/java/extensions/avro/build.gradle
-val flinkVersion = "1.18.0" // runners/flink/1.17/build.gradle
+val flinkVersion = "1.18.0" // runners/flink/1.18/build.gradle
 val flinkMinorVersion = VersionNumber(flinkVersion).numbers.take(2).mkString(".")
 val hadoopVersion = "3.2.4" // sdks/java/io/parquet/build.gradle
 val sparkVersion = "3.5.0" // runners/spark/3/build.gradle
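Only the stale comment changes in the second hunk: Flink was already on 1.18.0, but the comment still pointed at the 1.17 Gradle file. As an aside, flinkMinorVersion is derived from the full version rather than hard-coded; a minimal sketch of that sbt call with an illustrative value (inside build.sbt the import is implicit):

import sbt.VersionNumber

// VersionNumber("1.18.0").numbers parses to Seq(1L, 18L, 0L); keeping the
// first two components and re-joining them yields the minor line.
val flinkVersion = "1.18.0"
val flinkMinorVersion = VersionNumber(flinkVersion).numbers.take(2).mkString(".") // "1.18"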
SCollectionMatchers.scala — 10 changes: 3 additions & 7 deletions

@@ -19,15 +19,14 @@ package com.spotify.scio.testing
 
 import java.lang.{Iterable => JIterable}
 import java.util.{Map => JMap}
-
 import com.spotify.scio.coders.Coder
 import com.spotify.scio.values.SCollection
 import com.twitter.chill.Externalizer
 import org.apache.beam.sdk.testing.PAssert
 import org.apache.beam.sdk.testing.PAssert.{IterableAssert, SingletonAssert}
 import org.apache.beam.sdk.transforms.SerializableFunction
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow
-import org.apache.beam.sdk.util.CoderUtils
+import org.apache.beam.sdk.util.{CoderUtils, SerializableSupplier}
 import org.scalatest.matchers.{MatchResult, Matcher}
 import org.{hamcrest => h}
 import org.hamcrest.Matchers
@@ -69,13 +68,10 @@ private object ScioMatchers {
   /** Create a hamcrest matcher that can be serialized using a Coder[T]. */
   private def supplierFromCoder[A: Coder, B](@transient a: A, @transient context: ScioContext)(
     builder: A => B
-  ) = {
+  ): SerializableSupplier[B] = {
     val coder = CoderMaterializer.beam(context, Coder[A])
     val encoded = CoderUtils.encodeToByteArray(coder, a)
-    new SerializableMatchers.SerializableSupplier[B] {
-      def a = CoderUtils.decodeFromByteArray(coder, encoded)
-      def get() = builder(a)
-    }
+    () => builder(CoderUtils.decodeFromByteArray(coder, encoded))
   }
 
   /**
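This is the substantive change. The helper still encodes the expected value with its Coder and re-decodes it on every get(), so the matcher never captures the raw (possibly non-serializable) value, but the anonymous class is gone: declaring the return type as Beam's standalone org.apache.beam.sdk.util.SerializableSupplier, a single-abstract-method interface, lets a Scala function literal be SAM-converted into the supplier, presumably because the SerializableSupplier nested in Beam's SerializableMatchers is no longer usable in 2.59. A minimal, self-contained sketch of the pattern, with illustrative values rather than scio's code:

import org.apache.beam.sdk.util.SerializableSupplier

// The expected type on the left makes the function literal eligible for
// SAM conversion in Scala 2.12+: () => ... becomes a SerializableSupplier.
val supplier: SerializableSupplier[String] = () => "rebuilt on each get()"

// Each call re-runs the body, mirroring how supplierFromCoder re-decodes
// the captured byte array instead of holding on to the original value.
println(supplier.get())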
SCollectionMatchersTest.scala — 17 changes: 8 additions & 9 deletions

@@ -20,7 +20,6 @@ package com.spotify.scio.testing
 import com.spotify.scio.coders.{Coder, CoderMaterializer}
 import com.spotify.scio.streaming.ACCUMULATING_FIRED_PANES
 import com.spotify.scio.values.WindowOptions
-import org.apache.beam.sdk.Pipeline.PipelineExecutionException
 import org.apache.beam.sdk.transforms.windowing.{
   AfterProcessingTime,
   AfterWatermark,
@@ -130,10 +129,10 @@ class SCollectionMatchersTest extends PipelineSpec
     an[AssertionError] should be thrownBy {
       runWithContext(_.parallelize(Seq(1)) should containSingleValue(10))
     }
-    a[PipelineExecutionException] should be thrownBy {
+    an[AssertionError] should be thrownBy {
       runWithContext(_.parallelize(1 to 10) should containSingleValue(1))
     }
-    a[PipelineExecutionException] should be thrownBy {
+    an[AssertionError] should be thrownBy {
       runWithContext {
         _.parallelize(Seq.empty[Int]) should containSingleValue(1)
       }
@@ -145,10 +144,10 @@
     an[AssertionError] should be thrownBy {
       runWithContext(_.parallelize(Seq(1)) shouldNot containSingleValue(1))
     }
-    a[PipelineExecutionException] should be thrownBy {
+    an[AssertionError] should be thrownBy {
       runWithContext(_.parallelize(1 to 10) shouldNot containSingleValue(1))
     }
-    a[PipelineExecutionException] should be thrownBy {
+    an[AssertionError] should be thrownBy {
       runWithContext {
         _.parallelize(Seq.empty[Int]) shouldNot containSingleValue(1)
       }
@@ -351,12 +350,12 @@
         _.parallelize(Seq(1)) should satisfySingleValue[Int](_ == 10)
       }
     }
-    a[PipelineExecutionException] should be thrownBy {
+    an[AssertionError] should be thrownBy {
       runWithContext {
         _.parallelize(1 to 10) should satisfySingleValue[Int](_ == 1)
       }
     }
-    a[PipelineExecutionException] should be thrownBy {
+    an[AssertionError] should be thrownBy {
       runWithContext {
         _.parallelize(Seq.empty[Int]) should satisfySingleValue[Int](_ == 1)
       }
@@ -372,12 +371,12 @@
         _.parallelize(Seq(1)) shouldNot satisfySingleValue[Int](_ == 1)
       }
     }
-    a[PipelineExecutionException] should be thrownBy {
+    an[AssertionError] should be thrownBy {
       runWithContext {
         _.parallelize(1 to 10) shouldNot satisfySingleValue[Int](_ == 1)
       }
     }
-    a[PipelineExecutionException] should be thrownBy {
+    an[AssertionError] should be thrownBy {
       runWithContext {
         _.parallelize(Seq.empty[Int]) shouldNot satisfySingleValue[Int](_ == 1)
      }
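The test updates all point the same way: a failed matcher inside runWithContext now surfaces directly as an AssertionError instead of arriving wrapped in a PipelineExecutionException, which also leaves that import unused; this presumably tracks a change in how Beam 2.59 propagates assertion failures out of the pipeline. A self-contained sketch of the ScalaTest idiom these expectations use (class and spec names are illustrative, not scio's):

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

class ThrownByExample extends AnyFlatSpec with Matchers {
  "an exception expectation" should "match the thrown type exactly" in {
    // Passes only if the block throws AssertionError (or a subtype); a
    // wrapper exception that merely has an AssertionError as its cause
    // would not satisfy the matcher.
    an[AssertionError] should be thrownBy {
      throw new AssertionError("boom")
    }
  }
}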
