From 0b3510546af227b41c84ed374dc40aeb5056c49e Mon Sep 17 00:00:00 2001 From: Zen Yui Date: Tue, 6 Apr 2021 17:40:23 -0400 Subject: [PATCH] V1 migration works if old offset table does not exist (#294) --- .../migration/versions/v1/V1.scala | 19 +++++++++---------- .../migration/versions/v1/V1Spec.scala | 14 ++++++++++++++ 2 files changed, 23 insertions(+), 10 deletions(-) diff --git a/code/migration/src/main/scala/com/namely/chiefofstate/migration/versions/v1/V1.scala b/code/migration/src/main/scala/com/namely/chiefofstate/migration/versions/v1/V1.scala index 806f6f5d..24d7fe87 100644 --- a/code/migration/src/main/scala/com/namely/chiefofstate/migration/versions/v1/V1.scala +++ b/code/migration/src/main/scala/com/namely/chiefofstate/migration/versions/v1/V1.scala @@ -76,17 +76,16 @@ case class V1( /** * move the data from the old read side offset store table to the new temporay offset store table by * reading all records into memory. - * - * @return the number of the record inserted into the table */ - private def migrate(): Int = { - // let us fetch the records - val data: Seq[OffsetRow] = fetchOffsetRows() - - log.info(s"num records migrating to $tempTable: ${data.size}") - - // let us insert the data into the temporary table - insertInto(tempTable, journalJdbcConfig, data) + private[v1] def migrate(): Unit = { + val oldTable: String = transformTableName() + if (DbUtil.tableExists(projectionJdbcConfig, oldTable)) { + // let us fetch the records + val data: Seq[OffsetRow] = fetchOffsetRows() + log.info(s"num records migrating to $tempTable: ${data.size}") + // let us insert the data into the temporary table + insertInto(tempTable, journalJdbcConfig, data) + } } /** diff --git a/code/migration/src/test/scala/com/namely/chiefofstate/migration/versions/v1/V1Spec.scala b/code/migration/src/test/scala/com/namely/chiefofstate/migration/versions/v1/V1Spec.scala index becb2188..8a8907d5 100644 --- a/code/migration/src/test/scala/com/namely/chiefofstate/migration/versions/v1/V1Spec.scala +++ b/code/migration/src/test/scala/com/namely/chiefofstate/migration/versions/v1/V1Spec.scala @@ -184,6 +184,20 @@ class V1Spec extends BaseSpec with ForAllTestContainer { count(journalJdbcConfig) shouldBe 3 } + "succeed if old table does not exist" in { + val v1: V1 = V1(journalJdbcConfig, projectionJdbcConfig, "not_a_table") + // then run beforeUpgrade + v1.beforeUpgrade().isSuccess shouldBe true + // assert no records + count(journalJdbcConfig) shouldBe 0 + } + } + ".migrate" should { + "succeed even if old table does not exist" in { + val v1: V1 = V1(journalJdbcConfig, projectionJdbcConfig, "not_a_table") + // create the old read side offset store + noException shouldBe thrownBy(v1.migrate()) + } } ".upgrade" should { "drop the old table and index" in {