diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto
index 81d16b2861ca..fc5d4b4db0c5 100644
--- a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto
@@ -723,6 +723,7 @@ enum SplitWALState{
   ACQUIRE_SPLIT_WAL_WORKER = 1;
   DISPATCH_WAL_TO_WORKER = 2;
   RELEASE_SPLIT_WORKER = 3;
+  RETRY_ON_DIFFERENT_WORKER = 4;
 }

 message ClaimReplicationQueuesStateData {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java
index 18dfc7d493bf..9dc5ce70d69d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java
@@ -184,4 +184,30 @@ public void releaseSplitWALWorker(ServerName worker, MasterProcedureScheduler sc
   public void addUsedSplitWALWorker(ServerName worker) {
     splitWorkerAssigner.addUsedWorker(worker);
   }
+
+  /**
+   * Rename the WAL file at the specified walPath to retry with another worker. Returns true if the
+   * file is successfully renamed, or if it has already been renamed in a previous try. Returns
+   * false if neither of the files exists. Throws an IOException if any error occurs while
+   * renaming. This method is only called after a failure on one worker, so the flow in the
+   * no-failure case is unchanged.
+   */
+  public boolean ifExistRenameWALForRetry(String walPath, String postRenameWalPath)
+    throws IOException {
+    if (fs.exists(new Path(rootDir, walPath))) {
+      if (!fs.rename(new Path(rootDir, walPath), new Path(rootDir, postRenameWalPath))) {
+        throw new IOException("Failed to rename wal " + walPath + " to " + postRenameWalPath);
+      }
+      return true;
+    } else {
+      if (fs.exists(new Path(rootDir, postRenameWalPath))) {
+        LOG.info(
+          "{} was already renamed in last retry to {}, will continue to acquire new worker to split wal",
+          walPath, postRenameWalPath);
+        return true;
+      } else {
+        return false;
+      }
+    }
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java
index 699834f9c1d7..1f40f51057ff 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.master.procedure;

+import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.RETRYING_EXT;
+
 import java.io.IOException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.ServerName;
@@ -79,30 +81,82 @@ protected Flow executeFromState(MasterProcedureEnv env, MasterProcedureProtos.Sp
         try {
           finished = splitWALManager.isSplitWALFinished(walPath);
         } catch (IOException ioe) {
-          if (retryCounter == null) {
-            retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
-          }
-          long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
-          LOG.warn("Failed to check whether splitting wal {} success, wait {} seconds to retry",
-            walPath, backoff / 1000, ioe);
-          setTimeout(Math.toIntExact(backoff));
-          setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
-          skipPersistence();
+          LOG.warn("Failed to check whether splitting wal {} success", walPath);
+          setTimeoutForSuspend(env, ioe.getMessage());
           throw new ProcedureSuspendedException();
         }
         splitWALManager.releaseSplitWALWorker(worker, env.getProcedureScheduler());
         if (!finished) {
           LOG.warn("Failed to split wal {} by server {}, retry...", walPath, worker);
-          setNextState(MasterProcedureProtos.SplitWALState.ACQUIRE_SPLIT_WAL_WORKER);
+          setNextState(MasterProcedureProtos.SplitWALState.RETRY_ON_DIFFERENT_WORKER);
           return Flow.HAS_MORE_STATE;
         }
         ServerCrashProcedure.updateProgress(env, getParentProcId());
         return Flow.NO_MORE_STATE;
+      case RETRY_ON_DIFFERENT_WORKER:
+        // HBASE-28951: In some cases, a RegionServer (RS) becomes unresponsive, which leads the
+        // master to mistakenly assume the RS has crashed and mark the SplitWALRemoteProcedure as
+        // failed. As a result, the WAL splitting task is reassigned to another worker. However, if
+        // the original worker starts responding again, both RegionServers may attempt to process
+        // the same WAL at the same time, causing both operations to fail. To prevent this conflict,
+        // we can "fence" the WAL by renaming it.
+        try {
+          String postRenameWalPath = getPostRenameWalPath();
+          if (splitWALManager.ifExistRenameWALForRetry(this.walPath, postRenameWalPath)) {
+            this.walPath = postRenameWalPath;
+            setNextState(MasterProcedureProtos.SplitWALState.ACQUIRE_SPLIT_WAL_WORKER);
+            return Flow.HAS_MORE_STATE;
+          } else {
+            // ifExistRenameWALForRetry returns false only if the WAL file at walPath does not
+            // exist and has not been renamed to postRenameWalPath. This can only happen if the
+            // WAL split already completed before the flow reached this point.
+            ServerCrashProcedure.updateProgress(env, getParentProcId());
+            return Flow.NO_MORE_STATE;
+          }
+        } catch (IOException ioe) {
+          LOG.warn("Failed to rename the splitting wal {}", walPath);
+          setTimeoutForSuspend(env, ioe.getMessage());
+          throw new ProcedureSuspendedException();
+        }
+
       default:
         throw new UnsupportedOperationException("unhandled state=" + state);
     }
   }

+  private String getPostRenameWalPath() {
+    // If the WAL split is retried with a different worker, a ".retrying" extension plus a
+    // three-digit counter is appended to the walPath. We cannot keep a workerChangeCount field in
+    // the SplitWALProcedure class because of the following scenario:
+    // If a SplitWALProcedure is bypassed or rolled back after being retried with another worker,
+    // the walPath will still carry the ".retrying" suffix. During recovery, a new
+    // SplitWALProcedure (potentially created by a different SCP) starts with a workerChangeCount
+    // of 0, while the walPath still carries the suffix from the previous retry. To handle all
+    // these scenarios, we always move on to the next counter value so that the last worker is
+    // properly fenced.
+    String originalWALPath;
+    int workerChangeCount = 0;
+    if (walPath.substring(walPath.length() - RETRYING_EXT.length() - 3).startsWith(RETRYING_EXT)) {
+      originalWALPath = walPath.substring(0, walPath.length() - RETRYING_EXT.length() - 3);
+      workerChangeCount = Integer.parseInt(walPath.substring(walPath.length() - 3));
+    } else {
+      originalWALPath = walPath;
+    }
+    return originalWALPath + RETRYING_EXT + String.format("%03d", (workerChangeCount + 1) % 1000);
+
+  }
+
+  private void setTimeoutForSuspend(MasterProcedureEnv env, String reason) {
+    if (retryCounter == null) {
+      retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
+    }
+    long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
+    LOG.warn("{} can not run currently because {}, wait {} ms to retry", this, reason, backoff);
+    setTimeout(Math.toIntExact(backoff));
+    setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
+    skipPersistence();
+  }
+
   @Override
   protected void rollbackState(MasterProcedureEnv env,
     MasterProcedureProtos.SplitWALState splitOneWalState) throws IOException, InterruptedException {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
index 13d2886182e4..723ea16c9457 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
@@ -234,9 +234,16 @@ static void requestLogRoll(final WAL wal) {
   static final String DEFAULT_PROVIDER_ID = "default";

   // Implementation details that currently leak in tests or elsewhere follow
-  /** File Extension used while splitting an WAL into regions (HBASE-2312) */
+  /**
+   * File extension used while splitting a WAL into regions (HBASE-2312). This is used with the
+   * directory name/path.
+   */
   public static final String SPLITTING_EXT = "-splitting";

+  // Extension for a WAL whose split failed on one worker and is being retried on another.
+  // This is used with the WAL file itself.
+  public static final String RETRYING_EXT = ".retrying";
+
   /**
    * Pattern used to validate a WAL file name see {@link #validateWALFilename(String)} for
    * description.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java
index ea92f7922794..8c1b40500879 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java
@@ -19,10 +19,13 @@

 import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
 import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
+import static org.apache.hadoop.hbase.master.MasterWalManager.NON_META_FILTER;
 import static org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface.ServerOperationType.SPLIT_WAL;
+import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.RETRYING_EXT;

 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import org.apache.hadoop.fs.FileStatus;
@@ -42,6 +45,7 @@
 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
 import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
 import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -94,6 +98,37 @@ public void teardown() throws Exception {
     TEST_UTIL.shutdownMiniCluster();
   }

+  @Test
+  public void testRenameWALForRetryWALSplit() throws Exception {
+    HRegionServer regionServer = TEST_UTIL.getHBaseCluster().getRegionServer(0);
+    List<Path> logDirs =
+      master.getMasterWalManager().getLogDirs(Collections.singleton(regionServer.getServerName()));
+    List<FileStatus> list =
+      SplitLogManager.getFileList(TEST_UTIL.getConfiguration(), logDirs, NON_META_FILTER);
+    Path testWal = list.get(0).getPath();
+
+    String newWALPath = testWal + RETRYING_EXT + "001";
+    boolean result = splitWALManager.ifExistRenameWALForRetry(testWal.toString(), newWALPath);
+    Assert.assertTrue(result);
+
+    List<FileStatus> fileStatusesA =
+      SplitLogManager.getFileList(TEST_UTIL.getConfiguration(), logDirs, NON_META_FILTER);
+    Path walFromFileSystem = null;
+    for (FileStatus wal : fileStatusesA) {
+      if (wal.getPath().toString().endsWith(RETRYING_EXT + "001")) {
+        walFromFileSystem = wal.getPath();
+        break;
+      }
+    }
+    Assert.assertNotNull(walFromFileSystem);
+    Assert.assertEquals(walFromFileSystem.toString(), newWALPath);
+
+    // if the file is already renamed, it should still return true
+    boolean resultAfterRename =
+      splitWALManager.ifExistRenameWALForRetry(testWal.toString(), newWALPath);
+    Assert.assertTrue(resultAfterRename);
+  }
+
   @Test
   public void testAcquireAndRelease() throws Exception {
     List testProcedures = new ArrayList<>();
@@ -166,7 +201,7 @@ public void testCreateSplitWALProcedures() throws Exception {
     Assert.assertFalse(TEST_UTIL.getTestFileSystem().exists(wals[0].getPath()));

     // Test splitting wal
-    wals = TEST_UTIL.getTestFileSystem().listStatus(metaWALDir, MasterWalManager.NON_META_FILTER);
+    wals = TEST_UTIL.getTestFileSystem().listStatus(metaWALDir, NON_META_FILTER);
     Assert.assertEquals(1, wals.length);
     testProcedures =
       splitWALManager.createSplitWALProcedures(Lists.newArrayList(wals[0]), metaServer);
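
Note on the suffix scheme: the subtle part of the patch is how getPostRenameWalPath() derives the next rename target. The standalone sketch below is not part of the change; it only illustrates the assumed scheme of RETRYING_EXT (".retrying") followed by a three-digit counter. The class and method names (RetryingSuffixExample, nextRetryPath) are invented for the example, and the length guard is an extra safety check not present in the patch.

public class RetryingSuffixExample {

  // Mirrors AbstractFSWALProvider.RETRYING_EXT from the patch.
  private static final String RETRYING_EXT = ".retrying";

  // Next rename target for a WAL path:
  //   "wal"             -> "wal.retrying001"
  //   "wal.retrying001" -> "wal.retrying002"
  //   "wal.retrying999" -> "wal.retrying000"  (the counter wraps at 1000)
  static String nextRetryPath(String walPath) {
    String original = walPath;
    int workerChangeCount = 0;
    int suffixLen = RETRYING_EXT.length() + 3;
    if (walPath.length() > suffixLen
      && walPath.substring(walPath.length() - suffixLen).startsWith(RETRYING_EXT)) {
      // Strip the existing ".retryingNNN" suffix and remember the current counter.
      original = walPath.substring(0, walPath.length() - suffixLen);
      workerChangeCount = Integer.parseInt(walPath.substring(walPath.length() - 3));
    }
    return original + RETRYING_EXT + String.format("%03d", (workerChangeCount + 1) % 1000);
  }

  public static void main(String[] args) {
    System.out.println(nextRetryPath("host%2C16020%2C1.1700000000000"));              // appends .retrying001
    System.out.println(nextRetryPath("host%2C16020%2C1.1700000000000.retrying001"));  // moves to .retrying002
  }
}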
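
Note on idempotency: ifExistRenameWALForRetry() is written so that the RETRY_ON_DIFFERENT_WORKER step can safely re-execute after a master restart: it reports success whether the rename happened now or in an earlier attempt, and reports false only when neither file exists (the split already finished). A minimal local sketch of the same contract, using java.nio.file instead of Hadoop's FileSystem; the names and paths below are illustrative only, not taken from the patch.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class IdempotentRenameExample {

  // Same contract as ifExistRenameWALForRetry: true if 'source' was renamed to 'target' now or by
  // an earlier attempt; false if neither file exists; IOException if the rename itself fails.
  static boolean renameIfExists(Path source, Path target) throws IOException {
    if (Files.exists(source)) {
      Files.move(source, target);
      return true;
    }
    return Files.exists(target);
  }

  public static void main(String[] args) throws IOException {
    Path dir = Files.createTempDirectory("wal-fence-demo");
    Path wal = Files.createFile(dir.resolve("wal"));
    Path renamed = dir.resolve("wal.retrying001");
    System.out.println(renameIfExists(wal, renamed)); // true: renamed now
    System.out.println(renameIfExists(wal, renamed)); // true: already renamed earlier
    Files.delete(renamed);
    System.out.println(renameIfExists(wal, renamed)); // false: neither file exists
  }
}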