From bcfca50ff3b64bc6e25b7ab9a0618873af894999 Mon Sep 17 00:00:00 2001 From: Jaco Kruger Date: Fri, 16 Feb 2018 11:46:11 +0100 Subject: [PATCH 01/11] added neo4j migration hooks --- database/neo4j/README.md | 47 +++++++ database/neo4j/neo4j.go | 233 +++++++++++++++++++++++++++++++++++ database/neo4j/neo4j_test.go | 44 +++++++ 3 files changed, 324 insertions(+) create mode 100644 database/neo4j/neo4j.go create mode 100644 database/neo4j/neo4j_test.go diff --git a/database/neo4j/README.md b/database/neo4j/README.md index e69de29b..c691493e 100644 --- a/database/neo4j/README.md +++ b/database/neo4j/README.md @@ -0,0 +1,47 @@ +# Neo4J + +`http://user:password@host:port?query` + +| URL Query | WithInstance Config | Description | +|------------|---------------------|-------------| +| `x-migrations-label` | `SchemaMigration` | Name of the migrations node | + +## Use with existing client + +```go + +import ( + "log" + "github.com/mattes/migrate" + "github.com/mattes/migrate/database/neo4j" + "database/sql" + + _ "github.com/mattes/migrate/source/file" + _ "gopkg.in/cq.v1" +) + +func main() { + + db, err := sql.Open("neo4j-cypher", "http://neo4j:password@localhost:7474") + if err != nil { + log.Fatal(err) + } + + driver, err := neo4j.WithInstance(db, &neo4j.Config{}) + if err != nil { + panic(err) + } + + migration, err := migrate.NewWithDatabaseInstance( + "file:///migrations", + "", driver) + if err != nil { + panic(err) + } + + err = migration.Up() + if err != nil { + panic(err) + } +} +``` diff --git a/database/neo4j/neo4j.go b/database/neo4j/neo4j.go new file mode 100644 index 00000000..cc3049e3 --- /dev/null +++ b/database/neo4j/neo4j.go @@ -0,0 +1,233 @@ +package neo4j + +import ( + "fmt" + "io" + "io/ioutil" + nurl "net/url" + "strings" + + "database/sql" + _ "gopkg.in/cq.v1" + "github.com/mattes/migrate" + "github.com/mattes/migrate/database" +) + +func init() { + database.Register("neo4j", &Neo4j{}) +} + +var DefaultMigrationsLabel = "SchemaMigration" + +var ( + ErrNilConfig = fmt.Errorf("no config") +) + +type Config struct { + MigrationsLabel string +} + +type Neo4j struct { + db *sql.DB + tx *sql.Tx + isLocked bool + config *Config +} + +func WithInstance(instance *sql.DB, config *Config) (database.Driver, error) { + if config == nil { + return nil, ErrNilConfig + } + + if err := instance.Ping(); err != nil { + return nil, err + } + + if len(config.MigrationsLabel) == 0 { + config.MigrationsLabel = DefaultMigrationsLabel + } + + mx := &Neo4j{ + db: instance, + config: config, + } + + return mx, nil +} + +func (m *Neo4j) Open(url string) (database.Driver, error) { + purl, err := nurl.Parse(url) + if err != nil { + return nil, err + } + + db, err := sql.Open("neo4j-cypher", migrate.FilterCustomQuery(purl).String()) + if err != nil { + return nil, err + } + + migrationsLabel := purl.Query().Get("x-migrations-label") + if len(migrationsLabel) == 0 { + migrationsLabel = DefaultMigrationsLabel + } + + mx, err := WithInstance(db, &Config{ + MigrationsLabel: migrationsLabel, + }) + if err != nil { + return nil, err + } + + return mx, nil +} + +func (m *Neo4j) Close() error { + return m.db.Close() +} + +func (m *Neo4j) Lock() error { + if m.isLocked { + return database.ErrLocked + } + tx, err := m.db.Begin() + if err != nil { + return &database.Error{OrigErr: err, Err: "transaction start failed"} + } + m.tx = tx + m.isLocked = true + return nil +} + +func (m *Neo4j) Unlock() (err error) { + m.isLocked = false + if m.tx != nil { + if err := m.tx.Commit(); err != nil { + err = &database.Error{OrigErr: err, Err: "transaction commit failed"} + } + m.tx = nil + } + return +} + +func (m *Neo4j) Rollback() (err error) { + if m.tx != nil { + if err := m.tx.Rollback(); err != nil { + err = &database.Error{OrigErr: err, Err: "transaction rollback failed"} + } + m.tx = nil + } + return +} + +func (m *Neo4j) Run(migration io.Reader) error { + migr, err := ioutil.ReadAll(migration) + if err != nil { + return err + } + + contents := string(migr[:]) + queries := strings.Split(contents, ";\n") + + for _, query := range queries { + + if len(strings.TrimSpace(query)) == 0 { + continue + } + + stmt, err := m.db.Prepare(query) + if err != nil { + m.Rollback() + return &database.Error{OrigErr: err, Query: []byte(query)} + } + defer stmt.Close() + + if _, err := stmt.Exec(); err != nil { + m.Rollback() + return &database.Error{OrigErr: err, Err: "migration failed", Query: []byte(query)} + } + } + + return nil +} + +func (m *Neo4j) SetVersion(version int, dirty bool) error { + + query := "MATCH (m:" + m.config.MigrationsLabel + ") delete m" + stmt, err := m.db.Prepare(query) + if err != nil { + m.Rollback() + return &database.Error{OrigErr: err, Query: []byte(query)} + } + defer stmt.Close() + + if _, err := stmt.Exec(version); err != nil { + m.Rollback() + return &database.Error{OrigErr: err, Query: []byte(query)} + } + + if version >= 0 { + + query := "MATCH (m:" + m.config.MigrationsLabel + ") where m.version={0} delete m" + stmt, err := m.db.Prepare(query) + if err != nil { + m.Rollback() + return &database.Error{OrigErr: err, Query: []byte(query)} + } + defer stmt.Close() + + if _, err := stmt.Exec(version); err != nil { + m.Rollback() + return &database.Error{OrigErr: err, Query: []byte(query)} + } + + query = "CREATE (:" + m.config.MigrationsLabel + " {version:{0}, dirty:{1}})" + stmt, err = m.db.Prepare(query) + if err != nil { + m.Rollback() + return &database.Error{OrigErr: err, Query: []byte(query)} + } + defer stmt.Close() + if _, err := stmt.Exec(version, dirty); err != nil { + m.Rollback() + return &database.Error{OrigErr: err, Query: []byte(query)} + } + } + + return nil +} + +func (m *Neo4j) Version() (version int, dirty bool, err error) { + query := "MATCH (m:" + m.config.MigrationsLabel + ") return m.version, m.dirty ORDER BY m.version LIMIT 1" + stmt, err := m.db.Prepare(query) + if err != nil { + return 0, false, &database.Error{OrigErr: err, Query: []byte(query)} + } + defer stmt.Close() + err = stmt.QueryRow(query).Scan(&version, &dirty) + switch { + case err == sql.ErrNoRows: + return database.NilVersion, false, nil + + case err != nil: + return 0, false, &database.Error{OrigErr: err, Query: []byte(query)} + + default: + return int(version), dirty, nil + } +} + +func (m *Neo4j) Drop() error { + // select all tables + query := "MATCH (m:" + m.config.MigrationsLabel + ") delete m" + stmt, err := m.db.Prepare(query) + if err != nil { + return &database.Error{OrigErr: err, Query: []byte(query)} + } + defer stmt.Close() + _, err = stmt.Exec() + if err != nil { + return &database.Error{OrigErr: err, Query: []byte(query)} + } + + return nil +} diff --git a/database/neo4j/neo4j_test.go b/database/neo4j/neo4j_test.go new file mode 100644 index 00000000..e640b460 --- /dev/null +++ b/database/neo4j/neo4j_test.go @@ -0,0 +1,44 @@ +package neo4j + +import ( + "fmt" + "testing" + "database/sql" + sqldriver "database/sql/driver" + + dt "github.com/mattes/migrate/database/testing" + mt "github.com/mattes/migrate/testing" +) + +var versions = []mt.Version{ + {Image: "neo4j:3", ENV: []string{"x-migrations-label=SchemaMigrationTest"}}, +} + +func isReady(i mt.Instance) bool { + db, err := sql.Open("neo4j-cypher", fmt.Sprintf("http://%v:%v", i.Host(), i.Port())) + if err != nil { + return false + } + defer db.Close() + err = db.Ping() + + if err == sqldriver.ErrBadConn { + return false + } + + return true +} + +func Test(t *testing.T) { + + mt.ParallelTest(t, versions, isReady, + func(t *testing.T, i mt.Instance) { + p := &Neo4j{} + addr := fmt.Sprintf("http://%v:%v", i.Host(), i.Port()) + d, err := p.Open(addr) + if err != nil { + t.Fatalf("%v", err) + } + dt.Test(t, d, []byte("CREATE (:Test)")) + }) +} From 619485959416f5aa2a6c23d4f3ae33d4af948977 Mon Sep 17 00:00:00 2001 From: Jaco Kruger Date: Mon, 19 Feb 2018 16:12:06 +0100 Subject: [PATCH 02/11] local var overlapped with named result --- database/neo4j/neo4j.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/database/neo4j/neo4j.go b/database/neo4j/neo4j.go index cc3049e3..98de26ce 100644 --- a/database/neo4j/neo4j.go +++ b/database/neo4j/neo4j.go @@ -101,7 +101,7 @@ func (m *Neo4j) Lock() error { func (m *Neo4j) Unlock() (err error) { m.isLocked = false if m.tx != nil { - if err := m.tx.Commit(); err != nil { + if e := m.tx.Commit(); e != nil { err = &database.Error{OrigErr: err, Err: "transaction commit failed"} } m.tx = nil @@ -111,7 +111,7 @@ func (m *Neo4j) Unlock() (err error) { func (m *Neo4j) Rollback() (err error) { if m.tx != nil { - if err := m.tx.Rollback(); err != nil { + if e := m.tx.Rollback(); e != nil { err = &database.Error{OrigErr: err, Err: "transaction rollback failed"} } m.tx = nil From 5a6227556c00c79fc17a2132e39dd7a98b4358e5 Mon Sep 17 00:00:00 2001 From: Jaco Kruger Date: Mon, 19 Feb 2018 16:42:18 +0100 Subject: [PATCH 03/11] switching to bolt implementation as cq is old and tx support is broken --- database/neo4j/neo4j.go | 89 +++++++++++++---------------------------- 1 file changed, 27 insertions(+), 62 deletions(-) diff --git a/database/neo4j/neo4j.go b/database/neo4j/neo4j.go index 98de26ce..cf81564d 100644 --- a/database/neo4j/neo4j.go +++ b/database/neo4j/neo4j.go @@ -7,8 +7,7 @@ import ( nurl "net/url" "strings" - "database/sql" - _ "gopkg.in/cq.v1" + bolt "github.com/johnnadratowski/golang-neo4j-bolt-driver" "github.com/mattes/migrate" "github.com/mattes/migrate/database" ) @@ -28,21 +27,17 @@ type Config struct { } type Neo4j struct { - db *sql.DB - tx *sql.Tx + db bolt.Conn + tx bolt.Tx isLocked bool config *Config } -func WithInstance(instance *sql.DB, config *Config) (database.Driver, error) { - if config == nil { +func WithInstance(instance bolt.Conn, config *Config) (database.Driver, error) { + if instance == nil || config == nil { return nil, ErrNilConfig } - if err := instance.Ping(); err != nil { - return nil, err - } - if len(config.MigrationsLabel) == 0 { config.MigrationsLabel = DefaultMigrationsLabel } @@ -55,36 +50,6 @@ func WithInstance(instance *sql.DB, config *Config) (database.Driver, error) { return mx, nil } -func (m *Neo4j) Open(url string) (database.Driver, error) { - purl, err := nurl.Parse(url) - if err != nil { - return nil, err - } - - db, err := sql.Open("neo4j-cypher", migrate.FilterCustomQuery(purl).String()) - if err != nil { - return nil, err - } - - migrationsLabel := purl.Query().Get("x-migrations-label") - if len(migrationsLabel) == 0 { - migrationsLabel = DefaultMigrationsLabel - } - - mx, err := WithInstance(db, &Config{ - MigrationsLabel: migrationsLabel, - }) - if err != nil { - return nil, err - } - - return mx, nil -} - -func (m *Neo4j) Close() error { - return m.db.Close() -} - func (m *Neo4j) Lock() error { if m.isLocked { return database.ErrLocked @@ -134,14 +99,14 @@ func (m *Neo4j) Run(migration io.Reader) error { continue } - stmt, err := m.db.Prepare(query) + stmt, err := m.db.PrepareNeo(query) if err != nil { m.Rollback() return &database.Error{OrigErr: err, Query: []byte(query)} } defer stmt.Close() - if _, err := stmt.Exec(); err != nil { + if _, err := stmt.ExecNeo(nil); err != nil { m.Rollback() return &database.Error{OrigErr: err, Err: "migration failed", Query: []byte(query)} } @@ -153,41 +118,41 @@ func (m *Neo4j) Run(migration io.Reader) error { func (m *Neo4j) SetVersion(version int, dirty bool) error { query := "MATCH (m:" + m.config.MigrationsLabel + ") delete m" - stmt, err := m.db.Prepare(query) + stmt, err := m.db.PrepareNeo(query) if err != nil { m.Rollback() return &database.Error{OrigErr: err, Query: []byte(query)} } defer stmt.Close() - if _, err := stmt.Exec(version); err != nil { + if _, err := stmt.ExecNeo(nil); err != nil { m.Rollback() return &database.Error{OrigErr: err, Query: []byte(query)} } if version >= 0 { - query := "MATCH (m:" + m.config.MigrationsLabel + ") where m.version={0} delete m" - stmt, err := m.db.Prepare(query) + query := "MATCH (m:" + m.config.MigrationsLabel + ") where m.version={version} delete m" + stmt, err := m.db.PrepareNeo(query) if err != nil { m.Rollback() return &database.Error{OrigErr: err, Query: []byte(query)} } defer stmt.Close() - if _, err := stmt.Exec(version); err != nil { + if _, err := stmt.ExecNeo(map[string]interface{}{"version": version}); err != nil { m.Rollback() return &database.Error{OrigErr: err, Query: []byte(query)} } - query = "CREATE (:" + m.config.MigrationsLabel + " {version:{0}, dirty:{1}})" - stmt, err = m.db.Prepare(query) + query = "CREATE (:" + m.config.MigrationsLabel + " {version:{version}, dirty:{dirty}})" + stmt, err = m.db.PrepareNeo(query) if err != nil { m.Rollback() return &database.Error{OrigErr: err, Query: []byte(query)} } defer stmt.Close() - if _, err := stmt.Exec(version, dirty); err != nil { + if _, err := stmt.ExecNeo(map[string]interface{}{"version": version, "dirty": dirty}); err != nil { m.Rollback() return &database.Error{OrigErr: err, Query: []byte(query)} } @@ -198,33 +163,33 @@ func (m *Neo4j) SetVersion(version int, dirty bool) error { func (m *Neo4j) Version() (version int, dirty bool, err error) { query := "MATCH (m:" + m.config.MigrationsLabel + ") return m.version, m.dirty ORDER BY m.version LIMIT 1" - stmt, err := m.db.Prepare(query) + stmt, err := m.db.PrepareNeo(query) if err != nil { return 0, false, &database.Error{OrigErr: err, Query: []byte(query)} } defer stmt.Close() - err = stmt.QueryRow(query).Scan(&version, &dirty) - switch { - case err == sql.ErrNoRows: - return database.NilVersion, false, nil - - case err != nil: + rows, err := stmt.QueryNeo(nil) + data, _, err := rows.NextNeo() + if err != nil { + if err == io.EOF { + return database.NilVersion, false, nil + } return 0, false, &database.Error{OrigErr: err, Query: []byte(query)} - - default: - return int(version), dirty, nil } + + return data[0].(int), data[1].(bool), nil + } func (m *Neo4j) Drop() error { // select all tables query := "MATCH (m:" + m.config.MigrationsLabel + ") delete m" - stmt, err := m.db.Prepare(query) + stmt, err := m.db.PrepareNeo(query) if err != nil { return &database.Error{OrigErr: err, Query: []byte(query)} } defer stmt.Close() - _, err = stmt.Exec() + _, err = stmt.ExecNeo(nil) if err != nil { return &database.Error{OrigErr: err, Query: []byte(query)} } From 05ddbda17496c7fd65d774b8ecdca9de94703f0d Mon Sep 17 00:00:00 2001 From: Jaco Kruger Date: Mon, 19 Feb 2018 16:51:00 +0100 Subject: [PATCH 04/11] add open & close methods --- database/neo4j/neo4j.go | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/database/neo4j/neo4j.go b/database/neo4j/neo4j.go index cf81564d..1f5b1dc3 100644 --- a/database/neo4j/neo4j.go +++ b/database/neo4j/neo4j.go @@ -4,11 +4,9 @@ import ( "fmt" "io" "io/ioutil" - nurl "net/url" "strings" bolt "github.com/johnnadratowski/golang-neo4j-bolt-driver" - "github.com/mattes/migrate" "github.com/mattes/migrate/database" ) @@ -50,6 +48,25 @@ func WithInstance(instance bolt.Conn, config *Config) (database.Driver, error) { return mx, nil } +func (m *Neo4j) Open(url string) (database.Driver, error) { + boltDriver := bolt.NewDriver() + conn, err := boltDriver.OpenNeo(url) + if err != nil { + return nil, err + } + defer conn.Close() + + driver, err := WithInstance(conn, &Config{}) + if err != nil { + return nil, err + } + return driver, nil +} + +func (m *Neo4j) Close() error { + return m.db.Close() +} + func (m *Neo4j) Lock() error { if m.isLocked { return database.ErrLocked From 00f0abdf10d0456432892978be98434595108aef Mon Sep 17 00:00:00 2001 From: Jaco Kruger Date: Mon, 19 Feb 2018 16:58:47 +0100 Subject: [PATCH 05/11] use references --- database/neo4j/neo4j.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/database/neo4j/neo4j.go b/database/neo4j/neo4j.go index 1f5b1dc3..e504c367 100644 --- a/database/neo4j/neo4j.go +++ b/database/neo4j/neo4j.go @@ -56,7 +56,7 @@ func (m *Neo4j) Open(url string) (database.Driver, error) { } defer conn.Close() - driver, err := WithInstance(conn, &Config{}) + driver, err := WithInstance(&conn, &Config{}) if err != nil { return nil, err } @@ -75,7 +75,7 @@ func (m *Neo4j) Lock() error { if err != nil { return &database.Error{OrigErr: err, Err: "transaction start failed"} } - m.tx = tx + m.tx = &tx m.isLocked = true return nil } From f54a21d2299f4e50b1280d03cbc981f054e89fa6 Mon Sep 17 00:00:00 2001 From: Jaco Kruger Date: Mon, 19 Feb 2018 17:03:15 +0100 Subject: [PATCH 06/11] nope, implementation already returns references --- database/neo4j/neo4j.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/database/neo4j/neo4j.go b/database/neo4j/neo4j.go index e504c367..a01e9a20 100644 --- a/database/neo4j/neo4j.go +++ b/database/neo4j/neo4j.go @@ -56,7 +56,7 @@ func (m *Neo4j) Open(url string) (database.Driver, error) { } defer conn.Close() - driver, err := WithInstance(&conn, &Config{}) + driver, err := WithInstance(conn, &Config{}) if err != nil { return nil, err } From af2ea3e87a5943cf0ccdc455253d64e248ac15e5 Mon Sep 17 00:00:00 2001 From: Jaco Kruger Date: Mon, 19 Feb 2018 17:04:25 +0100 Subject: [PATCH 07/11] nope, implementation already returns references --- database/neo4j/neo4j.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/database/neo4j/neo4j.go b/database/neo4j/neo4j.go index a01e9a20..1f5b1dc3 100644 --- a/database/neo4j/neo4j.go +++ b/database/neo4j/neo4j.go @@ -75,7 +75,7 @@ func (m *Neo4j) Lock() error { if err != nil { return &database.Error{OrigErr: err, Err: "transaction start failed"} } - m.tx = &tx + m.tx = tx m.isLocked = true return nil } From e3abb7e785669464c24904b32a9564627fdd6922 Mon Sep 17 00:00:00 2001 From: Jaco Kruger Date: Mon, 19 Feb 2018 17:28:36 +0100 Subject: [PATCH 08/11] works with tx --- database/neo4j/neo4j.go | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/database/neo4j/neo4j.go b/database/neo4j/neo4j.go index 1f5b1dc3..1acb54ad 100644 --- a/database/neo4j/neo4j.go +++ b/database/neo4j/neo4j.go @@ -127,6 +127,7 @@ func (m *Neo4j) Run(migration io.Reader) error { m.Rollback() return &database.Error{OrigErr: err, Err: "migration failed", Query: []byte(query)} } + stmt.Close() } return nil @@ -135,44 +136,47 @@ func (m *Neo4j) Run(migration io.Reader) error { func (m *Neo4j) SetVersion(version int, dirty bool) error { query := "MATCH (m:" + m.config.MigrationsLabel + ") delete m" - stmt, err := m.db.PrepareNeo(query) + stmt1, err := m.db.PrepareNeo(query) if err != nil { m.Rollback() return &database.Error{OrigErr: err, Query: []byte(query)} } - defer stmt.Close() + defer stmt1.Close() - if _, err := stmt.ExecNeo(nil); err != nil { + if _, err := stmt1.ExecNeo(map[string]interface{}{}); err != nil { m.Rollback() return &database.Error{OrigErr: err, Query: []byte(query)} } + stmt1.Close() if version >= 0 { query := "MATCH (m:" + m.config.MigrationsLabel + ") where m.version={version} delete m" - stmt, err := m.db.PrepareNeo(query) + stmt2, err := m.db.PrepareNeo(query) if err != nil { m.Rollback() return &database.Error{OrigErr: err, Query: []byte(query)} } - defer stmt.Close() + defer stmt2.Close() - if _, err := stmt.ExecNeo(map[string]interface{}{"version": version}); err != nil { + if _, err := stmt2.ExecNeo(map[string]interface{}{"version": version}); err != nil { m.Rollback() return &database.Error{OrigErr: err, Query: []byte(query)} } + stmt2.Close() query = "CREATE (:" + m.config.MigrationsLabel + " {version:{version}, dirty:{dirty}})" - stmt, err = m.db.PrepareNeo(query) + stmt3, err := m.db.PrepareNeo(query) if err != nil { m.Rollback() return &database.Error{OrigErr: err, Query: []byte(query)} } - defer stmt.Close() - if _, err := stmt.ExecNeo(map[string]interface{}{"version": version, "dirty": dirty}); err != nil { + defer stmt3.Close() + if _, err := stmt3.ExecNeo(map[string]interface{}{"version": version, "dirty": dirty}); err != nil { m.Rollback() return &database.Error{OrigErr: err, Query: []byte(query)} } + stmt3.Close() } return nil From 9d349f4b73f05cd0fe8f4fa84e75492b368f82d0 Mon Sep 17 00:00:00 2001 From: Jaco Kruger Date: Tue, 20 Feb 2018 17:19:48 +0100 Subject: [PATCH 09/11] fix statement usage --- database/neo4j/README.md | 19 +++++----- database/neo4j/neo4j.go | 77 +++++++++++++++++++++------------------- 2 files changed, 49 insertions(+), 47 deletions(-) diff --git a/database/neo4j/README.md b/database/neo4j/README.md index c691493e..f90f3ac8 100644 --- a/database/neo4j/README.md +++ b/database/neo4j/README.md @@ -1,10 +1,6 @@ # Neo4J -`http://user:password@host:port?query` - -| URL Query | WithInstance Config | Description | -|------------|---------------------|-------------| -| `x-migrations-label` | `SchemaMigration` | Name of the migrations node | +`http://user:password@host:port` ## Use with existing client @@ -14,20 +10,21 @@ import ( "log" "github.com/mattes/migrate" "github.com/mattes/migrate/database/neo4j" - "database/sql" - _ "github.com/mattes/migrate/source/file" - _ "gopkg.in/cq.v1" + bolt "github.com/johnnadratowski/golang-neo4j-bolt-driver" ) func main() { - db, err := sql.Open("neo4j-cypher", "http://neo4j:password@localhost:7474") + boltDriver := bolt.NewDriver() + conn, err := boltDriver.OpenNeo("bolt://neo4j:root@localhost:7687") if err != nil { - log.Fatal(err) + panic(err) } + defer conn.Close() + - driver, err := neo4j.WithInstance(db, &neo4j.Config{}) + driver, err := neo4j.WithInstance(conn, &neo4j.Config{}) if err != nil { panic(err) } diff --git a/database/neo4j/neo4j.go b/database/neo4j/neo4j.go index 1acb54ad..6d72241e 100644 --- a/database/neo4j/neo4j.go +++ b/database/neo4j/neo4j.go @@ -127,6 +127,7 @@ func (m *Neo4j) Run(migration io.Reader) error { m.Rollback() return &database.Error{OrigErr: err, Err: "migration failed", Query: []byte(query)} } + // have to close statements in loop stmt.Close() } @@ -135,55 +136,60 @@ func (m *Neo4j) Run(migration io.Reader) error { func (m *Neo4j) SetVersion(version int, dirty bool) error { - query := "MATCH (m:" + m.config.MigrationsLabel + ") delete m" - stmt1, err := m.db.PrepareNeo(query) + if version >= 0 { + + v, _, err := m.Version() + if err != nil { + m.Rollback() + return &database.Error{OrigErr: err, Err: "Could not get version"} + } + + if v == version { + // update + m.updateVersion(version, dirty) + } else { + // create + m.createVersion(version, dirty) + } + } + + return nil +} +func (m *Neo4j) updateVersion(version int, dirty bool) error { + + query := "MATCH (m:" + m.config.MigrationsLabel + ") where m.version={version} SET m.dirty = {dirty}" + stmt, err := m.db.PrepareNeo(query) if err != nil { m.Rollback() return &database.Error{OrigErr: err, Query: []byte(query)} } - defer stmt1.Close() - - if _, err := stmt1.ExecNeo(map[string]interface{}{}); err != nil { + defer stmt.Close() + if _, err := stmt.ExecNeo(map[string]interface{}{"version": version, "dirty": dirty}); err != nil { m.Rollback() return &database.Error{OrigErr: err, Query: []byte(query)} } - stmt1.Close() - - if version >= 0 { - - query := "MATCH (m:" + m.config.MigrationsLabel + ") where m.version={version} delete m" - stmt2, err := m.db.PrepareNeo(query) - if err != nil { - m.Rollback() - return &database.Error{OrigErr: err, Query: []byte(query)} - } - defer stmt2.Close() - if _, err := stmt2.ExecNeo(map[string]interface{}{"version": version}); err != nil { - m.Rollback() - return &database.Error{OrigErr: err, Query: []byte(query)} - } - stmt2.Close() + return nil +} +func (m *Neo4j) createVersion(version int, dirty bool) error { - query = "CREATE (:" + m.config.MigrationsLabel + " {version:{version}, dirty:{dirty}})" - stmt3, err := m.db.PrepareNeo(query) - if err != nil { - m.Rollback() - return &database.Error{OrigErr: err, Query: []byte(query)} - } - defer stmt3.Close() - if _, err := stmt3.ExecNeo(map[string]interface{}{"version": version, "dirty": dirty}); err != nil { - m.Rollback() - return &database.Error{OrigErr: err, Query: []byte(query)} - } - stmt3.Close() + query := "CREATE (:" + m.config.MigrationsLabel + " {version:{version}, dirty:{dirty}})" + stmt, err := m.db.PrepareNeo(query) + if err != nil { + m.Rollback() + return &database.Error{OrigErr: err, Query: []byte(query)} + } + defer stmt.Close() + if _, err := stmt.ExecNeo(map[string]interface{}{"version": version, "dirty": dirty}); err != nil { + m.Rollback() + return &database.Error{OrigErr: err, Query: []byte(query)} } return nil } func (m *Neo4j) Version() (version int, dirty bool, err error) { - query := "MATCH (m:" + m.config.MigrationsLabel + ") return m.version, m.dirty ORDER BY m.version LIMIT 1" + query := "MATCH (m:" + m.config.MigrationsLabel + ") return m.version, m.dirty ORDER BY m.version DESC LIMIT 1" stmt, err := m.db.PrepareNeo(query) if err != nil { return 0, false, &database.Error{OrigErr: err, Query: []byte(query)} @@ -198,8 +204,7 @@ func (m *Neo4j) Version() (version int, dirty bool, err error) { return 0, false, &database.Error{OrigErr: err, Query: []byte(query)} } - return data[0].(int), data[1].(bool), nil - + return int(data[0].(int64)), data[1].(bool), nil } func (m *Neo4j) Drop() error { From f71dbd4c6492b411ee21d2ddc7eda753b42ed376 Mon Sep 17 00:00:00 2001 From: Jaco Kruger Date: Mon, 5 Mar 2018 10:57:31 +0100 Subject: [PATCH 10/11] only keep one migration node --- database/neo4j/neo4j.go | 37 +++++++------------------------------ 1 file changed, 7 insertions(+), 30 deletions(-) diff --git a/database/neo4j/neo4j.go b/database/neo4j/neo4j.go index 6d72241e..76f18a14 100644 --- a/database/neo4j/neo4j.go +++ b/database/neo4j/neo4j.go @@ -136,41 +136,18 @@ func (m *Neo4j) Run(migration io.Reader) error { func (m *Neo4j) SetVersion(version int, dirty bool) error { - if version >= 0 { - - v, _, err := m.Version() - if err != nil { - m.Rollback() - return &database.Error{OrigErr: err, Err: "Could not get version"} - } - - if v == version { - // update - m.updateVersion(version, dirty) - } else { - // create - m.createVersion(version, dirty) - } - } - - return nil -} -func (m *Neo4j) updateVersion(version int, dirty bool) error { - - query := "MATCH (m:" + m.config.MigrationsLabel + ") where m.version={version} SET m.dirty = {dirty}" - stmt, err := m.db.PrepareNeo(query) - if err != nil { + if err := m.Drop(); err != nil { m.Rollback() - return &database.Error{OrigErr: err, Query: []byte(query)} + return &database.Error{OrigErr: err, Err: "Could not delete migration nodes"} } - defer stmt.Close() - if _, err := stmt.ExecNeo(map[string]interface{}{"version": version, "dirty": dirty}); err != nil { - m.Rollback() - return &database.Error{OrigErr: err, Query: []byte(query)} + + if version >= 0 { + m.createVersion(version, dirty) } return nil } + func (m *Neo4j) createVersion(version int, dirty bool) error { query := "CREATE (:" + m.config.MigrationsLabel + " {version:{version}, dirty:{dirty}})" @@ -208,7 +185,7 @@ func (m *Neo4j) Version() (version int, dirty bool, err error) { } func (m *Neo4j) Drop() error { - // select all tables + // delete all migration nodes query := "MATCH (m:" + m.config.MigrationsLabel + ") delete m" stmt, err := m.db.PrepareNeo(query) if err != nil { From 90da90bc1e42579e8b60a264247c9b68c2095aba Mon Sep 17 00:00:00 2001 From: Jaco Kruger Date: Tue, 20 Mar 2018 13:36:13 +0100 Subject: [PATCH 11/11] support turning transactions on/off in case the user wants to run schema migrations instead of data migrations --- database/neo4j/README.md | 2 +- database/neo4j/neo4j.go | 13 ++++++++----- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/database/neo4j/README.md b/database/neo4j/README.md index f90f3ac8..22808dfd 100644 --- a/database/neo4j/README.md +++ b/database/neo4j/README.md @@ -24,7 +24,7 @@ func main() { defer conn.Close() - driver, err := neo4j.WithInstance(conn, &neo4j.Config{}) + driver, err := neo4j.WithInstance(conn, &neo4j.Config{MigrationsLabel: "DataMigration", UseTransactions: true}) if err != nil { panic(err) } diff --git a/database/neo4j/neo4j.go b/database/neo4j/neo4j.go index 76f18a14..ab64ee44 100644 --- a/database/neo4j/neo4j.go +++ b/database/neo4j/neo4j.go @@ -22,6 +22,7 @@ var ( type Config struct { MigrationsLabel string + UseTransactions bool } type Neo4j struct { @@ -71,11 +72,13 @@ func (m *Neo4j) Lock() error { if m.isLocked { return database.ErrLocked } - tx, err := m.db.Begin() - if err != nil { - return &database.Error{OrigErr: err, Err: "transaction start failed"} + if m.config.UseTransactions { + tx, err := m.db.Begin() + if err != nil { + return &database.Error{OrigErr: err, Err: "transaction start failed"} + } + m.tx = tx } - m.tx = tx m.isLocked = true return nil } @@ -142,7 +145,7 @@ func (m *Neo4j) SetVersion(version int, dirty bool) error { } if version >= 0 { - m.createVersion(version, dirty) + return m.createVersion(version, dirty) } return nil