Skip to content

Commit

Permalink
Don't overwrite Configured projection in scio-smb (#5083)
Browse files Browse the repository at this point in the history
* Don't overwrite Configured projection in scio-smb

* test serialization
Loading branch information
clairemcginty authored Nov 20, 2023
1 parent 92b963a commit 5cd7392
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,10 @@ public void prepareRead(ReadableByteChannel channel) throws IOException {
final Schema schema = schemaSupplier.get();
final Configuration configuration = conf.get();
AvroReadSupport.setAvroReadSchema(configuration, schema);
AvroReadSupport.setRequestedProjection(configuration, schema);

if (configuration.get(AvroReadSupport.AVRO_REQUESTED_PROJECTION) == null) {
AvroReadSupport.setRequestedProjection(configuration, schema);
}

ParquetReader.Builder<ValueT> builder =
AvroParquetReader.<ValueT>builder(new ParquetInputFile(channel)).withConf(configuration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ public void testLogicalTypes() throws Exception {
}

@Test
public void testProjection() throws Exception {
public void testGenericProjection() throws Exception {
final ResourceId file =
fromFolder(output)
.resolve("file.parquet", ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
Expand All @@ -185,6 +185,59 @@ public void testProjection() throws Exception {
Assert.assertEquals(expected, actual);
}

@Test
public void testSpecificProjection() throws Exception {
  // Projection schema that keeps only the "name" field of AvroGeneratedUser.
  final Schema projection =
      Schema.createRecord(
          "AvroGeneratedUserProjection",
          "",
          "org.apache.beam.sdk.extensions.smb",
          false,
          Lists.newArrayList(
              new Schema.Field("name", Schema.create(Schema.Type.STRING), "", "")));

  // Pre-set the requested projection on the Configuration; the file operations
  // are expected to honor it rather than overwrite it with the full schema.
  final Configuration readConf = new Configuration();
  AvroReadSupport.setRequestedProjection(readConf, projection);

  final ParquetAvroFileOperations<AvroGeneratedUser> fileOperations =
      ParquetAvroFileOperations.of(
          AvroGeneratedUser.class, CompressionCodecName.UNCOMPRESSED, readConf);

  final ResourceId file =
      fromFolder(output)
          .resolve("file.parquet", ResolveOptions.StandardResolveOptions.RESOLVE_FILE);

  // Write ten fully-populated user records.
  final FileOperations.Writer<AvroGeneratedUser> writer = fileOperations.createWriter(file);
  for (int i = 0; i < 10; i++) {
    writer.write(
        AvroGeneratedUser.newBuilder()
            .setName(String.format("user%02d", i))
            .setFavoriteColor(String.format("color%02d", i))
            .setFavoriteNumber(i)
            .build());
  }
  writer.close();

  // Read the file back; with the projection applied, only "name" should survive.
  final List<AvroGeneratedUser> actual = new ArrayList<>();
  fileOperations.iterator(file).forEachRemaining(actual::add);

  final List<AvroGeneratedUser> expected = new ArrayList<>();
  for (int i = 0; i < 10; i++) {
    expected.add(
        AvroGeneratedUser.newBuilder()
            .setName(String.format("user%02d", i))
            .setFavoriteColor(null)
            .setFavoriteNumber(null)
            .build());
  }
  Assert.assertEquals(expected, actual);
}

@Test
public void testPredicate() throws Exception {
final ResourceId file =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,18 @@

package org.apache.beam.sdk.extensions.smb;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.AvroGeneratedUser;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.filter2.predicate.FilterApi;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
Expand All @@ -34,18 +39,29 @@ public class ParquetAvroSortedBucketIOTest {

@Test
public void testReadSerializable() {
final Configuration conf = new Configuration();
AvroReadSupport.setRequestedProjection(
conf,
Schema.createRecord(
Lists.newArrayList(
new Schema.Field("name", Schema.create(Schema.Type.STRING), "", ""))));

SerializableUtils.ensureSerializable(
SortedBucketIO.read(String.class)
.of(
ParquetAvroSortedBucketIO.read(new TupleTag<>("input"), AvroGeneratedUser.class)
.from(folder.toString())));
.from(folder.toString())
.withConfiguration(conf)
.withFilterPredicate(FilterApi.lt(FilterApi.intColumn("test"), 5))));

SerializableUtils.ensureSerializable(
SortedBucketIO.read(String.class)
.of(
ParquetAvroSortedBucketIO.read(
new TupleTag<>("input"), AvroGeneratedUser.getClassSchema())
.from(folder.toString())));
.from(folder.toString())
.withConfiguration(conf)
.withFilterPredicate(FilterApi.lt(FilterApi.intColumn("test"), 5))));
}

@Test
Expand Down

0 comments on commit 5cd7392

Please sign in to comment.