Skip to content

Commit

Permalink
feat(follower-db): allows watching for changes in the follower db (#53)
Browse files Browse the repository at this point in the history
* feat(follower-db): allows watching for changes in the follower db

* x

* test: fixed more tests

* x

* x
  • Loading branch information
chenquan authored Jun 6, 2024
1 parent ccb1886 commit 4389ad5
Show file tree
Hide file tree
Showing 4 changed files with 212 additions and 98 deletions.
94 changes: 56 additions & 38 deletions multiple.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@ package sqlx
import (
"context"
"database/sql"
"fmt"
"strconv"
"strings"

"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/stores/sqlx"
"github.com/zeromicro/go-zero/core/trace"
"go.opentelemetry.io/otel/attribute"
Expand All @@ -40,24 +43,24 @@ var (
var _ sqlx.SqlConn = (*multipleSqlConn)(nil)

type (
DBConf struct {
Leader string
Followers []string `json:",optional"`
FollowerDB struct {
Name string
Datasource string
Added bool
}

SqlOption func(*sqlOptions)

sqlOptions struct {
accept func(error) bool
DBConf struct {
Leader string
Followers []string `json:",optional"`
BackToOrigin bool `json:",optional"`
}

multipleSqlConn struct {
leader sqlx.SqlConn
enableFollower bool
p2cPicker picker // picker
followers []sqlx.SqlConn
conf DBConf
accept func(error) bool
driveName string
leader sqlx.SqlConn
p2cPicker picker // picker
conf DBConf
driveName string
sqlOptions *sqlOptions
}
)

Expand All @@ -69,21 +72,35 @@ func NewMultipleSqlConn(driverName string, conf DBConf, opts ...SqlOption) sqlx.
}

leader := sqlx.NewSqlConn(driverName, conf.Leader, sqlx.WithAcceptable(sqlOpt.accept))
followers := make([]sqlx.SqlConn, 0, len(conf.Followers))
for _, datasource := range conf.Followers {
followers = append(followers, sqlx.NewSqlConn(driverName, datasource, sqlx.WithAcceptable(sqlOpt.accept)))
}

conn := &multipleSqlConn{
leader: leader,
enableFollower: len(followers) != 0,
followers: followers,
conf: conf,
driveName: driverName,
accept: sqlOpt.accept,
leader: leader,
conf: conf,
driveName: driverName,
sqlOptions: &sqlOpt,
}

p2cPickerObj := newP2cPicker(driverName, sqlOpt.accept)
for i, datasource := range conf.Followers {
p2cPickerObj.add(strconv.Itoa(i), datasource)
}
go func() {
if sqlOpt.watcher == nil {
return
}

for follow := range sqlOpt.watcher {
logx.Infow("watcher", logx.Field("follow", follow))

if follow.Added {
p2cPickerObj.add(follow.Name, follow.Datasource)
} else {
p2cPickerObj.del(follow.Datasource)
}
}
}()

conn.p2cPicker = newP2cPicker(followers, conn.accept)
conn.p2cPicker = p2cPickerObj

return conn
}
Expand Down Expand Up @@ -178,10 +195,6 @@ func (m *multipleSqlConn) getQueryDB(ctx context.Context, query string) queryDB
return queryDB{conn: m.leader}
}

if !m.enableFollower {
return queryDB{conn: m.leader}
}

if !m.containSelect(query) {
return queryDB{conn: m.leader}
}
Expand All @@ -196,6 +209,10 @@ func (m *multipleSqlConn) getQueryDB(ctx context.Context, query string) queryDB
}
}

if !m.conf.BackToOrigin {
return queryDB{error: err}
}

return queryDB{conn: m.leader}
}

Expand All @@ -212,10 +229,10 @@ func (m *multipleSqlConn) startSpanWithLeader(ctx context.Context) (context.Cont
return ctx, span
}

func (m *multipleSqlConn) startSpanWithFollower(ctx context.Context, db int) (context.Context, oteltrace.Span) {
func (m *multipleSqlConn) startSpanWithFollower(ctx context.Context, dbName string) (context.Context, oteltrace.Span) {
ctx, span := m.startSpan(ctx)
span.SetAttributes(followerTypeAttributeKey)
span.SetAttributes(followerDBSqlAttributeKey.Int(db))
span.SetAttributes(followerDBSqlAttributeKey.String(dbName))
return ctx, span
}

Expand All @@ -239,13 +256,14 @@ type queryDB struct {
error error
done func(err error)
follower bool
followerDB int
followerDB string
}

func (q *queryDB) query(ctx context.Context, query func(ctx context.Context, conn sqlx.SqlConn) error) (err error) {
if q.error != nil {
return q.error
}

defer func() {
if q.done != nil {
q.done(err)
Expand All @@ -255,12 +273,6 @@ func (q *queryDB) query(ctx context.Context, query func(ctx context.Context, con
return query(ctx, q.conn)
}

func WithAccept(accept func(err error) bool) SqlOption {
return func(opts *sqlOptions) {
opts.accept = accept
}
}

type forceLeaderKey struct{}

func ForceLeaderContext(ctx context.Context) context.Context {
Expand All @@ -272,3 +284,9 @@ func forceLeaderFromContext(ctx context.Context) bool {
_, ok := value.(struct{})
return ok
}

// ---------------

func (f FollowerDB) String() string {
return fmt.Sprintf("FollowerDB{Name: %s, Datasource: %s, Added: %t}", f.Name, f.Datasource, f.Added)
}
69 changes: 49 additions & 20 deletions multiple_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@ package sqlx

import (
"context"
"database/sql/driver"
"fmt"
"testing"
"time"

"github.com/DATA-DOG/go-sqlmock"
"github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/stores/sqlx"
)

const mockedDatasource = "sqlmock"
Expand All @@ -44,28 +43,24 @@ func TestNewMultipleSqlConn(t *testing.T) {
Leader: leader,
Followers: []string{follower1},
})

follower1Mock.ExpectExec("any")
follower1Mock.ExpectQuery("any").WillReturnRows(sqlmock.NewRows([]string{"foo"}))
rows := sqlmock.NewRows([]string{"name"}).AddRow("John Doe")
follower1Mock.ExpectQuery("SELECT name FROM users ").WithoutArgs().WillReturnRows(rows)

var val string
assert.NotNil(t, mysql.QueryRow(&val, "any"))
assert.NotNil(t, mysql.QueryRow(&val, "any"))
assert.NotNil(t, mysql.QueryRowPartial(&val, "any"))
assert.NotNil(t, mysql.QueryRows(&val, "any"))
assert.NotNil(t, mysql.QueryRowsPartial(&val, "any"))
_, err = mysql.Prepare("any")
assert.NotNil(t, err)
assert.NotNil(t, mysql.Transact(func(session sqlx.Session) error {
return nil
}))

leaderMock.ExpectExec("any").WillReturnResult(driver.RowsAffected(1))
r, err := mysql.Exec("any")
assert.NoError(t, mysql.QueryRow(&val, "SELECT name FROM users "))
fmt.Println(val)

leaderMock.ExpectQuery("SELECT addr FROM users").WithoutArgs().WillReturnRows(sqlmock.NewRows([]string{"foo"}).AddRow("bar"))
assert.NoError(t, mysql.QueryRowCtx(ForceLeaderContext(context.Background()), &val, "SELECT addr FROM users"))

leaderMock.ExpectExec("INSERT INTO users").
WithArgs("john").WillReturnResult(sqlmock.NewResult(1, 1))

result, err := mysql.Exec(`INSERT INTO users(name) VALUES (?)`, "john")
assert.NoError(t, err)
rowsAffected, err := r.RowsAffected()
insertId, err := result.LastInsertId()
assert.NoError(t, err)
assert.Equal(t, int64(1), rowsAffected)
assert.EqualValues(t, 1, insertId)
}

func TestForceLeaderContext(t *testing.T) {
Expand All @@ -74,3 +69,37 @@ func TestForceLeaderContext(t *testing.T) {

assert.False(t, forceLeaderFromContext(context.Background()))
}

func TestWatch(t *testing.T) {
leader := "leader1"
follower1 := "follower1_1"
_, follower1Mock, err := sqlmock.NewWithDSN(follower1, sqlmock.MonitorPingsOption(true))
assert.NoError(t, err)
_, leaderMock, err := sqlmock.NewWithDSN(leader, sqlmock.MonitorPingsOption(true))
assert.NoError(t, err)

follower1Mock.ExpectPing().WillDelayFor(time.Millisecond)
leaderMock.ExpectPing().WillDelayFor(time.Millisecond)

dbsChan := make(chan FollowerDB, 1)
defer close(dbsChan)

mysql := NewMultipleSqlConn(mockedDatasource, DBConf{
Leader: leader,
Followers: []string{follower1},
}, WithWatchFollowerDB(dbsChan))

var val string
follower1Mock.ExpectQuery("SELECT name FROM users ").
WithoutArgs().
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("John Doe"))
assert.NoError(t, mysql.QueryRow(&val, "SELECT name FROM users "))
dbsChan <- FollowerDB{
Name: "0",
Datasource: "",
Added: false,
}
time.Sleep(time.Second)
assert.Error(t, mysql.QueryRow(&val, "SELECT name FROM users "))

}
38 changes: 38 additions & 0 deletions opts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2023 chenquan
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package sqlx

type (
SqlOption func(*sqlOptions)

sqlOptions struct {
accept func(error) bool
watcher <-chan FollowerDB
}
)

func WithAccept(accept func(err error) bool) SqlOption {
return func(opts *sqlOptions) {
opts.accept = accept
}
}

func WithWatchFollowerDB(watcher <-chan FollowerDB) SqlOption {
return func(opts *sqlOptions) {
opts.watcher = watcher
}
}
Loading

0 comments on commit 4389ad5

Please sign in to comment.