Skip to content

Commit

Permalink
apply/rollback commands, progress logging
Browse files Browse the repository at this point in the history
Also moved to using instance methods
rather than migrate package funcs.
  • Loading branch information
josephbuchma committed Jan 18, 2018
1 parent 95d347d commit 338f81b
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 19 deletions.
8 changes: 6 additions & 2 deletions commands/commands.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package journey

import "github.com/urfave/cli"
import (
"context"

func Commands() cli.Commands {
"github.com/urfave/cli"
)

func Commands(ctx context.Context) cli.Commands {
return cli.Commands{
{
Name: "migrate",
Expand Down
158 changes: 141 additions & 17 deletions commands/migrate_commands.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package journey

import (
"context"
"os"
"os/signal"
"strconv"
"strings"

log "github.com/sirupsen/logrus"

"github.com/db-journey/migrate"
"github.com/db-journey/migrate/file"
"github.com/urfave/cli"
)

Expand All @@ -30,6 +34,8 @@ func MigrateCommands() cli.Commands {
redoCommand,
versionCommand,
migrateCommand,
applyCommand,
rollbackCommand,
gotoCommand,
}
}
Expand All @@ -50,7 +56,10 @@ var createCommand = cli.Command{
name = strings.Join(ctx.Args(), "_")
}

migrationFile, err := migrate.Create(ctx.GlobalString("url"), ctx.GlobalString("path"), name)
migrate, _, cancel := newMigrateWithCtx(ctx.GlobalString("url"), ctx.GlobalString("path"))
defer cancel()

migrationFile, err := migrate.Create(name)
if err != nil {
logErr(err).Fatal("Migration failed")
}
Expand All @@ -69,11 +78,13 @@ var upCommand = cli.Command{
Flags: MigrateFlags,
Action: func(ctx *cli.Context) error {
log.Info("Applying all -up- migrations")
err := migrate.Up(ctx.GlobalString("url"), ctx.GlobalString("path"))
migrate, mctx, cancel := newMigrateWithCtx(ctx.GlobalString("url"), ctx.GlobalString("path"))
defer cancel()
err := migrate.Up(mctx)
if err != nil {
logErr(err).Fatal("Failed to apply all -up- migrations")
}
logCurrentVersion(ctx.GlobalString("url"), ctx.GlobalString("path"))
logCurrentVersion(mctx, migrate)
return nil
},
}
Expand All @@ -84,11 +95,13 @@ var downCommand = cli.Command{
Flags: MigrateFlags,
Action: func(ctx *cli.Context) error {
log.Info("Applying all -down- migrations")
err := migrate.Down(ctx.GlobalString("url"), ctx.GlobalString("path"))
migrate, mctx, cancel := newMigrateWithCtx(ctx.GlobalString("url"), ctx.GlobalString("path"))
defer cancel()
err := migrate.Down(mctx)
if err != nil {
logErr(err).Fatal("Failed to apply all -down- migrations")
}
logCurrentVersion(ctx.GlobalString("url"), ctx.GlobalString("path"))
logCurrentVersion(mctx, migrate)
return nil
},
}
Expand All @@ -100,9 +113,11 @@ var redoCommand = cli.Command{
Flags: MigrateFlags,
Action: func(ctx *cli.Context) error {
log.Info("Redoing last migration")
err := migrate.Redo(ctx.GlobalString("url"), ctx.GlobalString("path"))
migrate, mctx, cancel := newMigrateWithCtx(ctx.GlobalString("url"), ctx.GlobalString("path"))
defer cancel()
err := migrate.Redo(mctx)
logErr(err).Fatal("Failed to redo last migration")
logCurrentVersion(ctx.GlobalString("url"), ctx.GlobalString("path"))
logCurrentVersion(mctx, migrate)
return nil
},
}
Expand All @@ -113,7 +128,9 @@ var versionCommand = cli.Command{
Usage: "Show current migration version",
Flags: MigrateFlags,
Action: func(ctx *cli.Context) error {
version, err := migrate.Version(ctx.GlobalString("url"), ctx.GlobalString("path"))
migrate, mctx, cancel := newMigrateWithCtx(ctx.GlobalString("url"), ctx.GlobalString("path"))
defer cancel()
version, err := migrate.Version(mctx)
if err != nil {
logErr(err).Fatal("Unable to fetch version")
}
Expand All @@ -129,11 +146,13 @@ var resetCommand = cli.Command{
Flags: MigrateFlags,
Action: func(ctx *cli.Context) error {
log.Info("Reseting database")
err := migrate.Redo(ctx.GlobalString("url"), ctx.GlobalString("path"))
migrate, mctx, cancel := newMigrateWithCtx(ctx.GlobalString("url"), ctx.GlobalString("path"))
defer cancel()
err := migrate.Redo(mctx)
if err != nil {
logErr(err).Fatal("Failed to reset database")
}
logCurrentVersion(ctx.GlobalString("url"), ctx.GlobalString("path"))
logCurrentVersion(mctx, migrate)
return nil
},
}
Expand All @@ -154,11 +173,67 @@ var migrateCommand = cli.Command{

log.Infof("Applying %d migrations", relativeNInt)

err = migrate.Migrate(ctx.GlobalString("url"), ctx.GlobalString("path"), relativeNInt)
migrate, mctx, cancel := newMigrateWithCtx(ctx.GlobalString("url"), ctx.GlobalString("path"))
defer cancel()
err = migrate.Migrate(mctx, relativeNInt)
if err != nil {
logErr(err).Fatalf("Failed to apply %d migrations", relativeNInt)
}
logCurrentVersion(ctx.GlobalString("url"), ctx.GlobalString("path"))
logCurrentVersion(mctx, migrate)
return nil
},
}

var applyCommand = cli.Command{
Name: "apply",
Aliases: []string{"a"},
Usage: "Run up migration for specific version",
ArgsUsage: "<version>",
Flags: MigrateFlags,
SkipFlagParsing: true,
Action: func(ctx *cli.Context) error {
version := ctx.Args().First()
versionInt, err := strconv.Atoi(version)
if err != nil {
logErr(err).Fatal("Unable to parse param <n>")
}

log.Infof("Applying version %d", versionInt)

migrate, mctx, cancel := newMigrateWithCtx(ctx.GlobalString("url"), ctx.GlobalString("path"))
defer cancel()
err = migrate.ApplyVersion(mctx, file.Version(versionInt))
if err != nil {
logErr(err).Fatalf("Failed to apply version %d", versionInt)
}
logCurrentVersion(mctx, migrate)
return nil
},
}

var rollbackCommand = cli.Command{
Name: "rollback",
Aliases: []string{"r"},
Usage: "Run down migration for specific version",
ArgsUsage: "<version>",
Flags: MigrateFlags,
SkipFlagParsing: true,
Action: func(ctx *cli.Context) error {
version := ctx.Args().First()
versionInt, err := strconv.Atoi(version)
if err != nil {
logErr(err).Fatal("Unable to parse param <n>")
}

log.Infof("Applying version %d", versionInt)

migrate, mctx, cancel := newMigrateWithCtx(ctx.GlobalString("url"), ctx.GlobalString("path"))
defer cancel()
err = migrate.RollbackVersion(mctx, file.Version(versionInt))
if err != nil {
logErr(err).Fatalf("Failed to rollback version %d", versionInt)
}
logCurrentVersion(mctx, migrate)
return nil
},
}
Expand All @@ -178,28 +253,77 @@ var gotoCommand = cli.Command{

log.Infof("Migrating to version %d", toVersionInt)

currentVersion, err := migrate.Version(ctx.GlobalString("url"), ctx.GlobalString("path"))
migrate, mctx, cancel := newMigrateWithCtx(ctx.GlobalString("url"), ctx.GlobalString("path"))
defer cancel()
currentVersion, err := migrate.Version(mctx)
if err != nil {
logErr(err).Fatalf("failed to migrate to version %d", toVersionInt)
}

relativeNInt := toVersionInt - int(currentVersion)

err = migrate.Migrate(ctx.GlobalString("url"), ctx.GlobalString("path"), relativeNInt)
err = migrate.Migrate(mctx, relativeNInt)
if err != nil {
logErr(err).Fatalf("Failed to migrate to vefrsion %d", toVersionInt)
}
logCurrentVersion(ctx.GlobalString("url"), ctx.GlobalString("path"))
logCurrentVersion(mctx, migrate)
return nil
},
}

func newMigrateWithCtx(url, migrationsPath string) (*migrate.Handle, context.Context, func()) {
done := make(chan struct{})
m, err := migrate.Open(url, migrationsPath, migrate.WithHooks(
func(f file.File) error {
log.Infof("Applying %s migration for version %d (%s)", f.Direction, f.Version, f.Name)
return nil
},
func(f file.File) error {
done <- struct{}{}
return nil
},
))
if err != nil {
log.Fatalf("Initialization failed: %s", err)
}
ctx, cancel := newOsInterruptCtx(done)
return m, ctx, cancel
}

// newOsInterruptCtx returns new context that will be cancelled
// on os.Interrupt signal.
func newOsInterruptCtx(done <-chan struct{}) (context.Context, func()) {
ctx, cancel := context.WithCancel(context.Background())
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
go func() {
exit := false
for loop := true; loop; {
select {
case <-done:
if exit {
loop = false
}
case <-c:
if exit {
os.Exit(5)
}
cancel()
exit = true
log.Info("Aborting after this migration... Hit again to force quit.")
}
}
signal.Stop(c)
}()
return ctx, cancel
}

func logErr(err error) *log.Entry {
return log.WithError(err)
}

func logCurrentVersion(url, migrationsPath string) {
version, err := migrate.Version(url, migrationsPath)
func logCurrentVersion(ctx context.Context, migrate *migrate.Handle) {
version, err := migrate.Version(ctx)
if err != nil {
logErr(err).Fatal("Unable to fetch version")
}
Expand Down

0 comments on commit 338f81b

Please sign in to comment.