Skip to content

Commit

Permalink
auto create COS schema on boot (#357)
Browse files Browse the repository at this point in the history
  • Loading branch information
Zen Yui authored May 20, 2021
1 parent 8fe0de7 commit de40ab7
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,13 @@ import scala.concurrent.{ Await, Future }
import scala.concurrent.duration.Duration
import scala.util.{ Success, Try }

class Migrator(val journalDbConfig: DatabaseConfig[JdbcProfile]) {
/**
* Runs the provided migrations
*
* @param journalDbConfig a jdbc config
* @param schema the schema name
*/
class Migrator(val journalDbConfig: DatabaseConfig[JdbcProfile], schema: String) {
val logger: Logger = Migrator.logger

// create the priority queue of versions sorted by version number
Expand Down Expand Up @@ -53,9 +59,11 @@ class Migrator(val journalDbConfig: DatabaseConfig[JdbcProfile]) {
* runs before all migration steps, used to configure the migrator state, etc.
*/
private[migration] def beforeAll(): Try[Unit] = {
// create the versions table
// create the schema
Migrator
.createMigrationsTable(journalDbConfig)
.createSchema(journalDbConfig, schema)
// create the versions table
.flatMap(_ => Migrator.createMigrationsTable(journalDbConfig))
// set initial version if provided
.flatMap(_ => Migrator.setInitialVersion(journalDbConfig))
}
Expand Down Expand Up @@ -240,4 +248,21 @@ object Migrator {
}
}
}

/**
* creates the COS schema
*
* @param journalJdbcConfig a journal jdbc config
* @return success/failure
*/
private[migration] def createSchema(journalJdbcConfig: DatabaseConfig[JdbcProfile], schema: String): Try[Unit] = Try {
logger.info(s"creating schema '$schema' if not exists")
// create the schema
val conn = journalJdbcConfig.db.source.createConnection()
try {
conn.createStatement().execute(s"create schema if not exists $schema")
} finally {
conn.close()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ import slick.basic.DatabaseConfig
import slick.dbio.DBIO
import slick.jdbc.JdbcProfile

/**
* V6 migration
*
* @param journalJdbcConfig a db config
* @param schema the COS schema name
*/
case class V6(journalJdbcConfig: DatabaseConfig[JdbcProfile]) extends Version {
final val log: Logger = LoggerFactory.getLogger(getClass)
override def versionNumber: Int = 6
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import java.sql.DriverManager
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.util.Success
import com.namely.chiefofstate.migration.helper.DbHelper

class MigratorSpec extends BaseSpec with ForAllTestContainer {

Expand Down Expand Up @@ -85,7 +86,7 @@ class MigratorSpec extends BaseSpec with ForAllTestContainer {
".addVersion" should {
"add to the versions queue in order" in {
val dbConfig = getDbConfig()
val migrator: Migrator = new Migrator(dbConfig)
val migrator: Migrator = new Migrator(dbConfig, cosSchema)

// add versions out of order
migrator.addVersion(getMockVersion(2))
Expand All @@ -105,7 +106,7 @@ class MigratorSpec extends BaseSpec with ForAllTestContainer {
".getVersions" should {
"filter versions" in {
val dbConfig = getDbConfig()
val migrator: Migrator = new Migrator(dbConfig)
val migrator: Migrator = new Migrator(dbConfig, cosSchema)

// add versions
migrator.addVersion(getMockVersion(1))
Expand All @@ -127,7 +128,7 @@ class MigratorSpec extends BaseSpec with ForAllTestContainer {
".beforeAll" should {
"create the versions table" in {
val dbConfig = getDbConfig()
val migrator = new Migrator(dbConfig)
val migrator = new Migrator(dbConfig, cosSchema)

DbUtil.tableExists(dbConfig, Migrator.COS_MIGRATIONS_TABLE) shouldBe false

Expand All @@ -140,7 +141,7 @@ class MigratorSpec extends BaseSpec with ForAllTestContainer {
setEnv(Migrator.COS_MIGRATIONS_INITIAL_VERSION, "3")

val dbConfig = getDbConfig()
val migrator = new Migrator(dbConfig)
val migrator = new Migrator(dbConfig, cosSchema)

migrator.beforeAll().isSuccess shouldBe true

Expand All @@ -165,7 +166,7 @@ class MigratorSpec extends BaseSpec with ForAllTestContainer {
.once()

// define a migrator with two versions
val migrator = new Migrator(dbConfig).addVersion(version1).addVersion(version2)
val migrator = new Migrator(dbConfig, cosSchema).addVersion(version1).addVersion(version2)

val result = migrator.run()

Expand All @@ -182,7 +183,7 @@ class MigratorSpec extends BaseSpec with ForAllTestContainer {
Migrator.getCurrentVersionNumber(dbConfig) shouldBe Some(1)

// define a migrator
val migrator = new Migrator(dbConfig)
val migrator = new Migrator(dbConfig, cosSchema)

// define 3 dynamic versions
(2 to 4).foreach(versionNumber => {
Expand All @@ -208,7 +209,10 @@ class MigratorSpec extends BaseSpec with ForAllTestContainer {

// define a migrator with versions that should not run (nothing mocked)
val migrator =
new Migrator(dbConfig).addVersion(getMockVersion(1)).addVersion(getMockVersion(2)).addVersion(getMockVersion(3))
new Migrator(dbConfig, cosSchema)
.addVersion(getMockVersion(1))
.addVersion(getMockVersion(2))
.addVersion(getMockVersion(3))

// set db version number to the highest version
Migrator.createMigrationsTable(dbConfig)
Expand Down Expand Up @@ -430,4 +434,20 @@ class MigratorSpec extends BaseSpec with ForAllTestContainer {
actual.failed.map(_.getMessage.endsWith("cannot be 'X'")) shouldBe Success(true)
}
}

".createSchema" should {
"create the schema if not exists" in {
// first, drop the schema
DbHelper.dropSchema(container, cosSchema)
// ensure schema not exists
DbHelper.schemaExists(container, cosSchema) shouldBe false
// construct db config
val dbConfig: DatabaseConfig[JdbcProfile] =
dbConfigFromUrl(container.jdbcUrl, container.username, container.password)
// run create schema
Migrator.createSchema(dbConfig, cosSchema)
// ensure exists
DbHelper.schemaExists(container, cosSchema) shouldBe true
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package com.namely.chiefofstate.migration.helper
import com.namely.protobuf.chiefofstate.v1.persistence.{ EventWrapper, StateWrapper }
import com.dimafeng.testcontainers.{ ForAllTestContainer, PostgreSQLContainer }
import java.sql.{ Connection, DriverManager }
import org.checkerframework.checker.units.qual.s

object DbHelper {
val serializerId = 5001
Expand Down Expand Up @@ -88,4 +89,33 @@ object DbHelper {
statement.executeBatch()
conn.close()
}

// drop the schema for creation testing
def dropSchema(container: PostgreSQLContainer, schema: String): Unit = {
val conn = getConnection(container)
val statement = conn.createStatement()
statement.addBatch(s"drop schema if exists $schema cascade")
statement.executeBatch()
conn.close()
}

/**
* returns true if schema exists (for testing)
*
* @param container test container for postgres
* @param schema schema name
* @return true if schema exists
*/
def schemaExists(container: PostgreSQLContainer, schema: String): Boolean = {
val conn = getConnection(container)
val statement = conn.createStatement()
val sql = s"SELECT count(schema_name) FROM information_schema.schemata WHERE schema_name = '$schema';"
val result = statement.executeQuery(sql)
require(result.next, "broken resultset")
val numSchemas = result.getInt(1)
require(numSchemas < 2, "broken resultset")
conn.close()
// return true if found
numSchemas == 1
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ object ServiceBootstrapper {
TelemetryTools(cosConfig.telemetryConfig).start()

// We only proceed when the data stores and various migrations are done successfully.
log.info("Journal and snapshot store created successfully. About to start...")
log.info("Data store migration complete. About to start...")

val channel: ManagedChannel =
NettyHelper
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ object ServiceMigrationRunner {
val priorOffsetStoreName: String =
config.getString("chiefofstate.migration.v1.slick.offset-store.table")

val schema: String = config.getString("jdbc-default.schema")

val v1: V1 = V1(journalJdbcConfig, priorProjectionJdbcConfig, priorOffsetStoreName)
val v2: V2 = V2(journalJdbcConfig, projectionJdbcConfig)(context.system)
val v3: V3 = V3(journalJdbcConfig)
Expand All @@ -73,7 +75,7 @@ object ServiceMigrationRunner {

// instance of the migrator
val migrator: Migrator =
new Migrator(journalJdbcConfig)
new Migrator(journalJdbcConfig, schema)
.addVersion(v1)
.addVersion(v2)
.addVersion(v3)
Expand Down

0 comments on commit de40ab7

Please sign in to comment.