Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v2 #9

Merged
merged 2 commits into from
Jan 23, 2018
Merged

v2 #9

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion commands/commands.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package journey

import "github.com/urfave/cli"
import (
"github.com/urfave/cli"
)

func Commands() cli.Commands {
return cli.Commands{
Expand Down
218 changes: 145 additions & 73 deletions commands/migrate_commands.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
package journey

import (
"context"
"os"
"os/signal"
"strconv"
"strings"

log "github.com/sirupsen/logrus"

"github.com/db-journey/migrate"
"github.com/db-journey/migrate/direction"
"github.com/db-journey/migrate/file"
pipep "github.com/db-journey/migrate/pipe"
"github.com/urfave/cli"
)

Expand All @@ -34,6 +34,8 @@ func MigrateCommands() cli.Commands {
redoCommand,
versionCommand,
migrateCommand,
applyCommand,
rollbackCommand,
gotoCommand,
}
}
Expand All @@ -54,7 +56,10 @@ var createCommand = cli.Command{
name = strings.Join(ctx.Args(), "_")
}

migrationFile, err := migrate.Create(ctx.GlobalString("url"), ctx.GlobalString("path"), name)
migrate, _, cancel := newMigrateWithCtx(ctx.GlobalString("url"), ctx.GlobalString("path"))
defer cancel()

migrationFile, err := migrate.Create(name)
if err != nil {
logErr(err).Fatal("Migration failed")
}
Expand All @@ -73,13 +78,13 @@ var upCommand = cli.Command{
Flags: MigrateFlags,
Action: func(ctx *cli.Context) error {
log.Info("Applying all -up- migrations")
pipe := pipep.New()
go migrate.Up(pipe, ctx.GlobalString("url"), ctx.GlobalString("path"))
ok := readPipe(pipe)
if !ok {
os.Exit(1)
migrate, mctx, cancel := newMigrateWithCtx(ctx.GlobalString("url"), ctx.GlobalString("path"))
defer cancel()
err := migrate.Up(mctx)
if err != nil {
logErr(err).Fatal("Failed to apply all -up- migrations")
}
logCurrentVersion(ctx.GlobalString("url"), ctx.GlobalString("path"))
logCurrentVersion(mctx, migrate)
return nil
},
}
Expand All @@ -90,13 +95,13 @@ var downCommand = cli.Command{
Flags: MigrateFlags,
Action: func(ctx *cli.Context) error {
log.Info("Applying all -down- migrations")
pipe := pipep.New()
go migrate.Down(pipe, ctx.GlobalString("url"), ctx.GlobalString("path"))
ok := readPipe(pipe)
if !ok {
os.Exit(1)
migrate, mctx, cancel := newMigrateWithCtx(ctx.GlobalString("url"), ctx.GlobalString("path"))
defer cancel()
err := migrate.Down(mctx)
if err != nil {
logErr(err).Fatal("Failed to apply all -down- migrations")
}
logCurrentVersion(ctx.GlobalString("url"), ctx.GlobalString("path"))
logCurrentVersion(mctx, migrate)
return nil
},
}
Expand All @@ -108,13 +113,11 @@ var redoCommand = cli.Command{
Flags: MigrateFlags,
Action: func(ctx *cli.Context) error {
log.Info("Redoing last migration")
pipe := pipep.New()
go migrate.Redo(pipe, ctx.GlobalString("url"), ctx.GlobalString("path"))
ok := readPipe(pipe)
if !ok {
os.Exit(1)
}
logCurrentVersion(ctx.GlobalString("url"), ctx.GlobalString("path"))
migrate, mctx, cancel := newMigrateWithCtx(ctx.GlobalString("url"), ctx.GlobalString("path"))
defer cancel()
err := migrate.Redo(mctx)
logErr(err).Fatal("Failed to redo last migration")
logCurrentVersion(mctx, migrate)
return nil
},
}
Expand All @@ -125,7 +128,9 @@ var versionCommand = cli.Command{
Usage: "Show current migration version",
Flags: MigrateFlags,
Action: func(ctx *cli.Context) error {
version, err := migrate.Version(ctx.GlobalString("url"), ctx.GlobalString("path"))
migrate, mctx, cancel := newMigrateWithCtx(ctx.GlobalString("url"), ctx.GlobalString("path"))
defer cancel()
version, err := migrate.Version(mctx)
if err != nil {
logErr(err).Fatal("Unable to fetch version")
}
Expand All @@ -141,13 +146,13 @@ var resetCommand = cli.Command{
Flags: MigrateFlags,
Action: func(ctx *cli.Context) error {
log.Info("Reseting database")
pipe := pipep.New()
go migrate.Redo(pipe, ctx.GlobalString("url"), ctx.GlobalString("path"))
ok := readPipe(pipe)
if !ok {
os.Exit(1)
migrate, mctx, cancel := newMigrateWithCtx(ctx.GlobalString("url"), ctx.GlobalString("path"))
defer cancel()
err := migrate.Redo(mctx)
if err != nil {
logErr(err).Fatal("Failed to reset database")
}
logCurrentVersion(ctx.GlobalString("url"), ctx.GlobalString("path"))
logCurrentVersion(mctx, migrate)
return nil
},
}
Expand All @@ -168,13 +173,67 @@ var migrateCommand = cli.Command{

log.Infof("Applying %d migrations", relativeNInt)

pipe := pipep.New()
go migrate.Migrate(pipe, ctx.GlobalString("url"), ctx.GlobalString("path"), relativeNInt)
ok := readPipe(pipe)
if !ok {
os.Exit(1)
migrate, mctx, cancel := newMigrateWithCtx(ctx.GlobalString("url"), ctx.GlobalString("path"))
defer cancel()
err = migrate.Migrate(mctx, relativeNInt)
if err != nil {
logErr(err).Fatalf("Failed to apply %d migrations", relativeNInt)
}
logCurrentVersion(mctx, migrate)
return nil
},
}

var applyCommand = cli.Command{
Name: "apply",
Aliases: []string{"a"},
Usage: "Run up migration for specific version",
ArgsUsage: "<version>",
Flags: MigrateFlags,
SkipFlagParsing: true,
Action: func(ctx *cli.Context) error {
version := ctx.Args().First()
versionInt, err := strconv.Atoi(version)
if err != nil {
logErr(err).Fatal("Unable to parse param <n>")
}

log.Infof("Applying version %d", versionInt)

migrate, mctx, cancel := newMigrateWithCtx(ctx.GlobalString("url"), ctx.GlobalString("path"))
defer cancel()
err = migrate.ApplyVersion(mctx, file.Version(versionInt))
if err != nil {
logErr(err).Fatalf("Failed to apply version %d", versionInt)
}
logCurrentVersion(mctx, migrate)
return nil
},
}

var rollbackCommand = cli.Command{
Name: "rollback",
Aliases: []string{"r"},
Usage: "Run down migration for specific version",
ArgsUsage: "<version>",
Flags: MigrateFlags,
SkipFlagParsing: true,
Action: func(ctx *cli.Context) error {
version := ctx.Args().First()
versionInt, err := strconv.Atoi(version)
if err != nil {
logErr(err).Fatal("Unable to parse param <n>")
}

log.Infof("Applying version %d", versionInt)

migrate, mctx, cancel := newMigrateWithCtx(ctx.GlobalString("url"), ctx.GlobalString("path"))
defer cancel()
err = migrate.RollbackVersion(mctx, file.Version(versionInt))
if err != nil {
logErr(err).Fatalf("Failed to rollback version %d", versionInt)
}
logCurrentVersion(ctx.GlobalString("url"), ctx.GlobalString("path"))
logCurrentVersion(mctx, migrate)
return nil
},
}
Expand All @@ -194,64 +253,77 @@ var gotoCommand = cli.Command{

log.Infof("Migrating to version %d", toVersionInt)

currentVersion, err := migrate.Version(ctx.GlobalString("url"), ctx.GlobalString("path"))
migrate, mctx, cancel := newMigrateWithCtx(ctx.GlobalString("url"), ctx.GlobalString("path"))
defer cancel()
currentVersion, err := migrate.Version(mctx)
if err != nil {
logErr(err).Fatalf("failed to migrate to version %d", toVersionInt)
}

relativeNInt := toVersionInt - int(currentVersion)

pipe := pipep.New()
go migrate.Migrate(pipe, ctx.GlobalString("url"), ctx.GlobalString("path"), relativeNInt)
ok := readPipe(pipe)
if !ok {
os.Exit(1)
err = migrate.Migrate(mctx, relativeNInt)
if err != nil {
logErr(err).Fatalf("Failed to migrate to vefrsion %d", toVersionInt)
}
logCurrentVersion(ctx.GlobalString("url"), ctx.GlobalString("path"))
logCurrentVersion(mctx, migrate)
return nil
},
}

func logErr(err error) *log.Entry {
return log.WithError(err)
func newMigrateWithCtx(url, migrationsPath string) (*migrate.Handle, context.Context, func()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, why the context.Context is not a field of *migrate.Handle?
I smell a wrong utilization of context here.

Copy link
Member Author

@josephbuchma josephbuchma Jan 22, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It shouldn't be a field of *migrate.Handle. If you add context to Handle at create time, it will make this Handle useless once you cancel this context.

done := make(chan struct{})
m, err := migrate.Open(url, migrationsPath, migrate.WithHooks(
func(f file.File) error {
log.Infof("Applying %s migration for version %d (%s)", f.Direction, f.Version, f.Name)
return nil
},
func(f file.File) error {
done <- struct{}{}
return nil
},
))
if err != nil {
log.Fatalf("Initialization failed: %s", err)
}
ctx, cancel := newOsInterruptCtx(done)
return m, ctx, cancel
}

// readPipe reads items from a chan and returns a boolean and the number of migration files executed
func readPipe(pipe chan interface{}) (ok bool) {
okFlag := true
if pipe != nil {
for {
// newOsInterruptCtx returns new context that will be cancelled
// on os.Interrupt signal.
func newOsInterruptCtx(done <-chan struct{}) (context.Context, func()) {
ctx, cancel := context.WithCancel(context.Background())
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A lib should never presume of context behaviour.
You must pass a context to migrate, and handle this stuff in https://github.com/db-journey/journey/ instead.
That way, you let the users chose where context.Background() is created.

Copy link
Member Author

@josephbuchma josephbuchma Jan 22, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought that commands package is not supposed to be used apart from journey CLI. And it doesn't presume context behavior. The only thing it "presumes" is that context might be canceled, which is kind of normal thing to presume when you work with context.
So passing context here from journey or whatever else won't change/break anything.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you talking about os signals processing - yes, it does makes sense to move it to journey's main package if commands is used elsewhere beyond journey.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my bad, I mixed up with the migrate project.
You're right!

go func() {
exit := false
for loop := true; loop; {
select {
case item, more := <-pipe:
if !more {
return okFlag
case <-done:
if exit {
loop = false
}
switch item.(type) {

case error:
log.Error(item.(error).Error())
okFlag = false

case file.File:
f := item.(file.File)
dir := "up"
if f.Direction == direction.Down {
dir = "down"
}

log.WithField("dir", dir).Infof("%s", f.FileName)

default:
log.Info(item)
case <-c:
if exit {
os.Exit(5)
}
cancel()
exit = true
log.Info("Aborting after this migration... Hit again to force quit.")
}
}
}
return okFlag
signal.Stop(c)
}()
return ctx, cancel
}

func logErr(err error) *log.Entry {
return log.WithError(err)
}

func logCurrentVersion(url, migrationsPath string) {
version, err := migrate.Version(url, migrationsPath)
func logCurrentVersion(ctx context.Context, migrate *migrate.Handle) {
version, err := migrate.Version(ctx)
if err != nil {
logErr(err).Fatal("Unable to fetch version")
}
Expand Down