From c7a3c17e9f36abf9422d1044689d3134f735b332 Mon Sep 17 00:00:00 2001 From: Dylan Terry Date: Fri, 4 Oct 2024 13:49:58 -0400 Subject: [PATCH 1/6] This commit addresses an issue where acknowledgments (ACKs) were sometimes sent to the master before binlog events were fully written and fsynced to disk during backup operations. Sending ACKs prematurely in semi-synchronous replication could lead to data loss if the replica fails after sending the ACK but before persisting the event. Key changes: - Introduced an `EventHandler` interface with a `HandleEvent` method for processing binlog events. This allows custom event handling logic to be injected into the replication stream. - Added an `eventHandler` field to `BinlogSyncer` and provided a `SetEventHandler` method to assign an event handler. This enables `BinlogSyncer` to delegate event processing to the assigned handler. - Implemented `BackupEventHandler` which writes binlog events to disk and ensures that each event is fsynced before returning. This ensures data durability before ACKs are sent. - Modified the `onStream` method in `BinlogSyncer` to separate event parsing (`parseEvent`) from event handling and ACK sending (`handleEventAndACK`). This adheres to the single-responsibility principle and makes the code cleaner. - Moved state updates (e.g., updating `b.nextPos`) and GTID set handling from `parseEvent` to `handleEventAndACK` to avoid side effects during parsing. - Ensured that ACKs are sent only after the event has been fully processed and fsynced by sending the ACK in `handleEventAndACK` after event handling. 
--- replication/backup.go | 157 +++++++++++++++++++++++++----------- replication/backup_test.go | 46 +++++++++++ replication/binlogsyncer.go | 122 +++++++++++++++++++++------- 3 files changed, 253 insertions(+), 72 deletions(-) diff --git a/replication/backup.go b/replication/backup.go index 86265ae70..06af71ad4 100644 --- a/replication/backup.go +++ b/replication/backup.go @@ -5,6 +5,7 @@ import ( "io" "os" "path" + "sync" "time" . "github.com/go-mysql-org/go-mysql/mysql" @@ -41,77 +42,143 @@ func (b *BinlogSyncer) StartBackupWithHandler(p Position, timeout time.Duration, // Force use raw mode b.parser.SetRawMode(true) + // Set up the backup event handler + backupHandler := &BackupEventHandler{ + handler: handler, + } + + if b.cfg.SyncMode == SyncModeSync { + // Set the event handler in BinlogSyncer for synchronous mode + b.SetEventHandler(backupHandler) + } + s, err := b.StartSync(p) if err != nil { return errors.Trace(err) } - var filename string - var offset uint32 - - var w io.WriteCloser defer func() { - var closeErr error - if w != nil { - closeErr = w.Close() - } - if retErr == nil { - retErr = closeErr + b.SetEventHandler(nil) // Reset the event handler + if backupHandler.w != nil { + closeErr := backupHandler.w.Close() + if retErr == nil { + retErr = closeErr + } } }() - for { - ctx, cancel := context.WithTimeout(context.Background(), timeout) - e, err := s.GetEvent(ctx) - cancel() + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() - if err == context.DeadlineExceeded { + if b.cfg.SyncMode == SyncModeSync { + // Synchronous mode: wait for completion or error + select { + case <-ctx.Done(): return nil - } - - if err != nil { + case <-b.ctx.Done(): + return nil + case err := <-s.ech: return errors.Trace(err) } + } else { + // Asynchronous mode: consume events from the streamer + for { + select { + case <-ctx.Done(): + return nil + case <-b.ctx.Done(): + return nil + case err := <-s.ech: + return errors.Trace(err) + case e 
:= <-s.ch: + err = backupHandler.HandleEvent(e) + if err != nil { + return errors.Trace(err) + } + } + } + } +} - offset = e.Header.LogPos +// BackupEventHandler handles writing events for backup +type BackupEventHandler struct { + handler func(binlogFilename string) (io.WriteCloser, error) + w io.WriteCloser + mutex sync.Mutex + fsyncedChan chan struct{} + eventCount int // eventCount used for testing - if e.Header.EventType == ROTATE_EVENT { - rotateEvent := e.Event.(*RotateEvent) - filename = string(rotateEvent.NextLogName) + filename string +} - if e.Header.Timestamp == 0 || offset == 0 { - // fake rotate event - continue - } - } else if e.Header.EventType == FORMAT_DESCRIPTION_EVENT { - // FormateDescriptionEvent is the first event in binlog, we will close old one and create a new +func (h *BackupEventHandler) HandleEvent(e *BinlogEvent) error { + h.mutex.Lock() + defer h.mutex.Unlock() - if w != nil { - if err = w.Close(); err != nil { - w = nil - return errors.Trace(err) - } - } + var err error - if len(filename) == 0 { - return errors.Errorf("empty binlog filename for FormateDescriptionEvent") - } + // Update the offset + offset := e.Header.LogPos - w, err = handler(filename) - if err != nil { - return errors.Trace(err) - } + if e.Header.EventType == ROTATE_EVENT { + rotateEvent := e.Event.(*RotateEvent) + h.filename = string(rotateEvent.NextLogName) - // write binlog header fe'bin' - if _, err = w.Write(BinLogFileHeader); err != nil { + if e.Header.Timestamp == 0 || offset == 0 { + // Fake rotate event, skip processing + return nil + } + } else if e.Header.EventType == FORMAT_DESCRIPTION_EVENT { + // Close the current writer and open a new one + if h.w != nil { + if err = h.w.Close(); err != nil { + h.w = nil return errors.Trace(err) } } - if n, err := w.Write(e.RawData); err != nil { + if len(h.filename) == 0 { + return errors.Errorf("empty binlog filename for FormatDescriptionEvent") + } + + h.w, err = h.handler(h.filename) + if err != nil { + return 
errors.Trace(err) + } + + // Write binlog header fe'bin' + _, err = h.w.Write(BinLogFileHeader) + if err != nil { return errors.Trace(err) - } else if n != len(e.RawData) { + } + } + + // Write raw event data to the current writer + if h.w != nil { + n, err := h.w.Write(e.RawData) + if err != nil { + return errors.Trace(err) + } + if n != len(e.RawData) { return errors.Trace(io.ErrShortWrite) } + + // Perform Sync if the writer supports it + if f, ok := h.w.(*os.File); ok { + if err := f.Sync(); err != nil { + return errors.Trace(err) + } + // Signal that fsync has completed + if h.fsyncedChan != nil { + h.fsyncedChan <- struct{}{} + } + } + } else { + // If writer is nil and event is not FORMAT_DESCRIPTION_EVENT, we can't write + // This should not happen if events are in expected order + return errors.New("writer is not initialized") } + + h.eventCount++ + return nil } diff --git a/replication/backup_test.go b/replication/backup_test.go index abefd3f8d..91f70c495 100644 --- a/replication/backup_test.go +++ b/replication/backup_test.go @@ -47,3 +47,49 @@ func (t *testSyncerSuite) TestStartBackupEndInGivenTime() { t.T().Fatal("time out error") } } + +// TestAsyncBackup runs the backup process in asynchronous mode and verifies binlog file creation. +func (t *testSyncerSuite) TestAsyncBackup() { + testSyncModeBackup(t, SyncModeAsync) +} + +// TestSyncBackup runs the backup process in synchronous mode and verifies binlog file creation. +func (t *testSyncerSuite) TestSyncBackup() { + testSyncModeBackup(t, SyncModeSync) +} + +// testSyncModeBackup is a helper function that runs the backup process for a given sync mode and checks if binlog files are written correctly. 
+func testSyncModeBackup(t *testSyncerSuite, syncMode SyncMode) { + t.setupTest(mysql.MySQLFlavor) + t.b.cfg.SemiSyncEnabled = false // Ensure semi-sync is disabled + t.b.cfg.SyncMode = syncMode // Set the sync mode + + binlogDir := "./var" + os.RemoveAll(binlogDir) + timeout := 3 * time.Second + + done := make(chan bool) + + // Start the backup process in a goroutine + go func() { + err := t.b.StartBackup(binlogDir, mysql.Position{Name: "", Pos: uint32(0)}, timeout) + require.NoError(t.T(), err) + done <- true + }() + + failTimeout := 2 * timeout + ctx, cancel := context.WithTimeout(context.Background(), failTimeout) + defer cancel() + + // Wait for the backup to complete or timeout + select { + case <-done: + // Check if binlog files are written to the specified directory + files, err := os.ReadDir(binlogDir) + require.NoError(t.T(), err, "Failed to read binlog directory") + require.Greater(t.T(), len(files), 0, "Binlog files were not written to the directory") + t.T().Logf("Backup completed successfully in %v mode with %d binlog file(s).", syncMode, len(files)) + case <-ctx.Done(): + t.T().Fatalf("Timeout error during backup in %v mode.", syncMode) + } +} diff --git a/replication/binlogsyncer.go b/replication/binlogsyncer.go index 39e5749ea..0b4e5dd04 100644 --- a/replication/binlogsyncer.go +++ b/replication/binlogsyncer.go @@ -25,6 +25,13 @@ var ( errSyncRunning = errors.New("Sync is running, must Close first") ) +type SyncMode int + +const ( + SyncModeAsync SyncMode = iota // Asynchronous mode (default) + SyncModeSync // Synchronous mode +) + // BinlogSyncerConfig is the configuration for BinlogSyncer. type BinlogSyncerConfig struct { // ServerID is the unique ID in cluster. @@ -126,6 +133,16 @@ type BinlogSyncerConfig struct { DiscardGTIDSet bool EventCacheCount int + + // SyncMode specifies whether to operate in synchronous or asynchronous mode. + // - SyncModeAsync (default): Events are sent to the BinlogStreamer and can be consumed via GetEvent(). 
+ // - SyncModeSync: Events are processed synchronously using the EventHandler. + SyncMode SyncMode +} + +// EventHandler defines the interface for processing binlog events. +type EventHandler interface { + HandleEvent(e *BinlogEvent) error } // BinlogSyncer syncs binlog event from server. @@ -155,6 +172,8 @@ type BinlogSyncer struct { lastConnectionID uint32 retryCount int + + eventHandler EventHandler } // NewBinlogSyncer creates the BinlogSyncer with cfg. @@ -382,6 +401,12 @@ func (b *BinlogSyncer) enableSemiSync() error { return nil } +func (b *BinlogSyncer) SetEventHandler(handler EventHandler) { + b.m.Lock() + defer b.m.Unlock() + b.eventHandler = handler +} + func (b *BinlogSyncer) prepare() error { if b.isClosed() { return errors.Trace(ErrSyncClosed) @@ -765,7 +790,16 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) { switch data[0] { case OK_HEADER: - if err = b.parseEvent(s, data); err != nil { + // Parse the event + e, needACK, err := b.parseEvent(data) + if err != nil { + s.closeWithError(err) + return + } + + // Handle the event and send ACK if necessary + err = b.handleEventAndACK(s, e, needACK) + if err != nil { s.closeWithError(err) return } @@ -786,39 +820,42 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) { } } -func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error { - //skip OK byte, 0x00 +// parseEvent parses the raw data into a BinlogEvent. +// It only handles parsing and does not perform any side effects. 
+func (b *BinlogSyncer) parseEvent(data []byte) (*BinlogEvent, bool, error) { + // Skip OK byte (0x00) data = data[1:] needACK := false - if b.cfg.SemiSyncEnabled && (data[0] == SemiSyncIndicator) { + if b.cfg.SemiSyncEnabled && data[0] == SemiSyncIndicator { needACK = data[1] == 0x01 - //skip semi sync header + // Skip semi-sync header data = data[2:] } + // Parse the event using the BinlogParser e, err := b.parser.Parse(data) if err != nil { - return errors.Trace(err) + return nil, false, errors.Trace(err) } + return e, needACK, nil +} + +func (b *BinlogSyncer) handleEventAndACK(s *BinlogStreamer, e *BinlogEvent, needACK bool) error { + // Update the next position based on the event's LogPos if e.Header.LogPos > 0 { // Some events like FormatDescriptionEvent return 0, ignore. b.nextPos.Pos = e.Header.LogPos } - getCurrentGtidSet := func() GTIDSet { - if b.currGset == nil { - return nil - } - return b.currGset.Clone() - } - + // Handle event types to update positions and GTID sets switch event := e.Event.(type) { case *RotateEvent: b.nextPos.Name = string(event.NextLogName) b.nextPos.Pos = uint32(event.Position) b.cfg.Logger.Infof("rotate to %s", b.nextPos) + case *GTIDEvent: if b.prevGset == nil { break @@ -826,13 +863,20 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error { if b.currGset == nil { b.currGset = b.prevGset.Clone() } - u, _ := uuid.FromBytes(event.SID) + u, err := uuid.FromBytes(event.SID) + if err != nil { + return errors.Trace(err) + } b.currGset.(*MysqlGTIDSet).AddGTID(u, event.GNO) if b.prevMySQLGTIDEvent != nil { - u, _ = uuid.FromBytes(b.prevMySQLGTIDEvent.SID) + u, err = uuid.FromBytes(b.prevMySQLGTIDEvent.SID) + if err != nil { + return errors.Trace(err) + } b.prevGset.(*MysqlGTIDSet).AddGTID(u, b.prevMySQLGTIDEvent.GNO) } b.prevMySQLGTIDEvent = event + case *MariadbGTIDEvent: if b.prevGset == nil { break @@ -841,29 +885,49 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error { b.currGset = 
b.prevGset.Clone() } prev := b.currGset.Clone() - err = b.currGset.(*MariadbGTIDSet).AddSet(&event.GTID) + err := b.currGset.(*MariadbGTIDSet).AddSet(&event.GTID) if err != nil { return errors.Trace(err) } - // right after reconnect we will see same gtid as we saw before, thus currGset will not get changed + // Right after reconnect we may see the same GTID as before; update prevGset if currGset changed if !b.currGset.Equal(prev) { b.prevGset = prev } + case *XIDEvent: if !b.cfg.DiscardGTIDSet { - event.GSet = getCurrentGtidSet() + event.GSet = b.getCurrentGtidSet() } + case *QueryEvent: if !b.cfg.DiscardGTIDSet { - event.GSet = getCurrentGtidSet() + event.GSet = b.getCurrentGtidSet() } } - needStop := false - select { - case s.ch <- e: - case <-b.ctx.Done(): - needStop = true + // Process the event based on the configured SyncMode + switch b.cfg.SyncMode { + case SyncModeSync: + // Synchronous mode: use EventHandler + b.m.RLock() + handler := b.eventHandler + b.m.RUnlock() + if handler != nil { + err := handler.HandleEvent(e) + if err != nil { + return errors.Trace(err) + } + } else { + return errors.New("no EventHandler set for synchronous mode") + } + + case SyncModeAsync: + // Asynchronous mode: send the event to the streamer channel + select { + case s.ch <- e: + case <-b.ctx.Done(): + return errors.New("sync is being closed...") + } } if needACK { @@ -873,10 +937,14 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error { } } - if needStop { - return errors.New("sync is been closing...") - } + return nil +} +// getCurrentGtidSet returns a clone of the current GTID set. +func (b *BinlogSyncer) getCurrentGtidSet() GTIDSet { + if b.currGset != nil { + return b.currGset.Clone() + } return nil } From d162219f4e5ac9949925f3e3fd5adfd2c48e149f Mon Sep 17 00:00:00 2001 From: Dylan Terry Date: Wed, 9 Oct 2024 15:42:54 -0400 Subject: [PATCH 2/6] Refactor event handling by replacing SyncMode and EventHandleMode with SynchronousEventHandler. 
Simplify the event processing in BinlogSyncerConfig by introducing SynchronousEventHandler for synchronous event handling. Update StartBackup, StartBackupWithHandler, and associated tests to reflect these changes. --- replication/backup.go | 103 +++++++++++++++++------------------- replication/backup_test.go | 37 ++++++++++--- replication/binlogsyncer.go | 36 ++++--------- 3 files changed, 88 insertions(+), 88 deletions(-) diff --git a/replication/backup.go b/replication/backup.go index 06af71ad4..31b630838 100644 --- a/replication/backup.go +++ b/replication/backup.go @@ -12,20 +12,24 @@ import ( "github.com/pingcap/errors" ) -// StartBackup: Like mysqlbinlog remote raw backup -// Backup remote binlog from position (filename, offset) and write in backupDir +// StartBackup starts the backup process for the binary log and writes to the backup directory. func (b *BinlogSyncer) StartBackup(backupDir string, p Position, timeout time.Duration) error { err := os.MkdirAll(backupDir, 0755) if err != nil { return errors.Trace(err) } - return b.StartBackupWithHandler(p, timeout, func(filename string) (io.WriteCloser, error) { - return os.OpenFile(path.Join(backupDir, filename), os.O_CREATE|os.O_WRONLY, 0644) - }) + if b.cfg.SynchronousEventHandler == nil { + return b.StartBackupWithHandler(p, timeout, func(filename string) (io.WriteCloser, error) { + return os.OpenFile(path.Join(backupDir, filename), os.O_CREATE|os.O_WRONLY, 0644) + }) + } else { + return b.StartSynchronousBackup(p, timeout) + } } // StartBackupWithHandler starts the backup process for the binary log using the specified position and handler. // The process will continue until the timeout is reached or an error occurs. +// This method should not be used together with SynchronousEventHandler. // // Parameters: // - p: The starting position in the binlog from which to begin the backup. 
@@ -38,6 +42,9 @@ func (b *BinlogSyncer) StartBackupWithHandler(p Position, timeout time.Duration, // a very long timeout here timeout = 30 * 3600 * 24 * time.Second } + if b.cfg.SynchronousEventHandler != nil { + return errors.New("StartBackupWithHandler cannot be used when SynchronousEventHandler is set. Use StartSynchronousBackup instead.") + } // Force use raw mode b.parser.SetRawMode(true) @@ -47,18 +54,12 @@ func (b *BinlogSyncer) StartBackupWithHandler(p Position, timeout time.Duration, handler: handler, } - if b.cfg.SyncMode == SyncModeSync { - // Set the event handler in BinlogSyncer for synchronous mode - b.SetEventHandler(backupHandler) - } - s, err := b.StartSync(p) if err != nil { return errors.Trace(err) } defer func() { - b.SetEventHandler(nil) // Reset the event handler if backupHandler.w != nil { closeErr := backupHandler.w.Close() if retErr == nil { @@ -70,8 +71,7 @@ func (b *BinlogSyncer) StartBackupWithHandler(p Position, timeout time.Duration, ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() - if b.cfg.SyncMode == SyncModeSync { - // Synchronous mode: wait for completion or error + for { select { case <-ctx.Done(): return nil @@ -79,57 +79,68 @@ func (b *BinlogSyncer) StartBackupWithHandler(p Position, timeout time.Duration, return nil case err := <-s.ech: return errors.Trace(err) - } - } else { - // Asynchronous mode: consume events from the streamer - for { - select { - case <-ctx.Done(): - return nil - case <-b.ctx.Done(): - return nil - case err := <-s.ech: + case e := <-s.ch: + err = backupHandler.HandleEvent(e) + if err != nil { return errors.Trace(err) - case e := <-s.ch: - err = backupHandler.HandleEvent(e) - if err != nil { - return errors.Trace(err) - } } } } } +// StartSynchronousBackup starts the backup process using the SynchronousEventHandler in the BinlogSyncerConfig. 
+func (b *BinlogSyncer) StartSynchronousBackup(p Position, timeout time.Duration) error { + if b.cfg.SynchronousEventHandler == nil { + return errors.New("SynchronousEventHandler must be set in BinlogSyncerConfig to use StartSynchronousBackup") + } + + if timeout == 0 { + timeout = 30 * 3600 * 24 * time.Second // Long timeout by default + } + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + s, err := b.StartSync(p) + if err != nil { + return errors.Trace(err) + } + + // Wait for the binlog syncer to finish or encounter an error + select { + case <-ctx.Done(): + return nil + case <-b.ctx.Done(): + return nil + case err := <-s.ech: + return errors.Trace(err) + } +} + // BackupEventHandler handles writing events for backup type BackupEventHandler struct { - handler func(binlogFilename string) (io.WriteCloser, error) - w io.WriteCloser - mutex sync.Mutex - fsyncedChan chan struct{} - eventCount int // eventCount used for testing + handler func(binlogFilename string) (io.WriteCloser, error) + w io.WriteCloser + mutex sync.Mutex filename string } +// HandleEvent processes a single event for the backup. 
func (h *BackupEventHandler) HandleEvent(e *BinlogEvent) error { h.mutex.Lock() defer h.mutex.Unlock() var err error - - // Update the offset offset := e.Header.LogPos if e.Header.EventType == ROTATE_EVENT { rotateEvent := e.Event.(*RotateEvent) h.filename = string(rotateEvent.NextLogName) - if e.Header.Timestamp == 0 || offset == 0 { - // Fake rotate event, skip processing return nil } } else if e.Header.EventType == FORMAT_DESCRIPTION_EVENT { - // Close the current writer and open a new one if h.w != nil { if err = h.w.Close(); err != nil { h.w = nil @@ -146,14 +157,12 @@ func (h *BackupEventHandler) HandleEvent(e *BinlogEvent) error { return errors.Trace(err) } - // Write binlog header fe'bin' _, err = h.w.Write(BinLogFileHeader) if err != nil { return errors.Trace(err) } } - // Write raw event data to the current writer if h.w != nil { n, err := h.w.Write(e.RawData) if err != nil { @@ -162,23 +171,9 @@ func (h *BackupEventHandler) HandleEvent(e *BinlogEvent) error { if n != len(e.RawData) { return errors.Trace(io.ErrShortWrite) } - - // Perform Sync if the writer supports it - if f, ok := h.w.(*os.File); ok { - if err := f.Sync(); err != nil { - return errors.Trace(err) - } - // Signal that fsync has completed - if h.fsyncedChan != nil { - h.fsyncedChan <- struct{}{} - } - } } else { - // If writer is nil and event is not FORMAT_DESCRIPTION_EVENT, we can't write - // This should not happen if events are in expected order return errors.New("writer is not initialized") } - h.eventCount++ return nil } diff --git a/replication/backup_test.go b/replication/backup_test.go index 91f70c495..7d6609157 100644 --- a/replication/backup_test.go +++ b/replication/backup_test.go @@ -2,7 +2,9 @@ package replication import ( "context" + "io" "os" + "path" "time" "github.com/stretchr/testify/require" @@ -50,24 +52,36 @@ func (t *testSyncerSuite) TestStartBackupEndInGivenTime() { // TestAsyncBackup runs the backup process in asynchronous mode and verifies binlog file creation. 
func (t *testSyncerSuite) TestAsyncBackup() { - testSyncModeBackup(t, SyncModeAsync) + testBackup(t, false) // false indicates asynchronous mode } // TestSyncBackup runs the backup process in synchronous mode and verifies binlog file creation. func (t *testSyncerSuite) TestSyncBackup() { - testSyncModeBackup(t, SyncModeSync) + testBackup(t, true) // true indicates synchronous mode } -// testSyncModeBackup is a helper function that runs the backup process for a given sync mode and checks if binlog files are written correctly. -func testSyncModeBackup(t *testSyncerSuite, syncMode SyncMode) { +// testBackup is a helper function that runs the backup process in the specified mode and checks if binlog files are written correctly. +func testBackup(t *testSyncerSuite, isSynchronous bool) { t.setupTest(mysql.MySQLFlavor) t.b.cfg.SemiSyncEnabled = false // Ensure semi-sync is disabled - t.b.cfg.SyncMode = syncMode // Set the sync mode binlogDir := "./var" os.RemoveAll(binlogDir) timeout := 3 * time.Second + if isSynchronous { + // Set up a BackupEventHandler for synchronous mode + backupHandler := &BackupEventHandler{ + handler: func(filename string) (io.WriteCloser, error) { + return os.OpenFile(path.Join(binlogDir, filename), os.O_CREATE|os.O_WRONLY, 0644) + }, + } + t.b.cfg.SynchronousEventHandler = backupHandler + } else { + // Ensure SynchronousEventHandler is nil for asynchronous mode + t.b.cfg.SynchronousEventHandler = nil + } + done := make(chan bool) // Start the backup process in a goroutine @@ -88,8 +102,17 @@ func testSyncModeBackup(t *testSyncerSuite, syncMode SyncMode) { files, err := os.ReadDir(binlogDir) require.NoError(t.T(), err, "Failed to read binlog directory") require.Greater(t.T(), len(files), 0, "Binlog files were not written to the directory") - t.T().Logf("Backup completed successfully in %v mode with %d binlog file(s).", syncMode, len(files)) + mode := modeLabel(isSynchronous) + t.T().Logf("Backup completed successfully in %s mode with %d binlog 
file(s).", mode, len(files)) case <-ctx.Done(): - t.T().Fatalf("Timeout error during backup in %v mode.", syncMode) + mode := modeLabel(isSynchronous) + t.T().Fatalf("Timeout error during backup in %s mode.", mode) + } +} + +func modeLabel(isSynchronous bool) string { + if isSynchronous { + return "synchronous" } + return "asynchronous" } diff --git a/replication/binlogsyncer.go b/replication/binlogsyncer.go index 0b4e5dd04..50934a893 100644 --- a/replication/binlogsyncer.go +++ b/replication/binlogsyncer.go @@ -25,13 +25,6 @@ var ( errSyncRunning = errors.New("Sync is running, must Close first") ) -type SyncMode int - -const ( - SyncModeAsync SyncMode = iota // Asynchronous mode (default) - SyncModeSync // Synchronous mode -) - // BinlogSyncerConfig is the configuration for BinlogSyncer. type BinlogSyncerConfig struct { // ServerID is the unique ID in cluster. @@ -134,10 +127,9 @@ type BinlogSyncerConfig struct { EventCacheCount int - // SyncMode specifies whether to operate in synchronous or asynchronous mode. - // - SyncModeAsync (default): Events are sent to the BinlogStreamer and can be consumed via GetEvent(). - // - SyncModeSync: Events are processed synchronously using the EventHandler. - SyncMode SyncMode + // SynchronousEventHandler is used for synchronous event handling. + // This should not be used together with StartBackupWithHandler. + SynchronousEventHandler EventHandler } // EventHandler defines the interface for processing binlog events. 
@@ -905,23 +897,13 @@ func (b *BinlogSyncer) handleEventAndACK(s *BinlogStreamer, e *BinlogEvent, need } } - // Process the event based on the configured SyncMode - switch b.cfg.SyncMode { - case SyncModeSync: - // Synchronous mode: use EventHandler - b.m.RLock() - handler := b.eventHandler - b.m.RUnlock() - if handler != nil { - err := handler.HandleEvent(e) - if err != nil { - return errors.Trace(err) - } - } else { - return errors.New("no EventHandler set for synchronous mode") + // Use SynchronousEventHandler if it's set + if b.cfg.SynchronousEventHandler != nil { + err := b.cfg.SynchronousEventHandler.HandleEvent(e) + if err != nil { + return errors.Trace(err) } - - case SyncModeAsync: + } else { // Asynchronous mode: send the event to the streamer channel select { case s.ch <- e: From ca576c957993d8bc26a742bd1fa8e54e81c20aa8 Mon Sep 17 00:00:00 2001 From: Dylan Terry Date: Wed, 9 Oct 2024 15:48:16 -0400 Subject: [PATCH 3/6] Add some comments and remember to remove SetEventHandler and the eventHandler attribute --- replication/binlogsyncer.go | 20 ++++++-------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/replication/binlogsyncer.go b/replication/binlogsyncer.go index 50934a893..a502ee851 100644 --- a/replication/binlogsyncer.go +++ b/replication/binlogsyncer.go @@ -11,11 +11,10 @@ import ( "sync" "time" - "github.com/siddontang/go-log/loggers" - "github.com/google/uuid" "github.com/pingcap/errors" "github.com/siddontang/go-log/log" + "github.com/siddontang/go-log/loggers" "github.com/go-mysql-org/go-mysql/client" . "github.com/go-mysql-org/go-mysql/mysql" @@ -58,7 +57,7 @@ type BinlogSyncerConfig struct { TLSConfig *tls.Config // Use replication.Time structure for timestamp and datetime. - // We will use Local location for timestamp and UTC location for datatime. + // We will use Local location for timestamp and UTC location for datetime. ParseTime bool // If ParseTime is false, convert TIMESTAMP into this specified timezone. 
If @@ -137,7 +136,7 @@ type EventHandler interface { HandleEvent(e *BinlogEvent) error } -// BinlogSyncer syncs binlog event from server. +// BinlogSyncer syncs binlog events from the server. type BinlogSyncer struct { m sync.RWMutex @@ -164,11 +163,9 @@ type BinlogSyncer struct { lastConnectionID uint32 retryCount int - - eventHandler EventHandler } -// NewBinlogSyncer creates the BinlogSyncer with cfg. +// NewBinlogSyncer creates the BinlogSyncer with the given configuration. func NewBinlogSyncer(cfg BinlogSyncerConfig) *BinlogSyncer { if cfg.Logger == nil { streamHandler, _ := log.NewStreamHandler(os.Stdout) @@ -185,7 +182,7 @@ func NewBinlogSyncer(cfg BinlogSyncerConfig) *BinlogSyncer { cfg.EventCacheCount = 10240 } - // Clear the Password to avoid outputing it in log. + // Clear the Password to avoid outputting it in logs. pass := cfg.Password cfg.Password = "" cfg.Logger.Infof("create BinlogSyncer with config %+v", cfg) @@ -393,12 +390,6 @@ func (b *BinlogSyncer) enableSemiSync() error { return nil } -func (b *BinlogSyncer) SetEventHandler(handler EventHandler) { - b.m.Lock() - defer b.m.Unlock() - b.eventHandler = handler -} - func (b *BinlogSyncer) prepare() error { if b.isClosed() { return errors.Trace(ErrSyncClosed) @@ -834,6 +825,7 @@ func (b *BinlogSyncer) parseEvent(data []byte) (*BinlogEvent, bool, error) { return e, needACK, nil } +// handleEventAndACK processes an event and sends an ACK if necessary. 
func (b *BinlogSyncer) handleEventAndACK(s *BinlogStreamer, e *BinlogEvent, needACK bool) error { // Update the next position based on the event's LogPos if e.Header.LogPos > 0 { From a8bcf4c45ffe93dfa57a0989921c03100bd3e995 Mon Sep 17 00:00:00 2001 From: Dylan Terry Date: Fri, 11 Oct 2024 15:22:21 -0400 Subject: [PATCH 4/6] Remove the timeout for synchronous backup, revert the timeout move to return the behavior to 30 days _between_ events, restore some comments, use struct instead of bool as recommended, add a note about SynchronousEventHandler and the parseEvent return values --- replication/backup.go | 22 +++++++--------------- replication/backup_test.go | 4 ++-- replication/binlogsyncer.go | 11 +++++++---- 3 files changed, 16 insertions(+), 21 deletions(-) diff --git a/replication/backup.go b/replication/backup.go index 31b630838..19d780bac 100644 --- a/replication/backup.go +++ b/replication/backup.go @@ -23,7 +23,7 @@ func (b *BinlogSyncer) StartBackup(backupDir string, p Position, timeout time.Du return os.OpenFile(path.Join(backupDir, filename), os.O_CREATE|os.O_WRONLY, 0644) }) } else { - return b.StartSynchronousBackup(p, timeout) + return b.StartSynchronousBackup(p) } } @@ -68,10 +68,10 @@ func (b *BinlogSyncer) StartBackupWithHandler(p Position, timeout time.Duration, } }() - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - for { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + select { case <-ctx.Done(): return nil @@ -89,27 +89,17 @@ func (b *BinlogSyncer) StartBackupWithHandler(p Position, timeout time.Duration, } // StartSynchronousBackup starts the backup process using the SynchronousEventHandler in the BinlogSyncerConfig. 
-func (b *BinlogSyncer) StartSynchronousBackup(p Position, timeout time.Duration) error { +func (b *BinlogSyncer) StartSynchronousBackup(p Position) error { if b.cfg.SynchronousEventHandler == nil { return errors.New("SynchronousEventHandler must be set in BinlogSyncerConfig to use StartSynchronousBackup") } - if timeout == 0 { - timeout = 30 * 3600 * 24 * time.Second // Long timeout by default - } - - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - s, err := b.StartSync(p) if err != nil { return errors.Trace(err) } - // Wait for the binlog syncer to finish or encounter an error select { - case <-ctx.Done(): - return nil case <-b.ctx.Done(): return nil case err := <-s.ech: @@ -138,6 +128,7 @@ func (h *BackupEventHandler) HandleEvent(e *BinlogEvent) error { rotateEvent := e.Event.(*RotateEvent) h.filename = string(rotateEvent.NextLogName) if e.Header.Timestamp == 0 || offset == 0 { + // fake rotate event return nil } } else if e.Header.EventType == FORMAT_DESCRIPTION_EVENT { @@ -157,6 +148,7 @@ func (h *BackupEventHandler) HandleEvent(e *BinlogEvent) error { return errors.Trace(err) } + // Write binlog header 0xfebin _, err = h.w.Write(BinLogFileHeader) if err != nil { return errors.Trace(err) diff --git a/replication/backup_test.go b/replication/backup_test.go index 7d6609157..83714c16e 100644 --- a/replication/backup_test.go +++ b/replication/backup_test.go @@ -32,12 +32,12 @@ func (t *testSyncerSuite) TestStartBackupEndInGivenTime() { os.RemoveAll(binlogDir) timeout := 2 * time.Second - done := make(chan bool) + done := make(chan struct{}) go func() { err := t.b.StartBackup(binlogDir, mysql.Position{Name: "", Pos: uint32(0)}, timeout) require.NoError(t.T(), err) - done <- true + close(done) }() failTimeout := 5 * timeout ctx, cancel := context.WithTimeout(context.Background(), failTimeout) diff --git a/replication/binlogsyncer.go b/replication/binlogsyncer.go index a502ee851..6763bf3f7 100644 --- a/replication/binlogsyncer.go 
+++ b/replication/binlogsyncer.go @@ -128,6 +128,7 @@ type BinlogSyncerConfig struct { // SynchronousEventHandler is used for synchronous event handling. // This should not be used together with StartBackupWithHandler. + // If this is not nil, GetEvent does not need to be called. SynchronousEventHandler EventHandler } @@ -805,11 +806,13 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) { // parseEvent parses the raw data into a BinlogEvent. // It only handles parsing and does not perform any side effects. -func (b *BinlogSyncer) parseEvent(data []byte) (*BinlogEvent, bool, error) { +// Returns the parsed BinlogEvent, a boolean indicating if an ACK is needed, and an error if the +// parsing fails +func (b *BinlogSyncer) parseEvent(data []byte) (event *BinlogEvent, needACK bool, err error) { // Skip OK byte (0x00) data = data[1:] - needACK := false + needACK = false if b.cfg.SemiSyncEnabled && data[0] == SemiSyncIndicator { needACK = data[1] == 0x01 // Skip semi-sync header @@ -817,12 +820,12 @@ func (b *BinlogSyncer) parseEvent(data []byte) (*BinlogEvent, bool, error) { } // Parse the event using the BinlogParser - e, err := b.parser.Parse(data) + event, err = b.parser.Parse(data) if err != nil { return nil, false, errors.Trace(err) } - return e, needACK, nil + return event, needACK, nil } // handleEventAndACK processes an event and sends an ACK if necessary. 
From 06c92686f93c950cb21287a48c5094420d11c353 Mon Sep 17 00:00:00 2001 From: Dylan Terry Date: Fri, 11 Oct 2024 15:38:21 -0400 Subject: [PATCH 5/6] Make sure to assign the timeout on the syncer so the backup doesn't fail --- replication/backup.go | 19 +++++++++++++++++-- replication/backup_test.go | 1 + 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/replication/backup.go b/replication/backup.go index 19d780bac..ee80964de 100644 --- a/replication/backup.go +++ b/replication/backup.go @@ -23,7 +23,7 @@ func (b *BinlogSyncer) StartBackup(backupDir string, p Position, timeout time.Du return os.OpenFile(path.Join(backupDir, filename), os.O_CREATE|os.O_WRONLY, 0644) }) } else { - return b.StartSynchronousBackup(p) + return b.StartSynchronousBackup(p, timeout) } } @@ -89,7 +89,7 @@ func (b *BinlogSyncer) StartBackupWithHandler(p Position, timeout time.Duration, } // StartSynchronousBackup starts the backup process using the SynchronousEventHandler in the BinlogSyncerConfig. -func (b *BinlogSyncer) StartSynchronousBackup(p Position) error { +func (b *BinlogSyncer) StartSynchronousBackup(p Position, timeout time.Duration) error { if b.cfg.SynchronousEventHandler == nil { return errors.New("SynchronousEventHandler must be set in BinlogSyncerConfig to use StartSynchronousBackup") } @@ -99,10 +99,25 @@ func (b *BinlogSyncer) StartSynchronousBackup(p Position) error { return errors.Trace(err) } + var ctx context.Context + var cancel context.CancelFunc + + if timeout > 0 { + ctx, cancel = context.WithTimeout(context.Background(), timeout) + defer cancel() + } else { + ctx = context.Background() + } + select { + case <-ctx.Done(): + // The timeout has been reached + return nil case <-b.ctx.Done(): + // The BinlogSyncer has been closed return nil case err := <-s.ech: + // An error occurred during streaming return errors.Trace(err) } } diff --git a/replication/backup_test.go b/replication/backup_test.go index 83714c16e..688e47fc2 100644 --- 
a/replication/backup_test.go +++ b/replication/backup_test.go @@ -12,6 +12,7 @@ import ( "github.com/go-mysql-org/go-mysql/mysql" ) +// TestStartBackupEndInGivenTime tests that the backup process completes within a given time. func (t *testSyncerSuite) TestStartBackupEndInGivenTime() { t.setupTest(mysql.MySQLFlavor) From bc1dd076bbfa30a291323bf213e6bd896330c112 Mon Sep 17 00:00:00 2001 From: Dylan Terry Date: Wed, 16 Oct 2024 11:16:41 -0400 Subject: [PATCH 6/6] Make sure to add NewBackupEventHandler in order to expose the otherwise private handler outside the package --- replication/backup.go | 6 ++++++ replication/backup_test.go | 6 +++--- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/replication/backup.go b/replication/backup.go index ee80964de..f1d7dd28b 100644 --- a/replication/backup.go +++ b/replication/backup.go @@ -131,6 +131,12 @@ type BackupEventHandler struct { filename string } +func NewBackupEventHandler(handlerFunction func(filename string) (io.WriteCloser, error)) *BackupEventHandler { + return &BackupEventHandler{ + handler: handlerFunction, + } +} + // HandleEvent processes a single event for the backup. func (h *BackupEventHandler) HandleEvent(e *BinlogEvent) error { h.mutex.Lock() diff --git a/replication/backup_test.go b/replication/backup_test.go index 688e47fc2..769f61e83 100644 --- a/replication/backup_test.go +++ b/replication/backup_test.go @@ -72,11 +72,11 @@ func testBackup(t *testSyncerSuite, isSynchronous bool) { if isSynchronous { // Set up a BackupEventHandler for synchronous mode - backupHandler := &BackupEventHandler{ - handler: func(filename string) (io.WriteCloser, error) { + backupHandler := NewBackupEventHandler( + func(filename string) (io.WriteCloser, error) { return os.OpenFile(path.Join(binlogDir, filename), os.O_CREATE|os.O_WRONLY, 0644) }, - } + ) t.b.cfg.SynchronousEventHandler = backupHandler } else { // Ensure SynchronousEventHandler is nil for asynchronous mode