From 75bc41db3daff60937bfb9a61cc764a8a538770e Mon Sep 17 00:00:00 2001 From: Surabhi Date: Mon, 6 Dec 2021 15:19:42 +0530 Subject: [PATCH] kafka without header type --- .../com/uci/inbound/AppConfigInbound.java | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/src/main/java/com/uci/inbound/AppConfigInbound.java b/src/main/java/com/uci/inbound/AppConfigInbound.java index 4706c8e..0756d33 100644 --- a/src/main/java/com/uci/inbound/AppConfigInbound.java +++ b/src/main/java/com/uci/inbound/AppConfigInbound.java @@ -1,14 +1,50 @@ package com.uci.inbound; +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.clients.producer.ProducerConfig; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; import com.uci.dao.service.HealthService; @Configuration public class AppConfigInbound { + @Value("${spring.kafka.bootstrap-servers}") + private String BOOTSTRAP_SERVERS; + @Bean public HealthService healthService() { return new HealthService(); } + + @Bean + Map kafkaProducerConfiguration() { + Map configuration = new HashMap<>(); + configuration.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); + configuration.put(org.springframework.kafka.support.serializer.JsonSerializer.ADD_TYPE_INFO_HEADERS, false); + configuration.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producer"); + configuration.put(ProducerConfig.ACKS_CONFIG, "all"); + configuration.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.springframework.kafka.support.serializer.JsonSerializer.class); + configuration.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.springframework.kafka.support.serializer.JsonSerializer.class); + + return configuration; + } + + @Bean + ProducerFactory producerFactory(){ + ProducerFactory producerFactory = new DefaultKafkaProducerFactory<>(kafkaProducerConfiguration()); + return producerFactory; + } + + @Bean + KafkaTemplate kafkaTemplate() { + KafkaTemplate kafkaTemplate = new KafkaTemplate<>(producerFactory()); + return (KafkaTemplate) kafkaTemplate; + } }