Skip to content

Commit

Permalink
Merge pull request #46 from abes-esr/CDE-458-voir-pour-améliorer-la-v…
Browse files Browse the repository at this point in the history
…itesse-de-création-du-.bad

Cde 458 voir pour améliorer la vitesse de création du .bad
  • Loading branch information
pierre-maraval authored Oct 8, 2024
2 parents 4321bfa + e4dff08 commit a29c3b2
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 78 deletions.
6 changes: 5 additions & 1 deletion src/main/java/fr/abes/logskbart/entity/LogKbart.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,12 @@ public class LogKbart implements Serializable {
@Column(name = "THREAD_PRIORITY")
private Integer threadPriority;

@Column(name = "NB_LINE")
@Column(name = "NB_LINE", nullable = false)
private Integer nbLine;

@Column(name = "NB_RUN", nullable = false)
private Integer nbRun = 0;

@Override
public String toString() {
return "LogKbart{" +
Expand All @@ -66,6 +69,7 @@ public String toString() {
", message='" + message + '\'' +
", loggerFqcn='" + loggerFqcn + '\'' +
", nbLine='" + nbLine + '\'' +
", nbRun='" + nbRun + '\'' +

'}';
}
Expand Down
151 changes: 77 additions & 74 deletions src/main/java/fr/abes/logskbart/kafka/LogsListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import fr.abes.logskbart.dto.LogKbartDto;
import fr.abes.logskbart.entity.LogKbart;
import fr.abes.logskbart.repository.LogKbartRepository;
import fr.abes.logskbart.service.EmailService;
import fr.abes.logskbart.service.LogsService;
import fr.abes.logskbart.utils.UtilsMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand All @@ -21,8 +21,9 @@
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.Date;
import java.util.List;


@Slf4j
@Service
Expand All @@ -32,14 +33,14 @@ public class LogsListener {

private final UtilsMapper logsMapper;

private final LogKbartRepository repository;
private final LogsService service;

private final EmailService emailService;

public LogsListener(ObjectMapper mapper, UtilsMapper logsMapper, LogKbartRepository repository, EmailService emailService) {
public LogsListener(ObjectMapper mapper, UtilsMapper logsMapper, LogsService service, EmailService emailService) {
this.mapper = mapper;
this.logsMapper = logsMapper;
this.repository = repository;
this.service = service;
this.emailService = emailService;
}

Expand All @@ -50,85 +51,34 @@ public LogsListener(ObjectMapper mapper, UtilsMapper logsMapper, LogKbartReposit
* @param message le message kafka
* @throws IOException exception levée
*/
@KafkaListener(topics = {"${topic.name.source.error}"}, groupId = "${topic.groupid.source}", containerFactory = "kafkaLogsListenerContainerFactory")
@KafkaListener(topics = {"${topic.name.source.error}"}, groupId = "${topic.groupid.source}", containerFactory = "kafkaLogsListenerContainerFactory", concurrency = "${abes.kafka.concurrency.nbThread}")
public void listenInfoKbart2KafkaAndErrorKbart2Kafka(ConsumerRecord<String, String> message) throws IOException {
LogKbartDto dto = mapper.readValue(message.value(), LogKbartDto.class);
LogKbart logKbart = logsMapper.map(dto, LogKbart.class);

String[] listMessage = message.key().split(";");
log.debug(Arrays.toString(listMessage));
// recuperation de l'heure a laquelle le message a ete envoye
String[] key = message.key().split(";");

Timestamp currentTimestamp = new Timestamp(message.timestamp());
logKbart.setTimestamp(new Date(currentTimestamp.getTime()));
logKbart.setPackageName(listMessage[0]);
String nbLineOrigine = (listMessage.length > 1) ? listMessage[1] : "";
logKbart.setNbLine(Integer.parseInt((nbLineOrigine.isEmpty() ? "-1" : nbLineOrigine) ));
logKbart.setPackageName(key[0]);
logKbart.setNbLine(Integer.parseInt(((key.length > 1) ? key[1] : "-1") ));

Integer nbRun = service.getLastNbRun(logKbart.getPackageName());
if(logKbart.getMessage().contains("Debut envois kafka de :")){
nbRun++;
}
logKbart.setNbRun(nbRun);

logKbart.log();
// Inscrit l'entity en BDD
service.save(logKbart);

// Vérifie qu'un fichier portant le nom du kbart en cours existe
if (!logKbart.getPackageName().contains("ctx:package") && !logKbart.getPackageName().contains("_FORCE")) {

Path tempPath = Path.of("tempLogLocal");
if(!Files.exists(tempPath)) {
Files.createDirectory(tempPath);
}
Path pathOfBadLocal = Path.of("tempLogLocal" + File.separator + logKbart.getPackageName().replace(".tsv", ".bad"));

// Si la ligne de log sur le topic est de type ERROR
if (logKbart.getLevel().toString().equals("ERROR")) {

// vérifie la présence de fichiers obsolètes dans le répertoire tempLogLocal et les supprime le cas échéant
deleteOldLocalTempLog();

String line = nbLineOrigine + "\t" + logKbart.getMessage();

if (Files.exists(pathOfBadLocal)) {
// Inscrit la ligne dedans
Files.write(pathOfBadLocal, (line + System.lineSeparator()).getBytes(), StandardOpenOption.APPEND);
} else if (!Files.exists(pathOfBadLocal)) {
try {
// Créer le fichier et inscrit la ligne dedans
Files.createFile(pathOfBadLocal);
// Créer la ligne d'en-tête
Files.write(pathOfBadLocal, ("LINE\tMESSAGE" + System.lineSeparator()).getBytes(), StandardOpenOption.APPEND);
// Inscrit les informations sur la ligne
Files.write(pathOfBadLocal, (line + System.lineSeparator()).getBytes(), StandardOpenOption.APPEND);
log.info("Fichier temporaire créé.");
} catch (SecurityException | IOException e) {
log.error("Erreur lors de la création du fichier temporaire. " + e);
throw new RuntimeException(e);
}
}
} else if (logKbart.getLevel().toString().equals("INFO")) {
// On verifie que le traitement commence pour supp les anciens logs du .bad (ps message venant de kbart2kafka)
if (logKbart.getMessage().contains("Debut envois kafka de : " + logKbart.getPackageName())){
Files.deleteIfExists(pathOfBadLocal);
// On verifie que le traitement est terminé (ps message venant de best-ppn-api ou kbart2kafka)
}else if( (logKbart.getMessage().contains("Traitement terminé pour fichier " + logKbart.getPackageName())) || (logKbart.getMessage().contains("Traitement refusé du fichier " + logKbart.getPackageName())) ) {
// Envoi du mail uniquement si le fichier temporaire a été créé
if (Files.exists(pathOfBadLocal)) {
Path tempPathTarget = Path.of("tempLog");
if (!Files.exists(tempPathTarget)) {
Files.createDirectory(tempPathTarget);
}
// Copie le fichier existant vers le répertoire temporaire
Path pathOfBadFinal = Path.of("tempLog" + File.separator + logKbart.getPackageName().replace(".tsv", ".bad"));
// Déplacement du fichier
Files.copy(pathOfBadLocal, pathOfBadFinal, StandardCopyOption.REPLACE_EXISTING);
log.info("Fichier de log transféré dans le dossier temporaire.");

// Suppression du .log car Useless si cas là
Path pathOfLog = Path.of("tempLog" + File.separator + logKbart.getPackageName().replace(".tsv", ".log"));
Files.deleteIfExists(pathOfLog);

emailService.sendMailWithAttachment(logKbart.getPackageName(), pathOfBadLocal);
}
}
if( (logKbart.getMessage().contains("Traitement terminé pour fichier " + logKbart.getPackageName())) || (logKbart.getMessage().contains("Traitement refusé du fichier " + logKbart.getPackageName())) ) {
createFileBad(logKbart.getPackageName(),nbRun);
}
}
// Inscrit l'entity en BDD
repository.save(logKbart);
}

public void deleteOldLocalTempLog() throws IOException {
Expand All @@ -142,12 +92,65 @@ public void deleteOldLocalTempLog() throws IOException {
Date dateOfLastModification = new Date(basicFileAttributes.lastModifiedTime().toMillis());
Date dateNow = Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant());
long interval = dateNow.getTime() - dateOfLastModification.getTime();
if (interval > 600000) {
Files.deleteIfExists(fileToCheck.toPath());
log.debug("Fichier obsolète supprimé : " + nameFile);
if (interval > 600000 && Files.deleteIfExists(fileToCheck.toPath())) {
log.debug("Fichier obsolète supprimé : {}", nameFile);
}
}
}
}
}

private void createFileBad(String filename, Integer nbRun) throws IOException {
List<LogKbart> logKbartList = service.getErrorLogKbartByPackageAndNbRun(filename,nbRun);
Path tempPath = Path.of("tempLogLocal");
if(!Files.exists(tempPath)) {
Files.createDirectory(tempPath);
}
Path pathOfBadLocal = Path.of("tempLogLocal" + File.separator + filename.replace(".tsv", ".bad"));
// vérifie la présence de fichiers obsolètes dans le répertoire tempLogLocal et les supprime le cas échéant
deleteOldLocalTempLog();

logKbartList.forEach(logKbart -> {
try {
if (Files.exists(pathOfBadLocal)) {
// Inscrit la ligne dedans
Files.write(pathOfBadLocal, (logKbart.getNbLine() + "\t" + logKbart.getMessage() + System.lineSeparator()).getBytes(), StandardOpenOption.APPEND);
} else {
// Créer le fichier et inscrit la ligne dedans
Files.createFile(pathOfBadLocal);
// Créer la ligne d'en-tête
Files.write(pathOfBadLocal, ("LINE\tMESSAGE\t(" + nbRun + ")" + System.lineSeparator()).getBytes(), StandardOpenOption.APPEND);
// Inscrit les informations sur la ligne
Files.write(pathOfBadLocal, (logKbart.getNbLine() + "\t" + logKbart.getMessage() + System.lineSeparator()).getBytes(), StandardOpenOption.APPEND);
log.info("Fichier temporaire créé.");
}
} catch (IOException e) {
log.error("Erreur lors de la création du fichier temporaire. " + e);
throw new RuntimeException(e);
}
});

if (Files.exists(pathOfBadLocal)) {
Path tempPathTarget = Path.of("tempLog");
if (!Files.exists(tempPathTarget)) {
Files.createDirectory(tempPathTarget);
}
// Copie le fichier existant vers le répertoire temporaire
Path pathOfBadFinal = Path.of("tempLog" + File.separator + filename.replace(".tsv", ".bad"));
// Déplacement du fichier
Files.copy(pathOfBadLocal, pathOfBadFinal, StandardCopyOption.REPLACE_EXISTING);
log.info("Fichier de log transféré dans le dossier temporaire.");

// Suppression du .log car Useless si cas là
Path pathOfLog = Path.of("tempLog" + File.separator + filename.replace(".tsv", ".log"));
log.info("Suppression de " + pathOfLog);
Files.deleteIfExists(pathOfLog);

emailService.sendMailWithAttachment(filename, pathOfBadLocal);

log.info("Suppression de " + pathOfBadLocal + " en local");
Files.deleteIfExists(pathOfBadLocal);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,20 @@

import fr.abes.logskbart.configuration.LogsBdConfiguration;
import fr.abes.logskbart.entity.LogKbart;
import fr.abes.logskbart.utils.Level;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

import java.util.Date;
import java.util.List;
import java.util.Optional;

@Repository
@LogsBdConfiguration
public interface LogKbartRepository extends JpaRepository<LogKbart, Long> {
List<LogKbart> findAllByPackageNameAndTimestampBetweenOrderByNbLineAscTimestampAsc(String filename, Date debut, Date fin);

List<LogKbart> findAllByPackageNameAndNbRunAndLevelOrderByNbLineAscTimestampAsc(String filename, Integer nbRun, Level level);

Optional<LogKbart> getFirstByPackageNameOrderByNbRunDesc(String filename);
}
19 changes: 19 additions & 0 deletions src/main/java/fr/abes/logskbart/service/LogsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@

import fr.abes.logskbart.entity.LogKbart;
import fr.abes.logskbart.repository.LogKbartRepository;
import fr.abes.logskbart.utils.Level;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.Optional;

@Service
@Slf4j
Expand All @@ -26,4 +28,21 @@ public List<LogKbart> getLogKbartForPackage(String packageName, Date date) {
log.debug("packageName {}, Date début {}, Date fin {}", packageName, dateChargement.getTime(), dateFin.getTime());
return repository.findAllByPackageNameAndTimestampBetweenOrderByNbLineAscTimestampAsc(packageName, dateChargement.getTime(), dateFin.getTime());
}

public LogKbart save(LogKbart logKbart) {
return repository.save(logKbart);
}

public Integer getLastNbRun(String packageName) {
Optional<LogKbart> logKbart = repository.getFirstByPackageNameOrderByNbRunDesc(packageName);
if(logKbart.isPresent()) {
return logKbart.get().getNbRun();
} else {
return 0;
}
}

public List<LogKbart> getErrorLogKbartByPackageAndNbRun(String packageName, Integer nbRun) {
return repository.findAllByPackageNameAndNbRunAndLevelOrderByNbLineAscTimestampAsc(packageName,nbRun, Level.ERROR);
}
}
3 changes: 2 additions & 1 deletion src/main/resources/application-dev.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Consumer properties
spring.kafka.consumer.bootstrap-servers=
abes.kafka.concurrency.nbThread=

# Properties defined from .env on server #ignore resolution error
spring.datasource.logsdb.driver-class-name=org.postgresql.Driver
Expand All @@ -22,4 +23,4 @@ topic.groupid.source=logskbartConsumer

# Mailing
mail.ws.url=
mail.ws.recipient=
mail.ws.recipient=
3 changes: 2 additions & 1 deletion src/main/resources/application-prod.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Consumer properties
spring.kafka.consumer.bootstrap-servers=
abes.kafka.concurrency.nbThread=

# Base Postgres
# Properties defined from .env on server #ignore resolution error
Expand All @@ -24,4 +25,4 @@ topic.groupid.source=logskbartConsumer

# Mailing
mail.ws.url=
mail.ws.recipient=
mail.ws.recipient=
3 changes: 2 additions & 1 deletion src/main/resources/application-test.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Consumer properties
spring.kafka.consumer.bootstrap-servers=
abes.kafka.concurrency.nbThread=

# Properties defined from .env on server #ignore resolution error
spring.datasource.logsdb.driver-class-name=org.postgresql.Driver
Expand All @@ -22,4 +23,4 @@ topic.groupid.source=logskbartConsumer

# Mailing
mail.ws.url=
mail.ws.recipient=
mail.ws.recipient=

0 comments on commit a29c3b2

Please sign in to comment.