Skip to content

Commit

Permalink
Merge pull request #471 from dtm-labs/alpha
Browse files Browse the repository at this point in the history
use a new algorithm to do lock one trans
  • Loading branch information
yedf2 authored Oct 21, 2023
2 parents e704733 + a5f0ae1 commit dedf72e
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 60 deletions.
92 changes: 46 additions & 46 deletions dtmsvr/storage/boltdb/boltdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"testing"
"time"

. "github.com/onsi/gomega"
ga "github.com/onsi/gomega"
bolt "go.etcd.io/bbolt"

"github.com/dtm-labs/dtm/client/dtmcli/dtmimp"
Expand All @@ -20,13 +20,13 @@ import (

func TestInitializeBuckets(t *testing.T) {
t.Run("normal test", func(t *testing.T) {
g := NewWithT(t)
g := ga.NewWithT(t)
db, err := bolt.Open(path.Join(t.TempDir(), "./test.bolt"), 0666, &bolt.Options{Timeout: 1 * time.Second})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(err).ToNot(ga.HaveOccurred())
defer db.Close()

err = initializeBuckets(db)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(err).ToNot(ga.HaveOccurred())

actualBuckets := [][]byte{}
err = db.View(func(t *bolt.Tx) error {
Expand All @@ -35,42 +35,42 @@ func TestInitializeBuckets(t *testing.T) {
return nil
})
})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(err).ToNot(ga.HaveOccurred())

g.Expect(actualBuckets).To(Equal(allBuckets))
g.Expect(actualBuckets).To(ga.Equal(allBuckets))
})
}

func TestCleanupExpiredData(t *testing.T) {
t.Run("negative expired seconds", func(t *testing.T) {
g := NewWithT(t)
g := ga.NewWithT(t)
db, err := bolt.Open(path.Join(t.TempDir(), "./test.bolt"), 0666, &bolt.Options{Timeout: 1 * time.Second})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(err).ToNot(ga.HaveOccurred())
defer db.Close()

err = cleanupExpiredData(-1*time.Second, db)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(err).ToNot(ga.HaveOccurred())
})

t.Run("nil global bucket", func(t *testing.T) {
g := NewWithT(t)
g := ga.NewWithT(t)
db, err := bolt.Open(path.Join(t.TempDir(), "./test.bolt"), 0666, &bolt.Options{Timeout: 1 * time.Second})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(err).ToNot(ga.HaveOccurred())
defer db.Close()

err = cleanupExpiredData(time.Second, db)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(err).ToNot(ga.HaveOccurred())
})

t.Run("normal test", func(t *testing.T) {
g := NewWithT(t)
g := ga.NewWithT(t)
db, err := bolt.Open(path.Join(t.TempDir(), "./test.bolt"), 0666, &bolt.Options{Timeout: 1 * time.Second})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(err).ToNot(ga.HaveOccurred())
defer db.Close()

// Initialize data
err = initializeBuckets(db)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(err).ToNot(ga.HaveOccurred())

err = db.Update(func(t *bolt.Tx) error {
doneTime := time.Now().Add(-10 * time.Minute)
Expand All @@ -95,10 +95,10 @@ func TestCleanupExpiredData(t *testing.T) {

return nil
})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(err).ToNot(ga.HaveOccurred())

err = cleanupExpiredData(time.Minute, db)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(err).ToNot(ga.HaveOccurred())

actualGids := []string{}
err = db.View(func(t *bolt.Tx) error {
Expand All @@ -108,29 +108,29 @@ func TestCleanupExpiredData(t *testing.T) {
}
return nil
})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(actualGids).To(Equal([]string{"gid0"}))
g.Expect(err).ToNot(ga.HaveOccurred())
g.Expect(actualGids).To(ga.Equal([]string{"gid0"}))
})
}

func TestCleanupGlobalWithGids(t *testing.T) {
t.Run("nil bucket", func(t *testing.T) {
g := NewWithT(t)
g := ga.NewWithT(t)
db, err := bolt.Open(path.Join(t.TempDir(), "./test.bolt"), 0666, &bolt.Options{Timeout: 1 * time.Second})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(err).ToNot(ga.HaveOccurred())
defer db.Close()

err = db.Update(func(t *bolt.Tx) error {
cleanupGlobalWithGids(t, nil)
return nil
})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(err).ToNot(ga.HaveOccurred())
})

t.Run("normal test", func(t *testing.T) {
g := NewWithT(t)
g := ga.NewWithT(t)
db, err := bolt.Open(path.Join(t.TempDir(), "./test.bolt"), 0666, &bolt.Options{Timeout: 1 * time.Second})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(err).ToNot(ga.HaveOccurred())
defer db.Close()

// Initialize data
Expand All @@ -151,7 +151,7 @@ func TestCleanupGlobalWithGids(t *testing.T) {

return nil
})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(err).ToNot(ga.HaveOccurred())

err = db.Update(func(t *bolt.Tx) error {
cleanupGlobalWithGids(t, map[string]struct{}{
Expand All @@ -160,7 +160,7 @@ func TestCleanupGlobalWithGids(t *testing.T) {
})
return nil
})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(err).ToNot(ga.HaveOccurred())

actualGids := []string{}
err = db.View(func(t *bolt.Tx) error {
Expand All @@ -170,29 +170,29 @@ func TestCleanupGlobalWithGids(t *testing.T) {
}
return nil
})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(actualGids).To(Equal([]string{"k3"}))
g.Expect(err).ToNot(ga.HaveOccurred())
g.Expect(actualGids).To(ga.Equal([]string{"k3"}))
})
}

func TestCleanupBranchWithGids(t *testing.T) {
t.Run("nil bucket", func(t *testing.T) {
g := NewWithT(t)
g := ga.NewWithT(t)
db, err := bolt.Open(path.Join(t.TempDir(), "./test.bolt"), 0666, &bolt.Options{Timeout: 1 * time.Second})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(err).ToNot(ga.HaveOccurred())
defer db.Close()

err = db.Update(func(t *bolt.Tx) error {
cleanupBranchWithGids(t, nil)
return nil
})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(err).ToNot(ga.HaveOccurred())
})

t.Run("normal test", func(t *testing.T) {
g := NewWithT(t)
g := ga.NewWithT(t)
db, err := bolt.Open(path.Join(t.TempDir(), "./test.bolt"), 0666, &bolt.Options{Timeout: 1 * time.Second})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(err).ToNot(ga.HaveOccurred())
defer db.Close()

// Initialize data
Expand Down Expand Up @@ -232,7 +232,7 @@ func TestCleanupBranchWithGids(t *testing.T) {

return nil
})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(err).ToNot(ga.HaveOccurred())

err = db.Update(func(t *bolt.Tx) error {
cleanupBranchWithGids(t, map[string]struct{}{
Expand All @@ -241,7 +241,7 @@ func TestCleanupBranchWithGids(t *testing.T) {
})
return nil
})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(err).ToNot(ga.HaveOccurred())

actualKeys := []string{}
err = db.View(func(t *bolt.Tx) error {
Expand All @@ -251,29 +251,29 @@ func TestCleanupBranchWithGids(t *testing.T) {
}
return nil
})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(actualKeys).To(Equal([]string{"a", "gid201", "z"}))
g.Expect(err).ToNot(ga.HaveOccurred())
g.Expect(actualKeys).To(ga.Equal([]string{"a", "gid201", "z"}))
})
}

func TestCleanupIndexWithGids(t *testing.T) {
t.Run("nil bucket", func(t *testing.T) {
g := NewWithT(t)
g := ga.NewWithT(t)
db, err := bolt.Open(path.Join(t.TempDir(), "./test.bolt"), 0666, &bolt.Options{Timeout: 1 * time.Second})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(err).ToNot(ga.HaveOccurred())
defer db.Close()

err = db.Update(func(t *bolt.Tx) error {
cleanupIndexWithGids(t, nil)
return nil
})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(err).ToNot(ga.HaveOccurred())
})

t.Run("normal test", func(t *testing.T) {
g := NewWithT(t)
g := ga.NewWithT(t)
db, err := bolt.Open(path.Join(t.TempDir(), "./test.bolt"), 0666, &bolt.Options{Timeout: 1 * time.Second})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(err).ToNot(ga.HaveOccurred())
defer db.Close()

// Initialize data
Expand Down Expand Up @@ -313,7 +313,7 @@ func TestCleanupIndexWithGids(t *testing.T) {

return nil
})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(err).ToNot(ga.HaveOccurred())

err = db.Update(func(t *bolt.Tx) error {
cleanupIndexWithGids(t, map[string]struct{}{
Expand All @@ -322,7 +322,7 @@ func TestCleanupIndexWithGids(t *testing.T) {
})
return nil
})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(err).ToNot(ga.HaveOccurred())

actualKeys := []string{}
err = db.View(func(t *bolt.Tx) error {
Expand All @@ -332,7 +332,7 @@ func TestCleanupIndexWithGids(t *testing.T) {
}
return nil
})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(actualKeys).To(Equal([]string{"3-gid2", "a", "z"}))
g.Expect(err).ToNot(ga.HaveOccurred())
g.Expect(actualKeys).To(ga.Equal([]string{"3-gid2", "a", "z"}))
})
}
23 changes: 14 additions & 9 deletions dtmsvr/storage/sql/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
package sql

import (
"database/sql"
"errors"
"fmt"
"math"
"time"
Expand Down Expand Up @@ -159,23 +161,26 @@ func (s *Store) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlobalS
db := dbGet()
owner := shortuuid.New()
nextCronTime := getTimeStr(int64(expireIn / time.Second))
where := map[string]string{
dtmimp.DBTypeMysql: fmt.Sprintf(`next_cron_time < '%s' and status in ('prepared', 'aborting', 'submitted') limit 1`, nextCronTime),
dtmimp.DBTypePostgres: fmt.Sprintf(`id in (select id from trans_global where next_cron_time < '%s' and status in ('prepared', 'aborting', 'submitted') limit 1 )`, nextCronTime),
where := fmt.Sprintf(`next_cron_time < '%s' and status in ('prepared', 'aborting', 'submitted')`, nextCronTime)

order := map[string]string{
dtmimp.DBTypeMysql: `order by rand()`,
dtmimp.DBTypePostgres: `order by random()`,
}[conf.Store.Driver]

ssql := fmt.Sprintf(`select count(1) from trans_global where %s`, where)
var cnt int64
err := db.ToSQLDB().QueryRow(ssql).Scan(&cnt)
dtmimp.PanicIf(err != nil, err)
if cnt == 0 {
ssql := fmt.Sprintf(`select id from trans_global where %s %s limit 1`, where, order)
var id int64
err := db.ToSQLDB().QueryRow(ssql).Scan(&id)
if errors.Is(err, sql.ErrNoRows) {
return nil
}
dtmimp.PanicIf(err != nil, err)

sql := fmt.Sprintf(`UPDATE trans_global SET update_time='%s',next_cron_time='%s', owner='%s' WHERE %s`,
sql := fmt.Sprintf(`UPDATE trans_global SET update_time='%s',next_cron_time='%s', owner='%s' WHERE id=%d and %s`,
getTimeStr(0),
getTimeStr(conf.RetryInterval),
owner,
id,
where)
affected, err := dtmimp.DBExec(conf.Store.Driver, db.ToSQLDB(), sql)

Expand Down
12 changes: 7 additions & 5 deletions dtmsvr/trans_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,13 +262,15 @@ func (t *TransGlobal) execBranch(ctx context.Context, branch *TransBranch, branc
func (t *TransGlobal) getNextCronInterval(ctype cronType) int64 {
if ctype == cronBackoff {
return t.NextCronInterval * 2
} else if ctype == cronKeep {
}
if ctype == cronKeep {
return t.NextCronInterval
} else if t.RetryInterval != 0 {
}
if t.RetryInterval != 0 {
return t.RetryInterval
} else if t.TimeoutToFail > 0 && t.TimeoutToFail < conf.RetryInterval {
}
if t.TimeoutToFail > 0 && t.TimeoutToFail < conf.RetryInterval {
return t.TimeoutToFail
} else {
return conf.RetryInterval
}
return conf.RetryInterval
}

0 comments on commit dedf72e

Please sign in to comment.