Skip to content

Commit

Permalink
FIX : envoi des messages à ES
Browse files Browse the repository at this point in the history
Ajout entry point pour créer l'index au démarrage du container s'il n'existe pas
  • Loading branch information
pierre-maraval committed Oct 22, 2024
1 parent 9c13966 commit 9ced317
Show file tree
Hide file tree
Showing 9 changed files with 134 additions and 70 deletions.
4 changes: 3 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,6 @@ WORKDIR /app/
COPY --from=build-image /build/target/*.jar /app/logskbart-api.jar
ENV TZ=Europe/Paris
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
ENTRYPOINT ["java","-jar","/app/logskbart-api.jar"]
COPY ./docker/docker-entrypoint.sh /docker-entrypoint.sh
RUN chmod +x /docker-entrypoint.sh
ENTRYPOINT ["/docker-entrypoint.sh"]
53 changes: 53 additions & 0 deletions docker/docker-entrypoint.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
#!/bin/bash

# Docker entrypoint: ensure the "logkbart" Elasticsearch index exists (with the
# mapping expected by the application) before starting the API.

# Default to a local Elasticsearch when no URI is provided by the environment.
export SPRING_ELASTICSEARCH_URIS=${SPRING_ELASTICSEARCH_URIS:='http://localhost:9200'}

# Wait until Elasticsearch answers: at container start ES is frequently not
# ready yet, and the index-creation PUT below would silently fail.
retries=30
until curl -s -o /dev/null "$SPRING_ELASTICSEARCH_URIS"; do
    retries=$((retries - 1))
    if [ "$retries" -le 0 ]; then
        echo "WARNING: Elasticsearch not reachable at $SPRING_ELASTICSEARCH_URIS, starting anyway" >&2
        break
    fi
    sleep 2
done

# Create the index only if it does not already exist (a PUT on an existing
# index returns a resource_already_exists_exception error).
if ! curl -s -f -o /dev/null "$SPRING_ELASTICSEARCH_URIS/logkbart"; then
    curl -X PUT "$SPRING_ELASTICSEARCH_URIS/logkbart" -H 'Content-Type: application/json' -d'
    {
      "settings": {
        "number_of_shards": 5,
        "number_of_replicas": 0
      },
      "mappings": {
        "properties": {
          "ID": {
            "type": "text"
          },
          "PACKAGE_NAME": {
            "type": "text"
          },
          "TIMESTAMP": {
            "type": "date"
          },
          "THREAD": {
            "type": "text"
          },
          "LEVEL": {
            "type": "text"
          },
          "LOGGER_NAME": {
            "type": "text"
          },
          "MESSAGE": {
            "type": "text"
          },
          "END_OF_BATCH": {
            "type": "boolean"
          },
          "LOGGER_FQCN": {
            "type": "text"
          },
          "THREAD_ID": {
            "type": "integer"
          },
          "THREAD_PRIORITY": {
            "type": "integer"
          },
          "NB_LINE": {
            "type": "integer"
          }
        }
      }
    }'
fi

# exec so the JVM replaces the shell as PID 1 and receives container stop
# signals (SIGTERM) directly, allowing a clean Spring shutdown.
exec java -jar /app/logskbart-api.jar
18 changes: 17 additions & 1 deletion src/main/java/fr/abes/logskbart/configuration/KafkaConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,24 @@
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.sql.Timestamp;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;

@Configuration
@EnableKafka
public class KafkaConfig {
@Value("${abes.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Value("${abes.nbThread}")
private int nbThread;


@Bean
Expand All @@ -34,7 +40,7 @@ public ConsumerFactory<String, String> consumerLogsFactory() {
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30000);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 60000);
return new DefaultKafkaConsumerFactory<>(props);
}

Expand All @@ -51,4 +57,14 @@ public ConsumerFactory<String, String> consumerLogsFactory() {
public Map<String, WorkInProgress> workInProgressMap() {
return new ConcurrentHashMap<>();
}

/**
 * Fixed-size worker pool used to persist log batches to Elasticsearch
 * asynchronously (core size == max size == {@code abes.nbThread}).
 * Up to 500 tasks may queue; beyond that, submissions fail with a
 * RejectedExecutionException (Spring's default abort policy).
 * NOTE(review): confirm that rejecting saves under burst load is acceptable,
 * or consider a CallerRunsPolicy to apply back-pressure instead.
 */
@Bean
public Executor executor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(nbThread);
executor.setMaxPoolSize(nbThread);
executor.setQueueCapacity(500);
executor.initialize();
return executor;
}
}
15 changes: 10 additions & 5 deletions src/main/java/fr/abes/logskbart/entity/LogKbart.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@
import org.springframework.data.elasticsearch.annotations.Field;

import java.io.Serializable;
import java.util.Comparator;
import java.util.Date;
import java.util.Objects;

@Document(indexName = "logkbart")
@Data
@Slf4j
public class LogKbart implements Serializable {
public class LogKbart implements Serializable, Comparable<LogKbart> {
@Id
@Field(name = "ID")
private String id;
Expand Down Expand Up @@ -51,8 +53,6 @@ public class LogKbart implements Serializable {
@Field(name = "NB_LINE")
private Integer nbLine;

@Field(name = "NB_RUN")
private Integer nbRun = 0;

@Override
public String toString() {
Expand All @@ -64,12 +64,17 @@ public String toString() {
", message='" + message + '\'' +
", loggerFqcn='" + loggerFqcn + '\'' +
", nbLine='" + nbLine + '\'' +
", nbRun='" + nbRun + '\'' +

'}';
}

public void log(){
log.debug( this.level +" : " + this);
}

/**
 * Orders log entries by line number first, then by timestamp as a
 * tie-breaker (matches the ordering used when writing the bad file).
 * <p>
 * {@code nbLine} is a boxed {@link Integer} and {@code timestamp} may be
 * unset: nulls sort first instead of throwing a NullPointerException
 * (the previous unboxing {@code Integer.compare(this.nbLine, ...)} would
 * NPE whenever either side was null).
 * NOTE(review): this ordering is not consistent with the Lombok-generated
 * equals(), which compares all fields — acceptable for sorting lists, but
 * do not use LogKbart in a TreeSet/TreeMap.
 */
@Override
public int compareTo(LogKbart logKbart) {
    int byLine = Comparator.<Integer>nullsFirst(Comparator.naturalOrder())
            .compare(this.nbLine, logKbart.getNbLine());
    if (byLine != 0) {
        return byLine;
    }
    return Comparator.<Date>nullsFirst(Comparator.naturalOrder())
            .compare(this.timestamp, logKbart.getTimestamp());
}
}
86 changes: 45 additions & 41 deletions src/main/java/fr/abes/logskbart/kafka/LogsListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@
import fr.abes.logskbart.utils.UtilsMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import java.io.File;
Expand All @@ -21,14 +23,18 @@
import java.nio.file.attribute.BasicFileAttributes;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import java.util.stream.IntStream;


@Slf4j
@Service
public class LogsListener {
@Value("${elasticsearch.max-packet-size}")
private int maxPacketSize;

private final ObjectMapper mapper;

Expand All @@ -40,12 +46,15 @@ public class LogsListener {

private final Map<String, WorkInProgress> workInProgressMap;

public LogsListener(ObjectMapper mapper, UtilsMapper logsMapper, LogsService service, EmailService emailService, Map<String, WorkInProgress> workInProgressMap) {
private final Executor executor;

public LogsListener(ObjectMapper mapper, UtilsMapper logsMapper, LogsService service, EmailService emailService, Map<String, WorkInProgress> workInProgressMap, Executor executor) {
this.mapper = mapper;
this.logsMapper = logsMapper;
this.service = service;
this.emailService = emailService;
this.workInProgressMap = workInProgressMap;
this.executor = executor;
}


Expand All @@ -60,46 +69,38 @@ public void listenInfoKbart2KafkaAndErrorKbart2Kafka(ConsumerRecord<String, Stri
LogKbartDto dto = mapper.readValue(message.value(), LogKbartDto.class);
// recuperation de l'heure a laquelle le message a ete envoye
String[] key = message.key().split(";");
dto.setNbLine(Integer.parseInt(((key.length > 1) ? key[1] : "-1")));
String packageName = key[0];
if (!this.workInProgressMap.containsKey(packageName)) {
//nouveau fichier trouvé dans le topic, on initialise les variables partagées
log.debug("Nouveau package identifié : " + packageName);
workInProgressMap.put(packageName, new WorkInProgress());
}
workInProgressMap.get(packageName).addMessage(dto);
if (!packageName.contains("ctx:package") && !packageName.contains("_FORCE")) {
if (!packageName.equals("${ctx:package}")) {
if (!this.workInProgressMap.containsKey(packageName)) {
//nouveau fichier trouvé dans le topic, on initialise les variables partagées
log.debug("Nouveau package identifié : " + packageName);
workInProgressMap.put(packageName, new WorkInProgress());
}
LogKbart logKbart = logsMapper.map(dto, LogKbart.class);
logKbart.setPackageName(packageName);
logKbart.setTimestamp(new Date(message.timestamp()));
workInProgressMap.get(packageName).addMessage(logKbart);

if ((dto.getMessage().contains("Traitement terminé pour fichier " + packageName)) || (dto.getMessage().contains("Traitement refusé du fichier " + packageName))) {
log.debug("Commit les datas pour fichier " + packageName);
Integer nbLine = Integer.parseInt(((key.length > 1) ? key[1] : "-1"));
Integer nbRun = commitDatas(message.timestamp(), packageName, nbLine);
createFileBad(packageName, nbRun);
saveDatas(workInProgressMap.get(packageName).getMessages());
if (!packageName.contains("_FORCE")) {
createFileBad(packageName);
}
workInProgressMap.remove(packageName);
}
}
}

private Integer commitDatas(long timeStamp, String packageName, Integer nbLine) {
long startTime = System.currentTimeMillis();
log.debug("Debut Commit datas pour fichier " + packageName);
List<LogKbart> logskbart = logsMapper.mapList(workInProgressMap.get(packageName).getMessages(), LogKbart.class);
int nbRun = service.getLastNbRun(packageName) + 1;
log.debug("NbRun: " + nbRun);
saveDatas(timeStamp, packageName, nbLine, logskbart, nbRun);
log.debug("datas saved pour fichier " + packageName);
long endTime = System.currentTimeMillis();
double executionTime = (double) (endTime - startTime) / 1000;
log.debug("Execution time: " + executionTime);
return nbRun;
}

private void saveDatas(long timeStamp, String packageName, Integer nbLine, List<LogKbart> logskbart, int nbRun) {
logskbart.forEach(logKbart -> {
logKbart.setNbRun(nbRun);
logKbart.setTimestamp(new Date(timeStamp));
logKbart.setPackageName(packageName);
logKbart.setNbLine(nbLine);
});
service.saveAll(logskbart);
/**
 * Persists the given log entries to Elasticsearch in chunks of at most
 * {@code maxPacketSize} elements, each chunk saved asynchronously on the
 * shared executor, to avoid ES timeouts / bulk-size errors on large lists.
 * <p>
 * Each chunk is snapshotted with {@link List#copyOf(java.util.Collection)}
 * before being handed to the executor: {@code subList()} is only a view of
 * the source list, which belongs to a WorkInProgress that the Kafka listener
 * thread may still touch (and removes from the map) while the async save runs.
 *
 * @param logskbart entries to save; an empty list produces no tasks
 */
private void saveDatas(List<LogKbart> logskbart) {
    int chunkCount = (logskbart.size() + maxPacketSize - 1) / maxPacketSize;
    IntStream.range(0, chunkCount)
            .mapToObj(i -> List.copyOf(
                    logskbart.subList(i * maxPacketSize, Math.min((i + 1) * maxPacketSize, logskbart.size()))))
            .toList()
            .forEach(chunk -> executor.execute(() -> {
                log.debug("Saving logskbart : {}", chunk.size());
                service.saveAll(chunk);
            }));
    log.debug("Sortie de la sauvegarde");
}

public void deleteOldLocalTempLog() throws IOException {
Expand All @@ -121,8 +122,11 @@ public void deleteOldLocalTempLog() throws IOException {
}
}

private void createFileBad(String filename, Integer nbRun) throws IOException {
List<LogKbart> logKbartList = service.getErrorLogKbartByPackageAndNbRun(filename, nbRun);
private void createFileBad(String filename) throws IOException {
log.debug("Entrée dans createFileBad : {}", filename);
List<LogKbart> logskbartList = workInProgressMap.get(filename).getMessages().stream().filter(message -> message.getLevel().equals("ERROR")).sorted().toList();
log.debug("Taille liste : " + logskbartList.size());
//List<LogKbart> logKbartList = service.getErrorLogKbartByPackageAndNbRun(filename, nbRun);
Path tempPath = Path.of("tempLogLocal");
if (!Files.exists(tempPath)) {
Files.createDirectory(tempPath);
Expand All @@ -131,7 +135,7 @@ private void createFileBad(String filename, Integer nbRun) throws IOException {
// vérifie la présence de fichiers obsolètes dans le répertoire tempLogLocal et les supprime le cas échéant
deleteOldLocalTempLog();

logKbartList.forEach(logKbart -> {
logskbartList.forEach(logKbart -> {
try {
if (Files.exists(pathOfBadLocal)) {
// Inscrit la ligne dedans
Expand All @@ -140,7 +144,7 @@ private void createFileBad(String filename, Integer nbRun) throws IOException {
// Créer le fichier et inscrit la ligne dedans
Files.createFile(pathOfBadLocal);
// Créer la ligne d'en-tête
Files.write(pathOfBadLocal, ("LINE\tMESSAGE\t(" + nbRun + ")" + System.lineSeparator()).getBytes(), StandardOpenOption.APPEND);
Files.write(pathOfBadLocal, ("LINE\tMESSAGE\t" + System.lineSeparator()).getBytes(), StandardOpenOption.APPEND);
// Inscrit les informations sur la ligne
Files.write(pathOfBadLocal, (logKbart.getNbLine() + "\t" + logKbart.getMessage() + System.lineSeparator()).getBytes(), StandardOpenOption.APPEND);
log.info("Fichier temporaire créé.");
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/fr/abes/logskbart/kafka/WorkInProgress.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@
@Setter
@Slf4j
public class WorkInProgress {
private List<LogKbartDto> messages;
private List<LogKbart> messages;

private Timestamp timestamp;

public WorkInProgress() {
this.messages = Collections.synchronizedList(new ArrayList<>());
}

public void addMessage(LogKbartDto message) {
/**
 * Appends a log entry to this package's in-progress message list.
 * The backing list is a Collections.synchronizedList, so concurrent
 * listener threads may call this safely.
 *
 * @param message entry to record for later bulk save / bad-file generation
 */
public void addMessage(LogKbart message) {
this.messages.add(message);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,5 @@
public interface LogKbartRepository extends ElasticsearchRepository<LogKbart, Long> {
List<LogKbart> findAllByPackageNameAndTimestampBetweenOrderByNbLineAscTimestampAsc(String filename, Date debut, Date fin);

List<LogKbart> findAllByPackageNameAndNbRunAndLevelOrderByNbLineAscTimestampAsc(String filename, Integer nbRun, String level);

List<LogKbart> findByPackageNameOrderByNbRunDesc(String filename);
}
18 changes: 3 additions & 15 deletions src/main/java/fr/abes/logskbart/service/LogsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,18 @@

import fr.abes.logskbart.entity.LogKbart;
import fr.abes.logskbart.repository.LogKbartRepository;
import fr.abes.logskbart.utils.Level;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.Optional;

@Service
@Slf4j
public class LogsService {
private final LogKbartRepository repository;


public LogsService(LogKbartRepository repository) {
this.repository = repository;
}
Expand All @@ -32,19 +29,10 @@ public List<LogKbart> getLogKbartForPackage(String packageName, Date date) {

/**
 * Bulk-saves the given log entries into the Elasticsearch index.
 *
 * @param logKbarts entries to persist; passed through to the repository as-is
 */
public void saveAll(List<LogKbart> logKbarts) {
repository.saveAll(logKbarts);
log.debug("Save done !");
}

public Integer getLastNbRun(String packageName) {
List<LogKbart> logskbart = repository.findByPackageNameOrderByNbRunDesc(packageName);
Optional<LogKbart> logKbart = logskbart.stream().findFirst();
if (logKbart.isPresent()) {
Integer nbRun = logKbart.get().getNbRun();
return (nbRun != null) ? nbRun : 0;
}
return 0;
}

public List<LogKbart> getErrorLogKbartByPackageAndNbRun(String packageName, Integer nbRun) {
return repository.findAllByPackageNameAndNbRunAndLevelOrderByNbLineAscTimestampAsc(packageName,nbRun, String.valueOf(Level.ERROR));
/**
 * Saves a single log entry into the Elasticsearch index.
 *
 * @param logKbart entry to persist
 */
public void save(LogKbart logKbart) {
repository.save(logKbart);
}
}
4 changes: 1 addition & 3 deletions src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,12 @@

# Configuration du projet (depuis pom.xml)
spring.profiles.active=@spring.profiles.active@
application.name=@project.artifactId@
application.version=@project.version@
application.basedir=@webBaseDir@

# Configuration du serveur Http
server.port=8082

logging.config=classpath:log4j2.xml
elasticsearch.max-packet-size=7500

# Topic Kafka
topic.name.source.error=bacon.logs.toload
Expand Down

0 comments on commit 9ced317

Please sign in to comment.