Skip to content

Commit

Permalink
feat: Added AWS Athena connector
Browse files Browse the repository at this point in the history
  • Loading branch information
TivonB-AI2 committed Apr 27, 2024
1 parent 4773a03 commit 5968529
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 259 deletions.
4 changes: 3 additions & 1 deletion integrations/Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ gem "async-websocket", "~> 0.8.0"

gem "git"

gem "hubspot-api-client"
gem "hubspot-api-client", "~> 17.2.0"

gem "ruby-limiter"

Expand All @@ -51,6 +51,8 @@ gem "net-sftp"

gem "csv"

gem "aws-sdk-athena"

group :development, :test do
gem "simplecov", require: false
gem "simplecov_json_formatter", require: false
Expand Down
66 changes: 40 additions & 26 deletions integrations/lib/multiwoven/integrations/source/athena/client.rb
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
# frozen_string_literal: true

require "pg"
require "aws-sdk-athena"

module Multiwoven::Integrations::Source
module AWSAthena
include Multiwoven::Integrations::Core
class Client < SourceConnector
def check_connection(connection_config)
connection_config = connection_config.source.connection_specification
connection_config = connection_config.with_indifferent_access
create_connection(connection_config)
ConnectionStatus.new(status: ConnectionStatusType["succeeded"]).to_multiwoven_message
Expand All @@ -18,7 +19,7 @@ def discover(connection_config)
connection_config = connection_config.with_indifferent_access
query = "SELECT table_name, column_name, data_type, is_nullable
FROM information_schema.columns
WHERE table_schema = '#{connection_config[:schema]}' AND table_catalog = '#{connection_config[:database]}'
WHERE table_schema = '#{connection_config[:schema]}'
ORDER BY table_name, ordinal_position;"

db = create_connection(connection_config)
Expand All @@ -42,44 +43,33 @@ def discover(connection_config)
def read(sync_config)
connection_config = sync_config.source.connection_specification
connection_config = connection_config.with_indifferent_access
query = sync_config.model.query
query = batched_query(query, sync_config.limit, sync_config.offset) unless sync_config.limit.nil? && sync_config.offset.nil?

db = create_connection(connection_config)
response = db.start_query_execution({ query_string: "
SELECT table_name, column_name, data_type, is_nullable
FROM information_schema.columns
WHERE table_schema = '#{connection_config[:schema]}'
ORDER BY table_name, ordinal_position", result_configuration: { output_location: connection_config[:output_location] } })
query_execution_id = response.query_execution_id
db.get_query_execution({ query_execution_id: query_execution_id })
sleep(5)

query(db, query)
results = db.get_query_results({ query_execution_id: query_execution_id })
records = transform_query_results(results)
query(records)
rescue StandardError => e
handle_exception(
"AWS:ATHENA:READ:EXCEPTION",
"error",
e
)
ensure
db&.close
end

private

def query(connection, query)
connection.exec(query) do |result|
result.map do |row|
RecordMessage.new(data: row, emitted_at: Time.now.to_i).to_multiwoven_message
end
end
end

def create_connection(connection_config)
raise "Unsupported Auth type" unless connection_config[:credentials][:auth_type] == "access_key/secret_access_key"

PG.connect(
access_key: connection_config[:access_key],
secret_access_key: connection_config[:secret_access_key],
region: connection_config[:region],
workgroup: connection_config[:workgroup],
catalog: connection_config[:catalog],
dbname: connection_config[:database],
output_location: connection_config[:output_location]
)
Aws.config.update({ credentials: Aws::Credentials.new(connection_config[:access_key], connection_config[:secret_access_key]), region: "us-east-2" })
Aws::Athena::Client.new
end

def create_streams(records)
Expand All @@ -88,6 +78,30 @@ def create_streams(records)
end
end

def transform_query_results(query_results)
return [] if query_results.nil? || query_results.result_set.nil? || query_results.result_set.rows.nil?

columns = query_results.result_set.result_set_metadata.column_info
rows = query_results.result_set.rows

records = rows.map do |row|
data = row.data
columns.map.with_index do |column, index|
[column.name, data[index].var_char_value]
end.to_h
end

group_by_table(records)
end

def query(queries)
records = []
queries.map do |row|
records << RecordMessage.new(data: row, emitted_at: Time.now.to_i).to_multiwoven_message
end
records
end

def group_by_table(records)
records.group_by { |entry| entry["table_name"] }.map do |table_name, columns|
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,76 +6,55 @@
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "AWS Athena",
"type": "object",
"required": ["region", "workgroup", "catalog", "database", "output_location", "schema"],
"required": ["region", "workgroup", "catalog", "database", "output_location"],
"properties": {
"credentials": {
"title": "",
"type": "object",
"required": ["auth_type", "access_key", "secret_access_key"],
"properties": {
"auth_type": {
"type": "string",
"default": "access_key/secret_access_key",
"order": 0,
"readOnly": true
},
"access_key": {
"description": "The AWS Access Key ID to use for authentication.",
"examples": ["AWSATHENAEXAMPLE"],
"type": "string",
"title": "Personal Access Key",
"order": 1
},
"secret_access_key": {
"description": "The AWS Secret Access Key to use for authentication.",
"examples": ["AWS/ATHENA/EXAMPLEKEY"],
"type": "string",
"title": "Secret Access Key",
"order": 2
}
},
"access_key": {
"description": "The AWS Access Key ID to use for authentication.",
"examples": ["AWSATHENAEXAMPLE"],
"type": "string",
"title": "Personal Access Key",
"order": 0
},
"secret_access_key": {
"description": "The AWS Secret Access Key to use for authentication.",
"examples": ["AWS/ATHENA/EXAMPLEKEY"],
"type": "string",
"title": "Secret Access Key",
"order": 1
},
"region": {
"description": "AWS region where Athena is located.",
"examples": ["ATHENA_REGION"],
"type": "string",
"title": "Secret Access Key",
"order": 1
"order": 2
},
"workgroup": {
"description": "The Athena workgroup you previously set up in AWS.",
"examples": ["ATHENA_WORKGROUP"],
"type": "string",
"title": "Workgroup",
"order": 2
"order": 3
},
"catalog": {
"description": "The Data catalog name within Athena.",
"examples": ["ATHENA_CATALOG"],
"type": "string",
"title": "Catalog",
"order": 3
"order": 4
},
"database": {
"description": "The specific Athena database to connect to.",
"schema": {
"description": "The specific Athena database/schema to connect to.",
"examples": ["ATHENA_DB"],
"type": "string",
"title": "Database",
"order": 4
"order": 5
},
"output_location": {
"description": "S3 path for query output.",
"examples": ["s3://example-bucket-name/query-results/"],
"type": "string",
"title": "Query",
"order": 5
},
"schema": {
"description": "The schema within the Athena database.",
"examples": ["ATHENA_SCHEMA"],
"type": "string",
"title": "Schema",
"order": 6
}
}
Expand Down
Loading

0 comments on commit 5968529

Please sign in to comment.