[admin-tool] Add a cluster batch processing framework command and a system store empty push task #1254
Overall LGTM. Few minor comments
String checkpointFile = getRequiredArgument(cmd, Arg.CHECKPOINT_FILE, Command.CLUSTER_BATCH_TASK);
int parallelism = Integer.parseInt(getOptionalArgument(cmd, Arg.THREAD_COUNT, "1"));
System.out.println(
"[**** Cluster Command Params ****] Cluster: " + clusterName + ", Task: " + task + ", Checkpoint: "
Can we use LOGGER to log this info?
try {
Path checkpointFilePath = Paths.get(checkpointFile);
if (!Files.exists(checkpointFilePath.toAbsolutePath())) {
System.out.println(
Maybe use System.err?
if (SystemStorePushTask.TASK_NAME.equals(task)) {
functionSupplier = () -> new SystemStorePushTask(controllerClient, controllerClientMap, clusterName);
} else {
System.out.println("Undefined task: " + task);
Maybe use System.err here as well?
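For illustration, the task lookup and the `System.err` suggestion could be combined roughly as below. This is only a sketch: the type of `functionSupplier` is not visible in the excerpt, so `Supplier<Function<String, Boolean>>` is an assumption, and `TaskDispatchSketch`, `PUSH_TASK_NAME`, and the stand-in task body are hypothetical names, not the PR's code.

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;

class TaskDispatchSketch {
  // Hypothetical task name; the real value of SystemStorePushTask.TASK_NAME is not shown in the excerpt.
  static final String PUSH_TASK_NAME = "PushSystemStore";

  // Stand-in for `() -> new SystemStorePushTask(...)`: each call yields a fresh task that maps
  // a store name to success (true) or failure (false). The real task talks to controllers.
  static final Supplier<Function<String, Boolean>> PUSH_TASK = () -> storeName -> !storeName.isEmpty();

  static final Map<String, Supplier<Function<String, Boolean>>> TASKS = Map.of(PUSH_TASK_NAME, PUSH_TASK);

  // Resolves a task name to its supplier; unknown names are reported on stderr, as suggested above.
  // The task name must be non-null because Map.of(...) maps reject null keys.
  static Optional<Supplier<Function<String, Boolean>>> resolve(String taskName) {
    Supplier<Function<String, Boolean>> supplier = TASKS.get(taskName);
    if (supplier == null) {
      System.err.println("Undefined task: " + taskName);
    }
    return Optional.ofNullable(supplier);
  }

  public static void main(String[] args) {
    resolve(PUSH_TASK_NAME).ifPresent(s -> System.out.println("Resolved task: " + PUSH_TASK_NAME));
    resolve("NoSuchTask"); // reports the undefined task on stderr
  }
}
```

Keeping the lookup in a map means a new task only needs one more entry instead of another `else if` branch.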
* This class is a simple runnable which keeps fetching task from list and execute the assigned task. The task fetching
* and progress tracking / checkpointing is thread-safe, so it can be run in parallel.
*/
public class ClusterTaskRunner implements Runnable {
Can we change the name of this class to something else? ClusterTaskRunner seems misleading to me
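To make the Javadoc above concrete, here is a minimal sketch of a runnable that pulls work from a shared list and records progress in a thread-safe way. It is not the PR's implementation: `BatchWorkerSketch`, `runAll`, the `AtomicInteger` cursor, and the `Function<String, Boolean>` task shape are all assumptions chosen for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

class BatchWorkerSketch implements Runnable {
  private final List<String> stores;            // shared, read-only list of store names
  private final AtomicInteger cursor;           // shared index of the next store to claim
  private final Map<String, Boolean> progress;  // shared, thread-safe result map
  private final Function<String, Boolean> task; // hypothetical task: store name -> success

  BatchWorkerSketch(
      List<String> stores,
      AtomicInteger cursor,
      Map<String, Boolean> progress,
      Function<String, Boolean> task) {
    this.stores = stores;
    this.cursor = cursor;
    this.progress = progress;
    this.task = task;
  }

  @Override
  public void run() {
    int index;
    // getAndIncrement hands each index to exactly one worker, so no store is processed twice.
    while ((index = cursor.getAndIncrement()) < stores.size()) {
      String store = stores.get(index);
      boolean succeeded;
      try {
        succeeded = task.apply(store);
      } catch (RuntimeException e) {
        succeeded = false; // a failing store is recorded and must not stop the worker
      }
      progress.put(store, succeeded);
    }
  }

  // Runs the task over all stores with `parallelism` workers and returns per-store results.
  static Map<String, Boolean> runAll(List<String> stores, Function<String, Boolean> task, int parallelism) {
    AtomicInteger cursor = new AtomicInteger();
    Map<String, Boolean> progress = new ConcurrentHashMap<>();
    ExecutorService pool = Executors.newFixedThreadPool(parallelism);
    for (int i = 0; i < parallelism; i++) {
      pool.execute(new BatchWorkerSketch(stores, cursor, progress, task));
    }
    pool.shutdown();
    try {
      pool.awaitTermination(1, TimeUnit.MINUTES);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return progress;
  }

  public static void main(String[] args) {
    Map<String, Boolean> result =
        runAll(List.of("store_a", "store_b", "store_c"), store -> !store.equals("store_b"), 2);
    System.out.println(result);
  }
}
```

A shared cursor keeps the workers free of locks on the hot path; the only shared mutable state is the counter and the concurrent map.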
*/
public class ClusterTaskRunner implements Runnable {
private static final Logger LOGGER = LogManager.getLogger(ClusterTaskRunner.class);
private static final String TASK_LOG_PREFIX = "[**** TASK INFO ****]";
Why do we need TASK_LOG_PREFIX? We can always grep logs based on the logger name
Can we scope it to provided system store type?
[admin-tool] Add a cluster batch processing framework command and a system store empty push task
This is a side-effect PR that I created to batch process all stores in a cluster while empty pushing all system stores to apply config updates.
It adds a new command that lets you write a store-oriented task and execute it cluster-wide with the admin-tool. It supports basic checkpointing and parallel processing.
This PR also includes a system store empty push task, which performs sanity checks and then empty pushes to all system stores for a specific user store.
I think it can be further optimized and used for other purposes, but I found it efficient for empty pushing system stores to apply ZK-shared store config updates.
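As a rough picture of what "basic checkpointing" can look like, the sketch below persists per-store results to a file and, on restart, selects only the stores that still need work. The `storeName,status` line format and the names `CheckpointSketch`, `load`, `save`, and `pending` are assumptions for illustration; the PR's actual checkpoint format is not shown here.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class CheckpointSketch {
  // Reads "storeName,true|false" lines; a missing file means no progress has been recorded yet.
  static Map<String, Boolean> load(Path file) {
    Map<String, Boolean> progress = new LinkedHashMap<>();
    if (!Files.exists(file)) {
      return progress;
    }
    try {
      for (String line: Files.readAllLines(file, StandardCharsets.UTF_8)) {
        String[] parts = line.split(",");
        if (parts.length == 2) {
          progress.put(parts[0].trim(), Boolean.parseBoolean(parts[1].trim()));
        }
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return progress;
  }

  // Rewrites the whole file; synchronized so parallel workers cannot interleave writes.
  static synchronized void save(Path file, Map<String, Boolean> progress) {
    List<String> lines = new ArrayList<>();
    progress.forEach((store, succeeded) -> lines.add(store + "," + succeeded));
    try {
      Files.write(file, lines, StandardCharsets.UTF_8);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Stores that were never attempted, or whose last attempt failed, still need processing.
  static List<String> pending(List<String> allStores, Map<String, Boolean> progress) {
    List<String> result = new ArrayList<>();
    for (String store: allStores) {
      if (!Boolean.TRUE.equals(progress.get(store))) {
        result.add(store);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Path file = Paths.get(System.getProperty("java.io.tmpdir"), "checkpoint-sketch.txt");
    Map<String, Boolean> progress = new LinkedHashMap<>();
    progress.put("store_a", true);
    progress.put("store_b", false);
    save(file, progress);
    System.out.println(pending(List.of("store_a", "store_b", "store_c"), load(file)));
  }
}
```

Treating failed stores as pending means a rerun with the same checkpoint file retries failures without redoing stores that already succeeded.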
How was this PR tested?
This tool has been directly used in test environment and production environment.
Does this PR introduce any user-facing changes?