Skip to content

Commit

Permalink
[feature]: Kafka code for receiving users
Browse files Browse the repository at this point in the history
  • Loading branch information
RapDog64 authored and Denis Kolovorotnyi committed Jul 13, 2023
1 parent e234143 commit b0bb1df
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.rangiffler.controller;

import com.rangiffler.model.RegistrationModel;
import com.rangiffler.model.UserJson;
import com.rangiffler.service.UserService;
import jakarta.annotation.Nonnull;
import jakarta.servlet.http.HttpServletResponse;
Expand All @@ -10,6 +11,7 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.validation.BeanPropertyBindingResult;
Expand All @@ -29,14 +31,17 @@ public class RegisterController {
private static final String MODEL_REG_FORM_ATTR = "registrationModel";
private static final String MODEL_FRONT_URI_ATTR = "frontUri";
private static final String REG_MODEL_ERROR_BEAN_NAME = "org.springframework.validation.BindingResult.registrationModel";
private static final String KAFKA_TOPIC_USERS = "users";

private final UserService userService;
private final String rangifflerFrontUri;
private final KafkaTemplate<String, UserJson> kafkaTemplate;

@Autowired
public RegisterController(UserService userService, @Value("${rangiffler-client.base-uri}") String rangifflerFrontUri) {
public RegisterController(UserService userService, @Value("${rangiffler-client.base-uri}") String rangifflerFrontUri, KafkaTemplate<String, UserJson> kafkaTemplate) {
this.userService = userService;
this.rangifflerFrontUri = rangifflerFrontUri;
this.kafkaTemplate = kafkaTemplate;
}

@GetMapping("/register")
Expand All @@ -60,6 +65,11 @@ public String registerUser(@Valid @ModelAttribute RegistrationModel registration
);
response.setStatus(HttpServletResponse.SC_CREATED);
model.addAttribute(MODEL_USERNAME_ATTR, registeredUserName);

UserJson user = new UserJson();
user.setUsername(registrationModel.getUsername());
kafkaTemplate.send(KAFKA_TOPIC_USERS, user);
LOG.info("### Kafka topic [users] sent message: " + user.getUsername());
} catch (DataIntegrityViolationException e) {
LOG.error("### Error while registration user: " + e.getMessage());
response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public UserJson updateUserInfo(@RequestBody UserJson user) {
@GetMapping("/currentUser")
@ResponseStatus(HttpStatus.OK)
public UserJson currentUser(@RequestParam String username) {
return userService.getCurrentUserOrCreateIfAbsent(username);
return userService.getCurrentUser(username);
}

@GetMapping("/allUsers")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.rangiffler.controller.advices;


import com.rangiffler.exception.ErrorResponse;
import com.rangiffler.exception.UserNotFoundException;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.bind.annotation.ExceptionHandler;

@ControllerAdvice
public class UserControllerAdvice {

@ExceptionHandler(UserNotFoundException.class)
public ResponseEntity<ErrorResponse> userNotFoundHandler(UserNotFoundException exception) {
return new ResponseEntity<>(new ErrorResponse(exception.getMessage()), HttpStatus.NOT_FOUND);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.rangiffler.exception;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.RequiredArgsConstructor;

@Data
@AllArgsConstructor
@RequiredArgsConstructor
public class ErrorResponse {

private String error;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.rangiffler.exception;

public class UserNotFoundException extends IllegalArgumentException {

public UserNotFoundException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,16 @@
import com.rangiffler.data.FriendsEntity;
import com.rangiffler.data.UserEntity;
import com.rangiffler.data.repository.UserRepository;
import com.rangiffler.exception.UserNotFoundException;
import com.rangiffler.model.FriendJson;
import com.rangiffler.model.FriendState;
import com.rangiffler.model.UserJson;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
Expand All @@ -20,13 +26,28 @@
@Component
public class UserDataService {

private static final Logger LOG = LoggerFactory.getLogger(UserDataService.class);
private final UserRepository userRepository;

@Autowired
public UserDataService(UserRepository userRepository) {
this.userRepository = userRepository;
}

@KafkaListener(topics = "users", groupId = "userdata")
public void listener(@Payload UserJson user, ConsumerRecord<String, UserJson> cr) {
LOG.info("### Kafka topic [users] received message: " + user.getUserName());
LOG.info("### Kafka consumer record: " + cr.toString());
UserEntity userDataEntity = new UserEntity();
userDataEntity.setUsername(user.getUserName());
UserEntity userEntity = userRepository.save(userDataEntity);
LOG.info(String.format(
"### User '%s' successfully saved to database with id: %s",
user.getUserName(),
userEntity.getId()
));
}

public UserJson update(UserJson user) {
UserEntity userEntity = userRepository.findByUsername(user.getUserName());
userEntity.setFirstname(user.getFirstname());
Expand All @@ -37,15 +58,8 @@ public UserJson update(UserJson user) {
return UserJson.fromEntity(saved);
}

public UserJson getCurrentUserOrCreateIfAbsent(String username) {
UserEntity userDataEntity = userRepository.findByUsername(username);
if (userDataEntity == null) {
userDataEntity = new UserEntity();
userDataEntity.setUsername(username);
return UserJson.fromEntity(userRepository.save(userDataEntity));
} else {
return UserJson.fromEntity(userDataEntity);
}
public UserJson getCurrentUser(String username) {
return UserJson.fromEntity(getRequiredUser(username));
}

public List<UserJson> receivePeopleAround(String username) {
Expand Down Expand Up @@ -154,4 +168,9 @@ public List<UserJson> removeFriend(String username, String friendUsername) {
: FriendState.FRIEND))
.toList();
}

private UserEntity getRequiredUser(String username) {
return Optional.ofNullable(userRepository.findByUsername(username))
.orElseThrow(() -> new UserNotFoundException("Can`t find user by username: " + username));
}
}

0 comments on commit b0bb1df

Please sign in to comment.