Examples

Working with different data formats

In this section we will see how to configure the connector to work with different data formats in the input topic.

In all the following examples the connector is configured with the following upload rules:

  • 10MB file or
  • 100000 records or
  • 30 seconds since the last write, to cover the case where no further records are available for the time being

The connector also flushes the Parquet file every 1000 records; the file size check is performed at each flush.
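
Each of these rules corresponds to one of the commit settings used in all the examples below. A minimal sketch of just the relevant properties (the values match the rules above):

# 10MB file size threshold
connect.ems.commit.size.bytes=10000000
# 100000 records threshold
connect.ems.commit.records=100000
# 30 seconds since the last write
connect.ems.commit.interval.ms=30000
# Flush the Parquet file every 1000 records; the size check runs at each flush
connect.ems.parquet.write.flush.records=1000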

AVRO input

Here is an example for uploading data from a Kafka topic whose message value is stored as Avro. The key converter is set to StringConverter but the key information is not used.

name=kafka2ems
connector.class=com.celonis.kafka.connect.ems.sink.EmsSinkConnector
tasks.max=1
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
topics=payments 
connect.ems.endpoint=https://***.***.celonis.cloud/continuous-batch-processing/api/v1/***/items
connect.ems.target.table=payments
connect.ems.connection.id=****
connect.ems.commit.size.bytes=10000000
connect.ems.commit.records=100000
connect.ems.commit.interval.ms=30000
connect.ems.tmp.dir=/tmp/ems
connect.ems.authorization.key="AppKey ***"
connect.ems.error.policy=RETRY
connect.ems.max.retries=20
connect.ems.retry.interval=60000
connect.ems.parquet.write.flush.records=1000
connect.ems.debug.keep.parquet.files=false
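
For context, a hypothetical Avro value schema for the payments topic could look like the following (the record and field names are purely illustrative and not part of the connector):

{
  "type": "record",
  "name": "Payment",
  "fields": [
    { "name": "payment_id", "type": "string" },
    { "name": "customer_id", "type": "string" },
    { "name": "amount", "type": "double" },
    { "name": "processed_ts", "type": "long" }
  ]
}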

JSON input

Here is an example for uploading data from a Kafka topic whose message value is stored as JSON. The key converter is set to StringConverter but the key information is not used.

name=kafka2ems
connector.class=com.celonis.kafka.connect.ems.sink.EmsSinkConnector
tasks.max=1
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
topics=payments 
connect.ems.endpoint=https://***.***.celonis.cloud/continuous-batch-processing/api/v1/***/items
connect.ems.target.table=payments
connect.ems.connection.id=****
connect.ems.commit.size.bytes=10000000
connect.ems.commit.records=100000
connect.ems.commit.interval.ms=30000
connect.ems.tmp.dir=/tmp/ems
connect.ems.authorization.key="AppKey ***"
connect.ems.error.policy=RETRY
connect.ems.max.retries=20
connect.ems.retry.interval=60000
connect.ems.parquet.write.flush.records=1000
connect.ems.debug.keep.parquet.files=false
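
With this configuration, a hypothetical record on the payments topic could look like the following (field names are illustrative only):

{
  "payment_id": "p-1001",
  "customer_id": "c-42",
  "amount": 99.95,
  "processed_ts": 1683619200000
}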

XML input

Here is an example for uploading data from a Kafka topic whose message value is stored as XML. The key converter is set to StringConverter but the key information is not used. The value converter uses the EMS XmlConverter, provided within the connector JAR.

name=kafka2ems
connector.class=com.celonis.kafka.connect.ems.sink.EmsSinkConnector
tasks.max=1
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=com.celonis.kafka.connect.ems.converter.XmlConverter
topics=payments 
connect.ems.endpoint=https://***.***.celonis.cloud/continuous-batch-processing/api/v1/***/items
connect.ems.target.table=payments
connect.ems.connection.id=****
connect.ems.commit.size.bytes=10000000
connect.ems.commit.records=100000
connect.ems.commit.interval.ms=30000
connect.ems.tmp.dir=/tmp/ems
connect.ems.authorization.key="AppKey ***"
connect.ems.error.policy=RETRY
connect.ems.max.retries=20
connect.ems.retry.interval=60000
connect.ems.parquet.write.flush.records=1000
connect.ems.debug.keep.parquet.files=false

Notes

  • The conversion works by converting the input XML string into JSON and then applying the same flattening strategy as the JSON use case.
  • The XML to JSON conversion is handled by the jackson-dataformat-xml library; check the library's home page to be aware of its peculiarities.
  • In particular, repeated XML elements are converted to arrays, while single ones are converted to objects. This may lead to incompatible schema changes from one record to another. For example, the record
        <root>
            <item><nested>value1</nested></item>
        </root>
    will be converted to
        { "item": { "nested": "value1" } }
    while the following
        <root>
            <item><nested>value1</nested></item>
            <item><nested>value2</nested></item>
        </root>
    will be converted to
        { "item": [{ "nested": "value1" }, { "nested": "value2" }] }
    As you can see, this may result in schema incompatibility problems when pushing into EMS.

Handling nested records

The connector provides built-in support for flattening nested records. This can be enabled by setting the following flag:

# ...
connect.ems.flattener.enable=true
# ...

When encountering arrays, the flattener will attempt to serialise them as a JSON string field. Alternatively, arrays and maps can be discarded completely by setting connect.ems.flattener.collections.discard=true.
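
As an illustration (the field names and the underscore-joined path separator are assumptions about the flattening behaviour, not verified output), a nested record such as

{ "customer": { "id": 42, "tags": ["gold", "eu"] } }

might be flattened to

{ "customer_id": 42, "customer_tags": "[\"gold\",\"eu\"]" }

with the tags array serialised as a JSON string field.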

Notes

Currently, the connector fails with an exception whenever it encounters a list containing values of heterogeneous types (e.g. [1, 2, "three", null, true]).

Adding Kafka metadata as EMS columns

You can add Kafka metadata as columns to EMS by using the InsertField SMT. This is useful when you want to produce a timeline or an activity log from a dataset that doesn't have a time column itself. You can use the Kafka timestamp as a time column, provided that the timestamps are set by the Kafka producer in a way that makes sense for your activity log.

Below we're inserting 3 columns

  1. the Kafka partition
  2. the Kafka offset
  3. the Kafka timestamp.
# ...
# Register the three SMTs in a single transforms chain
transforms=insertKafkaPartition,insertKafkaOffset,insertKafkaTimestamp

# Output the Kafka partition in column kafka_partition
transforms.insertKafkaPartition.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.insertKafkaPartition.partition.field=kafka_partition

# Output the Kafka offset in column kafka_offset
transforms.insertKafkaOffset.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.insertKafkaOffset.offset.field=kafka_offset

# Output the Kafka timestamp in column kafka_ts
transforms.insertKafkaTimestamp.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.insertKafkaTimestamp.timestamp.field=kafka_ts
# ...
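
With these transforms applied, a record that originally contained only business fields would reach EMS with the three extra columns appended, for example (all values are illustrative):

{
  "payment_id": "p-1001",
  "customer_id": "c-42",
  "kafka_partition": 3,
  "kafka_offset": 15872,
  "kafka_ts": 1683619200123
}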

Primary Key(s)

Specifies a set of fields from the incoming payload which should be used as Primary Keys in Celonis. If this is not provided, all the fields are used.

# ...
# Single PK
connect.ems.data.primary.key=customer_id
# ...

# Composite PK
# ...
connect.ems.data.primary.key=name,address
# ...

Please refer to the primary keys documentation to learn about best practices.

Overwrite the Order field when using Primary Key(s)

If your data already contains an order field, use it: it improves performance and requires less disk space in EMS. Here is an example configuration where a timestamp field (here processed_ts) guarantees that two records with the same PK won't share the same value:

# ...

# Single PK
connect.ems.data.primary.key=customer_id
connect.ems.order.field.name=processed_ts
# ...

# Composite PK
connect.ems.data.primary.key=name,address
connect.ems.order.field.name=processed_ts
# ...
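
To illustrate the intended effect (a sketch of the expected deduplication behaviour, assuming the record with the greatest order value wins), consider two records sharing the same primary key:

{ "customer_id": "c-42", "status": "pending", "processed_ts": 1683619200000 }
{ "customer_id": "c-42", "status": "settled", "processed_ts": 1683619260000 }

Only the record with the higher processed_ts (status "settled") would be expected to remain after the upsert in EMS.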

Use the Kafka timestamp as the Order field when using Primary Key(s)

This is useful when you want to upsert to EMS by primary key but there's no order column available. In this case, you could use the Kafka timestamp as the order field provided that:

  1. All events of a given primary key are located in the same Kafka partition.
  2. You are happy with how the Kafka timestamp is set by the producer. The Kafka timestamp could either indicate the event-time (when the event occurred) or the process-time (when the message was received by Kafka itself).

Use the InsertField SMT to add the Kafka timestamp as a new field in the connector's output. Then, pass it to the order-field configuration of the connector:

# ...
# Configure the SMT to use the Kafka timestamp
transforms=insertKafkaTimestamp
transforms.insertKafkaTimestamp.type=org.apache.kafka.connect.transforms.InsertField$Value

# Set the name of the output column where the Kafka timestamp will be placed
transforms.insertKafkaTimestamp.timestamp.field=kafka_ts
# ...

connect.ems.data.primary.key=customer_id
connect.ems.order.field.name=kafka_ts
# ...

Obfuscation

All the obfuscated fields are uploaded to EMS as *****. In this example, the credit_card and ssn fields are obfuscated.

# ...
name=kafka2ems
connector.class=com.celonis.kafka.connect.ems.sink.EmsSinkConnector
connect.ems.obfuscation.method="fix"
connect.ems.obfuscation.fields="credit_card, ssn"
# ...
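
For illustration (field names and values are hypothetical), an input record such as

{ "customer_id": "c-42", "credit_card": "4111111111111111", "ssn": "078-05-1120" }

would be uploaded to EMS as

{ "customer_id": "c-42", "credit_card": "*****", "ssn": "*****" }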

SHA1 obfuscation

All the obfuscated fields are hashed with SHA1 and the result is converted to a hex string. For example, the text "this is a test" ends up translated to "9938a75e6d10a74d6b2e9bc204177de5b95f28fe". In this example, the credit_card and ssn fields are obfuscated.

# ...
connect.ems.obfuscation.method="sha1"
connect.ems.obfuscation.fields="credit_card, ssn"
# ...

SHA512 obfuscation

All the obfuscated fields are hashed with salted SHA512 and the result is converted to a hex string. In this example, the credit_card and ssn fields are obfuscated.

# ...
connect.ems.obfuscation.method="sha512"
connect.ems.obfuscation.sha512.salt="very secure not"
connect.ems.obfuscation.fields="credit_card, ssn"
# ...

Exploding Lists

The connector has limited support for 'exploding' a message, that is, converting a message consisting of a list of records into individual records that are presented to Connect one by one.

Here is an example, presented as JSON.

Input Data

{
  "employees": [
    {
      "employee_number": 1,
      "name": "Arturo",
      "birth_year": 1940
    },
    {
      "employee_number": 2,
      "name": "Mallory",
      "birth_year": 1973
    },
    {
      "employee_number": 3,
      "name": "Wells",
      "birth_year": 1972
    },
    {
      "employee_number": 4,
      "name": "Brown",
      "birth_year": 1955
    }
  ]
}

Upon adding the connector configuration:

# ...
connect.ems.explode.mode=LIST
# ...

Output Data

The sink will discard the List wrapper and pass each record to EMS independently.

First Message:

{
  "employee_number": 1,
  "name": "Arturo",
  "birth_year": 1940
}

Second Message:

{
  "employee_number": 2,
  "name": "Mallory",
  "birth_year": 1973
}

(... and so on.)