diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformer.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformer.java
index ac7a09c4e6..9b72042eb4 100644
--- a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformer.java
+++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformer.java
@@ -77,6 +77,7 @@ public class DynamicConfigTransformer implements PipelineConfigurationTransforme
 
     private static final String SINK_SUBPIPELINE_PLUGIN_NAME = "pipeline";
     private static final String SUBPIPELINE_PATH = "$.source.pipeline";
+    private static final String S3_BUFFER_PREFIX = "/buffer";
 
     Configuration parseConfigWithJsonNode = Configuration.builder()
             .jsonProvider(new JacksonJsonNodeJsonProvider())
@@ -402,15 +403,28 @@ private boolean isJsonPath(String parameter) {
     }
 
     /**
-     * Specific to DocDB depth field.
-     * @param s3Prefix
-     * @return
+     * Calculate s3 folder scan depth for DocDB source pipeline
+     * @param s3Prefix: s3 prefix defined in the source configuration
+     * @return s3 folder scan depth
      */
     public String calculateDepth(String s3Prefix) {
+        return Integer.toString(getDepth(s3Prefix, 4));
+    }
+
+    /**
+     * Calculate s3 folder scan depth for RDS source pipeline
+     * @param s3Prefix: s3 prefix defined in the source configuration
+     * @return s3 folder scan depth
+     */
+    public String calculateDepthForRdsSource(String s3Prefix) {
+        return Integer.toString(getDepth(s3Prefix, 3));
+    }
+
+    private int getDepth(String s3Prefix, int baseDepth) {
         if(s3Prefix == null){
-            return Integer.toString(4);
+            return baseDepth;
         }
-        return Integer.toString(s3Prefix.split("/").length + 4);
+        return s3Prefix.split("/").length + baseDepth;
     }
 
     public String getSourceCoordinationIdentifierEnvVariable(String s3Prefix){
@@ -421,6 +435,23 @@ public String getSourceCoordinationIdentifierEnvVariable(String s3Prefix){
         return s3Prefix+"/"+envSourceCoordinationIdentifier;
     }
 
+    /**
+     * Get the include_prefix in s3 scan source. This is a function specific to RDS source.
+     * @param s3Prefix: s3 prefix defined in the source configuration
+     * @return the actual include_prefix
+     */
+    public String getIncludePrefixForRdsSource(String s3Prefix) {
+        String envSourceCoordinationIdentifier = System.getenv(SOURCE_COORDINATION_IDENTIFIER_ENVIRONMENT_VARIABLE);
+        if (s3Prefix == null && envSourceCoordinationIdentifier == null) {
+            return S3_BUFFER_PREFIX;
+        } else if (s3Prefix == null) {
+            return envSourceCoordinationIdentifier + S3_BUFFER_PREFIX;
+        } else if (envSourceCoordinationIdentifier == null) {
+            return s3Prefix + S3_BUFFER_PREFIX;
+        }
+        return s3Prefix + "/" + envSourceCoordinationIdentifier + S3_BUFFER_PREFIX;
+    }
+
     public String getAccountIdFromRole(final String roleArn) {
         return Arn.fromString(roleArn).accountId().orElse(null);
     }
diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/ClientFactory.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/ClientFactory.java
index 7831754f0f..9de211ac6d 100644
--- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/ClientFactory.java
+++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/ClientFactory.java
@@ -9,22 +9,25 @@
 import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
 import org.opensearch.dataprepper.plugins.source.rds.configuration.AwsAuthenticationConfig;
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.rds.RdsClient;
 import software.amazon.awssdk.services.s3.S3Client;
 
 public class ClientFactory {
-    private final AwsCredentialsProvider awsCredentialsProvider;
     private final AwsAuthenticationConfig awsAuthenticationConfig;
+    private final AwsCredentialsProvider awsCredentialsProvider;
+    private final RdsSourceConfig sourceConfig;
 
     public ClientFactory(final AwsCredentialsSupplier awsCredentialsSupplier,
-                         final AwsAuthenticationConfig awsAuthenticationConfig) {
+                         final RdsSourceConfig sourceConfig) {
+        awsAuthenticationConfig = sourceConfig.getAwsAuthenticationConfig();
         awsCredentialsProvider = awsCredentialsSupplier.getProvider(AwsCredentialsOptions.builder()
                 .withRegion(awsAuthenticationConfig.getAwsRegion())
                 .withStsRoleArn(awsAuthenticationConfig.getAwsStsRoleArn())
                 .withStsExternalId(awsAuthenticationConfig.getAwsStsExternalId())
                 .withStsHeaderOverrides(awsAuthenticationConfig.getAwsStsHeaderOverrides())
                 .build());
-        this.awsAuthenticationConfig = awsAuthenticationConfig;
+        this.sourceConfig = sourceConfig;
     }
 
     public RdsClient buildRdsClient() {
@@ -36,8 +39,16 @@ public RdsClient buildRdsClient() {
 
     public S3Client buildS3Client() {
         return S3Client.builder()
-                .region(awsAuthenticationConfig.getAwsRegion())
+                .region(getS3ClientRegion())
                 .credentialsProvider(awsCredentialsProvider)
                 .build();
     }
+
+    private Region getS3ClientRegion() {
+        if (sourceConfig.getS3Region() != null) {
+            return sourceConfig.getS3Region();
+        }
+
+        return awsAuthenticationConfig.getAwsRegion();
+    }
 }
diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java
index 0431b39372..97d07c0fa7 100644
--- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java
+++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java
@@ -44,6 +44,7 @@ public class RdsService {
      * Maximum concurrent data loader per node
      */
     public static final int DATA_LOADER_MAX_JOB_COUNT = 1;
+    public static final String S3_PATH_DELIMITER = "/";
 
     private final RdsClient rdsClient;
     private final S3Client s3Client;
@@ -88,8 +89,9 @@ public void start(Buffer<Record<Event>> buffer) {
         final RdsApiStrategy rdsApiStrategy = sourceConfig.isCluster() ?
                 new ClusterApiStrategy(rdsClient) : new InstanceApiStrategy(rdsClient);
         final DbMetadata dbMetadata = rdsApiStrategy.describeDb(sourceConfig.getDbIdentifier());
+        final String s3PathPrefix = getS3PathPrefix();
         leaderScheduler = new LeaderScheduler(
-                sourceCoordinator, sourceConfig, getSchemaManager(sourceConfig, dbMetadata), dbMetadata);
+                sourceCoordinator, sourceConfig, s3PathPrefix, getSchemaManager(sourceConfig, dbMetadata), dbMetadata);
         runnableList.add(leaderScheduler);
 
         if (sourceConfig.isExportEnabled()) {
@@ -98,7 +100,7 @@ public void start(Buffer<Record<Event>> buffer) {
             exportScheduler = new ExportScheduler(
                     sourceCoordinator, snapshotManager, exportTaskManager, s3Client, pluginMetrics);
             dataFileScheduler = new DataFileScheduler(
-                    sourceCoordinator, sourceConfig, s3Client, eventFactory, buffer, pluginMetrics, acknowledgementSetManager);
+                    sourceCoordinator, sourceConfig, s3PathPrefix, s3Client, eventFactory, buffer, pluginMetrics, acknowledgementSetManager);
             runnableList.add(exportScheduler);
             runnableList.add(dataFileScheduler);
         }
@@ -111,7 +113,7 @@ public void start(Buffer<Record<Event>> buffer) {
                 binaryLogClient.setSSLMode(SSLMode.DISABLED);
             }
             streamScheduler = new StreamScheduler(
-                    sourceCoordinator, sourceConfig, binaryLogClient, buffer, pluginMetrics, acknowledgementSetManager);
+                    sourceCoordinator, sourceConfig, s3PathPrefix, binaryLogClient, buffer, pluginMetrics, acknowledgementSetManager);
             runnableList.add(streamScheduler);
         }
 
@@ -149,4 +151,21 @@ private SchemaManager getSchemaManager(final RdsSourceConfig sourceConfig, final
                 sourceConfig.isTlsEnabled());
         return new SchemaManager(connectionManager);
     }
+
+    private String getS3PathPrefix() {
+        final String s3UserPathPrefix;
+        if (sourceConfig.getS3Prefix() != null && !sourceConfig.getS3Prefix().isBlank()) {
+            s3UserPathPrefix = sourceConfig.getS3Prefix();
+        } else {
+            s3UserPathPrefix = "";
+        }
+
+        final String s3PathPrefix;
+        if (sourceCoordinator.getPartitionPrefix() != null ) {
+            s3PathPrefix = s3UserPathPrefix + S3_PATH_DELIMITER + sourceCoordinator.getPartitionPrefix();
+        } else {
+            s3PathPrefix = s3UserPathPrefix;
+        }
+        return s3PathPrefix;
+    }
 }
diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsSource.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsSource.java
index 5c2b08fb5c..b338ae8e15 100644
--- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsSource.java
+++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsSource.java
@@ -51,7 +51,7 @@ public RdsSource(final PluginMetrics pluginMetrics,
         this.eventFactory = eventFactory;
         this.acknowledgementSetManager = acknowledgementSetManager;
 
-        clientFactory = new ClientFactory(awsCredentialsSupplier, sourceConfig.getAwsAuthenticationConfig());
+        clientFactory = new ClientFactory(awsCredentialsSupplier, sourceConfig);
     }
 
     @Override
diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsSourceConfig.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsSourceConfig.java
index 65a65a4fde..28854b317d 100644
--- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsSourceConfig.java
+++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsSourceConfig.java
@@ -7,12 +7,15 @@
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import jakarta.validation.Valid;
+import jakarta.validation.constraints.Max;
+import jakarta.validation.constraints.Min;
 import jakarta.validation.constraints.NotNull;
 import org.opensearch.dataprepper.plugins.source.rds.configuration.AwsAuthenticationConfig;
 import org.opensearch.dataprepper.plugins.source.rds.configuration.EngineType;
 import org.opensearch.dataprepper.plugins.source.rds.configuration.ExportConfig;
 import org.opensearch.dataprepper.plugins.source.rds.configuration.StreamConfig;
 import org.opensearch.dataprepper.plugins.source.rds.configuration.TlsConfig;
+import software.amazon.awssdk.regions.Region;
 
 import java.time.Duration;
 import java.util.List;
@@ -21,6 +24,7 @@
  * Configuration for RDS Source
  */
 public class RdsSourceConfig {
+    private static final int DEFAULT_S3_FOLDER_PARTITION_COUNT = 100;
 
     /**
      * Identifier for RDS instance/cluster or Aurora cluster
@@ -72,6 +76,14 @@ public class RdsSourceConfig {
     @JsonProperty("s3_region")
     private String s3Region;
 
+    /**
+     * The number of folder partitions in S3 buffer
+     */
+    @JsonProperty("partition_count")
+    @Min(1)
+    @Max(1000)
+    private int s3FolderPartitionCount = DEFAULT_S3_FOLDER_PARTITION_COUNT;
+
     @JsonProperty("export")
     @Valid
     private ExportConfig exportConfig;
@@ -132,8 +144,12 @@ public String getS3Prefix() {
         return s3Prefix;
     }
 
-    public String getS3Region() {
-        return s3Region;
+    public Region getS3Region() {
+        return s3Region != null ? Region.of(s3Region) : null;
+    }
+
+    public int getPartitionCount() {
+        return s3FolderPartitionCount;
     }
 
     public ExportConfig getExport() {
diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/configuration/ExportConfig.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/configuration/ExportConfig.java
index b3bdedcaef..161e068559 100644
--- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/configuration/ExportConfig.java
+++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/configuration/ExportConfig.java
@@ -14,7 +14,18 @@ public class ExportConfig {
     @NotNull
     private String kmsKeyId;
 
+    /**
+     * The ARN of the IAM role that will be passed to RDS for export.
+     */
+    @JsonProperty("iam_role_arn")
+    @NotNull
+    private String iamRoleArn;
+
     public String getKmsKeyId() {
         return kmsKeyId;
     }
+
+    public String getIamRoleArn() {
+        return iamRoleArn;
+    }
 }
diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/configuration/StreamConfig.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/configuration/StreamConfig.java
index f2b5af11a1..f2fcd474d6 100644
--- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/configuration/StreamConfig.java
+++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/configuration/StreamConfig.java
@@ -11,23 +11,14 @@
 
 public class StreamConfig {
 
-    private static final int DEFAULT_S3_FOLDER_PARTITION_COUNT = 100;
     private static final int DEFAULT_NUM_WORKERS = 1;
 
-    @JsonProperty("partition_count")
-    @Min(1)
-    @Max(1000)
-    private int s3FolderPartitionCount = DEFAULT_S3_FOLDER_PARTITION_COUNT;
-
+    // TODO: Restricted max to 1 here until we have a proper method to process the stream events in parallel.
     @JsonProperty("workers")
     @Min(1)
-    @Max(1000)
+    @Max(1)
     private int numWorkers = DEFAULT_NUM_WORKERS;
 
-    public int getPartitionCount() {
-        return s3FolderPartitionCount;
-    }
-
     public int getNumWorkers() {
         return numWorkers;
     }
diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/converter/ExportRecordConverter.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/converter/ExportRecordConverter.java
index 428f121788..d571e40f32 100644
--- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/converter/ExportRecordConverter.java
+++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/converter/ExportRecordConverter.java
@@ -5,52 +5,14 @@
 
 package org.opensearch.dataprepper.plugins.source.rds.converter;
 
-import org.opensearch.dataprepper.model.event.Event;
-import org.opensearch.dataprepper.model.event.EventMetadata;
-import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions;
-import org.opensearch.dataprepper.model.record.Record;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+public class ExportRecordConverter extends RecordConverter {
 
-import java.util.List;
-import java.util.stream.Collectors;
-
-import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.BULK_ACTION_METADATA_ATTRIBUTE;
-import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.EVENT_DATABASE_NAME_METADATA_ATTRIBUTE;
-import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.EVENT_TABLE_NAME_METADATA_ATTRIBUTE;
-import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.EVENT_TIMESTAMP_METADATA_ATTRIBUTE;
-import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.EVENT_VERSION_FROM_TIMESTAMP;
-import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.INGESTION_EVENT_TYPE_ATTRIBUTE;
-import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE;
-
-public class ExportRecordConverter {
-
-    private static final Logger LOG = LoggerFactory.getLogger(ExportRecordConverter.class);
-
-    static final String EXPORT_EVENT_TYPE = "EXPORT";
-
-    public Event convert(final Record<Event> record,
-                         final String databaseName,
-                         final String tableName,
-                         final List<String> primaryKeys,
-                         final long eventCreateTimeEpochMillis,
-                         final long eventVersionNumber) {
-        Event event = record.getData();
-
-        EventMetadata eventMetadata = event.getMetadata();
-        eventMetadata.setAttribute(EVENT_DATABASE_NAME_METADATA_ATTRIBUTE, databaseName);
-        eventMetadata.setAttribute(EVENT_TABLE_NAME_METADATA_ATTRIBUTE, tableName);
-        eventMetadata.setAttribute(BULK_ACTION_METADATA_ATTRIBUTE, OpenSearchBulkActions.INDEX.toString());
-        eventMetadata.setAttribute(INGESTION_EVENT_TYPE_ATTRIBUTE, EXPORT_EVENT_TYPE);
-
-        final String primaryKeyValue = primaryKeys.stream()
-                .map(key -> event.get(key, String.class))
-                .collect(Collectors.joining("|"));
-        eventMetadata.setAttribute(PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE, primaryKeyValue);
-
-        eventMetadata.setAttribute(EVENT_TIMESTAMP_METADATA_ATTRIBUTE, eventCreateTimeEpochMillis);
-        eventMetadata.setAttribute(EVENT_VERSION_FROM_TIMESTAMP, eventVersionNumber);
+    public ExportRecordConverter(final String s3Prefix, final int partitionCount) {
+        super(s3Prefix, partitionCount);
+    }
 
-        return event;
+    @Override
+    String getIngestionType() {
+        return EXPORT_INGESTION_TYPE;
     }
 }
diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/converter/RecordConverter.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/converter/RecordConverter.java
new file mode 100644
index 0000000000..8c7dcdb996
--- /dev/null
+++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/converter/RecordConverter.java
@@ -0,0 +1,114 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.plugins.source.rds.converter;
+
+import com.github.shyiko.mysql.binlog.event.EventType;
+import org.opensearch.dataprepper.model.event.Event;
+import org.opensearch.dataprepper.model.event.EventMetadata;
+import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions;
+
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.time.Instant;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.BULK_ACTION_METADATA_ATTRIBUTE;
+import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.CHANGE_EVENT_TYPE_METADATA_ATTRIBUTE;
+import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.EVENT_DATABASE_NAME_METADATA_ATTRIBUTE;
+import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.EVENT_TABLE_NAME_METADATA_ATTRIBUTE;
+import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.EVENT_TIMESTAMP_METADATA_ATTRIBUTE;
+import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.EVENT_VERSION_FROM_TIMESTAMP;
+import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.INGESTION_EVENT_TYPE_ATTRIBUTE;
+import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE;
+
+public abstract class RecordConverter {
+
+    private final String s3Prefix;
+    private final List<String> folderNames;
+
+    static final String S3_BUFFER_PREFIX = "buffer";
+    static final String S3_PATH_DELIMITER = "/";
+    static final String EXPORT_INGESTION_TYPE = "EXPORT";
+    static final String STREAM_INGESTION_TYPE = "STREAM";
+
+
+    public RecordConverter(final String s3Prefix, final int partitionCount) {
+        this.s3Prefix = s3Prefix;
+        S3PartitionCreator s3PartitionCreator = new S3PartitionCreator(partitionCount);
+        folderNames = s3PartitionCreator.createPartitions();
+    }
+
+    public Event convert(final Event event,
+                         final String databaseName,
+                         final String tableName,
+                         final OpenSearchBulkActions bulkAction,
+                         final List<String> primaryKeys,
+                         final long eventCreateTimeEpochMillis,
+                         final long eventVersionNumber,
+                         final EventType eventType) {
+
+        EventMetadata eventMetadata = event.getMetadata();
+
+        // Only set external origination time for stream events, not export
+        if (STREAM_INGESTION_TYPE.equals(getIngestionType())) {
+            final Instant externalOriginationTime = Instant.ofEpochMilli(eventCreateTimeEpochMillis);
+            event.getEventHandle().setExternalOriginationTime(externalOriginationTime);
+            eventMetadata.setExternalOriginationTime(externalOriginationTime);
+        }
+
+        eventMetadata.setAttribute(EVENT_DATABASE_NAME_METADATA_ATTRIBUTE, databaseName);
+        eventMetadata.setAttribute(EVENT_TABLE_NAME_METADATA_ATTRIBUTE, tableName);
+        eventMetadata.setAttribute(BULK_ACTION_METADATA_ATTRIBUTE, bulkAction.toString());
+        setIngestionTypeMetadata(event);
+        if (eventType != null) {
+            eventMetadata.setAttribute(CHANGE_EVENT_TYPE_METADATA_ATTRIBUTE, eventType.toString());
+        }
+
+        final String primaryKeyValue = primaryKeys.stream()
+                .map(key -> event.get(key, String.class))
+                .collect(Collectors.joining("|"));
+        eventMetadata.setAttribute(PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE, primaryKeyValue);
+
+        final String s3PartitionKey = s3Prefix + S3_PATH_DELIMITER + S3_BUFFER_PREFIX + S3_PATH_DELIMITER + hashKeyToPartition(primaryKeyValue);
+        eventMetadata.setAttribute(MetadataKeyAttributes.EVENT_S3_PARTITION_KEY, s3PartitionKey);
+
+        eventMetadata.setAttribute(EVENT_TIMESTAMP_METADATA_ATTRIBUTE, eventCreateTimeEpochMillis);
+        eventMetadata.setAttribute(EVENT_VERSION_FROM_TIMESTAMP, eventVersionNumber);
+
+        return event;
+    }
+
+    abstract String getIngestionType();
+
+    private void setIngestionTypeMetadata(final Event event) {
+        event.getMetadata().setAttribute(INGESTION_EVENT_TYPE_ATTRIBUTE, getIngestionType());
+    }
+
+    private String hashKeyToPartition(final String key) {
+        return folderNames.get(hashKeyToIndex(key));
+    }
+
+    private int hashKeyToIndex(final String key) {
+        try {
+            // Create a SHA-256 hash instance
+            final MessageDigest digest = MessageDigest.getInstance("SHA-256");
+            // Hash the key
+            byte[] hashBytes = digest.digest(key.getBytes());
+            // Convert the hash to an integer
+            int hashValue = bytesToInt(hashBytes);
+            // Map the hash value to an index in the list
+            return Math.abs(hashValue) % folderNames.size();
+        } catch (final NoSuchAlgorithmException e) {
+            return -1;
+        }
+    }
+
+    private int bytesToInt(byte[] bytes) {
+        return ByteBuffer.wrap(bytes).getInt();
+    }
+}
diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/converter/StreamRecordConverter.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/converter/StreamRecordConverter.java
index cc1f897bea..76ad9eb89d 100644
--- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/converter/StreamRecordConverter.java
+++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/converter/StreamRecordConverter.java
@@ -5,107 +5,17 @@
 
 package org.opensearch.dataprepper.plugins.source.rds.converter;
 
-import com.github.shyiko.mysql.binlog.event.EventType;
-import org.opensearch.dataprepper.model.event.Event;
-import org.opensearch.dataprepper.model.event.EventMetadata;
-import org.opensearch.dataprepper.model.event.JacksonEvent;
-import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.ByteBuffer;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.time.Instant;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.BULK_ACTION_METADATA_ATTRIBUTE;
-import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.CHANGE_EVENT_TYPE_METADATA_ATTRIBUTE;
-import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.EVENT_DATABASE_NAME_METADATA_ATTRIBUTE;
-import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.EVENT_TABLE_NAME_METADATA_ATTRIBUTE;
-import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.EVENT_TIMESTAMP_METADATA_ATTRIBUTE;
-import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.EVENT_VERSION_FROM_TIMESTAMP;
-import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.INGESTION_EVENT_TYPE_ATTRIBUTE;
-import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE;
-
 /**
  * Convert binlog row data into JacksonEvent
  */
-public class StreamRecordConverter {
-
-    private static final Logger LOG = LoggerFactory.getLogger(StreamRecordConverter.class);
-
-    private final List<String> folderNames;
+public class StreamRecordConverter extends RecordConverter {
 
-    static final String S3_PATH_DELIMITER = "/";
-
-    static final String STREAM_EVENT_TYPE = "STREAM";
-
-    public StreamRecordConverter(final int partitionCount) {
-        S3PartitionCreator s3PartitionCreator = new S3PartitionCreator(partitionCount);
-        folderNames = s3PartitionCreator.createPartitions();
+    public StreamRecordConverter(final String s3Prefix, final int partitionCount) {
+        super(s3Prefix, partitionCount);
     }
 
-    public Event convert(final Map<String, Object> rowData,
-                         final String databaseName,
-                         final String tableName,
-                         final EventType eventType,
-                         final OpenSearchBulkActions bulkAction,
-                         final List<String> primaryKeys,
-                         final String s3Prefix,
-                         final long eventCreateTimeEpochMillis,
-                         final long eventVersionNumber) {
-        final Event event = JacksonEvent.builder()
-                .withEventType("event")
-                .withData(rowData)
-                .build();
-
-        EventMetadata eventMetadata = event.getMetadata();
-
-        // Only set external origination time for stream events, not export
-        final Instant externalOriginationTime = Instant.ofEpochMilli(eventCreateTimeEpochMillis);
-        event.getEventHandle().setExternalOriginationTime(externalOriginationTime);
-        eventMetadata.setExternalOriginationTime(externalOriginationTime);
-
-        eventMetadata.setAttribute(EVENT_DATABASE_NAME_METADATA_ATTRIBUTE, databaseName);
-        eventMetadata.setAttribute(EVENT_TABLE_NAME_METADATA_ATTRIBUTE, tableName);
-        eventMetadata.setAttribute(BULK_ACTION_METADATA_ATTRIBUTE, bulkAction.toString());
-        eventMetadata.setAttribute(INGESTION_EVENT_TYPE_ATTRIBUTE, STREAM_EVENT_TYPE);
-        eventMetadata.setAttribute(CHANGE_EVENT_TYPE_METADATA_ATTRIBUTE, eventType.toString());
-
-        final String primaryKeyValue = primaryKeys.stream()
-                .map(rowData::get)
-                .map(String::valueOf)
-                .collect(Collectors.joining("|"));
-        eventMetadata.setAttribute(PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE, primaryKeyValue);
-        eventMetadata.setAttribute(MetadataKeyAttributes.EVENT_S3_PARTITION_KEY, s3Prefix + S3_PATH_DELIMITER + hashKeyToPartition(primaryKeyValue));
-
-        eventMetadata.setAttribute(EVENT_TIMESTAMP_METADATA_ATTRIBUTE, eventCreateTimeEpochMillis);
-        eventMetadata.setAttribute(EVENT_VERSION_FROM_TIMESTAMP, eventVersionNumber);
-
-        return event;
-    }
-
-    private String hashKeyToPartition(final String key) {
-        return folderNames.get(hashKeyToIndex(key));
-    }
-
-    private int hashKeyToIndex(final String key) {
-        try {
-            // Create a SHA-256 hash instance
-            final MessageDigest digest = MessageDigest.getInstance("SHA-256");
-            // Hash the key
-            byte[] hashBytes = digest.digest(key.getBytes());
-            // Convert the hash to an integer
-            int hashValue = bytesToInt(hashBytes);
-            // Map the hash value to an index in the list
-            return Math.abs(hashValue) % folderNames.size();
-        } catch (final NoSuchAlgorithmException e) {
-            return -1;
-        }
-    }
-
-    private int bytesToInt(byte[] bytes) {
-        return ByteBuffer.wrap(bytes).getInt();
+    @Override
+    String getIngestionType() {
+        return STREAM_INGESTION_TYPE;
     }
 }
diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoader.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoader.java
index 64c613bc43..5e0fe9ecf3 100644
--- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoader.java
+++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoader.java
@@ -13,6 +13,7 @@
 import org.opensearch.dataprepper.model.buffer.Buffer;
 import org.opensearch.dataprepper.model.codec.InputCodec;
 import org.opensearch.dataprepper.model.event.Event;
+import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions;
 import org.opensearch.dataprepper.model.record.Record;
 import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
 import org.opensearch.dataprepper.plugins.source.rds.converter.ExportRecordConverter;
@@ -121,12 +122,14 @@ public void run() {
                 final long snapshotTime = progressState.getSnapshotTime();
                 final long eventVersionNumber = snapshotTime - VERSION_OVERLAP_TIME_FOR_EXPORT.toMillis();
                 final Event transformedEvent = recordConverter.convert(
-                        record,
+                        event,
                         progressState.getSourceDatabase(),
                         progressState.getSourceTable(),
+                        OpenSearchBulkActions.INDEX,
                         primaryKeys,
                         snapshotTime,
-                        eventVersionNumber);
+                        eventVersionNumber,
+                        null);
 
                 if (acknowledgementSet != null) {
                     acknowledgementSet.add(transformedEvent);
diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileScheduler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileScheduler.java
index 33c17d9d80..00af2b7db7 100644
--- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileScheduler.java
+++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileScheduler.java
@@ -74,6 +74,7 @@ public class DataFileScheduler implements Runnable {
 
     public DataFileScheduler(final EnhancedSourceCoordinator sourceCoordinator,
                              final RdsSourceConfig sourceConfig,
+                             final String s3Prefix,
                              final S3Client s3Client,
                              final EventFactory eventFactory,
                              final Buffer<Record<Event>> buffer,
@@ -83,7 +84,7 @@ public DataFileScheduler(final EnhancedSourceCoordinator sourceCoordinator,
         this.sourceConfig = sourceConfig;
         codec = new ParquetInputCodec(eventFactory);
         objectReader = new S3ObjectReader(s3Client);
-        recordConverter = new ExportRecordConverter();
+        recordConverter = new ExportRecordConverter(s3Prefix, sourceConfig.getPartitionCount());
         executor = Executors.newFixedThreadPool(DATA_LOADER_MAX_JOB_COUNT);
         this.buffer = buffer;
         this.pluginMetrics = pluginMetrics;
diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderScheduler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderScheduler.java
index 4fc00de3a5..33861993a8 100644
--- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderScheduler.java
+++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderScheduler.java
@@ -27,13 +27,17 @@
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+import static org.opensearch.dataprepper.plugins.source.rds.RdsService.S3_PATH_DELIMITER;
+
 public class LeaderScheduler implements Runnable {
 
     private static final Logger LOG = LoggerFactory.getLogger(LeaderScheduler.class);
     private static final int DEFAULT_EXTEND_LEASE_MINUTES = 3;
     private static final Duration DEFAULT_LEASE_INTERVAL = Duration.ofMinutes(1);
+    private static final String S3_EXPORT_PREFIX = "rds-export";
 
     private final EnhancedSourceCoordinator sourceCoordinator;
     private final RdsSourceConfig sourceConfig;
+    private final String s3Prefix;
     private final SchemaManager schemaManager;
     private final DbMetadata dbMetadata;
 
@@ -42,10 +46,12 @@ public class LeaderScheduler implements Runnable {
 
     public LeaderScheduler(final EnhancedSourceCoordinator sourceCoordinator,
                            final RdsSourceConfig sourceConfig,
+                           final String s3Prefix,
                            final SchemaManager schemaManager,
                            final DbMetadata dbMetadata) {
         this.sourceCoordinator = sourceCoordinator;
         this.sourceConfig = sourceConfig;
+        this.s3Prefix = s3Prefix;
         this.schemaManager = schemaManager;
         this.dbMetadata = dbMetadata;
     }
@@ -127,9 +133,10 @@ private void init() {
 
     private void createExportPartition(RdsSourceConfig sourceConfig) {
         ExportProgressState progressState = new ExportProgressState();
-        progressState.setIamRoleArn(sourceConfig.getAwsAuthenticationConfig().getAwsStsRoleArn());
+        progressState.setIamRoleArn(sourceConfig.getExport().getIamRoleArn());
         progressState.setBucket(sourceConfig.getS3Bucket());
-        progressState.setPrefix(sourceConfig.getS3Prefix());
+        // This prefix is for data exported from RDS
+        progressState.setPrefix(getS3PrefixForExport(s3Prefix));
         progressState.setTables(sourceConfig.getTableNames());
         progressState.setKmsKeyId(sourceConfig.getExport().getKmsKeyId());
         progressState.setPrimaryKeyMap(getPrimaryKeyMap());
@@ -137,6 +144,10 @@ private void createExportPartition(RdsSourceConfig sourceConfig) {
         sourceCoordinator.createPartition(exportPartition);
     }
 
+    private String getS3PrefixForExport(final String givenS3Prefix) {
+        return givenS3Prefix + S3_PATH_DELIMITER + S3_EXPORT_PREFIX;
+    }
+
     private Map<String, List<String>> getPrimaryKeyMap() {
         return sourceConfig.getTableNames().stream()
                 .collect(Collectors.toMap(
diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/ExportObjectKey.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/ExportObjectKey.java
index feba4555b2..dab6cc8d40 100644
--- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/ExportObjectKey.java
+++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/ExportObjectKey.java
@@ -5,12 +5,16 @@
 
 package org.opensearch.dataprepper.plugins.source.rds.model;
 
+import java.util.Arrays;
+import java.util.stream.Collectors;
+
 /**
  * Represents the object key for an object exported to S3 by RDS.
  * The object key has this structure: "{prefix}/{export task ID}/{database name}/{table name}/{numbered folder}/{file name}"
  */
 public class ExportObjectKey {
 
+    static final String S3_PATH_DELIMITER = "/";
     private final String prefix;
     private final String exportTaskId;
     private final String databaseName;
@@ -29,18 +33,21 @@ public class ExportObjectKey {
 
     public static ExportObjectKey fromString(final String objectKeyString) {
 
-        final String[] parts = objectKeyString.split("/");
-        if (parts.length != 6) {
+        final String[] parts = objectKeyString.split(S3_PATH_DELIMITER);
+        if (parts.length < 5) {
             throw new IllegalArgumentException("Export object key is not valid: " + objectKeyString);
         }
-        final String prefix = parts[0];
-        final String exportTaskId = parts[1];
-        final String databaseName = parts[2];
+
+        final String prefix = Arrays.stream(parts, 0, parts.length - 5)
+                .collect(Collectors.joining(S3_PATH_DELIMITER));
+        final String exportTaskId = parts[parts.length - 5];
+        final String databaseName = parts[parts.length - 4];
         // fullTableName is in the format of "databaseName.tableName"
-        final String fullTableName = parts[3];
+        final String fullTableName = parts[parts.length - 3];
         final String tableName = fullTableName.split("\\.")[1];
-        final String numberedFolder = parts[4];
-        final String fileName = parts[5];
+        final String numberedFolder = parts[parts.length - 2];
+        final String fileName = parts[parts.length - 1];
+
         return new ExportObjectKey(prefix, exportTaskId, databaseName, tableName, numberedFolder, fileName);
     }
diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListener.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListener.java
index 33defb42b7..cd12711f27 100644
--- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListener.java
+++ 
b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListener.java @@ -24,6 +24,7 @@ import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.source.rds.RdsSourceConfig; @@ -51,6 +52,7 @@ public class BinlogEventListener implements BinaryLogClient.EventListener { static final Duration BUFFER_TIMEOUT = Duration.ofSeconds(60); static final int DEFAULT_BUFFER_BATCH_SIZE = 1_000; + static final String DATA_PREPPER_EVENT_TYPE = "event"; static final String CHANGE_EVENTS_PROCESSED_COUNT = "changeEventsProcessed"; static final String CHANGE_EVENTS_PROCESSING_ERROR_COUNT = "changeEventsProcessingErrors"; static final String BYTES_RECEIVED = "bytesReceived"; @@ -86,6 +88,7 @@ public class BinlogEventListener implements BinaryLogClient.EventListener { public BinlogEventListener(final Buffer> buffer, final RdsSourceConfig sourceConfig, + final String s3Prefix, final PluginMetrics pluginMetrics, final BinaryLogClient binaryLogClient, final StreamCheckpointer streamCheckpointer, @@ -93,8 +96,8 @@ public BinlogEventListener(final Buffer> buffer, this.buffer = buffer; this.binaryLogClient = binaryLogClient; tableMetadataMap = new HashMap<>(); - recordConverter = new StreamRecordConverter(sourceConfig.getStream().getPartitionCount()); - s3Prefix = sourceConfig.getS3Prefix(); + recordConverter = new StreamRecordConverter(s3Prefix, sourceConfig.getPartitionCount()); + this.s3Prefix = s3Prefix; tableNames = sourceConfig.getTableNames(); isAcknowledgmentsEnabled = sourceConfig.isAcknowledgmentsEnabled(); this.pluginMetrics = pluginMetrics; @@ -244,16 +247,20 @@ 
private void handleRowChangeEvent(com.github.shyiko.mysql.binlog.event.Event eve rowDataMap.put(columnNames.get(i), rowDataArray[i]); } + final Event dataPrepperEvent = JacksonEvent.builder() + .withEventType(DATA_PREPPER_EVENT_TYPE) + .withData(rowDataMap) + .build(); + final Event pipelineEvent = recordConverter.convert( - rowDataMap, + dataPrepperEvent, tableMetadata.getDatabaseName(), tableMetadata.getTableName(), - event.getHeader().getEventType(), bulkAction, primaryKeys, - s3Prefix, eventTimestampMillis, - eventTimestampMillis); + eventTimestampMillis, + event.getHeader().getEventType()); pipelineEvents.add(pipelineEvent); } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamScheduler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamScheduler.java index 14d61e6626..d7ddcdf8f8 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamScheduler.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamScheduler.java @@ -33,6 +33,7 @@ public class StreamScheduler implements Runnable { private final EnhancedSourceCoordinator sourceCoordinator; private final RdsSourceConfig sourceConfig; + private final String s3Prefix; private final BinaryLogClient binaryLogClient; private final Buffer> buffer; private final PluginMetrics pluginMetrics; @@ -43,12 +44,14 @@ public class StreamScheduler implements Runnable { public StreamScheduler(final EnhancedSourceCoordinator sourceCoordinator, final RdsSourceConfig sourceConfig, + final String s3Prefix, final BinaryLogClient binaryLogClient, final Buffer> buffer, final PluginMetrics pluginMetrics, final AcknowledgementSetManager acknowledgementSetManager) { this.sourceCoordinator = sourceCoordinator; this.sourceConfig = sourceConfig; + this.s3Prefix = s3Prefix; this.binaryLogClient = 
binaryLogClient; this.buffer = buffer; this.pluginMetrics = pluginMetrics; @@ -74,7 +77,7 @@ public void run() { streamPartition = (StreamPartition) sourcePartition.get(); final StreamCheckpointer streamCheckpointer = new StreamCheckpointer(sourceCoordinator, streamPartition, pluginMetrics); binaryLogClient.registerEventListener(new BinlogEventListener( - buffer, sourceConfig, pluginMetrics, binaryLogClient, streamCheckpointer, acknowledgementSetManager)); + buffer, sourceConfig, s3Prefix, pluginMetrics, binaryLogClient, streamCheckpointer, acknowledgementSetManager)); final StreamWorker streamWorker = StreamWorker.create(sourceCoordinator, binaryLogClient, pluginMetrics); executorService.submit(() -> streamWorker.processStream((StreamPartition) sourcePartition.get())); } diff --git a/data-prepper-plugins/rds-source/src/main/resources/org/opensearch/dataprepper/transforms/rules/rds-rule.yaml b/data-prepper-plugins/rds-source/src/main/resources/org/opensearch/dataprepper/transforms/rules/rds-rule.yaml new file mode 100644 index 0000000000..f13f6b6fc4 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/resources/org/opensearch/dataprepper/transforms/rules/rds-rule.yaml @@ -0,0 +1,3 @@ +plugin_name: "rds" +apply_when: + - "$..source.rds" \ No newline at end of file diff --git a/data-prepper-plugins/rds-source/src/main/resources/org/opensearch/dataprepper/transforms/templates/rds-template.yaml b/data-prepper-plugins/rds-source/src/main/resources/org/opensearch/dataprepper/transforms/templates/rds-template.yaml new file mode 100644 index 0000000000..b439068cad --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/resources/org/opensearch/dataprepper/transforms/templates/rds-template.yaml @@ -0,0 +1,81 @@ +"<>": + workers: "<<$.<>.workers>>" + delay: "<<$.<>.delay>>" + buffer: "<<$.<>.buffer>>" + source: + rds: "<<$.<>.source.rds>>" + routes: + - initial_load: 'getMetadata("ingestion_type") == "EXPORT"' + - stream_load: 'getMetadata("ingestion_type") == 
"STREAM"' + sink: + - s3: + routes: + - initial_load + aws: + region: "<<$.<>.source.rds.s3_region>>" + sts_role_arn: "<<$.<>.source.rds.aws.sts_role_arn>>" + sts_external_id: "<<$.<>.source.rds.aws.sts_external_id>>" + sts_header_overrides: "<<$.<>.source.rds.aws.sts_header_overrides>>" + bucket: "<<$.<>.source.rds.s3_bucket>>" + threshold: + event_collect_timeout: "120s" + maximum_size: "2mb" + aggregate_threshold: + maximum_size: "128mb" + flush_capacity_ratio: 0 + object_key: + path_prefix: "${getMetadata(\"s3_partition_key\")}" + codec: + event_json: + default_bucket_owner: "<>.source.rds.aws.sts_role_arn>>" + - s3: + routes: + - stream_load + aws: + region: "<<$.<>.source.rds.s3_region>>" + sts_role_arn: "<<$.<>.source.rds.aws.sts_role_arn>>" + sts_external_id: "<<$.<>.source.rds.aws.sts_external_id>>" + sts_header_overrides: "<<$.<>.source.rds.aws.sts_header_overrides>>" + bucket: "<<$.<>.source.rds.s3_bucket>>" + threshold: + event_collect_timeout: "15s" + maximum_size: "1mb" + aggregate_threshold: + maximum_size: "128mb" + flush_capacity_ratio: 0 + object_key: + path_prefix: "${getMetadata(\"s3_partition_key\")}" + codec: + event_json: + default_bucket_owner: "<>.source.rds.aws.sts_role_arn>>" +"<>-s3": + workers: "<<$.<>.workers>>" + delay: "<<$.<>.delay>>" + buffer: "<<$.<>.buffer>>" + source: + s3: + codec: + event_json: + compression: "none" + aws: + region: "<<$.<>.source.rds.s3_region>>" + sts_role_arn: "<<$.<>.source.rds.aws.sts_role_arn>>" + sts_external_id: "<<$.<>.source.rds.aws.sts_external_id>>" + sts_header_overrides: "<<$.<>.source.rds.aws.sts_header_overrides>>" + acknowledgments: true + delete_s3_objects_on_read: true + disable_s3_metadata_in_event: true + scan: + folder_partitions: + depth: "<>.source.rds.s3_prefix>>" + max_objects_per_ownership: 50 + buckets: + - bucket: + name: "<<$.<>.source.rds.s3_bucket>>" + filter: + include_prefix: ["<>.source.rds.s3_prefix>>"] + scheduling: + interval: "60s" + processor: "<<$.<>.processor>>" + 
sink: "<<$.<>.sink>>" + routes: "<<$.<>.routes>>" # In placeholder, routes or route (defined as alias) will be transformed to route in json as route will be primarily picked in pipelineModel. \ No newline at end of file diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/RdsServiceTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/RdsServiceTest.java index 0a814e7fc1..9a658b867b 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/RdsServiceTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/RdsServiceTest.java @@ -10,6 +10,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Answers; import org.mockito.Mock; +import org.mockito.MockedConstruction; import org.mockito.MockedStatic; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.metrics.PluginMetrics; @@ -34,14 +35,18 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockConstruction; import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.plugins.source.rds.RdsService.S3_PATH_DELIMITER; @ExtendWith(MockitoExtension.class) class RdsServiceTest { @@ -131,12 +136,21 @@ void test_normal_service_start_when_stream_is_enabled() { when(sourceConfig.getAuthenticationConfig()).thenReturn(authConfig); 
when(sourceConfig.getTlsConfig()).thenReturn(mock(TlsConfig.class)); + final String s3Prefix = UUID.randomUUID().toString(); + final String partitionPrefix = UUID.randomUUID().toString(); + when(sourceConfig.getS3Prefix()).thenReturn(s3Prefix); + when(sourceCoordinator.getPartitionPrefix()).thenReturn(partitionPrefix); + final RdsService rdsService = createObjectUnderTest(); - try (final MockedStatic executorsMockedStatic = mockStatic(Executors.class)) { + final String[] s3PrefixArray = new String[1]; + try (final MockedStatic executorsMockedStatic = mockStatic(Executors.class); + final MockedConstruction leaderSchedulerMockedConstruction = mockConstruction(LeaderScheduler.class, + (mock, context) -> s3PrefixArray[0] = (String) context.arguments().get(2))) { executorsMockedStatic.when(() -> Executors.newFixedThreadPool(anyInt())).thenReturn(executor); rdsService.start(buffer); } + assertThat(s3PrefixArray[0], equalTo(s3Prefix + S3_PATH_DELIMITER + partitionPrefix)); verify(executor).submit(any(LeaderScheduler.class)); verify(executor).submit(any(StreamScheduler.class)); verify(executor, never()).submit(any(ExportScheduler.class)); diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/converter/ExportRecordConverterTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/converter/ExportRecordConverterTest.java index 564fde166b..027138c85a 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/converter/ExportRecordConverterTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/converter/ExportRecordConverterTest.java @@ -13,7 +13,6 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventBuilder; import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions; -import 
org.opensearch.dataprepper.model.record.Record; import java.util.List; import java.util.Map; @@ -22,25 +21,33 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; -import static org.opensearch.dataprepper.plugins.source.rds.converter.ExportRecordConverter.EXPORT_EVENT_TYPE; +import static org.hamcrest.Matchers.startsWith; +import static org.opensearch.dataprepper.plugins.source.rds.converter.ExportRecordConverter.EXPORT_INGESTION_TYPE; import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.BULK_ACTION_METADATA_ATTRIBUTE; +import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.CHANGE_EVENT_TYPE_METADATA_ATTRIBUTE; import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.EVENT_DATABASE_NAME_METADATA_ATTRIBUTE; +import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.EVENT_S3_PARTITION_KEY; import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.EVENT_TABLE_NAME_METADATA_ATTRIBUTE; import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.EVENT_TIMESTAMP_METADATA_ATTRIBUTE; import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.EVENT_VERSION_FROM_TIMESTAMP; import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.INGESTION_EVENT_TYPE_ATTRIBUTE; import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE; +import static org.opensearch.dataprepper.plugins.source.rds.converter.RecordConverter.S3_BUFFER_PREFIX; +import static org.opensearch.dataprepper.plugins.source.rds.converter.RecordConverter.S3_PATH_DELIMITER; @ExtendWith(MockitoExtension.class) class 
ExportRecordConverterTest { private Random random; + private String s3Prefix; private ExportRecordConverter exportRecordConverter; @BeforeEach void setUp() { random = new Random(); + s3Prefix = UUID.randomUUID().toString(); exportRecordConverter = createObjectUnderTest(); } @@ -49,6 +56,7 @@ void test_convert() { final String databaseName = UUID.randomUUID().toString(); final String tableName = UUID.randomUUID().toString(); final String primaryKeyName = UUID.randomUUID().toString(); + final List primaryKeys = List.of(primaryKeyName); final String primaryKeyValue = UUID.randomUUID().toString(); final long eventCreateTimeEpochMillis = random.nextLong(); final long eventVersionNumber = random.nextLong(); @@ -58,24 +66,24 @@ void test_convert() { .withData(Map.of(primaryKeyName, primaryKeyValue)) .build(); - Record testRecord = new Record<>(testEvent); - - ExportRecordConverter exportRecordConverter = new ExportRecordConverter(); Event actualEvent = exportRecordConverter.convert( - testRecord, databaseName, tableName, List.of(primaryKeyName), eventCreateTimeEpochMillis, eventVersionNumber); + testEvent, databaseName, tableName, OpenSearchBulkActions.INDEX, primaryKeys, + eventCreateTimeEpochMillis, eventVersionNumber, null); // Assert assertThat(actualEvent.getMetadata().getAttribute(EVENT_DATABASE_NAME_METADATA_ATTRIBUTE), equalTo(databaseName)); assertThat(actualEvent.getMetadata().getAttribute(EVENT_TABLE_NAME_METADATA_ATTRIBUTE), equalTo(tableName)); assertThat(actualEvent.getMetadata().getAttribute(BULK_ACTION_METADATA_ATTRIBUTE), equalTo(OpenSearchBulkActions.INDEX.toString())); assertThat(actualEvent.getMetadata().getAttribute(PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE), equalTo(primaryKeyValue)); - assertThat(actualEvent.getMetadata().getAttribute(INGESTION_EVENT_TYPE_ATTRIBUTE), equalTo(EXPORT_EVENT_TYPE)); + assertThat(actualEvent.getMetadata().getAttribute(INGESTION_EVENT_TYPE_ATTRIBUTE), equalTo(EXPORT_INGESTION_TYPE)); + 
assertThat(actualEvent.getMetadata().getAttribute(EVENT_S3_PARTITION_KEY).toString(), startsWith(s3Prefix + S3_PATH_DELIMITER + S3_BUFFER_PREFIX + S3_PATH_DELIMITER)); assertThat(actualEvent.getMetadata().getAttribute(EVENT_TIMESTAMP_METADATA_ATTRIBUTE), equalTo(eventCreateTimeEpochMillis)); assertThat(actualEvent.getMetadata().getAttribute(EVENT_VERSION_FROM_TIMESTAMP), equalTo(eventVersionNumber)); - assertThat(actualEvent, sameInstance(testRecord.getData())); + assertThat(actualEvent.getMetadata().getAttribute(CHANGE_EVENT_TYPE_METADATA_ATTRIBUTE), nullValue()); + assertThat(actualEvent, sameInstance(testEvent)); } private ExportRecordConverter createObjectUnderTest() { - return new ExportRecordConverter(); + return new ExportRecordConverter(s3Prefix, random.nextInt(1000) + 1); } } \ No newline at end of file diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/converter/StreamRecordConverterTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/converter/StreamRecordConverterTest.java index 8b857bcf4f..c33a53e667 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/converter/StreamRecordConverterTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/converter/StreamRecordConverterTest.java @@ -8,7 +8,9 @@ import com.github.shyiko.mysql.binlog.event.EventType; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.event.TestEventFactory; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventBuilder; import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions; import java.util.List; @@ -17,7 +19,9 @@ import java.util.UUID; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; import static 
org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.sameInstance; import static org.hamcrest.Matchers.startsWith; import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.BULK_ACTION_METADATA_ATTRIBUTE; import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.CHANGE_EVENT_TYPE_METADATA_ATTRIBUTE; @@ -26,7 +30,10 @@ import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.EVENT_TABLE_NAME_METADATA_ATTRIBUTE; import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.EVENT_TIMESTAMP_METADATA_ATTRIBUTE; import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.EVENT_VERSION_FROM_TIMESTAMP; +import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.INGESTION_EVENT_TYPE_ATTRIBUTE; import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE; +import static org.opensearch.dataprepper.plugins.source.rds.converter.RecordConverter.S3_BUFFER_PREFIX; +import static org.opensearch.dataprepper.plugins.source.rds.converter.RecordConverter.STREAM_INGESTION_TYPE; import static org.opensearch.dataprepper.plugins.source.rds.converter.StreamRecordConverter.S3_PATH_DELIMITER; @@ -34,11 +41,13 @@ class StreamRecordConverterTest { private StreamRecordConverter streamRecordConverter; private Random random; + private String s3Prefix; @BeforeEach void setUp() { - streamRecordConverter = createObjectUnderTest(); random = new Random(); + s3Prefix = UUID.randomUUID().toString(); + streamRecordConverter = createObjectUnderTest(); } @Test @@ -49,13 +58,17 @@ void test_convert_returns_expected_event() { final EventType eventType = EventType.EXT_WRITE_ROWS; final OpenSearchBulkActions bulkAction = OpenSearchBulkActions.INDEX; final List primaryKeys = List.of("key1"); - final String s3Prefix = 
UUID.randomUUID().toString(); final long eventCreateTimeEpochMillis = random.nextLong(); final long eventVersionNumber = random.nextLong(); + final Event testEvent = TestEventFactory.getTestEventFactory().eventBuilder(EventBuilder.class) + .withEventType("event") + .withData(rowData) + .build(); + Event event = streamRecordConverter.convert( - rowData, databaseName, tableName, eventType, bulkAction, - primaryKeys, s3Prefix, eventCreateTimeEpochMillis, eventVersionNumber); + testEvent, databaseName, tableName, bulkAction, primaryKeys, + eventCreateTimeEpochMillis, eventVersionNumber, eventType); assertThat(event.toMap(), is(rowData)); assertThat(event.getMetadata().getAttribute(EVENT_DATABASE_NAME_METADATA_ATTRIBUTE), is(databaseName)); @@ -63,12 +76,14 @@ void test_convert_returns_expected_event() { assertThat(event.getMetadata().getAttribute(CHANGE_EVENT_TYPE_METADATA_ATTRIBUTE), is(eventType.toString())); assertThat(event.getMetadata().getAttribute(BULK_ACTION_METADATA_ATTRIBUTE), is(bulkAction.toString())); assertThat(event.getMetadata().getAttribute(PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE), is("value1")); - assertThat(event.getMetadata().getAttribute(EVENT_S3_PARTITION_KEY).toString(), startsWith(s3Prefix + S3_PATH_DELIMITER)); + assertThat(event.getMetadata().getAttribute(INGESTION_EVENT_TYPE_ATTRIBUTE), equalTo(STREAM_INGESTION_TYPE)); + assertThat(event.getMetadata().getAttribute(EVENT_S3_PARTITION_KEY).toString(), startsWith(s3Prefix + S3_PATH_DELIMITER + S3_BUFFER_PREFIX + S3_PATH_DELIMITER)); assertThat(event.getMetadata().getAttribute(EVENT_TIMESTAMP_METADATA_ATTRIBUTE), is(eventCreateTimeEpochMillis)); assertThat(event.getMetadata().getAttribute(EVENT_VERSION_FROM_TIMESTAMP), is(eventVersionNumber)); + assertThat(event, sameInstance(testEvent)); } private StreamRecordConverter createObjectUnderTest() { - return new StreamRecordConverter(new Random().nextInt(1000) + 1); + return new StreamRecordConverter(s3Prefix, random.nextInt(1000) + 1); } } \ No 
newline at end of file diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoaderTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoaderTest.java index c470021c6b..656e393cee 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoaderTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoaderTest.java @@ -130,7 +130,7 @@ void test_run_success() throws Exception { when(eventFactory.eventBuilder(any())).thenReturn(eventBuilder); when(eventBuilder.withEventType(any()).withData(any()).build()).thenReturn(event); when(event.toJsonString()).thenReturn(randomString); - when(recordConverter.convert(any(), any(), any(), any(), anyLong(), anyLong())).thenReturn(event); + when(recordConverter.convert(any(), any(), any(), any(), any(), anyLong(), anyLong(), any())).thenReturn(event); AvroParquetReader.Builder builder = mock(AvroParquetReader.Builder.class); ParquetReader parquetReader = mock(ParquetReader.class); @@ -180,7 +180,7 @@ void test_flush_failure_then_error_metric_updated() throws Exception { when(eventBuilder.withEventType(any()).withData(any()).build()).thenReturn(event); when(event.toJsonString()).thenReturn(randomString); - when(recordConverter.convert(any(), any(), any(), any(), anyLong(), anyLong())).thenReturn(event); + when(recordConverter.convert(any(), any(), any(), any(), any(), anyLong(), anyLong(), any())).thenReturn(event); ParquetReader parquetReader = mock(ParquetReader.class); AvroParquetReader.Builder builder = mock(AvroParquetReader.Builder.class); diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileSchedulerTest.java 
b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileSchedulerTest.java index f249e0b025..96a45588c8 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileSchedulerTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileSchedulerTest.java @@ -88,10 +88,12 @@ class DataFileSchedulerTest { private AtomicInteger activeExportS3ObjectConsumersGauge; private Random random; + private String s3Prefix; @BeforeEach void setUp() { random = new Random(); + s3Prefix = UUID.randomUUID().toString(); when(pluginMetrics.counter(EXPORT_S3_OBJECTS_PROCESSED_COUNT)).thenReturn(exportFileSuccessCounter); when(pluginMetrics.counter(eq(DataFileScheduler.EXPORT_S3_OBJECTS_ERROR_COUNT))).thenReturn(exportFileErrorCounter); when(pluginMetrics.gauge(eq(ACTIVE_EXPORT_S3_OBJECT_CONSUMERS_GAUGE), any(AtomicInteger.class), any())) @@ -192,6 +194,6 @@ void test_shutdown() { } private DataFileScheduler createObjectUnderTest() { - return new DataFileScheduler(sourceCoordinator, sourceConfig, s3Client, eventFactory, buffer, pluginMetrics, acknowledgementSetManager); + return new DataFileScheduler(sourceCoordinator, sourceConfig, s3Prefix, s3Client, eventFactory, buffer, pluginMetrics, acknowledgementSetManager); } } \ No newline at end of file diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderSchedulerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderSchedulerTest.java index e4b9cb1304..11381c8a23 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderSchedulerTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderSchedulerTest.java @@ -60,10 +60,12 @@ 
class LeaderSchedulerTest { @Mock private LeaderProgressState leaderProgressState; + private String s3Prefix; private LeaderScheduler leaderScheduler; @BeforeEach void setUp() { + s3Prefix = UUID.randomUUID().toString(); leaderScheduler = createObjectUnderTest(); AwsAuthenticationConfig awsAuthenticationConfig = mock(AwsAuthenticationConfig.class); @@ -138,6 +140,6 @@ void test_shutDown() { } private LeaderScheduler createObjectUnderTest() { - return new LeaderScheduler(sourceCoordinator, sourceConfig, schemaManager, dbMetadata); + return new LeaderScheduler(sourceCoordinator, sourceConfig, s3Prefix, schemaManager, dbMetadata); } } \ No newline at end of file diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/model/ExportObjectKeyTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/model/ExportObjectKeyTest.java index 697d721c9b..18a66bd6e2 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/model/ExportObjectKeyTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/model/ExportObjectKeyTest.java @@ -27,9 +27,35 @@ void test_fromString_with_valid_input_string() { assertThat(exportObjectKey.getFileName(), equalTo("file-name.parquet")); } + @Test + void test_fromString_with_path_with_empty_prefix() { + final String objectKeyString = "export-task-id/db-name/db-name.table-name/1/file-name.parquet"; + final ExportObjectKey exportObjectKey = ExportObjectKey.fromString(objectKeyString); + + assertThat(exportObjectKey.getPrefix(), equalTo("")); + assertThat(exportObjectKey.getExportTaskId(), equalTo("export-task-id")); + assertThat(exportObjectKey.getDatabaseName(), equalTo("db-name")); + assertThat(exportObjectKey.getTableName(), equalTo("table-name")); + assertThat(exportObjectKey.getNumberedFolder(), equalTo("1")); + assertThat(exportObjectKey.getFileName(), 
equalTo("file-name.parquet")); + } + + @Test + void test_fromString_with_path_with_multilevel_prefix() { + final String objectKeyString = "prefix1/prefix2/prefix3/export-task-id/db-name/db-name.table-name/1/file-name.parquet"; + final ExportObjectKey exportObjectKey = ExportObjectKey.fromString(objectKeyString); + + assertThat(exportObjectKey.getPrefix(), equalTo("prefix1/prefix2/prefix3")); + assertThat(exportObjectKey.getExportTaskId(), equalTo("export-task-id")); + assertThat(exportObjectKey.getDatabaseName(), equalTo("db-name")); + assertThat(exportObjectKey.getTableName(), equalTo("table-name")); + assertThat(exportObjectKey.getNumberedFolder(), equalTo("1")); + assertThat(exportObjectKey.getFileName(), equalTo("file-name.parquet")); + } + @Test void test_fromString_with_invalid_input_string() { - final String objectKeyString = "prefix/export-task-id/db-name/table-name/1/"; + final String objectKeyString = "export-task-id/db-name/table-name/1"; Throwable exception = assertThrows(IllegalArgumentException.class, () -> ExportObjectKey.fromString(objectKeyString)); assertThat(exception.getMessage(), containsString("Export object key is not valid: " + objectKeyString)); diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListenerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListenerTest.java index c287ec2f00..27f3fa9037 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListenerTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListenerTest.java @@ -26,6 +26,7 @@ import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.source.rds.RdsSourceConfig; +import java.util.UUID; import java.util.concurrent.ExecutorService; import 
java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; @@ -72,12 +73,15 @@ class BinlogEventListenerTest { @Mock(answer = Answers.RETURNS_DEEP_STUBS) private com.github.shyiko.mysql.binlog.event.Event binlogEvent; + private String s3Prefix; + private BinlogEventListener objectUnderTest; private Timer eventProcessingTimer; @BeforeEach void setUp() { + s3Prefix = UUID.randomUUID().toString(); eventProcessingTimer = Metrics.timer("test-timer"); when(pluginMetrics.timer(REPLICATION_LOG_EVENT_PROCESSING_TIME)).thenReturn(eventProcessingTimer); try (final MockedStatic executorsMockedStatic = mockStatic(Executors.class)) { @@ -136,7 +140,7 @@ void test_given_DeleteRows_event_then_calls_correct_handler(EventType eventType) } private BinlogEventListener createObjectUnderTest() { - return new BinlogEventListener(buffer, sourceConfig, pluginMetrics, binaryLogClient, streamCheckpointer, acknowledgementSetManager); + return new BinlogEventListener(buffer, sourceConfig, s3Prefix, pluginMetrics, binaryLogClient, streamCheckpointer, acknowledgementSetManager); } private void verifyHandlerCallHelper() { diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamSchedulerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamSchedulerTest.java index 0414495f6c..7b870bbfe1 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamSchedulerTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamSchedulerTest.java @@ -24,6 +24,7 @@ import java.time.Duration; import java.util.Optional; +import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -58,10 +59,12 @@ class StreamSchedulerTest { @Mock private AcknowledgementSetManager acknowledgementSetManager; + private String 
s3Prefix; private StreamScheduler objectUnderTest; @BeforeEach void setUp() { + s3Prefix = UUID.randomUUID().toString(); objectUnderTest = createObjectUnderTest(); } @@ -117,6 +120,6 @@ void test_shutdown() { } private StreamScheduler createObjectUnderTest() { - return new StreamScheduler(sourceCoordinator, sourceConfig, binaryLogClient, buffer, pluginMetrics, acknowledgementSetManager); + return new StreamScheduler(sourceCoordinator, sourceConfig, s3Prefix, binaryLogClient, buffer, pluginMetrics, acknowledgementSetManager); } } \ No newline at end of file