Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(CE): Databricks AI Model connector #325

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion integrations/Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ GIT
PATH
remote: .
specs:
multiwoven-integrations (0.8.6)
multiwoven-integrations (0.9.0)
activesupport
async-websocket
aws-sdk-athena
Expand Down
1 change: 1 addition & 0 deletions integrations/lib/multiwoven/integrations.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
require_relative "integrations/source/amazon_s3/client"
require_relative "integrations/source/maria_db/client"
require_relative "integrations/source/oracle_db/client"
require_relative "integrations/source/databricks_model/client"

# Destination
require_relative "integrations/destination/klaviyo/client"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ def failure_status(error)
message = error&.message || "failed"
ConnectionStatus.new(status: ConnectionStatusType["failed"], message: message).to_multiwoven_message
end

def auth_headers(access_token)
{
"Accept" => "application/json",
"Authorization" => "Bearer #{access_token}",
"Content-Type" => "application/json"
}
end
end
end
end
7 changes: 7 additions & 0 deletions integrations/lib/multiwoven/integrations/core/constants.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ module Constants
"workbook/worksheets"
MS_EXCEL_SHEET_RANGE_API = "https://graph.microsoft.com/v1.0/drives/%<drive_id>s/items/%<item_id>s/"\
"workbook/worksheets/%<sheet_name>s/range(address='A1:Z1')/usedRange?$select=values"

DATABRICKS_HEALTH_URL = "https://%<databricks_host>s/api/2.0/serving-endpoints/%<endpoint_name>s"
DATABRICKS_SERVING_URL = "https://%<databricks_host>s/serving-endpoints/%<endpoint_name>s/invocations"

AWS_ACCESS_KEY_ID = ENV["AWS_ACCESS_KEY_ID"]
AWS_SECRET_ACCESS_KEY = ENV["AWS_SECRET_ACCESS_KEY"]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,6 @@ def tracking_message(success, failure, log_message_array)
success: success, failed: failure, logs: log_message_array
).to_multiwoven_message
end

def auth_headers(access_token)
{
"Accept" => "application/json",
"Authorization" => "Bearer #{access_token}",
"Content-Type" => "application/json"
}
end
end
end
end
6 changes: 3 additions & 3 deletions integrations/lib/multiwoven/integrations/protocol/protocol.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ module Types
SyncStatus = Types::String.enum("started", "running", "complete", "incomplete")
DestinationSyncMode = Types::String.enum("insert", "upsert")
ConnectorType = Types::String.enum("source", "destination")
ConnectorQueryType = Types::String.enum("raw_sql", "soql")
ModelQueryType = Types::String.enum("raw_sql", "dbt", "soql", "table_selector")
ConnectorQueryType = Types::String.enum("raw_sql", "soql", "ai_ml")
ModelQueryType = Types::String.enum("raw_sql", "dbt", "soql", "table_selector", "ai_ml")
ConnectionStatusType = Types::String.enum("succeeded", "failed")
StreamType = Types::String.enum("static", "dynamic")
StreamType = Types::String.enum("static", "dynamic", "user_defined")
StreamAction = Types::String.enum("fetch", "create", "update", "delete")
MultiwovenMessageType = Types::String.enum(
"record", "log", "connector_spec",
Expand Down
3 changes: 2 additions & 1 deletion integrations/lib/multiwoven/integrations/rollout.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

module Multiwoven
module Integrations
VERSION = "0.8.6"
VERSION = "0.9.0"

ENABLED_SOURCES = %w[
Snowflake
Expand All @@ -16,6 +16,7 @@ module Integrations
AmazonS3
MariaDB
Oracle
DatabricksModel
].freeze

ENABLED_DESTINATIONS = %w[
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
# frozen_string_literal: true

module Multiwoven::Integrations::Source
  module DatabricksModel
    include Multiwoven::Integrations::Core
    # Source connector that proxies AI/ML queries to a Databricks
    # model-serving endpoint.
    #
    # Expected keys in the connection specification:
    #   :databricks_host — host portion of the workspace URL
    #   :token           — personal access token used as a Bearer token
    #   :endpoint_name   — name of the serving endpoint to invoke
    class Client < SourceConnector
      # Verifies credentials by GET-ing the serving endpoint's health URL.
      #
      # @param connection_config [Hash] connector credentials (see above)
      # @return [MultiwovenMessage] ConnectionStatus ("succeeded"/"failed")
      def check_connection(connection_config)
        connection_config = connection_config.with_indifferent_access
        url = build_url(DATABRICKS_HEALTH_URL, connection_config)
        response = Multiwoven::Integrations::Core::HttpClient.request(
          url,
          HTTP_GET,
          headers: auth_headers(connection_config[:token])
        )
        if success?(response)
          success_status
        else
          failure_status(nil)
        end
      rescue StandardError => e
        ConnectionStatus.new(status: ConnectionStatusType["failed"], message: e.message).to_multiwoven_message
      end

      # The catalog for AI/ML sources is static: it is read from the bundled
      # catalog JSON rather than introspected from the remote system.
      #
      # @return [MultiwovenMessage] wrapping the Catalog
      def discover(_connection_config = nil)
        catalog_json = read_json(CATALOG_SPEC_PATH)
        catalog = build_catalog(catalog_json)
        catalog.to_multiwoven_message
      rescue StandardError => e
        handle_exception(e, {
                           context: "DATABRICKS MODEL:DISCOVER:EXCEPTION",
                           type: "error"
                         })
      end

      # Runs the model for a sync.
      #
      # The server checks the ConnectorQueryType. If it's "ai_ml", the server
      # calculates the payload and passes it as a query in the sync config
      # model protocol. This query is then sent to the AI/ML model.
      #
      # @param sync_config [SyncConfig] carries the connection spec and the
      #   JSON payload in +sync_config.model.query+
      # @return [Array<MultiwovenMessage>] record messages on success
      def read(sync_config)
        # Normalize once here; helpers receive the indifferent-access hash.
        connection_config = sync_config.source.connection_specification.with_indifferent_access
        payload = JSON.parse(sync_config.model.query)
        run_model(connection_config, payload)
      rescue StandardError => e
        handle_exception(e, {
                           context: "DATABRICKS MODEL:READ:EXCEPTION",
                           type: "error"
                         })
      end

      private

      # POSTs the payload to the serving endpoint and converts the response
      # into multiwoven messages. +connection_config+ is already an
      # indifferent-access hash (normalized in #read), so it is not
      # re-converted here.
      def run_model(connection_config, payload)
        url = build_url(DATABRICKS_SERVING_URL, connection_config)
        token = connection_config[:token]
        response = send_request(url, token, payload)
        process_response(response)
      rescue StandardError => e
        handle_exception(e, context: "DATABRICKS MODEL:RUN_MODEL:EXCEPTION", type: "error")
      end

      # On HTTP success, wraps the parsed JSON body in a single RecordMessage;
      # otherwise emits an error log message.
      def process_response(response)
        if success?(response)
          data = JSON.parse(response.body)
          [RecordMessage.new(data: data, emitted_at: Time.now.to_i).to_multiwoven_message]
        else
          create_log_message("DATABRICKS MODEL:RUN_MODEL", "error", "request failed")
        end
      end

      # Interpolates host and endpoint name into a URL template
      # (DATABRICKS_HEALTH_URL / DATABRICKS_SERVING_URL from Constants).
      def build_url(url, connection_config)
        format(url, databricks_host: connection_config[:databricks_host],
                    endpoint_name: connection_config[:endpoint_name])
      end

      # POSTs +payload+ to +url+ with bearer-token auth headers.
      def send_request(url, token, payload)
        Multiwoven::Integrations::Core::HttpClient.request(
          url,
          HTTP_POST,
          payload: payload,
          headers: auth_headers(token)
        )
      end
    end
  end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"request_rate_limit": 600,
"request_rate_limit_unit": "minute",
"request_rate_concurrency": 10,
"streams": []
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"data": {
"name": "DatabricksModel",
"title": "Databricks Model",
"connector_type": "source",
"category": "AI Model",
"documentation_url": "https://docs.multiwoven.com",
"github_issue_label": "source-databricks-foundation",
"icon": "icon.svg",
"license": "MIT",
"release_stage": "alpha",
"support_level": "community",
"tags": ["language:ruby", "multiwoven"]
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
{
"documentation_url": "https://docs.multiwoven.com/integrations/sources/databricks",
"stream_type": "user_defined",
"connector_query_type": "ai_ml",
"connection_specification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Databricks Model",
"type": "object",
"required": ["databricks_host", "token", "endpoint_name"],
"properties": {
"databricks_host": {
"title": "databricks_host",
"description": "databricks_host",
"type": "string",
"examples": ["app.databricks.com"],
"order": 0
},
"token": {
"title": "Databricks Token",
"description": "personal access token",
"type": "string",
"multiwoven_secret": true,
"order": 1
},
"endpoint_name": {
"title": "Endpoint name",
"description": "Endpoint name",
"examples": ["databricks-dbrx-instruct"],
"type": "string",
"order": 2
},
"request_format": {
"title": "Request Format",
"description": "Sample Request Format",
"type": "string",
"order": 3
},
"response_format": {
"title": "Response Format",
"description": "Sample Response Format",
"type": "string",
"order": 4
}
}
}
}

Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,18 @@ module Integrations::Protocol
expect(instance.supported_destination_sync_modes).to eq(["insert"])
expect(instance.connector_query_type).to eq("soql")
end

it "creates an instance from JSON connector_query_type ai_ml" do
json_data[:connector_query_type] = "ai_ml"
json_data[:stream_type] = "user_defined"
instance = ConnectorSpecification.from_json(json_data.to_json)
expect(instance).to be_a(ConnectorSpecification)
expect(instance.connection_specification).to eq(key: "value")
expect(instance.supports_normalization).to eq(true)
expect(instance.supported_destination_sync_modes).to eq(["insert"])
expect(instance.connector_query_type).to eq("ai_ml")
expect(instance.stream_type).to eq("user_defined")
end
end

describe "#to_multiwoven_message" do
Expand Down Expand Up @@ -366,25 +378,42 @@ module Integrations::Protocol
model = Model.new(name: "Test", query: "SELECT * FROM table", query_type: "table_selector", primary_key: "id")
expect(ModelQueryType.values).to include(model.query_type)
end

it "has a query_type 'ai_ml'" do
model = Model.new(name: "Test", query: "SELECT * FROM table", query_type: "ai_ml", primary_key: "id")
expect(ModelQueryType.values).to include(model.query_type)
end
end
end

RSpec.describe Connector do
context ".from_json" do
it "creates an instance from JSON" do
json_data = {
"name": "example_connector",
"type": "source",
"connection_specification": { "key": "value" }
}.to_json
json_data = {
"name": "example_connector",
"type": "source",
"connection_specification": { "key": "value" },
"query_type": "raw_sql"
}

connector = Connector.from_json(json_data)
it "creates an instance from JSON" do
connector = Connector.from_json(json_data.to_json)
expect(connector).to be_a(described_class)
expect(connector.name).to eq("example_connector")
expect(connector.type).to eq("source")
expect(connector.query_type).to eq("raw_sql")
expect(connector.connection_specification).to eq(key: "value")
end

it "creates an instance from JSON connector_query_type ai_ml" do
json_data[:query_type] = "ai_ml"

connector = Connector.from_json(json_data.to_json)
expect(connector).to be_a(described_class)
expect(connector.name).to eq("example_connector")
expect(connector.type).to eq("source")
expect(connector.query_type).to eq("ai_ml")
expect(connector.connection_specification).to eq(key: "value")
end
end
end

Expand Down
Loading
Loading