Skip to content

Commit

Permalink
scalafix for the rest of the BQ API changes (#4625)
Browse files Browse the repository at this point in the history
* scalafix for the rest of the BQ API changes

* rebase to master

* avoid using strings for pattern matching

* PR feedbacks

* format

* merge master
  • Loading branch information
farzad-sedghi authored Jan 25, 2023
1 parent 9a9dec6 commit a68b370
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 137 deletions.
10 changes: 9 additions & 1 deletion scalafix/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ def scio11(version: String) =
"com.spotify" %% "scio-avro"
).map(_ % version)

def scio12(version: String) =
List(
"com.spotify" %% "scio-core",
"com.spotify" %% "scio-avro",
"com.spotify" %% "scio-google-cloud-platform",
"com.spotify" %% "scio-extra"
).map(_ % version)

lazy val `input-0_7` = project
.settings(
libraryDependencies ++= scio(Scio.`0.6`)
Expand Down Expand Up @@ -84,7 +92,7 @@ lazy val `input-0_12` = project

lazy val `output-0_12` = project
.settings(
libraryDependencies ++= scio10(Scio.`0.12`)
libraryDependencies ++= scio12(Scio.`0.12`)
)

lazy val tests = project
Expand Down
23 changes: 16 additions & 7 deletions scalafix/input-0_12/src/main/scala/fix/FixBqSaveAsTable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write._

object FixBqSaveAsTable {
val tableRef = new TableReference()
val schema: Schema = null
val writeDisposition: WriteDisposition = null
val createDisposition: CreateDisposition = null
val tableDescription: String = null
val s: Schema = null
val wd: WriteDisposition = null
val cd: CreateDisposition = null
val td: String = null

def saveAsBigQueryTable(in: SCollection[GenericRecord]): Unit =
in.saveAvroAsBigQuery(tableRef)
Expand All @@ -25,11 +25,20 @@ object FixBqSaveAsTable {
in.saveAvroAsBigQuery(table = tableRef)

def saveAsBigQueryTableMultiParamsWithoutSchema(in: SCollection[GenericRecord]): Unit =
in.saveAvroAsBigQuery(tableRef, writeDisposition = writeDisposition, createDisposition = createDisposition, tableDescription = tableDescription)
in.saveAvroAsBigQuery(tableRef, writeDisposition = wd, createDisposition = cd, tableDescription = td)

def saveAsBigQueryTableMultiParamsWithoutSchemaDiffOrder(in: SCollection[GenericRecord]): Unit =
in.saveAvroAsBigQuery(tableRef, createDisposition = createDisposition, writeDisposition = writeDisposition, tableDescription = tableDescription)
in.saveAvroAsBigQuery(tableRef, createDisposition = cd, writeDisposition = wd, tableDescription = td)

def saveAsBigQueryTableMultiParamsAllNamed(in: SCollection[GenericRecord]): Unit =
in.saveAvroAsBigQuery(table = tableRef, writeDisposition = writeDisposition, createDisposition = createDisposition, tableDescription = tableDescription)
in.saveAvroAsBigQuery(table = tableRef, writeDisposition = wd, createDisposition = cd, tableDescription = td)

def saveAsBigQueryTableMultiParamsWithSchemaUnnamed(in: SCollection[GenericRecord]): Unit =
in.saveAvroAsBigQuery(tableRef, s, wd, cd, td)

def saveAsBigQueryTableMultiParamsWithSchemaNamed(in: SCollection[GenericRecord]): Unit =
in.saveAvroAsBigQuery(tableRef, avroSchema = s, writeDisposition = wd, createDisposition = cd, tableDescription = td)

def saveAsBigQueryTableMultiParamsNamedOrderChanged(in: SCollection[GenericRecord]): Unit =
in.saveAvroAsBigQuery(tableRef, writeDisposition = wd, avroSchema = s)
}
30 changes: 0 additions & 30 deletions scalafix/input-0_12/src/main/scala/fix/LintBqSaveAsTable.scala

This file was deleted.

24 changes: 17 additions & 7 deletions scalafix/output-0_12/src/main/scala/fix/FixBqSaveAsTable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ import org.apache.avro.generic.GenericRecord
import org.apache.avro.Schema
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write._
import com.spotify.scio.bigquery._
import com.spotify.scio.extra.bigquery.AvroConverters.toTableSchema

object FixBqSaveAsTable {
val tableRef = new TableReference()
val schema: Schema = null
val writeDisposition: WriteDisposition = null
val createDisposition: CreateDisposition = null
val tableDescription: String = null
val s: Schema = null
val wd: WriteDisposition = null
val cd: CreateDisposition = null
val td: String = null

def saveAsBigQueryTable(in: SCollection[GenericRecord]): Unit =
in.saveAsBigQueryTable(Table.Ref(tableRef))
Expand All @@ -23,12 +24,21 @@ object FixBqSaveAsTable {
in.saveAsBigQueryTable(table = Table.Ref(tableRef))

def saveAsBigQueryTableMultiParamsWithoutSchema(in: SCollection[GenericRecord]): Unit =
in.saveAsBigQueryTable(Table.Ref(tableRef), writeDisposition = writeDisposition, createDisposition = createDisposition, tableDescription = tableDescription)
in.saveAsBigQueryTable(Table.Ref(tableRef), writeDisposition = wd, createDisposition = cd, tableDescription = td)

def saveAsBigQueryTableMultiParamsWithoutSchemaDiffOrder(in: SCollection[GenericRecord]): Unit =
in.saveAsBigQueryTable(Table.Ref(tableRef), createDisposition = createDisposition, writeDisposition = writeDisposition, tableDescription = tableDescription)
in.saveAsBigQueryTable(Table.Ref(tableRef), createDisposition = cd, writeDisposition = wd, tableDescription = td)

def saveAsBigQueryTableMultiParamsAllNamed(in: SCollection[GenericRecord]): Unit =
in.saveAsBigQueryTable(table = Table.Ref(tableRef), writeDisposition = writeDisposition, createDisposition = createDisposition, tableDescription = tableDescription)
in.saveAsBigQueryTable(table = Table.Ref(tableRef), writeDisposition = wd, createDisposition = cd, tableDescription = td)

def saveAsBigQueryTableMultiParamsWithSchemaUnnamed(in: SCollection[GenericRecord]): Unit =
in.saveAsBigQueryTable(Table.Ref(tableRef), toTableSchema(s), wd, cd, td)

def saveAsBigQueryTableMultiParamsWithSchemaNamed(in: SCollection[GenericRecord]): Unit =
in.saveAsBigQueryTable(Table.Ref(tableRef), schema = toTableSchema(s), writeDisposition = wd, createDisposition = cd, tableDescription = td)

def saveAsBigQueryTableMultiParamsNamedOrderChanged(in: SCollection[GenericRecord]): Unit =
in.saveAsBigQueryTable(Table.Ref(tableRef), writeDisposition = wd, schema = toTableSchema(s))
}

Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,4 @@ fix.v0_8_0.FixBigQueryDeprecations
fix.v0_8_0.ConsistenceJoinNames
fix.v0_10_0.FixCoderPropagation
fix.v0_12_0.FixBqSaveAsTable
fix.v0_12_0.LintBqSaveAsTable
fix.v0_12_0.FixPubsubSpecializations
78 changes: 49 additions & 29 deletions scalafix/rules/src/main/scala/fix/FixBqSaveAsTable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,48 @@ class FixBqSaveAsTable extends SemanticRule("FixBqSaveAsTable") {
override def fix(implicit doc: SemanticDocument): Patch = {
val patches =
doc.tree.collect {
case a @ Term.Apply(fun, head :: tail) =>
case a @ Term.Apply(fun, params) =>
if (fun.symbol.normalized.toString.contains(methodName)) {
fun match {
case Term.Select(qual, name) =>
name match {
case Term.Name("saveAvroAsBigQuery") if expectedType(qual, scoll) =>
// the rest of args should be named because the parameter order has changed
if (
tail.exists(!_.toString.contains("=")) ||
// `avroSchema` doesn't exist in the list of the new method
tail.exists(_.toString.contains("avroSchema"))
) {
Patch.empty // not possible to fix, leave it to `LintBqSaveAsTable`
} else {
val headParam = if (head.toString.contains("=")) {
s"table = Table.Ref(${head.toString.split("=").last.trim})"
} else {
s"Table.Ref($head)"
}
val allArgs = (headParam :: tail).mkString(", ")
Patch.replaceTree(a, s"$qual.saveAsBigQueryTable($allArgs)")
}
val paramsUpdated =
params
.zipWithIndex
.map { case (param, index) =>
index match {
// table is always the first param and without default value
case 0 =>
param match {
case Term.Assign((_, value)) =>
q"table = Table.Ref(${value})"
case _ =>
q"Table.Ref($param)"
}
case _ =>
param match {
case Term.Assign((name, value)) =>
// parameter name has changes from `avroSchema` to `schema`
if (name.toString == "avroSchema") {
q"schema = toTableSchema($value)"
} else {
param
}
case _ =>
// if not a named param, `avroSchema` param should come second
if (index == 1) {
q"toTableSchema($param)"
} else {
param
}
}
}
}
.mkString(", ")

Patch.replaceTree(a, s"$qual.saveAsBigQueryTable($paramsUpdated)")

case _ =>
Patch.empty
}
Expand All @@ -43,20 +63,20 @@ class FixBqSaveAsTable extends SemanticRule("FixBqSaveAsTable") {
} else {
Patch.empty
}
case Source(Pkg(_, imp :: _) :: _) =>
Patch.addGlobalImport(
importer"com.spotify.scio.bigquery._"
)
case _ => Patch.empty
}
.filter(_ != Patch.empty)
}.filter(_ != Patch.empty)

// in case the import is the only change, drop the entire patch
if (patches.size == 1) {
List()
} else {
patches
}
// in case the import is the only change, drop the entire patch
if (!patches.isEmpty) {
patches ++ List(
Patch.addGlobalImport(importer"com.spotify.scio.bigquery._"),
Patch.addGlobalImport(
importer"com.spotify.scio.extra.bigquery.AvroConverters.toTableSchema"
)
)
} else {
patches
}

}.asPatch

Expand Down
60 changes: 0 additions & 60 deletions scalafix/rules/src/main/scala/fix/LintBqSaveAsTable.scala

This file was deleted.

4 changes: 2 additions & 2 deletions site/src/main/paradox/migrations/v0.12.0-Migration-Guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ For usages of `saveAvroAsBigQuery`, use `saveAsBigQueryTable` from `com.spotify.
+ scoll.saveAsBigQueryTable(table)
```

Note: you can run the following sbt command to run the relevant [scalafix](https://scalacenter.github.io/scalafix/docs/developers/tutorial.html#run-the-rule-from-source-code) rules to update your BQ API usages or get a hint on how to upgrade them:
Note: you can run the following sbt command to run the relevant [scalafix](https://scalacenter.github.io/scalafix/docs/developers/tutorial.html#run-the-rule-from-source-code) rules to update your BQ API usages:

```
sbt "scalafixEnable; scalafix github:spotify/scio/FixBqSaveAsTable github:spotify/scio/LintBqSaveAsTable"
sbt "scalafixEnable; scalafix github:spotify/scio/FixBqSaveAsTable"
```

## Removal of `com.spotify.scio.pubsub` specializations
Expand Down

0 comments on commit a68b370

Please sign in to comment.