Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

事务调整为按db ID区分 #8303

Open
wants to merge 1 commit into
base: feature-tenant
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 18 additions & 32 deletions src/storage/dal/mongo/local/txn_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,22 +38,12 @@ const (

type sessionKey string

func (s sessionKey) genKey(tenant string) string {
// TODO this is only for compatible with old version, remove this when all db clients use ShardingDB
return transactionNumberRedisKeyNamespace + string(s)
if tenant == "" {
return transactionNumberRedisKeyNamespace + string(s)
}
return fmt.Sprintf("%s%s:%s", transactionNumberRedisKeyNamespace, tenant, string(s))
func (s sessionKey) genKey(dbID string) string {
return fmt.Sprintf("%s%s:%s", transactionNumberRedisKeyNamespace, dbID, string(s))
}

func (s sessionKey) genErrKey(tenant string) string {
// TODO this is only for compatible with old version, remove this when all db clients use ShardingDB
return transactionErrorRedisKeyNamespace + string(s)
if tenant == "" {
return transactionErrorRedisKeyNamespace + string(s)
}
return fmt.Sprintf("%s%s:%s", transactionErrorRedisKeyNamespace, tenant, string(s))
func (s sessionKey) genErrKey(dbID string) string {
return fmt.Sprintf("%s%s:%s", transactionErrorRedisKeyNamespace, dbID, string(s))
}

// TxnErrorType the error type of the transaction, some error type needs to do special operations like retry
Expand All @@ -77,27 +67,23 @@ func (t *ShardingTxnManager) InitTxnManager(r redis.Client) error {
return nil
}

// Tenant returns the transaction manager for tenant
func (t *ShardingTxnManager) Tenant(ignoreTenant bool, tenant string) (*TxnManager, error) {
if ignoreTenant {
return &TxnManager{cache: t.cache}, nil
}

if tenant == "" {
return nil, errors.New("tenant is not set")
// DB returns the transaction manager for db
func (t *ShardingTxnManager) DB(dbID string) (*TxnManager, error) {
if dbID == "" {
return nil, errors.New("db id is not set")
}

return &TxnManager{
tenant: tenant,
cache: t.cache,
dbID: dbID,
cache: t.cache,
}, nil
}

// TxnManager is the transaction manager
type TxnManager struct {
// tenant is the tenant id
tenant string
cache redis.Client
// dbID is the unique identifier of a db
dbID string
cache redis.Client
}

// InitTxnManager is to init txn manager, set the redis storage
Expand All @@ -109,7 +95,7 @@ func (t *TxnManager) InitTxnManager(r redis.Client) error {

// GetTxnNumber TODO
func (t *TxnManager) GetTxnNumber(sessionID string) (int64, error) {
key := sessionKey(sessionID).genKey(t.tenant)
key := sessionKey(sessionID).genKey(t.dbID)
v, err := t.cache.Get(context.Background(), key).Result()
if err != nil {
return 0, err
Expand All @@ -121,7 +107,7 @@ func (t *TxnManager) GetTxnNumber(sessionID string) (int64, error) {
func (t *TxnManager) GenTxnNumber(sessionID string, ttl time.Duration) (int64, error) {
// return txnNumber with 1 directly, when our mongodb client option's RetryWrite
// is set to false.
key := sessionKey(sessionID).genKey(t.tenant)
key := sessionKey(sessionID).genKey(t.dbID)

pip := t.cache.Pipeline()
defer pip.Close()
Expand All @@ -144,7 +130,7 @@ func (t *TxnManager) GenTxnNumber(sessionID string, ttl time.Duration) (int64, e

// RemoveSessionKey remove transaction session key
func (t *TxnManager) RemoveSessionKey(sessionID string) error {
key := sessionKey(sessionID).genKey(t.tenant)
key := sessionKey(sessionID).genKey(t.dbID)
return t.cache.Del(context.Background(), key).Err()
}

Expand Down Expand Up @@ -306,7 +292,7 @@ func (t *TxnManager) AutoRunWithTxn(ctx context.Context, cli *mongo.Client, cmd
func (t *TxnManager) setTxnError(sessionID sessionKey, txnErr error) {
switch {
case strings.Contains(txnErr.Error(), "WriteConflict"):
key := sessionID.genErrKey(t.tenant)
key := sessionID.genErrKey(t.dbID)
err := t.cache.SetNX(context.Background(), key, string(WriteConflictType), time.Minute*5).Err()
if err != nil {
blog.Errorf("set txn error(%v) failed, err: %v, session id: %s", txnErr, err, sessionID)
Expand All @@ -317,7 +303,7 @@ func (t *TxnManager) setTxnError(sessionID sessionKey, txnErr error) {

// GetTxnError get mongo raw error type in redis, the error may be used in scene server to retry this transaction
func (t *TxnManager) GetTxnError(sessionID sessionKey) TxnErrorType {
key := sessionID.genErrKey(t.tenant)
key := sessionID.genErrKey(t.dbID)
errorType, err := t.cache.Get(context.Background(), key).Result()
if err != nil && redis.IsNilErr(err) {
blog.Errorf("get txn error failed, err: %v, session id: %s", err, sessionID)
Expand Down
14 changes: 7 additions & 7 deletions src/storage/dal/mongo/sharding/mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func (m *ShardingMongoManager) Tenant(tenant string) local.DB {
return local.NewErrDB(fmt.Errorf("db client %s is disabled", client.UUID()))
}

txnManager, err := m.tm.Tenant(false, tenant)
txnManager, err := m.tm.DB(client.UUID())
if err != nil {
return local.NewErrDB(err)
}
Expand All @@ -219,7 +219,7 @@ func (m *ShardingMongoManager) Tenant(tenant string) local.DB {

// IgnoreTenant returns the master db client that do not use tenant
func (m *ShardingMongoManager) IgnoreTenant() local.DB {
txnManager, err := m.tm.Tenant(true, "")
txnManager, err := m.tm.DB(m.masterCli.UUID())
if err != nil {
return local.NewErrDB(err)
}
Expand Down Expand Up @@ -249,12 +249,12 @@ func (m *ShardingMongoManager) Ping() error {

// ExecForAllDB execute handler for all db clients
func (m *ShardingMongoManager) ExecForAllDB(handler func(db local.DB) error) error {
txnManager, err := m.tm.Tenant(true, "")
if err != nil {
return fmt.Errorf("get txn manager failed, err: %v", err)
}

for uuid, client := range m.dbClientMap {
txnManager, err := m.tm.DB(client.UUID())
if err != nil {
return fmt.Errorf("get txn manager failed, err: %v", err)
}

db, err := local.NewMongo(client, txnManager, m.conf, &local.MongoOptions{IgnoreTenant: true})
if err != nil {
return fmt.Errorf("generate %s db client failed, err: %v", uuid, err)
Expand Down