Skip to content

Commit

Permalink
Merge branch 'main' into cherry-pick-ce-commit-be23d2c679f589c21322b4…
Browse files Browse the repository at this point in the history
…dac0b0a44f9d70e495
  • Loading branch information
afthabvp committed Aug 2, 2024
2 parents c7e8ec0 + 63df2a5 commit da80d93
Show file tree
Hide file tree
Showing 140 changed files with 4,729 additions and 589 deletions.
8 changes: 8 additions & 0 deletions .github/workflows/integrations-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ jobs:
sudo mv libduckdb/libduckdb.so /usr/local/lib
sudo ldconfig /usr/local/lib
- name: Download and Install Oracle Instant Client
run: |
sudo apt-get install -y libaio1 alien
wget http://yum.oracle.com/repo/OracleLinux/OL7/oracle/instantclient/x86_64/getPackage/oracle-instantclient19.6-basic-19.6.0.0.0-1.x86_64.rpm
wget http://yum.oracle.com/repo/OracleLinux/OL7/oracle/instantclient/x86_64/getPackage/oracle-instantclient19.6-devel-19.6.0.0.0-1.x86_64.rpm
sudo alien -i --scripts oracle-instantclient*.rpm
rm -f oracle-instantclient*.rpm
- name: Install dependencies
run: |
gem install bundler
Expand Down
8 changes: 8 additions & 0 deletions .github/workflows/integrations-main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ jobs:
sudo mv libduckdb/libduckdb.so /usr/local/lib
sudo ldconfig /usr/local/lib
- name: Download and Install Oracle Instant Client
run: |
sudo apt-get install -y libaio1 alien
wget http://yum.oracle.com/repo/OracleLinux/OL7/oracle/instantclient/x86_64/getPackage/oracle-instantclient19.6-basic-19.6.0.0.0-1.x86_64.rpm
wget http://yum.oracle.com/repo/OracleLinux/OL7/oracle/instantclient/x86_64/getPackage/oracle-instantclient19.6-devel-19.6.0.0.0-1.x86_64.rpm
sudo alien -i --scripts oracle-instantclient*.rpm
rm -f oracle-instantclient*.rpm
- name: Install dependencies
run: bundle install
working-directory: ./integrations
Expand Down
8 changes: 8 additions & 0 deletions .github/workflows/server-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,14 @@ jobs:
sudo mv libduckdb/libduckdb.so /usr/local/lib
sudo ldconfig /usr/local/lib
- name: Download and Install Oracle Instant Client
run: |
sudo apt-get install -y libaio1 alien
wget http://yum.oracle.com/repo/OracleLinux/OL7/oracle/instantclient/x86_64/getPackage/oracle-instantclient19.6-basic-19.6.0.0.0-1.x86_64.rpm
wget http://yum.oracle.com/repo/OracleLinux/OL7/oracle/instantclient/x86_64/getPackage/oracle-instantclient19.6-devel-19.6.0.0.0-1.x86_64.rpm
sudo alien -i --scripts oracle-instantclient*.rpm
rm -f oracle-instantclient*.rpm
- name: Bundle Install
run: bundle install
working-directory: ./server
Expand Down
2 changes: 2 additions & 0 deletions integrations/Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ gem "mysql2"

gem "aws-sdk-sts"

gem "ruby-oci8"

group :development, :test do
gem "simplecov", require: false
gem "simplecov_json_formatter", require: false
Expand Down
6 changes: 5 additions & 1 deletion integrations/Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ GIT
PATH
remote: .
specs:
multiwoven-integrations (0.4.1)
multiwoven-integrations (0.7.1)
activesupport
async-websocket
aws-sdk-athena
Expand All @@ -28,6 +28,7 @@ PATH
rake
restforce
ruby-limiter
ruby-oci8
ruby-odbc
rubyzip
sequel
Expand Down Expand Up @@ -275,6 +276,8 @@ GEM
rubocop-ast (1.31.3)
parser (>= 3.3.1.0)
ruby-limiter (2.3.0)
ruby-oci8 (2.2.12)
ruby-oci8 (2.2.12-x64-mingw-ucrt)
ruby-progressbar (1.13.0)
ruby2_keywords (0.0.5)
rubyzip (2.3.2)
Expand Down Expand Up @@ -357,6 +360,7 @@ DEPENDENCIES
rspec (~> 3.0)
rubocop (~> 1.21)
ruby-limiter
ruby-oci8
ruby-odbc!
rubyzip
sequel
Expand Down
4 changes: 4 additions & 0 deletions integrations/lib/multiwoven/integrations.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
require "duckdb"
require "iterable-api-client"
require "aws-sdk-sts"
require "ruby-oci8"

# Service
require_relative "integrations/config"
Expand Down Expand Up @@ -60,6 +61,7 @@
require_relative "integrations/source/clickhouse/client"
require_relative "integrations/source/amazon_s3/client"
require_relative "integrations/source/maria_db/client"
require_relative "integrations/source/oracle_db/client"

# Destination
require_relative "integrations/destination/klaviyo/client"
Expand All @@ -77,6 +79,8 @@
require_relative "integrations/destination/http/client"
require_relative "integrations/destination/iterable/client"
require_relative "integrations/destination/maria_db/client"
require_relative "integrations/destination/databricks_lakehouse/client"
require_relative "integrations/destination/oracle_db/client"

module Multiwoven
module Integrations
Expand Down
2 changes: 2 additions & 0 deletions integrations/lib/multiwoven/integrations/core/constants.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ module Constants
JSON_SCHEMA_URL = "https://json-schema.org/draft-07/schema#"

# CONNECTORS
INSTALL_HTTPFS_QUERY = ENV["INSTALL_HTTPFS_QUERY"] || "INSTALL HTTPFS; LOAD HTTPFS;"

KLAVIYO_AUTH_ENDPOINT = "https://a.klaviyo.com/api/lists/"
KLAVIYO_AUTH_PAYLOAD = {
data: {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
# frozen_string_literal: true

module Multiwoven
module Integrations
module Destination
module DatabricksLakehouse
include Multiwoven::Integrations::Core
class Client < DestinationConnector
MAX_CHUNK_SIZE = 10
def check_connection(connection_config)
connection_config = connection_config.with_indifferent_access
db = create_connection(connection_config)
response = db.get("/api/2.0/clusters/list")
if response.status == 200
success_status
else
failure_status(nil)
end
rescue StandardError => e
handle_exception(e, {
context: "DATABRICKS:LAKEHOUSE:CHECK_CONNECTION:EXCEPTION",
type: "error"
})
failure_status(e)
end

def discover(connection_config)
connection_config = connection_config.with_indifferent_access
table_query = "SHOW TABLES IN #{connection_config[:catalog]}.#{connection_config[:schema]};"
db = create_connection(connection_config)
records = []
table_response = db.post("/api/2.0/sql/statements", generate_body(connection_config[:warehouse_id], table_query).to_json)
table_response_body = JSON.parse(table_response.body)
table_response_body["result"]["data_array"].each do |table|
table_name = table[1]
query = "DESCRIBE TABLE #{connection_config[:catalog]}.#{connection_config[:schema]}.#{table_name};"
column_response = db.post("/api/2.0/sql/statements", generate_body(connection_config[:warehouse_id], query).to_json)
column_response_body = JSON.parse(column_response.body)
records << [table_name, column_response_body["result"]["data_array"]]
end
catalog = Catalog.new(streams: create_streams(records))
catalog.to_multiwoven_message
rescue StandardError => e
handle_exception(
"DATABRICKS:LAKEHOUSE:DISCOVER:EXCEPTION",
"error",
e
)
end

def write(sync_config, records, action = "destination_insert")
connection_config = sync_config.destination.connection_specification.with_indifferent_access
table_name = "#{connection_config[:catalog]}.#{connection_config[:schema]}.#{sync_config.stream.name}"
primary_key = sync_config.model.primary_key
db = create_connection(connection_config)
write_success = 0
write_failure = 0
log_message_array = []

records.each do |record|
query = Multiwoven::Integrations::Core::QueryBuilder.perform(action, table_name, record, primary_key)
logger.debug("DATABRICKS:LAKEHOUSE:WRITE:QUERY query = #{query} sync_id = #{sync_config.sync_id} sync_run_id = #{sync_config.sync_run_id}")
begin
arg = ["/api/2.0/sql/statements", generate_body(connection_config[:warehouse_id], query)]
response = db.post("/api/2.0/sql/statements", generate_body(connection_config[:warehouse_id], query).to_json)
if response.status == 200
write_success += 1
else
write_failure += 1
end
log_message_array << log_request_response("info", arg, response)
rescue StandardError => e
handle_exception(e, {
context: "DATABRICKS:LAKEHOUSE:RECORD:WRITE:EXCEPTION",
type: "error",
sync_id: sync_config.sync_id,
sync_run_id: sync_config.sync_run_id
})
write_failure += 1
end
end
tracking_message(write_success, write_failure)
rescue StandardError => e
handle_exception(e, {
context: "DATABRICKS:LAKEHOUSE:RECORD:WRITE:EXCEPTION",
type: "error",
sync_id: sync_config.sync_id,
sync_run_id: sync_config.sync_run_id
})
end

private

def create_connection(connection_config)
Faraday.new(url: connection_config[:host]) do |conn|
conn.headers["Authorization"] = "Bearer #{connection_config[:api_token]}"
conn.headers["Content-Type"] = "application/json"
conn.adapter Faraday.default_adapter
end
end

def generate_body(warehouse_id, query)
{
warehouse_id: warehouse_id,
statement: query,
wait_timeout: "15s"
}
end

def create_streams(records)
message = []
group_by_table(records).each_value do |r|
message << Multiwoven::Integrations::Protocol::Stream.new(name: r[:tablename], action: StreamAction["fetch"], json_schema: convert_to_json_schema(r[:columns]))
end
message
end

def group_by_table(records)
result = {}
records.each_with_index do |entries, index|
table_name = records[index][0]
column = []
entry_data = entries[1]
entry_data.each do |entry|
column << {
column_name: entry[0],
data_type: entry[1],
is_nullable: true
}
end
result[index] ||= {}
result[index][:tablename] = table_name
result[index][:columns] = column
end
result
end

def tracking_message(success, failure)
Multiwoven::Integrations::Protocol::TrackingMessage.new(
success: success, failed: failure
).to_multiwoven_message
end
end
end
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"data": {
"name": "DatabricksLakehouse",
"title": "Databricks Datawarehouse",
"connector_type": "destination",
"category": "Database",
"documentation_url": "https://docs.multiwoven.com/destinations/databricks_lakehouse",
"github_issue_label": "destination-databricks-lakehouse",
"icon": "icon.svg",
"license": "MIT",
"release_stage": "alpha",
"support_level": "community",
"tags": ["language:ruby", "multiwoven"]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
{
"documentation_url": "https://docs.multiwoven.com/integrations/destination/databrick_lakehouse",
"stream_type": "static",
"connection_specification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Databricks Datawarehouse",
"type": "object",
"required": ["host", "api_token", "warehouse_id", "catalog", "schema"],
"properties": {
"host": {
"description": "The databrick lakehouse host domain.",
"type": "string",
"title": "Host",
"order": 0
},
"api_token": {
"description": "The databrick lakehouse api token.",
"type": "string",
"multiwoven_secret": true,
"title": "API Token",
"order": 1
},"warehouse_id": {
"description": "The databrick lakehouse warehouse ID.",
"type": "string",
"title": "Warehouse ID",
"order": 2
},
"catalog": {
"description": "The name of the catalog",
"default": "hive_metastore",
"type": "string",
"title": "Databricks catalog",
"order": 3
},
"schema": {
"description": "The default schema tables are written.",
"default": "default",
"type": "string",
"title": "Database schema",
"order": 4
}
}
}
}
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading

0 comments on commit da80d93

Please sign in to comment.