diff --git a/commands/commands.go b/commands/commands.go index 145a87f..45bdf2c 100644 --- a/commands/commands.go +++ b/commands/commands.go @@ -1,8 +1,12 @@ package journey -import "github.com/urfave/cli" +import ( + "context" -func Commands() cli.Commands { + "github.com/urfave/cli" +) + +func Commands(ctx context.Context) cli.Commands { return cli.Commands{ { Name: "migrate", diff --git a/commands/migrate_commands.go b/commands/migrate_commands.go index 82ca700..9c51e7a 100644 --- a/commands/migrate_commands.go +++ b/commands/migrate_commands.go @@ -1,12 +1,16 @@ package journey import ( + "context" + "os" + "os/signal" "strconv" "strings" log "github.com/sirupsen/logrus" "github.com/db-journey/migrate" + "github.com/db-journey/migrate/file" "github.com/urfave/cli" ) @@ -30,6 +34,8 @@ func MigrateCommands() cli.Commands { redoCommand, versionCommand, migrateCommand, + applyCommand, + rollbackCommand, gotoCommand, } } @@ -50,7 +56,10 @@ var createCommand = cli.Command{ name = strings.Join(ctx.Args(), "_") } - migrationFile, err := migrate.Create(ctx.GlobalString("url"), ctx.GlobalString("path"), name) + migrate, _, cancel := newMigrateWithCtx(ctx.GlobalString("url"), ctx.GlobalString("path")) + defer cancel() + + migrationFile, err := migrate.Create(name) if err != nil { logErr(err).Fatal("Migration failed") } @@ -69,11 +78,13 @@ var upCommand = cli.Command{ Flags: MigrateFlags, Action: func(ctx *cli.Context) error { log.Info("Applying all -up- migrations") - err := migrate.Up(ctx.GlobalString("url"), ctx.GlobalString("path")) + migrate, mctx, cancel := newMigrateWithCtx(ctx.GlobalString("url"), ctx.GlobalString("path")) + defer cancel() + err := migrate.Up(mctx) if err != nil { logErr(err).Fatal("Failed to apply all -up- migrations") } - logCurrentVersion(ctx.GlobalString("url"), ctx.GlobalString("path")) + logCurrentVersion(mctx, migrate) return nil }, } @@ -84,11 +95,13 @@ var downCommand = cli.Command{ Flags: MigrateFlags, Action: func(ctx *cli.Context) error { log.Info("Applying all -down- migrations") - err := migrate.Down(ctx.GlobalString("url"), ctx.GlobalString("path")) + migrate, mctx, cancel := newMigrateWithCtx(ctx.GlobalString("url"), ctx.GlobalString("path")) + defer cancel() + err := migrate.Down(mctx) if err != nil { logErr(err).Fatal("Failed to apply all -down- migrations") } - logCurrentVersion(ctx.GlobalString("url"), ctx.GlobalString("path")) + logCurrentVersion(mctx, migrate) return nil }, } @@ -100,9 +113,11 @@ var redoCommand = cli.Command{ Flags: MigrateFlags, Action: func(ctx *cli.Context) error { log.Info("Redoing last migration") - err := migrate.Redo(ctx.GlobalString("url"), ctx.GlobalString("path")) + migrate, mctx, cancel := newMigrateWithCtx(ctx.GlobalString("url"), ctx.GlobalString("path")) + defer cancel() + err := migrate.Redo(mctx) logErr(err).Fatal("Failed to redo last migration") - logCurrentVersion(ctx.GlobalString("url"), ctx.GlobalString("path")) + logCurrentVersion(mctx, migrate) return nil }, } @@ -113,7 +128,9 @@ var versionCommand = cli.Command{ Usage: "Show current migration version", Flags: MigrateFlags, Action: func(ctx *cli.Context) error { - version, err := migrate.Version(ctx.GlobalString("url"), ctx.GlobalString("path")) + migrate, mctx, cancel := newMigrateWithCtx(ctx.GlobalString("url"), ctx.GlobalString("path")) + defer cancel() + version, err := migrate.Version(mctx) if err != nil { logErr(err).Fatal("Unable to fetch version") } @@ -129,11 +146,13 @@ var resetCommand = cli.Command{ Flags: MigrateFlags, Action: func(ctx *cli.Context) error { log.Info("Reseting database") - err := migrate.Redo(ctx.GlobalString("url"), ctx.GlobalString("path")) + migrate, mctx, cancel := newMigrateWithCtx(ctx.GlobalString("url"), ctx.GlobalString("path")) + defer cancel() + err := migrate.Redo(mctx) if err != nil { logErr(err).Fatal("Failed to reset database") } - logCurrentVersion(ctx.GlobalString("url"), ctx.GlobalString("path")) + logCurrentVersion(mctx, migrate) return nil }, } @@ -154,11 +173,67 @@ var migrateCommand = cli.Command{ log.Infof("Applying %d migrations", relativeNInt) - err = migrate.Migrate(ctx.GlobalString("url"), ctx.GlobalString("path"), relativeNInt) + migrate, mctx, cancel := newMigrateWithCtx(ctx.GlobalString("url"), ctx.GlobalString("path")) + defer cancel() + err = migrate.Migrate(mctx, relativeNInt) if err != nil { logErr(err).Fatalf("Failed to apply %d migrations", relativeNInt) } - logCurrentVersion(ctx.GlobalString("url"), ctx.GlobalString("path")) + logCurrentVersion(mctx, migrate) + return nil + }, +} + +var applyCommand = cli.Command{ + Name: "apply", + Aliases: []string{"a"}, + Usage: "Run up migration for specific version", + ArgsUsage: "", + Flags: MigrateFlags, + SkipFlagParsing: true, + Action: func(ctx *cli.Context) error { + version := ctx.Args().First() + versionInt, err := strconv.Atoi(version) + if err != nil { + logErr(err).Fatal("Unable to parse param ") + } + + log.Infof("Applying version %d", versionInt) + + migrate, mctx, cancel := newMigrateWithCtx(ctx.GlobalString("url"), ctx.GlobalString("path")) + defer cancel() + err = migrate.ApplyVersion(mctx, file.Version(versionInt)) + if err != nil { + logErr(err).Fatalf("Failed to apply version %d", versionInt) + } + logCurrentVersion(mctx, migrate) + return nil + }, +} + +var rollbackCommand = cli.Command{ + Name: "rollback", + Aliases: []string{"r"}, + Usage: "Run down migration for specific version", + ArgsUsage: "", + Flags: MigrateFlags, + SkipFlagParsing: true, + Action: func(ctx *cli.Context) error { + version := ctx.Args().First() + versionInt, err := strconv.Atoi(version) + if err != nil { + logErr(err).Fatal("Unable to parse param ") + } + + log.Infof("Applying version %d", versionInt) + + migrate, mctx, cancel := newMigrateWithCtx(ctx.GlobalString("url"), ctx.GlobalString("path")) + defer cancel() + err = migrate.RollbackVersion(mctx, file.Version(versionInt)) + if err != nil { + logErr(err).Fatalf("Failed to rollback version %d", versionInt) + } + logCurrentVersion(mctx, migrate) return nil }, } @@ -178,28 +253,77 @@ var gotoCommand = cli.Command{ log.Infof("Migrating to version %d", toVersionInt) - currentVersion, err := migrate.Version(ctx.GlobalString("url"), ctx.GlobalString("path")) + migrate, mctx, cancel := newMigrateWithCtx(ctx.GlobalString("url"), ctx.GlobalString("path")) + defer cancel() + currentVersion, err := migrate.Version(mctx) if err != nil { logErr(err).Fatalf("failed to migrate to version %d", toVersionInt) } relativeNInt := toVersionInt - int(currentVersion) - err = migrate.Migrate(ctx.GlobalString("url"), ctx.GlobalString("path"), relativeNInt) + err = migrate.Migrate(mctx, relativeNInt) if err != nil { logErr(err).Fatalf("Failed to migrate to vefrsion %d", toVersionInt) } - logCurrentVersion(ctx.GlobalString("url"), ctx.GlobalString("path")) + logCurrentVersion(mctx, migrate) return nil }, } +func newMigrateWithCtx(url, migrationsPath string) (*migrate.Handle, context.Context, func()) { + done := make(chan struct{}) + m, err := migrate.Open(url, migrationsPath, migrate.WithHooks( + func(f file.File) error { + log.Infof("Applying %s migration for version %d (%s)", f.Direction, f.Version, f.Name) + return nil + }, + func(f file.File) error { + done <- struct{}{} + return nil + }, + )) + if err != nil { + log.Fatalf("Initialization failed: %s", err) + } + ctx, cancel := newOsInterruptCtx(done) + return m, ctx, cancel +} + +// newOsInterruptCtx returns new context that will be cancelled +// on os.Interrupt signal. +func newOsInterruptCtx(done <-chan struct{}) (context.Context, func()) { + ctx, cancel := context.WithCancel(context.Background()) + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + go func() { + exit := false + for loop := true; loop; { + select { + case <-done: + if exit { + loop = false + } + case <-c: + if exit { + os.Exit(5) + } + cancel() + exit = true + log.Info("Aborting after this migration... Hit again to force quit.") + } + } + signal.Stop(c) + }() + return ctx, cancel +} + func logErr(err error) *log.Entry { return log.WithError(err) } -func logCurrentVersion(url, migrationsPath string) { - version, err := migrate.Version(url, migrationsPath) +func logCurrentVersion(ctx context.Context, migrate *migrate.Handle) { + version, err := migrate.Version(ctx) if err != nil { logErr(err).Fatal("Unable to fetch version") }