Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka broker #23

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,35 @@ services:
- rangiffler-network
dns_search: .

zookeeper:
container_name: zookeeper
image: confluentinc/cp-zookeeper:7.3.2
expose:
- "2181"
environment:
- ZOOKEEPER_CLIENT_PORT=2181
- ZOOKEEPER_TICK_TIME=2000
networks:
- rangiffler-network

kafka:
container_name: kafka
image: confluentinc/cp-kafka:7.3.2
expose:
- "9092"
depends_on:
zookeeper:
condition: service_started
environment:
- KAFKA_BROKER_ID=1
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
- KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1
- KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1
networks:
- rangiffler-network

rangiffler-auth:
container_name: rangiffler-auth
image: rapdog64/rangiffler-auth:1.0.0
Expand All @@ -35,6 +64,8 @@ services:
depends_on:
rangiffler-all-db:
condition: service_healthy
kafka:
condition: service_started
networks:
- rangiffler-network
dns_search: .
Expand Down
1 change: 1 addition & 0 deletions rangiffler-auth/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ dependencies {
implementation "org.springframework.boot:spring-boot-starter-validation"
implementation "org.springframework.boot:spring-boot-starter-actuator"
implementation 'org.springframework.security:spring-security-oauth2-authorization-server:1.0.0'
implementation 'org.springframework.kafka:spring-kafka:3.0.7'
runtimeOnly 'org.postgresql:postgresql'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
annotationProcessor 'org.projectlombok:lombok'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package com.rangiffler.config;

import com.rangiffler.model.UserJson;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;

@Configuration
public class KafkaAuthProducerConfiguration {
private final KafkaProperties kafkaProperties;

@Autowired
public KafkaAuthProducerConfiguration(KafkaProperties kafkaProperties) {
this.kafkaProperties = kafkaProperties;
}

@Bean
public Map<String, Object> producerConfiguration() {
Map<String, Object> properties = new HashMap<>(kafkaProperties.buildProducerProperties());
properties.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return properties;
}

@Bean
public ProducerFactory<String, UserJson> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfiguration());
}

@Bean
public KafkaTemplate<String, UserJson> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}

@Bean
@Primary
public NewTopic topic() {
return TopicBuilder.name("users")
.partitions(10)
.replicas(1)
.build();
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.rangiffler.controller;

import com.rangiffler.model.RegistrationModel;
import com.rangiffler.model.UserJson;
import com.rangiffler.service.UserService;
import jakarta.annotation.Nonnull;
import jakarta.servlet.http.HttpServletResponse;
Expand All @@ -10,6 +11,7 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.validation.BeanPropertyBindingResult;
Expand All @@ -29,14 +31,17 @@ public class RegisterController {
private static final String MODEL_REG_FORM_ATTR = "registrationModel";
private static final String MODEL_FRONT_URI_ATTR = "frontUri";
private static final String REG_MODEL_ERROR_BEAN_NAME = "org.springframework.validation.BindingResult.registrationModel";
private static final String KAFKA_TOPIC_USERS = "users";

private final UserService userService;
private final String rangifflerFrontUri;
private final KafkaTemplate<String, UserJson> kafkaTemplate;

@Autowired
public RegisterController(UserService userService, @Value("${rangiffler-client.base-uri}") String rangifflerFrontUri) {
public RegisterController(UserService userService, @Value("${rangiffler-client.base-uri}") String rangifflerFrontUri, KafkaTemplate<String, UserJson> kafkaTemplate) {
this.userService = userService;
this.rangifflerFrontUri = rangifflerFrontUri;
this.kafkaTemplate = kafkaTemplate;
}

@GetMapping("/register")
Expand All @@ -60,6 +65,11 @@ public String registerUser(@Valid @ModelAttribute RegistrationModel registration
);
response.setStatus(HttpServletResponse.SC_CREATED);
model.addAttribute(MODEL_USERNAME_ATTR, registeredUserName);

UserJson user = new UserJson();
user.setUsername(registrationModel.getUsername());
kafkaTemplate.send(KAFKA_TOPIC_USERS, user);
LOG.info("### Kafka topic [users] sent message: " + user.getUsername());
} catch (DataIntegrityViolationException e) {
LOG.error("### Error while registration user: " + e.getMessage());
response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
Expand Down
19 changes: 19 additions & 0 deletions rangiffler-auth/src/main/java/com/rangiffler/model/UserJson.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.rangiffler.model;

import com.fasterxml.jackson.annotation.JsonProperty;

public class UserJson {
@JsonProperty("username")
private String username;

public UserJson() {
}

public String getUsername() {
return username;
}

public void setUsername(String username) {
this.username = username;
}
}
4 changes: 4 additions & 0 deletions rangiffler-auth/src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ spring:
config:
activate:
on-profile: 'local'
kafka:
bootstrap-servers: 'localhost:9092'
datasource:
url: 'jdbc:postgresql://localhost:5432/rangiffler-auth'
jpa:
Expand All @@ -62,6 +64,8 @@ spring:
config:
activate:
on-profile: 'docker'
kafka:
bootstrap-servers: 'kafka:9092'
datasource:
url: 'jdbc:postgresql://rangiffler-all-db:5432/rangiffler-auth'
jpa:
Expand Down
5 changes: 5 additions & 0 deletions rangiffler-e-2-e-tests/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ buildscript {
jupiterVersion = '5.9.2'
hibernateVersion = '6.1.7.Final'
lombokVersion = '1.18.26'
kafkaVersion = '3.5.0'
}
}

Expand Down Expand Up @@ -53,6 +54,7 @@ dependencies {
"com.github.vertical-blank:sql-formatter:2.0.3",
"io.qameta.allure:allure-okhttp3:${allureVersion}",
"com.codeborne:selenide:${selenideVersion}",
"org.apache.kafka:kafka-clients:${kafkaVersion}",
"com.github.javafaker:javafaker:${javaFakerVersion}",
"com.google.code.findbugs:jsr305:3.0.2",
"io.grpc:grpc-protobuf:${grpcVersion}",
Expand Down Expand Up @@ -86,5 +88,8 @@ tasks.withType(JavaCompile) {

tasks.withType(Test) {
systemProperties += System.properties
// Kafka logs
testLogging.showStandardStreams = true
testLogging.exceptionFormat = 'full'
useJUnitPlatform()
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package com.rangiffler.service;


import com.rangiffler.model.*;
import com.rangiffler.model.CountryJson;
import com.rangiffler.model.FriendStatus;
import com.rangiffler.model.PhotoJson;
import com.rangiffler.model.PhotoServiceJson;
import com.rangiffler.service.configuration.PhotoServiceClient;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
Expand All @@ -25,15 +27,17 @@ public PhotoJson addPhoto(PhotoJson photoJson) {
}

public List<PhotoJson> getAllUserPhotos(String username) {
List<PhotoJson> usersPhoto = new ArrayList<>();
List<PhotoServiceJson> photos = photoService.getPhotosForUser(username);
if (!photos.isEmpty()) {
for (PhotoServiceJson photo : photos) {
CountryJson country = countryService.findById(photo.getCountryId());
usersPhoto.add(PhotoServiceJson.fromPhotoServiceJson(photo, country));
}
if (photos.isEmpty()) {
return new ArrayList<>();
}
return usersPhoto;

return photos.stream()
.map(photo -> {
CountryJson country = countryService.findById(photo.getCountryId());
return PhotoServiceJson.fromPhotoServiceJson(photo, country);
})
.toList();
}

public PhotoJson editPhoto(PhotoJson photoJson, UUID id) {
Expand All @@ -42,17 +46,12 @@ public PhotoJson editPhoto(PhotoJson photoJson, UUID id) {
}

public List<PhotoJson> getAllFriendsPhotos(String username) {
List<PhotoJson> friendsPhoto = new ArrayList<>();
List<UserJson> friends = userService.receivePeopleAround(username)
return userService.receivePeopleAround(username)
.stream()
.filter(userJson -> userJson.getFriendStatus() == FriendStatus.FRIEND)
.toList();

for (UserJson userJson : friends) {
friendsPhoto.addAll(getAllUserPhotos(userJson.getUsername()));
}

return friendsPhoto;
.map(userJson -> getAllUserPhotos(userJson.getUsername()))
.findFirst()
.orElseGet(ArrayList::new);
}

public void deletePhoto(UUID photoId) {
Expand Down
1 change: 1 addition & 0 deletions rangiffler-userdata/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ repositories {
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
implementation 'org.springframework.kafka:spring-kafka:3.0.7'
runtimeOnly 'org.postgresql:postgresql'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.rangiffler.config;

import com.rangiffler.model.UserJson;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@Configuration
public class KafkaUserdataConsumerConfiguration {
private final KafkaProperties kafkaProperties;

@Autowired
public KafkaUserdataConsumerConfiguration(KafkaProperties kafkaProperties) {
this.kafkaProperties = kafkaProperties;
}

@Bean
public ConsumerFactory<String, UserJson> consumerFactory() {
final JsonDeserializer<UserJson> jsonDeserializer = new JsonDeserializer<>();
jsonDeserializer.addTrustedPackages("*");
return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), jsonDeserializer);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, UserJson> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, UserJson> concurrentKafkaListenerContainerFactory
= new ConcurrentKafkaListenerContainerFactory<>();
concurrentKafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
return concurrentKafkaListenerContainerFactory;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public UserJson updateUserInfo(@RequestBody UserJson user) {
@GetMapping("/currentUser")
@ResponseStatus(HttpStatus.OK)
public UserJson currentUser(@RequestParam String username) {
return userService.getCurrentUserOrCreateIfAbsent(username);
return userService.getCurrentUser(username);
}

@GetMapping("/allUsers")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.rangiffler.controller.advices;


import com.rangiffler.exception.ErrorResponse;
import com.rangiffler.exception.UserNotFoundException;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.bind.annotation.ExceptionHandler;

@ControllerAdvice
public class UserControllerAdvice {

@ExceptionHandler(UserNotFoundException.class)
public ResponseEntity<ErrorResponse> userNotFoundHandler(UserNotFoundException exception) {
return new ResponseEntity<>(new ErrorResponse(exception.getMessage()), HttpStatus.NOT_FOUND);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.rangiffler.exception;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.RequiredArgsConstructor;

@Data
@AllArgsConstructor
@RequiredArgsConstructor
public class ErrorResponse {

private String error;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.rangiffler.exception;

public class UserNotFoundException extends IllegalArgumentException {

public UserNotFoundException(String message) {
super(message);
}
}
Loading