-
Notifications
You must be signed in to change notification settings - Fork 34
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
DBZ-8154 Make order metadata epoch handle changes to task parallelism & shard set #207
Conversation
@jpechane Can you review this when you get the chance? Thanks! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems to me that the change is mainly about introducing the concept of ShardEpoch map and refactoring code to share the implementation of shardEpoch and shardVGtid, the title of the PR might need to reflect that.
} | ||
shardGtids.add(new Vgtid.ShardGtid(keyspace, shard, gtidStr)); | ||
config = getVitessTaskValuePerShard(config, connectorConfig, VitessOffsetRetriever.ValueType.GTID); | ||
if (VitessOffsetRetriever.isShardEpochMapEnabled(connectorConfig)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks like should be if ... else structure otherwise line 156 was executed in vain.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The getVitessTaskValuePerShard
actually modifies the map (since it passes it in as an arg). However, that is not clear (not well named) and better to not have hidden side effects on the config. I updated so it simply returns the value and then the config editing is done in this function.
@@ -248,6 +249,17 @@ public static BigIntUnsignedHandlingMode parse(String value, String defaultValue | |||
+ " If not configured, the connector streams changes from the latest position for the given shard(s)." | |||
+ " If snapshot.mode is INITIAL (default), the connector starts copying the tables for the given shard(s) first regardless of gtid value."); | |||
|
|||
public static final Field SHARD_EPOCH_MAP = Field.create(VITESS_CONFIG_GROUP_PREFIX + "shard.epoch.map") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is going to be a big map to pass in and potentially error-prone, when will people pass in this epoch on command line? It this config mainly used for carry the current epoch values from a running system?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This has two benefits
- The logic between the config VGTID (which is similarly large & error prone) and this config can be the same, so our refactored functions can have the same logic
- If someone is migrating to a new connector for whatever reason, and they do not want to lose all epoch values (ie preserve consistency, no need to re-bootstrap everything), they can pass in the shard epoch map.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems to me that the change is mainly about introducing the concept of ShardEpoch map and refactoring code to share the implementation of shardEpoch and shardVGtid, the title of the PR might need to reflect that.
That is all true. The result is that now, similar to VGTID, it will be redistributed correctly whenever the shards assigned to tasks changes (eg task parallelism change or shard set change). So the title highlights the effect, not so much the implementation. I added more details on what you mentioned to the description to help make it clearer.
@@ -248,6 +249,17 @@ public static BigIntUnsignedHandlingMode parse(String value, String defaultValue | |||
+ " If not configured, the connector streams changes from the latest position for the given shard(s)." | |||
+ " If snapshot.mode is INITIAL (default), the connector starts copying the tables for the given shard(s) first regardless of gtid value."); | |||
|
|||
public static final Field SHARD_EPOCH_MAP = Field.create(VITESS_CONFIG_GROUP_PREFIX + "shard.epoch.map") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This has two benefits
- The logic between the config VGTID (which is similarly large & error prone) and this config can be the same, so our refactored functions can have the same logic
- If someone is migrating to a new connector for whatever reason, and they do not want to lose all epoch values (ie preserve consistency, no need to re-bootstrap everything), they can pass in the shard epoch map.
} | ||
shardGtids.add(new Vgtid.ShardGtid(keyspace, shard, gtidStr)); | ||
config = getVitessTaskValuePerShard(config, connectorConfig, VitessOffsetRetriever.ValueType.GTID); | ||
if (VitessOffsetRetriever.isShardEpochMapEnabled(connectorConfig)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The getVitessTaskValuePerShard
actually modifies the map (since it passes it in as an arg). However, that is not clear (not well named) and better to not have hidden side effects on the config. I updated so it simply returns the value and then the config editing is done in this function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm, thanks for the refactoring and support for shardEpoch
@twthorn Looks nice, thanks! |
Please just send a docs PR with the new config option. |
Right now each task simply reads the epoch from its last stored offsets. As seen by the logic for vgtid, we need to be able to handle changes in generation if task offset storage mode is enabled (e.g., task parallelism change or shard subset change). Add this feature by refactoring the logic for vgtid to be more generic so we can do equivalent operations on the shard epochs (when enabled). Add
ShardEpochMap
for centralizing the ser/deser operations of reading the shard epoch map from state & updating it. Operate on VGTID and ShardEpochMap in similar manners to accomplish the same goal of redistributing correctly on task parallelism or shard change.Note: there is one "TODO" intentionally left in the code: we will soon put out another PR to establish shard lineage and allow for inheriting the epoch of a parent shard when a split occurs. This PR is already substantial so we will do that in a separate one.