From f020a968c56e6c41b546b3905965d992602c3db7 Mon Sep 17 00:00:00 2001 From: afthab vp Date: Tue, 20 Aug 2024 20:02:55 +0530 Subject: [PATCH 1/2] Resolve conflict in cherry-pick of 1224a32b5a7b1da0a9be3c3869947f6a55b85647 and change the commit message --- integrations/Gemfile.lock | 4 + integrations/lib/multiwoven/integrations.rb | 1 + .../integrations/core/base_connector.rb | 8 + .../multiwoven/integrations/core/constants.rb | 7 + .../core/destination_connector.rb | 8 - .../integrations/protocol/protocol.rb | 6 +- .../lib/multiwoven/integrations/rollout.rb | 5 + .../source/databrics_model/client.rb | 87 ++++++++ .../databrics_model/config/catalog.json | 6 + .../source/databrics_model/config/meta.json | 16 ++ .../source/databrics_model/config/spec.json | 47 +++++ .../source/databrics_model/icon.svg | 19 ++ .../integrations/protocol/protocol_spec.rb | 43 +++- .../source/databricks_model/client_spec.rb | 193 ++++++++++++++++++ 14 files changed, 432 insertions(+), 18 deletions(-) create mode 100644 integrations/lib/multiwoven/integrations/source/databrics_model/client.rb create mode 100644 integrations/lib/multiwoven/integrations/source/databrics_model/config/catalog.json create mode 100644 integrations/lib/multiwoven/integrations/source/databrics_model/config/meta.json create mode 100644 integrations/lib/multiwoven/integrations/source/databrics_model/config/spec.json create mode 100644 integrations/lib/multiwoven/integrations/source/databrics_model/icon.svg create mode 100644 integrations/spec/multiwoven/integrations/source/databricks_model/client_spec.rb diff --git a/integrations/Gemfile.lock b/integrations/Gemfile.lock index e6d685ea..ff96ed48 100644 --- a/integrations/Gemfile.lock +++ b/integrations/Gemfile.lock @@ -7,7 +7,11 @@ GIT PATH remote: . 
specs: +<<<<<<< HEAD multiwoven-integrations (0.8.4) +======= + multiwoven-integrations (0.9.0) +>>>>>>> 1224a32b (feat(CE): Databricks AI Model connector (#385)) activesupport async-websocket aws-sdk-athena diff --git a/integrations/lib/multiwoven/integrations.rb b/integrations/lib/multiwoven/integrations.rb index fbcc5537..02f2e262 100644 --- a/integrations/lib/multiwoven/integrations.rb +++ b/integrations/lib/multiwoven/integrations.rb @@ -62,6 +62,7 @@ require_relative "integrations/source/amazon_s3/client" require_relative "integrations/source/maria_db/client" require_relative "integrations/source/oracle_db/client" +require_relative "integrations/source/databrics_model/client" # Destination require_relative "integrations/destination/klaviyo/client" diff --git a/integrations/lib/multiwoven/integrations/core/base_connector.rb b/integrations/lib/multiwoven/integrations/core/base_connector.rb index 672e474c..68230b51 100644 --- a/integrations/lib/multiwoven/integrations/core/base_connector.rb +++ b/integrations/lib/multiwoven/integrations/core/base_connector.rb @@ -66,6 +66,14 @@ def failure_status(error) message = error&.message || "failed" ConnectionStatus.new(status: ConnectionStatusType["failed"], message: message).to_multiwoven_message end + + def auth_headers(access_token) + { + "Accept" => "application/json", + "Authorization" => "Bearer #{access_token}", + "Content-Type" => "application/json" + } + end end end end diff --git a/integrations/lib/multiwoven/integrations/core/constants.rb b/integrations/lib/multiwoven/integrations/core/constants.rb index 7c8e5248..61eb9219 100644 --- a/integrations/lib/multiwoven/integrations/core/constants.rb +++ b/integrations/lib/multiwoven/integrations/core/constants.rb @@ -46,6 +46,13 @@ module Constants "workbook/worksheets" MS_EXCEL_SHEET_RANGE_API = "https://graph.microsoft.com/v1.0/drives/%s/items/%s/"\ "workbook/worksheets/%s/range(address='A1:Z1')/usedRange?$select=values" +<<<<<<< HEAD +======= + + 
DATABRICKS_HEALTH_URL = "https://%s/api/2.0/serving-endpoints/%s" + DATABRICKS_SERVING_URL = "https://%s/serving-endpoints/%s/invocations" + +>>>>>>> 1224a32b (feat(CE): Databricks AI Model connector (#385)) AWS_ACCESS_KEY_ID = ENV["AWS_ACCESS_KEY_ID"] AWS_SECRET_ACCESS_KEY = ENV["AWS_SECRET_ACCESS_KEY"] diff --git a/integrations/lib/multiwoven/integrations/core/destination_connector.rb b/integrations/lib/multiwoven/integrations/core/destination_connector.rb index da327412..f0595a31 100644 --- a/integrations/lib/multiwoven/integrations/core/destination_connector.rb +++ b/integrations/lib/multiwoven/integrations/core/destination_connector.rb @@ -15,14 +15,6 @@ def tracking_message(success, failure, log_message_array) success: success, failed: failure, logs: log_message_array ).to_multiwoven_message end - - def auth_headers(access_token) - { - "Accept" => "application/json", - "Authorization" => "Bearer #{access_token}", - "Content-Type" => "application/json" - } - end end end end diff --git a/integrations/lib/multiwoven/integrations/protocol/protocol.rb b/integrations/lib/multiwoven/integrations/protocol/protocol.rb index 9b4c5efd..22fcdfbb 100644 --- a/integrations/lib/multiwoven/integrations/protocol/protocol.rb +++ b/integrations/lib/multiwoven/integrations/protocol/protocol.rb @@ -10,10 +10,10 @@ module Types SyncStatus = Types::String.enum("started", "running", "complete", "incomplete") DestinationSyncMode = Types::String.enum("insert", "upsert") ConnectorType = Types::String.enum("source", "destination") - ConnectorQueryType = Types::String.enum("raw_sql", "soql") - ModelQueryType = Types::String.enum("raw_sql", "dbt", "soql", "table_selector") + ConnectorQueryType = Types::String.enum("raw_sql", "soql", "ai_ml") + ModelQueryType = Types::String.enum("raw_sql", "dbt", "soql", "table_selector", "ai_ml") ConnectionStatusType = Types::String.enum("succeeded", "failed") - StreamType = Types::String.enum("static", "dynamic") + StreamType = 
Types::String.enum("static", "dynamic", "user_defined") StreamAction = Types::String.enum("fetch", "create", "update", "delete") MultiwovenMessageType = Types::String.enum( "record", "log", "connector_spec", diff --git a/integrations/lib/multiwoven/integrations/rollout.rb b/integrations/lib/multiwoven/integrations/rollout.rb index 9518790e..ba7ab246 100644 --- a/integrations/lib/multiwoven/integrations/rollout.rb +++ b/integrations/lib/multiwoven/integrations/rollout.rb @@ -2,7 +2,11 @@ module Multiwoven module Integrations +<<<<<<< HEAD VERSION = "0.8.4" +======= + VERSION = "0.9.0" +>>>>>>> 1224a32b (feat(CE): Databricks AI Model connector (#385)) ENABLED_SOURCES = %w[ Snowflake @@ -16,6 +20,7 @@ module Integrations AmazonS3 MariaDB Oracle + DatabricksModel ].freeze ENABLED_DESTINATIONS = %w[ diff --git a/integrations/lib/multiwoven/integrations/source/databrics_model/client.rb b/integrations/lib/multiwoven/integrations/source/databrics_model/client.rb new file mode 100644 index 00000000..63db472d --- /dev/null +++ b/integrations/lib/multiwoven/integrations/source/databrics_model/client.rb @@ -0,0 +1,87 @@ +# frozen_string_literal: true + +module Multiwoven::Integrations::Source + module DatabricksModel + include Multiwoven::Integrations::Core + class Client < SourceConnector + def check_connection(connection_config) + connection_config = connection_config.with_indifferent_access + url = build_url(DATABRICKS_HEALTH_URL, connection_config) + response = Multiwoven::Integrations::Core::HttpClient.request( + url, + HTTP_GET, + headers: auth_headers(connection_config[:token]) + ) + if success?(response) + success_status + else + failure_status(nil) + end + rescue StandardError => e + ConnectionStatus.new(status: ConnectionStatusType["failed"], message: e.message).to_multiwoven_message + end + + def discover(_connection_config = nil) + catalog_json = read_json(CATALOG_SPEC_PATH) + catalog = build_catalog(catalog_json) + catalog.to_multiwoven_message + rescue 
StandardError => e + handle_exception(e, { + context: "DATABRICKS MODEL:DISCOVER:EXCEPTION", + type: "error" + }) + end + + def read(sync_config) + connection_config = sync_config.source.connection_specification + connection_config = connection_config.with_indifferent_access + # The server checks the ConnectorQueryType. + # If it's "ai_ml," the server calculates the payload and passes it as a query in the sync config model protocol. + # This query is then sent to the AI/ML model. + payload = JSON.parse(sync_config.model.query) + run_model(connection_config, payload) + rescue StandardError => e + handle_exception(e, { + context: "DATABRICKS MODEL:READ:EXCEPTION", + type: "error" + }) + end + + private + + def run_model(connection_config, payload) + connection_config = connection_config.with_indifferent_access + + url = build_url(DATABRICKS_SERVING_URL, connection_config) + token = connection_config[:token] + response = send_request(url, token, payload) + process_response(response) + rescue StandardError => e + handle_exception(e, context: "DATABRICKS MODEL:RUN_MODEL:EXCEPTION", type: "error") + end + + def process_response(response) + if success?(response) + data = JSON.parse(response.body) + [RecordMessage.new(data: data, emitted_at: Time.now.to_i).to_multiwoven_message] + else + create_log_message("DATABRICKS MODEL:RUN_MODEL", "error", "request failed") + end + end + + def build_url(url, connection_config) + format(url, databricks_host: connection_config[:databricks_host], + endpoint_name: connection_config[:endpoint_name]) + end + + def send_request(url, token, payload) + Multiwoven::Integrations::Core::HttpClient.request( + url, + HTTP_POST, + payload: payload, + headers: auth_headers(token) + ) + end + end + end +end diff --git a/integrations/lib/multiwoven/integrations/source/databrics_model/config/catalog.json b/integrations/lib/multiwoven/integrations/source/databrics_model/config/catalog.json new file mode 100644 index 00000000..dacb788b --- /dev/null +++ 
b/integrations/lib/multiwoven/integrations/source/databrics_model/config/catalog.json @@ -0,0 +1,6 @@ +{ + "request_rate_limit": 600, + "request_rate_limit_unit": "minute", + "request_rate_concurrency": 10, + "streams": [] +} diff --git a/integrations/lib/multiwoven/integrations/source/databrics_model/config/meta.json b/integrations/lib/multiwoven/integrations/source/databrics_model/config/meta.json new file mode 100644 index 00000000..078e2f18 --- /dev/null +++ b/integrations/lib/multiwoven/integrations/source/databrics_model/config/meta.json @@ -0,0 +1,16 @@ +{ + "data": { + "name": "DatabricksModel", + "title": "Databricks Model", + "connector_type": "source", + "category": "AI Model", + "documentation_url": "https://docs.multiwoven.com", + "github_issue_label": "source-databricks-foundation", + "icon": "icon.svg", + "license": "MIT", + "release_stage": "alpha", + "support_level": "community", + "tags": ["language:ruby", "multiwoven"] + } + } + \ No newline at end of file diff --git a/integrations/lib/multiwoven/integrations/source/databrics_model/config/spec.json b/integrations/lib/multiwoven/integrations/source/databrics_model/config/spec.json new file mode 100644 index 00000000..ef7ca0c9 --- /dev/null +++ b/integrations/lib/multiwoven/integrations/source/databrics_model/config/spec.json @@ -0,0 +1,47 @@ +{ + "documentation_url": "https://docs.multiwoven.com/integrations/sources/databricks", + "stream_type": "user_defined", + "connector_query_type": "ai_ml", + "connection_specification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Databricks Model", + "type": "object", + "required": ["databricks_host", "token", "model_name"], + "properties": { + "databricks_host": { + "title": "databricks_host", + "description": "databricks_host", + "type": "string", + "examples": ["app.databricks.com"], + "order": 0 + }, + "token": { + "title": "Databricks Token", + "description": "personal access token", + "type": "string", + "multiwoven_secret": 
true, + "order": 1 + }, + "endpoint": { + "title": "Endpoint name", + "description": "Endpoint name", + "examples": ["databricks-dbrx-instruct"], + "type": "string", + "order": 2 + }, + "request_format": { + "title": "Request Format", + "description": "Sample Request Format", + "type": "string", + "order": 3 + }, + "response_format": { + "title": "Response Format", + "description": "Sample Response Format", + "type": "string", + "order": 4 + } + } + } + } + \ No newline at end of file diff --git a/integrations/lib/multiwoven/integrations/source/databrics_model/icon.svg b/integrations/lib/multiwoven/integrations/source/databrics_model/icon.svg new file mode 100644 index 00000000..553c3e41 --- /dev/null +++ b/integrations/lib/multiwoven/integrations/source/databrics_model/icon.svg @@ -0,0 +1,19 @@ + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/integrations/spec/multiwoven/integrations/protocol/protocol_spec.rb b/integrations/spec/multiwoven/integrations/protocol/protocol_spec.rb index bcb9e589..f0d2c640 100644 --- a/integrations/spec/multiwoven/integrations/protocol/protocol_spec.rb +++ b/integrations/spec/multiwoven/integrations/protocol/protocol_spec.rb @@ -61,6 +61,18 @@ module Integrations::Protocol expect(instance.supported_destination_sync_modes).to eq(["insert"]) expect(instance.connector_query_type).to eq("soql") end + + it "creates an instance from JSON connector_query_type ai_ml" do + json_data[:connector_query_type] = "ai_ml" + json_data[:stream_type] = "user_defined" + instance = ConnectorSpecification.from_json(json_data.to_json) + expect(instance).to be_a(ConnectorSpecification) + expect(instance.connection_specification).to eq(key: "value") + expect(instance.supports_normalization).to eq(true) + expect(instance.supported_destination_sync_modes).to eq(["insert"]) + expect(instance.connector_query_type).to eq("ai_ml") + expect(instance.stream_type).to eq("user_defined") + end end describe "#to_multiwoven_message" do @@ -366,25 
+378,42 @@ module Integrations::Protocol model = Model.new(name: "Test", query: "SELECT * FROM table", query_type: "table_selector", primary_key: "id") expect(ModelQueryType.values).to include(model.query_type) end + + it "has a query_type 'ai_ml'" do + model = Model.new(name: "Test", query: "SELECT * FROM table", query_type: "ai_ml", primary_key: "id") + expect("ai_ml").to include(model.query_type) + end end end RSpec.describe Connector do context ".from_json" do - it "creates an instance from JSON" do - json_data = { - "name": "example_connector", - "type": "source", - "connection_specification": { "key": "value" } - }.to_json + json_data = { + "name": "example_connector", + "type": "source", + "connection_specification": { "key": "value" }, + "query_type": "raw_sql" + } - connector = Connector.from_json(json_data) + it "creates an instance from JSON" do + connector = Connector.from_json(json_data.to_json) expect(connector).to be_a(described_class) expect(connector.name).to eq("example_connector") expect(connector.type).to eq("source") expect(connector.query_type).to eq("raw_sql") expect(connector.connection_specification).to eq(key: "value") end + + it "creates an instance from JSON connector_query_type ai_ml" do + json_data[:query_type] = "ai_ml" + + connector = Connector.from_json(json_data.to_json) + expect(connector).to be_a(described_class) + expect(connector.name).to eq("example_connector") + expect(connector.type).to eq("source") + expect(connector.query_type).to eq("ai_ml") + expect(connector.connection_specification).to eq(key: "value") + end end end diff --git a/integrations/spec/multiwoven/integrations/source/databricks_model/client_spec.rb b/integrations/spec/multiwoven/integrations/source/databricks_model/client_spec.rb new file mode 100644 index 00000000..ff80de03 --- /dev/null +++ b/integrations/spec/multiwoven/integrations/source/databricks_model/client_spec.rb @@ -0,0 +1,193 @@ +# frozen_string_literal: true + +RSpec.describe 
Multiwoven::Integrations::Source::DatabricksModel::Client do + include WebMock::API + + before(:each) do + WebMock.disable_net_connect!(allow_localhost: true) + end + + let(:client) { described_class.new } + let(:mock_http_session) { double("Net::Http::Session") } + + let(:payload) do + { + messages: [ + { + role: "user", + content: "Hello there" + } + ] + } + end + + let(:sync_config_json) do + { + source: { + name: "databrick-model-source", + type: "source", + connection_specification: { + databricks_host: "test-host.databricks.com", + token: "test_token", + endpoint_name: "test", + request_format: "{}", + response_format: "{}" + } + }, + destination: { + name: "DestinationConnectorName", + type: "destination", + connection_specification: { + example_destination_key: "example_destination_value" + } + }, + model: { + name: "ExampleModel", + query: payload.to_json, + query_type: "ai_ml", + primary_key: "id" + }, + stream: { + name: "example_stream", + json_schema: { "field1": "type1" }, + request_method: "POST", + request_rate_limit: 4, + rate_limit_unit_seconds: 1 + }, + sync_mode: "full_refresh", + cursor_field: "timestamp", + destination_sync_mode: "upsert", + sync_id: "1" + } + end + + let(:sync_config) { Multiwoven::Integrations::Protocol::SyncConfig.from_json(sync_config_json.to_json) } + + before do + allow(Multiwoven::Integrations::Core::HttpClient).to receive(:request) + end + + let(:headers) do + { + "Accept" => "application/json", + "Authorization" => "Bearer test_token", + "Content-Type" => "application/json" + } + end + + describe "#check_connection" do + context "when the connection is successful" do + let(:response_body) { { "message" => "success" }.to_json } + before do + response = Net::HTTPSuccess.new("1.1", "200", "Unauthorized") + response.content_type = "application/json" + allow(response).to receive(:body).and_return(response_body) + allow(Multiwoven::Integrations::Core::HttpClient).to receive(:request) + 
.with("https://test-host.databricks.com/api/2.0/serving-endpoints/test", + "GET", + headers: headers) + .and_return(response) + end + + it "returns a succeeded connection status" do + message = client.check_connection(sync_config_json[:source][:connection_specification]) + result = message.connection_status + expect(result).to be_a(Multiwoven::Integrations::Protocol::ConnectionStatus) + expect(result.status).to eq("succeeded") + expect(result.message).to be_nil + end + end + + context "when the connection fails" do + let(:response_body) { { "message" => "failed" }.to_json } + before do + response = Net::HTTPSuccess.new("1.1", "401", "Unauthorized") + response.content_type = "application/json" + allow(response).to receive(:body).and_return(response_body) + allow(Multiwoven::Integrations::Core::HttpClient).to receive(:request) + .with("https://test-host.databricks.com/api/2.0/serving-endpoints/test", + "GET", + headers: headers) + .and_return(response) + end + + it "returns a failed connection status with an error message" do + message = client.check_connection(sync_config_json[:source][:connection_specification]) + result = message.connection_status + expect(result).to be_a(Multiwoven::Integrations::Protocol::ConnectionStatus) + expect(result.status).to eq("failed") + end + end + end + + describe "#discover" do + it "successfully returns the catalog message" do + message = client.discover(nil) + catalog = message.catalog + expect(catalog).to be_a(Multiwoven::Integrations::Protocol::Catalog) + expect(catalog.request_rate_limit).to eql(600) + expect(catalog.request_rate_limit_unit).to eql("minute") + expect(catalog.request_rate_concurrency).to eql(10) + end + + it "handles exceptions during discovery" do + allow(client).to receive(:read_json).and_raise(StandardError.new("test error")) + expect(client).to receive(:handle_exception).with( + an_instance_of(StandardError), + hash_including(context: "DATABRICKS MODEL:DISCOVER:EXCEPTION", type: "error") + ) + 
client.discover + end + end + + describe "#read" do + context "when the read is successful" do + let(:response_body) do + { + "id": "chatcmpl_090b5500-1e59-4226-878c-2e1ec5e0b3d3", + "object": "chat.completion", + "created": 1_724_067_784, + "model": "dbrx-instruct-071224", + "choices": [ + { + "index": 0, + "message": { + "role": "assistant", + "content": "Hello! How can I assist you today?" + } + } + ] + }.to_json + end + before do + response = Net::HTTPSuccess.new("1.1", "200", "Unauthorized") + response.content_type = "application/json" + allow(response).to receive(:body).and_return(response_body) + allow(Multiwoven::Integrations::Core::HttpClient).to receive(:request) + .with("https://test-host.databricks.com/serving-endpoints/test/invocations", + "POST", + payload: JSON.parse(payload.to_json), + headers: headers) + .and_return(response) + end + it "successfully reads records" do + records = client.read(sync_config) + expect(records).to be_an(Array) + expect(records.first.record).to be_a(Multiwoven::Integrations::Protocol::RecordMessage) + expect(records.first.record.data).to eq(JSON.parse(response_body)) + end + end + context "when the read is failed" do + it "handles exceptions during reading" do + error_instance = StandardError.new("test error") + allow(client).to receive(:run_model).and_raise(error_instance) + expect(client).to receive(:handle_exception).with( + error_instance, + hash_including(context: "DATABRICKS MODEL:READ:EXCEPTION", type: "error") + ) + + client.read(sync_config) + end + end + end +end From c554140673a9e5989bbe977ac81a1a073918ba9f Mon Sep 17 00:00:00 2001 From: afthab vp Date: Wed, 21 Aug 2024 13:48:28 +0530 Subject: [PATCH 2/2] fix(CE): conflict issue --- integrations/lib/multiwoven/integrations/core/constants.rb | 3 --- 1 file changed, 3 deletions(-) diff --git a/integrations/lib/multiwoven/integrations/core/constants.rb b/integrations/lib/multiwoven/integrations/core/constants.rb index 61eb9219..95e369dc 100644 --- 
a/integrations/lib/multiwoven/integrations/core/constants.rb +++ b/integrations/lib/multiwoven/integrations/core/constants.rb @@ -46,13 +46,10 @@ module Constants "workbook/worksheets" MS_EXCEL_SHEET_RANGE_API = "https://graph.microsoft.com/v1.0/drives/%s/items/%s/"\ "workbook/worksheets/%s/range(address='A1:Z1')/usedRange?$select=values" -<<<<<<< HEAD -======= DATABRICKS_HEALTH_URL = "https://%s/api/2.0/serving-endpoints/%s" DATABRICKS_SERVING_URL = "https://%s/serving-endpoints/%s/invocations" ->>>>>>> 1224a32b (feat(CE): Databricks AI Model connector (#385)) AWS_ACCESS_KEY_ID = ENV["AWS_ACCESS_KEY_ID"] AWS_SECRET_ACCESS_KEY = ENV["AWS_SECRET_ACCESS_KEY"]