Skip to content

Commit

Permalink
Added kafka configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
RapDog64 committed Jul 11, 2023
1 parent 97ad3a7 commit e234143
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 0 deletions.
31 changes: 31 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,35 @@ services:
- rangiffler-network
dns_search: .

zookeeper:
container_name: zookeeper
image: confluentinc/cp-zookeeper:7.3.2
expose:
- "2181"
environment:
- ZOOKEEPER_CLIENT_PORT=2181
- ZOOKEEPER_TICK_TIME=2000
networks:
- rangiffler-network

kafka:
container_name: kafka
image: confluentinc/cp-kafka:7.3.2
expose:
- "9092"
depends_on:
zookeeper:
condition: service_started
environment:
- KAFKA_BROKER_ID=1
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
- KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1
- KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1
networks:
- rangiffler-network

rangiffler-auth:
container_name: rangiffler-auth
image: rapdog64/rangiffler-auth:1.0.0
Expand All @@ -35,6 +64,8 @@ services:
depends_on:
rangiffler-all-db:
condition: service_healthy
kafka:
condition: service_started
networks:
- rangiffler-network
dns_search: .
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package com.rangiffler.config;

import com.rangiffler.model.UserJson;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;

@Configuration
public class KafkaAuthProducerConfiguration {
private final KafkaProperties kafkaProperties;

@Autowired
public KafkaAuthProducerConfiguration(KafkaProperties kafkaProperties) {
this.kafkaProperties = kafkaProperties;
}

@Bean
public Map<String, Object> producerConfiguration() {
Map<String, Object> properties = new HashMap<>(kafkaProperties.buildProducerProperties());
properties.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return properties;
}

@Bean
public ProducerFactory<String, UserJson> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfiguration());
}

@Bean
public KafkaTemplate<String, UserJson> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}

@Bean
@Primary
public NewTopic topic() {
return TopicBuilder.name("users")
.partitions(10)
.replicas(1)
.build();
}
}
4 changes: 4 additions & 0 deletions rangiffler-auth/src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ spring:
config:
activate:
on-profile: 'local'
kafka:
bootstrap-servers: 'localhost:9092'
datasource:
url: 'jdbc:postgresql://localhost:5432/rangiffler-auth'
jpa:
Expand All @@ -62,6 +64,8 @@ spring:
config:
activate:
on-profile: 'docker'
kafka:
bootstrap-servers: 'kafka:9092'
datasource:
url: 'jdbc:postgresql://rangiffler-all-db:5432/rangiffler-auth'
jpa:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.rangiffler.config;

import com.rangiffler.model.UserJson;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@Configuration
public class KafkaUserdataConsumerConfiguration {
private final KafkaProperties kafkaProperties;

@Autowired
public KafkaUserdataConsumerConfiguration(KafkaProperties kafkaProperties) {
this.kafkaProperties = kafkaProperties;
}

@Bean
public ConsumerFactory<String, UserJson> consumerFactory() {
final JsonDeserializer<UserJson> jsonDeserializer = new JsonDeserializer<>();
jsonDeserializer.addTrustedPackages("*");
return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), jsonDeserializer);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, UserJson> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, UserJson> concurrentKafkaListenerContainerFactory
= new ConcurrentKafkaListenerContainerFactory<>();
concurrentKafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
return concurrentKafkaListenerContainerFactory;
}
}

0 comments on commit e234143

Please sign in to comment.