Write row group fileOffset in parquet file footer
jinyangli34 authored and raunaqmorarka committed Jan 3, 2025
1 parent 358633f commit 8bf43b5
Showing 5 changed files with 77 additions and 35 deletions.
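This change makes the Trino Parquet writer populate the optional file_offset field on each RowGroup in the Thrift footer, so a reader can find where a row group starts in the file directly from the footer instead of deriving it from the first column chunk's metadata. A new writer test reads the footer back and asserts that file_offset matches BlockMetadata.getStartingPos() for every row group; the Delta Lake Alluxio cache tests update their expected data-file sizes from 227 to 229 bytes, presumably because the serialized footer grows slightly with the extra field. As a rough illustration of what the new footer field enables (not part of this commit; the class name and the direct footer parsing are only assumptions), a standalone reader using the parquet-format Thrift bindings might look like this:

import org.apache.parquet.format.FileMetaData;
import org.apache.parquet.format.RowGroup;
import org.apache.parquet.format.Util;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical standalone helper (not part of this commit): prints each row group's
// file_offset by parsing the Thrift footer of a local Parquet file.
public final class PrintRowGroupOffsets
{
    public static void main(String[] args)
            throws IOException
    {
        try (RandomAccessFile file = new RandomAccessFile(args[0], "r")) {
            // A Parquet file ends with the Thrift footer, a 4-byte little-endian footer length, and the "PAR1" magic
            long fileSize = file.length();
            byte[] trailer = new byte[8];
            file.seek(fileSize - trailer.length);
            file.readFully(trailer);
            int footerLength = ByteBuffer.wrap(trailer, 0, 4).order(ByteOrder.LITTLE_ENDIAN).getInt();

            byte[] footer = new byte[footerLength];
            file.seek(fileSize - trailer.length - footerLength);
            file.readFully(footer);

            FileMetaData metadata = Util.readFileMetaData(new ByteArrayInputStream(footer));
            for (RowGroup rowGroup : metadata.getRow_groups()) {
                // With this change the writer sets file_offset, so a reader can seek straight
                // to the start of the row group without consulting its first column chunk
                System.out.printf("row group at offset %s, %s rows, %s bytes compressed%n",
                        rowGroup.isSetFile_offset() ? rowGroup.getFile_offset() : "unset",
                        rowGroup.getNum_rows(),
                        rowGroup.getTotal_compressed_size());
            }
        }
    }
}

The diff below covers the writer, the metadata class, the new writer test, and the updated Delta Lake cache tests.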
@@ -13,6 +13,7 @@
*/
package io.trino.parquet.metadata;

+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.log.Logger;
@@ -138,6 +139,12 @@ public List<BlockMetadata> getBlocks()
return blocks;
}

+@VisibleForTesting
+public FileMetaData getParquetMetadata()
+{
+    return parquetMetadata;
+}

private static MessageType readParquetSchema(List<SchemaElement> schema)
{
Iterator<SchemaElement> schemaIterator = schema.iterator();
@@ -350,7 +350,7 @@ private void flush()
columnMetaDataBuilder.add(columnMetaData);
currentOffset += columnMetaData.getTotal_compressed_size();
}
-updateRowGroups(columnMetaDataBuilder.build());
+updateRowGroups(columnMetaDataBuilder.build(), outputStream.longSize());

// flush pages
for (BufferData bufferData : bufferDataList) {
@@ -409,12 +409,14 @@ private void writeBloomFilters(List<RowGroup> rowGroups, List<List<Optional<Bloo
}
}

-private void updateRowGroups(List<ColumnMetaData> columnMetaData)
+private void updateRowGroups(List<ColumnMetaData> columnMetaData, long fileOffset)
{
long totalCompressedBytes = columnMetaData.stream().mapToLong(ColumnMetaData::getTotal_compressed_size).sum();
long totalBytes = columnMetaData.stream().mapToLong(ColumnMetaData::getTotal_uncompressed_size).sum();
ImmutableList<org.apache.parquet.format.ColumnChunk> columnChunks = columnMetaData.stream().map(ParquetWriter::toColumnChunk).collect(toImmutableList());
-fileFooter.addRowGroup(new RowGroup(columnChunks, totalBytes, rows).setTotal_compressed_size(totalCompressedBytes));
+fileFooter.addRowGroup(new RowGroup(columnChunks, totalBytes, rows)
+        .setTotal_compressed_size(totalCompressedBytes)
+        .setFile_offset(fileOffset));
}

private static Slice serializeFooter(FileMetaData fileMetaData)
@@ -46,6 +46,7 @@
import org.apache.parquet.format.CompressionCodec;
import org.apache.parquet.format.PageHeader;
import org.apache.parquet.format.PageType;
+import org.apache.parquet.format.RowGroup;
import org.apache.parquet.format.Util;
import org.apache.parquet.schema.PrimitiveType;
import org.assertj.core.data.Percentage;
@@ -379,6 +380,38 @@ public void testDictionaryPageOffset()
}
}

+@Test
+void testRowGroupOffset()
+        throws IOException
+{
+    // Write a file with 100 rows per row-group
+    List<String> columnNames = ImmutableList.of("columnA", "columnB");
+    List<Type> types = ImmutableList.of(INTEGER, BIGINT);
+
+    ParquetDataSource dataSource = new TestingParquetDataSource(
+            writeParquetFile(
+                    ParquetWriterOptions.builder()
+                            .setMaxBlockSize(DataSize.ofBytes(1000))
+                            .build(),
+                    types,
+                    columnNames,
+                    generateInputPages(types, 100, 10)),
+            new ParquetReaderOptions());
+
+    ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty());
+    List<BlockMetadata> blocks = parquetMetadata.getBlocks();
+    assertThat(blocks.size()).isGreaterThan(1);
+
+    List<RowGroup> rowGroups = parquetMetadata.getParquetMetadata().getRow_groups();
+    assertThat(rowGroups.size()).isEqualTo(blocks.size());
+    for (int rowGroupIndex = 0; rowGroupIndex < rowGroups.size(); rowGroupIndex++) {
+        RowGroup rowGroup = rowGroups.get(rowGroupIndex);
+        assertThat(rowGroup.isSetFile_offset()).isTrue();
+        BlockMetadata blockMetadata = blocks.get(rowGroupIndex);
+        assertThat(blockMetadata.getStartingPos()).isEqualTo(rowGroup.getFile_offset());
+    }
+}

@ParameterizedTest
@MethodSource("testWriteBloomFiltersParams")
public void testWriteBloomFilters(Type type, List<?> data)
@@ -95,12 +95,12 @@ public void testCacheFileOperations()
.add(new CacheOperation("Alluxio.writeCache", "00000000000000000002.json", 0, 658))
.add(new CacheOperation("InputFile.length", "00000000000000000003.json"))
.add(new CacheOperation("InputFile.newStream", "_last_checkpoint"))
.add(new CacheOperation("Alluxio.readCached", "key=p1/", 0, 227))
.add(new CacheOperation("Alluxio.readCached", "key=p2/", 0, 227))
.add(new CacheOperation("Input.readFully", "key=p1/", 0, 227))
.add(new CacheOperation("Input.readFully", "key=p2/", 0, 227))
.add(new CacheOperation("Alluxio.writeCache", "key=p1/", 0, 227))
.add(new CacheOperation("Alluxio.writeCache", "key=p2/", 0, 227))
.add(new CacheOperation("Alluxio.readCached", "key=p1/", 0, 229))
.add(new CacheOperation("Alluxio.readCached", "key=p2/", 0, 229))
.add(new CacheOperation("Input.readFully", "key=p1/", 0, 229))
.add(new CacheOperation("Input.readFully", "key=p2/", 0, 229))
.add(new CacheOperation("Alluxio.writeCache", "key=p1/", 0, 229))
.add(new CacheOperation("Alluxio.writeCache", "key=p2/", 0, 229))
.build());
assertFileSystemAccesses(
"SELECT * FROM test_cache_file_operations",
@@ -113,8 +113,8 @@ public void testCacheFileOperations()
.add(new CacheOperation("InputFile.length", "00000000000000000002.json"))
.add(new CacheOperation("InputFile.length", "00000000000000000003.json"))
.add(new CacheOperation("InputFile.newStream", "_last_checkpoint"))
.add(new CacheOperation("Alluxio.readCached", "key=p1/", 0, 227))
.add(new CacheOperation("Alluxio.readCached", "key=p2/", 0, 227))
.add(new CacheOperation("Alluxio.readCached", "key=p1/", 0, 229))
.add(new CacheOperation("Alluxio.readCached", "key=p2/", 0, 229))
.build());
assertUpdate("INSERT INTO test_cache_file_operations VALUES ('p3', '3-xyz')", 1);
assertUpdate("INSERT INTO test_cache_file_operations VALUES ('p4', '4-xyz')", 1);
@@ -139,17 +139,17 @@ public void testCacheFileOperations()
.add(new CacheOperation("InputFile.length", "00000000000000000005.json"))
.add(new CacheOperation("InputFile.length", "00000000000000000006.json"))
.add(new CacheOperation("InputFile.newStream", "_last_checkpoint"))
.add(new CacheOperation("Alluxio.readCached", "key=p1/", 0, 227))
.add(new CacheOperation("Alluxio.readCached", "key=p2/", 0, 227))
.add(new CacheOperation("Alluxio.readCached", "key=p3/", 0, 227))
.add(new CacheOperation("Alluxio.readCached", "key=p4/", 0, 227))
.add(new CacheOperation("Alluxio.readCached", "key=p5/", 0, 227))
.add(new CacheOperation("Input.readFully", "key=p3/", 0, 227))
.add(new CacheOperation("Input.readFully", "key=p4/", 0, 227))
.add(new CacheOperation("Input.readFully", "key=p5/", 0, 227))
.add(new CacheOperation("Alluxio.writeCache", "key=p3/", 0, 227))
.add(new CacheOperation("Alluxio.writeCache", "key=p4/", 0, 227))
.add(new CacheOperation("Alluxio.writeCache", "key=p5/", 0, 227))
.add(new CacheOperation("Alluxio.readCached", "key=p1/", 0, 229))
.add(new CacheOperation("Alluxio.readCached", "key=p2/", 0, 229))
.add(new CacheOperation("Alluxio.readCached", "key=p3/", 0, 229))
.add(new CacheOperation("Alluxio.readCached", "key=p4/", 0, 229))
.add(new CacheOperation("Alluxio.readCached", "key=p5/", 0, 229))
.add(new CacheOperation("Input.readFully", "key=p3/", 0, 229))
.add(new CacheOperation("Input.readFully", "key=p4/", 0, 229))
.add(new CacheOperation("Input.readFully", "key=p5/", 0, 229))
.add(new CacheOperation("Alluxio.writeCache", "key=p3/", 0, 229))
.add(new CacheOperation("Alluxio.writeCache", "key=p4/", 0, 229))
.add(new CacheOperation("Alluxio.writeCache", "key=p5/", 0, 229))
.build());
assertFileSystemAccesses(
"SELECT * FROM test_cache_file_operations",
@@ -168,11 +168,11 @@ public void testCacheFileOperations()
.add(new CacheOperation("InputFile.length", "00000000000000000005.json"))
.add(new CacheOperation("InputFile.length", "00000000000000000006.json"))
.add(new CacheOperation("InputFile.newStream", "_last_checkpoint"))
-.addCopies(new CacheOperation("Alluxio.readCached", "key=p1/", 0, 227), 1)
-.addCopies(new CacheOperation("Alluxio.readCached", "key=p2/", 0, 227), 1)
-.addCopies(new CacheOperation("Alluxio.readCached", "key=p3/", 0, 227), 1)
-.addCopies(new CacheOperation("Alluxio.readCached", "key=p4/", 0, 227), 1)
-.addCopies(new CacheOperation("Alluxio.readCached", "key=p5/", 0, 227), 1)
+.addCopies(new CacheOperation("Alluxio.readCached", "key=p1/", 0, 229), 1)
+.addCopies(new CacheOperation("Alluxio.readCached", "key=p2/", 0, 229), 1)
+.addCopies(new CacheOperation("Alluxio.readCached", "key=p3/", 0, 229), 1)
+.addCopies(new CacheOperation("Alluxio.readCached", "key=p4/", 0, 229), 1)
+.addCopies(new CacheOperation("Alluxio.readCached", "key=p5/", 0, 229), 1)
.build());
}

@@ -79,12 +79,12 @@ public void testTableDataCachedWhileTransactionLogNotCached()
.addCopies(new CacheFileSystemTraceUtils.CacheOperation("Input.readTail", "00000000000000000002.checkpoint.parquet"), 2)
.add(new CacheFileSystemTraceUtils.CacheOperation("InputFile.length", "00000000000000000003.json"))
.add(new CacheFileSystemTraceUtils.CacheOperation("InputFile.newStream", "_last_checkpoint"))
.add(new CacheFileSystemTraceUtils.CacheOperation("Alluxio.readCached", "key=p1/", 0, 227))
.add(new CacheFileSystemTraceUtils.CacheOperation("Alluxio.readCached", "key=p2/", 0, 227))
.add(new CacheFileSystemTraceUtils.CacheOperation("Input.readFully", "key=p1/", 0, 227))
.add(new CacheFileSystemTraceUtils.CacheOperation("Input.readFully", "key=p2/", 0, 227))
.add(new CacheFileSystemTraceUtils.CacheOperation("Alluxio.writeCache", "key=p1/", 0, 227))
.add(new CacheFileSystemTraceUtils.CacheOperation("Alluxio.writeCache", "key=p2/", 0, 227))
.add(new CacheFileSystemTraceUtils.CacheOperation("Alluxio.readCached", "key=p1/", 0, 229))
.add(new CacheFileSystemTraceUtils.CacheOperation("Alluxio.readCached", "key=p2/", 0, 229))
.add(new CacheFileSystemTraceUtils.CacheOperation("Input.readFully", "key=p1/", 0, 229))
.add(new CacheFileSystemTraceUtils.CacheOperation("Input.readFully", "key=p2/", 0, 229))
.add(new CacheFileSystemTraceUtils.CacheOperation("Alluxio.writeCache", "key=p1/", 0, 229))
.add(new CacheFileSystemTraceUtils.CacheOperation("Alluxio.writeCache", "key=p2/", 0, 229))
.build());
assertFileSystemAccesses(
"SELECT * FROM test_transaction_log_not_cached",
@@ -93,8 +93,8 @@ public void testTableDataCachedWhileTransactionLogNotCached()
.addCopies(new CacheFileSystemTraceUtils.CacheOperation("Input.readTail", "00000000000000000002.checkpoint.parquet"), 2)
.add(new CacheFileSystemTraceUtils.CacheOperation("InputFile.length", "00000000000000000003.json"))
.add(new CacheFileSystemTraceUtils.CacheOperation("InputFile.newStream", "_last_checkpoint"))
.add(new CacheFileSystemTraceUtils.CacheOperation("Alluxio.readCached", "key=p1/", 0, 227))
.add(new CacheFileSystemTraceUtils.CacheOperation("Alluxio.readCached", "key=p2/", 0, 227))
.add(new CacheFileSystemTraceUtils.CacheOperation("Alluxio.readCached", "key=p1/", 0, 229))
.add(new CacheFileSystemTraceUtils.CacheOperation("Alluxio.readCached", "key=p2/", 0, 229))
.build());
}
