[logstash-plugins] output: added options to remove metadata fields from documents after using them for put/update
radu-gheorghe committed Dec 27, 2024
1 parent 801d6d2 commit 64899ec
Showing 9 changed files with 336 additions and 131 deletions.
4 changes: 3 additions & 1 deletion integration/logstash-plugins/logstash-input-vespa/README.md
@@ -20,7 +20,9 @@ export LOGSTASH_SOURCE=1
bundle exec rspec
```

To run integration tests, you'll need to have a Vespa instance running + Logstash installed. Check out the `integration-test` directory for more information.
To run integration tests, you'll need Logstash installed and a Vespa instance running with a deployed app that supports an "id" field.

Check out the `integration-test` directory for more information.

```
cd integration-test
@@ -1,3 +1,6 @@
## 0.6.0
- adds options to remove fields after using them for writing

## 0.5.0
- supports update and remove operations

10 changes: 9 additions & 1 deletion integration/logstash-plugins/logstash-output-vespa/README.md
@@ -27,7 +27,9 @@ It looks like the JVM options from [here](https://github.com/logstash-plugins/.c
are useful to make JRuby's `bundle install` work.

### Integration tests
To run integration tests, you'll need to have a Vespa instance running + Logstash installed. Check out the `integration-test` directory for more information.
To run integration tests, you'll need Logstash installed and a Vespa instance running with a deployed app that supports an "id" field.

Check out the `integration-test` directory for more information.

```
cd integration-test
@@ -113,6 +115,12 @@ output {
# if the field doesn't exist, we generate a UUID
id_field => "id"
# remove fields from the document after using them for writing
remove_id => false # if set to true, remove the ID field after using it
remove_namespace => false # would remove the namespace field (if dynamic)
remove_document_type => false # same for document type
remove_operation => false # and operation
# how many HTTP/2 connections to keep open
max_connections => 1
# number of streams per connection
2 changes: 1 addition & 1 deletion integration/logstash-plugins/logstash-output-vespa/VERSION
@@ -1 +1 @@
0.5.2
0.6.0
@@ -42,6 +42,10 @@ Writes documents to Vespa.
| <<plugins-{type}s-{plugin}-operation>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-create>> |<<boolean,boolean>>|No
| <<plugins-{type}s-{plugin}-id_field>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-remove_id>> |<<boolean,boolean>>|No
| <<plugins-{type}s-{plugin}-remove_namespace>> |<<boolean,boolean>>|No
| <<plugins-{type}s-{plugin}-remove_document_type>> |<<boolean,boolean>>|No
| <<plugins-{type}s-{plugin}-remove_operation>> |<<boolean,boolean>>|No
| <<plugins-{type}s-{plugin}-max_retries>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-max_connections>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-max_streams>> |<<number,number>>|No
@@ -77,6 +81,14 @@ Make sure to set the `client_key` as well. The certificate should be in PEM form

Corresponding private key for the `client_cert` certificate. It must not be password-protected; otherwise, Logstash will fail to start with the error `Could not find private key in PEM file`.

[id="plugins-{type}s-{plugin}-id_field"]
===== `id_field`

* Value type is <<string,string>>
* Default value is `id`

Field to get the document ID from. If not present, a UUID will be generated.

[id="plugins-{type}s-{plugin}-namespace"]
===== `namespace`

@@ -104,20 +116,45 @@ Like `namespace`, this can be static or dynamic. So you can use `%{foo}` to use

Operation to perform. Can be `put`, `update`, or `remove`. Dynamic values are supported (e.g. `%{foo}`). If not present, the plugin will use `put`.
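
As a rough illustration of what "dynamic" means here, the sketch below tells a static option value (such as `put`) apart from a field reference written as `%{foo}` or `%{[foo]}`. It is a minimal stand-alone example, not the plugin's `DynamicOption` class: the class name `DynamicOptionSketch`, the method `dynamicFieldName`, and the rule that the whole value must be a single reference are all assumptions made for illustration.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of the plugin: detects a dynamic option value.
class DynamicOptionSketch {
    // Assumption: the whole value must be one reference, either %{name} or %{[name]}.
    private static final Pattern DYNAMIC =
            Pattern.compile("^%\\{(?:\\[([^\\[\\]{}]+)\\]|([^\\[\\]{}]+))\\}$");

    /** Returns the referenced field name if the value is dynamic, otherwise empty. */
    static Optional<String> dynamicFieldName(String configValue) {
        Matcher m = DYNAMIC.matcher(configValue);
        if (!m.matches()) {
            return Optional.empty(); // static value, used as-is
        }
        // group 1 is the bracketed form, group 2 the plain form
        String name = m.group(1) != null ? m.group(1) : m.group(2);
        return Optional.of(name);
    }

    public static void main(String[] args) {
        System.out.println(dynamicFieldName("put"));      // static value
        System.out.println(dynamicFieldName("%{foo}"));   // dynamic, field "foo"
        System.out.println(dynamicFieldName("%{[foo]}")); // dynamic, field "foo"
    }
}
```

With a check like this, a static value is sent unchanged for every event, while a dynamic one is looked up in each event at write time.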

[id="plugins-{type}s-{plugin}-create"]
===== `create`

[id="plugins-{type}s-{plugin}-remove_id"]
===== `remove_id`

* Value type is <<boolean,boolean>>
* Default value is `false`

If set to `true`, the plugin will add the `create=true` parameter to the request. It works for `put` and `update` operations, but is mostly used with `update` for https://docs.vespa.ai/en/document-v1-api-guide.html#upserts[upserts].
If set to `true`, removes the ID field from the document after using it for document identification.

[id="plugins-{type}s-{plugin}-id_field"]
===== `id_field`
[id="plugins-{type}s-{plugin}-remove_namespace"]
===== `remove_namespace`

* Value type is <<string,string>>
* Default value is `id`
* Value type is <<boolean,boolean>>
* Default value is `false`

Field to get the document ID from. If not present, a UUID will be generated.
When using a dynamic namespace (e.g., `%{my_namespace}`), if set to `true`, removes the field containing the namespace value from the document after using it.

[id="plugins-{type}s-{plugin}-remove_document_type"]
===== `remove_document_type`

* Value type is <<boolean,boolean>>
* Default value is `false`

When using a dynamic document type (e.g., `%{my_doc_type}`), if set to `true`, removes the field containing the document type value from the document after using it.

[id="plugins-{type}s-{plugin}-remove_operation"]
===== `remove_operation`

* Value type is <<boolean,boolean>>
* Default value is `false`

When using a dynamic operation (e.g., `%{my_operation}`), if set to `true`, removes the field containing the operation value from the document after using it.
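
The four `remove_*` options follow the same pattern: read a field's value to build the request, then optionally drop that field so it is not written to Vespa as part of the document. The sketch below models this with a plain `Map` standing in for the Logstash event. The class and method names (`RemoveAfterUseSketch`, `resolveId`, `resolveDynamic`) are hypothetical. The fallback to the field name when a dynamic field is missing follows the comment in the plugin source.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical model of the remove-after-use behaviour; a Map stands in for the event.
class RemoveAfterUseSketch {
    /** Returns the ID taken from the event, or a random UUID when the field is missing. */
    static String resolveId(Map<String, Object> event, String idField, boolean removeId) {
        Object value = event.get(idField);
        if (value == null) {
            return UUID.randomUUID().toString();
        }
        if (removeId) {
            event.remove(idField);
        }
        return value.toString();
    }

    /** Resolves a dynamic option; falls back to the field name when the field is absent. */
    static String resolveDynamic(Map<String, Object> event, String fieldName, boolean remove) {
        Object value = event.get(fieldName);
        if (value == null) {
            return fieldName;
        }
        if (remove) {
            // remove by field name, not by the resolved value
            event.remove(fieldName);
        }
        return value.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> event = new HashMap<>();
        event.put("id", "doc-1");
        event.put("my_namespace", "music");
        event.put("title", "Hello");

        String id = resolveId(event, "id", true);
        String namespace = resolveDynamic(event, "my_namespace", true);

        // prints: doc-1 music {title=Hello}
        System.out.println(id + " " + namespace + " " + event);
    }
}
```

With all options left at `false`, the `id` and `my_namespace` fields would stay in the event and be sent as document fields, so the target schema would need to accept them.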

[id="plugins-{type}s-{plugin}-create"]
===== `create`

* Value type is <<boolean,boolean>>
* Default value is `false`

If set to `true`, the plugin will add the `create=true` parameter to the request. It works for `put` and `update` operations, but is mostly used with `update` for https://docs.vespa.ai/en/document-v1-api-guide.html#upserts[upserts].
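
To make the effect of `create` concrete, the sketch below builds a Document v1 request path and appends `create=true` only for `put` and `update`. This is an illustration under assumptions, not the plugin's code: the plugin goes through the Vespa feed client rather than assembling URLs by hand, the class and method names are hypothetical, and the path segments are assumed to be URL-safe already.

```java
// Hypothetical illustration of when create=true applies; not the plugin's implementation.
class CreateParamSketch {
    /** Builds a /document/v1 path, adding create=true only where it is applicable. */
    static String documentPath(String namespace, String documentType, String docId,
                               String operation, boolean create) {
        // Assumption: all segments are already URL-safe, so no encoding is done here.
        String path = "/document/v1/" + namespace + "/" + documentType + "/docid/" + docId;
        boolean applicable = create
                && (operation.equals("put") || operation.equals("update"));
        return applicable ? path + "?create=true" : path;
    }

    public static void main(String[] args) {
        // update with create=true behaves as an upsert
        System.out.println(documentPath("music", "song", "doc-1", "update", true));
        // remove ignores the create setting
        System.out.println(documentPath("music", "song", "doc-1", "remove", true));
    }
}
```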

[id="plugins-{type}s-{plugin}-max_retries"]
===== `max_retries`
@@ -2,4 +2,4 @@
# encoding: utf-8

require 'jar_dependencies'
require_jar('org.logstashplugins', 'logstash-output-vespa_feed', '0.5.2')
require_jar('org.logstashplugins', 'logstash-output-vespa_feed', '0.6.0')
@@ -32,10 +32,21 @@ public class VespaFeed implements Output {
PluginConfigSpec.uriSetting("vespa_url", "http://localhost:8080");
public static final PluginConfigSpec<String> NAMESPACE =
PluginConfigSpec.requiredStringSetting("namespace");
// if namespace is set to %{field_name} or %{[field_name]}, it's dynamic
// if remove_namespace is true, the namespace is removed from the document
public static final PluginConfigSpec<Boolean> REMOVE_NAMESPACE =
PluginConfigSpec.booleanSetting("remove_namespace", false);
public static final PluginConfigSpec<String> DOCUMENT_TYPE =
PluginConfigSpec.requiredStringSetting("document_type");
// if remove_document_type is true, the document type is removed from the document (assuming it's dynamic)
public static final PluginConfigSpec<Boolean> REMOVE_DOCUMENT_TYPE =
PluginConfigSpec.booleanSetting("remove_document_type", false);
// field from the event to use as doc ID
public static final PluginConfigSpec<String> ID_FIELD =
PluginConfigSpec.stringSetting("id_field", "id");
// if remove_id is true, the id field is removed from the document
public static final PluginConfigSpec<Boolean> REMOVE_ID =
PluginConfigSpec.booleanSetting("remove_id", false);

// client certificate and key
public static final PluginConfigSpec<String> CLIENT_CERT =
@@ -46,6 +57,9 @@ public class VespaFeed implements Output {
// put, update or remove
public static final PluginConfigSpec<String> OPERATION =
PluginConfigSpec.stringSetting("operation", "put");
// if remove_operation is true, the operation field is removed from the document (assuming it's dynamic)
public static final PluginConfigSpec<Boolean> REMOVE_OPERATION =
PluginConfigSpec.booleanSetting("remove_operation", false);
// whether to add create=true to the put/update request
public static final PluginConfigSpec<Boolean> CREATE =
PluginConfigSpec.booleanSetting("create", false);
@@ -76,15 +90,19 @@ public class VespaFeed implements Output {
private final String id;
private final String namespace;
private final boolean dynamicNamespace;
private final boolean removeNamespace;
private final String documentType;
private final boolean dynamicDocumentType;
private final boolean removeDocumentType;
private final String operation;
private final boolean dynamicOperation;
private final boolean create;
private final String idField;
private final boolean removeId;
private final long operationTimeout;
private volatile boolean stopped = false;
ObjectMapper objectMapper;
private final boolean removeOperation;


public VespaFeed(final String id, final Configuration config, final Context context) {
@@ -94,12 +112,12 @@ public VespaFeed(final String id, final Configuration config, final Context cont
DynamicOption configOption = new DynamicOption(config.get(NAMESPACE));
dynamicNamespace = configOption.isDynamic();
namespace = configOption.getParsedConfigValue();

removeNamespace = config.get(REMOVE_NAMESPACE);
// same with document type
configOption = new DynamicOption(config.get(DOCUMENT_TYPE));
dynamicDocumentType = configOption.isDynamic();
documentType = configOption.getParsedConfigValue();

removeDocumentType = config.get(REMOVE_DOCUMENT_TYPE);
// and operation
configOption = new DynamicOption(config.get(OPERATION));
dynamicOperation = configOption.isDynamic();
@@ -110,7 +128,8 @@ public VespaFeed(final String id, final Configuration config, final Context cont
operationTimeout = config.get(OPERATION_TIMEOUT);

idField = config.get(ID_FIELD);

removeId = config.get(REMOVE_ID);
this.removeOperation = config.get(REMOVE_OPERATION);
FeedClientBuilder builder = FeedClientBuilder.create(config.get(VESPA_URL))
.setConnectionsPerEndpoint(config.get(MAX_CONNECTIONS).intValue())
.setMaxStreamPerConnection(config.get(MAX_STREAMS).intValue())
@@ -221,30 +240,35 @@ public void output(final Collection<Event> events) {
}

protected CompletableFuture<Result> asyncFeed(Event event) throws JsonProcessingException {
Map<String, Object> eventData = event.getData();

// we put the doc ID here
// try to get the doc ID from the event, otherwise generate a UUID
String docIdStr;

// see if the event has an ID field (as configured)
// if it does, use it as docIdStr. Otherwise, generate a UUID
if (eventData.containsKey(idField)) {
docIdStr = eventData.get(idField).toString();
} else {
Object docIdObj = event.getField(idField);
if (docIdObj == null) {
docIdStr = UUID.randomUUID().toString();
} else {
docIdStr = docIdObj.toString();
// Remove the ID field if configured to do so
if (removeId) {
event.remove(idField);
}
}

// if the namespace is dynamic, we need to resolve it
// the default (if we don't have such a field) is simply the name of the field
String namespace = this.namespace;
if (dynamicNamespace) {
namespace = getDynamicField(event, this.namespace);
if (removeNamespace) {
// remove by field name: the local `namespace` now holds the resolved value
event.remove(this.namespace);
}
}

// similar logic for the document type
String documentType = this.documentType;
if (dynamicDocumentType) {
documentType = getDynamicField(event, this.documentType);
if (removeDocumentType) {
// remove by field name: the local `documentType` now holds the resolved value
event.remove(this.documentType);
}
}

// and the operation
@@ -256,6 +280,9 @@ protected CompletableFuture<Result> asyncFeed(Event event) throws JsonProcessing
// TODO we should put this in the dead letter queue
return null;
}
if (removeOperation) {
event.remove(this.operation);
}
}
// add create=true, if applicable
OperationParameters operationParameters = addCreateIfApplicable(operation, docIdStr);
@@ -265,10 +292,9 @@ protected CompletableFuture<Result> asyncFeed(Event event) throws JsonProcessing
DocumentId docId = DocumentId.of(namespace,
documentType, docIdStr);

// create a document from the event data. We need an enclosing "fields" object
// to match the Vespa put format
// create a document from the event data
Map<String,Object> doc = new HashMap<>();
doc.put("fields", eventData);
doc.put("fields", event.getData()); // event data after any configured fields were removed

// create the request to feed the document
if (operation.equals("put")) {
@@ -335,7 +361,9 @@ public void awaitStop() throws InterruptedException {

@Override
public Collection<PluginConfigSpec<?>> configSchema() {
return List.of(VESPA_URL, CLIENT_CERT, CLIENT_KEY, OPERATION, CREATE, NAMESPACE, DOCUMENT_TYPE, ID_FIELD,
return List.of(VESPA_URL, CLIENT_CERT, CLIENT_KEY, OPERATION, CREATE,
NAMESPACE, REMOVE_NAMESPACE, DOCUMENT_TYPE, REMOVE_DOCUMENT_TYPE, ID_FIELD, REMOVE_ID,
REMOVE_OPERATION,
MAX_CONNECTIONS, MAX_STREAMS, MAX_RETRIES, OPERATION_TIMEOUT, GRACE_PERIOD, DOOM_PERIOD);
}
