Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Glue Connections integration Draft #1696

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/
package com.amazonaws.athena.connectors.aws.cmdb;

import com.amazonaws.athena.connector.lambda.GlueConnectionUtils;
import com.amazonaws.athena.connector.lambda.handlers.CompositeHandler;

/**
Expand All @@ -30,6 +31,6 @@ public class AwsCmdbCompositeHandler
{
public AwsCmdbCompositeHandler()
{
super(new AwsCmdbMetadataHandler(System.getenv()), new AwsCmdbRecordHandler(System.getenv()));
super(new AwsCmdbMetadataHandler(GlueConnectionUtils.getGlueConnection()), new AwsCmdbRecordHandler(GlueConnectionUtils.getGlueConnection()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/
package com.amazonaws.athena.connectors.cloudera;

import com.amazonaws.athena.connector.lambda.GlueConnectionUtils;
import com.amazonaws.athena.connector.lambda.handlers.CompositeHandler;

/**
Expand All @@ -32,6 +33,6 @@ public class HiveCompositeHandler
{
public HiveCompositeHandler()
{
super(new HiveMetadataHandler(System.getenv()), new HiveRecordHandler(System.getenv()));
super(new HiveMetadataHandler(GlueConnectionUtils.getGlueConnection()), new HiveRecordHandler(GlueConnectionUtils.getGlueConnection()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
*/
package com.amazonaws.athena.connectors.cloudera;

import com.amazonaws.athena.connector.lambda.GlueConnectionUtils;
import com.amazonaws.athena.connector.lambda.handlers.CompositeHandler;

/**
Expand All @@ -33,6 +34,6 @@ public class ImpalaCompositeHandler
{
public ImpalaCompositeHandler()
{
super(new ImpalaMetadataHandler(System.getenv()), new ImpalaRecordHandler(System.getenv()));
super(new ImpalaMetadataHandler(GlueConnectionUtils.getGlueConnection()), new ImpalaRecordHandler(GlueConnectionUtils.getGlueConnection()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/
package com.amazonaws.athena.connectors.cloudwatch.metrics;

import com.amazonaws.athena.connector.lambda.GlueConnectionUtils;
import com.amazonaws.athena.connector.lambda.handlers.CompositeHandler;

/**
Expand All @@ -30,6 +31,6 @@ public class MetricsCompositeHandler
{
public MetricsCompositeHandler()
{
super(new MetricsMetadataHandler(System.getenv()), new MetricsRecordHandler(System.getenv()));
super(new MetricsMetadataHandler(GlueConnectionUtils.getGlueConnection()), new MetricsRecordHandler(GlueConnectionUtils.getGlueConnection()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/
package com.amazonaws.athena.connectors.cloudwatch;

import com.amazonaws.athena.connector.lambda.GlueConnectionUtils;
import com.amazonaws.athena.connector.lambda.handlers.CompositeHandler;

/**
Expand All @@ -30,6 +31,6 @@ public class CloudwatchCompositeHandler
{
public CloudwatchCompositeHandler()
{
super(new CloudwatchMetadataHandler(System.getenv()), new CloudwatchRecordHandler(System.getenv()));
super(new CloudwatchMetadataHandler(GlueConnectionUtils.getGlueConnection()), new CloudwatchRecordHandler(GlueConnectionUtils.getGlueConnection()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/
package com.amazonaws.athena.connectors.datalakegen2;

import com.amazonaws.athena.connector.lambda.GlueConnectionUtils;
import com.amazonaws.athena.connector.lambda.handlers.CompositeHandler;

/**
Expand All @@ -31,6 +32,6 @@ public class DataLakeGen2CompositeHandler extends CompositeHandler
{
public DataLakeGen2CompositeHandler()
{
super(new DataLakeGen2MetadataHandler(System.getenv()), new DataLakeGen2RecordHandler(System.getenv()));
super(new DataLakeGen2MetadataHandler(GlueConnectionUtils.getGlueConnection()), new DataLakeGen2RecordHandler(GlueConnectionUtils.getGlueConnection()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/
package com.amazonaws.athena.connectors.db2as400;

import com.amazonaws.athena.connector.lambda.GlueConnectionUtils;
import com.amazonaws.athena.connector.lambda.handlers.CompositeHandler;

/**
Expand All @@ -31,6 +32,6 @@ public class Db2As400CompositeHandler extends CompositeHandler
{
public Db2As400CompositeHandler()
{
super(new Db2As400MetadataHandler(System.getenv()), new Db2As400RecordHandler(System.getenv()));
super(new Db2As400MetadataHandler(GlueConnectionUtils.getGlueConnection()), new Db2As400RecordHandler(GlueConnectionUtils.getGlueConnection()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/
package com.amazonaws.athena.connectors.db2;

import com.amazonaws.athena.connector.lambda.GlueConnectionUtils;
import com.amazonaws.athena.connector.lambda.handlers.CompositeHandler;

/**
Expand All @@ -31,6 +32,6 @@ public class Db2CompositeHandler extends CompositeHandler
{
public Db2CompositeHandler()
{
super(new Db2MetadataHandler(System.getenv()), new Db2RecordHandler(System.getenv()));
super(new Db2MetadataHandler(GlueConnectionUtils.getGlueConnection()), new Db2RecordHandler(GlueConnectionUtils.getGlueConnection()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/
package com.amazonaws.athena.connectors.docdb;

import com.amazonaws.athena.connector.lambda.GlueConnectionUtils;
import com.amazonaws.athena.connector.lambda.handlers.CompositeHandler;

/**
Expand All @@ -30,6 +31,6 @@ public class DocDBCompositeHandler
{
public DocDBCompositeHandler()
{
super(new DocDBMetadataHandler(System.getenv()), new DocDBRecordHandler(System.getenv()));
super(new DocDBMetadataHandler(GlueConnectionUtils.getGlueConnection()), new DocDBRecordHandler(GlueConnectionUtils.getGlueConnection()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/
package com.amazonaws.athena.connectors.dynamodb;

import com.amazonaws.athena.connector.lambda.GlueConnectionUtils;
import com.amazonaws.athena.connector.lambda.handlers.CompositeHandler;

/**
Expand All @@ -30,6 +31,6 @@ public class DynamoDBCompositeHandler
{
public DynamoDBCompositeHandler()
{
super(new DynamoDBMetadataHandler(System.getenv()), new DynamoDBRecordHandler(System.getenv()));
super(new DynamoDBMetadataHandler(GlueConnectionUtils.getGlueConnection()), new DynamoDBRecordHandler(GlueConnectionUtils.getGlueConnection()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/
package com.amazonaws.athena.connectors.elasticsearch;

import com.amazonaws.athena.connector.lambda.GlueConnectionUtils;
import com.amazonaws.athena.connector.lambda.handlers.CompositeHandler;

/**
Expand All @@ -30,6 +31,6 @@ public class ElasticsearchCompositeHandler
{
public ElasticsearchCompositeHandler()
{
super(new ElasticsearchMetadataHandler(System.getenv()), new ElasticsearchRecordHandler(System.getenv()));
super(new ElasticsearchMetadataHandler(GlueConnectionUtils.getGlueConnection()), new ElasticsearchRecordHandler(GlueConnectionUtils.getGlueConnection()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public ElasticsearchMetadataHandler(java.util.Map<String, String> configOptions)
this.domainMap = domainMapProvider.getDomainMap(resolveSecrets(configOptions.getOrDefault(DOMAIN_MAPPING, "")));
this.clientFactory = new AwsRestHighLevelClientFactory(this.autoDiscoverEndpoint);
this.glueTypeMapper = new ElasticsearchGlueTypeMapper();
this.queryTimeout = Long.parseLong(configOptions.getOrDefault(QUERY_TIMEOUT_CLUSTER, ""));
this.queryTimeout = Long.parseLong(configOptions.getOrDefault(QUERY_TIMEOUT_CLUSTER, "10"));
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public ElasticsearchRecordHandler(java.util.Map<String, String> configOptions)

this.typeUtils = new ElasticsearchTypeUtils();
this.clientFactory = new AwsRestHighLevelClientFactory(configOptions.getOrDefault(AUTO_DISCOVER_ENDPOINT, "").equalsIgnoreCase("true"));
this.queryTimeout = Long.parseLong(configOptions.getOrDefault(QUERY_TIMEOUT_SEARCH, ""));
this.queryTimeout = Long.parseLong(configOptions.getOrDefault(QUERY_TIMEOUT_SEARCH, "720"));
this.scrollTimeout = Long.parseLong(configOptions.getOrDefault(SCROLL_TIMEOUT, "60"));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*-
* #%L
* Amazon Athena Query Federation SDK
* %%
* Copyright (C) 2019 - 2023 Amazon Web Services
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package com.amazonaws.athena.connector.lambda;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.services.glue.AWSGlue;
import com.amazonaws.services.glue.AWSGlueClientBuilder;
import com.amazonaws.services.glue.model.Connection;
import com.amazonaws.services.glue.model.GetConnectionRequest;
import com.amazonaws.services.glue.model.GetConnectionResult;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;

public class GlueConnectionUtils
{
public static final String DEFAULT_GAMMA_ENDPOINT = "https://glue-gamma.ap-south-2.amazonaws.com";
public static final String DEFAULT_REGION = "ap-south-2";
public static final String DEFAULT_GLUE_CONNECTION = "glue_connection";
private static final int CONNECT_TIMEOUT = 1000;
private static final Logger logger = LoggerFactory.getLogger(GlueConnectionUtils.class);
private static HashMap<String, HashMap<String, String>> connectionNameCache = new HashMap<>();

private GlueConnectionUtils()
{
}

public static Map<String, String> getGlueConnection()
{
HashMap<String, String> envConfig = new HashMap<>(System.getenv());

String glueConnectionName = envConfig.get(DEFAULT_GLUE_CONNECTION);
if (glueConnectionName != null) {
HashMap<String, String> cachedConfig = connectionNameCache.get(glueConnectionName);
if (cachedConfig == null) {
try {
HashMap<String, HashMap<String, String>> athenaDriverPropertiesToMap = new HashMap<String, HashMap<String, String>>();

AWSGlue awsGlue = AWSGlueClientBuilder.standard().withEndpointConfiguration(new AWSGlueClientBuilder.EndpointConfiguration(DEFAULT_GAMMA_ENDPOINT, DEFAULT_REGION)).withClientConfiguration(new ClientConfiguration().withConnectionTimeout(CONNECT_TIMEOUT)).build();
GetConnectionRequest getGlueConnectionRequest = new GetConnectionRequest().withName(glueConnectionName);
GetConnectionResult glueConnection = awsGlue.getConnection(getGlueConnectionRequest);
Connection connection = glueConnection.getConnection();
String athenaDriverPropertiesAsString = connection.getConnectionProperties().get("AthenaDriverProperties");
try {
ObjectMapper mapper = new ObjectMapper();
athenaDriverPropertiesToMap = mapper.readValue(athenaDriverPropertiesAsString, new TypeReference<HashMap>(){});
}
catch (Exception err) {
logger.error("Error Parsing AthenaDriverProperties JSON to Map", err.toString());
}
String[] propertySubsets = {"federationSdkProperties", "driverProperties"};
for (String subset : propertySubsets) {
envConfig.putAll(athenaDriverPropertiesToMap.get(subset));
}
connectionNameCache.put(glueConnectionName, envConfig);
}
catch (Exception err) {
logger.debug("Error thrown during fetching of {} connection properties.", glueConnectionName);
logger.debug(err.toString());
throw new RuntimeException("Error thrown during fetching of " + glueConnectionName + " connection properties.");
}
}else{
return cachedConfig;
}
}
else {
logger.debug("No Glue Connection name was defined in Environment Variables.");
}
return envConfig;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/
package com.amazonaws.athena.connectors.hbase;

import com.amazonaws.athena.connector.lambda.GlueConnectionUtils;
import com.amazonaws.athena.connector.lambda.handlers.CompositeHandler;

/**
Expand All @@ -30,6 +31,6 @@ public class HbaseCompositeHandler
{
public HbaseCompositeHandler()
{
super(new HbaseMetadataHandler(System.getenv()), new HbaseRecordHandler(System.getenv()));
super(new HbaseMetadataHandler(GlueConnectionUtils.getGlueConnection()), new HbaseRecordHandler(GlueConnectionUtils.getGlueConnection()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/
package com.amazonaws.athena.connectors.hortonworks;

import com.amazonaws.athena.connector.lambda.GlueConnectionUtils;
import com.amazonaws.athena.connector.lambda.handlers.CompositeHandler;

/**
Expand All @@ -32,6 +33,6 @@ public class HiveCompositeHandler
{
public HiveCompositeHandler()
{
super(new HiveMetadataHandler(System.getenv()), new HiveRecordHandler(System.getenv()));
super(new HiveMetadataHandler(GlueConnectionUtils.getGlueConnection()), new HiveRecordHandler(GlueConnectionUtils.getGlueConnection()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/
package com.amazonaws.athena.connectors.jdbc;

import com.amazonaws.athena.connector.lambda.GlueConnectionUtils;
import com.amazonaws.athena.connector.lambda.handlers.CompositeHandler;
import com.amazonaws.athena.connectors.jdbc.manager.JdbcMetadataHandler;
import com.amazonaws.athena.connectors.jdbc.manager.JdbcRecordHandler;
Expand All @@ -43,10 +44,10 @@ public MultiplexingJdbcCompositeHandler(
{
super(
hasCatalogConnections ?
muxMetadataHandlerClass.getConstructor(java.util.Map.class).newInstance(System.getenv()) :
metadataHandlerClass.getConstructor(java.util.Map.class).newInstance(System.getenv()),
muxMetadataHandlerClass.getConstructor(java.util.Map.class).newInstance(GlueConnectionUtils.getGlueConnection()) :
metadataHandlerClass.getConstructor(java.util.Map.class).newInstance(GlueConnectionUtils.getGlueConnection()),
hasCatalogConnections ?
muxRecordHandlerClass.getConstructor(java.util.Map.class).newInstance(System.getenv()) :
recordHandlerClass.getConstructor(java.util.Map.class).newInstance(System.getenv()));
muxRecordHandlerClass.getConstructor(java.util.Map.class).newInstance(GlueConnectionUtils.getGlueConnection()) :
recordHandlerClass.getConstructor(java.util.Map.class).newInstance(GlueConnectionUtils.getGlueConnection()));
}
}
Loading
Loading