diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml new file mode 100644 index 0000000..b858d64 --- /dev/null +++ b/.github/workflows/go.yml @@ -0,0 +1,30 @@ +# This workflow will build a golang project +# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go + +name: Go Tests + +on: + push: + branches: [ "main" ] + pull_request: + branches: [ "main" ] + +jobs: + + build: + runs-on: ubuntu-latest + strategy: + matrix: + go: [ '1.20', '1.19' ] + + name: Go ${{ matrix.go }} tests + steps: + - uses: actions/checkout@v3 + - name: Set up Go + uses: actions/setup-go@v3 + with: + go-version: ${{ matrix.go }} + - name: Build + run: go build -v ./... + - name: Test + run: ./test.sh diff --git a/cmd/lightningstream/commands/sync.go b/cmd/lightningstream/commands/sync.go index a446623..e2616e6 100644 --- a/cmd/lightningstream/commands/sync.go +++ b/cmd/lightningstream/commands/sync.go @@ -63,23 +63,33 @@ func runSync(receiveOnly bool) error { eg, ctx := errgroup.WithContext(ctx) for name, lc := range conf.LMDBs { + l := logrus.WithField("db", name) + env, err := syncer.OpenEnv(l, lc) + if err != nil { + return err + } + opt := syncer.Options{ ReceiveOnly: receiveOnly, } - s, err := syncer.New(name, st, conf, lc, opt) + s, err := syncer.New(name, env, st, conf, lc, opt) if err != nil { return err } - name := name eg.Go(func() error { + defer func() { + if err := env.Close(); err != nil { + l.WithError(err).Error("Env close failed") + } + }() err := s.Sync(ctx) if err != nil { if err == context.Canceled { - logrus.WithField("db", name).Error("Sync cancelled") + l.Error("Sync cancelled") return err } - logrus.WithError(err).WithField("db", name).Error("Sync failed") + l.WithError(err).Error("Sync failed") } return err }) diff --git a/syncer/env.go b/syncer/env.go new file mode 100644 index 0000000..6aa5075 --- /dev/null +++ b/syncer/env.go @@ -0,0 +1,33 @@ +package syncer + +import ( + "github.com/PowerDNS/lmdb-go/lmdb" + "github.com/c2h5oh/datasize" + "github.com/sirupsen/logrus" + "powerdns.com/platform/lightningstream/config" + "powerdns.com/platform/lightningstream/lmdbenv" +) + +// OpenEnv opens the LMDB env with the right options +func OpenEnv(l logrus.FieldLogger, lc config.LMDB) (env *lmdb.Env, err error) { + l.WithField("lmdbpath", lc.Path).Info("Opening LMDB") + env, err = lmdbenv.NewWithOptions(lc.Path, lc.Options) + if err != nil { + return nil, err + } + + // Print some env info + info, err := env.Info() + if err != nil { + return nil, err + } + l.WithFields(logrus.Fields{ + "MapSize": datasize.ByteSize(info.MapSize).HumanReadable(), + "LastTxnID": info.LastTxnID, + }).Info("Env info") + + // TODO: Perhaps check data if SchemaTracksChanges is set. Check if + // the timestamp is in a reasonable range or 0. + + return env, nil +} diff --git a/syncer/send_test.go b/syncer/send_test.go index 90c71c5..47b3c3c 100644 --- a/syncer/send_test.go +++ b/syncer/send_test.go @@ -37,15 +37,12 @@ func doBenchmarkSyncerSendOnce(b *testing.B, native, dupsort bool) { extraDBIFlags = lmdb.DupSort } - syncer, err := New("test", memory.New(), config.Config{}, config.LMDB{ - SchemaTracksChanges: native, - DupSortHack: dupsort, - }, Options{}) - require.NoError(t, err) - l, hook := test.NewNullLogger() _ = hook - syncer.l = l + lc := config.LMDB{ + SchemaTracksChanges: native, + DupSortHack: dupsort, + } // Fixed value // We add a header, but we can also benchmark this as all app value @@ -53,11 +50,15 @@ func doBenchmarkSyncerSendOnce(b *testing.B, native, dupsort bool) { header.PutBasic(val, header.TimestampFromTime(now), 42, header.NoFlags) val = append(val, "TESTING-123456789"...) - err = lmdbenv.TestEnv(func(env *lmdb.Env) error { + err := lmdbenv.TestEnv(func(env *lmdb.Env) error { info, err := env.Info() require.NoError(t, err) t.Logf("env info: %+v", info) + syncer, err := New("test", env, memory.New(), config.Config{}, lc, Options{}) + require.NoError(t, err) + syncer.l = l + // Fill some data to dump err = env.Update(func(txn *lmdb.Txn) error { // First insert the initial data into the main database diff --git a/syncer/shadow_test.go b/syncer/shadow_test.go index c989d2b..8dbd6cc 100644 --- a/syncer/shadow_test.go +++ b/syncer/shadow_test.go @@ -39,12 +39,12 @@ func TestSyncer_shadow(t *testing.T) { ts2 := testTS(2) ts3 := testTS(3) - s, err := New("test", nil, config.Config{}, config.LMDB{}, Options{}) - assert.NoError(t, err) + err := lmdbenv.TestEnv(func(env *lmdb.Env) error { + s, err := New("test", env, nil, config.Config{}, config.LMDB{}, Options{}) + assert.NoError(t, err) - err = lmdbenv.TestEnv(func(env *lmdb.Env) error { // Initial data - err := env.Update(func(txn *lmdb.Txn) error { + err = env.Update(func(txn *lmdb.Txn) error { // First insert the initial data into the main database dbi, err := txn.OpenDBI("foo", lmdb.Create) assert.NoError(t, err) diff --git a/syncer/sync.go b/syncer/sync.go index 53858f3..e801aec 100644 --- a/syncer/sync.go +++ b/syncer/sync.go @@ -27,13 +27,7 @@ const ( // Sync opens the env and starts the two-way sync loop. func (s *Syncer) Sync(ctx context.Context) error { - // Open the env - env, err := s.openEnv() - if err != nil { - return err - } - defer s.closeEnv(env) - + env := s.env status.AddLMDBEnv(s.name, env) defer status.RemoveLMDBEnv(s.name) diff --git a/syncer/sync_test.go b/syncer/sync_test.go index 60693c2..91e310e 100644 --- a/syncer/sync_test.go +++ b/syncer/sync_test.go @@ -13,7 +13,7 @@ import ( "github.com/PowerDNS/simpleblob" "github.com/PowerDNS/simpleblob/backends/memory" "github.com/sirupsen/logrus" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "powerdns.com/platform/lightningstream/config" "powerdns.com/platform/lightningstream/config/logger" "powerdns.com/platform/lightningstream/lmdbenv" @@ -23,7 +23,7 @@ import ( const testLMDBName = "default" const testDBIName = "test" -const tick = 100 * time.Millisecond +const tick = 10 * time.Millisecond func TestSyncer_Sync_startup(t *testing.T) { t.Run("with-timestamped-schema", func(t *testing.T) { @@ -44,6 +44,13 @@ func doTest(t *testing.T, withHeader bool) { syncerA, envA := createInstance(t, "a", st, withHeader) syncerB, envB := createInstance(t, "b", st, withHeader) + // For some reason trying to close the envs in this test segfaults on Linux (not on macOS). + // It appears like this is caused by the syncer still running after cancellation + // (and thus after the env is closed), but I did not get to the bottom of this yet. + // [signal SIGSEGV: segmentation violation code=0x1 addr=0x7fd015132090 pc=0x8dc942] + //defer envA.Close() + //defer envB.Close() + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() ctxA, cancelA := context.WithCancel(ctx) @@ -55,46 +62,36 @@ func doTest(t *testing.T, withHeader bool) { t.Log("Starting syncer A") go runSync(ctxA, syncerA) - t.Log("----------") - time.Sleep(2 * tick) t.Log("----------") // Expecting one snapshot on startup, because not empty - logSnapshotList(t, st) - entries := listInstanceSnapshots(st, "a") - assert.Len(t, entries, 1, "A") + requireSnapshotsLenWait(t, st, 1, "A") // Start syncer B with an empty LMDB + // Starting with an empty LMDB is a special case that will not trigger any + // local snapshot. t.Log("Starting syncer B") go runSync(ctxB, syncerB) - t.Log("----------") - time.Sleep(2 * tick) t.Log("----------") - // No snapshot, because empty - logSnapshotList(t, st) - entries = listInstanceSnapshots(st, "b") - assert.Len(t, entries, 0, "B") + // Wait until the data from A was synced to B + assertKeyWait(t, envB, "foo", "v1", withHeader) - assertKey(t, envB, "foo", "v1", withHeader) + // No snapshot made by B, because we started empty + requireSnapshotsLenWait(t, st, 0, "B") // Now set something in B setKey(t, envB, "foo", "v2", withHeader) - t.Log("----------") - time.Sleep(3 * tick) t.Log("----------") // New snapshot in B, no new one in A - logSnapshotList(t, st) - entries = listInstanceSnapshots(st, "b") - assert.Len(t, entries, 1, "B") - entries = listInstanceSnapshots(st, "a") - assert.Len(t, entries, 1, "A") + requireSnapshotsLenWait(t, st, 1, "B") + requireSnapshotsLenWait(t, st, 1, "A") // New value should be present in A - assertKey(t, envB, "foo", "v2", withHeader) + assertKeyWait(t, envB, "foo", "v2", withHeader) // Restart syncer for A t.Log("Restarting syncer A") @@ -103,16 +100,13 @@ func doTest(t *testing.T, withHeader bool) { t.Log("----------") go runSync(ctxA, syncerA) - t.Log("----------") - time.Sleep(3 * tick) t.Log("----------") // Check is the contents of A are still correct after restart - assertKey(t, envA, "foo", "v2", withHeader) - entries = listInstanceSnapshots(st, "a") + assertKeyWait(t, envA, "foo", "v2", withHeader) // A new snapshot should always be created on startup, in case the LMDB // was modified while it was down. - assert.Len(t, entries, 2, "A") + requireSnapshotsLenWait(t, st, 2, "A") // Stopping syncer for A t.Log("Stopping syncer A") @@ -127,15 +121,12 @@ func doTest(t *testing.T, withHeader bool) { ctxA, cancelA = context.WithCancel(ctx) go runSync(ctxA, syncerA) t.Log("----------") - time.Sleep(6 * tick) - t.Log("----------") + // New value in A should get synced to B + assertKeyWait(t, envB, "new", "hello", withHeader) // Check if the contents of A are still correct after restart - assertKey(t, envA, "new", "hello", withHeader) - // It should also be synced to B - assertKey(t, envB, "new", "hello", withHeader) - entries = listInstanceSnapshots(st, "a") - assert.Len(t, entries, 3, "A") + assertKeyWait(t, envA, "new", "hello", withHeader) + requireSnapshotsLenWait(t, st, 3, "A") cancelA() cancelB() @@ -144,16 +135,16 @@ func doTest(t *testing.T, withHeader bool) { func createInstance(t *testing.T, name string, st simpleblob.Interface, timestamped bool) (*Syncer, *lmdb.Env) { env, tmp, err := createLMDB(t) - assert.NoError(t, err) + require.NoError(t, err) c := createConfig(name, tmp, timestamped) - syncer, err := New("default", st, c, c.LMDBs[testLMDBName], Options{}) - assert.NoError(t, err) + syncer, err := New("default", env, st, c, c.LMDBs[testLMDBName], Options{}) + require.NoError(t, err) return syncer, env } -func logSnapshotList(t *testing.T, st simpleblob.Interface) { +func LogSnapshotList(t *testing.T, st simpleblob.Interface) { ctx := context.Background() entries, _ := st.List(ctx, "") var lines []string @@ -178,6 +169,28 @@ func listInstanceSnapshots(st simpleblob.Interface, instance string) simpleblob. return entries } +func requireSnapshotsLenWait(t *testing.T, st simpleblob.Interface, expLen int, instance string) { + var list simpleblob.BlobList + // Retry until it succeeds + var i int + const maxIter = 150 + const sleepTime = 10 * time.Millisecond + defer func() { + t.Logf("Waited %d/%d iterations for the expected snapshot length", i, maxIter) + }() + for i = 0; i < maxIter; i++ { + list = listInstanceSnapshots(st, strings.ToLower(instance)) + l := len(list) + if l == expLen { + return + } + time.Sleep(sleepTime) + } + // This one is actually expected to fail, call it for the formatting + t.Logf("Gave up on waiting for the expected snapshot length") + require.Len(t, list, expLen, instance) +} + func runSync(ctx context.Context, syncer *Syncer) { err := syncer.Sync(ctx) if err != nil && err != context.Canceled { @@ -185,10 +198,28 @@ func runSync(ctx context.Context, syncer *Syncer) { } } -func assertKey(t *testing.T, env *lmdb.Env, key, val string, withHeader bool) { - kv, err := dumpData(env, withHeader) - assert.NoError(t, err) - assert.Equal(t, val, kv[key]) +func assertKeyWait(t *testing.T, env *lmdb.Env, key, val string, withHeader bool) { + var kv map[string]string + var err error + var i int + const maxIter = 150 + const sleepTime = 10 * time.Millisecond + defer func() { + t.Logf("Waited %d/%d iterations for the expected key", i, maxIter) + }() + for i = 0; i < maxIter; i++ { + kv, err = dumpData(env, withHeader) + if err != nil && !lmdb.IsNotFound(err) { + require.NoError(t, err) + } + if kv[key] == val { + return + } + time.Sleep(sleepTime) + } + // Expected to fail now, called for formatting + t.Logf("Gave up on waiting for the expected key") + require.Equal(t, val, kv[key]) } func setKey(t *testing.T, env *lmdb.Env, key, val string, withHeader bool) { @@ -210,13 +241,13 @@ func setKey(t *testing.T, env *lmdb.Env, key, val string, withHeader bool) { err = txn.Put(dbi, []byte(key), valb, 0) return err }) - assert.NoError(t, err) + require.NoError(t, err) } func dumpData(env *lmdb.Env, withHeader bool) (map[string]string, error) { data := make(map[string]string) err := env.View(func(txn *lmdb.Txn) error { - dbi, err := txn.OpenDBI(testDBIName, lmdb.Create) + dbi, err := txn.OpenDBI(testDBIName, 0) if err != nil { return err } diff --git a/syncer/syncer.go b/syncer/syncer.go index 6af3608..50402a6 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -4,6 +4,7 @@ import ( "fmt" "time" + "github.com/PowerDNS/lmdb-go/lmdb" "github.com/PowerDNS/simpleblob" "github.com/sirupsen/logrus" "powerdns.com/platform/lightningstream/syncer/cleaner" @@ -13,7 +14,7 @@ import ( "powerdns.com/platform/lightningstream/status/starttracker" ) -func New(name string, st simpleblob.Interface, c config.Config, lc config.LMDB, opt Options) (*Syncer, error) { +func New(name string, env *lmdb.Env, st simpleblob.Interface, c config.Config, lc config.LMDB, opt Options) (*Syncer, error) { l := logrus.WithField("db", name) // Start cleaner, but make sure it is disabled if we run in receive-only mode @@ -35,6 +36,7 @@ func New(name string, st simpleblob.Interface, c config.Config, lc config.LMDB, opt: opt, shadow: true, generation: 0, + env: env, lastByInstance: make(map[string]time.Time), cleaner: cl, storageStoreHealth: healthtracker.New(c.Health.StorageStore, fmt.Sprintf("%s_storage_store", name), "write to storage backend"), @@ -63,6 +65,7 @@ type Syncer struct { l logrus.FieldLogger shadow bool // use shadow database for timestamps? generation uint64 + env *lmdb.Env // lastByInstance tracks the last snapshot loaded by instance, so that the // cleaner can make safe decisions about when to remove stale snapshots. diff --git a/syncer/utils.go b/syncer/utils.go index 6d9b241..37bca0e 100644 --- a/syncer/utils.go +++ b/syncer/utils.go @@ -9,10 +9,7 @@ import ( "time" "github.com/PowerDNS/lmdb-go/lmdb" - "github.com/c2h5oh/datasize" "github.com/pkg/errors" - "github.com/sirupsen/logrus" - "powerdns.com/platform/lightningstream/lmdbenv" "powerdns.com/platform/lightningstream/lmdbenv/dbiflags" "powerdns.com/platform/lightningstream/lmdbenv/header" "powerdns.com/platform/lightningstream/lmdbenv/stats" @@ -83,37 +80,6 @@ func (s *Syncer) generationID() string { return fmt.Sprintf("G-%016x", s.generation) } -// openEnv opens the LMDB env with the right options -func (s *Syncer) openEnv() (env *lmdb.Env, err error) { - s.l.WithField("lmdbpath", s.lc.Path).Info("Opening LMDB") - env, err = lmdbenv.NewWithOptions(s.lc.Path, s.lc.Options) - if err != nil { - return nil, err - } - - // Print some env info - info, err := env.Info() - if err != nil { - return nil, err - } - s.l.WithFields(logrus.Fields{ - "MapSize": datasize.ByteSize(info.MapSize).HumanReadable(), - "LastTxnID": info.LastTxnID, - }).Info("Env info") - - // TODO: Perhaps check data if SchemaTracksChanges is set. Check if - // the timestamp is in a reasonable range or 0. - - return env, nil -} - -// closeEnv closes the LMDB env, logging any unexpected errors for easy defer -func (s *Syncer) closeEnv(env *lmdb.Env) { - if err := env.Close(); err != nil { - s.l.WithError(err).Warn("Env close returned error") - } -} - // readDBI reads a DBI into a snapshot DBI. // By default, the headers of values will be split out to the corresponding snapshot fields. // If rawValues is true, the value will be stored as is and the headers will diff --git a/test-in-docker.sh b/test-in-docker.sh index a78e0d9..d105bb5 100755 --- a/test-in-docker.sh +++ b/test-in-docker.sh @@ -4,6 +4,6 @@ set -ex image=lightningstream-test -docker build -t "$image" . +docker build --target=builder -t "$image" . docker run -w /src --entrypoint '' "$image" /src/test.sh "$@" diff --git a/test.sh b/test.sh index 158078f..a13946a 100755 --- a/test.sh +++ b/test.sh @@ -14,7 +14,13 @@ set -ex go test -count=1 "$@" ./... go test -count=1 "$@" github.com/PowerDNS/simpleblob/... +# Run again with race detector +go test -race -count=1 "$@" ./... + +# This one used to be flaky, run a few more times +go test -count 20 -run TestSyncer_Sync_startup powerdns.com/platform/lightningstream/syncer + # Configure linters in .golangci.yml -GOBIN="$PWD/bin" go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.50.1 +GOBIN="$PWD/bin" go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.52.0 ./bin/golangci-lint run