
Adds config transformation template for RDS source (opensearch-project#4946)

* Add rules and templates for rds source

Signed-off-by: Hai Yan <[email protected]>

* Update rds config to prep for template

Signed-off-by: Hai Yan <[email protected]>

* Refactor record converters

Signed-off-by: Hai Yan <[email protected]>

* Add template

Signed-off-by: Hai Yan <[email protected]>

* Remove unused import

Signed-off-by: Hai Yan <[email protected]>

* Address review comments

Signed-off-by: Hai Yan <[email protected]>

* Revert changes on prefix

Signed-off-by: Hai Yan <[email protected]>

---------

Signed-off-by: Hai Yan <[email protected]>
oeyh authored Sep 17, 2024
1 parent 72063d6 commit ddd2407
Showing 27 changed files with 465 additions and 210 deletions.
DynamicConfigTransformer.java
@@ -77,6 +77,7 @@ public class DynamicConfigTransformer implements PipelineConfigurationTransformer
     private static final String SINK_SUBPIPELINE_PLUGIN_NAME = "pipeline";
     private static final String SUBPIPELINE_PATH = "$.source.pipeline";
 
+    private static final String S3_BUFFER_PREFIX = "/buffer";
 
     Configuration parseConfigWithJsonNode = Configuration.builder()
             .jsonProvider(new JacksonJsonNodeJsonProvider())
@@ -402,15 +403,28 @@ private boolean isJsonPath(String parameter) {
     }
 
     /**
-     * Specific to DocDB depth field.
-     * @param s3Prefix
-     * @return
+     * Calculate s3 folder scan depth for DocDB source pipeline
+     * @param s3Prefix: s3 prefix defined in the source configuration
+     * @return s3 folder scan depth
      */
     public String calculateDepth(String s3Prefix) {
+        return Integer.toString(getDepth(s3Prefix, 4));
+    }
+
+    /**
+     * Calculate s3 folder scan depth for RDS source pipeline
+     * @param s3Prefix: s3 prefix defined in the source configuration
+     * @return s3 folder scan depth
+     */
+    public String calculateDepthForRdsSource(String s3Prefix) {
+        return Integer.toString(getDepth(s3Prefix, 3));
+    }
+
+    private int getDepth(String s3Prefix, int baseDepth) {
         if(s3Prefix == null){
-            return Integer.toString(4);
+            return baseDepth;
         }
-        return Integer.toString(s3Prefix.split("/").length + 4);
+        return s3Prefix.split("/").length + baseDepth;
     }
 
     public String getSourceCoordinationIdentifierEnvVariable(String s3Prefix){
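
For context on the depth math above: the scan depth is the number of "/"-separated segments in the configured s3_prefix plus a per-source base (4 for DocDB, 3 for RDS), or the base alone when no prefix is set. A minimal standalone sketch, not part of the commit, with invented prefixes:

    // Mirrors getDepth() above; inputs are hypothetical.
    static int depthFor(final String s3Prefix, final int baseDepth) {
        if (s3Prefix == null) {
            return baseDepth;
        }
        return s3Prefix.split("/").length + baseDepth;
    }

    // depthFor("my-pipeline/rds", 3) -> 5   (RDS: 2 segments + base 3)
    // depthFor("my-pipeline/rds", 4) -> 6   (DocDB: 2 segments + base 4)
    // depthFor(null, 3)              -> 3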
@@ -421,6 +435,23 @@ public String getSourceCoordinationIdentifierEnvVariable(String s3Prefix){
         return s3Prefix+"/"+envSourceCoordinationIdentifier;
     }
 
+    /**
+     * Get the include_prefix in s3 scan source. This is a function specific to RDS source.
+     * @param s3Prefix: s3 prefix defined in the source configuration
+     * @return the actual include_prefix
+     */
+    public String getIncludePrefixForRdsSource(String s3Prefix) {
+        String envSourceCoordinationIdentifier = System.getenv(SOURCE_COORDINATION_IDENTIFIER_ENVIRONMENT_VARIABLE);
+        if (s3Prefix == null && envSourceCoordinationIdentifier == null) {
+            return S3_BUFFER_PREFIX;
+        } else if (s3Prefix == null) {
+            return envSourceCoordinationIdentifier + S3_BUFFER_PREFIX;
+        } else if (envSourceCoordinationIdentifier == null) {
+            return s3Prefix + S3_BUFFER_PREFIX;
+        }
+        return s3Prefix + "/" + envSourceCoordinationIdentifier + S3_BUFFER_PREFIX;
+    }
+
     public String getAccountIdFromRole(final String roleArn) {
         return Arn.fromString(roleArn).accountId().orElse(null);
     }
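
The four branches of getIncludePrefixForRdsSource() above combine an optional user s3_prefix and an optional source-coordination identifier (read from an environment variable) around the fixed "/buffer" suffix. With hypothetical values, the outcomes are:

    // s3Prefix       envIdentifier    resulting include_prefix
    // null           null             "/buffer"
    // null           "pipeline-1"     "pipeline-1/buffer"
    // "my/prefix"    null             "my/prefix/buffer"
    // "my/prefix"    "pipeline-1"     "my/prefix/pipeline-1/buffer"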
ClientFactory.java
@@ -9,22 +9,25 @@
 import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
 import org.opensearch.dataprepper.plugins.source.rds.configuration.AwsAuthenticationConfig;
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.rds.RdsClient;
 import software.amazon.awssdk.services.s3.S3Client;
 
 public class ClientFactory {
-    private final AwsCredentialsProvider awsCredentialsProvider;
     private final AwsAuthenticationConfig awsAuthenticationConfig;
+    private final AwsCredentialsProvider awsCredentialsProvider;
+    private final RdsSourceConfig sourceConfig;
 
     public ClientFactory(final AwsCredentialsSupplier awsCredentialsSupplier,
-                         final AwsAuthenticationConfig awsAuthenticationConfig) {
+                         final RdsSourceConfig sourceConfig) {
+        awsAuthenticationConfig = sourceConfig.getAwsAuthenticationConfig();
         awsCredentialsProvider = awsCredentialsSupplier.getProvider(AwsCredentialsOptions.builder()
                 .withRegion(awsAuthenticationConfig.getAwsRegion())
                 .withStsRoleArn(awsAuthenticationConfig.getAwsStsRoleArn())
                 .withStsExternalId(awsAuthenticationConfig.getAwsStsExternalId())
                 .withStsHeaderOverrides(awsAuthenticationConfig.getAwsStsHeaderOverrides())
                 .build());
-        this.awsAuthenticationConfig = awsAuthenticationConfig;
+        this.sourceConfig = sourceConfig;
     }
 
     public RdsClient buildRdsClient() {
@@ -36,8 +39,16 @@ public RdsClient buildRdsClient() {
 
     public S3Client buildS3Client() {
         return S3Client.builder()
-                .region(awsAuthenticationConfig.getAwsRegion())
+                .region(getS3ClientRegion())
                 .credentialsProvider(awsCredentialsProvider)
                 .build();
     }
+
+    private Region getS3ClientRegion() {
+        if (sourceConfig.getS3Region() != null) {
+            return sourceConfig.getS3Region();
+        }
+
+        return awsAuthenticationConfig.getAwsRegion();
+    }
 }
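
The new getS3ClientRegion() is a two-level fallback: an explicit s3_region on the source wins, otherwise the client reuses the region from the aws authentication block. A small sketch, not part of the commit, with hypothetical regions:

    import software.amazon.awssdk.regions.Region;

    // Hypothetical values for illustration.
    final Region awsRegion = Region.of("us-east-1");  // from the aws block of the source config
    final Region s3Region = Region.of("us-west-2");   // from the new s3_region setting; may be null

    final Region effective = (s3Region != null) ? s3Region : awsRegion;
    // effective -> us-west-2, so the S3 client can read a bucket in another region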
RdsService.java
@@ -44,6 +44,7 @@ public class RdsService {
      * Maximum concurrent data loader per node
      */
     public static final int DATA_LOADER_MAX_JOB_COUNT = 1;
+    public static final String S3_PATH_DELIMITER = "/";
 
     private final RdsClient rdsClient;
     private final S3Client s3Client;
@@ -88,8 +89,9 @@ public void start(Buffer<Record<Event>> buffer) {
         final RdsApiStrategy rdsApiStrategy = sourceConfig.isCluster() ?
                 new ClusterApiStrategy(rdsClient) : new InstanceApiStrategy(rdsClient);
         final DbMetadata dbMetadata = rdsApiStrategy.describeDb(sourceConfig.getDbIdentifier());
+        final String s3PathPrefix = getS3PathPrefix();
         leaderScheduler = new LeaderScheduler(
-                sourceCoordinator, sourceConfig, getSchemaManager(sourceConfig, dbMetadata), dbMetadata);
+                sourceCoordinator, sourceConfig, s3PathPrefix, getSchemaManager(sourceConfig, dbMetadata), dbMetadata);
         runnableList.add(leaderScheduler);
 
         if (sourceConfig.isExportEnabled()) {
@@ -98,7 +100,7 @@ public void start(Buffer<Record<Event>> buffer) {
             exportScheduler = new ExportScheduler(
                     sourceCoordinator, snapshotManager, exportTaskManager, s3Client, pluginMetrics);
             dataFileScheduler = new DataFileScheduler(
-                    sourceCoordinator, sourceConfig, s3Client, eventFactory, buffer, pluginMetrics, acknowledgementSetManager);
+                    sourceCoordinator, sourceConfig, s3PathPrefix, s3Client, eventFactory, buffer, pluginMetrics, acknowledgementSetManager);
             runnableList.add(exportScheduler);
             runnableList.add(dataFileScheduler);
         }
@@ -111,7 +113,7 @@ public void start(Buffer<Record<Event>> buffer) {
                 binaryLogClient.setSSLMode(SSLMode.DISABLED);
             }
             streamScheduler = new StreamScheduler(
-                    sourceCoordinator, sourceConfig, binaryLogClient, buffer, pluginMetrics, acknowledgementSetManager);
+                    sourceCoordinator, sourceConfig, s3PathPrefix, binaryLogClient, buffer, pluginMetrics, acknowledgementSetManager);
             runnableList.add(streamScheduler);
         }
 
@@ -149,4 +151,21 @@ private SchemaManager getSchemaManager(final RdsSourceConfig sourceConfig, final
                 sourceConfig.isTlsEnabled());
         return new SchemaManager(connectionManager);
     }
+
+    private String getS3PathPrefix() {
+        final String s3UserPathPrefix;
+        if (sourceConfig.getS3Prefix() != null && !sourceConfig.getS3Prefix().isBlank()) {
+            s3UserPathPrefix = sourceConfig.getS3Prefix();
+        } else {
+            s3UserPathPrefix = "";
+        }
+
+        final String s3PathPrefix;
+        if (sourceCoordinator.getPartitionPrefix() != null ) {
+            s3PathPrefix = s3UserPathPrefix + S3_PATH_DELIMITER + sourceCoordinator.getPartitionPrefix();
+        } else {
+            s3PathPrefix = s3UserPathPrefix;
+        }
+        return s3PathPrefix;
+    }
 }
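
getS3PathPrefix() above joins the user-configured s3_prefix and the coordinator's partition prefix with S3_PATH_DELIMITER, treating each part as optional. A minimal sketch of the assumed semantics, not part of the commit, with invented inputs:

    // Mirrors getS3PathPrefix(); values are hypothetical.
    static String composeS3PathPrefix(final String userPrefix, final String partitionPrefix) {
        final String base = (userPrefix != null && !userPrefix.isBlank()) ? userPrefix : "";
        return (partitionPrefix != null) ? base + "/" + partitionPrefix : base;
    }

    // composeS3PathPrefix("data/rds", "a1b2") -> "data/rds/a1b2"
    // composeS3PathPrefix(null, "a1b2")       -> "/a1b2"
    // composeS3PathPrefix("data/rds", null)   -> "data/rds"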
RdsSource.java
@@ -51,7 +51,7 @@ public RdsSource(final PluginMetrics pluginMetrics,
         this.eventFactory = eventFactory;
         this.acknowledgementSetManager = acknowledgementSetManager;
 
-        clientFactory = new ClientFactory(awsCredentialsSupplier, sourceConfig.getAwsAuthenticationConfig());
+        clientFactory = new ClientFactory(awsCredentialsSupplier, sourceConfig);
     }
 
     @Override
RdsSourceConfig.java
@@ -7,12 +7,15 @@
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import jakarta.validation.Valid;
+import jakarta.validation.constraints.Max;
+import jakarta.validation.constraints.Min;
 import jakarta.validation.constraints.NotNull;
 import org.opensearch.dataprepper.plugins.source.rds.configuration.AwsAuthenticationConfig;
 import org.opensearch.dataprepper.plugins.source.rds.configuration.EngineType;
 import org.opensearch.dataprepper.plugins.source.rds.configuration.ExportConfig;
 import org.opensearch.dataprepper.plugins.source.rds.configuration.StreamConfig;
 import org.opensearch.dataprepper.plugins.source.rds.configuration.TlsConfig;
+import software.amazon.awssdk.regions.Region;
 
 import java.time.Duration;
 import java.util.List;
@@ -21,6 +24,7 @@
  * Configuration for RDS Source
  */
 public class RdsSourceConfig {
+    private static final int DEFAULT_S3_FOLDER_PARTITION_COUNT = 100;
 
     /**
      * Identifier for RDS instance/cluster or Aurora cluster
@@ -72,6 +76,14 @@ public class RdsSourceConfig {
     @JsonProperty("s3_region")
     private String s3Region;
 
+    /**
+     * The number of folder partitions in S3 buffer
+     */
+    @JsonProperty("partition_count")
+    @Min(1)
+    @Max(1000)
+    private int s3FolderPartitionCount = DEFAULT_S3_FOLDER_PARTITION_COUNT;
+
     @JsonProperty("export")
     @Valid
     private ExportConfig exportConfig;
@@ -132,8 +144,12 @@ public String getS3Prefix() {
         return s3Prefix;
     }
 
-    public String getS3Region() {
-        return s3Region;
+    public Region getS3Region() {
+        return s3Region != null ? Region.of(s3Region) : null;
     }
+
+    public int getPartitionCount() {
+        return s3FolderPartitionCount;
+    }
 
     public ExportConfig getExport() {
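
With getS3Region() now returning a typed Region, callers such as ClientFactory can pass it straight to the SDK builders. A short illustration, not part of the commit, with a hypothetical setting:

    // "s3_region: eu-west-1" in the pipeline YAML binds to the s3Region string.
    final String s3RegionSetting = "eu-west-1";   // may be null when the setting is omitted
    final Region s3Region = (s3RegionSetting != null) ? Region.of(s3RegionSetting) : null;
    // partition_count is validated to the range 1..1000 and defaults to 100 when omitted.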
ExportConfig.java
@@ -14,7 +14,18 @@ public class ExportConfig {
     @NotNull
     private String kmsKeyId;
 
+    /**
+     * The ARN of the IAM role that will be passed to RDS for export.
+     */
+    @JsonProperty("iam_role_arn")
+    @NotNull
+    private String iamRoleArn;
+
     public String getKmsKeyId() {
         return kmsKeyId;
     }
+
+    public String getIamRoleArn() {
+        return iamRoleArn;
+    }
 }
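
The new iam_role_arn is the role RDS assumes for the export task; elsewhere in this commit the account id is parsed out of role ARNs with the SDK's Arn helper (see getAccountIdFromRole above). A sketch, not part of the commit, with a hypothetical ARN:

    import software.amazon.awssdk.arns.Arn;

    final String iamRoleArn = "arn:aws:iam::123456789012:role/rds-export-role";  // hypothetical
    final String accountId = Arn.fromString(iamRoleArn).accountId().orElse(null);
    // accountId -> "123456789012"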
StreamConfig.java
@@ -11,23 +11,14 @@
 
 public class StreamConfig {
 
-    private static final int DEFAULT_S3_FOLDER_PARTITION_COUNT = 100;
     private static final int DEFAULT_NUM_WORKERS = 1;
 
-    @JsonProperty("partition_count")
-    @Min(1)
-    @Max(1000)
-    private int s3FolderPartitionCount = DEFAULT_S3_FOLDER_PARTITION_COUNT;
-
+    // TODO: Restricted max to 1 here until we have a proper method to process the stream events in parallel.
     @JsonProperty("workers")
     @Min(1)
-    @Max(1000)
+    @Max(1)
     private int numWorkers = DEFAULT_NUM_WORKERS;
 
-    public int getPartitionCount() {
-        return s3FolderPartitionCount;
-    }
-
     public int getNumWorkers() {
         return numWorkers;
     }
ExportRecordConverter.java
@@ -5,52 +5,14 @@
 
 package org.opensearch.dataprepper.plugins.source.rds.converter;
 
-import org.opensearch.dataprepper.model.event.Event;
-import org.opensearch.dataprepper.model.event.EventMetadata;
-import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions;
-import org.opensearch.dataprepper.model.record.Record;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+public class ExportRecordConverter extends RecordConverter {
 
-import java.util.List;
-import java.util.stream.Collectors;
-
-import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.BULK_ACTION_METADATA_ATTRIBUTE;
-import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.EVENT_DATABASE_NAME_METADATA_ATTRIBUTE;
-import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.EVENT_TABLE_NAME_METADATA_ATTRIBUTE;
-import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.EVENT_TIMESTAMP_METADATA_ATTRIBUTE;
-import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.EVENT_VERSION_FROM_TIMESTAMP;
-import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.INGESTION_EVENT_TYPE_ATTRIBUTE;
-import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE;
-
-public class ExportRecordConverter {
-
-    private static final Logger LOG = LoggerFactory.getLogger(ExportRecordConverter.class);
-
-    static final String EXPORT_EVENT_TYPE = "EXPORT";
-
-    public Event convert(final Record<Event> record,
-                         final String databaseName,
-                         final String tableName,
-                         final List<String> primaryKeys,
-                         final long eventCreateTimeEpochMillis,
-                         final long eventVersionNumber) {
-        Event event = record.getData();
-
-        EventMetadata eventMetadata = event.getMetadata();
-        eventMetadata.setAttribute(EVENT_DATABASE_NAME_METADATA_ATTRIBUTE, databaseName);
-        eventMetadata.setAttribute(EVENT_TABLE_NAME_METADATA_ATTRIBUTE, tableName);
-        eventMetadata.setAttribute(BULK_ACTION_METADATA_ATTRIBUTE, OpenSearchBulkActions.INDEX.toString());
-        eventMetadata.setAttribute(INGESTION_EVENT_TYPE_ATTRIBUTE, EXPORT_EVENT_TYPE);
-
-        final String primaryKeyValue = primaryKeys.stream()
-                .map(key -> event.get(key, String.class))
-                .collect(Collectors.joining("|"));
-        eventMetadata.setAttribute(PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE, primaryKeyValue);
-
-        eventMetadata.setAttribute(EVENT_TIMESTAMP_METADATA_ATTRIBUTE, eventCreateTimeEpochMillis);
-        eventMetadata.setAttribute(EVENT_VERSION_FROM_TIMESTAMP, eventVersionNumber);
+    public ExportRecordConverter(final String s3Prefix, final int partitionCount) {
+        super(s3Prefix, partitionCount);
+    }
 
-        return event;
+    @Override
+    String getIngestionType() {
+        return EXPORT_INGESTION_TYPE;
     }
 }
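
The slimmed-down converter is a template-method refactor: shared metadata handling moved into the RecordConverter base class (not included in this diff excerpt), and each subclass only reports its ingestion type. A rough sketch of the base-class shape implied by this file, for orientation only:

    // Assumed shape, inferred from the subclass above; the real RecordConverter is not shown here.
    abstract class RecordConverterSketch {
        static final String EXPORT_INGESTION_TYPE = "EXPORT";

        private final String s3Prefix;
        private final int partitionCount;

        RecordConverterSketch(final String s3Prefix, final int partitionCount) {
            this.s3Prefix = s3Prefix;
            this.partitionCount = partitionCount;
        }

        abstract String getIngestionType();  // e.g. EXPORT_INGESTION_TYPE for exports
    }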
(Diff truncated: the remaining 19 of the 27 changed files are not shown.)
