Skip to content

Latest commit

 

History

History
315 lines (276 loc) · 9.34 KB

File metadata and controls

315 lines (276 loc) · 9.34 KB

Twelve Days of Single Message Transforms - Day 2 - ValueToKey and ExtractField

🎥 Recording

maxresdefault

Setup

  1. Clone the repository

    git clone https://github.com/confluentinc/demo-scene.git
    cd demo-scene/kafka-connect-single-message-transforms
  2. Bring the stack up

    docker-compose up -d
  3. Wait for Kafka Connect to start up

    # Block until the Kafka Connect REST API is reachable: request
    # http://localhost:8083/connectors every 5 seconds and loop until the HTTP
    # status code is 200, printing the code seen on each attempt.
    # Note: the loop has no timeout, so it keeps polling until Connect answers
    # with 200 or the command is interrupted.
    bash -c ' \
    echo -e "\n\n=============\nWaiting for Kafka Connect to start listening on localhost ⏳\n=============\n"
    while [ $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) -ne 200 ] ; do
      echo -e "\t" $(date) " Kafka Connect listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) " (waiting for 200)"
      sleep 5
    done
    echo -e $(date) "\n\n--------------\n\o/ Kafka Connect is ready! Listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) "\n--------------\n"
    '

Populate the source database

Launch MySQL CLI:

docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'

Create & populate a table:

-- Demo source table; the JDBC source connectors in this walkthrough read it
-- via "table.whitelist": "customers" and use id as the incrementing column.
-- No column is declared NOT NULL and no key is defined; the connector configs
-- set "validate.non.null": false alongside this nullable id column.
CREATE TABLE customers (
    id          INT,
    full_name   VARCHAR(50),
    birthdate   DATE,
    fav_animal  VARCHAR(50),
    fav_colour  VARCHAR(50),
    fav_movie   VARCHAR(50)
);

-- Seed data: ten customers with ids 1 through 10, loaded in a single
-- multi-row INSERT (same rows and values as ten separate statements).
INSERT INTO customers
    (id, full_name, birthdate, fav_animal, fav_colour, fav_movie)
VALUES
    (1, 'Leone Puxley', '1995-02-06', 'Violet-eared waxbill', 'Puce', 'Oh! What a Lovely War'),
    (2, 'Angelo Sharkey', '1996-04-08', 'Macaw, green-winged', 'Red', 'View from the Top, A'),
    (3, 'Jozef Bailey', '1954-07-10', 'Little brown bat', 'Indigo', '99 francs'),
    (4, 'Evelyn Deakes', '1975-09-13', 'Vervet monkey', 'Teal', 'Jane Austen in Manhattan'),
    (5, 'Dermot Perris', '1991-01-29', 'African ground squirrel (unidentified)', 'Khaki', 'Restless'),
    (6, 'Renae Bonsale', '1965-01-05', 'Brown antechinus', 'Fuscia', 'Perfect Day, A (Un giorno perfetto)'),
    (7, 'Florella Fridlington', '1950-08-07', 'Burmese brown mountain tortoise', 'Purple', 'Dot the I'),
    (8, 'Hettie Keepence', '1971-10-14', 'Crab-eating raccoon', 'Puce', 'Outer Space'),
    (9, 'Briano Quene', '1990-05-02', 'Cormorant, large', 'Yellow', 'Peacekeeper, The'),
    (10, 'Jeddy Cassell', '1978-12-24', 'Badger, european', 'Indigo', 'Shadow of a Doubt');

Create source connector

curl -X PUT http://localhost:8083/connectors/source-jdbc-mysql-00/config \
         -H "Content-Type: application/json" -d '{
          "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
          "connection.url": "jdbc:mysql://mysql:3306/demo",
          "connection.user": "mysqluser",
          "connection.password": "mysqlpw",
          "topic.prefix": "mysql-00-",
          "poll.interval.ms": 1000,
          "tasks.max":1,
          "table.whitelist" : "customers",
          "mode":"incrementing",
          "incrementing.column.name": "id",
          "validate.non.null": false
          }'

List topics

docker exec kafkacat kafkacat -b broker:29092 -L -J | jq '.topics[].topic'|sort
"__consumer_offsets"
"_connect-configs"
"_connect-offsets"
"_connect-status"
"_schemas"
"mysql-00-customers"

Preview data

docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -t mysql-00-customers -C -c1 -o beginning -u -q -J | jq  '.'
{
  "topic": "mysql-00-customers",
  "partition": 0,
  "offset": 0,
  "tstype": "create",
  "ts": 1607512308962,
  "broker": 1,
  "key": null,
  "payload": {
    "id": {
      "int": 1
    },
    "full_name": {
      "string": "Leone Puxley"
    },
    "birthdate": {
      "int": 9167
    },
    "fav_animal": {
      "string": "Violet-eared waxbill"
    },
    "fav_colour": {
      "string": "Puce"
    },
    "fav_movie": {
      "string": "Oh! What a Lovely War"
    }
  }
}

ValueToKey

ValueToKey is very useful for setting the key of a message to a field from the value. The SMT takes a field from the value part of the message and overwrites the key with it.

curl -X PUT http://localhost:8083/connectors/source-jdbc-mysql-01/config \
         -H "Content-Type: application/json" -d '{
          "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
          "connection.url": "jdbc:mysql://mysql:3306/demo",
          "connection.user": "mysqluser",
          "connection.password": "mysqlpw",
          "topic.prefix": "mysql-01-",
          "poll.interval.ms": 1000,
          "tasks.max":1,
          "table.whitelist" : "customers",
          "mode":"incrementing",
          "incrementing.column.name": "id",
          "validate.non.null": false,
          "transforms": "copyIdToKey",
          "transforms.copyIdToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
          "transforms.copyIdToKey.fields": "id"
          }'

List topics

docker exec kafkacat kafkacat -b broker:29092 -L -J | jq '.topics[].topic'|sort
"__consumer_offsets"
"_connect-configs"
"_connect-offsets"
"_connect-status"
"_schemas"
"mysql-00-customers"
"mysql-01-customers"

Preview data

docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -t mysql-01-customers -C -c1 -o beginning -u -q -J | jq  '.'
{
  "topic": "mysql-01-customers",
  "partition": 0,
  "offset": 0,
  "tstype": "create",
  "ts": 1607512553963,
  "broker": 1,
  "key": "Struct{id=1}",
  "payload": {
    "id": {
      "int": 1
    },
    "full_name": {
      "string": "Leone Puxley"
    },
    "birthdate": {
      "int": 9167
    },
    "fav_animal": {
      "string": "Violet-eared waxbill"
    },
    "fav_colour": {
      "string": "Puce"
    },
    "fav_movie": {
      "string": "Oh! What a Lovely War"
    }
  }
}

Combining ValueToKey and ExtractField

The above SMT will write a struct to the key, and often you want just the primitive value instead. That’s what ExtractField does.

curl -X PUT http://localhost:8083/connectors/source-jdbc-mysql-02/config \
         -H "Content-Type: application/json" -d '{
          "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
          "connection.url": "jdbc:mysql://mysql:3306/demo",
          "connection.user": "mysqluser",
          "connection.password": "mysqlpw",
          "topic.prefix": "mysql-02-",
          "poll.interval.ms": 1000,
          "tasks.max":1,
          "table.whitelist" : "customers",
          "mode":"incrementing",
          "incrementing.column.name": "id",
          "validate.non.null": false,
          "transforms": "copyIdToKey,extractKeyFromStruct",
          "transforms.copyIdToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
          "transforms.copyIdToKey.fields": "id",
          "transforms.extractKeyFromStruct.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
          "transforms.extractKeyFromStruct.field":"id"
          }'

List topics

docker exec kafkacat kafkacat -b broker:29092 -L -J | jq '.topics[].topic'|sort
"__consumer_offsets"
"_connect-configs"
"_connect-offsets"
"_connect-status"
"_schemas"
"mysql-00-customers"
"mysql-01-customers"
"mysql-02-customers"

Preview data

docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -t mysql-02-customers -C -c1 -o beginning -u -q -J | jq  '.'
{
  "topic": "mysql-02-customers",
  "partition": 0,
  "offset": 0,
  "tstype": "create",
  "ts": 1607512714619,
  "broker": 1,
  "key": "1",
  "payload": {
    "id": {
      "int": 1
    },
    "full_name": {
      "string": "Leone Puxley"
    },
    "birthdate": {
      "int": 9167
    },
    "fav_animal": {
      "string": "Violet-eared waxbill"
    },
    "fav_colour": {
      "string": "Puce"
    },
    "fav_movie": {
      "string": "Oh! What a Lovely War"
    }
  }
}