Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[vpj] Improve error handling in VeniceVsonFileIterator and add tests #1406

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package com.linkedin.davinci.stats;

import static com.linkedin.venice.stats.StatsErrorCode.NULL_DIV_STATS;
import static org.mockito.Mockito.*;
import static org.testng.Assert.*;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;

import com.linkedin.venice.tehuti.MockTehutiReporter;
import com.linkedin.venice.utils.Utils;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.linkedin.davinci.store.memory;

import static org.testng.Assert.*;

import java.nio.ByteBuffer;
import org.testng.Assert;
import org.testng.annotations.Test;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.linkedin.davinci.utils;

import static org.testng.Assert.*;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

import java.util.ArrayList;
import java.util.HashMap;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
package com.linkedin.venice.fastclient.transport;

import static org.mockito.Mockito.*;
import static org.testng.Assert.*;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyBoolean;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;

import com.google.common.collect.ImmutableMap;
import com.linkedin.r2.transport.common.Client;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,27 @@ public class VeniceVsonFileIterator implements VeniceRecordIterator {
private final BytesWritable currentKey = new BytesWritable();
private final BytesWritable currentValue = new BytesWritable();

public VeniceVsonFileIterator(FileSystem fs, Path hdfsPath, VeniceVsonRecordReader recordReader) {
if (fs != null && hdfsPath != null) {
try {
fileReader = new SequenceFile.Reader(fs, hdfsPath, new Configuration());
} catch (IOException e) {
LOGGER.info("Path: {} is not a sequence file.", hdfsPath.getName());
}
} else {
throw new VeniceException("Invalid file system or path");
public VeniceVsonFileIterator(FileSystem fileSystem, Path hdfsPath, VeniceVsonRecordReader recordReader) {
if (fileSystem == null) {
LOGGER.error("FileSystem cannot be null for VeniceVsonFileIterator");
throw new VeniceException("FileSystem cannot be null for VeniceVsonFileIterator");
}
if (hdfsPath == null) {
LOGGER.error("Path cannot be null for VeniceVsonFileIterator");
throw new VeniceException("Path cannot be null for VeniceVsonFileIterator");
}
if (recordReader == null) {
LOGGER.error("RecordReader cannot be null for VeniceVsonFileIterator");
throw new VeniceException("RecordReader cannot be null for VeniceVsonFileIterator");
}

try {
this.fileReader = new SequenceFile.Reader(fileSystem, hdfsPath, new Configuration());
} catch (IOException e) {
String errorMessage =
String.format("Failed to open file: %s. Ensure that the file is a valid sequence file.", hdfsPath.getName());
LOGGER.error(errorMessage, e);
throw new VeniceException(errorMessage, e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if this is the right thing to do

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me check how Avro input works. I'm very surprised this didn't fail for the avro inputs

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does fail for avro input (non sequence file)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if this is the right thing to do

Why? If we can't create fileReader what's the point in continuing with the execution? We can add retries (for non-seq file exception) if we want to have additional layer of protection

}

this.recordReader = recordReader;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.linkedin.venice.hadoop.input.kafka;

import static org.testng.Assert.*;
import static org.testng.Assert.assertTrue;

import com.linkedin.venice.hadoop.input.kafka.avro.KafkaInputMapperKey;
import com.linkedin.venice.serializer.FastSerializerDeserializerFactory;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.linkedin.venice.hadoop.input.kafka;

import static org.testng.Assert.*;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobConf;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.linkedin.venice.hadoop.input.kafka;

import static org.testng.Assert.*;
import static org.testng.Assert.assertTrue;

import org.apache.hadoop.io.BytesWritable;
import org.testng.annotations.Test;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package com.linkedin.venice.hadoop.input.recordreader.vson;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.expectThrows;

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.spark.input.hdfs.TestSparkInputFromHdfs;
import com.linkedin.venice.utils.Utils;
import java.io.File;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.mockito.Mockito;
import org.testng.annotations.Test;


public class VeniceVsonFileIteratorTest {
@Test
public void testConstructorInitializationAndExceptions() throws IOException {
// Setup: Temporary directory and files
File tempDir = Utils.getTempDataDirectory("test-dir");
File tempSeqFile = File.createTempFile("test-file", ".seq", tempDir);
Path tempSeqFilePath = new Path(tempSeqFile.getAbsolutePath());
File tempNonSeqFile = File.createTempFile("test-file", ".txt", tempDir);
Path tempNonSeqFilePath = new Path(tempNonSeqFile.getAbsolutePath());

FileSystem fileSystem = FileSystem.getLocal(new Configuration());
VeniceVsonRecordReader mockRecordReader = Mockito.mock(VeniceVsonRecordReader.class);

// Write valid data to the sequence file
TestSparkInputFromHdfs.writeVsonFile(tempSeqFilePath, 1, 2);

// Case 1: Valid FileSystem and Path
VeniceVsonFileIterator iterator = new VeniceVsonFileIterator(fileSystem, tempSeqFilePath, mockRecordReader);
assertNotNull(iterator);

// Case 2: Null FileSystem
Exception exception =
expectThrows(VeniceException.class, () -> new VeniceVsonFileIterator(null, tempSeqFilePath, mockRecordReader));
assertTrue(exception.getMessage().contains("FileSystem cannot be null"));

// Case 3: Null Path
Exception exception2 =
expectThrows(VeniceException.class, () -> new VeniceVsonFileIterator(fileSystem, null, mockRecordReader));
assertTrue(exception2.getMessage().contains("Path cannot be null"));

// Case 4: Null RecordReader
Exception exception3 =
expectThrows(VeniceException.class, () -> new VeniceVsonFileIterator(fileSystem, tempSeqFilePath, null));
assertTrue(exception3.getMessage().contains("RecordReader cannot be null"));

// Case 4: Invalid SequenceFile
expectThrows(
VeniceException.class,
() -> new VeniceVsonFileIterator(fileSystem, tempNonSeqFilePath, mockRecordReader));
}

@Test
public void testNextAndGetCurrentKeyValue() throws IOException {
// Setup: Temporary sequence file
File tempDir = Utils.getTempDataDirectory("test-dir");
File tempSeqFile = File.createTempFile("test-file", ".seq", tempDir);
Path tempSeqFilePath = new Path(tempSeqFile.getAbsolutePath());
FileSystem fileSystem = FileSystem.getLocal(new Configuration());
VeniceVsonRecordReader mockRecordReader = Mockito.mock(VeniceVsonRecordReader.class);

int totalRecords = 5;
// Write valid data to the sequence file
TestSparkInputFromHdfs.writeVsonFile(tempSeqFilePath, 1, totalRecords);

VeniceVsonFileIterator vsonFileIterator = new VeniceVsonFileIterator(fileSystem, tempSeqFilePath, mockRecordReader);

// Mock behavior for record reader
when(mockRecordReader.getKeyBytes(any(), any())).thenReturn("mockKey".getBytes());
when(mockRecordReader.getValueBytes(any(), any())).thenReturn("mockValue".getBytes());

byte[] lastKey = null;
byte[] lastValue = null;

int expectedRecords = totalRecords;
while (vsonFileIterator.next()) {
lastKey = vsonFileIterator.getCurrentKey();
lastValue = vsonFileIterator.getCurrentValue();
assertNotNull(lastKey);
assertNotNull(lastValue);
expectedRecords -= 1;
}
assertEquals(expectedRecords, 0);

// Case 2: No more records (simulate by mocking behavior)
assertFalse(vsonFileIterator.next());
assertNotNull(lastKey);
assertNotNull(lastValue);
assertEquals(lastKey, vsonFileIterator.getCurrentKey());
assertEquals(lastValue, vsonFileIterator.getCurrentValue());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,11 @@ private void writeAvroFile(File inputDir, String fileName, int start, int end) t
}
}

private void writeVsonFile(File parentDir, String fileName, int start, int end) throws IOException {
public static void writeVsonFile(File parentDir, String fileName, int start, int end) throws IOException {
writeVsonFile(new Path(parentDir.getAbsolutePath(), fileName), start, end);
}

public static void writeVsonFile(Path path, int start, int end) throws IOException {
SequenceFile.Metadata metadata = new SequenceFile.Metadata();
metadata.set(new Text("key.schema"), new Text(VSON_STRING_SCHEMA));
metadata.set(new Text("value.schema"), new Text(VSON_STRING_SCHEMA));
Expand All @@ -281,7 +285,7 @@ private void writeVsonFile(File parentDir, String fileName, int start, int end)

try (SequenceFile.Writer writer = SequenceFile.createWriter(
new Configuration(),
SequenceFile.Writer.file(new Path(parentDir.toString(), fileName)),
SequenceFile.Writer.file(path),
SequenceFile.Writer.keyClass(BytesWritable.class),
SequenceFile.Writer.valueClass(BytesWritable.class),
SequenceFile.Writer.metadata(metadata))) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.linkedin.venice.pulsar.sink;

import static org.testng.Assert.*;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;

import java.util.Map;
import org.testng.annotations.Test;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.linkedin.venice.pushmonitor;

import static org.testng.Assert.*;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.venice.client.store.transport.TransportClient;
import com.linkedin.venice.client.store.transport.TransportClientResponse;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package com.linkedin.venice.compute;

import static com.linkedin.venice.utils.TestWriteUtils.*;
import static org.testng.Assert.*;
import static com.linkedin.venice.utils.TestWriteUtils.loadFileAsString;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;

import com.linkedin.avro.api.PrimitiveFloatList;
import com.linkedin.avro.fastserde.primitive.PrimitiveFloatArrayList;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.linkedin.venice.meta;

import static org.testng.Assert.*;
import static org.testng.Assert.assertEquals;

import com.linkedin.venice.ConfigConstants;
import com.linkedin.venice.helix.StoreJSONSerializer;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.linkedin.venice.pubsub.adapter.kafka.admin;

import static org.testng.Assert.*;
import static org.testng.Assert.assertEquals;

import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol;
import com.linkedin.venice.utils.VeniceProperties;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.linkedin.venice.utils;

import static org.testng.Assert.*;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol;
import org.testng.annotations.Test;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package com.linkedin.venice.fastclient.meta;

import static com.linkedin.venice.ConfigKeys.SERVER_HTTP2_INBOUND_ENABLED;
import static org.testng.Assert.*;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;

import com.linkedin.d2.balancer.D2Client;
import com.linkedin.r2.transport.common.Client;
Expand Down
Loading