diff --git a/integration/logstash-plugins/logstash-input-vespa/CHANGELOG.md b/integration/logstash-plugins/logstash-input-vespa/CHANGELOG.md index b8377ab356f..e591ee8b714 100644 --- a/integration/logstash-plugins/logstash-input-vespa/CHANGELOG.md +++ b/integration/logstash-plugins/logstash-input-vespa/CHANGELOG.md @@ -1,3 +1,6 @@ +## 0.3.0 +added retry+backoff logic + ## 0.2.0 Added support for mTLS certificates, selector, page_size, backend_concurrency, timeout, from_timestamp, and to_timestamp diff --git a/integration/logstash-plugins/logstash-input-vespa/README.md b/integration/logstash-plugins/logstash-input-vespa/README.md index c8714a74512..1199f33b941 100644 --- a/integration/logstash-plugins/logstash-input-vespa/README.md +++ b/integration/logstash-plugins/logstash-input-vespa/README.md @@ -20,6 +20,13 @@ export LOGSTASH_SOURCE=1 bundle exec rspec ``` +To run integration tests, you'll need to have a Vespa instance running + Logstash installed. Check out the `integration-test` directory for more information. + +``` +cd integration-test +./run_tests.sh +``` + ## Usage Minimal Logstash config example: @@ -62,6 +69,12 @@ input { # HTTP request timeout timeout => 180 + # maximum retries for failed HTTP requests + max_retries => 3 + + # delay in seconds for the first retry attempt. We double this delay for each subsequent retry. + retry_delay => 1 + # lower timestamp bound (microseconds since epoch) from_timestamp => 1600000000000000 diff --git a/integration/logstash-plugins/logstash-input-vespa/integration-test/config/logstash_input_test.conf b/integration/logstash-plugins/logstash-input-vespa/integration-test/config/logstash_input_test.conf new file mode 100644 index 00000000000..0f53422d093 --- /dev/null +++ b/integration/logstash-plugins/logstash-input-vespa/integration-test/config/logstash_input_test.conf @@ -0,0 +1,15 @@ +input { + vespa { + vespa_url => "http://localhost:8080" + cluster => "VESPA_CLUSTER" + selection => "id.specific == 'TEST_DOC_ID'" + } +} + +output { + file { + path => "/tmp/output.json" + codec => json_lines + } + stdout { codec => rubydebug } +} \ No newline at end of file diff --git a/integration/logstash-plugins/logstash-input-vespa/integration-test/run_tests.sh b/integration/logstash-plugins/logstash-input-vespa/integration-test/run_tests.sh new file mode 100755 index 00000000000..b01f15a3592 --- /dev/null +++ b/integration/logstash-plugins/logstash-input-vespa/integration-test/run_tests.sh @@ -0,0 +1,117 @@ +#!/bin/bash + +# Configuration +LOGSTASH_HOME="/opt/logstash/logstash-current" +VESPA_URL="http://localhost:8080" +VESPA_CLUSTER="used_car" + +# extract the plugin version from the gemspec +PLUGIN_VERSION=$(grep version ../logstash-input-vespa.gemspec | awk -F"'" '{print $2}') +if [ -z "$PLUGIN_VERSION" ]; then + echo -e "${RED}Error: Failed to extract plugin version${NC}" + exit 1 +else + echo "Plugin version: $PLUGIN_VERSION" +fi + +# Colors for output +RED='\033[0;31m' +GREEN='\033[0;32m' +NC='\033[0m' + +command_exists() { + command -v "$1" >/dev/null 2>&1 +} + +# Check prerequisites +if [ ! -d "$LOGSTASH_HOME" ]; then + echo -e "${RED}Error: Logstash not found at $LOGSTASH_HOME${NC}" + exit 1 +fi + +if ! command_exists curl; then + echo -e "${RED}Error: curl is required but not installed${NC}" + exit 1 +fi + +# Build and install plugin +echo "Building plugin..." +cd .. +gem build logstash-input-vespa.gemspec +cd integration-test + +echo "Installing plugin..." +$LOGSTASH_HOME/bin/logstash-plugin install --no-verify ../logstash-input-vespa-${PLUGIN_VERSION}.gem + +# Wait for Vespa to be ready +echo "Checking Vespa availability..." +max_attempts=30 + +attempt=1 +while ! curl --output /dev/null --silent --fail "$VESPA_URL"; do + if [ $attempt -eq $max_attempts ]; then + echo -e "${RED}Error: Vespa not available after $max_attempts attempts${NC}" + exit 1 + fi + printf '.' + sleep 1 + attempt=$((attempt + 1)) +done +echo -e "${GREEN}Vespa is ready${NC}" + +# Run test cases +echo "Running tests..." +test_count=0 +failed_count=0 + +run_test() { + local test_name=$1 + local doc_id=$2 + local doc_content=$3 + + test_count=$((test_count + 1)) + echo "Test: $test_name" + + # Feed document to Vespa + curl -X POST -H "Content-Type:application/json" --data "$doc_content" \ + "$VESPA_URL/document/v1/cars/$VESPA_CLUSTER/docid/$doc_id" + + # Create config file with actual ID + sed "s/TEST_DOC_ID/$doc_id/" config/logstash_input_test.conf > config/logstash_input_test_with_id.conf + + # fill in the Vespa cluster + sed "s/VESPA_CLUSTER/$VESPA_CLUSTER/" config/logstash_input_test_with_id.conf > config/logstash_input_test_with_id2.conf + mv config/logstash_input_test_with_id2.conf config/logstash_input_test_with_id.conf + + # Run Logstash + $LOGSTASH_HOME/bin/logstash -f $(pwd)/config/logstash_input_test_with_id.conf + + # Check output file + OUTPUT_FILE="/tmp/output.json" + if [ -f "$OUTPUT_FILE" ] && grep -q "$doc_id" "$OUTPUT_FILE"; then + echo -e "${GREEN}✓ Test passed${NC}" + else + echo -e "${RED}✗ Test failed - Document not found in output${NC}" + failed_count=$((failed_count + 1)) + fi + + # Clean up + rm -f "$OUTPUT_FILE" +} + +# Create output directory +mkdir -p data + +# ID will be like test_1234567890 +ID="test_$(date +%s)" + +# Run tests +run_test "Simple document" "$ID" '{ + "fields": { + "id": "'$ID'" + } +}' + +# Print summary +echo "Tests completed: $test_count, Failed: $failed_count" +exit $failed_count \ No newline at end of file diff --git a/integration/logstash-plugins/logstash-input-vespa/spec/inputs/vespa_spec.rb b/integration/logstash-plugins/logstash-input-vespa/spec/inputs/vespa_spec.rb index 4864c8c492b..903daa722c9 100644 --- a/integration/logstash-plugins/logstash-input-vespa/spec/inputs/vespa_spec.rb +++ b/integration/logstash-plugins/logstash-input-vespa/spec/inputs/vespa_spec.rb @@ -2,6 +2,8 @@ require "logstash/devutils/rspec/spec_helper" require "logstash/inputs/vespa" require "webmock/rspec" +require 'tempfile' +require 'openssl' describe LogStash::Inputs::Vespa do let(:config) do @@ -113,4 +115,157 @@ end end end + + describe "#register" do + let(:temp_cert) do + file = Tempfile.new(['cert', '.pem']) + # Create a self-signed certificate for testing + key = OpenSSL::PKey::RSA.new(2048) + cert = OpenSSL::X509::Certificate.new + cert.version = 2 + cert.serial = 1 + cert.subject = OpenSSL::X509::Name.parse("/CN=Test") + cert.issuer = cert.subject + cert.public_key = key.public_key + cert.not_before = Time.now + cert.not_after = Time.now + 3600 + + # Sign the certificate + cert.sign(key, OpenSSL::Digest::SHA256.new) + + file.write(cert.to_pem) + file.close + file + end + + let(:temp_key) do + file = Tempfile.new(['key', '.pem']) + # Create a valid RSA key for testing + key = OpenSSL::PKey::RSA.new(2048) + file.write(key.to_pem) + file.close + file + end + + after do + temp_cert.unlink + temp_key.unlink + end + + it "raises error when only client_cert is provided" do + invalid_config = config.merge({"client_cert" => temp_cert.path}) + plugin = described_class.new(invalid_config) + + expect { plugin.register }.to raise_error(LogStash::ConfigurationError, + "Both client_cert and client_key must be set, you can't have just one") + end + + it "raises error when only client_key is provided" do + invalid_config = config.merge({"client_key" => temp_key.path}) + plugin = described_class.new(invalid_config) + + expect { plugin.register }.to raise_error(LogStash::ConfigurationError, + "Both client_cert and client_key must be set, you can't have just one") + end + + it "correctly sets up URI parameters" do + full_config = config.merge({ + "selection" => "true", + "from_timestamp" => 1234567890, + "to_timestamp" => 2234567890, + "page_size" => 50, + + "backend_concurrency" => 2, + "timeout" => 120 + }) + + plugin = described_class.new(full_config) + plugin.register + + # Access the private @uri_params using send + uri_params = plugin.send(:instance_variable_get, :@uri_params) + expect(uri_params[:selection]).to eq("true") + expect(uri_params[:fromTimestamp]).to eq(1234567890) + expect(uri_params[:toTimestamp]).to eq(2234567890) + expect(uri_params[:wantedDocumentCount]).to eq(50) + expect(uri_params[:concurrency]).to eq(2) + expect(uri_params[:timeout]).to eq(120) + end + end + + describe "#parse_response" do + it "handles malformed JSON responses" do + response = double("response", :body => "invalid json{") + result = plugin.parse_response(response) + expect(result).to be_nil + end + + it "successfully parses valid JSON responses" do + valid_json = { + "documents" => [{"id" => "doc1"}], + "documentCount" => 1 + }.to_json + response = double("response", :body => valid_json) + + result = plugin.parse_response(response) + expect(result["documentCount"]).to eq(1) + expect(result["documents"]).to be_an(Array) + end + end + + describe "#process_documents" do + it "creates events with correct decoration" do + documents = [ + {"id" => "doc1", "fields" => {"field1" => "value1"}}, + {"id" => "doc2", "fields" => {"field1" => "value2"}} + ] + + # Test that decoration is applied + expect(plugin).to receive(:decorate).twice + + plugin.process_documents(documents, queue) + expect(queue.size).to eq(2) + + event1 = queue.pop + expect(event1.get("id")).to eq("doc1") + expect(event1.get("fields")["field1"]).to eq("value1") + + event2 = queue.pop + expect(event2.get("id")).to eq("doc2") + expect(event2.get("fields")["field1"]).to eq("value2") + end + end + + describe "#stop" do + it "sets stopping flag" do + plugin.stop + expect(plugin.instance_variable_get(:@stopping)).to be true + end + + it "interrupts running visit operation" do + request_made = Queue.new # Use a Queue for thread synchronization + + # Setup a response that would normally continue + stub_request(:get, "#{base_uri}?#{uri_params}") + .to_return(status: 200, body: { + documents: [{"id" => "doc1"}], + documentCount: 1, + continuation: "token" + }.to_json) + .with { |req| request_made.push(true); true } # Signal when request is made + + # Run in a separate thread + thread = Thread.new { plugin.run(queue) } + + # Wait for the first request to be made + request_made.pop + + # Now we know the first request has been made, stop the plugin + plugin.stop + thread.join + + # Should only make one request despite having a continuation token + expect(a_request(:get, "#{base_uri}?#{uri_params}")).to have_been_made.once + end + end end \ No newline at end of file diff --git a/integration/logstash-plugins/logstash-output-vespa/README.md b/integration/logstash-plugins/logstash-output-vespa/README.md index d12d4e18c06..72f49491da7 100644 --- a/integration/logstash-plugins/logstash-output-vespa/README.md +++ b/integration/logstash-plugins/logstash-output-vespa/README.md @@ -14,8 +14,10 @@ If you're developing the plugin, you'll want to do something like: ``` # build the gem ./gradlew gem +# run tests +./gradlew test # install it as a Logstash plugin -/opt/logstash/bin/logstash-plugin install /path/to/logstash-output-vespa/logstash-output-vespa_feed-0.4.0.gem +/opt/logstash/bin/logstash-plugin install /path/to/logstash-output-vespa/logstash-output-vespa_feed-0.5.2.gem # profit /opt/logstash/bin/logstash ``` @@ -24,6 +26,16 @@ Some more good info about Logstash Java plugins can be found [here](https://www. It looks like the JVM options from [here](https://github.com/logstash-plugins/.ci/blob/main/dockerjdk17.env) are useful to make JRuby's `bundle install` work. +### Integration tests +To run integration tests, you'll need to have a Vespa instance running + Logstash installed. Check out the `integration-test` directory for more information. + +``` +cd integration-test +./run_tests.sh +``` + +### Publishing the gem + Note to self: for some reason, `bundle exec rake publish_gem` fails, but `gem push logstash-output-vespa_feed-$VERSION.gem` does the trick. diff --git a/integration/logstash-plugins/logstash-output-vespa/VERSION b/integration/logstash-plugins/logstash-output-vespa/VERSION index 4b9fcbec101..cb0c939a936 100644 --- a/integration/logstash-plugins/logstash-output-vespa/VERSION +++ b/integration/logstash-plugins/logstash-output-vespa/VERSION @@ -1 +1 @@ -0.5.1 +0.5.2 diff --git a/integration/logstash-plugins/logstash-output-vespa/build.gradle b/integration/logstash-plugins/logstash-output-vespa/build.gradle index 5e635b4a374..af184cc0bc5 100644 --- a/integration/logstash-plugins/logstash-output-vespa/build.gradle +++ b/integration/logstash-plugins/logstash-output-vespa/build.gradle @@ -47,6 +47,7 @@ repositories { shadowJar { archiveClassifier.set('') + configurations = [project.configurations.runtimeClasspath, project.configurations.testRuntimeClasspath] } dependencies { @@ -56,11 +57,19 @@ dependencies { implementation("com.yahoo.vespa:vespa-feed-client:${VESPA_VERSION}") - implementation 'com.fasterxml.jackson.core:jackson-databind:2.17.2' + implementation 'com.fasterxml.jackson.core:jackson-databind:2.18.2' + implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:2.18.2' + implementation 'com.fasterxml.jackson.core:jackson-core:2.18.2' + implementation 'com.fasterxml.jackson.core:jackson-annotations:2.18.2' - testImplementation 'junit:junit:4.12' - testImplementation 'org.jruby:jruby-complete:9.1.13.0' + testImplementation 'org.jruby:jruby-complete:9.4.3.0' + testImplementation 'org.jruby:jruby-core:9.4.3.0' testImplementation 'org.apache.logging.log4j:log4j-core:2.9.1' + testImplementation 'junit:junit:4.13.2' + testImplementation 'org.mockito:mockito-core:5.11.0' + testImplementation("com.yahoo.vespa:vespa-feed-client:${VESPA_VERSION}") + testImplementation fileTree(dir: LOGSTASH_CORE_PATH, include: "**/logstash-core.jar") + testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:2.18.2' } clean { @@ -112,3 +121,15 @@ tasks.register("gem"){ buildGem(projectDir, buildDir, pluginInfo.pluginFullName() + ".gemspec") } } + +test { + doFirst { + println "Test runtime classpath:" + classpath.each { println it } + } +} + +task integrationTest(type: Exec) { + workingDir 'integration-test' + commandLine './run_tests.sh' +} diff --git a/integration/logstash-plugins/logstash-output-vespa/integration-test/config/logstash_output_test.conf b/integration/logstash-plugins/logstash-output-vespa/integration-test/config/logstash_output_test.conf new file mode 100644 index 00000000000..d0ea9774d3f --- /dev/null +++ b/integration/logstash-plugins/logstash-output-vespa/integration-test/config/logstash_output_test.conf @@ -0,0 +1,28 @@ +input { + stdin {} +} + +filter { + # get the ID from the input + csv { + separator => "," + skip_header => true + # we need the "id" and "operation" fields in the schema for now + columns => ["operation", "id"] + } + + # remove fields that are not needed + mutate { + remove_field => ["@timestamp", "@version", "event", "host", "log", "message"] + } +} + +output { + stdout { codec => rubydebug } + vespa_feed { + vespa_url => "http://localhost:8080" + namespace => "cars" + document_type => "VESPA_CLUSTER" + operation => "%{operation}" + } +} \ No newline at end of file diff --git a/integration/logstash-plugins/logstash-output-vespa/integration-test/run_tests.sh b/integration/logstash-plugins/logstash-output-vespa/integration-test/run_tests.sh new file mode 100755 index 00000000000..c6f687b8333 --- /dev/null +++ b/integration/logstash-plugins/logstash-output-vespa/integration-test/run_tests.sh @@ -0,0 +1,99 @@ +#!/bin/bash + +# Configuration +LOGSTASH_HOME="/opt/logstash/logstash-current" +VESPA_URL="http://localhost:8080" +VESPA_CLUSTER="used_car" +PLUGIN_VERSION=$(cat ../VERSION) + +# Colors for output +RED='\033[0;31m' +GREEN='\033[0;32m' +NC='\033[0m' + +command_exists() { + command -v "$1" >/dev/null 2>&1 +} + +# Check prerequisites +if [ ! -d "$LOGSTASH_HOME" ]; then + echo -e "${RED}Error: Logstash not found at $LOGSTASH_HOME${NC}" + exit 1 +fi + +if ! command_exists curl; then + echo -e "${RED}Error: curl is required but not installed${NC}" + exit 1 +fi + +# Build and install plugin +echo "Building plugin..." +cd .. +./gradlew clean gem +cd integration-test + +echo "Installing plugin..." +$LOGSTASH_HOME/bin/logstash-plugin install --no-verify ../logstash-output-vespa_feed-$PLUGIN_VERSION.gem + +# Wait for Vespa to be ready +echo "Checking Vespa availability..." +max_attempts=30 +attempt=1 +while ! curl --output /dev/null --silent --fail "$VESPA_URL"; do + if [ $attempt -eq $max_attempts ]; then + echo -e "${RED}Error: Vespa not available after $max_attempts attempts${NC}" + exit 1 + fi + printf '.' + sleep 1 + attempt=$((attempt + 1)) +done +echo -e "${GREEN}Vespa is ready${NC}" + +# Run test cases +echo "Running tests..." +test_count=0 +failed_count=0 + +# fill in the Vespa cluster +sed "s/VESPA_CLUSTER/$VESPA_CLUSTER/" config/logstash_output_test.conf > config/logstash_output_test_configured.conf + +run_test() { + local test_name=$1 + local input=$2 + local expected_doc_id=$3 + local expected_status=$4 + + + + test_count=$((test_count + 1)) + + echo "Test: $test_name" + echo "$input" | $LOGSTASH_HOME/bin/logstash -f $(pwd)/config/logstash_output_test_configured.conf + + # Wait for document to be available + sleep 2 + + # Check if document exists + status=$(curl -s -o /dev/null -w "%{http_code}" "$VESPA_URL/document/v1/cars/$VESPA_CLUSTER/docid/$expected_doc_id") + + if [ "$status" = "$expected_status" ]; then + echo -e "${GREEN}✓ Test passed${NC}" + else + echo -e "${RED}✗ Test failed - (HTTP $status)${NC}" + failed_count=$((failed_count + 1)) + fi +} + +# ID will be like test_1234567890 +ID="test_$(date +%s)" + +# Run tests +run_test "Put simple document" "put,$ID" "$ID" 200 +run_test "Delete simple document" " +remove,$ID" "$ID" 404 + + +# Print summary +echo "Tests completed: $test_count, Failed: $failed_count" +exit $failed_count \ No newline at end of file diff --git a/integration/logstash-plugins/logstash-output-vespa/lib/logstash-output-vespa_feed_jars.rb b/integration/logstash-plugins/logstash-output-vespa/lib/logstash-output-vespa_feed_jars.rb index ee4a9170c64..39d5bcc20b4 100644 --- a/integration/logstash-plugins/logstash-output-vespa/lib/logstash-output-vespa_feed_jars.rb +++ b/integration/logstash-plugins/logstash-output-vespa/lib/logstash-output-vespa_feed_jars.rb @@ -2,4 +2,4 @@ # encoding: utf-8 require 'jar_dependencies' -require_jar('org.logstashplugins', 'logstash-output-vespa_feed', '0.5.1') +require_jar('org.logstashplugins', 'logstash-output-vespa_feed', '0.5.2') diff --git a/integration/logstash-plugins/logstash-output-vespa/src/main/java/org/logstashplugins/VespaFeed.java b/integration/logstash-plugins/logstash-output-vespa/src/main/java/org/logstashplugins/VespaFeed.java index 188e414e823..86e3a9d803e 100644 --- a/integration/logstash-plugins/logstash-output-vespa/src/main/java/org/logstashplugins/VespaFeed.java +++ b/integration/logstash-plugins/logstash-output-vespa/src/main/java/org/logstashplugins/VespaFeed.java @@ -72,7 +72,7 @@ public class VespaFeed implements Output { public static final PluginConfigSpec DOOM_PERIOD = PluginConfigSpec.numSetting("doom_period", 60); - private final FeedClient client; + private FeedClient client; private final String id; private final String namespace; private final boolean dynamicNamespace; @@ -145,7 +145,15 @@ public int retries() { objectMapper = ObjectMappers.JSON_MAPPER; } - private void validateOperationAndCreate() { + // Constructor for testing + protected VespaFeed(String id, Configuration config, Context context, FeedClient testClient) { + this(id, config, context); + if (testClient != null) { + this.client = testClient; + } + } + + public void validateOperationAndCreate() { if (!dynamicOperation) { if (!operation.equals("put") && !operation.equals("update") && !operation.equals("remove")) { throw new IllegalArgumentException("Operation must be put, update or remove"); @@ -156,7 +164,7 @@ private void validateOperationAndCreate() { } } - private FeedClientBuilder addCertAndKeyToBuilder(Configuration config, FeedClientBuilder builder) { + protected static FeedClientBuilder addCertAndKeyToBuilder(Configuration config, FeedClientBuilder builder) { String clientCert = config.get(CLIENT_CERT); Path clientCertPath = null; if (clientCert != null) { @@ -212,7 +220,7 @@ public void output(final Collection events) { } - private CompletableFuture asyncFeed(Event event) throws JsonProcessingException { + protected CompletableFuture asyncFeed(Event event) throws JsonProcessingException { Map eventData = event.getData(); // we put the doc ID here @@ -263,7 +271,6 @@ private CompletableFuture asyncFeed(Event event) throws JsonProcessingEx doc.put("fields", eventData); // create the request to feed the document - //return client.put(docId, toJson(doc), operationParameters); if (operation.equals("put")) { return client.put(docId, toJson(doc), operationParameters); } else if (operation.equals("update")) { @@ -279,7 +286,7 @@ private CompletableFuture asyncFeed(Event event) throws JsonProcessingEx * @param docId * @return The operation parameters with create=true if applicable */ - private OperationParameters addCreateIfApplicable(String operation, String docId) { + public OperationParameters addCreateIfApplicable(String operation, String docId) { OperationParameters operationParameters = OperationParameters.empty() .timeout(Duration.ofSeconds(operationTimeout)); @@ -302,7 +309,7 @@ private OperationParameters addCreateIfApplicable(String operation, String docId * @param fieldName The field name to get * @return The value of the field or the field name if it doesn't exist */ - private String getDynamicField(Event event, String fieldName) { + protected String getDynamicField(Event event, String fieldName) { Object namespaceFieldValue = event.getField(fieldName); if (namespaceFieldValue != null) { return namespaceFieldValue.toString(); @@ -311,7 +318,7 @@ private String getDynamicField(Event event, String fieldName) { } } - private String toJson(Map eventData) throws JsonProcessingException { + protected String toJson(Map eventData) throws JsonProcessingException { return objectMapper.writeValueAsString(eventData); } @@ -336,4 +343,36 @@ public Collection> configSchema() { public String getId() { return id; } + + public boolean isDynamicNamespace() { + return dynamicNamespace; + } + + public String getNamespace() { + return namespace; + } + + public boolean isDynamicDocumentType() { + return dynamicDocumentType; + } + + public String getDocumentType() { + return documentType; + } + + public String getOperation() { + return operation; + } + + public boolean isCreate() { + return create; + } + + public boolean isDynamicOperation() { + return dynamicOperation; + } + + public long getOperationTimeout() { + return operationTimeout; + } } diff --git a/integration/logstash-plugins/logstash-output-vespa/src/test/java/org/logstashplugins/DynamicOptionTest.java b/integration/logstash-plugins/logstash-output-vespa/src/test/java/org/logstashplugins/DynamicOptionTest.java new file mode 100644 index 00000000000..0a83c21f4da --- /dev/null +++ b/integration/logstash-plugins/logstash-output-vespa/src/test/java/org/logstashplugins/DynamicOptionTest.java @@ -0,0 +1,42 @@ +package org.logstashplugins; + +import org.junit.Test; +import static org.junit.Assert.*; + +public class DynamicOptionTest { + + @Test + public void testStaticValue() { + DynamicOption option = new DynamicOption("static-value"); + assertFalse(option.isDynamic()); + assertEquals("static-value", option.getParsedConfigValue()); + } + + @Test + public void testDynamicValueWithBrackets() { + DynamicOption option = new DynamicOption("%{[field_name]}"); + assertTrue(option.isDynamic()); + assertEquals("field_name", option.getParsedConfigValue()); + } + + @Test + public void testDynamicValueWithoutBrackets() { + DynamicOption option = new DynamicOption("%{field_name}"); + assertTrue(option.isDynamic()); + assertEquals("field_name", option.getParsedConfigValue()); + } + + @Test + public void testEmptyValue() { + DynamicOption option = new DynamicOption(""); + assertFalse(option.isDynamic()); + assertEquals("", option.getParsedConfigValue()); + } + + @Test + public void testPartialMatch() { + DynamicOption option = new DynamicOption("prefix_%{field_name"); + assertFalse(option.isDynamic()); + assertEquals("prefix_%{field_name", option.getParsedConfigValue()); + } +} \ No newline at end of file diff --git a/integration/logstash-plugins/logstash-output-vespa/src/test/java/org/logstashplugins/VespaFeedTest.java b/integration/logstash-plugins/logstash-output-vespa/src/test/java/org/logstashplugins/VespaFeedTest.java new file mode 100644 index 00000000000..3a9e39a77f4 --- /dev/null +++ b/integration/logstash-plugins/logstash-output-vespa/src/test/java/org/logstashplugins/VespaFeedTest.java @@ -0,0 +1,454 @@ +package org.logstashplugins; + +import co.elastic.logstash.api.Configuration; +import co.elastic.logstash.api.Event; +import co.elastic.logstash.api.PluginConfigSpec; + +import org.junit.Test; + +import ai.vespa.feed.client.FeedClientBuilder; +import ai.vespa.feed.client.OperationParameters; +import ai.vespa.feed.client.FeedClient; +import ai.vespa.feed.client.Result; +import ai.vespa.feed.client.DocumentId; + +import java.net.URI; +import java.time.Duration; +import java.nio.file.Files; +import java.nio.file.Path; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Arrays; +import java.util.concurrent.CompletableFuture; +import java.util.Collection; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import org.mockito.Mockito; +import static org.mockito.Mockito.when; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.contains; +import static org.mockito.Mockito.verify; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.times; + +public class VespaFeedTest { + + private static Configuration createMockConfig(String operation, boolean create, boolean dynamicOperation) { + Configuration config = Mockito.mock(Configuration.class); + try { + // Set required config values + when(config.get(VespaFeed.VESPA_URL)).thenReturn(new URI("http://localhost:8080")); + when(config.get(VespaFeed.NAMESPACE)).thenReturn(dynamicOperation ? "%{namespace}" : "test-namespace"); + when(config.get(VespaFeed.DOCUMENT_TYPE)).thenReturn("test-doc-type"); + when(config.get(VespaFeed.OPERATION)).thenReturn(operation); + when(config.get(VespaFeed.CREATE)).thenReturn(create); + // Set defaults for other required config + when(config.get(VespaFeed.MAX_CONNECTIONS)).thenReturn(1L); + when(config.get(VespaFeed.MAX_STREAMS)).thenReturn(128L); + when(config.get(VespaFeed.MAX_RETRIES)).thenReturn(10L); + when(config.get(VespaFeed.OPERATION_TIMEOUT)).thenReturn(180L); + when(config.get(VespaFeed.GRACE_PERIOD)).thenReturn(10L); + when(config.get(VespaFeed.DOOM_PERIOD)).thenReturn(60L); + return config; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Test + public void testValidateOperationAndCreate_ValidOperations() { + // Test valid "put" operation + VespaFeed feed = new VespaFeed("test-id", createMockConfig("put", false, false), null); + feed.validateOperationAndCreate(); // Should not throw exception + + // Test valid "update" operation + feed = new VespaFeed("test-id", createMockConfig("update", false, false), null); + feed.validateOperationAndCreate(); // Should not throw exception + + // Test valid "remove" operation + feed = new VespaFeed("test-id", createMockConfig("remove", false, false), null); + feed.validateOperationAndCreate(); // Should not throw exception + } + + @Test(expected = IllegalArgumentException.class) + public void testValidateOperationAndCreate_InvalidOperation() { + VespaFeed feed = new VespaFeed("test-id", createMockConfig("invalid", false, false), null); + feed.validateOperationAndCreate(); + } + + @Test(expected = IllegalArgumentException.class) + public void testValidateOperationAndCreate_RemoveWithCreate() { + VespaFeed feed = new VespaFeed("test-id", createMockConfig("remove", true, false), null); + feed.validateOperationAndCreate(); + } + + @Test + public void testValidateOperationAndCreate_DynamicOperation() { + // When operation is dynamic, validation should be skipped + VespaFeed feed = new VespaFeed("test-id", createMockConfig("%{operation}", false, true), null); + feed.validateOperationAndCreate(); // Should not throw exception + } + + @Test + public void testConstructor_DynamicOptions() { + // Test non-dynamic options + VespaFeed feed = new VespaFeed("test-id", createMockConfig("put", false, false), null); + assertFalse("Operation should not be dynamic", feed.isDynamicOperation()); + assertEquals("Operation should be 'put'", "put", feed.getOperation()); + + // Test dynamic operation + feed = new VespaFeed("test-id", createMockConfig("%{operation}", false, true), null); + assertTrue("Operation should be dynamic", feed.isDynamicOperation()); + assertEquals("Operation field should be 'operation'", "operation", feed.getOperation()); + + // Test dynamic namespace + Configuration config = createMockConfig("put", false, false); + when(config.get(VespaFeed.NAMESPACE)).thenReturn("%{my_namespace}"); + feed = new VespaFeed("test-id", config, null); + assertTrue("Namespace should be dynamic", feed.isDynamicNamespace()); + assertEquals("Namespace field should be 'my_namespace'", "my_namespace", feed.getNamespace()); + + // Test dynamic document type + config = createMockConfig("put", false, false); + when(config.get(VespaFeed.DOCUMENT_TYPE)).thenReturn("%{doc_type}"); + feed = new VespaFeed("test-id", config, null); + assertTrue("Document type should be dynamic", feed.isDynamicDocumentType()); + assertEquals("Document type field should be 'doc_type'", "doc_type", feed.getDocumentType()); + } + + @Test + public void testAddCreateIfApplicable() { + // Test put operation with create=true + VespaFeed feed = new VespaFeed("test-id", createMockConfig("put", true, false), null); + OperationParameters params = feed.addCreateIfApplicable("put", "doc1"); + assertTrue("Put operation should have createIfNonExistent=true", params.createIfNonExistent()); + assertEquals("Timeout should be set", Duration.ofSeconds(180), params.timeout().get()); + + // Test update operation with create=true + feed = new VespaFeed("test-id", createMockConfig("update", true, false), null); + params = feed.addCreateIfApplicable("update", "doc1"); + assertTrue("Update operation should have createIfNonExistent=true", params.createIfNonExistent()); + + // Test put operation with create=false + feed = new VespaFeed("test-id", createMockConfig("put", false, false), null); + params = feed.addCreateIfApplicable("put", "doc1"); + assertFalse("Put operation should not have createIfNonExistent when create=false", params.createIfNonExistent()); + } + + @Test + public void testConstructor_OperationValidation() { + // Test invalid operation + Configuration config = createMockConfig("invalid_op", false, false); + try { + new VespaFeed("test-id", config, null); + fail("Should throw IllegalArgumentException for invalid operation"); + } catch (IllegalArgumentException e) { + assertEquals("Operation must be put, update or remove", e.getMessage()); + } + + // Test remove with create=true + config = createMockConfig("remove", true, false); + try { + new VespaFeed("test-id", config, null); + fail("Should throw IllegalArgumentException for remove with create=true"); + } catch (IllegalArgumentException e) { + assertEquals("Operation remove cannot have create=true", e.getMessage()); + } + + // Test that create=true is allowed for put and update + config = createMockConfig("put", true, false); + VespaFeed feed = new VespaFeed("test-id", config, null); + assertTrue("Create should be true for put operation", feed.isCreate()); + + config = createMockConfig("update", true, false); + feed = new VespaFeed("test-id", config, null); + assertTrue("Create should be true for update operation", feed.isCreate()); + } + + @Test + public void testAddCertAndKeyToBuilder() throws IOException { + Configuration config = createMockConfig("put", false, false); + FeedClientBuilder builder = FeedClientBuilder.create(URI.create("http://localhost:8080")); + + // Create temporary cert and key files + Path certPath = Files.createTempFile("test-cert", ".pem"); + Path keyPath = Files.createTempFile("test-key", ".pem"); + try { + // Write some dummy content + Files.write(certPath, "test certificate".getBytes()); + Files.write(keyPath, "test key".getBytes()); + + // Test with both cert and key. It should not throw an exception + when(config.get(VespaFeed.CLIENT_CERT)).thenReturn(certPath.toString()); + when(config.get(VespaFeed.CLIENT_KEY)).thenReturn(keyPath.toString()); + VespaFeed.addCertAndKeyToBuilder(config, builder); + + // Test with missing cert/key. Similarly, it should not throw an exception + when(config.get(VespaFeed.CLIENT_CERT)).thenReturn(null); + when(config.get(VespaFeed.CLIENT_KEY)).thenReturn(null); + VespaFeed.addCertAndKeyToBuilder(config, builder); + } finally { + // Clean up + Files.deleteIfExists(certPath); + Files.deleteIfExists(keyPath); + } + } + + @Test + public void testGetDynamicField() { + // Create a mock Event + Event event = createMockEvent("test-id", "field_value"); + + // Test when field exists + when(event.getField("my_field")).thenReturn("field_value"); + VespaFeed feed = new VespaFeed("test-id", createMockConfig("put", false, false), null); + assertEquals("Should return field value", "field_value", feed.getDynamicField(event, "my_field")); + + // Test when field doesn't exist + when(event.getField("missing_field")).thenReturn(null); + assertEquals("Should return field name when field doesn't exist", + "missing_field", feed.getDynamicField(event, "missing_field")); + } + + @Test + public void testToJson() throws Exception { + VespaFeed feed = new VespaFeed("test-id", createMockConfig("put", false, false), null); + ObjectMapper mapper = new ObjectMapper(); + + // Test simple map + Map simpleMap = new HashMap<>(); + simpleMap.put("string", "value"); + simpleMap.put("number", 42); + simpleMap.put("boolean", true); + assertEquals( + mapper.readTree("{\"string\":\"value\",\"number\":42,\"boolean\":true}"), + mapper.readTree(feed.toJson(simpleMap)) + ); + + // Test nested map + Map nestedMap = new HashMap<>(); + nestedMap.put("nested", simpleMap); + assertEquals( + mapper.readTree("{\"nested\":{\"string\":\"value\",\"number\":42,\"boolean\":true}}"), + mapper.readTree(feed.toJson(nestedMap)) + ); + + // Test array + List list = Arrays.asList("one", "two", "three"); + Map mapWithArray = new HashMap<>(); + mapWithArray.put("array", list); + assertEquals( + mapper.readTree("{\"array\":[\"one\",\"two\",\"three\"]}"), + mapper.readTree(feed.toJson(mapWithArray)) + ); + } + + @Test + public void testAsyncFeed_PutOperation() throws Exception { + FeedClient mockClient = createMockFeedClient(); + VespaFeed feed = createVespaFeed("put", true, false, mockClient); + Event event = createMockEvent("test-doc-1", "value1"); + + CompletableFuture future = feed.asyncFeed(event); + assertEquals(Result.Type.success, future.get().type()); + + verifyDocument(mockClient, "test-doc-1", "value1"); + } + + @Test + public void testAsyncFeed_DynamicOperation() throws Exception { + FeedClient mockClient = createMockFeedClient(); + VespaFeed feed = createVespaFeed("%{operation}", false, true, mockClient); + + // Create test event + Event event = createMockEvent("test-doc-1", "value1"); + when(event.getField("operation")).thenReturn("update"); + + CompletableFuture future = feed.asyncFeed(event); + assertEquals(Result.Type.success, future.get().type()); + + verify(mockClient).update( + eq(DocumentId.of("test-namespace", "test-doc-type", "test-doc-1")), + contains("\"fields\":{\"field1\":\"value1\",\"doc_id\":\"test-doc-1\"}"), + any(OperationParameters.class) + ); + } + + @Test + public void testAsyncFeed_UUIDGeneration() throws Exception { + FeedClient mockClient = createMockFeedClient(); + VespaFeed feed = createVespaFeed("put", false, false, mockClient); + + // Create event without ID + Event event = createMockEvent(null, "value1"); + + CompletableFuture future = feed.asyncFeed(event); + assertEquals(Result.Type.success, future.get().type()); + + verify(mockClient).put( + argThat(docId -> docId.toString().matches("id:test-namespace:test-doc-type::[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}")), + contains("\"fields\":{\"field1\":\"value1\"}"), + any(OperationParameters.class) + ); + } + + @Test + public void testOutput_SuccessfulBatch() throws Exception { + FeedClient mockClient = createMockFeedClient(); + VespaFeed feed = createVespaFeed("put", false, false, mockClient); + + // Create test events + List events = Arrays.asList( + createMockEvent("doc1", "value1"), + createMockEvent("doc2", "value2") + ); + + feed.output(events); + verify(mockClient, times(2)).put(any(), any(), any()); + } + + @Test + public void testOutput_JsonSerializationError() throws Exception { + FeedClient mockClient = createMockFeedClient(); + VespaFeed feed = createVespaFeed("put", false, false, mockClient); + + // Create event that will cause serialization error + Event badEvent = Mockito.mock(Event.class); + Map eventData = new HashMap<>(); + eventData.put("field1", new Object() { // Non-serializable object + @Override + public String toString() { throw new RuntimeException("Serialization error"); } + }); + when(badEvent.getData()).thenReturn(eventData); + + // Test that output handles the error gracefully + feed.output(Arrays.asList(badEvent)); + verify(mockClient, times(0)).put(any(), any(), any()); + } + + @Test + public void testOutput_FeedClientError() throws Exception { + FeedClient mockClient = createMockFeedClient(); + Result errorResult = Mockito.mock(Result.class); + when(errorResult.type()).thenReturn(Result.Type.conditionNotMet); + CompletableFuture errorFuture = CompletableFuture.completedFuture(errorResult); + when(mockClient.put(any(), any(), any())).thenReturn(errorFuture); + + VespaFeed feed = createVespaFeed("put", false, false, mockClient); + feed.output(Arrays.asList(createMockEvent("doc1", "value1"))); + // The test passes if no exception is thrown + } + + @Test + public void testOutput_StoppedBehavior() throws Exception { + FeedClient mockClient = createMockFeedClient(); + VespaFeed feed = createVespaFeed("put", false, false, mockClient); + + // Stop the feed + feed.stop(); + + // Create test events + List events = Arrays.asList( + createMockEvent("doc1", "value1"), + createMockEvent("doc2", "value2") + ); + + // Test that output doesn't process events when stopped + feed.output(events); + verify(mockClient, times(0)).put(any(), any(), any()); + } + + @Test + public void testOutput_MultiFeedException() throws Exception { + FeedClient mockClient = createMockFeedClient(); + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(new RuntimeException("Feed error")); + when(mockClient.put(any(), any(), any())).thenReturn(future); + + VespaFeed feed = createVespaFeed("put", false, false, mockClient); + feed.output(Arrays.asList(createMockEvent("doc1", "value1"))); + // Test passes if error is logged but not thrown + } + + @Test + public void testAwaitStop() throws Exception { + FeedClient mockClient = createMockFeedClient(); + VespaFeed feed = createVespaFeed("put", false, false, mockClient); + + feed.awaitStop(); // Should do nothing + verify(mockClient, times(0)).close(); + + feed.stop(); + verify(mockClient, times(1)).close(); + } + + @Test + public void testConfigSchema() { + //makes sure that all config options are present in the schema + + VespaFeed feed = createVespaFeed("put", false, false, null); + Collection> schema = feed.configSchema(); + + assertTrue(schema.contains(VespaFeed.VESPA_URL)); + assertTrue(schema.contains(VespaFeed.CLIENT_CERT)); + assertTrue(schema.contains(VespaFeed.CLIENT_KEY)); + assertTrue(schema.contains(VespaFeed.OPERATION)); + assertTrue(schema.contains(VespaFeed.CREATE)); + assertTrue(schema.contains(VespaFeed.NAMESPACE)); + assertTrue(schema.contains(VespaFeed.DOCUMENT_TYPE)); + assertTrue(schema.contains(VespaFeed.ID_FIELD)); + assertTrue(schema.contains(VespaFeed.MAX_CONNECTIONS)); + assertTrue(schema.contains(VespaFeed.MAX_STREAMS)); + assertTrue(schema.contains(VespaFeed.MAX_RETRIES)); + assertTrue(schema.contains(VespaFeed.OPERATION_TIMEOUT)); + assertTrue(schema.contains(VespaFeed.GRACE_PERIOD)); + assertTrue(schema.contains(VespaFeed.DOOM_PERIOD)); + } + + private Event createMockEvent(String docId, String value) { + Event event = Mockito.mock(Event.class); + Map eventData = new HashMap<>(); + eventData.put("field1", value); + if (docId != null) { + eventData.put("doc_id", docId); + } + when(event.getData()).thenReturn(eventData); + when(event.getField("doc_id")).thenReturn(docId); + return event; + } + + private FeedClient createMockFeedClient() { + FeedClient mockClient = Mockito.mock(FeedClient.class); + Result mockResult = Mockito.mock(Result.class); + when(mockResult.type()).thenReturn(Result.Type.success); + CompletableFuture successFuture = CompletableFuture.completedFuture(mockResult); + when(mockClient.put(any(), any(), any())).thenReturn(successFuture); + when(mockClient.update(any(), any(), any())).thenReturn(successFuture); + when(mockClient.remove(any(), any())).thenReturn(successFuture); + return mockClient; + } + + private VespaFeed createVespaFeed(String operation, boolean create, boolean dynamicOperation, FeedClient mockClient) { + Configuration config = createMockConfig(operation, create, dynamicOperation); + when(config.get(VespaFeed.ID_FIELD)).thenReturn("doc_id"); + when(config.get(VespaFeed.NAMESPACE)).thenReturn("test-namespace"); + return new VespaFeed("test-id", config, null, mockClient); + } + + private void verifyDocument(FeedClient mockClient, String docId, String value) { + verify(mockClient).put( + eq(DocumentId.of("test-namespace", "test-doc-type", docId)), + contains("\"fields\":{\"field1\":\"" + value + "\",\"doc_id\":\"" + docId + "\"}"), + any(OperationParameters.class) + ); + } +} \ No newline at end of file