From 64899ec703372fcf24f184c45a95018395cc3d57 Mon Sep 17 00:00:00 2001 From: Radu Gheorghe Date: Fri, 27 Dec 2024 13:11:25 +0200 Subject: [PATCH] [logstash-plugins] output: added options to remove metadata fields from documents after using them for put/update --- .../logstash-input-vespa/README.md | 4 +- .../logstash-output-vespa/CHANGELOG.md | 3 + .../logstash-output-vespa/README.md | 10 +- .../logstash-output-vespa/VERSION | 2 +- .../logstash-output-vespa/docs/index.asciidoc | 53 +++- .../lib/logstash-output-vespa_feed_jars.rb | 2 +- .../java/org/logstashplugins/VespaFeed.java | 62 +++-- .../org/logstashplugins/VespaFeedTest.java | 236 ++++++++++-------- .../logstashplugins/VespaFeedTestHelper.java | 95 +++++++ 9 files changed, 336 insertions(+), 131 deletions(-) create mode 100644 integration/logstash-plugins/logstash-output-vespa/src/test/java/org/logstashplugins/VespaFeedTestHelper.java diff --git a/integration/logstash-plugins/logstash-input-vespa/README.md b/integration/logstash-plugins/logstash-input-vespa/README.md index 1199f33b941..399ef02c5c1 100644 --- a/integration/logstash-plugins/logstash-input-vespa/README.md +++ b/integration/logstash-plugins/logstash-input-vespa/README.md @@ -20,7 +20,9 @@ export LOGSTASH_SOURCE=1 bundle exec rspec ``` -To run integration tests, you'll need to have a Vespa instance running + Logstash installed. Check out the `integration-test` directory for more information. +To run integration tests, you'll need to have a Vespa instance running with an app deployed that supports an "id" field. And Logstash installed. + +Check out the `integration-test` directory for more information. ``` cd integration-test diff --git a/integration/logstash-plugins/logstash-output-vespa/CHANGELOG.md b/integration/logstash-plugins/logstash-output-vespa/CHANGELOG.md index 13348bae0ca..027fcfbd0f7 100644 --- a/integration/logstash-plugins/logstash-output-vespa/CHANGELOG.md +++ b/integration/logstash-plugins/logstash-output-vespa/CHANGELOG.md @@ -1,3 +1,6 @@ +## 0.6.0 +- adds options to remove fields after using them for writing + ## 0.5.0 - supports update and remove operations diff --git a/integration/logstash-plugins/logstash-output-vespa/README.md b/integration/logstash-plugins/logstash-output-vespa/README.md index 72f49491da7..a659828dc70 100644 --- a/integration/logstash-plugins/logstash-output-vespa/README.md +++ b/integration/logstash-plugins/logstash-output-vespa/README.md @@ -27,7 +27,9 @@ It looks like the JVM options from [here](https://github.com/logstash-plugins/.c are useful to make JRuby's `bundle install` work. ### Integration tests -To run integration tests, you'll need to have a Vespa instance running + Logstash installed. Check out the `integration-test` directory for more information. +To run integration tests, you'll need to have a Vespa instance running with an app deployed that supports an "id" field. And Logstash installed. + +Check out the `integration-test` directory for more information. ``` cd integration-test @@ -113,6 +115,12 @@ output { # if the field doesn't exist, we generate a UUID id_field => "id" + # remove fields from the document after using them for writing + remove_id => false # if set to true, remove the ID field after using it + remove_namespace => false # would remove the namespace field (if dynamic) + remove_document_type => false # same for document type + remove_operation => false # and operation + # how many HTTP/2 connections to keep open max_connections => 1 # number of streams per connection diff --git a/integration/logstash-plugins/logstash-output-vespa/VERSION b/integration/logstash-plugins/logstash-output-vespa/VERSION index cb0c939a936..a918a2aa18d 100644 --- a/integration/logstash-plugins/logstash-output-vespa/VERSION +++ b/integration/logstash-plugins/logstash-output-vespa/VERSION @@ -1 +1 @@ -0.5.2 +0.6.0 diff --git a/integration/logstash-plugins/logstash-output-vespa/docs/index.asciidoc b/integration/logstash-plugins/logstash-output-vespa/docs/index.asciidoc index 29aff2f8cd5..c413e8df6b6 100644 --- a/integration/logstash-plugins/logstash-output-vespa/docs/index.asciidoc +++ b/integration/logstash-plugins/logstash-output-vespa/docs/index.asciidoc @@ -42,6 +42,10 @@ Writes documents to Vespa. | <> |<>|No | <> |<>|No | <> |<>|No +| <> |<>|No +| <> |<>|No +| <> |<>|No +| <> |<>|No | <> |<>|No | <> |<>|No | <> |<>|No @@ -77,6 +81,14 @@ Make sure to set the `client_key` as well. The certificate should be in PEM form Corresponding private key for the `client_cert` certificate. It must not be password-protected, otherwise Logstash will fail to start complaining `Could not find private key in PEM file`. +[id="plugins-{type}s-{plugin}-id_field"] +===== `id_field` + +* Value type is <> +* Default value is `id` + +Field to get the document id from. If not present, a UUID will be generated + [id="plugins-{type}s-{plugin}-namespace"] ===== `namespace` @@ -104,20 +116,45 @@ Like `namespace`, this can be static or dynamic. So you can use `%{foo}` to use Operation to perform. Can be `put`, `update`, or `remove`. Dynamic values are supported (e.g. `%{foo}`). If not present, the plugin will use `put`. -[id="plugins-{type}s-{plugin}-create"] -===== `create` + +[id="plugins-{type}s-{plugin}-remove_id"] +===== `remove_id` * Value type is <> +* Default value is `false` -If set to `true`, the plugin will add the `create=true` parameter to the request. It works for `put` and `update` operations, but mostly used for `update` as https://docs.vespa.ai/en/document-v1-api-guide.html#upserts[upserts]. +If set to `true`, removes the ID field from the document after using it for document identification. -[id="plugins-{type}s-{plugin}-id_field"] -===== `id_field` +[id="plugins-{type}s-{plugin}-remove_namespace"] +===== `remove_namespace` -* Value type is <> -* Default value is `id` +* Value type is <> +* Default value is `false` -Field to get the document id from. If not present, a UUID will be generated +When using a dynamic namespace (e.g., `%{my_namespace}`), if set to `true`, removes the field containing the namespace value from the document after using it. + +[id="plugins-{type}s-{plugin}-remove_document_type"] +===== `remove_document_type` + +* Value type is <> +* Default value is `false` + +When using a dynamic document type (e.g., `%{my_doc_type}`), if set to `true`, removes the field containing the document type value from the document after using it. + +[id="plugins-{type}s-{plugin}-remove_operation"] +===== `remove_operation` + +* Value type is <> +* Default value is `false` + +When using a dynamic operation (e.g., `%{my_operation}`), if set to `true`, removes the field containing the operation value from the document after using it. + +[id="plugins-{type}s-{plugin}-create"] +===== `create` + +* Value type is <> + +If set to `true`, the plugin will add the `create=true` parameter to the request. It works for `put` and `update` operations, but mostly used for `update` as https://docs.vespa.ai/en/document-v1-api-guide.html#upserts[upserts]. [id="plugins-{type}s-{plugin}-max_retries"] ===== `max_retries` diff --git a/integration/logstash-plugins/logstash-output-vespa/lib/logstash-output-vespa_feed_jars.rb b/integration/logstash-plugins/logstash-output-vespa/lib/logstash-output-vespa_feed_jars.rb index 39d5bcc20b4..de5c514e365 100644 --- a/integration/logstash-plugins/logstash-output-vespa/lib/logstash-output-vespa_feed_jars.rb +++ b/integration/logstash-plugins/logstash-output-vespa/lib/logstash-output-vespa_feed_jars.rb @@ -2,4 +2,4 @@ # encoding: utf-8 require 'jar_dependencies' -require_jar('org.logstashplugins', 'logstash-output-vespa_feed', '0.5.2') +require_jar('org.logstashplugins', 'logstash-output-vespa_feed', '0.6.0') diff --git a/integration/logstash-plugins/logstash-output-vespa/src/main/java/org/logstashplugins/VespaFeed.java b/integration/logstash-plugins/logstash-output-vespa/src/main/java/org/logstashplugins/VespaFeed.java index 86e3a9d803e..bd05a230d1a 100644 --- a/integration/logstash-plugins/logstash-output-vespa/src/main/java/org/logstashplugins/VespaFeed.java +++ b/integration/logstash-plugins/logstash-output-vespa/src/main/java/org/logstashplugins/VespaFeed.java @@ -32,10 +32,21 @@ public class VespaFeed implements Output { PluginConfigSpec.uriSetting("vespa_url", "http://localhost:8080"); public static final PluginConfigSpec NAMESPACE = PluginConfigSpec.requiredStringSetting("namespace"); + // if namespace is set to %{field_name} or %{[field_name]}, it's dynamic + // if remove_namespace is true, the namespace is removed from the document + public static final PluginConfigSpec REMOVE_NAMESPACE = + PluginConfigSpec.booleanSetting("remove_namespace", false); public static final PluginConfigSpec DOCUMENT_TYPE = PluginConfigSpec.requiredStringSetting("document_type"); + // if remove_document_type is true, the document type is removed from the document (assuming it's dynamic) + public static final PluginConfigSpec REMOVE_DOCUMENT_TYPE = + PluginConfigSpec.booleanSetting("remove_document_type", false); + // field from the event to use as doc ID public static final PluginConfigSpec ID_FIELD = PluginConfigSpec.stringSetting("id_field", "id"); + // if remove_id is true, the id field is removed from the document + public static final PluginConfigSpec REMOVE_ID = + PluginConfigSpec.booleanSetting("remove_id", false); // client certificate and key public static final PluginConfigSpec CLIENT_CERT = @@ -46,6 +57,9 @@ public class VespaFeed implements Output { // put, update or remove public static final PluginConfigSpec OPERATION = PluginConfigSpec.stringSetting("operation", "put"); + // if remove_operation is true, the operation field is removed from the document (assuming it's dynamic) + public static final PluginConfigSpec REMOVE_OPERATION = + PluginConfigSpec.booleanSetting("remove_operation", false); // whether to add create=true to the put/update request public static final PluginConfigSpec CREATE = PluginConfigSpec.booleanSetting("create", false); @@ -76,15 +90,19 @@ public class VespaFeed implements Output { private final String id; private final String namespace; private final boolean dynamicNamespace; + private final boolean removeNamespace; private final String documentType; private final boolean dynamicDocumentType; + private final boolean removeDocumentType; private final String operation; private final boolean dynamicOperation; private final boolean create; private final String idField; + private final boolean removeId; private final long operationTimeout; private volatile boolean stopped = false; ObjectMapper objectMapper; + private final boolean removeOperation; public VespaFeed(final String id, final Configuration config, final Context context) { @@ -94,12 +112,12 @@ public VespaFeed(final String id, final Configuration config, final Context cont DynamicOption configOption = new DynamicOption(config.get(NAMESPACE)); dynamicNamespace = configOption.isDynamic(); namespace = configOption.getParsedConfigValue(); - + removeNamespace = config.get(REMOVE_NAMESPACE); // same with document type configOption = new DynamicOption(config.get(DOCUMENT_TYPE)); dynamicDocumentType = configOption.isDynamic(); documentType = configOption.getParsedConfigValue(); - + removeDocumentType = config.get(REMOVE_DOCUMENT_TYPE); // and operation configOption = new DynamicOption(config.get(OPERATION)); dynamicOperation = configOption.isDynamic(); @@ -110,7 +128,8 @@ public VespaFeed(final String id, final Configuration config, final Context cont operationTimeout = config.get(OPERATION_TIMEOUT); idField = config.get(ID_FIELD); - + removeId = config.get(REMOVE_ID); + this.removeOperation = config.get(REMOVE_OPERATION); FeedClientBuilder builder = FeedClientBuilder.create(config.get(VESPA_URL)) .setConnectionsPerEndpoint(config.get(MAX_CONNECTIONS).intValue()) .setMaxStreamPerConnection(config.get(MAX_STREAMS).intValue()) @@ -221,30 +240,35 @@ public void output(final Collection events) { } protected CompletableFuture asyncFeed(Event event) throws JsonProcessingException { - Map eventData = event.getData(); - - // we put the doc ID here + // try to get the doc ID from the event, otherwise generate a UUID String docIdStr; - - // see if the event has an ID field (as configured) - // if it does, use it as docIdStr. Otherwise, generate a UUID - if (eventData.containsKey(idField)) { - docIdStr = eventData.get(idField).toString(); - } else { + Object docIdObj = event.getField(idField); + if (docIdObj == null) { docIdStr = UUID.randomUUID().toString(); + } else { + docIdStr = docIdObj.toString(); + // Remove the ID field if configured to do so + if (removeId) { + event.remove(idField); + } } // if the namespace is dynamic, we need to resolve it - // the default (if we don't have such a field) is simply the name of the field String namespace = this.namespace; if (dynamicNamespace) { namespace = getDynamicField(event, this.namespace); + if (removeNamespace) { + event.remove(namespace); + } } // similar logic for the document type String documentType = this.documentType; if (dynamicDocumentType) { documentType = getDynamicField(event, this.documentType); + if (removeDocumentType) { + event.remove(documentType); + } } // and the operation @@ -256,6 +280,9 @@ protected CompletableFuture asyncFeed(Event event) throws JsonProcessing // TODO we should put this in the dead letter queue return null; } + if (removeOperation) { + event.remove(this.operation); + } } // add create=true, if applicable OperationParameters operationParameters = addCreateIfApplicable(operation, docIdStr); @@ -265,10 +292,9 @@ protected CompletableFuture asyncFeed(Event event) throws JsonProcessing DocumentId docId = DocumentId.of(namespace, documentType, docIdStr); - // create a document from the event data. We need an enclosing "fields" object - // to match the Vespa put format + // create a document from the event data Map doc = new HashMap<>(); - doc.put("fields", eventData); + doc.put("fields", event.getData()); // Use the modified eventData here // create the request to feed the document if (operation.equals("put")) { @@ -335,7 +361,9 @@ public void awaitStop() throws InterruptedException { @Override public Collection> configSchema() { - return List.of(VESPA_URL, CLIENT_CERT, CLIENT_KEY, OPERATION, CREATE, NAMESPACE, DOCUMENT_TYPE, ID_FIELD, + return List.of(VESPA_URL, CLIENT_CERT, CLIENT_KEY, OPERATION, CREATE, + NAMESPACE, REMOVE_NAMESPACE, DOCUMENT_TYPE, REMOVE_DOCUMENT_TYPE, ID_FIELD, REMOVE_ID, + REMOVE_OPERATION, MAX_CONNECTIONS, MAX_STREAMS, MAX_RETRIES, OPERATION_TIMEOUT, GRACE_PERIOD, DOOM_PERIOD); } diff --git a/integration/logstash-plugins/logstash-output-vespa/src/test/java/org/logstashplugins/VespaFeedTest.java b/integration/logstash-plugins/logstash-output-vespa/src/test/java/org/logstashplugins/VespaFeedTest.java index 3a9e39a77f4..b35db276d19 100644 --- a/integration/logstash-plugins/logstash-output-vespa/src/test/java/org/logstashplugins/VespaFeedTest.java +++ b/integration/logstash-plugins/logstash-output-vespa/src/test/java/org/logstashplugins/VespaFeedTest.java @@ -23,6 +23,7 @@ import java.util.Arrays; import java.util.concurrent.CompletableFuture; import java.util.Collection; +import java.util.Collections; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -42,83 +43,61 @@ public class VespaFeedTest { - private static Configuration createMockConfig(String operation, boolean create, boolean dynamicOperation) { - Configuration config = Mockito.mock(Configuration.class); - try { - // Set required config values - when(config.get(VespaFeed.VESPA_URL)).thenReturn(new URI("http://localhost:8080")); - when(config.get(VespaFeed.NAMESPACE)).thenReturn(dynamicOperation ? "%{namespace}" : "test-namespace"); - when(config.get(VespaFeed.DOCUMENT_TYPE)).thenReturn("test-doc-type"); - when(config.get(VespaFeed.OPERATION)).thenReturn(operation); - when(config.get(VespaFeed.CREATE)).thenReturn(create); - // Set defaults for other required config - when(config.get(VespaFeed.MAX_CONNECTIONS)).thenReturn(1L); - when(config.get(VespaFeed.MAX_STREAMS)).thenReturn(128L); - when(config.get(VespaFeed.MAX_RETRIES)).thenReturn(10L); - when(config.get(VespaFeed.OPERATION_TIMEOUT)).thenReturn(180L); - when(config.get(VespaFeed.GRACE_PERIOD)).thenReturn(10L); - when(config.get(VespaFeed.DOOM_PERIOD)).thenReturn(60L); - return config; - } catch (Exception e) { - throw new RuntimeException(e); - } - } - @Test public void testValidateOperationAndCreate_ValidOperations() { // Test valid "put" operation - VespaFeed feed = new VespaFeed("test-id", createMockConfig("put", false, false), null); + VespaFeed feed = new VespaFeed("test-id", VespaFeedTestHelper.createMockConfig("put", false, false), null); feed.validateOperationAndCreate(); // Should not throw exception // Test valid "update" operation - feed = new VespaFeed("test-id", createMockConfig("update", false, false), null); + feed = new VespaFeed("test-id", VespaFeedTestHelper.createMockConfig("update", false, false), null); feed.validateOperationAndCreate(); // Should not throw exception // Test valid "remove" operation - feed = new VespaFeed("test-id", createMockConfig("remove", false, false), null); + feed = new VespaFeed("test-id", VespaFeedTestHelper.createMockConfig("remove", false, false), null); feed.validateOperationAndCreate(); // Should not throw exception } @Test(expected = IllegalArgumentException.class) public void testValidateOperationAndCreate_InvalidOperation() { - VespaFeed feed = new VespaFeed("test-id", createMockConfig("invalid", false, false), null); + VespaFeed feed = new VespaFeed("test-id", VespaFeedTestHelper.createMockConfig("invalid", false, false), null); feed.validateOperationAndCreate(); } @Test(expected = IllegalArgumentException.class) public void testValidateOperationAndCreate_RemoveWithCreate() { - VespaFeed feed = new VespaFeed("test-id", createMockConfig("remove", true, false), null); + VespaFeed feed = new VespaFeed("test-id", VespaFeedTestHelper.createMockConfig("remove", true, false), null); feed.validateOperationAndCreate(); } @Test public void testValidateOperationAndCreate_DynamicOperation() { // When operation is dynamic, validation should be skipped - VespaFeed feed = new VespaFeed("test-id", createMockConfig("%{operation}", false, true), null); + VespaFeed feed = new VespaFeed("test-id", VespaFeedTestHelper.createMockConfig("%{operation}", false, true), null); feed.validateOperationAndCreate(); // Should not throw exception } @Test public void testConstructor_DynamicOptions() { // Test non-dynamic options - VespaFeed feed = new VespaFeed("test-id", createMockConfig("put", false, false), null); + VespaFeed feed = new VespaFeed("test-id", VespaFeedTestHelper.createMockConfig("put", false, false), null); assertFalse("Operation should not be dynamic", feed.isDynamicOperation()); assertEquals("Operation should be 'put'", "put", feed.getOperation()); // Test dynamic operation - feed = new VespaFeed("test-id", createMockConfig("%{operation}", false, true), null); + feed = new VespaFeed("test-id", VespaFeedTestHelper.createMockConfig("%{operation}", false, true), null); assertTrue("Operation should be dynamic", feed.isDynamicOperation()); assertEquals("Operation field should be 'operation'", "operation", feed.getOperation()); // Test dynamic namespace - Configuration config = createMockConfig("put", false, false); + Configuration config = VespaFeedTestHelper.createMockConfig("put", false, false); when(config.get(VespaFeed.NAMESPACE)).thenReturn("%{my_namespace}"); feed = new VespaFeed("test-id", config, null); assertTrue("Namespace should be dynamic", feed.isDynamicNamespace()); assertEquals("Namespace field should be 'my_namespace'", "my_namespace", feed.getNamespace()); // Test dynamic document type - config = createMockConfig("put", false, false); + config = VespaFeedTestHelper.createMockConfig("put", false, false); when(config.get(VespaFeed.DOCUMENT_TYPE)).thenReturn("%{doc_type}"); feed = new VespaFeed("test-id", config, null); assertTrue("Document type should be dynamic", feed.isDynamicDocumentType()); @@ -128,18 +107,18 @@ public void testConstructor_DynamicOptions() { @Test public void testAddCreateIfApplicable() { // Test put operation with create=true - VespaFeed feed = new VespaFeed("test-id", createMockConfig("put", true, false), null); + VespaFeed feed = new VespaFeed("test-id", VespaFeedTestHelper.createMockConfig("put", true, false), null); OperationParameters params = feed.addCreateIfApplicable("put", "doc1"); assertTrue("Put operation should have createIfNonExistent=true", params.createIfNonExistent()); assertEquals("Timeout should be set", Duration.ofSeconds(180), params.timeout().get()); // Test update operation with create=true - feed = new VespaFeed("test-id", createMockConfig("update", true, false), null); + feed = new VespaFeed("test-id", VespaFeedTestHelper.createMockConfig("update", true, false), null); params = feed.addCreateIfApplicable("update", "doc1"); assertTrue("Update operation should have createIfNonExistent=true", params.createIfNonExistent()); // Test put operation with create=false - feed = new VespaFeed("test-id", createMockConfig("put", false, false), null); + feed = new VespaFeed("test-id", VespaFeedTestHelper.createMockConfig("put", false, false), null); params = feed.addCreateIfApplicable("put", "doc1"); assertFalse("Put operation should not have createIfNonExistent when create=false", params.createIfNonExistent()); } @@ -147,7 +126,7 @@ public void testAddCreateIfApplicable() { @Test public void testConstructor_OperationValidation() { // Test invalid operation - Configuration config = createMockConfig("invalid_op", false, false); + Configuration config = VespaFeedTestHelper.createMockConfig("invalid_op", false, false); try { new VespaFeed("test-id", config, null); fail("Should throw IllegalArgumentException for invalid operation"); @@ -156,7 +135,7 @@ public void testConstructor_OperationValidation() { } // Test remove with create=true - config = createMockConfig("remove", true, false); + config = VespaFeedTestHelper.createMockConfig("remove", true, false); try { new VespaFeed("test-id", config, null); fail("Should throw IllegalArgumentException for remove with create=true"); @@ -165,18 +144,18 @@ public void testConstructor_OperationValidation() { } // Test that create=true is allowed for put and update - config = createMockConfig("put", true, false); + config = VespaFeedTestHelper.createMockConfig("put", true, false); VespaFeed feed = new VespaFeed("test-id", config, null); assertTrue("Create should be true for put operation", feed.isCreate()); - config = createMockConfig("update", true, false); + config = VespaFeedTestHelper.createMockConfig("update", true, false); feed = new VespaFeed("test-id", config, null); assertTrue("Create should be true for update operation", feed.isCreate()); } @Test public void testAddCertAndKeyToBuilder() throws IOException { - Configuration config = createMockConfig("put", false, false); + Configuration config = VespaFeedTestHelper.createMockConfig("put", false, false); FeedClientBuilder builder = FeedClientBuilder.create(URI.create("http://localhost:8080")); // Create temporary cert and key files @@ -206,11 +185,11 @@ public void testAddCertAndKeyToBuilder() throws IOException { @Test public void testGetDynamicField() { // Create a mock Event - Event event = createMockEvent("test-id", "field_value"); + Event event = VespaFeedTestHelper.createMockEvent("test-id", "field_value"); // Test when field exists when(event.getField("my_field")).thenReturn("field_value"); - VespaFeed feed = new VespaFeed("test-id", createMockConfig("put", false, false), null); + VespaFeed feed = new VespaFeed("test-id", VespaFeedTestHelper.createMockConfig("put", false, false), null); assertEquals("Should return field value", "field_value", feed.getDynamicField(event, "my_field")); // Test when field doesn't exist @@ -221,7 +200,7 @@ public void testGetDynamicField() { @Test public void testToJson() throws Exception { - VespaFeed feed = new VespaFeed("test-id", createMockConfig("put", false, false), null); + VespaFeed feed = new VespaFeed("test-id", VespaFeedTestHelper.createMockConfig("put", false, false), null); ObjectMapper mapper = new ObjectMapper(); // Test simple map @@ -254,23 +233,23 @@ public void testToJson() throws Exception { @Test public void testAsyncFeed_PutOperation() throws Exception { - FeedClient mockClient = createMockFeedClient(); - VespaFeed feed = createVespaFeed("put", true, false, mockClient); - Event event = createMockEvent("test-doc-1", "value1"); + FeedClient mockClient = VespaFeedTestHelper.createMockFeedClient(); + VespaFeed feed = VespaFeedTestHelper.createVespaFeed("put", true, false, mockClient); + Event event = VespaFeedTestHelper.createMockEvent("test-doc-1", "value1"); CompletableFuture future = feed.asyncFeed(event); assertEquals(Result.Type.success, future.get().type()); - verifyDocument(mockClient, "test-doc-1", "value1"); + VespaFeedTestHelper.verifyDocument(mockClient, "test-doc-1", "value1"); } @Test public void testAsyncFeed_DynamicOperation() throws Exception { - FeedClient mockClient = createMockFeedClient(); - VespaFeed feed = createVespaFeed("%{operation}", false, true, mockClient); + FeedClient mockClient = VespaFeedTestHelper.createMockFeedClient(); + VespaFeed feed = VespaFeedTestHelper.createVespaFeed("%{operation}", false, true, mockClient); // Create test event - Event event = createMockEvent("test-doc-1", "value1"); + Event event = VespaFeedTestHelper.createMockEvent("test-doc-1", "value1"); when(event.getField("operation")).thenReturn("update"); CompletableFuture future = feed.asyncFeed(event); @@ -285,11 +264,11 @@ public void testAsyncFeed_DynamicOperation() throws Exception { @Test public void testAsyncFeed_UUIDGeneration() throws Exception { - FeedClient mockClient = createMockFeedClient(); - VespaFeed feed = createVespaFeed("put", false, false, mockClient); + FeedClient mockClient = VespaFeedTestHelper.createMockFeedClient(); + VespaFeed feed = VespaFeedTestHelper.createVespaFeed("put", false, false, mockClient); // Create event without ID - Event event = createMockEvent(null, "value1"); + Event event = VespaFeedTestHelper.createMockEvent(null, "value1"); CompletableFuture future = feed.asyncFeed(event); assertEquals(Result.Type.success, future.get().type()); @@ -303,13 +282,13 @@ public void testAsyncFeed_UUIDGeneration() throws Exception { @Test public void testOutput_SuccessfulBatch() throws Exception { - FeedClient mockClient = createMockFeedClient(); - VespaFeed feed = createVespaFeed("put", false, false, mockClient); + FeedClient mockClient = VespaFeedTestHelper.createMockFeedClient(); + VespaFeed feed = VespaFeedTestHelper.createVespaFeed("put", false, false, mockClient); // Create test events List events = Arrays.asList( - createMockEvent("doc1", "value1"), - createMockEvent("doc2", "value2") + VespaFeedTestHelper.createMockEvent("doc1", "value1"), + VespaFeedTestHelper.createMockEvent("doc2", "value2") ); feed.output(events); @@ -318,8 +297,8 @@ public void testOutput_SuccessfulBatch() throws Exception { @Test public void testOutput_JsonSerializationError() throws Exception { - FeedClient mockClient = createMockFeedClient(); - VespaFeed feed = createVespaFeed("put", false, false, mockClient); + FeedClient mockClient = VespaFeedTestHelper.createMockFeedClient(); + VespaFeed feed = VespaFeedTestHelper.createVespaFeed("put", false, false, mockClient); // Create event that will cause serialization error Event badEvent = Mockito.mock(Event.class); @@ -337,29 +316,29 @@ public void testOutput_JsonSerializationError() throws Exception { @Test public void testOutput_FeedClientError() throws Exception { - FeedClient mockClient = createMockFeedClient(); + FeedClient mockClient = VespaFeedTestHelper.createMockFeedClient(); Result errorResult = Mockito.mock(Result.class); when(errorResult.type()).thenReturn(Result.Type.conditionNotMet); CompletableFuture errorFuture = CompletableFuture.completedFuture(errorResult); when(mockClient.put(any(), any(), any())).thenReturn(errorFuture); - VespaFeed feed = createVespaFeed("put", false, false, mockClient); - feed.output(Arrays.asList(createMockEvent("doc1", "value1"))); + VespaFeed feed = VespaFeedTestHelper.createVespaFeed("put", false, false, mockClient); + feed.output(Arrays.asList(VespaFeedTestHelper.createMockEvent("doc1", "value1"))); // The test passes if no exception is thrown } @Test public void testOutput_StoppedBehavior() throws Exception { - FeedClient mockClient = createMockFeedClient(); - VespaFeed feed = createVespaFeed("put", false, false, mockClient); + FeedClient mockClient = VespaFeedTestHelper.createMockFeedClient(); + VespaFeed feed = VespaFeedTestHelper.createVespaFeed("put", false, false, mockClient); // Stop the feed feed.stop(); // Create test events List events = Arrays.asList( - createMockEvent("doc1", "value1"), - createMockEvent("doc2", "value2") + VespaFeedTestHelper.createMockEvent("doc1", "value1"), + VespaFeedTestHelper.createMockEvent("doc2", "value2") ); // Test that output doesn't process events when stopped @@ -369,20 +348,20 @@ public void testOutput_StoppedBehavior() throws Exception { @Test public void testOutput_MultiFeedException() throws Exception { - FeedClient mockClient = createMockFeedClient(); + FeedClient mockClient = VespaFeedTestHelper.createMockFeedClient(); CompletableFuture future = new CompletableFuture<>(); future.completeExceptionally(new RuntimeException("Feed error")); when(mockClient.put(any(), any(), any())).thenReturn(future); - VespaFeed feed = createVespaFeed("put", false, false, mockClient); - feed.output(Arrays.asList(createMockEvent("doc1", "value1"))); + VespaFeed feed = VespaFeedTestHelper.createVespaFeed("put", false, false, mockClient); + feed.output(Arrays.asList(VespaFeedTestHelper.createMockEvent("doc1", "value1"))); // Test passes if error is logged but not thrown } @Test public void testAwaitStop() throws Exception { - FeedClient mockClient = createMockFeedClient(); - VespaFeed feed = createVespaFeed("put", false, false, mockClient); + FeedClient mockClient = VespaFeedTestHelper.createMockFeedClient(); + VespaFeed feed = VespaFeedTestHelper.createVespaFeed("put", false, false, mockClient); feed.awaitStop(); // Should do nothing verify(mockClient, times(0)).close(); @@ -395,7 +374,7 @@ public void testAwaitStop() throws Exception { public void testConfigSchema() { //makes sure that all config options are present in the schema - VespaFeed feed = createVespaFeed("put", false, false, null); + VespaFeed feed = VespaFeedTestHelper.createVespaFeed("put", false, false, null); Collection> schema = feed.configSchema(); assertTrue(schema.contains(VespaFeed.VESPA_URL)); @@ -404,51 +383,104 @@ public void testConfigSchema() { assertTrue(schema.contains(VespaFeed.OPERATION)); assertTrue(schema.contains(VespaFeed.CREATE)); assertTrue(schema.contains(VespaFeed.NAMESPACE)); + assertTrue(schema.contains(VespaFeed.REMOVE_NAMESPACE)); assertTrue(schema.contains(VespaFeed.DOCUMENT_TYPE)); + assertTrue(schema.contains(VespaFeed.REMOVE_DOCUMENT_TYPE)); assertTrue(schema.contains(VespaFeed.ID_FIELD)); + assertTrue(schema.contains(VespaFeed.REMOVE_ID)); assertTrue(schema.contains(VespaFeed.MAX_CONNECTIONS)); assertTrue(schema.contains(VespaFeed.MAX_STREAMS)); assertTrue(schema.contains(VespaFeed.MAX_RETRIES)); assertTrue(schema.contains(VespaFeed.OPERATION_TIMEOUT)); assertTrue(schema.contains(VespaFeed.GRACE_PERIOD)); assertTrue(schema.contains(VespaFeed.DOOM_PERIOD)); - } - private Event createMockEvent(String docId, String value) { - Event event = Mockito.mock(Event.class); - Map eventData = new HashMap<>(); - eventData.put("field1", value); - if (docId != null) { - eventData.put("doc_id", docId); - } - when(event.getData()).thenReturn(eventData); - when(event.getField("doc_id")).thenReturn(docId); - return event; } + + @Test + public void testExplicitIdField() throws Exception { + Configuration config = VespaFeedTestHelper.createMockConfig("put", false, false); + when(config.get(VespaFeed.ID_FIELD)).thenReturn("id"); + FeedClient mockClient = VespaFeedTestHelper.createMockFeedClient(); + VespaFeed output = new VespaFeed("test-id", config, null, mockClient); - private FeedClient createMockFeedClient() { - FeedClient mockClient = Mockito.mock(FeedClient.class); - Result mockResult = Mockito.mock(Result.class); - when(mockResult.type()).thenReturn(Result.Type.success); - CompletableFuture successFuture = CompletableFuture.completedFuture(mockResult); - when(mockClient.put(any(), any(), any())).thenReturn(successFuture); - when(mockClient.update(any(), any(), any())).thenReturn(successFuture); - when(mockClient.remove(any(), any())).thenReturn(successFuture); - return mockClient; - } + Event event = VespaFeedTestHelper.createMockEvent("doc1", "value1"); + when(event.getField("id")).thenReturn("doc1"); - private VespaFeed createVespaFeed(String operation, boolean create, boolean dynamicOperation, FeedClient mockClient) { - Configuration config = createMockConfig(operation, create, dynamicOperation); - when(config.get(VespaFeed.ID_FIELD)).thenReturn("doc_id"); - when(config.get(VespaFeed.NAMESPACE)).thenReturn("test-namespace"); - return new VespaFeed("test-id", config, null, mockClient); + output.output(Collections.singletonList(event)); + + // Verify the document ID was still taken from the event's "id" field + verify(mockClient).put( + argThat(docId -> docId.userSpecific().equals("doc1")), + any(), + any() + ); } - private void verifyDocument(FeedClient mockClient, String docId, String value) { + @Test + public void testIdFieldMissing() throws Exception { + Configuration config = VespaFeedTestHelper.createMockConfig("put", false, false); + when(config.get(VespaFeed.ID_FIELD)).thenReturn("missing_field"); // ID field that doesn't exist + FeedClient mockClient = VespaFeedTestHelper.createMockFeedClient(); + VespaFeed output = new VespaFeed("test-id", config, null, mockClient); + Event event = VespaFeedTestHelper.createMockEvent("doc1", "value1"); + output.output(Collections.singletonList(event)); + + // Verify a UUID was generated (matches UUID pattern) verify(mockClient).put( - eq(DocumentId.of("test-namespace", "test-doc-type", docId)), - contains("\"fields\":{\"field1\":\"" + value + "\",\"doc_id\":\"" + docId + "\"}"), - any(OperationParameters.class) + argThat(docId -> docId.toString().matches("id:test-namespace:test-doc-type::[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}")), + any(), + any() ); } + + @Test + public void testRemoveId() throws Exception { + Configuration config = VespaFeedTestHelper.createMockConfig("put", false, false); + when(config.get(VespaFeed.REMOVE_ID)).thenReturn(true); + FeedClient mockClient = VespaFeedTestHelper.createMockFeedClient(); + VespaFeed output = new VespaFeed("test-id", config, null, mockClient); + + Event event = VespaFeedTestHelper.createMockEvent("doc1", "value1", "doc_id", "doc1"); + output.output(Collections.singletonList(event)); + verify(event).remove("doc_id"); + } + + @Test + public void testRemoveNamespace() throws Exception { + Configuration config = VespaFeedTestHelper.createMockConfig("put", false, false); + when(config.get(VespaFeed.NAMESPACE)).thenReturn("%{namespace_field}"); + when(config.get(VespaFeed.REMOVE_NAMESPACE)).thenReturn(true); + FeedClient mockClient = VespaFeedTestHelper.createMockFeedClient(); + VespaFeed output = new VespaFeed("test-id", config, null, mockClient); + + Event event = VespaFeedTestHelper.createMockEvent("doc1", "value1", "namespace_field", "test-ns"); + output.output(Collections.singletonList(event)); + verify(event).remove("test-ns"); + } + + @Test + public void testRemoveDocumentType() throws Exception { + Configuration config = VespaFeedTestHelper.createMockConfig("put", false, false); + when(config.get(VespaFeed.DOCUMENT_TYPE)).thenReturn("%{doc_type_field}"); + when(config.get(VespaFeed.REMOVE_DOCUMENT_TYPE)).thenReturn(true); + FeedClient mockClient = VespaFeedTestHelper.createMockFeedClient(); + VespaFeed output = new VespaFeed("test-id", config, null, mockClient); + + Event event = VespaFeedTestHelper.createMockEvent("doc1", "value1", "doc_type_field", "test-doc-type"); + output.output(Collections.singletonList(event)); + verify(event).remove("test-doc-type"); + } + + @Test + public void testRemoveOperation() throws Exception { + Configuration config = VespaFeedTestHelper.createMockConfig("%{operation_field}", false, true); + when(config.get(VespaFeed.REMOVE_OPERATION)).thenReturn(true); + FeedClient mockClient = VespaFeedTestHelper.createMockFeedClient(); + VespaFeed output = new VespaFeed("test-id", config, null, mockClient); + + Event event = VespaFeedTestHelper.createMockEvent("doc1", "value1", "operation_field", "put"); + output.output(Collections.singletonList(event)); + verify(event).remove("operation_field"); + } } \ No newline at end of file diff --git a/integration/logstash-plugins/logstash-output-vespa/src/test/java/org/logstashplugins/VespaFeedTestHelper.java b/integration/logstash-plugins/logstash-output-vespa/src/test/java/org/logstashplugins/VespaFeedTestHelper.java new file mode 100644 index 00000000000..15cbfd060ab --- /dev/null +++ b/integration/logstash-plugins/logstash-output-vespa/src/test/java/org/logstashplugins/VespaFeedTestHelper.java @@ -0,0 +1,95 @@ +package org.logstashplugins; + +import ai.vespa.feed.client.DocumentId; +import ai.vespa.feed.client.FeedClient; +import ai.vespa.feed.client.OperationParameters; +import ai.vespa.feed.client.Result; +import co.elastic.logstash.api.Configuration; +import co.elastic.logstash.api.Event; + +import java.net.URI; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class VespaFeedTestHelper { + + public static Configuration createMockConfig(String operation, boolean create, boolean dynamicOperation) { + Configuration config = mock(Configuration.class); + try { + // Set required config values + when(config.get(VespaFeed.VESPA_URL)).thenReturn(new URI("http://localhost:8080")); + when(config.get(VespaFeed.NAMESPACE)).thenReturn(dynamicOperation ? "%{namespace}" : "test-namespace"); + when(config.get(VespaFeed.DOCUMENT_TYPE)).thenReturn("test-doc-type"); + when(config.get(VespaFeed.ID_FIELD)).thenReturn("doc_id"); + when(config.get(VespaFeed.REMOVE_ID)).thenReturn(false); + when(config.get(VespaFeed.REMOVE_NAMESPACE)).thenReturn(false); + when(config.get(VespaFeed.REMOVE_DOCUMENT_TYPE)).thenReturn(false); + when(config.get(VespaFeed.OPERATION)).thenReturn(operation); + when(config.get(VespaFeed.CREATE)).thenReturn(create); + when(config.get(VespaFeed.REMOVE_OPERATION)).thenReturn(false); + // Set defaults for other required config + when(config.get(VespaFeed.MAX_CONNECTIONS)).thenReturn(1L); + when(config.get(VespaFeed.MAX_STREAMS)).thenReturn(128L); + when(config.get(VespaFeed.MAX_RETRIES)).thenReturn(10L); + when(config.get(VespaFeed.OPERATION_TIMEOUT)).thenReturn(180L); + when(config.get(VespaFeed.GRACE_PERIOD)).thenReturn(10L); + when(config.get(VespaFeed.DOOM_PERIOD)).thenReturn(60L); + return config; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public static Event createMockEvent(String docId, String value, String additionalFieldName, String additionalFieldValue) { + Event event = mock(Event.class); + Map eventData = new HashMap<>(); + eventData.put("field1", value); + if (docId != null) { + eventData.put("doc_id", docId); + } + if (additionalFieldName != null && additionalFieldValue != null) { + eventData.put(additionalFieldName, additionalFieldValue); + when(event.getField(additionalFieldName)).thenReturn(additionalFieldValue); + when(event.remove(additionalFieldName)).thenReturn(additionalFieldName); + } + + when(event.getData()).thenReturn(eventData); + when(event.getField("doc_id")).thenReturn(docId); + return event; + } + + public static Event createMockEvent(String docId, String value) { + return createMockEvent(docId, value, null, null); + } + + public static FeedClient createMockFeedClient() { + FeedClient mockClient = mock(FeedClient.class); + Result mockResult = mock(Result.class); + when(mockResult.type()).thenReturn(Result.Type.success); + CompletableFuture successFuture = CompletableFuture.completedFuture(mockResult); + when(mockClient.put(any(), any(), any())).thenReturn(successFuture); + when(mockClient.update(any(), any(), any())).thenReturn(successFuture); + when(mockClient.remove(any(), any())).thenReturn(successFuture); + return mockClient; + } + + public static VespaFeed createVespaFeed(String operation, boolean create, boolean dynamicOperation, FeedClient mockClient) { + Configuration config = createMockConfig(operation, create, dynamicOperation); + when(config.get(VespaFeed.ID_FIELD)).thenReturn("doc_id"); + when(config.get(VespaFeed.NAMESPACE)).thenReturn("test-namespace"); + return new VespaFeed("test-id", config, null, mockClient); + } + + public static void verifyDocument(FeedClient mockClient, String docId, String value) { + org.mockito.Mockito.verify(mockClient).put( + org.mockito.ArgumentMatchers.eq(DocumentId.of("test-namespace", "test-doc-type", docId)), + org.mockito.ArgumentMatchers.contains("\"fields\":{\"field1\":\"" + value + "\",\"doc_id\":\"" + docId + "\"}"), + any(OperationParameters.class) + ); + } +} \ No newline at end of file