From 6438650b7cd852294f4f16b157803a0358077dba Mon Sep 17 00:00:00 2001 From: Ankit Singh <101859999+ankitmashu@users.noreply.github.com> Date: Tue, 24 Sep 2024 10:21:27 +0530 Subject: [PATCH] async csv file --- .../database/elastic/ElasticClient.java | 20 ++++++++-- .../database/elastic/EsResponseFormatter.java | 7 +++- .../elastic/EsResponseFormatterToCsv.java | 32 ++++++++++----- .../elastic/EsResponseFormatterToJson.java | 19 +++++++-- .../elastic/EsResponseFormatterToParquet.java | 10 ++++- .../server/database/elastic/JsonFlatten.java | 39 ++++++++----------- .../elastic/TestEsResponseFormatterToCsv.java | 6 +-- 7 files changed, 90 insertions(+), 43 deletions(-) diff --git a/src/main/java/iudx/resource/server/database/elastic/ElasticClient.java b/src/main/java/iudx/resource/server/database/elastic/ElasticClient.java index 727fc511e..18569ff23 100644 --- a/src/main/java/iudx/resource/server/database/elastic/ElasticClient.java +++ b/src/main/java/iudx/resource/server/database/elastic/ElasticClient.java @@ -24,7 +24,7 @@ import iudx.resource.server.database.archives.ResponseBuilder; import iudx.resource.server.database.async.ProgressListener; import java.io.File; -import java.util.List; +import java.util.*; import java.util.concurrent.CompletableFuture; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; @@ -101,10 +101,18 @@ public Future asyncScroll( LOGGER.debug(file.getAbsolutePath()); int totaldocsDownloaded = 0; - instance.write(searchHits); + Set headers = null; + + if (format.equalsIgnoreCase("json")) { + instance.write(searchHits); + } else { + headers = instance.writeToCsv(searchHits); + } + int totalIterations = totalHits < 10000 ? 1 : (int) Math.ceil(totalHits / 10000.0); double iterationCount = 0.0; double progress; + boolean appendComma = false; while (searchHits != null && searchHits.size() > 0) { long downloadedDocs = searchHits.size(); totaldocsDownloaded += downloadedDocs; @@ -117,7 +125,12 @@ public Future asyncScroll( // external (s3) double finalProgress = progress * 0.9; Future.future(handler -> progressListener.updateProgress(finalProgress)); - instance.append(searchHits); + + if (format.equalsIgnoreCase("json")) { + instance.append(searchHits, appendComma); + } else { + instance.append(searchHits, appendComma, headers); + } ScrollRequest scrollRequest = nextScrollRequest(scrollId); CompletableFuture> future = @@ -125,6 +138,7 @@ public Future asyncScroll( ScrollResponse scrollResponse = future.get(); scrollId = scrollResponse.scrollId(); searchHits = scrollResponse.hits().hits(); + appendComma = true; } instance.finish(); diff --git a/src/main/java/iudx/resource/server/database/elastic/EsResponseFormatter.java b/src/main/java/iudx/resource/server/database/elastic/EsResponseFormatter.java index d2c272f5b..5727f21fa 100644 --- a/src/main/java/iudx/resource/server/database/elastic/EsResponseFormatter.java +++ b/src/main/java/iudx/resource/server/database/elastic/EsResponseFormatter.java @@ -3,11 +3,16 @@ import co.elastic.clients.elasticsearch.core.search.Hit; import com.fasterxml.jackson.databind.node.ObjectNode; import java.util.List; +import java.util.Set; public interface EsResponseFormatter { void write(List> searchHits); + Set writeToCsv(List> searchHits); + void finish(); - void append(List> searchHits); + void append(List> searchHits, boolean appendComma); + + void append(List> searchHits, boolean appendComma, Set headers); } diff --git a/src/main/java/iudx/resource/server/database/elastic/EsResponseFormatterToCsv.java b/src/main/java/iudx/resource/server/database/elastic/EsResponseFormatterToCsv.java index 0bf06060f..f77c51fd4 100644 --- a/src/main/java/iudx/resource/server/database/elastic/EsResponseFormatterToCsv.java +++ b/src/main/java/iudx/resource/server/database/elastic/EsResponseFormatterToCsv.java @@ -36,12 +36,13 @@ public EsResponseFormatterToCsv(File file) { * * @param searchHits ElasticSearch response searchHits */ - public void flattenRecord(List> searchHits) { + public void flattenRecord(List> searchHits, Set headers) { for (Hit hit : searchHits) { jsonFlatten = new JsonFlatten((JsonNode) hit.source()); map = jsonFlatten.flatten(); - Set header = map.keySet(); - appendToCsvFile(map, header); + /*Set header = map.keySet();*/ + /*appendToCsvFile(map, header);*/ + appendToCsvFile(map, headers); } } @@ -50,7 +51,7 @@ public void flattenRecord(List> searchHits) { * * @param searchHits Elastic search scroll response */ - public void getHeader(List> searchHits) { + public Set getHeader(List> searchHits) { Hit firstHit = searchHits.get(0); if (jsonFlatten == null) { jsonFlatten = new JsonFlatten((JsonNode) firstHit.source()); @@ -58,6 +59,7 @@ public void getHeader(List> searchHits) { map = jsonFlatten.flatten(); Set header = map.keySet(); simpleFileWriter(header); + return header; } /** @@ -73,9 +75,13 @@ public void appendToCsvFile(Map map, Set header) { for (String field : header) { Object cell = map.get(field); if (cell == null) { - stringBuilder.append("" + ","); + stringBuilder.append("").append(","); } else { - stringBuilder.append(cell + ","); + String cellValue = cell.toString(); + if (cellValue.contains(",") || cellValue.contains("\"")) { + cellValue = "\"" + cellValue.replace("\"", "\"\"") + "\""; + } + stringBuilder.append(cellValue).append(","); } } @@ -106,8 +112,11 @@ private void simpleFileWriter(Set header) { } @Override - public void write(List> searchHits) { - this.getHeader(searchHits); + public void write(List> searchHits) {} + + @Override + public Set writeToCsv(List> searchHits) { + return this.getHeader(searchHits); } @Override @@ -120,7 +129,10 @@ public void finish() { } @Override - public void append(List> searchHits) { - this.flattenRecord(searchHits); + public void append(List> searchHits, boolean appendComma) {} + + @Override + public void append(List> searchHits, boolean appendComma, Set headers) { + this.flattenRecord(searchHits, headers); } } diff --git a/src/main/java/iudx/resource/server/database/elastic/EsResponseFormatterToJson.java b/src/main/java/iudx/resource/server/database/elastic/EsResponseFormatterToJson.java index afd9f7e63..670e4ae97 100644 --- a/src/main/java/iudx/resource/server/database/elastic/EsResponseFormatterToJson.java +++ b/src/main/java/iudx/resource/server/database/elastic/EsResponseFormatterToJson.java @@ -6,8 +6,12 @@ import java.io.FileWriter; import java.io.IOException; import java.util.List; +import java.util.Set; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; public class EsResponseFormatterToJson extends AbstractEsSearchResponseFormatter { + private static final Logger LOGGER = LogManager.getLogger(EsResponseFormatterToJson.class); FileWriter fileWriter; /** @@ -33,6 +37,11 @@ public void write(List> searchHits) { } } + @Override + public Set writeToCsv(List> searchHits) { + return null; + } + @Override public void finish() { try { @@ -44,12 +53,13 @@ public void finish() { } @Override - public void append(List> searchHits) { + public void append(List> searchHits, boolean appendComma) { try { - boolean appendComma = false; for (Hit sh : searchHits) { + assert sh.source() != null; if (appendComma) { - fileWriter.write("," + sh.source().toString()); + fileWriter.write(",\n"); + fileWriter.write(String.valueOf(sh.source())); } else { fileWriter.write(sh.source().toString()); } @@ -60,4 +70,7 @@ public void append(List> searchHits) { throw new RuntimeException(e); } } + + @Override + public void append(List> searchHits, boolean appendComma, Set headers) {} } diff --git a/src/main/java/iudx/resource/server/database/elastic/EsResponseFormatterToParquet.java b/src/main/java/iudx/resource/server/database/elastic/EsResponseFormatterToParquet.java index d9285acaf..b757ced17 100644 --- a/src/main/java/iudx/resource/server/database/elastic/EsResponseFormatterToParquet.java +++ b/src/main/java/iudx/resource/server/database/elastic/EsResponseFormatterToParquet.java @@ -30,11 +30,19 @@ public void write(List> searchHits) { // ParquetWriter parquetWriter = this.getParquetWriter(file, messageTypeParquetSchema); } + @Override + public Set writeToCsv(List> searchHits) { + return null; + } + @Override public void finish() {} @Override - public void append(List> searchHits) {} + public void append(List> searchHits, boolean appendComma) {} + + @Override + public void append(List> searchHits, boolean appendComma, Set headers) {} /* private SchemaMapping getParquetSchema(Schema arrowSchema) { SchemaConverter schemaConverter = new SchemaConverter(); diff --git a/src/main/java/iudx/resource/server/database/elastic/JsonFlatten.java b/src/main/java/iudx/resource/server/database/elastic/JsonFlatten.java index 0112b3912..50f78ed51 100644 --- a/src/main/java/iudx/resource/server/database/elastic/JsonFlatten.java +++ b/src/main/java/iudx/resource/server/database/elastic/JsonFlatten.java @@ -4,16 +4,12 @@ import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.ValueNode; -import com.github.sisyphsu.dateparser.DateParserUtils; -import java.sql.Timestamp; -import java.time.LocalDateTime; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; import java.util.Objects; public class JsonFlatten { - private final Map json = new LinkedHashMap<>(); private final LinkedHashMap jsonObj = new LinkedHashMap<>(); private final JsonNode root; @@ -23,6 +19,10 @@ public JsonFlatten(JsonNode node) { } public static void flattenJson(JsonNode node, String parent, Map map) { + + if (node == null) { + return; + } if (node instanceof ValueNode) { map.put(parent, (ValueNode) node); } else { @@ -39,30 +39,25 @@ public static void flattenJson(JsonNode node, String parent, Map flatten() { flattenJson(root, null, json); - for (String key : json.keySet()) { - if (json.get(key).isInt()) { - jsonObj.put(key, json.get(key).asInt()); - } - if (json.get(key).isLong()) { - jsonObj.put(key, json.get(key).asLong()); - } - if (json.get(key).isFloat()) { - jsonObj.put(key, json.get(key).asDouble()); - } - if (json.get(key).isDouble()) { - jsonObj.put(key, json.get(key).asDouble()); - } - if (json.get(key).isBoolean()) { - jsonObj.put(key, json.get(key).asBoolean()); - } - if (json.get(key).isTextual()) { + for (Map.Entry entry : json.entrySet()) { + String key = entry.getKey(); + ValueNode valueNode = entry.getValue(); + if (valueNode.isInt()) { + jsonObj.put(key, valueNode.asInt()); + } else if (valueNode.isLong()) { + jsonObj.put(key, valueNode.asLong()); + } else if (valueNode.isFloat() || valueNode.isDouble()) { + jsonObj.put(key, valueNode.asDouble()); + } else if (valueNode.isBoolean()) { + jsonObj.put(key, valueNode.asBoolean()); + } else if (json.get(key).isTextual()) { jsonObj.put(key, json.get(key).asText()); } } diff --git a/src/test/java/iudx/resource/server/database/elastic/TestEsResponseFormatterToCsv.java b/src/test/java/iudx/resource/server/database/elastic/TestEsResponseFormatterToCsv.java index 90deb904c..51788db77 100644 --- a/src/test/java/iudx/resource/server/database/elastic/TestEsResponseFormatterToCsv.java +++ b/src/test/java/iudx/resource/server/database/elastic/TestEsResponseFormatterToCsv.java @@ -67,7 +67,7 @@ public void setUp(VertxTestContext vertxTestContext) @DisplayName("Test method : Success") public void testWriteMethod(VertxTestContext vertxTestContext) { - responseFormatterToCsv.write(searchHits); + responseFormatterToCsv.writeToCsv(searchHits); verify(searchHits, times(1)).get(anyInt()); verify(jsonFlatten,times(1)).flatten(); verify(stringObjectMap,times(1)).keySet(); @@ -76,7 +76,7 @@ public void testWriteMethod(VertxTestContext vertxTestContext) @Test @DisplayName("Test method : Failure") public void testWriteMethodFailure(VertxTestContext vertxTestContext) throws IOException { - responseFormatterToCsv.write(searchHits); + responseFormatterToCsv.writeToCsv(searchHits); responseFormatterToCsv.fileWriter = mock(fileWriter.getClass()); doThrow(new IOException()).when(responseFormatterToCsv.fileWriter).write(anyString()); @@ -84,7 +84,7 @@ public void testWriteMethodFailure(VertxTestContext vertxTestContext) throws IOE verify(jsonFlatten,times(1)).flatten(); verify(stringObjectMap,times(1)).keySet(); - assertThrows(RuntimeException.class,()-> responseFormatterToCsv.write(searchHits)); + assertThrows(RuntimeException.class,()-> responseFormatterToCsv.writeToCsv(searchHits)); vertxTestContext.completeNow(); }