Configuration
The sink offers a set of configuration keys alongside the Kafka Connect defaults for converters, consumer settings, and so on. Here is the full list:
Key | Description | Type | Required | Default |
---|---|---|---|---|
connect.ems.endpoint | Contains the EMS API endpoint in the form of: https://[team].[realm].celonis.cloud/continuous-batch-processing/api/v1/[pool-id]/items | STRING | YES | null |
connect.ems.authorization.key | Contains the EMS API Authorization header. It should be AppKey <<app-key>> or Bearer <<api-key>>. | STRING | YES | null |
connect.ems.target.table | The table in EMS to store the data. | STRING | YES | null |
connect.ems.connection.id | Optional parameter. It represents the unique EMS connection identifier. | STRING | NO | null |
connect.ems.client.id | Optional parameter representing the client unique identifier | STRING | NO | null |
connect.ems.data.primary.key | Optional parameter containing a comma-separated list of the columns that form the primary key of the EMS table. If not specified, and the table does not exist, all columns will form the primary key. | STRING | NO | null |
connect.ems.order.field.name | Optional parameter used only when primary keys are set. It needs to be a sortable field, present in the incoming data, so that records sharing the same primary key(s) can be deduplicated. If this field is not set, the connector injects a column named _celonis_order using the Kafka message offset. | STRING | NO | null |
connect.ems.data.fallback.varchar.length | Optional parameter representing the STRING (VARCHAR) length when the schema is created in EMS | STRING | NO | null |
connect.ems.tmp.dir | The folder where temporary files are stored while data accumulates. If not specified, it uses System.getProperty("java.io.tmpdir"). | STRING | NO | System temp directory |
connect.ems.commit.size.bytes | The maximum size of the accumulated file before it is uploaded to EMS. It cannot be less than 1 MB (1000000). A file smaller than 1 MB can still be uploaded if the record count limit, the time interval, or a schema change triggers first. | LONG | YES | null |
connect.ems.commit.records | The maximum number of records in the accumulated file before it is uploaded to EMS. | INT | YES | null |
connect.ems.commit.interval.ms | The time interval in milliseconds to upload the data to EMS if the other two commit policies are not yet applicable. It cannot be less than 1 second. | LONG | YES | null |
connect.ems.parquet.write.flush.records | The number of records after which it should flush the data to the file, to ensure the file size policy. | INT | NO | 100 |
connect.ems.error.policy | Specifies the action to be taken if an error occurs while inserting the data. There are three available options: CONTINUE (the error is swallowed), THROW (the error is allowed to propagate), and RETRY (the exception causes the Connect framework to retry the message; the number of retries is set by connect.ems.max.retries). All errors are logged automatically, even if the code swallows them. | STRING | NO | THROW |
connect.ems.max.retries | The maximum number of times to re-attempt to write the records before the task is marked as failed. | INT | NO | 10 |
connect.ems.obfuscation.fields | An optional comma-separated list of fields to obfuscate. It supports nested values, including arrays. | STRING | NO | null |
connect.ems.obfuscation.method | The connector offers 3 methods: fix, sha1 and sha512. When fix is used, string values are transformed to *****. For sha512 a salt is required. See connect.ems.obfusation.sha512.salt | STRING | NO | fix |
connect.ems.obfusation.sha512.salt | Required only when connect.ems.obfuscation.method is set to sha512 and obfuscation fields have been set. If no obfuscation fields have been provided, this configuration is ignored. | STRING | NO | null |
connect.ems.debug.keep.parquet.files | For debugging purposes, set this to true to make the connector keep the files after they have been uploaded. | BOOL | NO | false |
connect.ems.proxy.host | The hostname of the proxy server, if a proxy is required to access external services. | STRING | NO | null |
connect.ems.proxy.port | The port number of the proxy server, if a proxy is required to access external services. | INT | NO | null |
connect.ems.proxy.auth.type | The type of proxy to use, if a proxy is required to access external services. There is currently one available option: | STRING | NO | null |
connect.ems.proxy.auth.username | The username for proxy authentication, if a proxy is required to access external services. | STRING | NO | null |
connect.ems.proxy.auth.password | The password for proxy authentication, if a proxy is required to access external services. | STRING | NO | null |
connect.ems.explode.mode | When each incoming record is a list of records, this will explode (flatten) the records on output. The possible values are: | STRING | NO | NONE |
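
As a rough illustration of how the keys in the table fit together, the following Python sketch assembles a minimal set of `connect.ems.*` settings and checks the documented constraints (required keys present, commit size of at least 1 MB, commit interval of at least 1 second). The helper names `build_ems_sink_config` and `validate_ems_sink_config`, the table name, and all numeric values are hypothetical; the endpoint and authorization strings are the placeholders from the table, not real values. The connector performs its own validation, which this sketch does not reproduce.

```python
import json

MIN_COMMIT_SIZE_BYTES = 1_000_000  # documented minimum: 1 MB
MIN_COMMIT_INTERVAL_MS = 1_000     # documented minimum: 1 second

# Keys marked as required in the configuration table.
REQUIRED_KEYS = [
    "connect.ems.endpoint",
    "connect.ems.authorization.key",
    "connect.ems.target.table",
    "connect.ems.commit.size.bytes",
    "connect.ems.commit.records",
    "connect.ems.commit.interval.ms",
]


def build_ems_sink_config(endpoint, authorization, table):
    """Return a dict of sink settings; the values are illustrative only."""
    return {
        "connect.ems.endpoint": endpoint,
        "connect.ems.authorization.key": authorization,
        "connect.ems.target.table": table,
        "connect.ems.commit.size.bytes": "10000000",
        "connect.ems.commit.records": "100000",
        "connect.ems.commit.interval.ms": "600000",
        "connect.ems.error.policy": "RETRY",
        "connect.ems.max.retries": "10",
    }


def validate_ems_sink_config(config):
    """Return a list of human-readable problems; empty means no issue found."""
    problems = [
        f"missing required key: {key}" for key in REQUIRED_KEYS if key not in config
    ]
    size = config.get("connect.ems.commit.size.bytes")
    if size is not None and int(size) < MIN_COMMIT_SIZE_BYTES:
        problems.append("connect.ems.commit.size.bytes must be at least 1000000")
    interval = config.get("connect.ems.commit.interval.ms")
    if interval is not None and int(interval) < MIN_COMMIT_INTERVAL_MS:
        problems.append("connect.ems.commit.interval.ms must be at least 1000")
    return problems


config = build_ems_sink_config(
    "https://[team].[realm].celonis.cloud/continuous-batch-processing/api/v1/[pool-id]/items",
    "AppKey <<app-key>>",
    "orders",  # hypothetical target table name
)
print(json.dumps(config, indent=2))
print(validate_ems_sink_config(config))
```

The values are kept as strings because that is how they would typically appear in a properties file or a JSON connector definition; the remaining Kafka Connect settings (connector class, topics, converters) are deliberately left out.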
An UPSERT behaviour is used for the data pushed to EMS. If the target EMS table is not defined by the time the connector is created, its schema is determined by the first data file(s) uploaded. If the connector drives the schema, it is recommended to set the primary key configuration entry, connect.ems.data.primary.key. When this is left out, all the fields are used as part of a composite primary key, which might not be desirable depending on the scenario.
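
The effect of a primary key combined with an order field can be pictured with a small Python sketch. It only illustrates upsert semantics under the assumption that, for each primary key, the record with the highest order value wins; the function name `upsert_latest` and the sample field names `id` and `status` are hypothetical, and the actual merge is performed by EMS.

```python
def upsert_latest(records, primary_keys, order_field):
    """Keep, for each primary key, the record with the largest order value."""
    latest = {}
    for record in records:
        key = tuple(record[name] for name in primary_keys)
        current = latest.get(key)
        # Assumption: a later (larger or equal) order value replaces the earlier one.
        if current is None or record[order_field] >= current[order_field]:
            latest[key] = record
    return list(latest.values())


rows = [
    {"id": 1, "status": "open", "_celonis_order": 10},
    {"id": 2, "status": "open", "_celonis_order": 11},
    {"id": 1, "status": "closed", "_celonis_order": 12},
]
result = upsert_latest(rows, ["id"], "_celonis_order")
print(result)
```

If `primary_keys` listed every column, each distinct row would form its own key and nothing would be merged, which is the composite-key situation described above.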
To enable obfuscation, connect.ems.obfuscation.fields has to be set. When it is set, connect.ems.obfuscation.method is required.
- fix obfuscation converts the data to ***** for the fields selected for obfuscation; only text (string) columns can be obfuscated.
- the obfuscation process does not validate that the obfuscated path exists.
- it supports nested structures as well
- it supports arrays
When the data structure looks like:
    {
      "a": {
        "b": "secret",
        ...
      },
      "x": {
        "y": {
          "z": "another secret",
          ...
        }
      }
    }
set connect.ems.obfuscation.fields to: a.b, x.y.z. This produces the following output:
    {
      "a": {
        "b": "*****",
        ...
      },
      "x": {
        "y": {
          "z": "*****",
          ...
        }
      }
    }
When the data structure contains arrays and the obfuscation needs to cover the array items, the path through the array must contain the keyword value. For example, given this data structure:
    {
      "a": [
        "secret1",
        "secret2",
        ...
      ],
      "x": [
        {
          "y": {
            "z": "another secret",
            ...
          }
        }
      ]
    }
then set connect.ems.obfuscation.fields to: a.value, x.value.y.z. This produces this output:
    {
      "a": [
        "*****",
        "*****",
        ...
      ],
      "x": [
        {
          "y": {
            "z": "*****",
            ...
          }
        }
      ]
    }
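
The path rules described above (dot-separated field names, with value standing for the items of an array) can be sketched in Python as follows. This is a minimal illustration of the fix method only, not the connector's implementation: the names `obfuscate_fix` and `_mask` are hypothetical, and the sketch assumes that non-string values and paths that do not exist are left untouched, in line with the notes in the list above.

```python
MASK = "*****"


def _mask(node, parts):
    """Return a copy of `node` with the string at the path `parts` masked."""
    if not parts:
        # End of the path: only text (string) values are obfuscated.
        return MASK if isinstance(node, str) else node
    head, rest = parts[0], parts[1:]
    if head == "value" and isinstance(node, list):
        # The keyword "value" applies the rest of the path to every array item.
        return [_mask(item, rest) for item in node]
    if isinstance(node, dict) and head in node:
        updated = dict(node)
        updated[head] = _mask(node[head], rest)
        return updated
    # The path does not exist in this record: nothing is validated or changed.
    return node


def obfuscate_fix(record, fields):
    """Apply fix obfuscation for a comma-separated list of field paths."""
    paths = [path.strip() for path in fields.split(",") if path.strip()]
    result = record
    for path in paths:
        result = _mask(result, path.split("."))
    return result


record = {
    "a": ["secret1", "secret2"],
    "x": [{"y": {"z": "another secret", "n": 1}}],
}
masked = obfuscate_fix(record, "a.value, x.value.y.z")
print(masked)
```

The function returns a new structure instead of mutating its input, which keeps the original record available for comparison when experimenting with paths.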
Copyright @ Celonis 2022