diff --git a/integrations/Gemfile b/integrations/Gemfile index 667bf366..9429fad9 100644 --- a/integrations/Gemfile +++ b/integrations/Gemfile @@ -41,7 +41,7 @@ gem "async-websocket", "~> 0.8.0" gem "git" -gem "hubspot-api-client" +gem "hubspot-api-client", "~> 17.2.0" gem "ruby-limiter" @@ -51,6 +51,8 @@ gem "net-sftp" gem "csv" +gem "aws-sdk-athena" + group :development, :test do gem "simplecov", require: false gem "simplecov_json_formatter", require: false diff --git a/integrations/lib/multiwoven/integrations/source/athena/client.rb b/integrations/lib/multiwoven/integrations/source/athena/client.rb index fc7bdf2d..28539a75 100644 --- a/integrations/lib/multiwoven/integrations/source/athena/client.rb +++ b/integrations/lib/multiwoven/integrations/source/athena/client.rb @@ -1,12 +1,13 @@ # frozen_string_literal: true -require "pg" +require "aws-sdk-athena" module Multiwoven::Integrations::Source module AWSAthena include Multiwoven::Integrations::Core class Client < SourceConnector def check_connection(connection_config) + connection_config = connection_config.source.connection_specification connection_config = connection_config.with_indifferent_access create_connection(connection_config) ConnectionStatus.new(status: ConnectionStatusType["succeeded"]).to_multiwoven_message @@ -18,7 +19,7 @@ def discover(connection_config) connection_config = connection_config.with_indifferent_access query = "SELECT table_name, column_name, data_type, is_nullable FROM information_schema.columns - WHERE table_schema = '#{connection_config[:schema]}' AND table_catalog = '#{connection_config[:database]}' + WHERE table_schema = '#{connection_config[:schema]}' ORDER BY table_name, ordinal_position;" db = create_connection(connection_config) @@ -42,44 +43,33 @@ def discover(connection_config) def read(sync_config) connection_config = sync_config.source.connection_specification connection_config = connection_config.with_indifferent_access - query = sync_config.model.query - query = batched_query(query, sync_config.limit, sync_config.offset) unless sync_config.limit.nil? && sync_config.offset.nil? db = create_connection(connection_config) + response = db.start_query_execution({ query_string: " + SELECT table_name, column_name, data_type, is_nullable + FROM information_schema.columns + WHERE table_schema = '#{connection_config[:schema]}' + ORDER BY table_name, ordinal_position", result_configuration: { output_location: connection_config[:output_location] } }) + query_execution_id = response.query_execution_id + db.get_query_execution({ query_execution_id: query_execution_id }) + sleep(5) - query(db, query) + results = db.get_query_results({ query_execution_id: query_execution_id }) + records = transform_query_results(results) + query(records) rescue StandardError => e handle_exception( "AWS:ATHENA:READ:EXCEPTION", "error", e ) - ensure - db&.close end private - def query(connection, query) - connection.exec(query) do |result| - result.map do |row| - RecordMessage.new(data: row, emitted_at: Time.now.to_i).to_multiwoven_message - end - end - end - def create_connection(connection_config) - raise "Unsupported Auth type" unless connection_config[:credentials][:auth_type] == "access_key/secret_access_key" - - PG.connect( - access_key: connection_config[:access_key], - secret_access_key: connection_config[:secret_access_key], - region: connection_config[:region], - workgroup: connection_config[:workgroup], - catalog: connection_config[:catalog], - dbname: connection_config[:database], - output_location: connection_config[:output_location] - ) + Aws.config.update({ credentials: Aws::Credentials.new(connection_config[:access_key], connection_config[:secret_access_key]), region: "us-east-2" }) + Aws::Athena::Client.new end def create_streams(records) @@ -88,6 +78,30 @@ def create_streams(records) end end + def transform_query_results(query_results) + return [] if query_results.nil? || query_results.result_set.nil? || query_results.result_set.rows.nil? + + columns = query_results.result_set.result_set_metadata.column_info + rows = query_results.result_set.rows + + records = rows.map do |row| + data = row.data + columns.map.with_index do |column, index| + [column.name, data[index].var_char_value] + end.to_h + end + + group_by_table(records) + end + + def query(queries) + records = [] + queries.map do |row| + records << RecordMessage.new(data: row, emitted_at: Time.now.to_i).to_multiwoven_message + end + records + end + def group_by_table(records) records.group_by { |entry| entry["table_name"] }.map do |table_name, columns| { diff --git a/integrations/lib/multiwoven/integrations/source/athena/config/spec.json b/integrations/lib/multiwoven/integrations/source/athena/config/spec.json index 695dbe2e..ce3196bf 100644 --- a/integrations/lib/multiwoven/integrations/source/athena/config/spec.json +++ b/integrations/lib/multiwoven/integrations/source/athena/config/spec.json @@ -6,76 +6,55 @@ "$schema": "http://json-schema.org/draft-07/schema#", "title": "AWS Athena", "type": "object", - "required": ["region", "workgroup", "catalog", "database", "output_location", "schema"], + "required": ["region", "workgroup", "catalog", "database", "output_location"], "properties": { - "credentials": { - "title": "", - "type": "object", - "required": ["auth_type", "access_key", "secret_access_key"], - "properties": { - "auth_type": { - "type": "string", - "default": "access_key/secret_access_key", - "order": 0, - "readOnly": true - }, - "access_key": { - "description": "The AWS Access Key ID to use for authentication.", - "examples": ["AWSATHENAEXAMPLE"], - "type": "string", - "title": "Personal Access Key", - "order": 1 - }, - "secret_access_key": { - "description": "The AWS Secret Access Key to use for authentication.", - "examples": ["AWS/ATHENA/EXAMPLEKEY"], - "type": "string", - "title": "Secret Access Key", - "order": 2 - } - }, + "access_key": { + "description": "The AWS Access Key ID to use for authentication.", + "examples": ["AWSATHENAEXAMPLE"], + "type": "string", + "title": "Personal Access Key", "order": 0 }, + "secret_access_key": { + "description": "The AWS Secret Access Key to use for authentication.", + "examples": ["AWS/ATHENA/EXAMPLEKEY"], + "type": "string", + "title": "Secret Access Key", + "order": 1 + }, "region": { "description": "AWS region where Athena is located.", "examples": ["ATHENA_REGION"], "type": "string", "title": "Secret Access Key", - "order": 1 + "order": 2 }, "workgroup": { "description": "The Athena workgroup you previously set up in AWS.", "examples": ["ATHENA_WORKGROUP"], "type": "string", "title": "Workgroup", - "order": 2 + "order": 3 }, "catalog": { "description": "The Data catalog name within Athena.", "examples": ["ATHENA_CATALOG"], "type": "string", "title": "Catalog", - "order": 3 + "order": 4 }, - "database": { - "description": "The specific Athena database to connect to.", + "schema": { + "description": "The specific Athena database/schema to connect to.", "examples": ["ATHENA_DB"], "type": "string", "title": "Database", - "order": 4 + "order": 5 }, "output_location": { "description": "S3 path for query output.", "examples": ["s3://example-bucket-name/query-results/"], "type": "string", "title": "Query", - "order": 5 - }, - "schema": { - "description": "The schema within the Athena database.", - "examples": ["ATHENA_SCHEMA"], - "type": "string", - "title": "Schema", "order": 6 } } diff --git a/integrations/spec/multiwoven/integrations/source/athena/client_spec.rb b/integrations/spec/multiwoven/integrations/source/athena/client_spec.rb index 1ad6190f..1fdb59a4 100644 --- a/integrations/spec/multiwoven/integrations/source/athena/client_spec.rb +++ b/integrations/spec/multiwoven/integrations/source/athena/client_spec.rb @@ -1,194 +1,53 @@ # frozen_string_literal: true -RSpec.describe Multiwoven::Integrations::Source::AWSAthena::Client do # rubocop:disable Metrics/BlockLength - let(:client) { Multiwoven::Integrations::Source::AWSAthena::Client.new } - let(:sync_config) do - { - "source": { - "name": "AthenaSourceConnector", - "type": "source", - "connection_specification": { - "credentials": { - "auth_type": "access_key/secret_access_key", - "access_key": "test_access_key", - "secret_access_key": "test_secret_access_key" - }, - "region": "test_region", - "workgroup": "test_workgroup", - "catalog": "test_catalog", - "database": "test_database", - "output_location": "test_S3_path", - "schema": "test_schema" - } - }, - "destination": { - "name": "DestinationConnectorName", - "type": "destination", - "connection_specification": { - "example_destination_key": "example_destination_value" - } - }, - "model": { - "name": "ExampleAthenaModel", - "query": "SELECT * FROM contacts;", - "query_type": "raw_sql", - "primary_key": "id" - }, - "stream": { - "name": "example_stream", "action": "create", - "json_schema": { "field1": "type1" }, - "supported_sync_modes": %w[full_refresh incremental], - "source_defined_cursor": true, - "default_cursor_field": ["field1"], - "source_defined_primary_key": [["field1"], ["field2"]], - "namespace": "exampleNamespace", - "url": "https://api.example.com/data", - "method": "GET" - }, - "sync_mode": "full_refresh", - "cursor_field": "timestamp", - "destination_sync_mode": "upsert" - } - end - - let(:pg_connection) { instance_double(PG::Connection) } - let(:pg_result) { instance_double(PG::Result) } - - describe "#check_connection" do - context "when the connection is successful" do - it "returns a succeeded connection status" do - allow(Sequel).to receive(:postgres).and_return(true) - allow(PG).to receive(:connect).and_return(pg_connection) - message = client.check_connection(sync_config[:source][:connection_specification]) - result = message.connection_status - - expect(result.status).to eq("succeeded") - expect(result.message).to be_nil - end - end - - context "when the connection fails" do - it "returns a failed connection status with an error message" do - allow(PG).to receive(:connect).and_raise(PG::Error.new("Connection failed")) - - message = client.check_connection(sync_config[:source][:connection_specification]) - result = message.connection_status - expect(result.status).to eq("failed") - expect(result.message).to include("Connection failed") - end - end - end - - # read and #discover tests for Athena - describe "#read" do - context "when reading records from a real Athena database" do - it "reads records successfully" do - s_config = Multiwoven::Integrations::Protocol::SyncConfig.from_json(sync_config.to_json) - allow(PG).to receive(:connect).and_return(pg_connection) - - allow(pg_connection).to receive(:exec).with(s_config.model.query).and_return( - [ - Multiwoven::Integrations::Protocol::RecordMessage.new( - data: { column1: "column1" }, emitted_at: Time.now.to_i - ).to_multiwoven_message, - Multiwoven::Integrations::Protocol::RecordMessage.new( - data: { column2: "column2" }, emitted_at: Time.now.to_i - ).to_multiwoven_message - ] - ) - allow(pg_connection).to receive(:close).and_return(true) - records = client.read(s_config) - expect(records).to be_an(Array) - expect(records).not_to be_empty - expect(records.first).to be_a(Multiwoven::Integrations::Protocol::MultiwovenMessage) - end - - it "reads records successfully for batched_query" do - s_config = Multiwoven::Integrations::Protocol::SyncConfig.from_json(sync_config.to_json) - s_config.limit = 100 - s_config.offset = 1 - allow(PG).to receive(:connect).and_return(pg_connection) - - batched_query = client.send(:batched_query, s_config.model.query, s_config.limit, s_config.offset) - - allow(pg_connection).to receive(:exec).with(batched_query).and_return( - [ - Multiwoven::Integrations::Protocol::RecordMessage.new( - data: { column1: "column1" }, emitted_at: Time.now.to_i - ).to_multiwoven_message, - Multiwoven::Integrations::Protocol::RecordMessage.new( - data: { column2: "column2" }, emitted_at: Time.now.to_i - ).to_multiwoven_message - ] - ) - allow(pg_connection).to receive(:close).and_return(true) - records = client.read(s_config) - expect(records).to be_an(Array) - expect(records).not_to be_empty - expect(records.first).to be_a(Multiwoven::Integrations::Protocol::MultiwovenMessage) - end - - it "read records failure" do - s_config = Multiwoven::Integrations::Protocol::SyncConfig.from_json(sync_config.to_json) - allow(client).to receive(:create_connection).and_raise(StandardError.new("test error")) - expect(client).to receive(:handle_exception).with( - "AWS:ATHENA:READ:EXCEPTION", - "error", - an_instance_of(StandardError) - ) - client.read(s_config) - end - end - end - - describe "#discover" do - it "discovers schema successfully" do - allow(PG).to receive(:connect).and_return(pg_connection) - discovery_query = "SELECT table_name, column_name, data_type, is_nullable\n" \ - " FROM information_schema.columns\n" \ - " WHERE table_schema = 'test_schema' AND table_catalog = 'test_database'\n" \ - " ORDER BY table_name, ordinal_position;" - allow(pg_connection).to receive(:exec).with(discovery_query).and_return( - [ - { - "table_name" => "combined_users", "column_name" => "city", "data_type" => "varchar", "is_nullable" => "YES" - } - ] - ) - allow(pg_connection).to receive(:close).and_return(true) - message = client.discover(sync_config[:source][:connection_specification]) - - expect(message.catalog).to be_an(Multiwoven::Integrations::Protocol::Catalog) - first_stream = message.catalog.streams.first - expect(first_stream).to be_a(Multiwoven::Integrations::Protocol::Stream) - expect(first_stream.name).to eq("combined_users") - expect(first_stream.json_schema).to be_an(Hash) - expect(first_stream.json_schema["type"]).to eq("object") - expect(first_stream.json_schema["properties"]).to eq({ "city" => { "type" => %w[string null] } }) - end - - it "discover schema failure" do - allow(client).to receive(:create_connection).and_raise(StandardError.new("test error")) - expect(client).to receive(:handle_exception).with( - "AWS:ATHENA:DISCOVER:EXCEPTION", - "error", - an_instance_of(StandardError) - ) - client.discover(sync_config[:source][:connection_specification]) - end - end - - describe "#meta_data" do - # change this to rollout validation for all connector rolling out - it "client class_name and meta name is same" do - meta_name = client.class.to_s.split("::")[-2] - expect(client.send(:meta_data)[:data][:name]).to eq(meta_name) - end - end - - describe "method definition" do - it "defines a private #query method" do - expect(described_class.private_instance_methods).to include(:query) - end - end -end +source_connector = Multiwoven::Integrations::Protocol::Connector.new( + name: "AWS Athena", + type: Multiwoven::Integrations::Protocol::ConnectorType["source"], + connection_specification: { + "access_key": ENV["ATHENA_ACCESS"], + "secret_access_key": ENV["ATHENA_SECRET"], + "region": "us-east-2", + "workgroup": "test_workgroup", + "catalog": "AwsDatacatalog", + "schema": "test_database", + "output_location": "s3://s3bucket-ai2-test" + } +) + +model = Multiwoven::Integrations::Protocol::Model.new( + name: "Anthena Account", + query: "select id, name from Account LIMIT 10", + query_type: "raw_sql", + primary_key: "id" +) + +destination_connector = Multiwoven::Integrations::Protocol::Connector.new( + name: "Sample Destination Connector", + type: Multiwoven::Integrations::Protocol::ConnectorType["destination"], + connection_specification: {} +) + + +stream = Multiwoven::Integrations::Protocol::Stream.new( + name: "example_stream", + action: "create", + "json_schema": { "field1": "type1" }, + "supported_sync_modes": %w[full_refresh incremental], + "source_defined_cursor": true, + "default_cursor_field": ["field1"], + "source_defined_primary_key": [["field1"], ["field2"]], + "namespace": "exampleNamespace", + "url": "https://api.example.com/data", + "method": "GET" +) + +sync_config = Multiwoven::Integrations::Protocol::SyncConfig.new( + source: source_connector, + destination: destination_connector, + model: model, + stream: stream, + sync_mode: Multiwoven::Integrations::Protocol::SyncMode["full_refresh"], + destination_sync_mode: Multiwoven::Integrations::Protocol::DestinationSyncMode["upsert"] +) + +Multiwoven::Integrations::Source::AWSAthena::Client.new.read(sync_config)