Home
The Celonis EMS Kafka connector is an Apache Kafka Connect compatible source plugin (or connector). It allows users to push data from their Apache Kafka topics to EMS.
Check How it works to understand how the data is landed in EMS.
Apache Kafka is payload agnostic: the message content is an array of bytes. Those bytes can encode:
- AVRO
- PROTOBUF
- XML
- JSON
- JSON with Schema
Below is a table of the supported formats and their schema handling. For AVRO and PROTOBUF, a schema registry solution is expected to be in place.
| Data Format | Supported | Schema |
|---|---|---|
| AVRO | Yes | Yes |
| PROTOBUF | Yes | Yes |
| JSON | Yes | Inferred |
| JSON with schema | Yes | Yes |
| XML | Yes | Inferred |
Every record is associated with a schema (explicit or inferred by the connector). This record schema may differ from the schema of the target table in EMS. Only two kinds of schema evolution are allowed:
- Omitting a column that is not a primary key (since in EMS every column is nullable, unless it is part of a primary key)
- Adding a new column
Any other kind of mismatch will cause the batch containing that record to fail insertion into EMS.
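The rule above can be sketched as a small compatibility check. This is a minimal illustration, not the connector's actual code; the column names and primary-key set are hypothetical.

```python
# Sketch: is an incoming record schema an allowed evolution of the EMS table?
# Allowed: omitting a non-primary-key column, or adding a new column.
# Forbidden: omitting a primary-key column. (Type mismatches would also
# fail, but types are left out of this sketch for brevity.)

def is_allowed_evolution(table_columns, primary_keys, record_columns):
    missing = set(table_columns) - set(record_columns)
    # Only non-primary-key columns may be omitted (they are nullable in EMS).
    return not (missing & set(primary_keys))

table = ["id", "firstName", "lastName", "address"]
pks = ["id"]

print(is_allowed_evolution(table, pks, ["id", "firstName", "lastName"]))         # omits nullable "address" -> True
print(is_allowed_evolution(table, pks, ["id", "firstName", "lastName", "age"]))  # adds "age" -> True
print(is_allowed_evolution(table, pks, ["firstName", "lastName"]))               # omits primary key "id" -> False
```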
For (schemaless) JSON and XML data formats, schemas are not available, so the connector infers one for every record coming from the Kafka source. This is not ideal for enforcing data quality and schema validation, and may lead to unexpected behaviour.
Each Kafka message is self-contained. This means the information it carries is not dependent on previous messages. As a result, the connector can infer the schema only at the message level.
However, a JSON document may contain a field such as "address": null, or the nullable field may be omitted from the payload entirely (for performance reasons, for example). Therefore, there is no way to correctly infer the type, and tracking types across messages is not a bulletproof solution.
Example:

```json
{
  "firstName": "Alexandra",
  "lastName": "Jones",
  "address": null
}
```

```json
{
  "firstName": "Alexandra",
  "lastName": "Jones",
  "address": null,
  "age": 32
}
```
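Because each message is self-contained, inference must run per record. The sketch below (illustrative code, not the connector's implementation) shows why the two messages above yield different, ambiguous schemas: "address" carries no usable type, and "age" appears only in the second record.

```python
import json

def infer_types(message: str):
    # Naive per-message inference: map each field to the Python type of its value.
    doc = json.loads(message)
    return {k: type(v).__name__ for k, v in doc.items()}

m1 = '{"firstName": "Alexandra", "lastName": "Jones", "address": null}'
m2 = '{"firstName": "Alexandra", "lastName": "Jones", "address": null, "age": 32}'

print(infer_types(m1))  # "address" -> 'NoneType': its real type cannot be recovered
print(infer_types(m2))  # "age" appears only here, so the two records imply different schemas
```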
In some use cases, the JSON payload can contain the schema itself, and, as stated in the table above, support for this is better. Here is an example of a JSON-with-schema payload that the Kafka Connect JsonConverter can interpret:
```json
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int64",
        "optional": false,
        "field": "registertime"
      },
      {
        "type": "string",
        "optional": false,
        "field": "userid"
      },
      {
        "type": "string",
        "optional": false,
        "field": "regionid"
      },
      {
        "type": "string",
        "optional": false,
        "field": "gender"
      }
    ],
    "optional": false,
    "name": "ksql.users"
  },
  "payload": {
    "registertime": 1493819497170,
    "userid": "User_1",
    "regionid": "Region_5",
    "gender": "MALE"
  }
}
```
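To have Kafka Connect interpret such payloads, the JsonConverter must be enabled with embedded schemas turned on. A minimal configuration fragment (worker or connector level; shown here as an assumption about a typical setup, not this connector's required settings):

```properties
# Use the built-in JSON converter and expect the schema to be embedded
# in each message, as in the example above.
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
```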
Delete instructions in Apache Kafka are a data protocol. A Kafka message contains a Key and a Value component (there are more, such as headers and metadata, but they play no role in this context). Whenever there is a message with Key: K1 and Value: null, it is interpreted as a delete for K1.
At the moment, the connector skips "deleted" records.
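This tombstone convention can be mimicked with a simple filter. The record shape below is illustrative only; it is a sketch of the skip behaviour, not the connector's code.

```python
# A record whose value is null (None) is a delete marker (tombstone) for
# its key. The connector currently skips such records; this filter does
# the same.

def is_tombstone(record):
    return record["value"] is None

records = [
    {"key": "K1", "value": {"firstName": "Alexandra"}},
    {"key": "K1", "value": None},  # delete instruction for K1 -> skipped
]

kept = [r for r in records if not is_tombstone(r)]
print(len(kept))  # 1
```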
Inside EMS, data lands in a table within a data pool. The target table might be created upfront or as a result of the first data upload. Check "How it works" to understand the connector design. However, the connector is not in a position to validate that the record schema matches the EMS table schema; the user is expected to align the data.
Copyright © Celonis 2022