[logstash-plugins] output: added options to remove metadata fields #33059

Merged 1 commit on Dec 28, 2024
4 changes: 3 additions & 1 deletion integration/logstash-plugins/logstash-input-vespa/README.md
@@ -20,7 +20,9 @@ export LOGSTASH_SOURCE=1
bundle exec rspec
```

To run integration tests, you'll need to have a Vespa instance running + Logstash installed. Check out the `integration-test` directory for more information.
To run integration tests, you'll need Logstash installed and a Vespa instance running with a deployed app that supports an "id" field.

Check out the `integration-test` directory for more information.

```
cd integration-test
@@ -1,3 +1,6 @@
## 0.6.0
- adds options to remove fields after using them for writing

## 0.5.0
- supports update and remove operations

10 changes: 9 additions & 1 deletion integration/logstash-plugins/logstash-output-vespa/README.md
@@ -27,7 +27,9 @@ It looks like the JVM options from [here](https://github.com/logstash-plugins/.c
are useful to make JRuby's `bundle install` work.

### Integration tests
To run integration tests, you'll need to have a Vespa instance running + Logstash installed. Check out the `integration-test` directory for more information.
To run integration tests, you'll need Logstash installed and a Vespa instance running with a deployed app that supports an "id" field.

Check out the `integration-test` directory for more information.

```
cd integration-test
@@ -113,6 +115,12 @@ output {
# if the field doesn't exist, we generate a UUID
id_field => "id"

# remove fields from the document after using them for writing
remove_id => false # if set to true, remove the ID field after using it
remove_namespace => false # would remove the namespace field (if dynamic)
remove_document_type => false # same for document type
remove_operation => false # and operation

# how many HTTP/2 connections to keep open
max_connections => 1
# number of streams per connection
2 changes: 1 addition & 1 deletion integration/logstash-plugins/logstash-output-vespa/VERSION
@@ -1 +1 @@
0.5.2
0.6.0
@@ -42,6 +42,10 @@ Writes documents to Vespa.
| <<plugins-{type}s-{plugin}-operation>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-create>> |<<boolean,boolean>>|No
| <<plugins-{type}s-{plugin}-id_field>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-remove_id>> |<<boolean,boolean>>|No
| <<plugins-{type}s-{plugin}-remove_namespace>> |<<boolean,boolean>>|No
| <<plugins-{type}s-{plugin}-remove_document_type>> |<<boolean,boolean>>|No
| <<plugins-{type}s-{plugin}-remove_operation>> |<<boolean,boolean>>|No
| <<plugins-{type}s-{plugin}-max_retries>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-max_connections>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-max_streams>> |<<number,number>>|No
@@ -77,6 +81,14 @@ Make sure to set the `client_key` as well. The certificate should be in PEM form

Corresponding private key for the `client_cert` certificate. It must not be password-protected; otherwise Logstash will fail to start, complaining `Could not find private key in PEM file`.

[id="plugins-{type}s-{plugin}-id_field"]
===== `id_field`

* Value type is <<string,string>>
* Default value is `id`

Field to get the document ID from. If not present, a UUID will be generated.

[id="plugins-{type}s-{plugin}-namespace"]
===== `namespace`

@@ -104,20 +116,45 @@ Like `namespace`, this can be static or dynamic. So you can use `%{foo}` to use

Operation to perform. Can be `put`, `update`, or `remove`. Dynamic values are supported (e.g. `%{foo}`). If not present, the plugin will use `put`.

[id="plugins-{type}s-{plugin}-create"]
===== `create`

[id="plugins-{type}s-{plugin}-remove_id"]
===== `remove_id`

* Value type is <<boolean,boolean>>
* Default value is `false`

If set to `true`, the plugin will add the `create=true` parameter to the request. It works for `put` and `update` operations, but is mostly used with `update` to perform https://docs.vespa.ai/en/document-v1-api-guide.html#upserts[upserts].
If set to `true`, removes the ID field from the document after using it for document identification.

[id="plugins-{type}s-{plugin}-id_field"]
===== `id_field`
[id="plugins-{type}s-{plugin}-remove_namespace"]
===== `remove_namespace`

* Value type is <<string,string>>
* Default value is `id`
* Value type is <<boolean,boolean>>
* Default value is `false`

Field to get the document ID from. If not present, a UUID will be generated.
When using a dynamic namespace (e.g., `%{my_namespace}`), if set to `true`, removes the field containing the namespace value from the document after using it.

[id="plugins-{type}s-{plugin}-remove_document_type"]
===== `remove_document_type`

* Value type is <<boolean,boolean>>
* Default value is `false`

When using a dynamic document type (e.g., `%{my_doc_type}`), if set to `true`, removes the field containing the document type value from the document after using it.

[id="plugins-{type}s-{plugin}-remove_operation"]
===== `remove_operation`

* Value type is <<boolean,boolean>>
* Default value is `false`

When using a dynamic operation (e.g., `%{my_operation}`), if set to `true`, removes the field containing the operation value from the document after using it.

[id="plugins-{type}s-{plugin}-create"]
===== `create`

* Value type is <<boolean,boolean>>

If set to `true`, the plugin will add the `create=true` parameter to the request. It works for `put` and `update` operations, but is mostly used with `update` to perform https://docs.vespa.ai/en/document-v1-api-guide.html#upserts[upserts].

[id="plugins-{type}s-{plugin}-max_retries"]
===== `max_retries`
@@ -2,4 +2,4 @@
# encoding: utf-8

require 'jar_dependencies'
require_jar('org.logstashplugins', 'logstash-output-vespa_feed', '0.5.2')
require_jar('org.logstashplugins', 'logstash-output-vespa_feed', '0.6.0')
@@ -32,10 +32,21 @@ public class VespaFeed implements Output {
PluginConfigSpec.uriSetting("vespa_url", "http://localhost:8080");
public static final PluginConfigSpec<String> NAMESPACE =
PluginConfigSpec.requiredStringSetting("namespace");
// if namespace is set to %{field_name} or %{[field_name]}, it's dynamic
// if remove_namespace is true, the namespace is removed from the document
public static final PluginConfigSpec<Boolean> REMOVE_NAMESPACE =
PluginConfigSpec.booleanSetting("remove_namespace", false);
public static final PluginConfigSpec<String> DOCUMENT_TYPE =
PluginConfigSpec.requiredStringSetting("document_type");
// if remove_document_type is true, the document type is removed from the document (assuming it's dynamic)
public static final PluginConfigSpec<Boolean> REMOVE_DOCUMENT_TYPE =
PluginConfigSpec.booleanSetting("remove_document_type", false);
// field from the event to use as doc ID
public static final PluginConfigSpec<String> ID_FIELD =
PluginConfigSpec.stringSetting("id_field", "id");
// if remove_id is true, the id field is removed from the document
public static final PluginConfigSpec<Boolean> REMOVE_ID =
PluginConfigSpec.booleanSetting("remove_id", false);

// client certificate and key
public static final PluginConfigSpec<String> CLIENT_CERT =
@@ -46,6 +57,9 @@ public class VespaFeed implements Output {
// put, update or remove
public static final PluginConfigSpec<String> OPERATION =
PluginConfigSpec.stringSetting("operation", "put");
// if remove_operation is true, the operation field is removed from the document (assuming it's dynamic)
public static final PluginConfigSpec<Boolean> REMOVE_OPERATION =
PluginConfigSpec.booleanSetting("remove_operation", false);
// whether to add create=true to the put/update request
public static final PluginConfigSpec<Boolean> CREATE =
PluginConfigSpec.booleanSetting("create", false);
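
The comment on `NAMESPACE` says a value written as `%{field_name}` or `%{[field_name]}` is treated as dynamic. The plugin's own `DynamicOption` class is not part of this diff, so the following is only a minimal sketch of how such detection could work. The class name `DynamicOptionSketch` and its regular expression are assumptions made for illustration; only the two method names mirror the calls seen in the constructor.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-in for the plugin's DynamicOption, whose source is not shown in this diff.
final class DynamicOptionSketch {
    // Accepts %{[field_name]} (group 1) or %{field_name} (group 2).
    // Nested references such as %{[a][b]} are deliberately not handled in this sketch.
    private static final Pattern DYNAMIC =
            Pattern.compile("^%\\{(?:\\[([^\\[\\]{}]+)\\]|([^\\[\\]{}]+))\\}$");

    private final boolean dynamic;
    private final String parsedValue;

    DynamicOptionSketch(String configValue) {
        Matcher m = DYNAMIC.matcher(configValue);
        this.dynamic = m.matches();
        if (dynamic) {
            // Keep only the bare field name for later lookups in the event.
            this.parsedValue = m.group(1) != null ? m.group(1) : m.group(2);
        } else {
            // A static value is used literally.
            this.parsedValue = configValue;
        }
    }

    boolean isDynamic() {
        return dynamic;
    }

    String getParsedConfigValue() {
        return parsedValue;
    }

    public static void main(String[] args) {
        DynamicOptionSketch opt = new DynamicOptionSketch("%{[my_namespace]}");
        // prints: true my_namespace
        System.out.println(opt.isDynamic() + " " + opt.getParsedConfigValue());
    }
}
```

Under this sketch, a static setting such as `namespace => "music"` would pass through unchanged, which is why the `remove_*` options only matter for dynamic values.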
@@ -76,15 +90,19 @@ public class VespaFeed implements Output {
private final String id;
private final String namespace;
private final boolean dynamicNamespace;
private final boolean removeNamespace;
private final String documentType;
private final boolean dynamicDocumentType;
private final boolean removeDocumentType;
private final String operation;
private final boolean dynamicOperation;
private final boolean create;
private final String idField;
private final boolean removeId;
private final long operationTimeout;
private volatile boolean stopped = false;
ObjectMapper objectMapper;
private final boolean removeOperation;


public VespaFeed(final String id, final Configuration config, final Context context) {
@@ -94,12 +112,12 @@ public VespaFeed(final String id, final Configuration config, final Context cont
DynamicOption configOption = new DynamicOption(config.get(NAMESPACE));
dynamicNamespace = configOption.isDynamic();
namespace = configOption.getParsedConfigValue();

removeNamespace = config.get(REMOVE_NAMESPACE);
// same with document type
configOption = new DynamicOption(config.get(DOCUMENT_TYPE));
dynamicDocumentType = configOption.isDynamic();
documentType = configOption.getParsedConfigValue();

removeDocumentType = config.get(REMOVE_DOCUMENT_TYPE);
// and operation
configOption = new DynamicOption(config.get(OPERATION));
dynamicOperation = configOption.isDynamic();
@@ -110,7 +128,8 @@ public VespaFeed(final String id, final Configuration config, final Context cont
operationTimeout = config.get(OPERATION_TIMEOUT);

idField = config.get(ID_FIELD);

removeId = config.get(REMOVE_ID);
this.removeOperation = config.get(REMOVE_OPERATION);
FeedClientBuilder builder = FeedClientBuilder.create(config.get(VESPA_URL))
.setConnectionsPerEndpoint(config.get(MAX_CONNECTIONS).intValue())
.setMaxStreamPerConnection(config.get(MAX_STREAMS).intValue())
@@ -221,30 +240,35 @@ public void output(final Collection<Event> events) {
}

protected CompletableFuture<Result> asyncFeed(Event event) throws JsonProcessingException {
Map<String, Object> eventData = event.getData();

// we put the doc ID here
// try to get the doc ID from the event, otherwise generate a UUID
String docIdStr;

// see if the event has an ID field (as configured)
// if it does, use it as docIdStr. Otherwise, generate a UUID
if (eventData.containsKey(idField)) {
docIdStr = eventData.get(idField).toString();
} else {
Object docIdObj = event.getField(idField);
if (docIdObj == null) {
docIdStr = UUID.randomUUID().toString();
} else {
docIdStr = docIdObj.toString();
// Remove the ID field if configured to do so
if (removeId) {
event.remove(idField);
}
}

// if the namespace is dynamic, we need to resolve it
// the default (if we don't have such a field) is simply the name of the field
String namespace = this.namespace;
if (dynamicNamespace) {
namespace = getDynamicField(event, this.namespace);
if (removeNamespace) {
event.remove(namespace);
}
}

// similar logic for the document type
String documentType = this.documentType;
if (dynamicDocumentType) {
documentType = getDynamicField(event, this.documentType);
if (removeDocumentType) {
event.remove(documentType);
}
}

// and the operation
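
The ID handling in this hunk reads the configured field, falls back to a random UUID when the field is missing, and removes the field only when `remove_id` is set. A minimal sketch of that flow follows. It uses a plain `Map` as a stand-in for the Logstash `Event`, and the class and method names (`DocIdSketch`, `resolveDocId`) are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical helper; a Map stands in for the Logstash Event used by the plugin.
final class DocIdSketch {
    // Returns the document ID from idField, or a random UUID if the field is absent.
    // If removeId is true and the field exists, it is removed from the event data.
    static String resolveDocId(Map<String, Object> eventData, String idField, boolean removeId) {
        Object docIdObj = eventData.get(idField);
        if (docIdObj == null) {
            return UUID.randomUUID().toString();
        }
        if (removeId) {
            eventData.remove(idField);
        }
        return docIdObj.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> event = new HashMap<>();
        event.put("id", 42);
        event.put("title", "hello");

        String docId = resolveDocId(event, "id", true);
        // prints: 42 false
        System.out.println(docId + " " + event.containsKey("id"));
    }
}
```

Reading the value before removing it matters here: the removal has to happen after the ID string has been captured, otherwise the document would be fed under a generated UUID.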
@@ -256,6 +280,9 @@ protected CompletableFuture<Result> asyncFeed(Event event) throws JsonProcessing
// TODO we should put this in the dead letter queue
return null;
}
if (removeOperation) {
event.remove(this.operation);
}
}
// add create=true, if applicable
OperationParameters operationParameters = addCreateIfApplicable(operation, docIdStr);
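
For dynamic settings, the value is looked up in the event and, per the comment in the earlier hunk, falls back to the field name itself when the event has no such field. The sketch below illustrates that resolve-then-remove pattern, again with a `Map` standing in for the `Event`. The names `DynamicFieldSketch`, `resolve`, and `isValidOperation` are hypothetical, and `getDynamicField` itself is not shown in this diff. The sketch removes by field name, as the `remove_operation` branch does with `this.operation`; the namespace and document type branches in the earlier hunk pass the resolved local variable instead, which may be worth double-checking.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper; a Map stands in for the Logstash Event used by the plugin.
final class DynamicFieldSketch {
    private static final Set<String> VALID_OPERATIONS = Set.of("put", "update", "remove");

    // Resolves a dynamic setting from the event. If the field is missing,
    // the field name itself is returned as the default.
    static String resolve(Map<String, Object> eventData, String fieldName, boolean removeAfterUse) {
        Object value = eventData.get(fieldName);
        if (value == null) {
            return fieldName;
        }
        if (removeAfterUse) {
            // Removal is keyed by the field name, not by the resolved value.
            eventData.remove(fieldName);
        }
        return value.toString();
    }

    // The plugin documentation lists put, update, and remove as the supported operations.
    static boolean isValidOperation(String operation) {
        return VALID_OPERATIONS.contains(operation);
    }

    public static void main(String[] args) {
        Map<String, Object> event = new HashMap<>();
        event.put("my_operation", "update");
        event.put("title", "hello");

        String operation = resolve(event, "my_operation", true);
        // prints: update true false
        System.out.println(operation + " " + isValidOperation(operation)
                + " " + event.containsKey("my_operation"));
    }
}
```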
@@ -265,10 +292,9 @@ protected CompletableFuture<Result> asyncFeed(Event event) throws JsonProcessing
DocumentId docId = DocumentId.of(namespace,
documentType, docIdStr);

// create a document from the event data. We need an enclosing "fields" object
// to match the Vespa put format
// create a document from the event data
Map<String,Object> doc = new HashMap<>();
doc.put("fields", eventData);
doc.put("fields", event.getData()); // event data after any configured field removals

// create the request to feed the document
if (operation.equals("put")) {
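
Because the event data is read only after the removals, any field dropped by a `remove_*` option no longer appears under the enclosing `fields` object. A small sketch of that wrapping step, assuming a plain `Map` in place of the `Event` and a hypothetical `FeedDocumentSketch.wrap` helper (JSON serialization is left out):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper showing the shape of the document body before serialization.
final class FeedDocumentSketch {
    // Wraps the remaining event data in an enclosing "fields" object.
    static Map<String, Object> wrap(Map<String, Object> eventData) {
        Map<String, Object> doc = new HashMap<>();
        doc.put("fields", eventData);
        return doc;
    }

    public static void main(String[] args) {
        Map<String, Object> event = new HashMap<>();
        event.put("id", "doc1");
        event.put("title", "hello");

        // Simulate remove_id => true having been applied before wrapping.
        event.remove("id");

        Map<String, Object> doc = wrap(event);
        System.out.println(doc);
    }
}
```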
@@ -335,7 +361,9 @@ public void awaitStop() throws InterruptedException {

@Override
public Collection<PluginConfigSpec<?>> configSchema() {
return List.of(VESPA_URL, CLIENT_CERT, CLIENT_KEY, OPERATION, CREATE, NAMESPACE, DOCUMENT_TYPE, ID_FIELD,
return List.of(VESPA_URL, CLIENT_CERT, CLIENT_KEY, OPERATION, CREATE,
NAMESPACE, REMOVE_NAMESPACE, DOCUMENT_TYPE, REMOVE_DOCUMENT_TYPE, ID_FIELD, REMOVE_ID,
REMOVE_OPERATION,
MAX_CONNECTIONS, MAX_STREAMS, MAX_RETRIES, OPERATION_TIMEOUT, GRACE_PERIOD, DOOM_PERIOD);
}
