Skip to content

Commit

Permalink
Merge pull request #38
Browse files Browse the repository at this point in the history
FEAT : CDE-403-sur-un-gros-fichier-le-bad-ecrase-son-contenu-au-fil-d…
  • Loading branch information
jvk88511334 authored Apr 5, 2024
2 parents 4d6397d + c8dd7b7 commit 4895aae
Showing 1 changed file with 31 additions and 32 deletions.
63 changes: 31 additions & 32 deletions src/main/java/fr/abes/logskbart/kafka/LogsListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,21 @@
import fr.abes.logskbart.utils.UtilsMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileLock;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.*;
import java.sql.Date;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -48,13 +50,13 @@ public LogsListener(ObjectMapper mapper, UtilsMapper logsMapper, LogKbartReposit


/**
* Ecoute les topic de log d'erreurs et de fin de traitement bestPpn et génère un fichier err pour chaque fichier kbart
* Ecoute le topic de log d'erreurs et génère un fichier bad pour chaque fichier kbart
*
* @param message le message kafka
* @throws IOException exception levée
*/
@KafkaListener(topics = {"${topic.name.source.error}"}, groupId = "${topic.groupid.source}", containerFactory = "kafkaLogsListenerContainerFactory")
public void listenInfoKbart2KafkaAndErrorKbart2Kafka(ConsumerRecord<String, String> message) throws IOException, InterruptedException {
public void listenInfoKbart2KafkaAndErrorKbart2Kafka(ConsumerRecord<String, String> message) throws IOException {
LogKbartDto dto = mapper.readValue(message.value(), LogKbartDto.class);
LogKbart logKbart = logsMapper.map(dto, LogKbart.class);

Expand All @@ -71,11 +73,12 @@ public void listenInfoKbart2KafkaAndErrorKbart2Kafka(ConsumerRecord<String, Stri

// Vérifie qu'un fichier portant le nom du kbart en cours existe
if (!logKbart.getPackageName().contains("ctx:package") && !logKbart.getPackageName().contains("_FORCE")) {
Path tempPath = Path.of("tempLog");

Path tempPath = Path.of("tempLogLocal");
if(!Files.exists(tempPath)) {
Files.createDirectory(tempPath);
}
Path of = Path.of("tempLog" + File.separator + logKbart.getPackageName().replace(".tsv", ".bad"));
Path of = Path.of("tempLogLocal" + File.separator + logKbart.getPackageName().replace(".tsv", ".bad"));

// Si la ligne de log sur le topic est de type ERROR
if (logKbart.getLevel().toString().equals("ERROR")) {
Expand All @@ -94,40 +97,36 @@ public void listenInfoKbart2KafkaAndErrorKbart2Kafka(ConsumerRecord<String, Stri
String line = nbLineOrigine + "\t" + logKbart.getMessage();

if (Files.exists(of)) {
FileOutputStream fos = new FileOutputStream(String.valueOf(of), true);
FileLock lock = fos.getChannel().lock();
try (fos) {
// Inscrit la ligne dedans
Files.write(of, (line + System.lineSeparator()).getBytes(), StandardOpenOption.APPEND);
} finally {
lock.release();
lock.close();
}
// Inscrit la ligne dedans
Files.write(of, (line + System.lineSeparator()).getBytes(), StandardOpenOption.APPEND);
} else if (!Files.exists(of)) {
try {
FileOutputStream fos = new FileOutputStream(String.valueOf(of), true);
FileLock lock = fos.getChannel().lock();
try (fos) {
// Créer le fichier et inscrit la ligne dedans
Files.createFile(of);
// Créer la ligne d'en-tête
Files.write(of, ("LINE\tMESSAGE" + System.lineSeparator()).getBytes(), StandardOpenOption.APPEND);
// Inscrit les informations sur la ligne
Files.write(of, (line + System.lineSeparator()).getBytes(), StandardOpenOption.APPEND);
log.info("Fichier temporaire créé.");
} finally {
lock.release();
lock.close();
}
} catch (SecurityException | IOException e) {
// Créer le fichier et inscrit la ligne dedans
Files.createFile(of);
// Créer la ligne d'en-tête
Files.write(of, ("LINE\tMESSAGE" + System.lineSeparator()).getBytes(), StandardOpenOption.APPEND);
// Inscrit les informations sur la ligne
Files.write(of, (line + System.lineSeparator()).getBytes(), StandardOpenOption.APPEND);
log.info("Fichier temporaire créé.");
} catch (SecurityException | IOException e) {
log.error("Erreur lors de la création du fichier temporaire. " + e);
throw new RuntimeException(e);
}
}
} else if (logKbart.getLevel().toString().equals("INFO") && logKbart.getMessage().contains("Traitement terminé pour fichier " + logKbart.getPackageName())) {
// Envoi du mail uniquement si le fichier temporaire a été créé
if (Files.exists(of)) {
Thread.sleep(20000); // pour attendre que tous les threads de best-ppn-api aient terminé leurs traitements
Path tempPathTarget = Path.of("tempLog");
if(!Files.exists(tempPathTarget)) {
Files.createDirectory(tempPathTarget);
}
// Copie le fichier existant vers le répertoire temporaire en ajoutant sa date de création
if (of != null && Files.exists(of)) {
Path target = Path.of("tempLog" + File.separator + logKbart.getPackageName().replace(".tsv", ".bad"));
// Déplacement du fichier
Files.copy(of, target, StandardCopyOption.REPLACE_EXISTING);
log.info("Fichier de log transféré dans le dossier temporaire.");
}
emailService.sendMailWithAttachment(logKbart.getPackageName(), of);
}
}
Expand Down

0 comments on commit 4895aae

Please sign in to comment.