Skip to content

Commit

Permalink
Merge pull request datakaveri#544 from ankitmashu/fix/async_csv
Browse files Browse the repository at this point in the history
Fix async CSV file export
  • Loading branch information
gopal-mahajan authored Sep 24, 2024
2 parents 8b5cdb2 + 6438650 commit 3b93196
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import iudx.resource.server.database.archives.ResponseBuilder;
import iudx.resource.server.database.async.ProgressListener;
import java.io.File;
import java.util.List;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
Expand Down Expand Up @@ -101,10 +101,18 @@ public Future<JsonObject> asyncScroll(
LOGGER.debug(file.getAbsolutePath());

int totaldocsDownloaded = 0;
instance.write(searchHits);
Set<String> headers = null;

if (format.equalsIgnoreCase("json")) {
instance.write(searchHits);
} else {
headers = instance.writeToCsv(searchHits);
}

int totalIterations = totalHits < 10000 ? 1 : (int) Math.ceil(totalHits / 10000.0);
double iterationCount = 0.0;
double progress;
boolean appendComma = false;
while (searchHits != null && searchHits.size() > 0) {
long downloadedDocs = searchHits.size();
totaldocsDownloaded += downloadedDocs;
Expand All @@ -117,14 +125,20 @@ public Future<JsonObject> asyncScroll(
// external (s3)
double finalProgress = progress * 0.9;
Future.future(handler -> progressListener.updateProgress(finalProgress));
instance.append(searchHits);

if (format.equalsIgnoreCase("json")) {
instance.append(searchHits, appendComma);
} else {
instance.append(searchHits, appendComma, headers);
}

ScrollRequest scrollRequest = nextScrollRequest(scrollId);
CompletableFuture<ScrollResponse<ObjectNode>> future =
asyncClient.scroll(scrollRequest, ObjectNode.class);
ScrollResponse<ObjectNode> scrollResponse = future.get();
scrollId = scrollResponse.scrollId();
searchHits = scrollResponse.hits().hits();
appendComma = true;
}

instance.finish();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,16 @@
import co.elastic.clients.elasticsearch.core.search.Hit;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.List;
import java.util.Set;

/**
 * Formats Elasticsearch scroll-search responses into a downloadable file.
 * Implementations exist for JSON, CSV, and Parquet output.
 */
public interface EsResponseFormatter {
/** Writes the first batch of hits (used by the JSON formatter). */
void write(List<Hit<ObjectNode>> searchHits);

/**
 * Writes the first batch of hits in CSV form and returns the header column
 * set derived from the first hit; non-CSV implementations return null.
 * NOTE(review): callers must only use the result on the CSV code path.
 */
Set<String> writeToCsv(List<Hit<ObjectNode>> searchHits);

/** Flushes and closes the underlying writer. */
void finish();

/* NOTE(review): superseded in this diff by the appendComma overload below. */
void append(List<Hit<ObjectNode>> searchHits);
/**
 * Appends a batch of hits; appendComma indicates earlier records were
 * already written (JSON needs a separating comma between batches).
 */
void append(List<Hit<ObjectNode>> searchHits, boolean appendComma);

/** Appends a batch of hits as CSV rows using the given header columns. */
void append(List<Hit<ObjectNode>> searchHits, boolean appendComma, Set<String> headers);
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@ public EsResponseFormatterToCsv(File file) {
*
* @param searchHits ElasticSearch response searchHits
*/
public void flattenRecord(List<Hit<ObjectNode>> searchHits) {
public void flattenRecord(List<Hit<ObjectNode>> searchHits, Set<String> headers) {
for (Hit hit : searchHits) {
jsonFlatten = new JsonFlatten((JsonNode) hit.source());
map = jsonFlatten.flatten();
Set<String> header = map.keySet();
appendToCsvFile(map, header);
/*Set<String> header = map.keySet();*/
/*appendToCsvFile(map, header);*/
appendToCsvFile(map, headers);
}
}

Expand All @@ -50,14 +51,15 @@ public void flattenRecord(List<Hit<ObjectNode>> searchHits) {
*
* @param searchHits Elastic search scroll response
*/
public void getHeader(List<Hit<ObjectNode>> searchHits) {
public Set<String> getHeader(List<Hit<ObjectNode>> searchHits) {
Hit<ObjectNode> firstHit = searchHits.get(0);
if (jsonFlatten == null) {
jsonFlatten = new JsonFlatten((JsonNode) firstHit.source());
}
map = jsonFlatten.flatten();
Set<String> header = map.keySet();
simpleFileWriter(header);
return header;
}

/**
Expand All @@ -73,9 +75,13 @@ public void appendToCsvFile(Map<String, Object> map, Set<String> header) {
for (String field : header) {
Object cell = map.get(field);
if (cell == null) {
stringBuilder.append("" + ",");
stringBuilder.append("").append(",");
} else {
stringBuilder.append(cell + ",");
String cellValue = cell.toString();
if (cellValue.contains(",") || cellValue.contains("\"")) {
cellValue = "\"" + cellValue.replace("\"", "\"\"") + "\"";
}
stringBuilder.append(cellValue).append(",");
}
}

Expand Down Expand Up @@ -106,8 +112,11 @@ private void simpleFileWriter(Set<String> header) {
}

@Override
public void write(List<Hit<ObjectNode>> searchHits) {
this.getHeader(searchHits);
public void write(List<Hit<ObjectNode>> searchHits) {}

@Override
public Set<String> writeToCsv(List<Hit<ObjectNode>> searchHits) {
return this.getHeader(searchHits);
}

@Override
Expand All @@ -120,7 +129,10 @@ public void finish() {
}

@Override
public void append(List<Hit<ObjectNode>> searchHits) {
this.flattenRecord(searchHits);
public void append(List<Hit<ObjectNode>> searchHits, boolean appendComma) {}

@Override
public void append(List<Hit<ObjectNode>> searchHits, boolean appendComma, Set<String> headers) {
this.flattenRecord(searchHits, headers);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@
import java.io.FileWriter;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class EsResponseFormatterToJson extends AbstractEsSearchResponseFormatter {
private static final Logger LOGGER = LogManager.getLogger(EsResponseFormatterToJson.class);
FileWriter fileWriter;

/**
Expand All @@ -33,6 +37,11 @@ public void write(List<Hit<ObjectNode>> searchHits) {
}
}

// CSV header extraction does not apply to the JSON formatter.
// NOTE(review): returns null rather than an empty set — callers must not
// dereference this result on the JSON path (the caller in asyncScroll only
// reads headers in the CSV branch).
@Override
public Set<String> writeToCsv(List<Hit<ObjectNode>> searchHits) {
return null;
}

@Override
public void finish() {
try {
Expand All @@ -44,12 +53,13 @@ public void finish() {
}

@Override
public void append(List<Hit<ObjectNode>> searchHits) {
public void append(List<Hit<ObjectNode>> searchHits, boolean appendComma) {
try {
boolean appendComma = false;
for (Hit<ObjectNode> sh : searchHits) {
assert sh.source() != null;
if (appendComma) {
fileWriter.write("," + sh.source().toString());
fileWriter.write(",\n");
fileWriter.write(String.valueOf(sh.source()));
} else {
fileWriter.write(sh.source().toString());
}
Expand All @@ -60,4 +70,7 @@ public void append(List<Hit<ObjectNode>> searchHits) {
throw new RuntimeException(e);
}
}

// No-op: the headers-based append overload applies only to the CSV formatter.
@Override
public void append(List<Hit<ObjectNode>> searchHits, boolean appendComma, Set<String> headers) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,19 @@ public void write(List<Hit<ObjectNode>> searchHits) {
// ParquetWriter parquetWriter = this.getParquetWriter(file, messageTypeParquetSchema);
}

// CSV header extraction does not apply to the Parquet formatter.
// NOTE(review): returns null rather than an empty set — callers must not
// dereference this result on the Parquet path.
@Override
public Set<String> writeToCsv(List<Hit<ObjectNode>> searchHits) {
return null;
}

// No-op: the Parquet formatter currently buffers nothing to flush or close.
@Override
public void finish() {}

@Override
public void append(List<Hit<ObjectNode>> searchHits) {}
public void append(List<Hit<ObjectNode>> searchHits, boolean appendComma) {}

// No-op: the headers-based append overload applies only to the CSV formatter.
@Override
public void append(List<Hit<ObjectNode>> searchHits, boolean appendComma, Set<String> headers) {}

/* private SchemaMapping getParquetSchema(Schema arrowSchema) {
SchemaConverter schemaConverter = new SchemaConverter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,12 @@
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.ValueNode;
import com.github.sisyphsu.dateparser.DateParserUtils;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

public class JsonFlatten {

private final Map<String, ValueNode> json = new LinkedHashMap<>();
private final LinkedHashMap<String, Object> jsonObj = new LinkedHashMap<>();
private final JsonNode root;
Expand All @@ -23,6 +19,10 @@ public JsonFlatten(JsonNode node) {
}

public static void flattenJson(JsonNode node, String parent, Map<String, ValueNode> map) {

if (node == null) {
return;
}
if (node instanceof ValueNode) {
map.put(parent, (ValueNode) node);
} else {
Expand All @@ -39,30 +39,25 @@ public static void flattenJson(JsonNode node, String parent, Map<String, ValueNo
flattenJson(field.getValue(), prefix + field.getKey(), map);
}
} else {
throw new RuntimeException("unknown json node");
throw new RuntimeException("Unknown JSON node type: " + node.getNodeType());
}
}
}

public LinkedHashMap<String, Object> flatten() {
flattenJson(root, null, json);
for (String key : json.keySet()) {
if (json.get(key).isInt()) {
jsonObj.put(key, json.get(key).asInt());
}
if (json.get(key).isLong()) {
jsonObj.put(key, json.get(key).asLong());
}
if (json.get(key).isFloat()) {
jsonObj.put(key, json.get(key).asDouble());
}
if (json.get(key).isDouble()) {
jsonObj.put(key, json.get(key).asDouble());
}
if (json.get(key).isBoolean()) {
jsonObj.put(key, json.get(key).asBoolean());
}
if (json.get(key).isTextual()) {
for (Map.Entry<String, ValueNode> entry : json.entrySet()) {
String key = entry.getKey();
ValueNode valueNode = entry.getValue();
if (valueNode.isInt()) {
jsonObj.put(key, valueNode.asInt());
} else if (valueNode.isLong()) {
jsonObj.put(key, valueNode.asLong());
} else if (valueNode.isFloat() || valueNode.isDouble()) {
jsonObj.put(key, valueNode.asDouble());
} else if (valueNode.isBoolean()) {
jsonObj.put(key, valueNode.asBoolean());
} else if (json.get(key).isTextual()) {
jsonObj.put(key, json.get(key).asText());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void setUp(VertxTestContext vertxTestContext)
@DisplayName("Test method : Success")
public void testWriteMethod(VertxTestContext vertxTestContext)
{
responseFormatterToCsv.write(searchHits);
responseFormatterToCsv.writeToCsv(searchHits);
verify(searchHits, times(1)).get(anyInt());
verify(jsonFlatten,times(1)).flatten();
verify(stringObjectMap,times(1)).keySet();
Expand All @@ -76,15 +76,15 @@ public void testWriteMethod(VertxTestContext vertxTestContext)
@Test
@DisplayName("Test method : Failure")
public void testWriteMethodFailure(VertxTestContext vertxTestContext) throws IOException {
responseFormatterToCsv.write(searchHits);
responseFormatterToCsv.writeToCsv(searchHits);
responseFormatterToCsv.fileWriter = mock(fileWriter.getClass());
doThrow(new IOException()).when(responseFormatterToCsv.fileWriter).write(anyString());

verify(searchHits, times(1)).get(anyInt());
verify(jsonFlatten,times(1)).flatten();
verify(stringObjectMap,times(1)).keySet();

assertThrows(RuntimeException.class,()-> responseFormatterToCsv.write(searchHits));
assertThrows(RuntimeException.class,()-> responseFormatterToCsv.writeToCsv(searchHits));
vertxTestContext.completeNow();
}

Expand Down

0 comments on commit 3b93196

Please sign in to comment.