Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure ACKs are sent after backup #921

Merged
merged 7 commits into from
Oct 20, 2024
173 changes: 124 additions & 49 deletions replication/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,31 @@ import (
"io"
"os"
"path"
"sync"
"time"

. "github.com/go-mysql-org/go-mysql/mysql"
"github.com/pingcap/errors"
)

// StartBackup: Like mysqlbinlog remote raw backup
// Backup remote binlog from position (filename, offset) and write in backupDir
// StartBackup starts the backup process for the binary log and writes to the backup directory.
func (b *BinlogSyncer) StartBackup(backupDir string, p Position, timeout time.Duration) error {
err := os.MkdirAll(backupDir, 0755)
if err != nil {
return errors.Trace(err)
}
return b.StartBackupWithHandler(p, timeout, func(filename string) (io.WriteCloser, error) {
return os.OpenFile(path.Join(backupDir, filename), os.O_CREATE|os.O_WRONLY, 0644)
})
if b.cfg.SynchronousEventHandler == nil {
return b.StartBackupWithHandler(p, timeout, func(filename string) (io.WriteCloser, error) {
return os.OpenFile(path.Join(backupDir, filename), os.O_CREATE|os.O_WRONLY, 0644)
})
} else {
return b.StartSynchronousBackup(p, timeout)
}
}

// StartBackupWithHandler starts the backup process for the binary log using the specified position and handler.
// The process will continue until the timeout is reached or an error occurs.
// This method should not be used together with SynchronousEventHandler.
//
// Parameters:
// - p: The starting position in the binlog from which to begin the backup.
Expand All @@ -37,81 +42,151 @@ func (b *BinlogSyncer) StartBackupWithHandler(p Position, timeout time.Duration,
// a very long timeout here
timeout = 30 * 3600 * 24 * time.Second
}
if b.cfg.SynchronousEventHandler != nil {
return errors.New("StartBackupWithHandler cannot be used when SynchronousEventHandler is set. Use StartSynchronousBackup instead.")
}

// Force use raw mode
b.parser.SetRawMode(true)

// Set up the backup event handler
backupHandler := &BackupEventHandler{
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't need to change StartBackupWithHandler now? Because your requirement does not use StartBackupWithHandler.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the logic of writing on disk is contained in

func (h *BackupEventHandler) HandleEvent(e *BinlogEvent) error {

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in the old code, we still can use handler to create io.WriteCloser and write events to disk.

		e, err := s.GetEvent(ctx)
...
		if n, err := w.Write(e.RawData); err != nil {
			return errors.Trace(err)
		} else if n != len(e.RawData) {
			return errors.Trace(io.ErrShortWrite)
		}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, but I don't want to duplicate the code in HandleEvent. Do you want me to remove the BackupEventHandler struct and have HandleEvent be static?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

duplicate the code in HandleEvent

Sorry I don't know where is the duplication. Can you elaborate?

Do you want me to remove the BackupEventHandler struct and have HandleEvent be static?

No I don't mean that. I mean reverting StartBackupWithHandler to old code. Your StartSynchronousBackup is fine. And BackupEventHandler seems no need to be public (exported)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StartBackupWithHandler implemented the code in HandleEvent which is also used by the SynchronousEventHandler in replication/binlogsyncer.go. As a helpful shorthand to prevent the need for clients to reimplement HandleEvent in their code, we can leave it extracted as its own method.

For example, if we reverted StartBackupWithHandler we'd either have that code (if e.Header.EventType == ROTATE_EVENT...) duplicated in HandleEvent or, if we removed that as well (as you suggested) we'd have it duplicated in the test code. Does that make sense?

If you have a different idea of how this should all change, can you upload an example diff?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see the duplication is if e.Header.EventType == ROTATE_EVENT, thanks. I think it's reasonable.

And here's my suggestions, please check them.

git.diff.txt

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we make newBackupEventHandler private, clients won't be able to do what backup_test.go (an example client) is doing

Copy link
Collaborator

@lance6716 lance6716 Oct 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if they are the same behaviour as BackupEventHandler, they can directly use SynchronousEventHandler. if they have more customised behaviour, current BackupEventHandler is not extendable to other behaviour I think (no place to inject new logic?). so they still need to write many lines of code.

maybe you think they can wrap a structure on BackupEventHandler, dispatch event by event type to BackupEventHandler or the parent structure? I can't think of a real use case for now.

oh you are right 👍

handler: handler,
}

s, err := b.StartSync(p)
if err != nil {
return errors.Trace(err)
}

var filename string
var offset uint32

var w io.WriteCloser
defer func() {
var closeErr error
if w != nil {
closeErr = w.Close()
}
if retErr == nil {
retErr = closeErr
if backupHandler.w != nil {
closeErr := backupHandler.w.Close()
if retErr == nil {
retErr = closeErr
}
}
}()

for {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
e, err := s.GetEvent(ctx)
cancel()
defer cancel()

if err == context.DeadlineExceeded {
select {
case <-ctx.Done():
return nil
}

if err != nil {
case <-b.ctx.Done():
return nil
case err := <-s.ech:
return errors.Trace(err)
case e := <-s.ch:
err = backupHandler.HandleEvent(e)
if err != nil {
return errors.Trace(err)
}
}
}
}

offset = e.Header.LogPos
// StartSynchronousBackup starts the backup process using the SynchronousEventHandler in the BinlogSyncerConfig.
func (b *BinlogSyncer) StartSynchronousBackup(p Position, timeout time.Duration) error {
if b.cfg.SynchronousEventHandler == nil {
return errors.New("SynchronousEventHandler must be set in BinlogSyncerConfig to use StartSynchronousBackup")
}

if e.Header.EventType == ROTATE_EVENT {
rotateEvent := e.Event.(*RotateEvent)
filename = string(rotateEvent.NextLogName)
s, err := b.StartSync(p)
if err != nil {
return errors.Trace(err)
}

if e.Header.Timestamp == 0 || offset == 0 {
// fake rotate event
continue
}
} else if e.Header.EventType == FORMAT_DESCRIPTION_EVENT {
// FormateDescriptionEvent is the first event in binlog, we will close old one and create a new

if w != nil {
if err = w.Close(); err != nil {
w = nil
return errors.Trace(err)
}
}
var ctx context.Context
var cancel context.CancelFunc

if len(filename) == 0 {
return errors.Errorf("empty binlog filename for FormateDescriptionEvent")
}
if timeout > 0 {
ctx, cancel = context.WithTimeout(context.Background(), timeout)
defer cancel()
} else {
ctx = context.Background()
}

w, err = handler(filename)
if err != nil {
return errors.Trace(err)
}
select {
case <-ctx.Done():
// The timeout has been reached
return nil
case <-b.ctx.Done():
// The BinlogSyncer has been closed
return nil
case err := <-s.ech:
// An error occurred during streaming
return errors.Trace(err)
}
}

// BackupEventHandler handles writing events for backup
type BackupEventHandler struct {
handler func(binlogFilename string) (io.WriteCloser, error)
w io.WriteCloser
mutex sync.Mutex

filename string
}

func NewBackupEventHandler(handlerFunction func(filename string) (io.WriteCloser, error)) *BackupEventHandler {
return &BackupEventHandler{
handler: handlerFunction,
}
}

// HandleEvent processes a single event for the backup.
func (h *BackupEventHandler) HandleEvent(e *BinlogEvent) error {
h.mutex.Lock()
defer h.mutex.Unlock()

var err error
offset := e.Header.LogPos

// write binlog header fe'bin'
if _, err = w.Write(BinLogFileHeader); err != nil {
if e.Header.EventType == ROTATE_EVENT {
rotateEvent := e.Event.(*RotateEvent)
h.filename = string(rotateEvent.NextLogName)
if e.Header.Timestamp == 0 || offset == 0 {
// fake rotate event
return nil
}
} else if e.Header.EventType == FORMAT_DESCRIPTION_EVENT {
if h.w != nil {
if err = h.w.Close(); err != nil {
h.w = nil
return errors.Trace(err)
}
}

if n, err := w.Write(e.RawData); err != nil {
if len(h.filename) == 0 {
return errors.Errorf("empty binlog filename for FormatDescriptionEvent")
}

h.w, err = h.handler(h.filename)
if err != nil {
return errors.Trace(err)
} else if n != len(e.RawData) {
}

// Write binlog header 0xfebin
_, err = h.w.Write(BinLogFileHeader)
if err != nil {
return errors.Trace(err)
}
}

if h.w != nil {
n, err := h.w.Write(e.RawData)
if err != nil {
return errors.Trace(err)
}
if n != len(e.RawData) {
return errors.Trace(io.ErrShortWrite)
}
} else {
return errors.New("writer is not initialized")
}

return nil
}
74 changes: 72 additions & 2 deletions replication/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@ package replication

import (
"context"
"io"
"os"
"path"
"time"

"github.com/stretchr/testify/require"

"github.com/go-mysql-org/go-mysql/mysql"
)

// TestStartBackupEndInGivenTime tests the backup process completes within a given time.
func (t *testSyncerSuite) TestStartBackupEndInGivenTime() {
t.setupTest(mysql.MySQLFlavor)

Expand All @@ -30,12 +33,12 @@ func (t *testSyncerSuite) TestStartBackupEndInGivenTime() {
os.RemoveAll(binlogDir)
timeout := 2 * time.Second

done := make(chan bool)
done := make(chan struct{})

go func() {
err := t.b.StartBackup(binlogDir, mysql.Position{Name: "", Pos: uint32(0)}, timeout)
require.NoError(t.T(), err)
done <- true
close(done)
}()
failTimeout := 5 * timeout
ctx, cancel := context.WithTimeout(context.Background(), failTimeout)
Expand All @@ -47,3 +50,70 @@ func (t *testSyncerSuite) TestStartBackupEndInGivenTime() {
t.T().Fatal("time out error")
}
}

// TestAsyncBackup runs the backup process in asynchronous mode and verifies binlog file creation.
func (t *testSyncerSuite) TestAsyncBackup() {
testBackup(t, false) // false indicates asynchronous mode
}

// TestSyncBackup runs the backup process in synchronous mode and verifies binlog file creation.
func (t *testSyncerSuite) TestSyncBackup() {
testBackup(t, true) // true indicates synchronous mode
}

// testBackup is a helper function that runs the backup process in the specified mode and checks if binlog files are written correctly.
func testBackup(t *testSyncerSuite, isSynchronous bool) {
t.setupTest(mysql.MySQLFlavor)
t.b.cfg.SemiSyncEnabled = false // Ensure semi-sync is disabled

binlogDir := "./var"
os.RemoveAll(binlogDir)
timeout := 3 * time.Second

if isSynchronous {
// Set up a BackupEventHandler for synchronous mode
backupHandler := NewBackupEventHandler(
func(filename string) (io.WriteCloser, error) {
return os.OpenFile(path.Join(binlogDir, filename), os.O_CREATE|os.O_WRONLY, 0644)
},
)
t.b.cfg.SynchronousEventHandler = backupHandler
} else {
// Ensure SynchronousEventHandler is nil for asynchronous mode
t.b.cfg.SynchronousEventHandler = nil
}

done := make(chan bool)
dt8269 marked this conversation as resolved.
Show resolved Hide resolved

// Start the backup process in a goroutine
go func() {
err := t.b.StartBackup(binlogDir, mysql.Position{Name: "", Pos: uint32(0)}, timeout)
require.NoError(t.T(), err)
done <- true
}()

failTimeout := 2 * timeout
ctx, cancel := context.WithTimeout(context.Background(), failTimeout)
defer cancel()

// Wait for the backup to complete or timeout
select {
case <-done:
// Check if binlog files are written to the specified directory
files, err := os.ReadDir(binlogDir)
require.NoError(t.T(), err, "Failed to read binlog directory")
require.Greater(t.T(), len(files), 0, "Binlog files were not written to the directory")
mode := modeLabel(isSynchronous)
t.T().Logf("Backup completed successfully in %s mode with %d binlog file(s).", mode, len(files))
case <-ctx.Done():
mode := modeLabel(isSynchronous)
t.T().Fatalf("Timeout error during backup in %s mode.", mode)
}
}

func modeLabel(isSynchronous bool) string {
if isSynchronous {
return "synchronous"
}
return "asynchronous"
}
Loading