
# Configuration


The sink offers a set of configuration keys alongside the Kafka Connect defaults for converters, consumer settings and so on. Here is the full list:

| Key | Description | Type | Required | Default |
|---|---|---|---|---|
| `connect.ems.endpoint` | The EMS API endpoint, in the form `https://[team].[realm].celonis.cloud/continuous-batch-processing/api/v1/[pool-id]/items`. | STRING | YES | null |
| `connect.ems.authorization.key` | The EMS API `Authorization` header. It should be `AppKey <<app-key>>` or `Bearer <<api-key>>`. | STRING | YES | null |
| `connect.ems.target.table` | The EMS table to store the data in. | STRING | YES | null |
| `connect.ems.connection.id` | Optional. The unique EMS connection identifier. | STRING | NO | null |
| `connect.ems.client.id` | Optional. The unique client identifier. | STRING | NO | null |
| `connect.ems.data.primary.key` | Optional. A comma-separated list of columns which form the primary key of the EMS table. If not specified and the table does not exist, all columns form the primary key. | STRING | NO | null |
| `connect.ems.data.fallback.varchar.length` | Optional. The STRING (VARCHAR) length to use when the schema is created in EMS. | STRING | NO | null |
| `connect.ems.tmp.dir` | The folder for the temporary files the connector writes while it accumulates data. If not specified, `${System.getProperty("java.io.tmpdir")}` is used. | STRING | NO | System temp directory |
| `connect.ems.commit.size.bytes` | The maximum size the accumulated file may reach before it is uploaded to EMS. It cannot be less than 1 MB (1000000). A file smaller than 1 MB can still be uploaded if another commit policy triggers first: the records count, the time interval, or a schema change. | LONG | YES | null |
| `connect.ems.commit.records` | The maximum number of records in the accumulated file before it is uploaded to EMS. | INT | YES | null |
| `connect.ems.commit.interval.ms` | The time interval, in milliseconds, after which the data is uploaded to EMS if the other two commit policies have not triggered yet. It cannot be less than 1 second. | LONG | YES | null |
| `connect.ems.parquet.write.flush.records` | The number of records after which the data is flushed to the file, to ensure the file-size commit policy can be honoured. | INT | NO | 100 |
| `connect.ems.error.policy` | The action to take when an error occurs while inserting the data. There are three options:<br>* CONTINUE - the error is swallowed.<br>* THROW - the error is allowed to propagate.<br>* RETRY - the exception causes the Connect framework to retry the message; the number of retries is set by `connect.ems.max.retries`.<br>All errors are logged automatically, even when the code swallows them. | STRING | NO | THROW |
| `connect.ems.max.retries` | The maximum number of times to re-attempt writing the records before the task is marked as failed. | INT | NO | 10 |
| `connect.ems.obfuscation.fields` | Optional. A comma-separated list of fields to obfuscate. Nested values, including arrays, are supported. | STRING | NO | null |
| `connect.ems.obfuscation.method` | The obfuscation method. The connector offers three: `fix`, `sha1` and `sha512`. With `fix`, string values are replaced by `*****`. With `sha512` a salt is required; see `connect.ems.obfuscation.sha512.salt`. | STRING | NO | fix |
| `connect.ems.obfuscation.sha512.salt` | Required only when `connect.ems.obfuscation.method` is set to `sha512` and obfuscation fields have been set. If no obfuscation fields are provided, this configuration is ignored. | STRING | NO | null |
| `connect.ems.debug.keep.parquet.files` | For debugging purposes. Set to `true` for the connector to keep the files after they have been uploaded. | BOOL | NO | false |
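To make the three obfuscation methods in the table concrete, here is a minimal Java sketch of what each one plausibly does to a string value. The hex encoding, and the choice to prepend the salt to the value before hashing for `sha512`, are assumptions made for illustration, not taken from the connector's source.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class ObfuscationMethodsSketch {

    // "fix": every targeted string value is replaced by the same constant.
    static String fix(String value) {
        return "*****";
    }

    // "sha1": the value is hashed with SHA-1; no salt is involved.
    static String sha1(String value) throws NoSuchAlgorithmException {
        return hexDigest("SHA-1", value);
    }

    // "sha512": a salt is combined with the value before hashing.
    // NOTE: prepending the salt is an assumption for this sketch.
    static String sha512(String salt, String value) throws NoSuchAlgorithmException {
        return hexDigest("SHA-512", salt + value);
    }

    private static String hexDigest(String algorithm, String input) throws NoSuchAlgorithmException {
        MessageDigest digest = MessageDigest.getInstance(algorithm);
        byte[] hash = digest.digest(input.getBytes(StandardCharsets.UTF_8));
        StringBuilder hex = new StringBuilder(hash.length * 2);
        for (byte b : hash) {
            hex.append(String.format("%02x", b));
        }
        return hex.toString();
    }

    public static void main(String[] args) throws NoSuchAlgorithmException {
        System.out.println(fix("4111-1111-1111-1111"));                       // *****
        System.out.println(sha1("4111-1111-1111-1111"));                      // 40 hex chars
        System.out.println(sha512("very secure not", "4111-1111-1111-1111")); // 128 hex chars
    }
}
```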

## Obfuscation

### Rules

To enable obfuscation, `connect.ems.obfuscation.fields` has to be set. When it is set, `connect.ems.obfuscation.method` is required as well.

* `fix` obfuscation converts the values of the targeted fields to `*****`.
* Only text (string) columns can be obfuscated.
* The obfuscation process does not validate the field paths; a path that does not exist is silently ignored.
* Nested structures are supported.
* Arrays are supported (see the traversal sketch after the array example below).

When the data structure looks like:

```json
{
   "a": {
    "b": "secret",
    ...
   },
   "x": {
     "y": {
        "z": "another secret",
        ...
     }
   }
}
```

set the value to: `a.b, x.y.z`. This produces the following output:

```json
{
   "a": {
    "b": "*****",
    ...
   },
   "x": {
     "y": {
        "z": "*****",
        ...
     }
   }
}
```

When the data structure contains arrays and the obfuscation logic needs to cover the array items, the path addressing the array must contain the keyword `value`. For example, given this data structure:

```json
{
   "a": [
    "secret1",
    "secret2",
    ...
   ],
   "x": [
     {
       "y": {
          "z": "another secret",
          ...
       }
     }
   ]
}
```

then set the obfuscation fields to: `a.value, x.value.y.z`. This produces the following output:

```json
{
   "a": [
    "*****",
    "*****",
    ...
   ],
   "x": [
     {
       "y": {
          "z": "*****",
          ...
       }
     }
   ]
}
```
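Because this path syntax drives the whole feature, here is a minimal Java sketch of how a dot-separated path such as `a.value` or `x.value.y.z` can be resolved against a record represented as nested `Map`s and `List`s, applying the `fix` method to matched string leaves. The names and the `Map`/`List` representation are illustrative assumptions, not the connector's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class PathObfuscationSketch {

    // Walks a path like "x.value.y.z" through nested Maps and Lists,
    // replacing matched string leaves with "*****" (the "fix" method).
    // The "value" segment steps into every element of a List.
    @SuppressWarnings("unchecked")
    static void obfuscate(Object node, String[] path, int index) {
        if (node == null || index >= path.length) {
            return;
        }
        String segment = path[index];
        if (node instanceof List && segment.equals("value")) {
            List<Object> list = (List<Object>) node;
            for (int i = 0; i < list.size(); i++) {
                if (index == path.length - 1 && list.get(i) instanceof String) {
                    list.set(i, "*****"); // leaf reached inside the array
                } else {
                    obfuscate(list.get(i), path, index + 1);
                }
            }
        } else if (node instanceof Map) {
            Map<String, Object> map = (Map<String, Object>) node;
            if (index == path.length - 1 && map.get(segment) instanceof String) {
                map.put(segment, "*****"); // leaf reached: replace the text value
            } else {
                obfuscate(map.get(segment), path, index + 1);
            }
        }
        // A path that matches nothing falls through silently, mirroring the rule
        // that the process does not validate whether the path exists.
    }

    public static void main(String[] args) {
        Map<String, Object> record = new java.util.HashMap<>();
        record.put("a", new ArrayList<>(List.of("secret1", "secret2")));
        obfuscate(record, "a.value".split("\\."), 0);
        System.out.println(record); // {a=[*****, *****]}
    }
}
```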

### Examples

* Constant obfuscation

```properties
name=kafka2ems
connector.class=com.celonis.kafka.connect.ems.sink.EmsSinkConnector
tasks.max=1
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
topics=payments
connect.ems.endpoint=https://***.***.celonis.cloud/continuous-batch-processing/api/v1/***/items
connect.ems.target.table=payments
connect.ems.connection.id=****
connect.ems.commit.size.bytes=10000000
connect.ems.commit.records=100000
connect.ems.commit.interval.ms=30000
connect.ems.tmp.dir=/tmp/ems
connect.ems.authorization.key=AppKey ***
connect.ems.error.policy=RETRY
connect.ems.max.retries=20
connect.ems.retry.interval=60000
connect.ems.parquet.write.flush.records=1000
connect.ems.debug.keep.parquet.files=false
connect.ems.obfuscation.method=fix
connect.ems.obfuscation.fields=credit_card,ssn
```
* SHA1 obfuscation

```properties
name=kafka2ems
connector.class=com.celonis.kafka.connect.ems.sink.EmsSinkConnector
tasks.max=1
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
topics=payments
connect.ems.endpoint=https://***.***.celonis.cloud/continuous-batch-processing/api/v1/***/items
connect.ems.target.table=payments
connect.ems.connection.id=****
connect.ems.commit.size.bytes=10000000
connect.ems.commit.records=100000
connect.ems.commit.interval.ms=30000
connect.ems.tmp.dir=/tmp/ems
connect.ems.authorization.key=AppKey ***
connect.ems.error.policy=RETRY
connect.ems.max.retries=20
connect.ems.retry.interval=60000
connect.ems.parquet.write.flush.records=1000
connect.ems.debug.keep.parquet.files=false
connect.ems.obfuscation.method=sha1
connect.ems.obfuscation.fields=credit_card,ssn
```
* SHA512 obfuscation

```properties
name=kafka2ems
connector.class=com.celonis.kafka.connect.ems.sink.EmsSinkConnector
tasks.max=1
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
topics=payments
connect.ems.endpoint=https://***.***.celonis.cloud/continuous-batch-processing/api/v1/***/items
connect.ems.target.table=payments
connect.ems.connection.id=****
connect.ems.commit.size.bytes=10000000
connect.ems.commit.records=100000
connect.ems.commit.interval.ms=30000
connect.ems.tmp.dir=/tmp/ems
connect.ems.authorization.key=AppKey ***
connect.ems.error.policy=RETRY
connect.ems.max.retries=20
connect.ems.retry.interval=60000
connect.ems.parquet.write.flush.records=1000
connect.ems.debug.keep.parquet.files=false
connect.ems.obfuscation.method=sha512
connect.ems.obfuscation.sha512.salt=very secure not
connect.ems.obfuscation.fields=credit_card,ssn
```