[compat] [server] [client] [test] Global RT DIV: Chunking Support #1385

Open
wants to merge 17 commits into
base: main

Contributor

@KaiSernLim KaiSernLim commented Dec 11, 2024

Summary

Continuation from #1257.

This PR mainly focuses on adding chunking support for DIV messages when they are produced to Kafka topics, because a DIV message can exceed the ~1MB Kafka message limit. The existing chunking mechanism is reused, including the CHUNK and CHUNKED_VALUE_MANIFEST values in the message's schemaId.

Every DIV message has GLOBAL_RT_DIV as the header byte in its KafkaKey. The corresponding KafkaMessageEnvelope carries a Put payload whose putValue field contains the GlobalRtDiv data and whose schemaId is set as follows:

  1. If the DIV message is non-chunked, the KME schemaId is set to the current protocol version of GLOBAL_RT_DIV.
  2. If the DIV message is a chunk message, the KME schemaId is set to CHUNK.
  3. If the DIV message is a chunk manifest message, the KME schemaId is set to CHUNKED_VALUE_MANIFEST. The schemaId of the ChunkedValueManifest will be the current protocol version of GLOBAL_RT_DIV.
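
The three rules above can be sketched as a small lookup. This is only an illustration of the mapping described in this summary, not the actual VeniceWriter code: the class name, the enum, and all three numeric IDs are placeholders invented for the example.

```java
import java.util.Objects;

/** Hypothetical sketch: picks the KME schemaId for a Global RT DIV message. */
final class DivSchemaIdSketch {
  /** The three shapes a DIV message can take when produced. */
  enum DivMessageKind { NON_CHUNKED, CHUNK, CHUNK_MANIFEST }

  // Placeholder values for illustration only; the real constants live in the code base.
  static final int CHUNK_SCHEMA_ID = -10;
  static final int CHUNKED_VALUE_MANIFEST_SCHEMA_ID = -20;
  static final int GLOBAL_RT_DIV_PROTOCOL_VERSION = 1;

  /** Returns the schemaId that goes into the Put payload of the KME. */
  static int kmeSchemaId(DivMessageKind kind) {
    Objects.requireNonNull(kind, "kind");
    switch (kind) {
      case NON_CHUNKED:
        return GLOBAL_RT_DIV_PROTOCOL_VERSION;
      case CHUNK:
        return CHUNK_SCHEMA_ID;
      case CHUNK_MANIFEST:
        return CHUNKED_VALUE_MANIFEST_SCHEMA_ID;
      default:
        throw new IllegalArgumentException("Unknown kind: " + kind);
    }
  }

  /** The manifest itself records the schema of the value it describes. */
  static int manifestInnerSchemaId() {
    return GLOBAL_RT_DIV_PROTOCOL_VERSION;
  }
}
```

Note that the GLOBAL_RT_DIV protocol version appears twice: directly in the KME for a non-chunked message, and inside the ChunkedValueManifest for a chunked one.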

ChunkAssembler is adapted, on the receiver side, to buffer, assemble, and deserialize DIV messages (chunked/non-chunked).

[Image: GlobalRtDivMessages@2x]

[Image: DivChunking]

Changes

  1. Added a new MessageType called GlobalRtDiv, which reuses the Put message type format and objects. When the Venice server encounters a message with KafkaKey containing the GlobalRtDiv header byte, it will know to process this message differently from a regular Put.
    • The only indication of this new GlobalRtDiv message type is the header byte in KafkaKey. Otherwise, it's identical to a regular Put.
    • KafkaMessageEnvelope.avsc will not be updated to avoid the unnecessary risk of incompatible avro formats when upgrading the cluster.
    • The risk in not creating a dedicated GlobalRtDiv message type in KME is that the GlobalRtDiv objects will be processed as user records and stored in the storage engine, which seems to be much less scary than a cluster upgrade issue.
  2. Changed bufferAndAssembleRecord() in ChunkAssembler to use a deserialization Function instead of a RecordDeserializer. This is because RecordDeserializer is on a separate inheritance path to the InternalAvroSpecificSerializer.
  3. GlobalRtDiv messages should not be processed if they originate from remote VT and RT, because those are invalid scenarios. These two conditions are checked.
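
To illustrate change 2, here is a heavily simplified sketch of an assembler that takes a deserialization `Function` rather than a dedicated deserializer type. It is not the real ChunkAssembler: the class and method names are hypothetical, chunks are assumed to arrive in order, and the buffer is a plain in-memory list.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical, simplified stand-in for a chunk assembler with a pluggable deserializer. */
final class DivAssemblerSketch<T> {
  private final List<byte[]> bufferedChunks = new ArrayList<>();
  private final Function<ByteBuffer, T> deserializer;

  DivAssemblerSketch(Function<ByteBuffer, T> deserializer) {
    this.deserializer = deserializer;
  }

  /** Buffers one chunk; nothing can be returned until the manifest arrives. */
  void bufferChunk(byte[] chunk) {
    bufferedChunks.add(chunk.clone());
  }

  /** Called for a manifest: concatenates the buffered chunks and deserializes the result. */
  T assemble(int expectedChunkCount) {
    if (bufferedChunks.size() != expectedChunkCount) {
      throw new IllegalStateException(
          "Expected " + expectedChunkCount + " chunks but buffered " + bufferedChunks.size());
    }
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    for (byte[] chunk : bufferedChunks) {
      out.write(chunk, 0, chunk.length);
    }
    bufferedChunks.clear();
    return deserializer.apply(ByteBuffer.wrap(out.toByteArray()));
  }

  /** Called for a non-chunked message: the value is deserialized directly. */
  T passThrough(byte[] value) {
    return deserializer.apply(ByteBuffer.wrap(value));
  }
}
```

Because the deserializer is just a `Function<ByteBuffer, T>`, any serializer can be plugged in with a lambda regardless of its class hierarchy, which is the motivation given in change 2.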

Minor Changes

  1. Renamed resetUpstreamOffsetMap() to mergeUpstreamOffsets() in OffsetRecord.
  2. Updated toString() in KafkaKey, which incorrectly assumed all messages would be ControlMessage, Put, or Delete. This misses Update messages and the new GlobalRtDiv message that is being added.
  3. Added helper methods (buildPutPayload() and buildManifestPayload()) in VeniceWriter for creating Put payloads, including when chunking is involved.

Testing

  1. Unit Tests
    1. testGlobalRtDivChunking() in VeniceWriterUnitTest
    2. testShouldProcessRecordForGlobalRtDivMessage() in StoreIngestionTaskTest
    3. testProcessGlobalRtDivMessage() in StoreIngestionTaskTest
  2. Integration Test
    1. testChunkedDiv() in TestGlobalRtDiv

Does this PR introduce any user-facing changes?

  • No. You can skip the rest of this section.

2. Split `ChunkAssembler` for RT DIV into its own object. 🍐
3. `GlobalRtDiv` serializer is per-message to be safe, because it doesn't seem to be thread-safe. 🍋‍🟩
4. Fixed spotbugs. 🌶️
5. Fixed `divChunkAssembler` for the SIT unit test. 🫨
@KaiSernLim KaiSernLim force-pushed the global-rt-div-chunking branch from 7ee8cd9 to d3ae029 Compare December 12, 2024 19:36
…will have `GLOBAL_RT_DIV` as the `MessageType`. 🏯
@@ -1479,7 +1482,7 @@ protected void updateOffsetMetadataInOffsetRecord(PartitionConsumptionState part
upstreamTopic = versionTopic;
}
if (upstreamTopic.isRealTime()) {
- offsetRecord.resetUpstreamOffsetMap(partitionConsumptionState.getLatestProcessedUpstreamRTOffsetMap());
+ offsetRecord.mergeUpstreamOffsets(partitionConsumptionState.getLatestProcessedUpstreamRTOffsetMap());
Contributor

@sixpluszero sixpluszero Dec 16, 2024

Why make this change? Personally, I think it makes the offset map we are tracking less explicit.

Contributor Author

Because I feel that reset() is not the right word for it. It's not clearing the state or replacing the map entirely. It's updating the existing map with the values of the new map and retaining the values missing from the new map.

I can rename it to updateUpstreamOffsets().
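
The merge semantics described in this reply can be sketched with plain maps. This is only an illustration under the assumption that the offsets are keyed by a string and stored as longs; the class name is hypothetical and this is not the OffsetRecord implementation.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of "merge" semantics, as opposed to "reset" semantics. */
final class UpstreamOffsetMergeSketch {
  /**
   * Overwrites entries present in {@code latest} and keeps entries of
   * {@code existing} that {@code latest} does not mention.
   */
  static void mergeUpstreamOffsets(Map<String, Long> existing, Map<String, Long> latest) {
    existing.putAll(latest);
  }

  /** What a true reset would do instead: discard everything and copy the new map. */
  static void resetUpstreamOffsets(Map<String, Long> existing, Map<String, Long> latest) {
    existing.clear();
    existing.putAll(latest);
  }

  public static void main(String[] args) {
    Map<String, Long> existing = new HashMap<>();
    existing.put("region-a", 10L);
    existing.put("region-b", 20L);
    Map<String, Long> latest = new HashMap<>();
    latest.put("region-b", 25L);
    mergeUpstreamOffsets(existing, latest);
    // region-a is retained and region-b is updated.
    System.out.println(existing);
  }
}
```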

@@ -21,7 +21,8 @@
*/
public enum MessageType implements VeniceEnumValue {
PUT(0, Constants.PUT_KEY_HEADER_BYTE), DELETE(1, Constants.PUT_KEY_HEADER_BYTE),
- CONTROL_MESSAGE(2, Constants.CONTROL_MESSAGE_KEY_HEADER_BYTE), UPDATE(3, Constants.UPDATE_KEY_HEADER_BYTE);
+ CONTROL_MESSAGE(2, Constants.CONTROL_MESSAGE_KEY_HEADER_BYTE), UPDATE(3, Constants.UPDATE_KEY_HEADER_BYTE),
+ GLOBAL_RT_DIV(4, Constants.GLOBAL_RT_DIV_KEY_HEADER_BYTE);
Contributor

I wonder if it is OK not to add it to KME. For example, if the Da Vinci and CDC code is not upgraded, how will it handle the Global RT DIV? Or should we completely prevent them from processing RT DIV messages?
Will this be controlled by a store config or just a server config?

Contributor

@lluwm lluwm Dec 16, 2024

In my opinion, Da Vinci and CDC don't need to process the RT DIV messages. Only the next leader has to read and process them, right? They can simply ignore them.

Contributor Author

It's controlled by server config.

Contributor

I mean that, with this code change, we will eventually enable the feature that produces RT DIV snapshots to the VT. These will be processed by followers, which also applies to DVC. I know DVC won't be the leader, but will the existing code handle an unknown new message type gracefully?

Contributor

You're right. A reader that understands the message type (so that it can ignore it) has to be deployed before enabling this feature. I don't know if there is another way to work around it; feel free to suggest alternatives.

Contributor Author

@KaiSernLim KaiSernLim Dec 16, 2024

I didn't look very deep, but I think that the existing code will throw an exception when encountering an unknown message type.

Handling an unknown message type will be implemented in a future PR. Since we control the feature flag, we can choose not to enable the feature flag until all server instances support the new message type so there won't be any messages with this message type floating around.
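
For discussion, a tolerant reader could look roughly like the sketch below: it skips key header bytes it does not recognize instead of throwing. Everything here is hypothetical, including the class name, the header byte values, and the `consumesGlobalRtDiv` switch; it is not the behavior of the existing code or of this PR.

```java
/** Hypothetical sketch of a consumer that tolerates key header bytes it does not know. */
final class TolerantKeyHeaderSketch {
  enum Action { PROCESS, SKIP }

  // Hypothetical header byte values, chosen only for this example.
  static final byte PUT_HEADER = 0;
  static final byte CONTROL_MESSAGE_HEADER = 2;
  static final byte UPDATE_HEADER = 4;
  static final byte GLOBAL_RT_DIV_HEADER = 8;

  /**
   * Decides what to do with a record based on its key header byte.
   *
   * @param consumesGlobalRtDiv true for consumers that need DIV state (for example servers),
   *                            false for consumers that can ignore it
   */
  static Action classify(byte keyHeaderByte, boolean consumesGlobalRtDiv) {
    switch (keyHeaderByte) {
      case PUT_HEADER:
      case CONTROL_MESSAGE_HEADER:
      case UPDATE_HEADER:
        return Action.PROCESS;
      case GLOBAL_RT_DIV_HEADER:
        return consumesGlobalRtDiv ? Action.PROCESS : Action.SKIP;
      default:
        // Unknown header byte: skip rather than fail the whole ingestion.
        return Action.SKIP;
    }
  }
}
```

Whether silently skipping unknown types is acceptable is a design decision in its own right; until such a reader is deployed everywhere, keeping the feature flag off remains the safeguard.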

Contributor

We need to deploy the changes to all VT consumers before enabling this feature, which includes:

  1. Server.
  2. DaVinci.
  3. ETL.
  4. CDC.

Contributor

I also think it is better to treat the new KME type as a Control Message, and if we can't do that, I would like to learn why.

Contributor

@gaojieliu gaojieliu left a comment

I would like to call out another item we didn't discuss in the review meeting:

  1. Clean up the Global RT DIV messages from VT.

This new type of message can be large, and with chunking support we are leaking the data chunks because each chunk ID is unique.
I was thinking of a strategy where the follower sends out a Kafka delete, or a Kafka message with an empty value, for the previous key after consuming a new Global RT DIV message. If we don't do the cleanup, the size of the Kafka topic might grow a lot, depending on the sending frequency.
Maybe we don't need to implement such cleanup in the MVP, but eventually we need some way to clean them up from the version topics.

@@ -1904,6 +1906,10 @@ protected boolean shouldProcessRecord(PubSubMessage<KafkaKey, KafkaMessageEnvelo
}
}
}
// Global RT DIV messages should be completely ignored when leader is consuming from remote version topic
if (record.getKey().isGlobalRtDiv()) {
Contributor

Is it possible to have Global RT DIV messages in the remote version topic?
There are a few cases for remote version topic consumption:

  1. NR for the venice push job.
  2. Streaming Reprocessing job.
  3. Data recovery for batch store.

None of these use cases would, in theory, encounter Global RT DIV messages.
Today, there is no support for hybrid store data recovery, and it is done via re-push.
So can you explain the scenarios you have in mind?

@@ -467,6 +472,8 @@ public StoreIngestionTask(
new IngestionNotificationDispatcher(notifiers, kafkaVersionTopic, isCurrentVersion);
this.missingSOPCheckExecutor.execute(() -> waitForStateVersion(kafkaVersionTopic));
this.chunkAssembler = new ChunkAssembler(storeName);
this.divChunkAssembler =
builder.getDivChunkAssembler() != null ? builder.getDivChunkAssembler() : new ChunkAssembler(storeName);
Contributor

It seems ChunkAssembler is store specific, so can we use the instance from the builder, which typically only contains the sharable objects?

put.getPutValue(),
record.getOffset(),
put.getSchemaId(),
new NoopCompressor(),
Contributor

Shall we always use a certain compression algorithm for Global RT DIV messages?
We know DIV is sometimes large, so I wonder whether we can always enable gzip compression to reduce Kafka usage.
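
To make the suggestion concrete, here is a minimal gzip round trip using only the JDK's `java.util.zip` classes. It is a sketch of the idea, not of Venice's compressor abstraction; the class and method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/** Hypothetical sketch: gzip a serialized DIV payload before producing it. */
final class DivGzipSketch {
  static byte[] compress(byte[] raw) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    // Closing the GZIP stream flushes the trailer, so read the bytes only afterwards.
    try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
      gzip.write(raw);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return out.toByteArray();
  }

  static byte[] decompress(byte[] compressed) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
      byte[] buffer = new byte[4096];
      int read;
      while ((read = gzip.read(buffer)) != -1) {
        out.write(buffer, 0, read);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return out.toByteArray();
  }
}
```

How much this saves depends on how repetitive the serialized DIV state is, and compression adds CPU cost on both the producing and consuming side.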

put.getSchemaId(),
new NoopCompressor(),
(valueBytes) -> GLOBAL_RT_DIV_STATE.getSerializer()
.deserialize(ByteUtils.extractByteArray(valueBytes), GLOBAL_RT_DIV_STATE_SCHEMA_ID));
Contributor

Shall we update InternalAvroSpecificSerializer to support a ByteBuffer as the input?

KafkaMessageEnvelope value = record.getValue();
Put put = (Put) value.getPayloadUnion();

Object assembledObject = divChunkAssembler.bufferAndAssembleRecord(
Contributor

I saw that this function uses an in-memory storage engine as temporary storage, which will increase memory usage. That differs from regular data chunk handling, which minimizes memory usage and which I think is also what we agreed on during the design review.
cc @lluwm

@@ -3786,7 +3827,7 @@ private int processKafkaDataMessage(
private void waitReadyToProcessRecord(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> record)
throws InterruptedException {
KafkaMessageEnvelope kafkaValue = record.getValue();
- if (record.getKey().isControlMessage() || kafkaValue == null) {
+ if (record.getKey().isControlMessage() || record.getKey().isGlobalRtDiv() || kafkaValue == null) {
Contributor

Can Global RT DIV be one type of Control Message?

@@ -336,5 +338,13 @@ public Builder setAAWCWorkLoadProcessingThreadPool(ExecutorService executorServi
public ExecutorService getAAWCWorkLoadProcessingThreadPool() {
return this.aaWCWorkLoadProcessingThreadPool;
}

public Builder setDivChunkAssembler(ChunkAssembler divChunkAssembler) {
Contributor

ChunkAssembler has a store name field, and in theory it is not sharable among store ingestion tasks for different stores.
Can you explain your thinking here?

"type": "map",
"doc": "A map that maps producer GUID -> producer state for realtime data.",
"values": {
"name": "ProducerPartitionState",
Contributor

Is it possible to reference the existing definition in PartitionState.avsc?
