The default Elastic sink connector and its open source alternatives read data from a Kafka topic and index or delete it according to the startup configuration.
This custom connector was created to read that instruction from the data itself.
That is,
- If the data has its "status" field set to "insert", the connector sends an index request.
- If the data has its "status" field set to "delete", the connector sends a delete request.
The "status" flag name is configurable.
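The dispatch described above can be sketched roughly as follows. This is a minimal illustration, not the connector's actual code: the `StatusRouter` class, the `Action` enum and the `route` method are hypothetical names, the record is assumed to be already parsed into a `Map`, and rejecting an unknown flag value is an assumption about behaviour the README does not specify.

```java
import java.util.Map;

public class StatusRouter {
    // Hypothetical action names; the real connector may model this differently.
    enum Action { INDEX, DELETE }

    // Assumed flag values; per the README the real ones are defined in Constants.java.
    static final String INSERT = "insert";
    static final String DELETE = "delete";

    // Chooses the Elastic request type from the flag field carried by the record itself.
    static Action route(Map<String, Object> record, String flagField) {
        Object flag = record.get(flagField);
        if (INSERT.equals(flag)) {
            return Action.INDEX;
        }
        if (DELETE.equals(flag)) {
            return Action.DELETE;
        }
        // Assumption: anything else is treated as an error in this sketch.
        throw new IllegalArgumentException(
                "Unsupported or missing '" + flagField + "' value: " + flag);
    }

    public static void main(String[] args) {
        System.out.println(route(Map.of("status", "insert"), "status")); // prints INDEX
        System.out.println(route(Map.of("status", "delete"), "status")); // prints DELETE
    }
}
```

Passing the flag field name as a parameter mirrors the fact that it is configurable rather than fixed to "status".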
Your record must be a JSON string that includes these fields:
- flag: Determines the behaviour; its value should be `insert` or `delete`. The field name `flag` is configurable via the `flag.field` property (e.g. `status` in the example configuration below). You can also change the `insert` and `delete` values in the `Constants.java` file.
- payload: The JSON data to send to Elastic. The field name `dataList` is configurable via the `data.array` property (e.g. `dataList` in the example configuration below). The payload can be an array or an object.
- Your data in `dataList` must include an `id` field. You can change the id field name in `Constants.java`.

Example configuration:
elastic.url=elasticsearch
name=ElasticSinkConnector
topics=first_topic
tasks.max=1
type.name=targettype
connector.class=com.skynyrd.kafka.ElasticSinkConnector
elastic.port=9200
index.name=targetindex
flag.field=status
data.array=dataList
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.storage.StringConverter
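With `flag.field=status` and `data.array=dataList` as configured above, a record could look like the JSON in the comment below. The following is only a sketch of how such a payload might be normalized and checked, under stated assumptions: `PayloadNormalizer` and `documents` are hypothetical names, the `name` field is made up for illustration, the record is assumed to be already parsed into Java collections, and the connector's real validation may differ.

```java
import java.util.List;
import java.util.Map;

public class PayloadNormalizer {
    // Illustrative record shape (the "name" field is invented for this example):
    // {"status": "insert", "dataList": [{"id": 1, "name": "first"}, {"id": 2, "name": "second"}]}

    // Returns the payload as a list of documents, wrapping a single object,
    // and rejects any document that lacks the required id field.
    @SuppressWarnings("unchecked")
    static List<Map<String, Object>> documents(Map<String, Object> record, String dataField) {
        Object payload = record.get(dataField);
        List<Map<String, Object>> docs;
        if (payload instanceof List) {
            docs = (List<Map<String, Object>>) payload;   // payload is an array
        } else if (payload instanceof Map) {
            docs = List.of((Map<String, Object>) payload); // payload is a single object
        } else {
            throw new IllegalArgumentException("Missing or invalid '" + dataField + "' field");
        }
        for (Map<String, Object> doc : docs) {
            if (!doc.containsKey("id")) {
                throw new IllegalArgumentException("Document without id: " + doc);
            }
        }
        return docs;
    }

    public static void main(String[] args) {
        Map<String, Object> first = Map.of("id", 1, "name", "first");
        Map<String, Object> second = Map.of("id", 2, "name", "second");
        Map<String, Object> arrayRecord = Map.of("status", "insert", "dataList", List.of(first, second));
        Map<String, Object> objectRecord = Map.of("status", "delete", "dataList", first);
        System.out.println(documents(arrayRecord, "dataList").size());  // prints 2
        System.out.println(documents(objectRecord, "dataList").size()); // prints 1
    }
}
```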
You need Docker and Docker Compose to run and test it locally.
- Clone this repo.
- Run `mvn clean` and `mvn install`.
- Change the absolute path in `volumes` under `kafka-cluster` in `docker-compose.yml`.
- Run with `docker-compose up`.
- Wait a bit and open the connect console: http://localhost:3030/kafka-connect-ui/#/cluster/fast-data-dev/select-connector
- Select `ElasticSinkConnector` at the bottom right of the page. (It appears if the volume is correctly set in the compose file.)
- For logs: http://localhost:3030/logs/connect-distributed.log
Curious about how I implemented this repository? Here is my Medium post.
- Missing tests.