diff --git a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java
index 2a52d13a68..1a1dd44148 100644
--- a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java
+++ b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java
@@ -589,7 +589,8 @@ public static void main(String[] args) throws Exception {
           break;
         case DUMP_HOST_HEARTBEAT:
           dumpHostHeartbeat(cmd);
-        case RUN_CLUSTER_COMMAND:
+          break;
+        case CLUSTER_BATCH_TASK:
           runClusterCommand(cmd);
           break;
         default:
@@ -857,9 +858,9 @@ private static void deleteStore(CommandLine cmd) throws IOException {
   }
 
   private static void runClusterCommand(CommandLine cmd) {
-    String clusterName = getRequiredArgument(cmd, Arg.CLUSTER, Command.RUN_CLUSTER_COMMAND);
-    String task = getRequiredArgument(cmd, Arg.TASK_NAME, Command.RUN_CLUSTER_COMMAND);
-    String checkpointFile = getRequiredArgument(cmd, Arg.CHECKPOINT_FILE, Command.RUN_CLUSTER_COMMAND);
+    String clusterName = getRequiredArgument(cmd, Arg.CLUSTER, Command.CLUSTER_BATCH_TASK);
+    String task = getRequiredArgument(cmd, Arg.TASK_NAME, Command.CLUSTER_BATCH_TASK);
+    String checkpointFile = getRequiredArgument(cmd, Arg.CHECKPOINT_FILE, Command.CLUSTER_BATCH_TASK);
     int parallelism = Integer.parseInt(getOptionalArgument(cmd, Arg.THREAD_COUNT, "1"));
     System.out.println(
         "[**** Cluster Command Params ****] Cluster: " + clusterName + ", Task: " + task + ", Checkpoint: "
@@ -881,11 +882,13 @@ private static void runClusterCommand(CommandLine cmd) {
 
     // Load progress from checkpoint file. If file does not exist, it will create new one during checkpointing.
     try {
-      Path filePath = Paths.get(checkpointFile).toAbsolutePath();
-      if (!Files.exists(filePath)) {
-        System.out.println("Checkpoint file path does not exist, will create a new checkpoint file: " + filePath);
+      Path checkpointFilePath = Paths.get(checkpointFile);
+      if (!Files.exists(checkpointFilePath.toAbsolutePath())) {
+        System.out.println(
+            "Checkpoint file path does not exist, will create a new checkpoint file: "
+                + checkpointFilePath.toAbsolutePath());
       } else {
-        List<String> fileLines = Files.readAllLines(Paths.get(checkpointFile));
+        List<String> fileLines = Files.readAllLines(checkpointFilePath);
         for (String line: fileLines) {
           String storeName = line.split(",")[0];
           // For now, it is boolean to start with, we can add more states to support retry.
@@ -902,9 +905,9 @@ private static void runClusterCommand(CommandLine cmd) {
     List<String> taskList =
         progressMap.entrySet().stream().filter(e -> !e.getValue()).map(Map.Entry::getKey).collect(Collectors.toList());
 
-    // Validate task type.
+    // Validate task type. For now, we only have one task; if more tasks are added later, extend this logic.
     Supplier<Function<String, Boolean>> functionSupplier;
-    if ("PushSystemStore".equals(task)) {
+    if (SystemStorePushTask.TASK_NAME.equals(task)) {
       functionSupplier = () -> new SystemStorePushTask(controllerClient, controllerClientMap, clusterName);
     } else {
       System.out.println("Undefined task: " + task);
diff --git a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java
index 52ba7442c4..2237bdede4 100644
--- a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java
+++ b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java
@@ -128,9 +128,9 @@
 import static com.linkedin.venice.Arg.SYSTEM_STORE_TYPE;
 import static com.linkedin.venice.Arg.TARGET_SWAP_REGION;
 import static com.linkedin.venice.Arg.TARGET_SWAP_REGION_WAIT_TIME;
-import static com.linkedin.venice.Arg.TO_BE_STOPPED_NODES;
 import static com.linkedin.venice.Arg.TASK_NAME;
 import static com.linkedin.venice.Arg.THREAD_COUNT;
+import static com.linkedin.venice.Arg.TO_BE_STOPPED_NODES;
 import static com.linkedin.venice.Arg.UNUSED_SCHEMA_DELETION_ENABLED;
 import static com.linkedin.venice.Arg.URL;
 import static com.linkedin.venice.Arg.VALUE_SCHEMA;
@@ -204,8 +204,8 @@ public enum Command {
       "backfill-system-stores", "Create system stores of a given type for user stores in a cluster",
       new Arg[] { URL, CLUSTER, SYSTEM_STORE_TYPE }
   ),
-  RUN_CLUSTER_COMMAND(
-      "run-cluster-command", "Run specific task for all user stores in a cluster",
+  CLUSTER_BATCH_TASK(
+      "cluster-batch-task", "Run specific task against all user stores in a cluster in parallel",
       new Arg[] { URL, CLUSTER, TASK_NAME, CHECKPOINT_FILE }, new Arg[] { THREAD_COUNT }
   ),
   SET_VERSION(
diff --git a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/SystemStorePushTask.java b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/SystemStorePushTask.java
index 0846e360d2..cf37fbd5d3 100644
--- a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/SystemStorePushTask.java
+++ b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/SystemStorePushTask.java
@@ -28,6 +28,7 @@
  * It will also skip empty push to store which is being migrated and is in the destination cluster.
  */
 public class SystemStorePushTask implements Function<String, Boolean> {
+  public static final String TASK_NAME = "PushSystemStore";
   private static final Logger LOGGER = LogManager.getLogger(SystemStorePushTask.class);
   private static final int JOB_POLLING_RETRY_COUNT = 200;
   private static final int JOB_POLLING_RETRY_PERIOD_IN_SECONDS = 5;
@@ -65,6 +66,20 @@ public Boolean apply(String storeName) {
 
     for (VeniceSystemStoreType type: SYSTEM_STORE_TYPE) {
       String systemStoreName = type.getSystemStoreName(storeName);
+      /**
+       * In current implementation, a push to system store will flip the flag to true, which can introduce unexpected
+       * behavior to the store. Here, we skip the system store push if it is turned off.
+       */
+      boolean isSystemStoreEnabled = VeniceSystemStoreType.META_STORE.equals(type)
+          ? storeResponse.getStore().isStoreMetaSystemStoreEnabled()
+          : storeResponse.getStore().isDaVinciPushStatusStoreEnabled();
+      if (!isSystemStoreEnabled) {
+        LOGGER.warn(
+            "{} System store: {} is disabled. Will skip the push.",
+            SYSTEM_STORE_PUSH_TASK_LOG_PREFIX,
+            systemStoreName);
+        continue;
+      }
       VersionResponse response = parentControllerClient.getStoreLargestUsedVersion(clusterName, systemStoreName);
       if (response.isError()) {
         LOGGER.error(