From de40ab7f9a54392d6756de7eda2f46dfd91f67c7 Mon Sep 17 00:00:00 2001 From: Zen Yui Date: Thu, 20 May 2021 10:13:08 -0400 Subject: [PATCH] auto create COS schema on boot (#357) --- .../chiefofstate/migration/Migrator.scala | 31 +++++++++++++++-- .../migration/versions/v6/V6.scala | 6 ++++ .../chiefofstate/migration/MigratorSpec.scala | 34 +++++++++++++++---- .../migration/helper/DbHelper.scala | 30 ++++++++++++++++ .../chiefofstate/ServiceBootstrapper.scala | 2 +- .../chiefofstate/ServiceMigrationRunner.scala | 4 ++- 6 files changed, 95 insertions(+), 12 deletions(-) diff --git a/code/migration/src/main/scala/com/namely/chiefofstate/migration/Migrator.scala b/code/migration/src/main/scala/com/namely/chiefofstate/migration/Migrator.scala index cbd4274b..003f5353 100644 --- a/code/migration/src/main/scala/com/namely/chiefofstate/migration/Migrator.scala +++ b/code/migration/src/main/scala/com/namely/chiefofstate/migration/Migrator.scala @@ -18,7 +18,13 @@ import scala.concurrent.{ Await, Future } import scala.concurrent.duration.Duration import scala.util.{ Success, Try } -class Migrator(val journalDbConfig: DatabaseConfig[JdbcProfile]) { +/** + * Runs the provided migrations + * + * @param journalDbConfig a jdbc config + * @param schema the schema name + */ +class Migrator(val journalDbConfig: DatabaseConfig[JdbcProfile], schema: String) { val logger: Logger = Migrator.logger // create the priority queue of versions sorted by version number @@ -53,9 +59,11 @@ class Migrator(val journalDbConfig: DatabaseConfig[JdbcProfile]) { * runs before all migration steps, used to configure the migrator state, etc. */ private[migration] def beforeAll(): Try[Unit] = { - // create the versions table + // create the schema Migrator - .createMigrationsTable(journalDbConfig) + .createSchema(journalDbConfig, schema) + // create the versions table + .flatMap(_ => Migrator.createMigrationsTable(journalDbConfig)) // set initial version if provided .flatMap(_ => Migrator.setInitialVersion(journalDbConfig)) } @@ -240,4 +248,21 @@ object Migrator { } } } + + /** + * creates the COS schema + * + * @param journalJdbcConfig a journal jdbc config + * @return success/failure + */ + private[migration] def createSchema(journalJdbcConfig: DatabaseConfig[JdbcProfile], schema: String): Try[Unit] = Try { + logger.info(s"creating schema '$schema' if not exists") + // create the schema + val conn = journalJdbcConfig.db.source.createConnection() + try { + conn.createStatement().execute(s"create schema if not exists $schema") + } finally { + conn.close() + } + } } diff --git a/code/migration/src/main/scala/com/namely/chiefofstate/migration/versions/v6/V6.scala b/code/migration/src/main/scala/com/namely/chiefofstate/migration/versions/v6/V6.scala index 67496961..157711f3 100644 --- a/code/migration/src/main/scala/com/namely/chiefofstate/migration/versions/v6/V6.scala +++ b/code/migration/src/main/scala/com/namely/chiefofstate/migration/versions/v6/V6.scala @@ -11,6 +11,12 @@ import slick.basic.DatabaseConfig import slick.dbio.DBIO import slick.jdbc.JdbcProfile +/** + * V6 migration + * + * @param journalJdbcConfig a db config + * @param schema the COS schema name + */ case class V6(journalJdbcConfig: DatabaseConfig[JdbcProfile]) extends Version { final val log: Logger = LoggerFactory.getLogger(getClass) override def versionNumber: Int = 6 diff --git a/code/migration/src/test/scala/com/namely/chiefofstate/migration/MigratorSpec.scala b/code/migration/src/test/scala/com/namely/chiefofstate/migration/MigratorSpec.scala index 904d3f55..16d51efb 100644 --- a/code/migration/src/test/scala/com/namely/chiefofstate/migration/MigratorSpec.scala +++ b/code/migration/src/test/scala/com/namely/chiefofstate/migration/MigratorSpec.scala @@ -20,6 +20,7 @@ import java.sql.DriverManager import scala.concurrent.Await import scala.concurrent.duration.Duration import scala.util.Success +import com.namely.chiefofstate.migration.helper.DbHelper class MigratorSpec extends BaseSpec with ForAllTestContainer { @@ -85,7 +86,7 @@ class MigratorSpec extends BaseSpec with ForAllTestContainer { ".addVersion" should { "add to the versions queue in order" in { val dbConfig = getDbConfig() - val migrator: Migrator = new Migrator(dbConfig) + val migrator: Migrator = new Migrator(dbConfig, cosSchema) // add versions out of order migrator.addVersion(getMockVersion(2)) @@ -105,7 +106,7 @@ class MigratorSpec extends BaseSpec with ForAllTestContainer { ".getVersions" should { "filter versions" in { val dbConfig = getDbConfig() - val migrator: Migrator = new Migrator(dbConfig) + val migrator: Migrator = new Migrator(dbConfig, cosSchema) // add versions migrator.addVersion(getMockVersion(1)) @@ -127,7 +128,7 @@ class MigratorSpec extends BaseSpec with ForAllTestContainer { ".beforeAll" should { "create the versions table" in { val dbConfig = getDbConfig() - val migrator = new Migrator(dbConfig) + val migrator = new Migrator(dbConfig, cosSchema) DbUtil.tableExists(dbConfig, Migrator.COS_MIGRATIONS_TABLE) shouldBe false @@ -140,7 +141,7 @@ class MigratorSpec extends BaseSpec with ForAllTestContainer { setEnv(Migrator.COS_MIGRATIONS_INITIAL_VERSION, "3") val dbConfig = getDbConfig() - val migrator = new Migrator(dbConfig) + val migrator = new Migrator(dbConfig, cosSchema) migrator.beforeAll().isSuccess shouldBe true @@ -165,7 +166,7 @@ class MigratorSpec extends BaseSpec with ForAllTestContainer { .once() // define a migrator with two versions - val migrator = new Migrator(dbConfig).addVersion(version1).addVersion(version2) + val migrator = new Migrator(dbConfig, cosSchema).addVersion(version1).addVersion(version2) val result = migrator.run() @@ -182,7 +183,7 @@ class MigratorSpec extends BaseSpec with ForAllTestContainer { Migrator.getCurrentVersionNumber(dbConfig) shouldBe Some(1) // define a migrator - val migrator = new Migrator(dbConfig) + val migrator = new Migrator(dbConfig, cosSchema) // define 3 dynamic versions (2 to 4).foreach(versionNumber => { @@ -208,7 +209,10 @@ class MigratorSpec extends BaseSpec with ForAllTestContainer { // define a migrator with versions that should not run (nothing mocked) val migrator = - new Migrator(dbConfig).addVersion(getMockVersion(1)).addVersion(getMockVersion(2)).addVersion(getMockVersion(3)) + new Migrator(dbConfig, cosSchema) + .addVersion(getMockVersion(1)) + .addVersion(getMockVersion(2)) + .addVersion(getMockVersion(3)) // set db version number to the highest version Migrator.createMigrationsTable(dbConfig) @@ -430,4 +434,20 @@ class MigratorSpec extends BaseSpec with ForAllTestContainer { actual.failed.map(_.getMessage.endsWith("cannot be 'X'")) shouldBe Success(true) } } + + ".createSchema" should { + "create the schema if not exists" in { + // first, drop the schema + DbHelper.dropSchema(container, cosSchema) + // ensure schema not exists + DbHelper.schemaExists(container, cosSchema) shouldBe false + // construct db config + val dbConfig: DatabaseConfig[JdbcProfile] = + dbConfigFromUrl(container.jdbcUrl, container.username, container.password) + // run create schema + Migrator.createSchema(dbConfig, cosSchema) + // ensure exists + DbHelper.schemaExists(container, cosSchema) shouldBe true + } + } } diff --git a/code/migration/src/test/scala/com/namely/chiefofstate/migration/helper/DbHelper.scala b/code/migration/src/test/scala/com/namely/chiefofstate/migration/helper/DbHelper.scala index 6f882a04..a0598dad 100644 --- a/code/migration/src/test/scala/com/namely/chiefofstate/migration/helper/DbHelper.scala +++ b/code/migration/src/test/scala/com/namely/chiefofstate/migration/helper/DbHelper.scala @@ -9,6 +9,7 @@ package com.namely.chiefofstate.migration.helper import com.namely.protobuf.chiefofstate.v1.persistence.{ EventWrapper, StateWrapper } import com.dimafeng.testcontainers.{ ForAllTestContainer, PostgreSQLContainer } import java.sql.{ Connection, DriverManager } +import org.checkerframework.checker.units.qual.s object DbHelper { val serializerId = 5001 @@ -88,4 +89,33 @@ object DbHelper { statement.executeBatch() conn.close() } + + // drop the schema for creation testing + def dropSchema(container: PostgreSQLContainer, schema: String): Unit = { + val conn = getConnection(container) + val statement = conn.createStatement() + statement.addBatch(s"drop schema if exists $schema cascade") + statement.executeBatch() + conn.close() + } + + /** + * returns true if schema exists (for testing) + * + * @param container test container for postgres + * @param schema schema name + * @return true if schema exists + */ + def schemaExists(container: PostgreSQLContainer, schema: String): Boolean = { + val conn = getConnection(container) + val statement = conn.createStatement() + val sql = s"SELECT count(schema_name) FROM information_schema.schemata WHERE schema_name = '$schema';" + val result = statement.executeQuery(sql) + require(result.next, "broken resultset") + val numSchemas = result.getInt(1) + require(numSchemas < 2, "broken resultset") + conn.close() + // return true if found + numSchemas == 1 + } } diff --git a/code/service/src/main/scala/com/namely/chiefofstate/ServiceBootstrapper.scala b/code/service/src/main/scala/com/namely/chiefofstate/ServiceBootstrapper.scala index 37f377f2..86df6db5 100644 --- a/code/service/src/main/scala/com/namely/chiefofstate/ServiceBootstrapper.scala +++ b/code/service/src/main/scala/com/namely/chiefofstate/ServiceBootstrapper.scala @@ -58,7 +58,7 @@ object ServiceBootstrapper { TelemetryTools(cosConfig.telemetryConfig).start() // We only proceed when the data stores and various migrations are done successfully. - log.info("Journal and snapshot store created successfully. About to start...") + log.info("Data store migration complete. About to start...") val channel: ManagedChannel = NettyHelper diff --git a/code/service/src/main/scala/com/namely/chiefofstate/ServiceMigrationRunner.scala b/code/service/src/main/scala/com/namely/chiefofstate/ServiceMigrationRunner.scala index 9fe5ddcd..00694b99 100644 --- a/code/service/src/main/scala/com/namely/chiefofstate/ServiceMigrationRunner.scala +++ b/code/service/src/main/scala/com/namely/chiefofstate/ServiceMigrationRunner.scala @@ -64,6 +64,8 @@ object ServiceMigrationRunner { val priorOffsetStoreName: String = config.getString("chiefofstate.migration.v1.slick.offset-store.table") + val schema: String = config.getString("jdbc-default.schema") + val v1: V1 = V1(journalJdbcConfig, priorProjectionJdbcConfig, priorOffsetStoreName) val v2: V2 = V2(journalJdbcConfig, projectionJdbcConfig)(context.system) val v3: V3 = V3(journalJdbcConfig) @@ -73,7 +75,7 @@ object ServiceMigrationRunner { // instance of the migrator val migrator: Migrator = - new Migrator(journalJdbcConfig) + new Migrator(journalJdbcConfig, schema) .addVersion(v1) .addVersion(v2) .addVersion(v3)