Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix: fix the Raft NPE issue caused by two-phase concurrency #7005

Merged
merged 9 commits into from
Nov 18, 2024
Merged
1 change: 1 addition & 0 deletions changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#6947](https://github.com/apache/incubator-seata/pull/6947)] fix npe for nacos registry when look up address
- [[#6984](https://github.com/apache/incubator-seata/pull/6984)] support building docker image on openjdk23
- [[#6994](https://github.com/apache/incubator-seata/pull/6994)] fix the problem of building undoLog exception when update join does not update data
- [[#7005](https://github.com/apache/incubator-seata/pull/7005)] fix the Raft NPE issue caused by two-phase concurrency

### optimize:
- [[#6826](https://github.com/apache/incubator-seata/pull/6826)] remove the branch registration operation of the XA read-only transaction
Expand Down
1 change: 1 addition & 0 deletions changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
- [[#6947](https://github.com/apache/incubator-seata/pull/6947)] 修复nacos注册中心查询可用地址时的空指针问题
- [[#6984](https://github.com/apache/incubator-seata/pull/6984)] 修复 openjdk23 版本下无法构建 docker 镜像的问题
- [[#6994](https://github.com/apache/incubator-seata/pull/6994)] 修复updateJoin语句未更新到数据时prepareUndoLog异常
- [[#7005](https://github.com/apache/incubator-seata/pull/7005)] 修复Raft模式下两阶段并发可能导致NPE的问题


### optimize:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,16 @@ public Boolean execute(RaftBaseMsg syncMsg) throws Throwable {
RaftBranchSessionSyncMsg sessionSyncMsg = (RaftBranchSessionSyncMsg)syncMsg;
RaftSessionManager raftSessionManager = (RaftSessionManager) SessionHolder.getRootSessionManager(sessionSyncMsg.getGroup());
BranchTransactionDTO branchTransactionDTO = sessionSyncMsg.getBranchSession();
GlobalSession globalSession = raftSessionManager.findGlobalSession(branchTransactionDTO.getXid());
String xid = branchTransactionDTO.getXid();
GlobalSession globalSession = raftSessionManager.findGlobalSession(xid);
if (globalSession == null) {
if (logger.isWarnEnabled()) {
logger.warn(
"The transaction corresponding to the XID: {} does not exist, which may cause a two-phase concurrency issue, msg type: {}",
xid, syncMsg.getMsgType());
}
return false;
}
BranchSession branchSession = SessionConverter.convertBranchSession(branchTransactionDTO);
branchSession.lock();
globalSession.add(branchSession);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,26 @@ public class UpdateBranchSessionExecute extends AbstractRaftMsgExecute {
public Boolean execute(RaftBaseMsg syncMsg) throws Throwable {
RaftBranchSessionSyncMsg sessionSyncMsg = (RaftBranchSessionSyncMsg)syncMsg;
RaftSessionManager raftSessionManager = (RaftSessionManager) SessionHolder.getRootSessionManager(sessionSyncMsg.getGroup());
GlobalSession globalSession = raftSessionManager.findGlobalSession(sessionSyncMsg.getBranchSession().getXid());
BranchSession branchSession = globalSession.getBranch(sessionSyncMsg.getBranchSession().getBranchId());
String xid = sessionSyncMsg.getBranchSession().getXid();
GlobalSession globalSession = raftSessionManager.findGlobalSession(xid);
if (globalSession == null) {
if (logger.isWarnEnabled()) {
logger.warn(
"The transaction corresponding to the XID: {} does not exist, which may cause a two-phase concurrency issue, msg type: {}",
xid, syncMsg.getMsgType());
}
return false;
}
long branchId = sessionSyncMsg.getBranchSession().getBranchId();
BranchSession branchSession = globalSession.getBranch(branchId);
if (branchSession == null) {
if (logger.isWarnEnabled()) {
logger.warn(
"The branch session corresponding to the branchId: {} does not exist, which may cause a two-phase concurrency issue, msg type: {}",
sessionSyncMsg.getBranchSession().getBranchId(), syncMsg.getMsgType());
}
return false;
}
BranchStatus status = BranchStatus.get(sessionSyncMsg.getBranchSession().getStatus());
branchSession.setStatus(status);
if (logger.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,16 @@ public class BranchReleaseLockExecute extends AbstractRaftMsgExecute {
@Override
public Boolean execute(RaftBaseMsg syncMsg) throws Throwable {
RaftBranchSessionSyncMsg sessionSyncMsg = (RaftBranchSessionSyncMsg)syncMsg;
GlobalSession globalSession =
SessionHolder.getRootSessionManager().findGlobalSession(sessionSyncMsg.getBranchSession().getXid());
String xid = sessionSyncMsg.getBranchSession().getXid();
GlobalSession globalSession = SessionHolder.getRootSessionManager().findGlobalSession(xid);
if (globalSession == null) {
if (logger.isWarnEnabled()) {
logger.warn(
"The transaction corresponding to the XID: {} does not exist, which may cause a two-phase concurrency issue, msg type: {}",
xid, syncMsg.getMsgType());
}
return false;
}
BranchSession branchSession = globalSession.getBranch(sessionSyncMsg.getBranchSession().getBranchId());
if (branchSession != null) {
if (logger.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ public static boolean isRollbackGlobalStatus(GlobalStatus status) {
|| status == GlobalStatus.RollbackRetryTimeout;
}

public static boolean isEndGlobalStatus(GlobalStatus status) {
return status == GlobalStatus.Rollbacked || status == GlobalStatus.TimeoutRollbacked
|| status == GlobalStatus.Committed || status == GlobalStatus.Finished;
}

/**
* is commit global status
*
Expand Down
Loading