Skip to content

Commit

Permalink
GEOMESA-3411 FSDS - Fix path cache registration order (#3249)
Browse files Browse the repository at this point in the history
  • Loading branch information
elahrvivaz authored Jan 2, 2025
1 parent d6d45c8 commit 4ffa739
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import com.typesafe.scalalogging.LazyLogging
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
import org.locationtech.geomesa.fs.storage.api.FileSystemContext
import org.locationtech.geomesa.fs.storage.api.FileSystemStorage.FileSystemWriter
import org.locationtech.geomesa.fs.storage.common.jobs.StorageConfiguration
import org.locationtech.geomesa.fs.storage.orc.OrcFileSystemWriter
import org.locationtech.geomesa.fs.storage.parquet.ParquetFileSystemStorage.ParquetFileSystemWriter
import org.locationtech.geomesa.utils.io.PathUtils
Expand Down Expand Up @@ -52,22 +52,23 @@ object FileSystemExporter extends LazyLogging {

/**
 * Exporter that writes simple features to a single Parquet file
 *
 * @param path destination file, resolved through `PathUtils.getUrl`
 */
class ParquetFileSystemExporter(path: String) extends FileSystemExporter {

  /**
   * Creates the Parquet writer for the export file
   *
   * @param sft simple feature type being exported
   * @return writer for the export file
   */
  override protected def createWriter(sft: SimpleFeatureType): FileSystemWriter = {
    // use PathUtils.getUrl to handle local files, otherwise default can be in hdfs
    val file = new Path(PathUtils.getUrl(path).toURI)
    val conf = new Configuration()
    StorageConfiguration.setSft(conf, sft)
    // probe for snappy on the classpath, and configure GZIP compression if it is missing
    try { Class.forName("org.xerial.snappy.Snappy") } catch {
      case _: ClassNotFoundException =>
        logger.warn("SNAPPY compression is not available on the classpath - falling back to GZIP")
        conf.set("parquet.compression", "GZIP")
    }
    // create exactly one writer - the context is built from the export file and the configuration above
    new ParquetFileSystemWriter(sft, FileSystemContext(file, conf), file)
  }
}

/**
 * Exporter that writes simple features to a single ORC file
 *
 * @param path destination file, resolved through `PathUtils.getUrl`
 */
class OrcFileSystemExporter(path: String) extends FileSystemExporter {

  /**
   * Creates the ORC writer for the export file
   *
   * @param sft simple feature type being exported
   * @return writer for the export file
   */
  override protected def createWriter(sft: SimpleFeatureType): FileSystemWriter = {
    // use PathUtils.getUrl to handle local files, otherwise default can be in hdfs
    val file = new Path(PathUtils.getUrl(path).toURI)
    // create exactly one writer, using a context built from the export file and a default configuration
    new OrcFileSystemWriter(sft, FileSystemContext(file, new Configuration()), file)
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,6 @@ abstract class AbstractFileSystemStorage(

def pathAndObserver: WriterConfig = {
val path = StorageUtils.nextFile(context.root, partition, metadata.leafStorage, extension, fileType)
PathCache.register(context.fs, path)
val updateObserver = new UpdateObserver(partition, path, action)
val observer = if (observers.isEmpty) { updateObserver } else {
new CompositeObserver(observers.map(_.apply(path)).+:(updateObserver))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ object PathCache {
)

/**
* Register a path as existing
*
* @param fs file system
* @param path path
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class OrcFileSystemStorage(context: FileSystemContext, metadata: StorageMetadata
extends AbstractFileSystemStorage(context, metadata, OrcFileSystemStorage.FileExtension) {

/**
 * Creates an ORC writer for the given file
 *
 * The whole storage context is handed to the writer, rather than only its configuration
 *
 * @param file file to write
 * @param observer observer passed through to the writer
 * @return writer for the file
 */
override protected def createWriter(file: Path, observer: FileSystemObserver): FileSystemWriter =
  new OrcFileSystemWriter(metadata.sft, context, file, observer)

override protected def createReader(
filter: Option[Filter],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,29 @@

package org.locationtech.geomesa.fs.storage.orc

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.orc.OrcFile
import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
import org.locationtech.geomesa.fs.storage.api.FileSystemContext
import org.locationtech.geomesa.fs.storage.api.FileSystemStorage.FileSystemWriter
import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserver
import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserverFactory.NoOpObserver
import org.locationtech.geomesa.fs.storage.common.utils.PathCache
import org.locationtech.geomesa.fs.storage.orc.utils.OrcAttributeWriter
import org.locationtech.geomesa.utils.io.CloseQuietly

import scala.util.control.NonFatal

class OrcFileSystemWriter(
sft: SimpleFeatureType,
config: Configuration,
context: FileSystemContext,
file: Path,
observer: FileSystemObserver = NoOpObserver
) extends FileSystemWriter {

private val schema = OrcFileSystemStorage.createTypeDescription(sft)

private val options = OrcFile.writerOptions(config).setSchema(schema)
private val options = OrcFile.writerOptions(context.conf).setSchema(schema)
private val writer = OrcFile.createWriter(file, options)
private val batch = schema.createRowBatch()

Expand All @@ -56,6 +57,7 @@ class OrcFileSystemWriter(
case NonFatal(e) => CloseQuietly(Seq(writer, observer)).foreach(e.addSuppressed); throw e
}
CloseQuietly.raise(Seq(writer, observer))
PathCache.register(context.fs, file)
}

private def flushBatch(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.junit.runner.RunWith
import org.locationtech.geomesa.features.ScalaSimpleFeature
import org.locationtech.geomesa.fs.storage.api.FileSystemContext
import org.locationtech.geomesa.utils.collection.SelfClosingIterator
import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes
import org.locationtech.geomesa.utils.io.WithClose
Expand All @@ -32,26 +33,22 @@ class OrcFileSystemWriterTest extends Specification {
ScalaSimpleFeature.create(sft, "1", "name1", "1", "2017-01-01T00:00:01.000Z", "LINESTRING (10 1, 5 1)")
)

val config = new Configuration()

"OrcFileSystemWriter" should {
"write and read simple features" in {

withPath { path =>
withTestFile { file =>
WithClose(new OrcFileSystemWriter(sft, config, file)) { writer => features.foreach(writer.write) }
val reader = new OrcFileSystemReader(sft, config, None, None)
val read = WithClose(reader.read(file)) { i => SelfClosingIterator(i).map(ScalaSimpleFeature.copy).toList }
read mustEqual features
// test out not calling 'hasNext'
var i = 0
WithClose(reader.read(file)) { iter =>
while (i < features.size) {
iter.next() mustEqual features(i)
i += 1
}
iter.next must throwA[NoSuchElementException]
withTestFile { file =>
val fc = FileSystemContext(file.getParent, new Configuration())
WithClose(new OrcFileSystemWriter(sft, fc, file)) { writer => features.foreach(writer.write) }
val reader = new OrcFileSystemReader(sft, fc.conf, None, None)
val read = WithClose(reader.read(file)) { i => SelfClosingIterator(i).map(ScalaSimpleFeature.copy).toList }
read mustEqual features
// test out not calling 'hasNext'
var i = 0
WithClose(reader.read(file)) { iter =>
while (i < features.size) {
iter.next() mustEqual features(i)
i += 1
}
iter.next must throwA[NoSuchElementException]
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import org.apache.parquet.hadoop.ParquetReader
import org.apache.parquet.hadoop.example.GroupReadSupport
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.example.data.Group
import org.apache.parquet.filter2.compat.FilterCompat
import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
import org.geotools.api.filter.Filter
Expand All @@ -24,9 +25,12 @@ import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage.File
import org.locationtech.geomesa.fs.storage.common.jobs.StorageConfiguration
import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserver
import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserverFactory.NoOpObserver
import org.locationtech.geomesa.fs.storage.common.utils.PathCache
import org.locationtech.geomesa.fs.storage.parquet.ParquetFileSystemStorage.ParquetFileSystemWriter
import org.locationtech.geomesa.utils.io.CloseQuietly

import scala.util.control.NonFatal

/**
*
* @param context file system context
Expand All @@ -35,11 +39,8 @@ import org.locationtech.geomesa.utils.io.CloseQuietly
class ParquetFileSystemStorage(context: FileSystemContext, metadata: StorageMetadata)
extends AbstractFileSystemStorage(context, metadata, ParquetFileSystemStorage.FileExtension) {

/**
 * Creates a Parquet writer for the given file
 *
 * The storage context is passed through as-is; no feature-type specific configuration is built here.
 * NOTE(review): this relies on ParquetFileSystemWriter copying `context.conf` and setting the
 * feature type on the copy itself - confirm against the writer's constructor
 *
 * @param file file to write
 * @param observer observer passed through to the writer
 * @return writer for the file
 */
override protected def createWriter(file: Path, observer: FileSystemObserver): FileSystemWriter =
  new ParquetFileSystemWriter(metadata.sft, context, file, observer)

override protected def createReader(
filter: Option[Filter],
Expand Down Expand Up @@ -72,11 +73,17 @@ object ParquetFileSystemStorage extends LazyLogging {

class ParquetFileSystemWriter(
sft: SimpleFeatureType,
context: FileSystemContext,
file: Path,
conf: Configuration,
observer: FileSystemObserver = NoOpObserver
) extends FileSystemWriter {

private val conf = {
val conf = new Configuration(context.conf)
StorageConfiguration.setSft(conf, sft)
conf
}

private val writer = SimpleFeatureParquetWriter.builder(file, conf).build()

override def write(f: SimpleFeature): Unit = {
Expand All @@ -86,27 +93,35 @@ object ParquetFileSystemStorage extends LazyLogging {
override def flush(): Unit = observer.flush()
/**
 * Closes the underlying writer and the observer, then registers the file in the path cache and,
 * if file validation is enabled, validates it by reading it back
 */
override def close(): Unit = {
  // any error returned from closing the writer or the observer is rethrown before going further
  CloseQuietly(Seq(writer, observer)).foreach(e => throw e)
  // register the path only once the writer has been closed
  PathCache.register(context.fs, file)
  if (FileValidationEnabled.toBoolean.get) {
    validateParquetFile(file)
  }
}
}

/**
 * Validate a file by reading it back
 *
 * Every record in the file is read and discarded. Any non-fatal error, including a failure to
 * open the file, is rethrown wrapped in a `RuntimeException`
 *
 * @param file file to validate
 * @throws RuntimeException if the file can not be opened or fully read
 */
def validateParquetFile(file: Path): Unit = {
  // declared outside the try so that it is visible in the finally block - stays null if the open fails
  var reader: ParquetReader[Group] = null
  try {
    // read Parquet file content - the reader is opened inside the try so that open failures are wrapped too
    reader = ParquetReader.builder(new GroupReadSupport(), file).build()
    var record = reader.read()
    while (record != null) {
      // the record itself is not used, it only needs to be readable
      record = reader.read()
    }
    logger.trace(s"$file is a valid Parquet file")
  } catch {
    case NonFatal(e) => throw new RuntimeException(s"Unable to validate $file: File may be corrupted", e)
  } finally {
    // guard against the reader never having been opened
    if (reader != null) {
      reader.close()
    }
  }
}
}

0 comments on commit 4ffa739

Please sign in to comment.