diff --git a/docker-compose.yml b/docker-compose.yml index 358c90d..def3dc7 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -21,6 +21,35 @@ services: - rangiffler-network dns_search: . + zookeeper: + container_name: zookeeper + image: confluentinc/cp-zookeeper:7.3.2 + expose: + - "2181" + environment: + - ZOOKEEPER_CLIENT_PORT=2181 + - ZOOKEEPER_TICK_TIME=2000 + networks: + - rangiffler-network + + kafka: + container_name: kafka + image: confluentinc/cp-kafka:7.3.2 + expose: + - "9092" + depends_on: + zookeeper: + condition: service_started + environment: + - KAFKA_BROKER_ID=1 + - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 + - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092 + - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 + - KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1 + - KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 + networks: + - rangiffler-network + rangiffler-auth: container_name: rangiffler-auth image: rapdog64/rangiffler-auth:1.0.0 @@ -35,6 +64,8 @@ services: depends_on: rangiffler-all-db: condition: service_healthy + kafka: + condition: service_started networks: - rangiffler-network dns_search: . diff --git a/rangiffler-auth/build.gradle b/rangiffler-auth/build.gradle index e093fb4..722217b 100644 --- a/rangiffler-auth/build.gradle +++ b/rangiffler-auth/build.gradle @@ -22,6 +22,7 @@ dependencies { implementation "org.springframework.boot:spring-boot-starter-validation" implementation "org.springframework.boot:spring-boot-starter-actuator" implementation 'org.springframework.security:spring-security-oauth2-authorization-server:1.0.0' + implementation 'org.springframework.kafka:spring-kafka:3.0.7' runtimeOnly 'org.postgresql:postgresql' testImplementation 'org.springframework.boot:spring-boot-starter-test' annotationProcessor 'org.projectlombok:lombok' diff --git a/rangiffler-auth/src/main/java/com/rangiffler/config/KafkaAuthProducerConfiguration.java b/rangiffler-auth/src/main/java/com/rangiffler/config/KafkaAuthProducerConfiguration.java new file mode 100644 index 0000000..164cea0 --- /dev/null +++ b/rangiffler-auth/src/main/java/com/rangiffler/config/KafkaAuthProducerConfiguration.java @@ -0,0 +1,58 @@ +package com.rangiffler.config; + +import com.rangiffler.model.UserJson; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; +import org.springframework.kafka.config.TopicBuilder; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.support.serializer.JsonSerializer; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; + +@Configuration +public class KafkaAuthProducerConfiguration { + private final KafkaProperties kafkaProperties; + + @Autowired + public KafkaAuthProducerConfiguration(KafkaProperties kafkaProperties) { + this.kafkaProperties = kafkaProperties; + } + + @Bean + public Map producerConfiguration() { + Map properties = new HashMap<>(kafkaProperties.buildProducerProperties()); + properties.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + properties.put(VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); + return properties; + } + + @Bean + public ProducerFactory producerFactory() { + return new DefaultKafkaProducerFactory<>(producerConfiguration()); + } + + @Bean + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } + + @Bean + @Primary + public NewTopic topic() { + return TopicBuilder.name("users") + .partitions(10) + .replicas(1) + .build(); + } +} diff --git a/rangiffler-auth/src/main/java/com/rangiffler/controller/RegisterController.java b/rangiffler-auth/src/main/java/com/rangiffler/controller/RegisterController.java index 5ae5f27..0f9e51c 100644 --- a/rangiffler-auth/src/main/java/com/rangiffler/controller/RegisterController.java +++ b/rangiffler-auth/src/main/java/com/rangiffler/controller/RegisterController.java @@ -1,6 +1,7 @@ package com.rangiffler.controller; import com.rangiffler.model.RegistrationModel; +import com.rangiffler.model.UserJson; import com.rangiffler.service.UserService; import jakarta.annotation.Nonnull; import jakarta.servlet.http.HttpServletResponse; @@ -10,6 +11,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.dao.DataIntegrityViolationException; +import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Controller; import org.springframework.ui.Model; import org.springframework.validation.BeanPropertyBindingResult; @@ -29,14 +31,17 @@ public class RegisterController { private static final String MODEL_REG_FORM_ATTR = "registrationModel"; private static final String MODEL_FRONT_URI_ATTR = "frontUri"; private static final String REG_MODEL_ERROR_BEAN_NAME = "org.springframework.validation.BindingResult.registrationModel"; + private static final String KAFKA_TOPIC_USERS = "users"; private final UserService userService; private final String rangifflerFrontUri; + private final KafkaTemplate kafkaTemplate; @Autowired - public RegisterController(UserService userService, @Value("${rangiffler-client.base-uri}") String rangifflerFrontUri) { + public RegisterController(UserService userService, @Value("${rangiffler-client.base-uri}") String rangifflerFrontUri, KafkaTemplate kafkaTemplate) { this.userService = userService; this.rangifflerFrontUri = rangifflerFrontUri; + this.kafkaTemplate = kafkaTemplate; } @GetMapping("/register") @@ -60,6 +65,11 @@ public String registerUser(@Valid @ModelAttribute RegistrationModel registration ); response.setStatus(HttpServletResponse.SC_CREATED); model.addAttribute(MODEL_USERNAME_ATTR, registeredUserName); + + UserJson user = new UserJson(); + user.setUsername(registrationModel.getUsername()); + kafkaTemplate.send(KAFKA_TOPIC_USERS, user); + LOG.info("### Kafka topic [users] sent message: " + user.getUsername()); } catch (DataIntegrityViolationException e) { LOG.error("### Error while registration user: " + e.getMessage()); response.setStatus(HttpServletResponse.SC_BAD_REQUEST); diff --git a/rangiffler-auth/src/main/java/com/rangiffler/model/UserJson.java b/rangiffler-auth/src/main/java/com/rangiffler/model/UserJson.java new file mode 100644 index 0000000..d2fad77 --- /dev/null +++ b/rangiffler-auth/src/main/java/com/rangiffler/model/UserJson.java @@ -0,0 +1,19 @@ +package com.rangiffler.model; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class UserJson { + @JsonProperty("username") + private String username; + + public UserJson() { + } + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } +} diff --git a/rangiffler-auth/src/main/resources/application.yaml b/rangiffler-auth/src/main/resources/application.yaml index 2cd09aa..368511f 100644 --- a/rangiffler-auth/src/main/resources/application.yaml +++ b/rangiffler-auth/src/main/resources/application.yaml @@ -48,6 +48,8 @@ spring: config: activate: on-profile: 'local' + kafka: + bootstrap-servers: 'localhost:9092' datasource: url: 'jdbc:postgresql://localhost:5432/rangiffler-auth' jpa: @@ -62,6 +64,8 @@ spring: config: activate: on-profile: 'docker' + kafka: + bootstrap-servers: 'kafka:9092' datasource: url: 'jdbc:postgresql://rangiffler-all-db:5432/rangiffler-auth' jpa: diff --git a/rangiffler-e-2-e-tests/build.gradle b/rangiffler-e-2-e-tests/build.gradle index a123d31..d65f345 100644 --- a/rangiffler-e-2-e-tests/build.gradle +++ b/rangiffler-e-2-e-tests/build.gradle @@ -11,6 +11,7 @@ buildscript { jupiterVersion = '5.9.2' hibernateVersion = '6.1.7.Final' lombokVersion = '1.18.26' + kafkaVersion = '3.5.0' } } @@ -53,6 +54,7 @@ dependencies { "com.github.vertical-blank:sql-formatter:2.0.3", "io.qameta.allure:allure-okhttp3:${allureVersion}", "com.codeborne:selenide:${selenideVersion}", + "org.apache.kafka:kafka-clients:${kafkaVersion}", "com.github.javafaker:javafaker:${javaFakerVersion}", "com.google.code.findbugs:jsr305:3.0.2", "io.grpc:grpc-protobuf:${grpcVersion}", @@ -86,5 +88,8 @@ tasks.withType(JavaCompile) { tasks.withType(Test) { systemProperties += System.properties + // Kafka logs + testLogging.showStandardStreams = true + testLogging.exceptionFormat = 'full' useJUnitPlatform() } \ No newline at end of file diff --git a/rangiffler-gateway/src/main/java/com/rangiffler/service/PhotoService.java b/rangiffler-gateway/src/main/java/com/rangiffler/service/PhotoService.java index c8ff2e7..904c2dd 100644 --- a/rangiffler-gateway/src/main/java/com/rangiffler/service/PhotoService.java +++ b/rangiffler-gateway/src/main/java/com/rangiffler/service/PhotoService.java @@ -1,10 +1,12 @@ package com.rangiffler.service; -import com.rangiffler.model.*; +import com.rangiffler.model.CountryJson; +import com.rangiffler.model.FriendStatus; +import com.rangiffler.model.PhotoJson; +import com.rangiffler.model.PhotoServiceJson; import com.rangiffler.service.configuration.PhotoServiceClient; import lombok.RequiredArgsConstructor; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.ArrayList; @@ -25,15 +27,17 @@ public PhotoJson addPhoto(PhotoJson photoJson) { } public List getAllUserPhotos(String username) { - List usersPhoto = new ArrayList<>(); List photos = photoService.getPhotosForUser(username); - if (!photos.isEmpty()) { - for (PhotoServiceJson photo : photos) { - CountryJson country = countryService.findById(photo.getCountryId()); - usersPhoto.add(PhotoServiceJson.fromPhotoServiceJson(photo, country)); - } + if (photos.isEmpty()) { + return new ArrayList<>(); } - return usersPhoto; + + return photos.stream() + .map(photo -> { + CountryJson country = countryService.findById(photo.getCountryId()); + return PhotoServiceJson.fromPhotoServiceJson(photo, country); + }) + .toList(); } public PhotoJson editPhoto(PhotoJson photoJson, UUID id) { @@ -42,17 +46,12 @@ public PhotoJson editPhoto(PhotoJson photoJson, UUID id) { } public List getAllFriendsPhotos(String username) { - List friendsPhoto = new ArrayList<>(); - List friends = userService.receivePeopleAround(username) + return userService.receivePeopleAround(username) .stream() .filter(userJson -> userJson.getFriendStatus() == FriendStatus.FRIEND) - .toList(); - - for (UserJson userJson : friends) { - friendsPhoto.addAll(getAllUserPhotos(userJson.getUsername())); - } - - return friendsPhoto; + .map(userJson -> getAllUserPhotos(userJson.getUsername())) + .findFirst() + .orElseGet(ArrayList::new); } public void deletePhoto(UUID photoId) { diff --git a/rangiffler-userdata/build.gradle b/rangiffler-userdata/build.gradle index e1f87f7..e8d8938 100644 --- a/rangiffler-userdata/build.gradle +++ b/rangiffler-userdata/build.gradle @@ -22,6 +22,7 @@ repositories { dependencies { implementation 'org.springframework.boot:spring-boot-starter-web' implementation 'org.springframework.boot:spring-boot-starter-data-jpa' + implementation 'org.springframework.kafka:spring-kafka:3.0.7' runtimeOnly 'org.postgresql:postgresql' compileOnly 'org.projectlombok:lombok' annotationProcessor 'org.projectlombok:lombok' diff --git a/rangiffler-userdata/src/main/java/com/rangiffler/config/KafkaUserdataConsumerConfiguration.java b/rangiffler-userdata/src/main/java/com/rangiffler/config/KafkaUserdataConsumerConfiguration.java new file mode 100644 index 0000000..5db6dba --- /dev/null +++ b/rangiffler-userdata/src/main/java/com/rangiffler/config/KafkaUserdataConsumerConfiguration.java @@ -0,0 +1,37 @@ +package com.rangiffler.config; + +import com.rangiffler.model.UserJson; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.support.serializer.JsonDeserializer; + +@Configuration +public class KafkaUserdataConsumerConfiguration { + private final KafkaProperties kafkaProperties; + + @Autowired + public KafkaUserdataConsumerConfiguration(KafkaProperties kafkaProperties) { + this.kafkaProperties = kafkaProperties; + } + + @Bean + public ConsumerFactory consumerFactory() { + final JsonDeserializer jsonDeserializer = new JsonDeserializer<>(); + jsonDeserializer.addTrustedPackages("*"); + return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), jsonDeserializer); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory concurrentKafkaListenerContainerFactory + = new ConcurrentKafkaListenerContainerFactory<>(); + concurrentKafkaListenerContainerFactory.setConsumerFactory(consumerFactory()); + return concurrentKafkaListenerContainerFactory; + } +} diff --git a/rangiffler-userdata/src/main/java/com/rangiffler/controller/UserController.java b/rangiffler-userdata/src/main/java/com/rangiffler/controller/UserController.java index 15b827b..c8742bc 100644 --- a/rangiffler-userdata/src/main/java/com/rangiffler/controller/UserController.java +++ b/rangiffler-userdata/src/main/java/com/rangiffler/controller/UserController.java @@ -33,7 +33,7 @@ public UserJson updateUserInfo(@RequestBody UserJson user) { @GetMapping("/currentUser") @ResponseStatus(HttpStatus.OK) public UserJson currentUser(@RequestParam String username) { - return userService.getCurrentUserOrCreateIfAbsent(username); + return userService.getCurrentUser(username); } @GetMapping("/allUsers") diff --git a/rangiffler-userdata/src/main/java/com/rangiffler/controller/advices/UserControllerAdvice.java b/rangiffler-userdata/src/main/java/com/rangiffler/controller/advices/UserControllerAdvice.java new file mode 100644 index 0000000..74978b2 --- /dev/null +++ b/rangiffler-userdata/src/main/java/com/rangiffler/controller/advices/UserControllerAdvice.java @@ -0,0 +1,18 @@ +package com.rangiffler.controller.advices; + + +import com.rangiffler.exception.ErrorResponse; +import com.rangiffler.exception.UserNotFoundException; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.ControllerAdvice; +import org.springframework.web.bind.annotation.ExceptionHandler; + +@ControllerAdvice +public class UserControllerAdvice { + + @ExceptionHandler(UserNotFoundException.class) + public ResponseEntity userNotFoundHandler(UserNotFoundException exception) { + return new ResponseEntity<>(new ErrorResponse(exception.getMessage()), HttpStatus.NOT_FOUND); + } +} diff --git a/rangiffler-userdata/src/main/java/com/rangiffler/exception/ErrorResponse.java b/rangiffler-userdata/src/main/java/com/rangiffler/exception/ErrorResponse.java new file mode 100644 index 0000000..e145b60 --- /dev/null +++ b/rangiffler-userdata/src/main/java/com/rangiffler/exception/ErrorResponse.java @@ -0,0 +1,13 @@ +package com.rangiffler.exception; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.RequiredArgsConstructor; + +@Data +@AllArgsConstructor +@RequiredArgsConstructor +public class ErrorResponse { + + private String error; +} diff --git a/rangiffler-userdata/src/main/java/com/rangiffler/exception/UserNotFoundException.java b/rangiffler-userdata/src/main/java/com/rangiffler/exception/UserNotFoundException.java new file mode 100644 index 0000000..d55122a --- /dev/null +++ b/rangiffler-userdata/src/main/java/com/rangiffler/exception/UserNotFoundException.java @@ -0,0 +1,8 @@ +package com.rangiffler.exception; + +public class UserNotFoundException extends IllegalArgumentException { + + public UserNotFoundException(String message) { + super(message); + } +} diff --git a/rangiffler-userdata/src/main/java/com/rangiffler/service/UserDataService.java b/rangiffler-userdata/src/main/java/com/rangiffler/service/UserDataService.java index a90126b..08c2d3f 100644 --- a/rangiffler-userdata/src/main/java/com/rangiffler/service/UserDataService.java +++ b/rangiffler-userdata/src/main/java/com/rangiffler/service/UserDataService.java @@ -3,10 +3,16 @@ import com.rangiffler.data.FriendsEntity; import com.rangiffler.data.UserEntity; import com.rangiffler.data.repository.UserRepository; +import com.rangiffler.exception.UserNotFoundException; import com.rangiffler.model.FriendJson; import com.rangiffler.model.FriendState; import com.rangiffler.model.UserJson; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; import java.nio.charset.StandardCharsets; @@ -20,6 +26,7 @@ @Component public class UserDataService { + private static final Logger LOG = LoggerFactory.getLogger(UserDataService.class); private final UserRepository userRepository; @Autowired @@ -27,6 +34,20 @@ public UserDataService(UserRepository userRepository) { this.userRepository = userRepository; } + @KafkaListener(topics = "users", groupId = "userdata") + public void listener(@Payload UserJson user, ConsumerRecord cr) { + LOG.info("### Kafka topic [users] received message: " + user.getUserName()); + LOG.info("### Kafka consumer record: " + cr.toString()); + UserEntity userDataEntity = new UserEntity(); + userDataEntity.setUsername(user.getUserName()); + UserEntity userEntity = userRepository.save(userDataEntity); + LOG.info(String.format( + "### User '%s' successfully saved to database with id: %s", + user.getUserName(), + userEntity.getId() + )); + } + public UserJson update(UserJson user) { UserEntity userEntity = userRepository.findByUsername(user.getUserName()); userEntity.setFirstname(user.getFirstname()); @@ -37,15 +58,8 @@ public UserJson update(UserJson user) { return UserJson.fromEntity(saved); } - public UserJson getCurrentUserOrCreateIfAbsent(String username) { - UserEntity userDataEntity = userRepository.findByUsername(username); - if (userDataEntity == null) { - userDataEntity = new UserEntity(); - userDataEntity.setUsername(username); - return UserJson.fromEntity(userRepository.save(userDataEntity)); - } else { - return UserJson.fromEntity(userDataEntity); - } + public UserJson getCurrentUser(String username) { + return UserJson.fromEntity(getRequiredUser(username)); } public List receivePeopleAround(String username) { @@ -154,4 +168,9 @@ public List removeFriend(String username, String friendUsername) { : FriendState.FRIEND)) .toList(); } + + private UserEntity getRequiredUser(String username) { + return Optional.ofNullable(userRepository.findByUsername(username)) + .orElseThrow(() -> new UserNotFoundException("Can`t find user by username: " + username)); + } }