From c74c17cab5bbcc5bf2e6daca7f996c257718e5f9 Mon Sep 17 00:00:00 2001 From: SamuelQuetin Date: Fri, 4 Oct 2024 11:27:31 +0200 Subject: [PATCH 1/3] Refactor: Renomme des paths car cetait des noms par defaut Supp: suppression du .log inutile si .bad --- .../fr/abes/logskbart/kafka/LogsListener.java | 33 +++++++++++-------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/src/main/java/fr/abes/logskbart/kafka/LogsListener.java b/src/main/java/fr/abes/logskbart/kafka/LogsListener.java index 93d8715..e80f120 100644 --- a/src/main/java/fr/abes/logskbart/kafka/LogsListener.java +++ b/src/main/java/fr/abes/logskbart/kafka/LogsListener.java @@ -13,7 +13,10 @@ import java.io.File; import java.io.IOException; -import java.nio.file.*; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.nio.file.StandardOpenOption; import java.nio.file.attribute.BasicFileAttributes; import java.sql.Timestamp; import java.time.LocalDateTime; @@ -70,7 +73,7 @@ public void listenInfoKbart2KafkaAndErrorKbart2Kafka(ConsumerRecord Date: Mon, 7 Oct 2024 15:23:11 +0200 Subject: [PATCH 2/3] Ajout gain de temps pour multi thread --- .../fr/abes/logskbart/entity/LogKbart.java | 6 +- .../fr/abes/logskbart/kafka/LogsListener.java | 142 +++++++++--------- .../repository/LogKbartRepository.java | 6 + .../abes/logskbart/service/LogsService.java | 19 +++ src/main/resources/application-dev.properties | 3 +- .../resources/application-prod.properties | 3 +- .../resources/application-test.properties | 3 +- 7 files changed, 108 insertions(+), 74 deletions(-) diff --git a/src/main/java/fr/abes/logskbart/entity/LogKbart.java b/src/main/java/fr/abes/logskbart/entity/LogKbart.java index 60e8cd0..650fd92 100644 --- a/src/main/java/fr/abes/logskbart/entity/LogKbart.java +++ b/src/main/java/fr/abes/logskbart/entity/LogKbart.java @@ -53,9 +53,12 @@ public class LogKbart implements Serializable { @Column(name = "THREAD_PRIORITY") private Integer threadPriority; - @Column(name = "NB_LINE") + @Column(name = "NB_LINE", nullable = false) private Integer nbLine; + @Column(name = "NB_RUN", nullable = false) + private Integer nbRun = 0; + @Override public String toString() { return "LogKbart{" + @@ -66,6 +69,7 @@ public String toString() { ", message='" + message + '\'' + ", loggerFqcn='" + loggerFqcn + '\'' + ", nbLine='" + nbLine + '\'' + + ", nbRun='" + nbRun + '\'' + '}'; } diff --git a/src/main/java/fr/abes/logskbart/kafka/LogsListener.java b/src/main/java/fr/abes/logskbart/kafka/LogsListener.java index e80f120..355c266 100644 --- a/src/main/java/fr/abes/logskbart/kafka/LogsListener.java +++ b/src/main/java/fr/abes/logskbart/kafka/LogsListener.java @@ -3,8 +3,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import fr.abes.logskbart.dto.LogKbartDto; import fr.abes.logskbart.entity.LogKbart; -import fr.abes.logskbart.repository.LogKbartRepository; import fr.abes.logskbart.service.EmailService; +import fr.abes.logskbart.service.LogsService; import fr.abes.logskbart.utils.UtilsMapper; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -21,8 +21,9 @@ import java.sql.Timestamp; import java.time.LocalDateTime; import java.time.ZoneId; -import java.util.Arrays; import java.util.Date; +import java.util.List; + @Slf4j @Service @@ -32,14 +33,14 @@ public class LogsListener { private final UtilsMapper logsMapper; - private final LogKbartRepository repository; + private final LogsService service; private final EmailService emailService; - public LogsListener(ObjectMapper mapper, UtilsMapper logsMapper, LogKbartRepository repository, EmailService emailService) { + public LogsListener(ObjectMapper mapper, UtilsMapper logsMapper, LogsService service, EmailService emailService) { this.mapper = mapper; this.logsMapper = logsMapper; - this.repository = repository; + this.service = service; this.emailService = emailService; } @@ -50,85 +51,33 @@ public LogsListener(ObjectMapper mapper, UtilsMapper logsMapper, LogKbartReposit * @param message le message kafka * @throws IOException exception levée */ - @KafkaListener(topics = {"${topic.name.source.error}"}, groupId = "${topic.groupid.source}", containerFactory = "kafkaLogsListenerContainerFactory") + @KafkaListener(topics = {"${topic.name.source.error}"}, groupId = "${topic.groupid.source}", containerFactory = "kafkaLogsListenerContainerFactory", concurrency = "${abes.kafka.concurrency.nbThread}") public void listenInfoKbart2KafkaAndErrorKbart2Kafka(ConsumerRecord message) throws IOException { LogKbartDto dto = mapper.readValue(message.value(), LogKbartDto.class); LogKbart logKbart = logsMapper.map(dto, LogKbart.class); - String[] listMessage = message.key().split(";"); - log.debug(Arrays.toString(listMessage)); // recuperation de l'heure a laquelle le message a ete envoye Timestamp currentTimestamp = new Timestamp(message.timestamp()); logKbart.setTimestamp(new Date(currentTimestamp.getTime())); - logKbart.setPackageName(listMessage[0]); - String nbLineOrigine = (listMessage.length > 1) ? listMessage[1] : ""; + logKbart.setPackageName(message.key().split(";")[0]); + String nbLineOrigine = (message.key().split(";").length > 1) ? message.key().split(";")[1] : ""; logKbart.setNbLine(Integer.parseInt((nbLineOrigine.isEmpty() ? "-1" : nbLineOrigine) )); + Integer nbRun = service.getLastNbRun(logKbart.getPackageName()); + if(logKbart.getMessage().contains("Debut envois kafka de :")){ + nbRun++; + } + logKbart.setNbRun(nbRun); + logKbart.log(); + // Inscrit l'entity en BDD + service.save(logKbart); - // Vérifie qu'un fichier portant le nom du kbart en cours existe if (!logKbart.getPackageName().contains("ctx:package") && !logKbart.getPackageName().contains("_FORCE")) { - - Path tempPath = Path.of("tempLogLocal"); - if(!Files.exists(tempPath)) { - Files.createDirectory(tempPath); - } - Path pathOfBadLocal = Path.of("tempLogLocal" + File.separator + logKbart.getPackageName().replace(".tsv", ".bad")); - - // Si la ligne de log sur le topic est de type ERROR - if (logKbart.getLevel().toString().equals("ERROR")) { - - // vérifie la présence de fichiers obsolètes dans le répertoire tempLogLocal et les supprime le cas échéant - deleteOldLocalTempLog(); - - String line = nbLineOrigine + "\t" + logKbart.getMessage(); - - if (Files.exists(pathOfBadLocal)) { - // Inscrit la ligne dedans - Files.write(pathOfBadLocal, (line + System.lineSeparator()).getBytes(), StandardOpenOption.APPEND); - } else if (!Files.exists(pathOfBadLocal)) { - try { - // Créer le fichier et inscrit la ligne dedans - Files.createFile(pathOfBadLocal); - // Créer la ligne d'en-tête - Files.write(pathOfBadLocal, ("LINE\tMESSAGE" + System.lineSeparator()).getBytes(), StandardOpenOption.APPEND); - // Inscrit les informations sur la ligne - Files.write(pathOfBadLocal, (line + System.lineSeparator()).getBytes(), StandardOpenOption.APPEND); - log.info("Fichier temporaire créé."); - } catch (SecurityException | IOException e) { - log.error("Erreur lors de la création du fichier temporaire. " + e); - throw new RuntimeException(e); - } - } - } else if (logKbart.getLevel().toString().equals("INFO")) { - // On verifie que le traitement commence pour supp les anciens logs du .bad (ps message venant de kbart2kafka) - if (logKbart.getMessage().contains("Debut envois kafka de : " + logKbart.getPackageName())){ - Files.deleteIfExists(pathOfBadLocal); - // On verifie que le traitement est terminé (ps message venant de best-ppn-api ou kbart2kafka) - }else if( (logKbart.getMessage().contains("Traitement terminé pour fichier " + logKbart.getPackageName())) || (logKbart.getMessage().contains("Traitement refusé du fichier " + logKbart.getPackageName())) ) { - // Envoi du mail uniquement si le fichier temporaire a été créé - if (Files.exists(pathOfBadLocal)) { - Path tempPathTarget = Path.of("tempLog"); - if (!Files.exists(tempPathTarget)) { - Files.createDirectory(tempPathTarget); - } - // Copie le fichier existant vers le répertoire temporaire - Path pathOfBadFinal = Path.of("tempLog" + File.separator + logKbart.getPackageName().replace(".tsv", ".bad")); - // Déplacement du fichier - Files.copy(pathOfBadLocal, pathOfBadFinal, StandardCopyOption.REPLACE_EXISTING); - log.info("Fichier de log transféré dans le dossier temporaire."); - - // Suppression du .log car Useless si cas là - Path pathOfLog = Path.of("tempLog" + File.separator + logKbart.getPackageName().replace(".tsv", ".log")); - Files.deleteIfExists(pathOfLog); - - emailService.sendMailWithAttachment(logKbart.getPackageName(), pathOfBadLocal); - } - } + if( (logKbart.getMessage().contains("Traitement terminé pour fichier " + logKbart.getPackageName())) || (logKbart.getMessage().contains("Traitement refusé du fichier " + logKbart.getPackageName())) ) { + createFileBad(logKbart.getPackageName(),nbRun); } } - // Inscrit l'entity en BDD - repository.save(logKbart); } public void deleteOldLocalTempLog() throws IOException { @@ -150,4 +99,57 @@ public void deleteOldLocalTempLog() throws IOException { } } } + + private void createFileBad(String filename, Integer nbRun) throws IOException { + List logKbartList = service.getErrorLogKbartByPackageAndNbRun(filename,nbRun); + Path tempPath = Path.of("tempLogLocal"); + if(!Files.exists(tempPath)) { + Files.createDirectory(tempPath); + } + Path pathOfBadLocal = Path.of("tempLogLocal" + File.separator + filename.replace(".tsv", ".bad")); + // vérifie la présence de fichiers obsolètes dans le répertoire tempLogLocal et les supprime le cas échéant + deleteOldLocalTempLog(); + + logKbartList.forEach(logKbart -> { + try { + if (Files.exists(pathOfBadLocal)) { + // Inscrit la ligne dedans + Files.write(pathOfBadLocal, (logKbart.getNbLine() + "\t" + logKbart.getMessage() + System.lineSeparator()).getBytes(), StandardOpenOption.APPEND); + } else { + // Créer le fichier et inscrit la ligne dedans + Files.createFile(pathOfBadLocal); + // Créer la ligne d'en-tête + Files.write(pathOfBadLocal, ("LINE\tMESSAGE\t(" + nbRun + ")" + System.lineSeparator()).getBytes(), StandardOpenOption.APPEND); + // Inscrit les informations sur la ligne + Files.write(pathOfBadLocal, (logKbart.getNbLine() + "\t" + logKbart.getMessage() + System.lineSeparator()).getBytes(), StandardOpenOption.APPEND); + log.info("Fichier temporaire créé."); + } + } catch (IOException e) { + log.error("Erreur lors de la création du fichier temporaire. " + e); + throw new RuntimeException(e); + } + }); + + if (Files.exists(pathOfBadLocal)) { + Path tempPathTarget = Path.of("tempLog"); + if (!Files.exists(tempPathTarget)) { + Files.createDirectory(tempPathTarget); + } + // Copie le fichier existant vers le répertoire temporaire + Path pathOfBadFinal = Path.of("tempLog" + File.separator + filename.replace(".tsv", ".bad")); + // Déplacement du fichier + Files.copy(pathOfBadLocal, pathOfBadFinal, StandardCopyOption.REPLACE_EXISTING); + log.info("Fichier de log transféré dans le dossier temporaire."); + + // Suppression du .log car Useless si cas là + Path pathOfLog = Path.of("tempLog" + File.separator + filename.replace(".tsv", ".log")); + log.info("Suppression de " + pathOfLog.toString()); + Files.deleteIfExists(pathOfLog); + log.info("Suppression de " + pathOfBadLocal + " en local"); + Files.deleteIfExists(pathOfBadLocal); + +// emailService.sendMailWithAttachment(filename, pathOfBadLocal); + } + + } } diff --git a/src/main/java/fr/abes/logskbart/repository/LogKbartRepository.java b/src/main/java/fr/abes/logskbart/repository/LogKbartRepository.java index 72f26b3..24c5716 100644 --- a/src/main/java/fr/abes/logskbart/repository/LogKbartRepository.java +++ b/src/main/java/fr/abes/logskbart/repository/LogKbartRepository.java @@ -2,14 +2,20 @@ import fr.abes.logskbart.configuration.LogsBdConfiguration; import fr.abes.logskbart.entity.LogKbart; +import fr.abes.logskbart.utils.Level; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.stereotype.Repository; import java.util.Date; import java.util.List; +import java.util.Optional; @Repository @LogsBdConfiguration public interface LogKbartRepository extends JpaRepository { List findAllByPackageNameAndTimestampBetweenOrderByNbLineAscTimestampAsc(String filename, Date debut, Date fin); + + List findAllByPackageNameAndNbRunAndLevelOrderByNbLineAscTimestampAsc(String filename, Integer nbRun, Level level); + + Optional getFirstByPackageNameOrderByNbRunDesc(String filename); } diff --git a/src/main/java/fr/abes/logskbart/service/LogsService.java b/src/main/java/fr/abes/logskbart/service/LogsService.java index 0aeaf99..0518988 100644 --- a/src/main/java/fr/abes/logskbart/service/LogsService.java +++ b/src/main/java/fr/abes/logskbart/service/LogsService.java @@ -2,12 +2,14 @@ import fr.abes.logskbart.entity.LogKbart; import fr.abes.logskbart.repository.LogKbartRepository; +import fr.abes.logskbart.utils.Level; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import java.util.Calendar; import java.util.Date; import java.util.List; +import java.util.Optional; @Service @Slf4j @@ -26,4 +28,21 @@ public List getLogKbartForPackage(String packageName, Date date) { log.debug("packageName {}, Date début {}, Date fin {}", packageName, dateChargement.getTime(), dateFin.getTime()); return repository.findAllByPackageNameAndTimestampBetweenOrderByNbLineAscTimestampAsc(packageName, dateChargement.getTime(), dateFin.getTime()); } + + public LogKbart save(LogKbart logKbart) { + return repository.save(logKbart); + } + + public Integer getLastNbRun(String packageName) { + Optional logKbart = repository.getFirstByPackageNameOrderByNbRunDesc(packageName); + if(logKbart.isPresent()) { + return logKbart.get().getNbRun(); + } else { + return 0; + } + } + + public List getErrorLogKbartByPackageAndNbRun(String packageName, Integer nbRun) { + return repository.findAllByPackageNameAndNbRunAndLevelOrderByNbLineAscTimestampAsc(packageName,nbRun, Level.ERROR); + } } diff --git a/src/main/resources/application-dev.properties b/src/main/resources/application-dev.properties index 1fa18bb..fb3edd2 100644 --- a/src/main/resources/application-dev.properties +++ b/src/main/resources/application-dev.properties @@ -1,5 +1,6 @@ # Consumer properties spring.kafka.consumer.bootstrap-servers= +abes.kafka.concurrency.nbThread= # Properties defined from .env on server #ignore resolution error spring.datasource.logsdb.driver-class-name=org.postgresql.Driver @@ -22,4 +23,4 @@ topic.groupid.source=logskbartConsumer # Mailing mail.ws.url= -mail.ws.recipient= \ No newline at end of file +mail.ws.recipient= diff --git a/src/main/resources/application-prod.properties b/src/main/resources/application-prod.properties index 550ff67..34b174a 100644 --- a/src/main/resources/application-prod.properties +++ b/src/main/resources/application-prod.properties @@ -1,5 +1,6 @@ # Consumer properties spring.kafka.consumer.bootstrap-servers= +abes.kafka.concurrency.nbThread= # Base Postgres # Properties defined from .env on server #ignore resolution error @@ -24,4 +25,4 @@ topic.groupid.source=logskbartConsumer # Mailing mail.ws.url= -mail.ws.recipient= \ No newline at end of file +mail.ws.recipient= diff --git a/src/main/resources/application-test.properties b/src/main/resources/application-test.properties index f285439..4e98d54 100644 --- a/src/main/resources/application-test.properties +++ b/src/main/resources/application-test.properties @@ -1,5 +1,6 @@ # Consumer properties spring.kafka.consumer.bootstrap-servers= +abes.kafka.concurrency.nbThread= # Properties defined from .env on server #ignore resolution error spring.datasource.logsdb.driver-class-name=org.postgresql.Driver @@ -22,4 +23,4 @@ topic.groupid.source=logskbartConsumer # Mailing mail.ws.url= -mail.ws.recipient= \ No newline at end of file +mail.ws.recipient= From e4dff08cb6353def23718b7484d10a6bd27c493f Mon Sep 17 00:00:00 2001 From: SamuelQuetin Date: Tue, 8 Oct 2024 11:31:57 +0200 Subject: [PATCH 3/3] remise du email --- .../fr/abes/logskbart/kafka/LogsListener.java | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/src/main/java/fr/abes/logskbart/kafka/LogsListener.java b/src/main/java/fr/abes/logskbart/kafka/LogsListener.java index 355c266..2e21a79 100644 --- a/src/main/java/fr/abes/logskbart/kafka/LogsListener.java +++ b/src/main/java/fr/abes/logskbart/kafka/LogsListener.java @@ -57,11 +57,12 @@ public void listenInfoKbart2KafkaAndErrorKbart2Kafka(ConsumerRecord 1) ? message.key().split(";")[1] : ""; - logKbart.setNbLine(Integer.parseInt((nbLineOrigine.isEmpty() ? "-1" : nbLineOrigine) )); + logKbart.setPackageName(key[0]); + logKbart.setNbLine(Integer.parseInt(((key.length > 1) ? key[1] : "-1") )); Integer nbRun = service.getLastNbRun(logKbart.getPackageName()); if(logKbart.getMessage().contains("Debut envois kafka de :")){ @@ -91,9 +92,8 @@ public void deleteOldLocalTempLog() throws IOException { Date dateOfLastModification = new Date(basicFileAttributes.lastModifiedTime().toMillis()); Date dateNow = Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant()); long interval = dateNow.getTime() - dateOfLastModification.getTime(); - if (interval > 600000) { - Files.deleteIfExists(fileToCheck.toPath()); - log.debug("Fichier obsolète supprimé : " + nameFile); + if (interval > 600000 && Files.deleteIfExists(fileToCheck.toPath())) { + log.debug("Fichier obsolète supprimé : {}", nameFile); } } } @@ -143,12 +143,13 @@ private void createFileBad(String filename, Integer nbRun) throws IOException { // Suppression du .log car Useless si cas là Path pathOfLog = Path.of("tempLog" + File.separator + filename.replace(".tsv", ".log")); - log.info("Suppression de " + pathOfLog.toString()); + log.info("Suppression de " + pathOfLog); Files.deleteIfExists(pathOfLog); + + emailService.sendMailWithAttachment(filename, pathOfBadLocal); + log.info("Suppression de " + pathOfBadLocal + " en local"); Files.deleteIfExists(pathOfBadLocal); - -// emailService.sendMailWithAttachment(filename, pathOfBadLocal); } }