From 44fae3f5b292e69bbeaa340c4d102499cea5fbe8 Mon Sep 17 00:00:00 2001 From: wcy00000000000000 <2269766985@qq.com> Date: Fri, 20 Dec 2024 10:26:50 +0800 Subject: [PATCH] chore: distinguish txn by db id --story=120227834 --- src/storage/dal/mongo/local/txn_manager.go | 50 ++++++++-------------- src/storage/dal/mongo/sharding/mongo.go | 14 +++--- 2 files changed, 25 insertions(+), 39 deletions(-) diff --git a/src/storage/dal/mongo/local/txn_manager.go b/src/storage/dal/mongo/local/txn_manager.go index 0d6be646a3..4db5db42b6 100644 --- a/src/storage/dal/mongo/local/txn_manager.go +++ b/src/storage/dal/mongo/local/txn_manager.go @@ -38,22 +38,12 @@ const ( type sessionKey string -func (s sessionKey) genKey(tenant string) string { - // TODO this is only for compatible with old version, remove this when all db clients use ShardingDB - return transactionNumberRedisKeyNamespace + string(s) - if tenant == "" { - return transactionNumberRedisKeyNamespace + string(s) - } - return fmt.Sprintf("%s%s:%s", transactionNumberRedisKeyNamespace, tenant, string(s)) +func (s sessionKey) genKey(dbID string) string { + return fmt.Sprintf("%s%s:%s", transactionNumberRedisKeyNamespace, dbID, string(s)) } -func (s sessionKey) genErrKey(tenant string) string { - // TODO this is only for compatible with old version, remove this when all db clients use ShardingDB - return transactionErrorRedisKeyNamespace + string(s) - if tenant == "" { - return transactionErrorRedisKeyNamespace + string(s) - } - return fmt.Sprintf("%s%s:%s", transactionErrorRedisKeyNamespace, tenant, string(s)) +func (s sessionKey) genErrKey(dbID string) string { + return fmt.Sprintf("%s%s:%s", transactionErrorRedisKeyNamespace, dbID, string(s)) } // TxnErrorType the error type of the transaction, some error type needs to do special operations like retry @@ -77,27 +67,23 @@ func (t *ShardingTxnManager) InitTxnManager(r redis.Client) error { return nil } -// Tenant returns the transaction manager for tenant -func (t *ShardingTxnManager) Tenant(ignoreTenant bool, tenant string) (*TxnManager, error) { - if ignoreTenant { - return &TxnManager{cache: t.cache}, nil - } - - if tenant == "" { - return nil, errors.New("tenant is not set") +// DB returns the transaction manager for db +func (t *ShardingTxnManager) DB(dbID string) (*TxnManager, error) { + if dbID == "" { + return nil, errors.New("db id is not set") } return &TxnManager{ - tenant: tenant, - cache: t.cache, + dbID: dbID, + cache: t.cache, }, nil } // TxnManager is the transaction manager type TxnManager struct { - // tenant is the tenant id - tenant string - cache redis.Client + // dbID is the unique identifier of a db + dbID string + cache redis.Client } // InitTxnManager is to init txn manager, set the redis storage @@ -109,7 +95,7 @@ func (t *TxnManager) InitTxnManager(r redis.Client) error { // GetTxnNumber TODO func (t *TxnManager) GetTxnNumber(sessionID string) (int64, error) { - key := sessionKey(sessionID).genKey(t.tenant) + key := sessionKey(sessionID).genKey(t.dbID) v, err := t.cache.Get(context.Background(), key).Result() if err != nil { return 0, err @@ -121,7 +107,7 @@ func (t *TxnManager) GetTxnNumber(sessionID string) (int64, error) { func (t *TxnManager) GenTxnNumber(sessionID string, ttl time.Duration) (int64, error) { // return txnNumber with 1 directly, when our mongodb client option's RetryWrite // is set to false. - key := sessionKey(sessionID).genKey(t.tenant) + key := sessionKey(sessionID).genKey(t.dbID) pip := t.cache.Pipeline() defer pip.Close() @@ -144,7 +130,7 @@ func (t *TxnManager) GenTxnNumber(sessionID string, ttl time.Duration) (int64, e // RemoveSessionKey remove transaction session key func (t *TxnManager) RemoveSessionKey(sessionID string) error { - key := sessionKey(sessionID).genKey(t.tenant) + key := sessionKey(sessionID).genKey(t.dbID) return t.cache.Del(context.Background(), key).Err() } @@ -306,7 +292,7 @@ func (t *TxnManager) AutoRunWithTxn(ctx context.Context, cli *mongo.Client, cmd func (t *TxnManager) setTxnError(sessionID sessionKey, txnErr error) { switch { case strings.Contains(txnErr.Error(), "WriteConflict"): - key := sessionID.genErrKey(t.tenant) + key := sessionID.genErrKey(t.dbID) err := t.cache.SetNX(context.Background(), key, string(WriteConflictType), time.Minute*5).Err() if err != nil { blog.Errorf("set txn error(%v) failed, err: %v, session id: %s", txnErr, err, sessionID) @@ -317,7 +303,7 @@ func (t *TxnManager) setTxnError(sessionID sessionKey, txnErr error) { // GetTxnError get mongo raw error type in redis, the error may be used in scene server to retry this transaction func (t *TxnManager) GetTxnError(sessionID sessionKey) TxnErrorType { - key := sessionID.genErrKey(t.tenant) + key := sessionID.genErrKey(t.dbID) errorType, err := t.cache.Get(context.Background(), key).Result() if err != nil && redis.IsNilErr(err) { blog.Errorf("get txn error failed, err: %v, session id: %s", err, sessionID) diff --git a/src/storage/dal/mongo/sharding/mongo.go b/src/storage/dal/mongo/sharding/mongo.go index ad6c286cd1..2c2a820178 100644 --- a/src/storage/dal/mongo/sharding/mongo.go +++ b/src/storage/dal/mongo/sharding/mongo.go @@ -205,7 +205,7 @@ func (m *ShardingMongoManager) Tenant(tenant string) local.DB { return local.NewErrDB(fmt.Errorf("db client %s is disabled", client.UUID())) } - txnManager, err := m.tm.Tenant(false, tenant) + txnManager, err := m.tm.DB(client.UUID()) if err != nil { return local.NewErrDB(err) } @@ -219,7 +219,7 @@ func (m *ShardingMongoManager) Tenant(tenant string) local.DB { // IgnoreTenant returns the master db client that do not use tenant func (m *ShardingMongoManager) IgnoreTenant() local.DB { - txnManager, err := m.tm.Tenant(true, "") + txnManager, err := m.tm.DB(m.masterCli.UUID()) if err != nil { return local.NewErrDB(err) } @@ -249,12 +249,12 @@ func (m *ShardingMongoManager) Ping() error { // ExecForAllDB execute handler for all db clients func (m *ShardingMongoManager) ExecForAllDB(handler func(db local.DB) error) error { - txnManager, err := m.tm.Tenant(true, "") - if err != nil { - return fmt.Errorf("get txn manager failed, err: %v", err) - } - for uuid, client := range m.dbClientMap { + txnManager, err := m.tm.DB(client.UUID()) + if err != nil { + return fmt.Errorf("get txn manager failed, err: %v", err) + } + db, err := local.NewMongo(client, txnManager, m.conf, &local.MongoOptions{IgnoreTenant: true}) if err != nil { return fmt.Errorf("generate %s db client failed, err: %v", uuid, err)