
Configuration


The sink offers a set of configuration keys alongside the Kafka Connect defaults for converters, consumer settings, and so on. Here is the full list:

| Key | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| connect.ems.endpoint | The EMS API endpoint, in the form `https://[team].[realm].celonis.cloud/continuous-batch-processing/api/v1/[pool-id]/items`. | STRING | YES | null |
| connect.ems.authorization.key | The EMS API Authorization header. It should be `AppKey <<app-key>>` or `Bearer <<api-key>>`. | STRING | YES | null |
| connect.ems.target.table | The EMS table in which to store the data. | STRING | YES | null |
| connect.ems.connection.id | Optional parameter representing the unique EMS connection identifier. | STRING | NO | null |
| connect.ems.embed.kafka.metadata | Include the Kafka metadata fields (i.e. kafkaOffset, kafkaPartition, kafkaTimestamp) in the target EMS table. | BOOLEAN | YES | true |
| connect.ems.flattener.enable | Enable the message-flattening transformation. This has to be set to true when the source topic contains nested data. | BOOLEAN | NO | false |
| connect.ems.flattener.collections.discard | Discard array and map fields. The default behaviour is to transform them into JSON-encoded strings. | BOOLEAN | NO | false |
| connect.ems.flattener.jsonblob.chunks | The number of string chunks into which the input record should be JSON-encoded. The byte size of each JSON-encoded chunk is driven by the connect.ems.data.fallback.varchar.length parameter, which must be supplied for this configuration key to be accepted. | INT | NO | null |
| connect.ems.client.id | Optional parameter representing the client's unique identifier. | STRING | NO | null |
| connect.ems.data.primary.key | Optional comma-separated list of columns that form the primary key of the EMS table. If not specified, no primary key is used, uniqueness is not enforced, and the data is not deduplicated. | STRING | NO | null |
| connect.ems.order.field.name | Optional parameter, used only when primary keys are set. It must be a sortable field, present in the incoming data, used to deduplicate records sharing the same primary key(s). For details, see the Primary Key(s) section. | STRING | NO | null |
| connect.ems.data.fallback.varchar.length | Optional parameter representing the STRING (VARCHAR) length used when the schema is created in EMS. The value must be between 1 and 65000. | INT | NO | null |
| connect.ems.tmp.dir | The folder in which to store the temporary files while data accumulates. If not specified, `System.getProperty("java.io.tmpdir")` is used. | STRING | NO | System temp directory |
| connect.ems.commit.size.bytes | The maximum size of the accumulated file before it is uploaded to EMS. It cannot be less than 1 MB (1000000). A file smaller than 1 MB can still be uploaded if the records-count or time-interval commit policy triggers first, or if a schema change occurs. | LONG | YES | null |
| connect.ems.commit.records | The maximum number of records in the accumulated file before it is uploaded to EMS. | INT | YES | null |
| connect.ems.commit.interval.ms | The time interval, in milliseconds, after which the data is uploaded to EMS if the other two commit policies have not yet triggered. It cannot be less than 1 second. | LONG | YES | null |
| connect.ems.parquet.write.flush.records | The number of records after which the data is flushed to the file, to ensure the file-size policy can be applied. | INT | NO | 100 |
| connect.ems.error.policy | Specifies the action to take if an error occurs while inserting the data. There are three options: CONTINUE (the error is swallowed), THROW (the error is allowed to propagate), and RETRY (the exception causes the Connect framework to retry the message; the number of retries is set by connect.ems.max.retries). All errors are logged automatically, even if the code swallows them. | STRING | NO | THROW |
| connect.ems.max.retries | The maximum number of times to re-attempt writing the records before the task is marked as failed. | INT | NO | 10 |
| connect.ems.retry.interval | The interval, in milliseconds, to wait between retries when the RETRY error policy is used. | LONG | NO | 1000 |
| connect.ems.obfuscation.fields | Optional comma-separated list of fields to obfuscate. Nested values, including arrays, are supported. | STRING | NO | null |
| connect.ems.obfuscation.method | The connector offers three methods: fix, sha1 and sha512. When fix is used, string values are transformed to `*****`. For sha512 a salt is required; see connect.ems.obfuscation.sha512.salt. | STRING | NO | fix |
| connect.ems.obfuscation.sha512.salt | Required only when connect.ems.obfuscation.method is set to sha512 and obfuscation fields have been set. If no obfuscation fields are provided, this configuration is ignored. | STRING | NO | null |
| connect.ems.debug.keep.parquet.files | For debugging purposes, set this to true to make the connector keep the files after they have been uploaded. | BOOLEAN | NO | false |
| connect.ems.proxy.host | The hostname of the proxy server, if a proxy is required to access external services. | STRING | NO | null |
| connect.ems.proxy.port | The port number of the proxy server, if a proxy is required to access external services. | INT | NO | null |
| connect.ems.proxy.auth.type | The type of proxy authentication to use, if a proxy is required to access external services. There is currently one option: BASIC (Basic Authentication). | STRING | NO | null |
| connect.ems.proxy.auth.username | The username for proxy authentication, if a proxy is required to access external services. | STRING | NO | null |
| connect.ems.proxy.auth.password | The password for proxy authentication, if a proxy is required to access external services. | STRING | NO | null |
| connect.ems.explode.mode | When each incoming record is a list of records, this explodes (flattens) the records on output. The possible values are NONE and LIST (the record must be a List of record types; the sink discards the List wrapper and writes each record individually). When this setting is used with flush counts, the flush counts reflect the unexploded records, not the exploded ones. | STRING | NO | NONE |
| connect.ems.pool.max.idle | Connection pool: the maximum number of idle connections to allow. | INT | NO | 5 |
| connect.ems.pool.keepalive | Connection pool: the number of milliseconds to keep a connection alive. | LONG | NO | 300000 |
| connect.ems.pool.explicit.close | Connection pool: explicitly close connections on completion of a request. | BOOLEAN | NO | false |
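
To see how the required keys fit together, here is a minimal sketch of a sink configuration as it could be submitted to the Kafka Connect REST API. The endpoint, pool id, app key, topic and table names are placeholders, and the connector class name should be verified against the project README:

```json
{
  "name": "ems-sink",
  "config": {
    "connector.class": "com.celonis.kafka.connect.ems.sink.EmsSinkConnector",
    "tasks.max": "1",
    "topics": "payments",
    "connect.ems.endpoint": "https://my-team.eu-1.celonis.cloud/continuous-batch-processing/api/v1/my-pool-id/items",
    "connect.ems.authorization.key": "AppKey <<app-key>>",
    "connect.ems.target.table": "payments",
    "connect.ems.commit.size.bytes": "10000000",
    "connect.ems.commit.records": "100000",
    "connect.ems.commit.interval.ms": "30000"
  }
}
```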

Record flattening

As EMS is designed to work with relational data, the EMS connector supports a mechanism to automatically flatten object fields nested within the source record. This can be enabled by setting connect.ems.flattener.enable to true. With the built-in flattener enabled, collection values such as arrays and maps are JSON-encoded and persisted as nullable VARCHAR columns in the target EMS table.
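
As an illustrative sketch (the record and its field names are hypothetical), a nested record such as:

```json
{
  "id": 1,
  "customer": {
    "name": "Jane",
    "tags": ["vip", "eu"]
  }
}
```

would be stored as a single flat row: the scalar leaf fields (id and the nested customer name) become columns under the connector's flattened naming scheme, while the tags array is JSON-encoded into a nullable VARCHAR column.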

Chunked JSON encoding

As part of the built-in support for flattening nested structures, we provide the connect.ems.flattener.jsonblob.chunks setting, which allows JSON-encoding record payloads into a configurable number of target table fields (i.e. chunks). The byte size of these chunks is set via connect.ems.data.fallback.varchar.length. Notice that this value must be between 1 and 65000 bytes, as that is the maximum length of a VARCHAR field in EMS. This is intended as an escape hatch for ingesting unstructured JSON data into EMS and then implementing parsing and normalisation as a separate process (e.g. a custom script run at regular intervals).
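
For instance, a sketch of the two settings combined (the values are arbitrary): each record is JSON-encoded into up to 4 VARCHAR chunks of at most 65000 bytes each, so the chunk count should be sized for the largest expected payload.

```json
{
  "connect.ems.flattener.enable": "true",
  "connect.ems.flattener.jsonblob.chunks": "4",
  "connect.ems.data.fallback.varchar.length": "65000"
}
```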

Primary Key(s)

An UPSERT behaviour is used for the data pushed to EMS. If the target EMS table is not defined by the time the connector is created, its schema is determined by the first set of data file(s) uploaded. If the connector drives the schema, it is recommended to set the primary key configuration entry: connect.ems.data.primary.key. When this is left out, all input records are appended to the target EMS table, which, depending on the scenario, might not be desired.

Apache Kafka guarantees message order at the partition level only. Best practice therefore requires records with the same primary key to be written to a single partition.
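
A sketch of the relevant settings (the column names are hypothetical):

```json
{
  "connect.ems.data.primary.key": "order_id,line_no",
  "connect.ems.order.field.name": "updated_at"
}
```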

Records deduplication

It is assumed the reader has a basic understanding of Kafka topics, partitions and message structure.

EMS data ingestion relies on Parquet file uploads. As data records accumulate in the files, records with the same primary key can be present in one file, or in files that are processed together as part of the ingestion. To ensure the latest record is the one stored, a sortable field needs to be provided; otherwise there is no guarantee on the outcome.

When the primary key configuration is set, the connector automatically injects a field named __celonis_order to drive the deduplication. This field is populated with the incoming Kafka message offset. For this approach to work, all records sharing the same primary key must always be written to the same partition. When this is not the case, it is the user's responsibility to set connect.ems.order.field.name to an incoming data field which guarantees the order of records sharing the same primary key.
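
As a sketch of the intended semantics (the records and field names below are hypothetical), given two records sharing the same primary key:

```json
[
  { "order_id": 42, "status": "PENDING", "__celonis_order": 100 },
  { "order_id": 42, "status": "SHIPPED", "__celonis_order": 101 }
]
```

deduplication keeps the record with the higher sort value, here the one with status SHIPPED.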

How to ensure partition-affinity

By default, the Kafka producer determines the target partition from the raw bytes of the message Key: it hashes the Key (Murmur2 in the Java client) and takes the result modulo the number of partitions. Therefore, to ensure that records with the same primary key always end up in the same partition, the Kafka message Key should contain only the primary key(s) values. Note that the connector does not consider the Key fields. It is the user's responsibility to add the primary key(s) to the record Value, either at produce time (duplicating data between the Key and the Value) or at the connector level, by using a Kafka Connect Single Message Transform that transfers fields from the Key into the Value (a reverse of the built-in ValueToKey transform).
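
Kafka Connect does not ship such a reverse transform out of the box, so the transform class below is purely hypothetical; the sketch only shows the shape such an SMT entry could take inside the sink's configuration:

```json
{
  "transforms": "keyToValue",
  "transforms.keyToValue.type": "com.example.transforms.KeyToValue",
  "transforms.keyToValue.fields": "order_id"
}
```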

Obfuscation

Rules

To enable obfuscation, connect.ems.obfuscation.fields has to be set. When it is set, connect.ems.obfuscation.method is required as well. The rules are as follows (a configuration sketch follows the list):

- fix obfuscation converts the values of the targeted fields to `*****`
- only text (string) columns can be obfuscated
- the obfuscation process does not validate the field paths; a path that does not exist in the data is not reported as an error
- nested structures are supported
- arrays are supported
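
As a sketch, obfuscating two hypothetical fields with the sha512 method and a salt:

```json
{
  "connect.ems.obfuscation.fields": "card_number,customer.email",
  "connect.ems.obfuscation.method": "sha512",
  "connect.ems.obfuscation.sha512.salt": "a-secret-salt"
}
```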

When the data structure looks like:

```
{
   "a": {
    "b": "secret",
    ...
   },
   "x": {
     "y": {
        "z": "another secret",
        ...
     }
   }
}
```

set connect.ems.obfuscation.fields to: a.b, x.y.z. This produces the following output:

```
{
   "a": {
    "b": "*****",
    ...
   },
   "x": {
     "y": {
        "z": "*****",
        ...
     }
   }
}
```

When the data structure contains arrays, and the obfuscation logic needs to cover the array items, the path resolving the array should contain the keyword value. For example, given this data structure:

```
{
   "a": [
    "secret1",
    "secret2",
    ...
   ],
   "x": [
     {
       "y": {
          "z": "another secret",
          ...
       }
     }
   ]
}
```

then set the obfuscation fields to: a.value, x.value.y.z. This produces the following output:

```
{
   "a": [
    "*****",
    "*****",
    ...
   ],
   "x": [
     {
       "y": {
          "z": "*****",
          ...
       }
     }
   ]
}
```