diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java index 97b6ba9d6e7..18395b7b46f 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java @@ -758,7 +758,7 @@ protected Optional, VeniceChangeCoordinate>> con if (messageType.equals(MessageType.PUT)) { Put put = (Put) message.getValue().payloadUnion; // Select appropriate deserializers - Lazy deserializerProvider; + Lazy deserializerProvider; int readerSchemaId; if (pubSubTopicPartition.getPubSubTopic().isVersionTopic()) { deserializerProvider = Lazy.of(() -> storeDeserializerCache.getDeserializer(put.schemaId, put.schemaId)); @@ -783,9 +783,9 @@ protected Optional, VeniceChangeCoordinate>> con keyBytes, put.getPutValue(), message.getOffset(), - deserializerProvider, readerSchemaId, - compressor); + compressor, + (valueBytes) -> deserializerProvider.get().deserialize(valueBytes)); if (assembledObject == null) { // bufferAndAssembleRecord may have only buffered records and not returned anything yet because // it's waiting for more input. In this case, just return an empty optional for now. diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java index 311156d00a7..3fa3f284f82 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java @@ -7,6 +7,8 @@ import static com.linkedin.davinci.kafka.consumer.LeaderFollowerStateType.LEADER; import static com.linkedin.davinci.kafka.consumer.LeaderFollowerStateType.PAUSE_TRANSITION_FROM_STANDBY_TO_LEADER; import static com.linkedin.davinci.kafka.consumer.LeaderFollowerStateType.STANDBY; +import static com.linkedin.davinci.validation.PartitionTracker.TopicType.REALTIME_TOPIC_TYPE; +import static com.linkedin.davinci.validation.PartitionTracker.TopicType.VERSION_TOPIC_TYPE; import static com.linkedin.venice.kafka.protocol.enums.ControlMessageType.END_OF_PUSH; import static com.linkedin.venice.kafka.protocol.enums.ControlMessageType.START_OF_SEGMENT; import static com.linkedin.venice.pubsub.api.PubSubMessageHeaders.VENICE_LEADER_COMPLETION_STATE_HEADER; @@ -33,6 +35,7 @@ import com.linkedin.davinci.store.view.VeniceViewWriter; import com.linkedin.davinci.validation.KafkaDataIntegrityValidator; import com.linkedin.davinci.validation.PartitionTracker; +import com.linkedin.davinci.validation.PartitionTracker.TopicType; import com.linkedin.venice.common.VeniceSystemStoreUtils; import com.linkedin.venice.compression.CompressionStrategy; import com.linkedin.venice.exceptions.VeniceException; @@ -1479,7 +1482,7 @@ protected void updateOffsetMetadataInOffsetRecord(PartitionConsumptionState part upstreamTopic = versionTopic; } if (upstreamTopic.isRealTime()) { - offsetRecord.resetUpstreamOffsetMap(partitionConsumptionState.getLatestProcessedUpstreamRTOffsetMap()); + offsetRecord.mergeUpstreamOffsets(partitionConsumptionState.getLatestProcessedUpstreamRTOffsetMap()); } else { offsetRecord.setCheckpointUpstreamVersionTopicOffset( 
partitionConsumptionState.getLatestProcessedUpstreamVersionTopicOffset()); @@ -1619,7 +1622,6 @@ protected static void checkAndHandleUpstreamOffsetRewind( int actualSchemaId = ByteUtils.readInt(actualValue, 0); Put put = (Put) envelope.payloadUnion; if (actualSchemaId == put.schemaId) { - if (put.putValue.equals( ByteBuffer.wrap( actualValue, @@ -1904,6 +1906,10 @@ protected boolean shouldProcessRecord(PubSubMessage> validate Iterable> records, String kafkaUrl, PubSubTopicPartition topicPartition) { - PartitionConsumptionState partitionConsumptionState = - partitionConsumptionStateMap.get(topicPartition.getPartitionNumber()); - if (partitionConsumptionState == null) { + final PartitionConsumptionState pcs = partitionConsumptionStateMap.get(topicPartition.getPartitionNumber()); + if (pcs == null) { // The partition is likely unsubscribed, will skip these messages. LOGGER.warn( - "No partition consumption state for store version: {}, partition:{}, will filter out all the messages", + "No partition consumption state for store version: {}, partition: {}, will filter out all the messages", kafkaVersionTopic, topicPartition.getPartitionNumber()); return Collections.emptyList(); } - boolean isEndOfPushReceived = partitionConsumptionState.isEndOfPushReceived(); - if (!shouldProduceToVersionTopic(partitionConsumptionState)) { + boolean isEndOfPushReceived = pcs.isEndOfPushReceived(); + if (!shouldProduceToVersionTopic(pcs)) { return records; } /** @@ -2302,30 +2307,15 @@ protected Iterable> validate Iterator> iter = records.iterator(); while (iter.hasNext()) { PubSubMessage record = iter.next(); - boolean isRealTimeMsg = record.getTopicPartition().getPubSubTopic().isRealTime(); + boolean isRealTimeTopic = record.getTopicPartition().getPubSubTopic().isRealTime(); try { /** * TODO: An improvement can be made to fail all future versions for fatal DIV exceptions after EOP. */ - if (!isGlobalRtDivEnabled) { - validateMessage( - PartitionTracker.VERSION_TOPIC, - this.kafkaDataIntegrityValidatorForLeaders, - record, - isEndOfPushReceived, - partitionConsumptionState); - } else { - validateMessage( - PartitionTracker.TopicType.of( - isRealTimeMsg - ? PartitionTracker.TopicType.REALTIME_TOPIC_TYPE - : PartitionTracker.TopicType.VERSION_TOPIC_TYPE, - kafkaUrl), - this.kafkaDataIntegrityValidatorForLeaders, - record, - isEndOfPushReceived, - partitionConsumptionState); - } + final TopicType topicType = (isGlobalRtDivEnabled) + ? TopicType.of(isRealTimeTopic ? 
REALTIME_TOPIC_TYPE : VERSION_TOPIC_TYPE, kafkaUrl) + : PartitionTracker.VERSION_TOPIC; + validateMessage(topicType, kafkaDataIntegrityValidatorForLeaders, record, isEndOfPushReceived, pcs); versionedDIVStats.recordSuccessMsg(storeName, versionNumber); } catch (FatalDataValidationException e) { if (!isEndOfPushReceived) { @@ -2342,7 +2332,7 @@ protected Iterable> validate "Skipping a duplicate record from: {} offset: {} for replica: {}", record.getTopicPartition(), record.getOffset(), - partitionConsumptionState.getReplicaId()); + pcs.getReplicaId()); iter.remove(); } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index 2e1856a7ce0..46255d9bfcd 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -12,6 +12,7 @@ import static com.linkedin.venice.ConfigKeys.KAFKA_BOOTSTRAP_SERVERS; import static com.linkedin.venice.LogMessages.KILLED_JOB_MESSAGE; import static com.linkedin.venice.kafka.protocol.enums.ControlMessageType.START_OF_SEGMENT; +import static com.linkedin.venice.serialization.avro.AvroProtocolDefinition.GLOBAL_RT_DIV_STATE; import static com.linkedin.venice.utils.Utils.FATAL_DATA_VALIDATION_ERROR; import static com.linkedin.venice.utils.Utils.getReplicaId; import static java.util.Comparator.comparingInt; @@ -48,6 +49,7 @@ import com.linkedin.venice.common.VeniceSystemStoreType; import com.linkedin.venice.common.VeniceSystemStoreUtils; import com.linkedin.venice.compression.CompressionStrategy; +import com.linkedin.venice.compression.NoopCompressor; import com.linkedin.venice.compression.VeniceCompressor; import com.linkedin.venice.exceptions.DiskLimitExhaustedException; import com.linkedin.venice.exceptions.MemoryLimitExhaustedException; @@ -184,6 +186,8 @@ public abstract class StoreIngestionTask implements Runnable, Closeable { private static final int MAX_KILL_CHECKING_ATTEMPTS = 10; private static final int CHUNK_MANIFEST_SCHEMA_ID = AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion(); + private static final int GLOBAL_RT_DIV_STATE_SCHEMA_ID = + AvroProtocolDefinition.GLOBAL_RT_DIV_STATE.getCurrentProtocolVersion(); protected static final RedundantExceptionFilter REDUNDANT_LOGGING_FILTER = RedundantExceptionFilter.getRedundantExceptionFilter(); @@ -317,6 +321,7 @@ public abstract class StoreIngestionTask implements Runnable, Closeable { protected final IngestionNotificationDispatcher ingestionNotificationDispatcher; protected final ChunkAssembler chunkAssembler; + protected final ChunkAssembler divChunkAssembler; private final Optional cacheBackend; private DaVinciRecordTransformer recordTransformer; @@ -467,6 +472,8 @@ public StoreIngestionTask( new IngestionNotificationDispatcher(notifiers, kafkaVersionTopic, isCurrentVersion); this.missingSOPCheckExecutor.execute(() -> waitForStateVersion(kafkaVersionTopic)); this.chunkAssembler = new ChunkAssembler(storeName); + this.divChunkAssembler = + builder.getDivChunkAssembler() != null ? 
builder.getDivChunkAssembler() : new ChunkAssembler(storeName); this.cacheBackend = cacheBackend; if (recordTransformerFunction != null) { @@ -1145,6 +1152,12 @@ private int handleSingleMessage( record.getTopicPartition().getPartitionNumber(), partitionConsumptionStateMap.get(topicPartition.getPartitionNumber())); } + } else if (record.getKey().isGlobalRtDiv()) { + // TODO: This is a placeholder for the actual implementation. + if (isGlobalRtDivEnabled) { + processGlobalRtDivMessage(record); // This is a global realtime topic data integrity validator snapshot + } + return 0; } // This function may modify the original record in KME and it is unsafe to use the payload from KME directly after @@ -1189,6 +1202,28 @@ private int handleSingleMessage( return record.getPayloadSize(); } + void processGlobalRtDivMessage(PubSubMessage record) { + KafkaKey key = record.getKey(); + KafkaMessageEnvelope value = record.getValue(); + Put put = (Put) value.getPayloadUnion(); + + Object assembledObject = divChunkAssembler.bufferAndAssembleRecord( + record.getTopicPartition(), + put.getSchemaId(), + key.getKey(), + put.getPutValue(), + record.getOffset(), + put.getSchemaId(), + new NoopCompressor(), + (valueBytes) -> GLOBAL_RT_DIV_STATE.getSerializer() + .deserialize(ByteUtils.extractByteArray(valueBytes), GLOBAL_RT_DIV_STATE_SCHEMA_ID)); + + if (assembledObject == null) { + return; // the message value only contained one data chunk, so the Global RT DIV cannot yet be fully assembled + } + // TODO: We will add the code to process Global RT DIV message later in here. + } + /** * This function is in charge of producing the consumer records to the writer buffers maintained by {@link StoreBufferService}. * @@ -2397,6 +2432,12 @@ protected boolean shouldProcessRecord(PubSubMessage record) throws InterruptedException { KafkaMessageEnvelope kafkaValue = record.getValue(); - if (record.getKey().isControlMessage() || kafkaValue == null) { + if (record.getKey().isControlMessage() || record.getKey().isGlobalRtDiv() || kafkaValue == null) { return; } @@ -4561,4 +4602,8 @@ void setVersionRole(PartitionReplicaIngestionContext.VersionRole versionRole) { protected boolean isDaVinciClient() { return isDaVinciClient; } + + ChunkAssembler getDivChunkAssembler() { + return this.divChunkAssembler; + } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskFactory.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskFactory.java index 93211b6f24e..250be3ed7e6 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskFactory.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskFactory.java @@ -14,6 +14,7 @@ import com.linkedin.davinci.storage.StorageService; import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend; import com.linkedin.davinci.store.view.VeniceViewWriterFactory; +import com.linkedin.davinci.utils.ChunkAssembler; import com.linkedin.venice.kafka.protocol.state.PartitionState; import com.linkedin.venice.meta.ReadOnlySchemaRepository; import com.linkedin.venice.meta.ReadOnlyStoreRepository; @@ -126,6 +127,7 @@ public static class Builder { private PubSubTopicRepository pubSubTopicRepository; private Runnable runnableForKillIngestionTasksForNonCurrentVersions; private ExecutorService aaWCWorkLoadProcessingThreadPool; + private ChunkAssembler divChunkAssembler; private interface Setter { void apply(); @@ -336,5 +338,13 
@@ public Builder setAAWCWorkLoadProcessingThreadPool(ExecutorService executorServi public ExecutorService getAAWCWorkLoadProcessingThreadPool() { return this.aaWCWorkLoadProcessingThreadPool; } + + public Builder setDivChunkAssembler(ChunkAssembler divChunkAssembler) { + return set(() -> this.divChunkAssembler = divChunkAssembler); + } + + public ChunkAssembler getDivChunkAssembler() { + return divChunkAssembler; + } } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/utils/ChunkAssembler.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/utils/ChunkAssembler.java index 3000207a31f..ae30e253866 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/utils/ChunkAssembler.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/utils/ChunkAssembler.java @@ -8,10 +8,11 @@ import com.linkedin.venice.pubsub.api.PubSubTopicPartition; import com.linkedin.venice.serialization.RawBytesStoreDeserializerCache; import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; -import com.linkedin.venice.serializer.RecordDeserializer; -import com.linkedin.venice.utils.lazy.Lazy; +import com.linkedin.venice.storage.protocol.ChunkedValueManifest; +import com.linkedin.venice.utils.ByteUtils; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.function.Function; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -45,10 +46,8 @@ public ChunkAssembler(String storeName) { } /** - * Buffers and assembles chunks of a record. - * - * If the record is chunked, it stores the chunks and returns null. - * Once all chunks of a record are received, it assembles, decompresses, and deserializes the record. + * Buffers and assembles chunks of a record, then decompresses and deserializes the record. + * @see #bufferAndAssembleRecord(PubSubTopicPartition, int, byte[], ByteBuffer, long, int, VeniceCompressor) */ public T bufferAndAssembleRecord( PubSubTopicPartition pubSubTopicPartition, @@ -56,9 +55,9 @@ public T bufferAndAssembleRecord( byte[] keyBytes, ByteBuffer valueBytes, long recordOffset, - Lazy> recordDeserializer, int readerSchemaId, - VeniceCompressor compressor) { + VeniceCompressor compressor, + Function deserializationFunction) { ByteBuffer assembledRecord = bufferAndAssembleRecord( pubSubTopicPartition, schemaId, @@ -67,28 +66,25 @@ public T bufferAndAssembleRecord( recordOffset, readerSchemaId, compressor); - T decompressedAndDeserializedRecord = null; - // Record is a chunk. Return null if (assembledRecord == null) { - return decompressedAndDeserializedRecord; + return null; // the value is a chunk, and the full record cannot yet be assembled until the manifest is reached } try { - decompressedAndDeserializedRecord = - decompressAndDeserialize(recordDeserializer.get(), compressor, assembledRecord); + return decompressAndDeserialize(deserializationFunction, compressor, assembledRecord); } catch (Exception e) { throw new RuntimeException(e); } - - return decompressedAndDeserializedRecord; } /** - * Buffers and assembles chunks of a record. + * Buffers and assembles chunks of a record, returning the compressed and serialized version of the assembled record. * - * If the record is chunked, it stores the chunks and returns null. - * Once all chunks of a record are received, it returns the compressed and serialized assembled record. + * 1. If the record is not chunked, the original record is returned without buffering it in-memory. + * 2. 
If the record is chunked, this function buffers the chunks in-memory and returns null. + * 3. Once all chunks specified by the {@link ChunkedValueManifest} of a record are received, this function returns + * the compressed and serialized version of the assembled record. */ public ByteBuffer bufferAndAssembleRecord( PubSubTopicPartition pubSubTopicPartition, @@ -103,19 +99,22 @@ public ByteBuffer bufferAndAssembleRecord( if (!inMemoryStorageEngine.containsPartition(pubSubTopicPartition.getPartitionNumber())) { inMemoryStorageEngine.addStoragePartition(pubSubTopicPartition.getPartitionNumber()); } - // If this is a record chunk, store the chunk and return null for processing this record - if (schemaId == AvroProtocolDefinition.CHUNK.getCurrentProtocolVersion()) { + + final Runnable putRecordToInMemoryStorageEngine = () -> { inMemoryStorageEngine.put( pubSubTopicPartition.getPartitionNumber(), keyBytes, - ValueRecord.create(schemaId, valueBytes.array()).serialize()); + // We need to extract data from valueBytes, otherwise the array could contain non-data + ValueRecord.create(schemaId, ByteUtils.extractByteArray(valueBytes)).serialize()); + }; + + if (schemaId == AvroProtocolDefinition.CHUNK.getCurrentProtocolVersion()) { + // If this is a chunk, store the chunk and return null because the full record cannot yet be assembled + putRecordToInMemoryStorageEngine.run(); return null; } else if (schemaId == AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion()) { - // This is the last value. Store it, and now read it from the in memory store as a fully assembled value - inMemoryStorageEngine.put( - pubSubTopicPartition.getPartitionNumber(), - keyBytes, - ValueRecord.create(schemaId, valueBytes.array()).serialize()); + // This is the last value. Store it and read it from the in-memory store as a fully assembled record + putRecordToInMemoryStorageEngine.run(); try { assembledRecord = RawBytesChunkingAdapter.INSTANCE.get( inMemoryStorageEngine, @@ -137,7 +136,8 @@ public ByteBuffer bufferAndAssembleRecord( LOGGER.warn( "Encountered error assembling chunked record, this can happen when seeking between chunked records. Skipping offset {} on topic {}", recordOffset, - pubSubTopicPartition.getPubSubTopic().getName()); + pubSubTopicPartition.getPubSubTopic().getName(), + ex); } } else { // this is a fully specified record, no need to buffer and assemble it, just return the valueBytes @@ -156,11 +156,14 @@ public ByteBuffer bufferAndAssembleRecord( return assembledRecord; } + /** + * Decompresses the value bytes using the input compressor and applies the provided deserialization function. 
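 *
 * Editor's illustration (not part of this change): callers now hand in a plain deserialization lambda
 * instead of a Lazy-wrapped RecordDeserializer. In this diff, the changelog consumer passes
 * {@code (valueBytes) -> deserializerProvider.get().deserialize(valueBytes)} and the Global RT DIV path passes
 * {@code (valueBytes) -> GLOBAL_RT_DIV_STATE.getSerializer().deserialize(ByteUtils.extractByteArray(valueBytes), GLOBAL_RT_DIV_STATE_SCHEMA_ID)};
 * both lambdas are ultimately applied here, after the assembled bytes are decompressed.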
+ */ protected T decompressAndDeserialize( - RecordDeserializer deserializer, + Function deserializationFunction, VeniceCompressor compressor, ByteBuffer value) throws IOException { - return deserializer.deserialize(compressor.decompress(value)); + return deserializationFunction.apply(compressor.decompress(value)); } public void clearInMemoryDB() { diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java index 2e49452a8bd..d22e47b6e4d 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java @@ -103,6 +103,7 @@ import com.linkedin.davinci.store.record.ValueRecord; import com.linkedin.davinci.store.rocksdb.RocksDBServerConfig; import com.linkedin.davinci.transformer.TestStringRecordTransformer; +import com.linkedin.davinci.utils.ChunkAssembler; import com.linkedin.venice.compression.CompressionStrategy; import com.linkedin.venice.exceptions.MemoryLimitExhaustedException; import com.linkedin.venice.exceptions.VeniceException; @@ -361,6 +362,7 @@ public static Object[][] sortedInputAndAAConfigProvider() { private AggVersionedDIVStats mockVersionedDIVStats; private AggVersionedIngestionStats mockVersionedStorageIngestionStats; private StoreIngestionTask storeIngestionTaskUnderTest; + private ChunkAssembler divChunkAssembler; private ExecutorService taskPollingService; private StoreBufferService storeBufferService; private AggKafkaConsumerService aggKafkaConsumerService; @@ -544,6 +546,7 @@ public void methodSetUp() throws Exception { mockRemoteKafkaConsumer = mock(PubSubConsumerAdapter.class); kafkaUrlToRecordsThrottler = new HashMap<>(); kafkaClusterBasedRecordThrottler = new KafkaClusterBasedRecordThrottler(kafkaUrlToRecordsThrottler); + divChunkAssembler = mock(ChunkAssembler.class); mockTopicManager = mock(TopicManager.class); mockTopicManagerRepository = mock(TopicManagerRepository.class); @@ -1116,6 +1119,7 @@ private StoreIngestionTaskFactory.Builder getIngestionTaskFactoryBuilder( .setPubSubTopicRepository(pubSubTopicRepository) .setPartitionStateSerializer(partitionStateSerializer) .setRunnableForKillIngestionTasksForNonCurrentVersions(runnableForKillNonCurrentVersion) + .setDivChunkAssembler(divChunkAssembler) .setAAWCWorkLoadProcessingThreadPool( Executors.newFixedThreadPool(2, new DaemonThreadFactory("AA_WC_PARALLEL_PROCESSING"))); } @@ -5192,6 +5196,112 @@ public void testGetTopicManager() throws Exception { }, AA_OFF); } + @Test + public void testShouldProcessRecordForGlobalRtDivMessage() throws Exception { + // Set up the environment. 
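// (Editor's note, not part of the diff: the mocks below stand up just enough of the
// StoreIngestionTaskFactory.Builder, storage engine repository, server config, and store/version
// metadata to construct a real LeaderFollowerStoreIngestionTask, so that shouldProcessRecord() can be
// exercised against GLOBAL_RT_DIV records without a full ingestion environment.)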
+ StoreIngestionTaskFactory.Builder builder = mock(StoreIngestionTaskFactory.Builder.class); + StorageEngineRepository mockStorageEngineRepository = mock(StorageEngineRepository.class); + doReturn(new DeepCopyStorageEngine(mockAbstractStorageEngine)).when(mockStorageEngineRepository) + .getLocalStorageEngine(anyString()); + doReturn(mockStorageEngineRepository).when(builder).getStorageEngineRepository(); + VeniceServerConfig veniceServerConfig = mock(VeniceServerConfig.class); + doReturn(VeniceProperties.empty()).when(veniceServerConfig).getClusterProperties(); + doReturn(VeniceProperties.empty()).when(veniceServerConfig).getKafkaConsumerConfigsForLocalConsumption(); + doReturn(VeniceProperties.empty()).when(veniceServerConfig).getKafkaConsumerConfigsForRemoteConsumption(); + doReturn(Object2IntMaps.emptyMap()).when(veniceServerConfig).getKafkaClusterUrlToIdMap(); + doReturn(veniceServerConfig).when(builder).getServerConfig(); + doReturn(mock(ReadOnlyStoreRepository.class)).when(builder).getMetadataRepo(); + doReturn(mock(ReadOnlySchemaRepository.class)).when(builder).getSchemaRepo(); + doReturn(mock(AggKafkaConsumerService.class)).when(builder).getAggKafkaConsumerService(); + doReturn(mockAggStoreIngestionStats).when(builder).getIngestionStats(); + doReturn(pubSubTopicRepository).when(builder).getPubSubTopicRepository(); + + Version version = mock(Version.class); + doReturn(1).when(version).getPartitionCount(); + doReturn(null).when(version).getPartitionerConfig(); + doReturn(VersionStatus.ONLINE).when(version).getStatus(); + doReturn(true).when(version).isNativeReplicationEnabled(); + doReturn("localhost").when(version).getPushStreamSourceAddress(); + + Store store = mock(Store.class); + doReturn(version).when(store).getVersion(eq(1)); + + String versionTopicName = "testStore_v1"; + String rtTopicName = "testStore_rt"; + VeniceStoreVersionConfig storeConfig = mock(VeniceStoreVersionConfig.class); + doReturn(Version.parseStoreFromVersionTopic(versionTopicName)).when(store).getName(); + doReturn(versionTopicName).when(storeConfig).getStoreVersionName(); + + LeaderFollowerStoreIngestionTask ingestionTask = spy( + new LeaderFollowerStoreIngestionTask( + mock(StorageService.class), + builder, + store, + version, + mock(Properties.class), + mock(BooleanSupplier.class), + storeConfig, + -1, + false, + Optional.empty(), + null, + null)); + + // Create a DIV record. 
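// (Editor's aside, not part of the diff: the KafkaKey built below uses MessageType.GLOBAL_RT_DIV, whose
// key header byte is MessageType.Constants.GLOBAL_RT_DIV_KEY_HEADER_BYTE (8), so KafkaKey#isGlobalRtDiv()
// returns true for it while the envelope still carries an ordinary Put payload. A minimal check:
//   assertTrue(new KafkaKey(MessageType.GLOBAL_RT_DIV, "k".getBytes()).isGlobalRtDiv());
// )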
+ KafkaKey key = new KafkaKey(MessageType.GLOBAL_RT_DIV, "test_key".getBytes()); + KafkaMessageEnvelope value = new KafkaMessageEnvelope(); + value.payloadUnion = new Put(); + value.messageType = MessageType.PUT.getValue(); + PubSubTopic versionTopic = pubSubTopicRepository.getTopic(Version.composeKafkaTopic("testStore", 1)); + PubSubTopic rtTopic = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic("testStore")); + + PubSubTopicPartition versionTopicPartition = new PubSubTopicPartitionImpl(versionTopic, PARTITION_FOO); + PubSubTopicPartition rtPartition = new PubSubTopicPartitionImpl(rtTopic, PARTITION_FOO); + PubSubMessage vtRecord = + new ImmutablePubSubMessage<>(key, value, versionTopicPartition, 1, 0, 0); + + PartitionConsumptionState pcs = mock(PartitionConsumptionState.class); + when(pcs.getLeaderFollowerState()).thenReturn(LeaderFollowerStateType.LEADER); + doReturn(true).when(pcs).consumeRemotely(); + doReturn(false).when(pcs).skipKafkaMessage(); + + OffsetRecord offsetRecord = mock(OffsetRecord.class); + doReturn(offsetRecord).when(pcs).getOffsetRecord(); + doReturn(pubSubTopicRepository.getTopic(versionTopicName)).when(offsetRecord).getLeaderTopic(any()); + ingestionTask.setPartitionConsumptionState(PARTITION_FOO, pcs); + + assertFalse(ingestionTask.shouldProcessRecord(vtRecord), "RT DIV From remote VT should not be processed"); + + when(pcs.getLeaderFollowerState()).thenReturn(LeaderFollowerStateType.STANDBY); + doReturn(false).when(pcs).consumeRemotely(); + assertTrue(ingestionTask.shouldProcessRecord(vtRecord), "RT DIV from local VT should be processed"); + + doReturn(pubSubTopicRepository.getTopic(rtTopicName)).when(offsetRecord).getLeaderTopic(any()); + PubSubMessage rtRecord = + new ImmutablePubSubMessage<>(key, value, rtPartition, 0, 0, 0); + assertFalse(ingestionTask.shouldProcessRecord(rtRecord), "RT DIV from RT should not be processed"); + + } + + @Test(dataProvider = "aaConfigProvider") + public void testProcessGlobalRtDivMessage(AAConfig aaConfig) throws Exception { + runTest(Collections.singleton(PARTITION_FOO), () -> { + // Arrange + KafkaKey key = new KafkaKey(MessageType.GLOBAL_RT_DIV, "test_key".getBytes()); + KafkaMessageEnvelope value = new KafkaMessageEnvelope(); + Put put = new Put(); + value.payloadUnion = put; + value.messageType = MessageType.PUT.getValue(); + PubSubMessage record = + new ImmutablePubSubMessage<>(key, value, new PubSubTopicPartitionImpl(pubSubTopic, PARTITION_FOO), 0, 0, 0); + // Act + storeIngestionTaskUnderTest.processGlobalRtDivMessage(record); + // Assert + verify(storeIngestionTaskUnderTest.getDivChunkAssembler()) + .bufferAndAssembleRecord(any(), anyInt(), any(), any(), anyLong(), anyInt(), any(), any()); + }, aaConfig); + } + @Test public void testResolveTopicPartitionWithKafkaURL() { StoreIngestionTask storeIngestionTask = mock(StoreIngestionTask.class); diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/kafka/protocol/enums/MessageType.java b/internal/venice-common/src/main/java/com/linkedin/venice/kafka/protocol/enums/MessageType.java index 68e99e53468..a376569e569 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/kafka/protocol/enums/MessageType.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/kafka/protocol/enums/MessageType.java @@ -21,7 +21,8 @@ */ public enum MessageType implements VeniceEnumValue { PUT(0, Constants.PUT_KEY_HEADER_BYTE), DELETE(1, Constants.PUT_KEY_HEADER_BYTE), - CONTROL_MESSAGE(2, Constants.CONTROL_MESSAGE_KEY_HEADER_BYTE), UPDATE(3, 
Constants.UPDATE_KEY_HEADER_BYTE); + CONTROL_MESSAGE(2, Constants.CONTROL_MESSAGE_KEY_HEADER_BYTE), UPDATE(3, Constants.UPDATE_KEY_HEADER_BYTE), + GLOBAL_RT_DIV(4, Constants.GLOBAL_RT_DIV_KEY_HEADER_BYTE); private static final List TYPES = EnumUtils.getEnumValuesList(MessageType.class); @@ -58,10 +59,12 @@ public byte getKeyHeaderByte() { * - {@link com.linkedin.venice.kafka.protocol.Put} * - {@link com.linkedin.venice.kafka.protocol.Delete} * - {@link com.linkedin.venice.kafka.protocol.ControlMessage} + * - {@link com.linkedin.venice.kafka.protocol.Update} */ public Object getNewInstance() { switch (valueOf(value)) { case PUT: + case GLOBAL_RT_DIV: return new Put(); case DELETE: return new Delete(); @@ -86,5 +89,6 @@ public static class Constants { public static final byte PUT_KEY_HEADER_BYTE = 0; public static final byte CONTROL_MESSAGE_KEY_HEADER_BYTE = 2; public static final byte UPDATE_KEY_HEADER_BYTE = 4; + public static final byte GLOBAL_RT_DIV_KEY_HEADER_BYTE = 8; } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/kafka/validation/Segment.java b/internal/venice-common/src/main/java/com/linkedin/venice/kafka/validation/Segment.java index b9ee11d58ef..911d721de7b 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/kafka/validation/Segment.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/kafka/validation/Segment.java @@ -291,6 +291,7 @@ public synchronized boolean addToCheckSum(KafkaKey key, KafkaMessageEnvelope mes + controlMessage.getControlMessageType()); } case PUT: + case GLOBAL_RT_DIV: updateCheckSum(messageEnvelope.getMessageType()); updateCheckSum(key.getKey()); Put putPayload = (Put) messageEnvelope.getPayloadUnion(); diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/memory/InstanceSizeEstimator.java b/internal/venice-common/src/main/java/com/linkedin/venice/memory/InstanceSizeEstimator.java index 0c67049ae90..2e5fd10fb44 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/memory/InstanceSizeEstimator.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/memory/InstanceSizeEstimator.java @@ -159,6 +159,7 @@ public static int getSize(KafkaMessageEnvelope kme) { int size = KME_PARTIAL_CLASS_OVERHEAD; switch (MessageType.valueOf(kme)) { case PUT: + case GLOBAL_RT_DIV: size += getSize((Put) kme.payloadUnion); break; case DELETE: diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/message/KafkaKey.java b/internal/venice-common/src/main/java/com/linkedin/venice/message/KafkaKey.java index ae5971c7f0c..c6f9afafefd 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/message/KafkaKey.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/message/KafkaKey.java @@ -66,6 +66,13 @@ public boolean isControlMessage() { return keyHeaderByte == MessageType.CONTROL_MESSAGE.getKeyHeaderByte(); } + /** + * @return true if this key corresponds to a GlobalRtDiv message, and false otherwise. + */ + public boolean isGlobalRtDiv() { + return keyHeaderByte == MessageType.GLOBAL_RT_DIV.getKeyHeaderByte(); + } + /** * @return the content of the key (everything beyond the first byte) */ @@ -77,9 +84,23 @@ public int getKeyLength() { return key == null ? 
0 : key.length; } + private String messageTypeString() { + switch (keyHeaderByte) { + case MessageType.Constants.PUT_KEY_HEADER_BYTE: + return "PUT or DELETE"; // PUT_KEY_HEADER_BYTE corresponds to both PUT or DELETE + case MessageType.Constants.CONTROL_MESSAGE_KEY_HEADER_BYTE: + return "CONTROL_MESSAGE"; + case MessageType.Constants.UPDATE_KEY_HEADER_BYTE: + return "UPDATE"; + case MessageType.Constants.GLOBAL_RT_DIV_KEY_HEADER_BYTE: + return "GLOBAL_RT_DIV"; + default: + return "UNKNOWN"; + } + } + public String toString() { - return getClass().getSimpleName() + "(" + (isControlMessage() ? "CONTROL_MESSAGE" : "PUT or DELETE") + ", " - + ByteUtils.toHexString(key) + ")"; + return getClass().getSimpleName() + "(" + messageTypeString() + ", " + ByteUtils.toHexString(key) + ")"; } public int getHeapSize() { diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/offsets/OffsetRecord.java b/internal/venice-common/src/main/java/com/linkedin/venice/offsets/OffsetRecord.java index 7e925e5a1ef..0b02e6d6bca 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/offsets/OffsetRecord.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/offsets/OffsetRecord.java @@ -273,12 +273,12 @@ public void cloneUpstreamOffsetMap(@Nonnull Map checkpointUpstream } /** - * Reset the checkpoint upstream offset map to another map provided as the input. - * @param checkpointUpstreamOffsetMap + * Update the checkpoint upstream offset map with new values from another map provided as the input. + * @param newUpstreamOffsetMap */ - public void resetUpstreamOffsetMap(@Nonnull Map checkpointUpstreamOffsetMap) { - Validate.notNull(checkpointUpstreamOffsetMap); - for (Map.Entry offsetEntry: checkpointUpstreamOffsetMap.entrySet()) { + public void mergeUpstreamOffsets(@Nonnull Map newUpstreamOffsetMap) { + Validate.notNull(newUpstreamOffsetMap); + for (Map.Entry offsetEntry: newUpstreamOffsetMap.entrySet()) { // leader offset can be the topic offset from any colo this.setLeaderUpstreamOffset(offsetEntry.getKey(), offsetEntry.getValue()); } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageDeserializer.java b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageDeserializer.java index 36970faa780..70ab1e4aa43 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageDeserializer.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageDeserializer.java @@ -96,6 +96,7 @@ private KafkaMessageEnvelope getEnvelope(byte keyHeaderByte) { return putEnvelopePool.get(); // No need to pool control messages since there are so few of them, and they are varied anyway, limiting reuse. 
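// (Editor's aside, not part of the diff: the new GLOBAL_RT_DIV case added below is grouped with
// CONTROL_MESSAGE and likewise returns a fresh, non-pooled KafkaMessageEnvelope rather than reusing
// one from putEnvelopePool or updateEnvelopePool.)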
case MessageType.Constants.CONTROL_MESSAGE_KEY_HEADER_BYTE: + case MessageType.Constants.GLOBAL_RT_DIV_KEY_HEADER_BYTE: return new KafkaMessageEnvelope(); case MessageType.Constants.UPDATE_KEY_HEADER_BYTE: return updateEnvelopePool.get(); diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/AvroProtocolDefinition.java b/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/AvroProtocolDefinition.java index 00c4cefebf9..d482bf1ab94 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/AvroProtocolDefinition.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/AvroProtocolDefinition.java @@ -12,6 +12,7 @@ import com.linkedin.venice.ingestion.protocol.ProcessShutdownCommand; import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; import com.linkedin.venice.kafka.protocol.Put; +import com.linkedin.venice.kafka.protocol.state.GlobalRtDivState; import com.linkedin.venice.kafka.protocol.state.PartitionState; import com.linkedin.venice.kafka.protocol.state.StoreVersionState; import com.linkedin.venice.meta.Store; @@ -174,7 +175,12 @@ public enum AvroProtocolDefinition { * Value schema for change capture event. * TODO: Figure out a way to pull in protocol from different view class. */ - RECORD_CHANGE_EVENT(1, RecordChangeEvent.class); + RECORD_CHANGE_EVENT(1, RecordChangeEvent.class), + + /** + * Global Realtime Topic Data Integrity Validator is the RT DIV snapshot propagated from the leader to followers. + */ + GLOBAL_RT_DIV_STATE(1, GlobalRtDivState.class); private static final Set magicByteSet = validateMagicBytes(); diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java b/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java index 5d8cd79cb28..bd546d43b19 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java @@ -151,7 +151,7 @@ public class VeniceWriter extends AbstractVeniceWriter { /** * The default for {@link #maxRecordSizeBytes} is unlimited / unset (-1) just to be safe. A more specific default value - * should be set using {@link com.linkedin.venice.ConfigKeys#CONTROLLER_DEFAULT_MAX_RECORD_SIZE_BYTES} the controller + * should be set using {@link com.linkedin.venice.ConfigKeys#DEFAULT_MAX_RECORD_SIZE_BYTES} the controller * config on the cluster level. 
*/ public static final int UNLIMITED_MAX_RECORD_SIZE = -1; @@ -952,17 +952,7 @@ public CompletableFuture put( KafkaKey kafkaKey = new KafkaKey(MessageType.PUT, serializedKey); // Initialize the SpecificRecord instances used by the Avro-based Kafka protocol - Put putPayload = new Put(); - putPayload.putValue = ByteBuffer.wrap(serializedValue); - putPayload.schemaId = valueSchemaId; - - if (putMetadata == null) { - putPayload.replicationMetadataVersionId = VENICE_DEFAULT_TIMESTAMP_METADATA_VERSION_ID; - putPayload.replicationMetadataPayload = EMPTY_BYTE_BUFFER; - } else { - putPayload.replicationMetadataVersionId = putMetadata.getRmdVersionId(); - putPayload.replicationMetadataPayload = putMetadata.getRmdPayload(); - } + Put putPayload = buildPutPayload(serializedValue, valueSchemaId, putMetadata); CompletableFuture produceResultFuture = sendMessage( producerMetadata -> kafkaKey, MessageType.PUT, @@ -971,6 +961,7 @@ public CompletableFuture put( callback, leaderMetadataWrapper, logicalTs); + DeleteMetadata deleteMetadata = new DeleteMetadata(valueSchemaId, putPayload.replicationMetadataVersionId, VeniceWriter.EMPTY_BYTE_BUFFER); PubSubProducerCallback chunkCallback = callback == null ? null : new ErrorPropagationCallback(callback); @@ -985,6 +976,114 @@ public CompletableFuture put( return produceResultFuture; } + /** + * This is the main method to send DIV messages to a kafka topic through VeniceWriter. The method decides whether to + * send the messages in chunked or non-chunked mode based on the size of the message. Today, DIV is the only user of + * this method, but it can be extended easily to support other class types in the future. + * + * All the messages sent through this method are of type {@link MessageType#GLOBAL_RT_DIV} in its KafkaKey and + * all their corresponding {@link KafkaMessageEnvelope} uses {@link Put} as the payload. Inside the Put payload, the + * actual message is stored in the putValue field and the schema id has 3 cases: + * + * 1. If the message is non-chunked, the schema id is set to {@link AvroProtocolDefinition#GLOBAL_RT_DIV_STATE}. + * 2. If the message is chunk message, the schema id is set to {@link AvroProtocolDefinition#CHUNK}. + * 3. If the message is a chunk manifest message, the schema id is set to {@link AvroProtocolDefinition#CHUNKED_VALUE_MANIFEST}. 
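 *
 * Illustrative usage (editor's sketch, not part of this change; the key and value types are whatever the
 * writer's serializers are configured for, e.g. the new unit test in this change uses string keys and
 * values, and the names divKey/divValue below are hypothetical):
 *
 *   writer.sendGlobalRtDivMessage(partition, divKey, divValue);
 *
 * For chunked values, the returned future corresponds to the final manifest message, following the same
 * "last future" convention as the regular large-value path.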
+ */ + public CompletableFuture sendGlobalRtDivMessage(int partition, K key, V value) { + if (partition < 0 || partition >= numberOfPartitions) { + throw new VeniceException("Invalid partition: " + partition); + } + + byte[] serializedKey = keySerializer.serialize(topicName, key); + byte[] serializedValue = valueSerializer.serialize(topicName, value); + + if (isChunkingNeededForRecord(serializedKey.length + serializedValue.length)) { + return sendChunkedGlobalRtDivMessage(partition, serializedKey, serializedValue); + } + + serializedKey = keyWithChunkingSuffixSerializer.serializeNonChunkedKey(serializedKey); + KafkaKey divKey = new KafkaKey(MessageType.GLOBAL_RT_DIV, serializedKey); + + // Initialize the SpecificRecord instances used by the Avro-based Kafka protocol + Put putPayload = + buildPutPayload(serializedValue, AvroProtocolDefinition.GLOBAL_RT_DIV_STATE.getCurrentProtocolVersion(), null); + + // TODO: This needs to be implemented later to support Global RT DIV + final CompletableFuture completableFuture = new CompletableFuture<>(); + PubSubProducerCallback callback = new CompletableFutureCallback(completableFuture); + + return sendMessage( + producerMetadata -> divKey, + MessageType.PUT, + putPayload, + partition, + callback, + DEFAULT_LEADER_METADATA_WRAPPER, + APP_DEFAULT_LOGICAL_TS); + } + + private CompletableFuture sendChunkedGlobalRtDivMessage( + int partition, + byte[] serializedKey, + byte[] serializedValue) { + final Supplier reportSizeGenerator = () -> getSizeReport(serializedKey.length, serializedValue.length, 0); + // TODO: This needs to be implemented later to support Global RT DIV. + final CompletableFuture completableFuture = new CompletableFuture<>(); + PubSubProducerCallback callback = new ErrorPropagationCallback(new CompletableFutureCallback(completableFuture)); + BiConsumer sendMessageFunction = (keyProvider, putPayload) -> sendMessage( + keyProvider, + MessageType.PUT, + putPayload, + partition, + callback, + DEFAULT_LEADER_METADATA_WRAPPER, + VENICE_DEFAULT_LOGICAL_TS); + + ChunkedPayloadAndManifest valueChunksAndManifest = WriterChunkingHelper.chunkPayloadAndSend( + serializedKey, + serializedValue, + MessageType.GLOBAL_RT_DIV, + true, + AvroProtocolDefinition.GLOBAL_RT_DIV_STATE.getCurrentProtocolVersion(), + 0, + false, + reportSizeGenerator, + maxSizeForUserPayloadPerMessageInBytes, + keyWithChunkingSuffixSerializer, + sendMessageFunction); + + final int sizeAvailablePerMessage = maxSizeForUserPayloadPerMessageInBytes - serializedKey.length; + Put manifestPayload = + buildManifestPayload(null, null, valueChunksAndManifest, sizeAvailablePerMessage, reportSizeGenerator); + return sendManifestMessage( + manifestPayload, + serializedKey, + MessageType.GLOBAL_RT_DIV, + valueChunksAndManifest, + callback, + null, + partition, + null, + null, + DEFAULT_LEADER_METADATA_WRAPPER, + APP_DEFAULT_LOGICAL_TS); + } + + private Put buildPutPayload(byte[] serializedValue, int valueSchemaId, PutMetadata putMetadata) { + Put putPayload = new Put(); + putPayload.putValue = ByteBuffer.wrap(serializedValue); + putPayload.schemaId = valueSchemaId; + + if (putMetadata == null) { + putPayload.replicationMetadataVersionId = VENICE_DEFAULT_TIMESTAMP_METADATA_VERSION_ID; + putPayload.replicationMetadataPayload = EMPTY_BYTE_BUFFER; + } else { + putPayload.replicationMetadataVersionId = putMetadata.getRmdVersionId(); + putPayload.replicationMetadataPayload = putMetadata.getRmdPayload(); + } + return putPayload; + } + /** * Write a message with the kafka message envelope (KME) passed 
in. This allows users re-using existing KME to * speed up the performance. If this is called, VeniceWriter will also reuse the existing DIV data (producer @@ -1507,6 +1606,7 @@ private CompletableFuture putLargeValue( ChunkedPayloadAndManifest valueChunksAndManifest = WriterChunkingHelper.chunkPayloadAndSend( serializedKey, serializedValue, + MessageType.PUT, true, valueSchemaId, 0, @@ -1520,6 +1620,7 @@ private CompletableFuture putLargeValue( ? WriterChunkingHelper.chunkPayloadAndSend( serializedKey, putMetadata == null ? EMPTY_BYTE_ARRAY : ByteUtils.extractByteArray(putMetadata.getRmdPayload()), + MessageType.PUT, false, valueSchemaId, valueChunkCount, @@ -1529,31 +1630,58 @@ private CompletableFuture putLargeValue( keyWithChunkingSuffixSerializer, sendMessageFunction) : EMPTY_CHUNKED_PAYLOAD_AND_MANIFEST; + + final int sizeAvailablePerMessage = maxSizeForUserPayloadPerMessageInBytes - serializedKey.length; + Put putManifestsPayload = buildManifestPayload( + rmdChunksAndManifest, + putMetadata, + valueChunksAndManifest, + sizeAvailablePerMessage, + reportSizeGenerator); + CompletableFuture manifestProduceFuture = sendManifestMessage( + putManifestsPayload, + serializedKey, + MessageType.PUT, + valueChunksAndManifest, + callback, + rmdChunksAndManifest, + partition, + oldValueManifest, + oldRmdManifest, + leaderMetadataWrapper, + logicalTs); + + DeleteMetadata deleteMetadata = new DeleteMetadata( + valueSchemaId, + putManifestsPayload.replicationMetadataVersionId, + VeniceWriter.EMPTY_BYTE_BUFFER); + deleteDeprecatedChunksFromManifest( + oldValueManifest, + partition, + chunkCallback, + leaderMetadataWrapper, + deleteMetadata); + deleteDeprecatedChunksFromManifest(oldRmdManifest, partition, chunkCallback, leaderMetadataWrapper, deleteMetadata); + + return manifestProduceFuture; + } + + private CompletableFuture sendManifestMessage( + Object manifestPayload, + byte[] serializedKey, + MessageType keyType, + ChunkedPayloadAndManifest valueChunksAndManifest, + PubSubProducerCallback callback, + ChunkedPayloadAndManifest rmdChunksAndManifest, + int partition, + ChunkedValueManifest oldValueManifest, + ChunkedValueManifest oldRmdManifest, + LeaderMetadataWrapper leaderMetadataWrapper, + long logicalTs) { // Now that we've sent all the chunks, we can take care of the final value, the manifest. byte[] topLevelKey = keyWithChunkingSuffixSerializer.serializeNonChunkedKey(serializedKey); - KeyProvider manifestKeyProvider = producerMetadata -> new KafkaKey(MessageType.PUT, topLevelKey); + KeyProvider manifestKeyProvider = producerMetadata -> new KafkaKey(keyType, topLevelKey); - Put putManifestsPayload = new Put(); - putManifestsPayload.putValue = - chunkedValueManifestSerializer.serialize(valueChunksAndManifest.getChunkedValueManifest()); - putManifestsPayload.schemaId = AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion(); - if (putMetadata == null) { - putManifestsPayload.replicationMetadataVersionId = VENICE_DEFAULT_TIMESTAMP_METADATA_VERSION_ID; - putManifestsPayload.replicationMetadataPayload = EMPTY_BYTE_BUFFER; - } else { - putManifestsPayload.replicationMetadataVersionId = putMetadata.getRmdVersionId(); - putManifestsPayload.replicationMetadataPayload = isRmdChunkingEnabled - ? 
chunkedValueManifestSerializer.serialize(rmdChunksAndManifest.getChunkedValueManifest()) - : putMetadata.getRmdPayload(); - } - final int sizeAvailablePerMessage = maxSizeForUserPayloadPerMessageInBytes - serializedKey.length; - if (putManifestsPayload.putValue.remaining() - + putManifestsPayload.replicationMetadataPayload.remaining() > sizeAvailablePerMessage) { - // This is a very desperate edge case... - throw new VeniceException( - "This message cannot be chunked, because even its manifest is too big to go through. " - + "Please reconsider your life choices. " + reportSizeGenerator.get()); - } if (callback instanceof ChunkAwareCallback) { /** We leave a handle to the key, chunks and manifests so that the {@link ChunkAwareCallback} can act on them */ ((ChunkAwareCallback) callback).setChunkingInfo( @@ -1568,28 +1696,43 @@ private CompletableFuture putLargeValue( // We only return the last future (the one for the manifest) and assume that once this one is finished, // all the chunks should also be finished, since they were sent first, and ordering should be guaranteed. - CompletableFuture manifestProduceFuture = sendMessage( + return sendMessage( manifestKeyProvider, MessageType.PUT, - putManifestsPayload, + manifestPayload, partition, callback, leaderMetadataWrapper, logicalTs); + } - DeleteMetadata deleteMetadata = new DeleteMetadata( - valueSchemaId, - putManifestsPayload.replicationMetadataVersionId, - VeniceWriter.EMPTY_BYTE_BUFFER); - deleteDeprecatedChunksFromManifest( - oldValueManifest, - partition, - chunkCallback, - leaderMetadataWrapper, - deleteMetadata); - deleteDeprecatedChunksFromManifest(oldRmdManifest, partition, chunkCallback, leaderMetadataWrapper, deleteMetadata); - - return manifestProduceFuture; + private Put buildManifestPayload( + ChunkedPayloadAndManifest rmdChunksAndManifest, + PutMetadata putMetadata, + ChunkedPayloadAndManifest valueChunksAndManifest, + int sizeAvailablePerMessage, + Supplier reportSizeGenerator) { + Put putManifestsPayload = new Put(); + putManifestsPayload.putValue = + chunkedValueManifestSerializer.serialize(valueChunksAndManifest.getChunkedValueManifest()); + putManifestsPayload.schemaId = AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion(); + if (putMetadata == null) { + putManifestsPayload.replicationMetadataVersionId = VENICE_DEFAULT_TIMESTAMP_METADATA_VERSION_ID; + putManifestsPayload.replicationMetadataPayload = EMPTY_BYTE_BUFFER; + } else { + putManifestsPayload.replicationMetadataVersionId = putMetadata.getRmdVersionId(); + putManifestsPayload.replicationMetadataPayload = isRmdChunkingEnabled + ? chunkedValueManifestSerializer.serialize(rmdChunksAndManifest.getChunkedValueManifest()) + : putMetadata.getRmdPayload(); + } + if (putManifestsPayload.putValue.remaining() + + putManifestsPayload.replicationMetadataPayload.remaining() > sizeAvailablePerMessage) { + // This is a very desperate edge case... + throw new VeniceException( + "This message cannot be chunked, because even its manifest is too big to go through. " + + "Please reconsider your life choices. 
" + reportSizeGenerator.get()); + } + return putManifestsPayload; } /** @@ -1615,7 +1758,7 @@ private String getSizeReport(int serializedKeySize, int serializedValueSize, int return "Key size: " + serializedKeySize + " bytes, " + "Value size: " + serializedValueSize + " bytes, " + "Replication Metadata size: " + replicationMetadataPayloadSize + " bytes, " + "Total payload size: " + (serializedKeySize + serializedValueSize + replicationMetadataPayloadSize) + " bytes, " - + "Max available payload size: " + maxSizeForUserPayloadPerMessageInBytes + " bytes, " + ", Max record size: " + + "Max available payload size: " + maxSizeForUserPayloadPerMessageInBytes + " bytes, " + "Max record size: " + ((maxRecordSizeBytes == UNLIMITED_MAX_RECORD_SIZE) ? "unlimited" : maxRecordSizeBytes) + " bytes."; } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/writer/WriterChunkingHelper.java b/internal/venice-common/src/main/java/com/linkedin/venice/writer/WriterChunkingHelper.java index 240043521e9..6801f950dde 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/writer/WriterChunkingHelper.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/writer/WriterChunkingHelper.java @@ -39,6 +39,7 @@ public class WriterChunkingHelper { public static ChunkedPayloadAndManifest chunkPayloadAndSend( byte[] serializedKey, byte[] payload, + MessageType keyType, boolean isValuePayload, int schemaId, int chunkedKeySuffixStartingIndex, @@ -75,7 +76,7 @@ public static ChunkedPayloadAndManifest chunkPayloadAndSend( subsequentKeyProvider = producerMetadata -> { ByteBuffer keyWithSuffix = keyWithChunkingSuffixSerializer.serializeChunkedKey(serializedKey, chunkedKeySuffix); chunkedValueManifest.keysWithChunkIdSuffix.add(keyWithSuffix); - return new KafkaKey(MessageType.PUT, keyWithSuffix.array()); + return new KafkaKey(keyType, keyWithSuffix.array()); }; firstKeyProvider = producerMetadata -> { chunkedKeySuffix.chunkId.producerGUID = producerMetadata.producerGUID; @@ -83,6 +84,7 @@ public static ChunkedPayloadAndManifest chunkPayloadAndSend( chunkedKeySuffix.chunkId.messageSequenceNumber = producerMetadata.messageSequenceNumber; return subsequentKeyProvider.getKey(producerMetadata); }; + for (int chunkIndex = 0; chunkIndex < numberOfChunks; chunkIndex++) { int chunkStartByteIndex = chunkIndex * sizeAvailablePerMessage; int chunkEndByteIndex = Math.min((chunkIndex + 1) * sizeAvailablePerMessage, payload.length); @@ -111,6 +113,7 @@ public static ChunkedPayloadAndManifest chunkPayloadAndSend( putPayload.putValue = EMPTY_BYTE_BUFFER; putPayload.replicationMetadataPayload = chunk; } + chunkedKeySuffix.chunkId.chunkIndex = chunkIndex + chunkedKeySuffixStartingIndex; keyProvider = chunkIndex == 0 ? 
firstKeyProvider : subsequentKeyProvider; diff --git a/internal/venice-common/src/main/resources/avro/GlobalRtDivState/v1/GlobalRtDivState.avsc b/internal/venice-common/src/main/resources/avro/GlobalRtDivState/v1/GlobalRtDivState.avsc new file mode 100644 index 00000000000..1b8a7fa4ef3 --- /dev/null +++ b/internal/venice-common/src/main/resources/avro/GlobalRtDivState/v1/GlobalRtDivState.avsc @@ -0,0 +1,85 @@ +{ + "name": "GlobalRtDivState", + "namespace": "com.linkedin.venice.kafka.protocol.state", + "doc": "Global Realtime Topic Data Integrity Validator is the RT DIV snapshot propagated from the leader to the followers via local VT", + "type": "record", + "fields": [ + { + "name": "srcUrl", + "doc": "Upstream Kafka bootstrap server url.", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "producerStates", + "type": { + "type": "map", + "doc": "A map that maps producer GUID -> producer state for realtime data.", + "values": { + "name": "ProducerPartitionState", + "namespace": "com.linkedin.venice.kafka.protocol.state", + "doc": "A record containing the state pertaining to the data sent by one upstream producer into one partition.", + "type": "record", + "fields": [ + { + "name": "segmentNumber", + "doc": "The current segment number corresponds to the last (highest) segment number for which we have seen a StartOfSegment control message.", + "type": "int" + }, + { + "name": "segmentStatus", + "doc": "The status of the current segment: 0 => NOT_STARTED, 1 => IN_PROGRESS, 2 => END_OF_INTERMEDIATE_SEGMENT, 3 => END_OF_FINAL_SEGMENT.", + "type": "int" + }, + { + "name": "isRegistered", + "doc": "Whether the segment is registered. i.e. received Start_Of_Segment to initialize the segment.", + "type": "boolean", + "default": false + }, + { + "name": "messageSequenceNumber", + "doc": "The current message sequence number, within the current segment, which we have seen for this partition/producer pair.", + "type": "int" + }, + { + "name": "messageTimestamp", + "doc": "The timestamp included in the last message we have seen for this partition/producer pair.", + "type": "long" + }, + { + "name": "checksumType", + "doc": "The current mapping is the following: 0 => None, 1 => MD5, 2 => Adler32, 3 => CRC32.", + "type": "int" + }, + { + "name": "checksumState", + "doc": "The value of the checksum computed since the last StartOfSegment ControlMessage.", + "type": "bytes" + }, + { + "name": "aggregates", + "doc": "The aggregates that have been computed so far since the last StartOfSegment ControlMessage.", + "type": { + "type": "map", + "values": "long" + } + }, + { + "name": "debugInfo", + "doc": "The debug info received as part of the last StartOfSegment ControlMessage.", + "type": { + "type": "map", + "values": "string" + } + } + ] + } + }, + "default": {} + } + ] +} \ No newline at end of file diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/kafka/protocol/enums/MessageTypeTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/kafka/protocol/enums/MessageTypeTest.java index 2477b2ef839..2c1320d127b 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/kafka/protocol/enums/MessageTypeTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/kafka/protocol/enums/MessageTypeTest.java @@ -17,6 +17,7 @@ protected Map expectedMapping() { .put(1, MessageType.DELETE) .put(2, MessageType.CONTROL_MESSAGE) .put(3, MessageType.UPDATE) + .put(4, MessageType.GLOBAL_RT_DIV) .build(); } } diff --git 
a/internal/venice-common/src/test/java/com/linkedin/venice/offsets/TestOffsetRecord.java b/internal/venice-common/src/test/java/com/linkedin/venice/offsets/TestOffsetRecord.java index ff560bc120e..de69498bd4f 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/offsets/TestOffsetRecord.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/offsets/TestOffsetRecord.java @@ -53,7 +53,7 @@ public void testResetUpstreamOffsetMap() { Map testMap = new HashMap<>(); testMap.put(TEST_KAFKA_URL2, 2L); - offsetRecord.resetUpstreamOffsetMap(testMap); + offsetRecord.mergeUpstreamOffsets(testMap); // no upstream found for it so fall back to use the leaderOffset which is 1 Assert.assertEquals(offsetRecord.getUpstreamOffset(TEST_KAFKA_URL1), 1L); Assert.assertEquals(offsetRecord.getUpstreamOffset(TEST_KAFKA_URL2), 2L); diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java index f3a2d5ca071..cf3296388d0 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java @@ -23,6 +23,7 @@ import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTask; @@ -43,6 +44,7 @@ import com.linkedin.venice.pubsub.api.PubSubMessage; import com.linkedin.venice.pubsub.api.PubSubMessageHeader; import com.linkedin.venice.pubsub.api.PubSubMessageHeaders; +import com.linkedin.venice.pubsub.api.PubSubProduceResult; import com.linkedin.venice.pubsub.api.PubSubProducerAdapter; import com.linkedin.venice.pubsub.api.PubSubTopic; import com.linkedin.venice.pubsub.api.PubSubTopicPartition; @@ -76,6 +78,9 @@ public class VeniceWriterUnitTest { private static final long TIMEOUT = 10 * Time.MS_PER_SECOND; + private static final int CHUNK_MANIFEST_SCHEMA_ID = + AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion(); + private static final int CHUNK_VALUE_SCHEMA_ID = AvroProtocolDefinition.CHUNK.getCurrentProtocolVersion(); @Test(dataProvider = "Chunking-And-Partition-Counts", dataProviderClass = DataProviderUtils.class) public void testTargetPartitionIsSameForAllOperationsWithTheSameKey(boolean isChunkingEnabled, int partitionCount) { @@ -144,10 +149,7 @@ public void testDeleteDeprecatedChunk() { 0, null, VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER, - new DeleteMetadata( - AvroProtocolDefinition.CHUNK.getCurrentProtocolVersion(), - 1, - WriterChunkingHelper.EMPTY_BYTE_BUFFER)); + new DeleteMetadata(CHUNK_VALUE_SCHEMA_ID, 1, WriterChunkingHelper.EMPTY_BYTE_BUFFER)); ArgumentCaptor keyArgumentCaptor = ArgumentCaptor.forClass(KafkaKey.class); ArgumentCaptor kmeArgumentCaptor = ArgumentCaptor.forClass(KafkaMessageEnvelope.class); @@ -331,9 +333,7 @@ public void testReplicationMetadataChunking() { // Check manifest for both value and rmd. 
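// (Editor's note, not part of the diff: the assertion below now uses the CHUNK_MANIFEST_SCHEMA_ID constant
// introduced at the top of this test class, which is simply
// AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion() factored out for reuse.)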
KafkaMessageEnvelope actualValue4 = kmeArgumentCaptor.getAllValues().get(4); assertEquals(actualValue4.messageType, MessageType.PUT.getValue()); - assertEquals( - ((Put) actualValue4.payloadUnion).schemaId, - AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion()); + assertEquals(((Put) actualValue4.payloadUnion).schemaId, CHUNK_MANIFEST_SCHEMA_ID); assertEquals(((Put) actualValue4.payloadUnion).replicationMetadataVersionId, putMetadata.getRmdVersionId()); assertEquals( ((Put) actualValue4.payloadUnion).replicationMetadataPayload, @@ -526,7 +526,7 @@ public void testSendHeartbeat(boolean addLeaderCompleteHeader, LeaderCompleteSta pubSubMessageHeadersArgumentCaptor.capture(), any()); for (KafkaKey key: kafkaKeyArgumentCaptor.getAllValues()) { - Assert.assertTrue(Arrays.equals(HEART_BEAT.getKey(), key.getKey())); + assertTrue(Arrays.equals(HEART_BEAT.getKey(), key.getKey())); } for (KafkaMessageEnvelope kme: kmeArgumentCaptor.getAllValues()) { assertEquals(kme.messageType, MessageType.CONTROL_MESSAGE.getValue()); @@ -658,12 +658,61 @@ public void testPutTooLargeRecord(boolean isChunkingEnabled) { continue; // Ok behavior. Small records should never throw RecordTooLargeException } if (!isChunkingEnabled || size == TOO_LARGE_VALUE_SIZE) { - Assert.fail("Should've thrown RecordTooLargeException if chunking not enabled or record is too large"); + fail("Should've thrown RecordTooLargeException if chunking not enabled or record is too large"); } } catch (Exception e) { - Assert.assertTrue(e instanceof RecordTooLargeException); + assertTrue(e instanceof RecordTooLargeException); Assert.assertNotEquals(size, SMALL_VALUE_SIZE, "Small records shouldn't throw RecordTooLargeException"); } } } + + /** + * Writes two GlobalRtDiv messages, one that needs chunking and one that doesn't, and verifies the sent output. + */ + @Test(timeOut = TIMEOUT) + public void testGlobalRtDivChunking() { + final int NON_CHUNKED_VALUE_SIZE = BYTES_PER_MB / 2; // 500 KB + final int CHUNKED_VALUE_SIZE = BYTES_PER_MB * 2; // 2 MB + for (int size: Arrays.asList(NON_CHUNKED_VALUE_SIZE, CHUNKED_VALUE_SIZE)) { + CompletableFuture mockedFuture = mock(CompletableFuture.class); + PubSubProducerAdapter mockedProducer = mock(PubSubProducerAdapter.class); + when(mockedProducer.sendMessage(any(), any(), any(), any(), any(), any())).thenReturn(mockedFuture); + final VeniceKafkaSerializer serializer = new VeniceAvroKafkaSerializer(TestWriteUtils.STRING_SCHEMA); + final VeniceWriterOptions options = new VeniceWriterOptions.Builder("testTopic").setPartitionCount(1) + .setKeySerializer(serializer) + .setValueSerializer(serializer) + .build(); + VeniceProperties props = VeniceProperties.empty(); + final VeniceWriter writer = new VeniceWriter<>(options, props, mockedProducer); + + char[] valueChars = new char[size]; + Arrays.fill(valueChars, '*'); + writer.sendGlobalRtDivMessage(0, "test-key", new String(valueChars)); + + // NON_CHUNKED_VALUE_SIZE: 1 SOS, 1 GlobalRtDiv Message + // CHUNKED_VALUE_SIZE: 1 SOS, 3 DivChunk, 1 DivManifest + final int invocationCount = (size == NON_CHUNKED_VALUE_SIZE) ? 
2 : 5; + ArgumentCaptor keyArgumentCaptor = ArgumentCaptor.forClass(KafkaKey.class); + ArgumentCaptor kmeArgumentCaptor = ArgumentCaptor.forClass(KafkaMessageEnvelope.class); + verify(mockedProducer, times(invocationCount)) + .sendMessage(any(), any(), keyArgumentCaptor.capture(), kmeArgumentCaptor.capture(), any(), any()); + keyArgumentCaptor.getAllValues().forEach(key -> assertTrue(key.isGlobalRtDiv() || key.isControlMessage())); + + for (KafkaMessageEnvelope kme: kmeArgumentCaptor.getAllValues()) { + if (kme.messageType == MessageType.CONTROL_MESSAGE.getValue()) { + ControlMessage controlMessage = ((ControlMessage) kme.getPayloadUnion()); + assertEquals(ControlMessageType.START_OF_SEGMENT.getValue(), controlMessage.getControlMessageType()); + } else { + Put put = (Put) kme.payloadUnion; + assertEquals(kme.messageType, MessageType.PUT.getValue()); + if (size == NON_CHUNKED_VALUE_SIZE) { + assertEquals(put.getSchemaId(), AvroProtocolDefinition.GLOBAL_RT_DIV_STATE.getCurrentProtocolVersion()); + } else { + assertTrue(put.getSchemaId() == CHUNK_VALUE_SCHEMA_ID || put.getSchemaId() == CHUNK_MANIFEST_SCHEMA_ID); + } + } + } + } + } } diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/writer/WriterChunkingHelperTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/writer/WriterChunkingHelperTest.java index 121b7388f99..f9877d61820 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/writer/WriterChunkingHelperTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/writer/WriterChunkingHelperTest.java @@ -1,5 +1,6 @@ package com.linkedin.venice.writer; +import com.linkedin.venice.kafka.protocol.enums.MessageType; import com.linkedin.venice.serialization.KeyWithChunkingSuffixSerializer; import org.testng.Assert; import org.testng.annotations.Test; @@ -14,6 +15,7 @@ public void testChunkPayload() { ChunkedPayloadAndManifest result = WriterChunkingHelper.chunkPayloadAndSend( keyBytes, valueBytes, + MessageType.PUT, true, 1, 0, diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestGlobalRtDiv.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestGlobalRtDiv.java new file mode 100644 index 00000000000..e9640b406a7 --- /dev/null +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestGlobalRtDiv.java @@ -0,0 +1,196 @@ +package com.linkedin.venice.endToEnd; + +import static com.linkedin.venice.ConfigKeys.DEFAULT_MAX_NUMBER_OF_PARTITIONS; +import static com.linkedin.venice.ConfigKeys.KAFKA_BOOTSTRAP_SERVERS; +import static com.linkedin.venice.ConfigKeys.PERSISTENCE_TYPE; +import static com.linkedin.venice.ConfigKeys.SERVER_CONSUMER_POOL_SIZE_PER_KAFKA_CLUSTER; +import static com.linkedin.venice.ConfigKeys.SERVER_GLOBAL_RT_DIV_ENABLED; +import static com.linkedin.venice.ConfigKeys.SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS; +import static com.linkedin.venice.ConfigKeys.SERVER_SHARED_CONSUMER_ASSIGNMENT_STRATEGY; +import static com.linkedin.venice.ConfigKeys.SSL_TO_KAFKA_LEGACY; +import static com.linkedin.venice.integration.utils.VeniceClusterWrapper.DEFAULT_KEY_SCHEMA; +import static com.linkedin.venice.integration.utils.VeniceClusterWrapper.DEFAULT_VALUE_SCHEMA; + +import com.linkedin.davinci.kafka.consumer.KafkaConsumerService; +import com.linkedin.venice.controllerapi.UpdateStoreQueryParams; +import com.linkedin.venice.guid.GuidUtils; +import com.linkedin.venice.integration.utils.PubSubBrokerWrapper; +import 
com.linkedin.venice.integration.utils.ServiceFactory; +import com.linkedin.venice.integration.utils.VeniceClusterWrapper; +import com.linkedin.venice.kafka.protocol.GUID; +import com.linkedin.venice.kafka.protocol.state.GlobalRtDivState; +import com.linkedin.venice.kafka.protocol.state.ProducerPartitionState; +import com.linkedin.venice.kafka.validation.SegmentStatus; +import com.linkedin.venice.kafka.validation.checksum.CheckSumType; +import com.linkedin.venice.meta.PersistenceType; +import com.linkedin.venice.meta.Version; +import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory; +import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; +import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer; +import com.linkedin.venice.utils.ByteUtils; +import com.linkedin.venice.utils.TestUtils; +import com.linkedin.venice.utils.Time; +import com.linkedin.venice.utils.Utils; +import com.linkedin.venice.writer.VeniceWriter; +import com.linkedin.venice.writer.VeniceWriterOptions; +import java.nio.ByteBuffer; +import java.security.SecureRandom; +import java.util.AbstractMap; +import java.util.Collections; +import java.util.HashMap; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.stream.IntStream; +import org.apache.avro.util.Utf8; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +public class TestGlobalRtDiv { + private static final Logger LOGGER = LogManager.getLogger(TestGlobalRtDiv.class); + + private VeniceClusterWrapper sharedVenice; + + SecureRandom random = new SecureRandom(); + + @BeforeClass + public void setUp() { + Properties extraProperties = new Properties(); + extraProperties.setProperty(PERSISTENCE_TYPE, PersistenceType.ROCKS_DB.name()); + extraProperties.setProperty(SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS, Long.toString(1L)); + + // N.B.: RF 2 with 3 servers is important, in order to test both the leader and follower code paths + sharedVenice = ServiceFactory.getVeniceCluster(1, 0, 0, 2, 1000000, false, false, extraProperties); + + Properties routerProperties = new Properties(); + + sharedVenice.addVeniceRouter(routerProperties); + // Added a server with shared consumer enabled. + Properties serverPropertiesWithSharedConsumer = new Properties(); + serverPropertiesWithSharedConsumer.setProperty(SSL_TO_KAFKA_LEGACY, "false"); + extraProperties.setProperty(SERVER_CONSUMER_POOL_SIZE_PER_KAFKA_CLUSTER, "3"); + extraProperties.setProperty(DEFAULT_MAX_NUMBER_OF_PARTITIONS, "4"); + extraProperties.setProperty( + SERVER_SHARED_CONSUMER_ASSIGNMENT_STRATEGY, + KafkaConsumerService.ConsumerAssignmentStrategy.PARTITION_WISE_SHARED_CONSUMER_ASSIGNMENT_STRATEGY.name()); + // Enable global div feature in the integration test. + extraProperties.setProperty(SERVER_GLOBAL_RT_DIV_ENABLED, "true"); + + sharedVenice.addVeniceServer(serverPropertiesWithSharedConsumer, extraProperties); + sharedVenice.addVeniceServer(serverPropertiesWithSharedConsumer, extraProperties); + sharedVenice.addVeniceServer(serverPropertiesWithSharedConsumer, extraProperties); + LOGGER.info("Finished creating VeniceClusterWrapper"); + } + + @AfterClass + public void cleanUp() { + Utils.closeQuietlyWithErrorLogged(sharedVenice); + } + + /** + * This test verifies functionality of sending chunked/non-chunked div messages: + * + * 1. 
Create a hybrid store and create a store version. + * 2. Send a non-chunked DIV message to the version topic. + * 3. Send a chunked DIV message to the version topic. + * 4. Verify the messages are sent successfully. + * 5. TODO: Add more verification steps on the server side later. + */ + @Test(timeOut = 180 * Time.MS_PER_SECOND) + public void testChunkedDiv() { + String storeName = Utils.getUniqueString("store"); + final int partitionCount = 1; + final int keyCount = 10; + + UpdateStoreQueryParams params = new UpdateStoreQueryParams() + // Set hybridRewindSeconds to a large value so subsequent versions won't ignore old records in the RT topic + .setHybridRewindSeconds(2000000) + .setHybridOffsetLagThreshold(10) + .setPartitionCount(partitionCount); + + sharedVenice.useControllerClient(client -> { + client.createNewStore(storeName, "owner", DEFAULT_KEY_SCHEMA, DEFAULT_VALUE_SCHEMA); + client.updateStore(storeName, params); + }); + + // Create store version 1 by writing keyCount records. + sharedVenice.createVersion( + storeName, + DEFAULT_KEY_SCHEMA, + DEFAULT_VALUE_SCHEMA, + IntStream.range(0, keyCount).mapToObj(i -> new AbstractMap.SimpleEntry<>(i, i))); + + Properties veniceWriterProperties = new Properties(); + veniceWriterProperties.put(KAFKA_BOOTSTRAP_SERVERS, sharedVenice.getPubSubBrokerWrapper().getAddress()); + + // Set max segment elapsed time to 0 to enforce creating small segments aggressively + veniceWriterProperties.put(VeniceWriter.MAX_ELAPSED_TIME_FOR_SEGMENT_IN_MS, "0"); + veniceWriterProperties.putAll( + PubSubBrokerWrapper + .getBrokerDetailsForClients(Collections.singletonList(sharedVenice.getPubSubBrokerWrapper()))); + PubSubProducerAdapterFactory pubSubProducerAdapterFactory = + sharedVenice.getPubSubBrokerWrapper().getPubSubClientsFactory().getProducerAdapterFactory(); + + try (VeniceWriter versionTopicWriter = + TestUtils.getVeniceWriterFactory(veniceWriterProperties, pubSubProducerAdapterFactory) + .createVeniceWriter(new VeniceWriterOptions.Builder(Version.composeKafkaTopic(storeName, 1)).build())) { + + InternalAvroSpecificSerializer serializer = + AvroProtocolDefinition.GLOBAL_RT_DIV_STATE.getSerializer(); + + GlobalRtDivState state = createGlobalRtDivState("localhost:9090", false); + versionTopicWriter + .sendGlobalRtDivMessage( + 0, + "NonChunkedKey".getBytes(), + ByteUtils.extractByteArray(serializer.serialize(state))) + .get(); + LOGGER.info("Sent non-chunked div message"); + + state = createGlobalRtDivState("localhost:9092", true); + versionTopicWriter + .sendGlobalRtDivMessage(0, "ChunkedKey".getBytes(), ByteUtils.extractByteArray(serializer.serialize(state))) + .get(); + LOGGER.info("Sent chunked div message"); + + // TODO: Add more verification steps later. + } catch (ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } + } + + private GlobalRtDivState createGlobalRtDivState(String srcUrl, boolean isChunked) { + GlobalRtDivState state = new GlobalRtDivState(); + state.producerStates = new HashMap<>(); + state.setSrcUrl(srcUrl); + + final int entryCount = (isChunked) ?
20000 : 1; // Create a large state with 20k entries to trigger chunking + for (int i = 0; i < entryCount; i++) { + byte[] bytes = new byte[256]; + random.nextBytes(bytes); + GUID guid = new GUID(bytes); + state.producerStates.put(guidToUtf8(guid), createProducerPartitionState(i, i)); + } + return state; + } + + private CharSequence guidToUtf8(GUID guid) { + return new Utf8(GuidUtils.getCharSequenceFromGuid(guid)); + } + + private ProducerPartitionState createProducerPartitionState(int segment, int sequence) { + ProducerPartitionState ppState = new ProducerPartitionState(); + ppState.segmentNumber = segment; + ppState.segmentStatus = SegmentStatus.IN_PROGRESS.getValue(); + ppState.messageSequenceNumber = sequence; + ppState.messageTimestamp = System.currentTimeMillis(); + ppState.checksumType = CheckSumType.NONE.getValue(); + ppState.checksumState = ByteBuffer.allocate(0); + ppState.aggregates = new HashMap<>(); + ppState.debugInfo = new HashMap<>(); + return ppState; + } +}
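As a quick sketch of how the new pieces in this patch fit together (illustrative only, not part of the diff): the snippet below builds a GlobalRtDivState the same way the integration-test helper createGlobalRtDivState() does, serializes it with the GLOBAL_RT_DIV_STATE protocol serializer, and classifies the PUT messages that land on the version topic by schema id, following the assertions in VeniceWriterUnitTest.testGlobalRtDivChunking. The class and method names (GlobalRtDivDemo, buildState, serializeState, classify) are made up for illustration, and the exact size threshold at which VeniceWriter switches to chunking is not shown in this diff; the unit test only demonstrates that a ~2 MB payload becomes CHUNK puts plus a CHUNKED_VALUE_MANIFEST while a ~0.5 MB payload stays a single GLOBAL_RT_DIV_STATE put.

import com.linkedin.venice.guid.GuidUtils;
import com.linkedin.venice.kafka.protocol.GUID;
import com.linkedin.venice.kafka.protocol.Put;
import com.linkedin.venice.kafka.protocol.state.GlobalRtDivState;
import com.linkedin.venice.kafka.protocol.state.ProducerPartitionState;
import com.linkedin.venice.kafka.validation.SegmentStatus;
import com.linkedin.venice.kafka.validation.checksum.CheckSumType;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer;
import com.linkedin.venice.utils.ByteUtils;
import java.nio.ByteBuffer;
import java.util.HashMap;
import org.apache.avro.util.Utf8;

public class GlobalRtDivDemo {
  /** Build a GlobalRtDivState snapshot with {@code entryCount} producer entries, mirroring createGlobalRtDivState() above. */
  static GlobalRtDivState buildState(String srcUrl, int entryCount) {
    GlobalRtDivState state = new GlobalRtDivState();
    state.setSrcUrl(srcUrl);
    state.producerStates = new HashMap<>();
    for (int i = 0; i < entryCount; i++) {
      GUID guid = new GUID(new byte[16]); // arbitrary 16-byte GUID payload, for illustration only
      ProducerPartitionState pps = new ProducerPartitionState();
      pps.segmentNumber = i;
      pps.segmentStatus = SegmentStatus.IN_PROGRESS.getValue();
      pps.messageSequenceNumber = i;
      pps.messageTimestamp = System.currentTimeMillis();
      pps.checksumType = CheckSumType.NONE.getValue();
      pps.checksumState = ByteBuffer.allocate(0);
      pps.aggregates = new HashMap<>();
      pps.debugInfo = new HashMap<>();
      state.producerStates.put(new Utf8(GuidUtils.getCharSequenceFromGuid(guid)), pps);
    }
    return state;
  }

  /** Serialize the snapshot the way TestGlobalRtDiv does before handing it to sendGlobalRtDivMessage(). */
  static byte[] serializeState(GlobalRtDivState state) {
    InternalAvroSpecificSerializer<GlobalRtDivState> serializer =
        AvroProtocolDefinition.GLOBAL_RT_DIV_STATE.getSerializer();
    return ByteUtils.extractByteArray(serializer.serialize(state));
  }

  /** Classify a PUT observed on the version topic by its schema id, as testGlobalRtDivChunking asserts. */
  static String classify(Put put) {
    int schemaId = put.getSchemaId();
    if (schemaId == AvroProtocolDefinition.GLOBAL_RT_DIV_STATE.getCurrentProtocolVersion()) {
      return "non-chunked Global RT DIV snapshot";
    } else if (schemaId == AvroProtocolDefinition.CHUNK.getCurrentProtocolVersion()) {
      return "chunk of a large Global RT DIV snapshot";
    } else if (schemaId == AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion()) {
      return "manifest that reassembles the chunks";
    } else {
      return "regular value";
    }
  }
}

Used from a test, this would look like the integration test's calls, e.g. versionTopicWriter.sendGlobalRtDivMessage(0, "key".getBytes(), GlobalRtDivDemo.serializeState(GlobalRtDivDemo.buildState("localhost:9090", 1))).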