diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index 22a961edcae..864ec7b10b0 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -24,6 +24,7 @@ Add changes here for all PR submitted to the 2.x branch. - [[#6947](https://github.com/apache/incubator-seata/pull/6947)] fix npe for nacos registry when look up address - [[#6984](https://github.com/apache/incubator-seata/pull/6984)] support building docker image on openjdk23 - [[#6994](https://github.com/apache/incubator-seata/pull/6994)] fix the problem of building undoLog exception when update join does not update data +- [[#7005](https://github.com/apache/incubator-seata/pull/7005)] fix the Raft NPE issue caused by two-phase concurrency ### optimize: - [[#6826](https://github.com/apache/incubator-seata/pull/6826)] remove the branch registration operation of the XA read-only transaction diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index 89bfb6438f6..42b990699f3 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -24,6 +24,7 @@ - [[#6947](https://github.com/apache/incubator-seata/pull/6947)] 修复nacos注册中心查询可用地址时的空指针问题 - [[#6984](https://github.com/apache/incubator-seata/pull/6984)] 修复 openjdk23 版本下无法构建 docker 镜像的问题 - [[#6994](https://github.com/apache/incubator-seata/pull/6994)] 修复updateJoin语句未更新到数据时prepareUndoLog异常 +- [[#7005](https://github.com/apache/incubator-seata/pull/7005)] 修复Raft模式下两阶段并发可能导致NPE的问题 ### optimize: diff --git a/server/src/main/java/org/apache/seata/server/cluster/raft/execute/branch/AddBranchSessionExecute.java b/server/src/main/java/org/apache/seata/server/cluster/raft/execute/branch/AddBranchSessionExecute.java index 0bf2f122215..9d3086ad7ae 100644 --- a/server/src/main/java/org/apache/seata/server/cluster/raft/execute/branch/AddBranchSessionExecute.java +++ b/server/src/main/java/org/apache/seata/server/cluster/raft/execute/branch/AddBranchSessionExecute.java @@ -35,7 +35,16 @@ public Boolean execute(RaftBaseMsg syncMsg) throws Throwable { RaftBranchSessionSyncMsg sessionSyncMsg = (RaftBranchSessionSyncMsg)syncMsg; RaftSessionManager raftSessionManager = (RaftSessionManager) SessionHolder.getRootSessionManager(sessionSyncMsg.getGroup()); BranchTransactionDTO branchTransactionDTO = sessionSyncMsg.getBranchSession(); - GlobalSession globalSession = raftSessionManager.findGlobalSession(branchTransactionDTO.getXid()); + String xid = branchTransactionDTO.getXid(); + GlobalSession globalSession = raftSessionManager.findGlobalSession(xid); + if (globalSession == null) { + if (logger.isWarnEnabled()) { + logger.warn( + "The transaction corresponding to the XID: {} does not exist, which may cause a two-phase concurrency issue, msg type: {}", + xid, syncMsg.getMsgType()); + } + return false; + } BranchSession branchSession = SessionConverter.convertBranchSession(branchTransactionDTO); branchSession.lock(); globalSession.add(branchSession); diff --git a/server/src/main/java/org/apache/seata/server/cluster/raft/execute/branch/UpdateBranchSessionExecute.java b/server/src/main/java/org/apache/seata/server/cluster/raft/execute/branch/UpdateBranchSessionExecute.java index c98f3820476..e096ea36cdc 100644 --- a/server/src/main/java/org/apache/seata/server/cluster/raft/execute/branch/UpdateBranchSessionExecute.java +++ b/server/src/main/java/org/apache/seata/server/cluster/raft/execute/branch/UpdateBranchSessionExecute.java @@ -33,8 +33,26 @@ public class UpdateBranchSessionExecute extends AbstractRaftMsgExecute { public Boolean execute(RaftBaseMsg syncMsg) throws Throwable { RaftBranchSessionSyncMsg sessionSyncMsg = (RaftBranchSessionSyncMsg)syncMsg; RaftSessionManager raftSessionManager = (RaftSessionManager) SessionHolder.getRootSessionManager(sessionSyncMsg.getGroup()); - GlobalSession globalSession = raftSessionManager.findGlobalSession(sessionSyncMsg.getBranchSession().getXid()); - BranchSession branchSession = globalSession.getBranch(sessionSyncMsg.getBranchSession().getBranchId()); + String xid = sessionSyncMsg.getBranchSession().getXid(); + GlobalSession globalSession = raftSessionManager.findGlobalSession(xid); + if (globalSession == null) { + if (logger.isWarnEnabled()) { + logger.warn( + "The transaction corresponding to the XID: {} does not exist, which may cause a two-phase concurrency issue, msg type: {}", + xid, syncMsg.getMsgType()); + } + return false; + } + long branchId = sessionSyncMsg.getBranchSession().getBranchId(); + BranchSession branchSession = globalSession.getBranch(branchId); + if (branchSession == null) { + if (logger.isWarnEnabled()) { + logger.warn( + "The branch session corresponding to the branchId: {} does not exist, which may cause a two-phase concurrency issue, msg type: {}", + sessionSyncMsg.getBranchSession().getBranchId(), syncMsg.getMsgType()); + } + return false; + } BranchStatus status = BranchStatus.get(sessionSyncMsg.getBranchSession().getStatus()); branchSession.setStatus(status); if (logger.isDebugEnabled()) { diff --git a/server/src/main/java/org/apache/seata/server/cluster/raft/execute/lock/BranchReleaseLockExecute.java b/server/src/main/java/org/apache/seata/server/cluster/raft/execute/lock/BranchReleaseLockExecute.java index 805ef1b7f7b..206fe77e3cd 100644 --- a/server/src/main/java/org/apache/seata/server/cluster/raft/execute/lock/BranchReleaseLockExecute.java +++ b/server/src/main/java/org/apache/seata/server/cluster/raft/execute/lock/BranchReleaseLockExecute.java @@ -30,8 +30,16 @@ public class BranchReleaseLockExecute extends AbstractRaftMsgExecute { @Override public Boolean execute(RaftBaseMsg syncMsg) throws Throwable { RaftBranchSessionSyncMsg sessionSyncMsg = (RaftBranchSessionSyncMsg)syncMsg; - GlobalSession globalSession = - SessionHolder.getRootSessionManager().findGlobalSession(sessionSyncMsg.getBranchSession().getXid()); + String xid = sessionSyncMsg.getBranchSession().getXid(); + GlobalSession globalSession = SessionHolder.getRootSessionManager().findGlobalSession(xid); + if (globalSession == null) { + if (logger.isWarnEnabled()) { + logger.warn( + "The transaction corresponding to the XID: {} does not exist, which may cause a two-phase concurrency issue, msg type: {}", + xid, syncMsg.getMsgType()); + } + return false; + } BranchSession branchSession = globalSession.getBranch(sessionSyncMsg.getBranchSession().getBranchId()); if (branchSession != null) { if (logger.isDebugEnabled()) { diff --git a/server/src/main/java/org/apache/seata/server/session/SessionStatusValidator.java b/server/src/main/java/org/apache/seata/server/session/SessionStatusValidator.java index 654af466245..f6fa7f074aa 100644 --- a/server/src/main/java/org/apache/seata/server/session/SessionStatusValidator.java +++ b/server/src/main/java/org/apache/seata/server/session/SessionStatusValidator.java @@ -54,6 +54,11 @@ public static boolean isRollbackGlobalStatus(GlobalStatus status) { || status == GlobalStatus.RollbackRetryTimeout; } + public static boolean isEndGlobalStatus(GlobalStatus status) { + return status == GlobalStatus.Rollbacked || status == GlobalStatus.TimeoutRollbacked + || status == GlobalStatus.Committed || status == GlobalStatus.Finished; + } + /** * is commit global status *