diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/pom.xml b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/pom.xml
index e5067fc0ff1b..05e6e2b93bdc 100644
--- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/pom.xml
+++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/pom.xml
@@ -61,6 +61,12 @@
         <dependency>
            <groupId>org.specs2</groupId>
            <artifactId>specs2-junit_${scala.binary.version}</artifactId>
        </dependency>
+        <dependency>
+            <groupId>com.github.erosb</groupId>
+            <artifactId>everit-json-schema</artifactId>
+            <version>1.14.4</version>
+            <scope>test</scope>
+        </dependency>
diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/resources/geoparquet-1.1.0-schema.json b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/resources/geoparquet-1.1.0-schema.json
new file mode 100644
index 000000000000..7510da0c52a7
--- /dev/null
+++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/resources/geoparquet-1.1.0-schema.json
@@ -0,0 +1,166 @@
+{
+  "$schema": "http://json-schema.org/draft-07/schema#",
+  "title": "GeoParquet",
+  "description": "Parquet metadata included in the geo field.",
+  "type": "object",
+  "required": [
+    "version",
+    "primary_column",
+    "columns"
+  ],
+  "properties": {
+    "version": {
+      "type": "string",
+      "const": "1.1.0"
+    },
+    "primary_column": {
+      "type": "string",
+      "minLength": 1
+    },
+    "columns": {
+      "type": "object",
+      "minProperties": 1,
+      "patternProperties": {
+        ".+": {
+          "type": "object",
+          "required": [
+            "encoding",
+            "geometry_types"
+          ],
+          "properties": {
+            "encoding": {
+              "type": "string",
+              "pattern": "^(WKB|point|linestring|polygon|multipoint|multilinestring|multipolygon)$"
+            },
+            "geometry_types": {
+              "type": "array",
+              "uniqueItems": true,
+              "items": {
+                "type": "string",
+                "pattern": "^(GeometryCollection|(Multi)?(Point|LineString|Polygon))( Z)?$"
+              }
+            },
+            "crs": {
+              "oneOf": [
+                {
+                  "$ref": "https://proj.org/schemas/v0.7/projjson.schema.json"
+                },
+                {
+                  "type": "null"
+                }
+              ]
+            },
+            "edges": {
+              "type": "string",
+              "enum": [
+                "planar",
+                "spherical"
+              ]
+            },
+            "orientation": {
+              "type": "string",
+              "const": "counterclockwise"
+            },
+            "bbox": {
+              "type": "array",
+              "items": {
+                "type": "number"
+              },
+              "oneOf": [
+                {
+                  "description": "2D bbox consisting of (xmin, ymin, xmax, ymax)",
+                  "minItems": 4,
+                  "maxItems": 4
+                },
+                {
+                  "description": "3D bbox consisting of (xmin, ymin, zmin, xmax, ymax, zmax)",
+                  "minItems": 6,
+                  "maxItems": 6
+                }
+              ]
+            },
+            "epoch": {
+              "type": "number"
+            },
+            "covering": {
+              "type": "object",
+              "required": [
+                "bbox"
+              ],
+              "properties": {
+                "bbox": {
+                  "type": "object",
+                  "required": [
+                    "xmin",
+                    "xmax",
+                    "ymin",
+                    "ymax"
+                  ],
+                  "properties": {
+                    "xmin": {
+                      "type": "array",
+                      "items": [
+                        {
+                          "type": "string",
+                          "minLength": 1
+                        },
+                        {
+                          "const": "xmin"
+                        }
+                      ],
+                      "minItems": 2,
+                      "maxItems": 2
+                    },
+                    "xmax": {
+                      "type": "array",
+                      "items": [
+                        {
+                          "type": "string",
+                          "minLength": 1
+                        },
+                        {
+                          "const": "xmax"
+                        }
+                      ],
+                      "minItems": 2,
+                      "maxItems": 2
+                    },
+                    "ymin": {
+                      "type": "array",
+                      "items": [
+                        {
+                          "type": "string",
+                          "minLength": 1
+                        },
+                        {
+                          "const": "ymin"
+                        }
+                      ],
+                      "minItems": 2,
+                      "maxItems": 2
+                    },
+                    "ymax": {
+                      "type": "array",
+                      "items": [
+                        {
+                          "type": "string",
+                          "minLength": 1
+                        },
+                        {
+                          "const": "ymax"
+                        }
+                      ],
+                      "minItems": 2,
+                      "maxItems": 2
+                    }
+                  }
+                }
+              }
+            }
+          }
+        }
+      },
+      "additionalProperties": false
+    }
+  }
+}
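As a quick illustration of the everit-json-schema API pulled in by the pom change above, the following sketch loads the bundled schema and validates a hand-written "geo" document. This is not part of the change; the object name and document values are illustrative, and the schema file is assumed to be on the classpath as in the test below.

    // Sketch: validate a minimal GeoParquet "geo" document against the bundled schema
    import org.everit.json.schema.loader.SchemaLoader
    import org.json.{JSONObject, JSONTokener}

    object ValidateGeoExample {
      def main(args: Array[String]): Unit = {
        val is = getClass.getClassLoader.getResourceAsStream("geoparquet-1.1.0-schema.json")
        val schema = try { SchemaLoader.load(new JSONObject(new JSONTokener(is))) } finally { is.close() }
        // minimal conforming metadata: one point column with a 2D bbox
        val geo = new JSONObject(
          """{
            |  "version": "1.1.0",
            |  "primary_column": "geom",
            |  "columns": {
            |    "geom": { "encoding": "point", "geometry_types": ["Point"], "bbox": [-180.0, -90.0, 180.0, 90.0] }
            |  }
            |}""".stripMargin)
        schema.validate(geo) // throws org.everit.json.schema.ValidationException if non-conforming
        println("valid")
      }
    }
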
diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/SimpleFeatureParquetSchema.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/SimpleFeatureParquetSchema.scala
index 7e37459f7a5e..dcaf1d23e3ea 100644
--- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/SimpleFeatureParquetSchema.scala
+++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/SimpleFeatureParquetSchema.scala
@@ -9,6 +9,7 @@
 package org.locationtech.geomesa.fs.storage.parquet.io
 
+import com.google.gson.GsonBuilder
 import org.apache.hadoop.conf.Configuration
 import org.apache.parquet.hadoop.api.InitContext
 import org.apache.parquet.hadoop.metadata.FileMetaData
@@ -16,13 +17,17 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
 import org.apache.parquet.schema.Type.Repetition
 import org.apache.parquet.schema.Types.BasePrimitiveBuilder
 import org.apache.parquet.schema._
-import org.geotools.api.feature.`type`.AttributeDescriptor
-import org.geotools.api.feature.simple.SimpleFeatureType
+import org.geotools.api.feature.`type`.{AttributeDescriptor, GeometryDescriptor}
+import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
 import org.locationtech.geomesa.features.serialization.TwkbSerialization.GeometryBytes
 import org.locationtech.geomesa.fs.storage.common.jobs.StorageConfiguration
+import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserver
 import org.locationtech.geomesa.utils.geotools.ObjectType.ObjectType
 import org.locationtech.geomesa.utils.geotools.{ObjectType, SimpleFeatureTypes}
 import org.locationtech.geomesa.utils.text.StringSerialization
+import org.locationtech.jts.geom.{Envelope, Geometry, GeometryCollection}
+
+import java.util.{Collections, Locale}
 
 /**
  * A paired simple feature type and parquet schema
@@ -256,4 +261,97 @@ object SimpleFeatureParquetSchema {
     def apply(binding: ObjectType): Binding =
       bindings.getOrElse(binding, throw new NotImplementedError(s"No mapping defined for type $binding"))
   }
+
+  object GeoParquetMetadata {
+
+    val GeoParquetMetadataKey = "geo"
+
+    private val gson = new GsonBuilder().disableHtmlEscaping().create()
+
+    // TODO "covering" - per-row bboxes can be used to accelerate spatial queries at the row group/page level
+
+    /**
+     * Indicates whether an attribute can be encoded as GeoParquet.
+     *
+     * Currently only point types and mixed geometry types (encoded as WKB) are encoded correctly as GeoParquet.
+     *
+     * TODO GEOMESA-3259 support non-point geometries
+     *
+     * @param d descriptor
+     * @return
+     */
+    private[parquet] def supported(d: GeometryDescriptor): Boolean = {
+      ObjectType.selectType(d).last match {
+        case ObjectType.POINT | ObjectType.GEOMETRY => true
+        case _ => false
+      }
+    }
+
+    /**
+     * Generates the JSON describing a GeoParquet file, conforming to the GeoParquet 1.1.0 metadata schema
+     *
+     * @param primaryGeometry name of the primary geometry column in the file
+     * @param geometries pairs of geometry descriptors and their optional bounds
+     * @return
+     */
+    def apply(primaryGeometry: String, geometries: Seq[(GeometryDescriptor, Option[Envelope])]): String = {
+      val cols = geometries.map { case (descriptor, bounds) =>
+        val (encoding, types) = descriptor.getType.getBinding match {
+          case b if b == classOf[Geometry] => ("WKB", Collections.emptyList())
+          case b if b == classOf[GeometryCollection] => ("WKB", Collections.singletonList(classOf[GeometryCollection].getSimpleName))
+          case b => (b.getSimpleName.toLowerCase(Locale.US), Collections.singletonList(b.getSimpleName))
+        }
+        val metadata = new java.util.HashMap[String, AnyRef]
+        metadata.put("encoding", encoding)
+        metadata.put("geometry_types", types) // TODO add ' Z' for 3d points
+        bounds.filterNot(_.isNull).foreach { e =>
+          metadata.put("bbox", java.util.List.of(e.getMinX, e.getMinY, e.getMaxX, e.getMaxY)) // TODO add z for 3d points
+        }
+        StringSerialization.alphaNumericSafeString(descriptor.getLocalName) -> metadata
+      }
+
+      val model = java.util.Map.of(
+        "version", "1.1.0",
+        "primary_column", StringSerialization.alphaNumericSafeString(primaryGeometry), // escaped to match the column keys
+        "columns", cols.toMap.asJava
+      )
+
+      gson.toJson(model)
+    }
+
+    /**
+     * Observer that tracks the bounds of each geometry attribute as features are written, and generates
+     * the corresponding GeoParquet metadata
+     *
+     * @param sft simple feature type
+     */
+    class GeoParquetObserver(sft: SimpleFeatureType) extends FileSystemObserver {
+
+      import scala.collection.JavaConverters._
+
+      // (descriptor, attribute index, observed bounds) for each supported geometry attribute
+      private val bounds =
+        sft.getAttributeDescriptors.asScala.zipWithIndex.collect {
+          case (d: GeometryDescriptor, i) if supported(d) => (d, i, new Envelope())
+        }
+
+      def metadata(): java.util.Map[String, String] = {
+        if (bounds.isEmpty) { Collections.emptyMap() } else {
+          val geoms = bounds.map { case (d, _, env) => (d, Some(env).filterNot(_.isNull)) }
+          val primary = bounds.find(_._1 == sft.getGeometryDescriptor).getOrElse(bounds.head)._1.getLocalName
+          Collections.singletonMap(GeoParquetMetadataKey, GeoParquetMetadata(primary, geoms.toSeq))
+        }
+      }
+
+      override def write(feature: SimpleFeature): Unit = {
+        bounds.foreach { case (_, i, envelope) =>
+          val geom = feature.getAttribute(i).asInstanceOf[Geometry]
+          if (geom != null) {
+            envelope.expandToInclude(geom.getEnvelopeInternal)
+          }
+        }
+      }
+
+      override def flush(): Unit = {}
+      override def close(): Unit = {}
+    }
+  }
 }
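To make the generated metadata concrete, here is a small usage sketch exercising the observer outside the Parquet write path. It is not part of the change; the object name, feature type spec, and WKT values are illustrative, and it assumes a standard GeoMesa test classpath.

    // Sketch: feed features to a GeoParquetObserver and print the resulting "geo" entry
    import org.locationtech.geomesa.features.ScalaSimpleFeature
    import org.locationtech.geomesa.fs.storage.parquet.io.SimpleFeatureParquetSchema.GeoParquetMetadata
    import org.locationtech.geomesa.fs.storage.parquet.io.SimpleFeatureParquetSchema.GeoParquetMetadata.GeoParquetObserver
    import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes

    object ObserverExample {
      def main(args: Array[String]): Unit = {
        val sft = SimpleFeatureTypes.createType("example", "name:String,*geom:Point:srid=4326")
        val observer = new GeoParquetObserver(sft)
        observer.write(ScalaSimpleFeature.create(sft, "0", "alpha", "POINT (10 20)"))
        observer.write(ScalaSimpleFeature.create(sft, "1", "beta", "POINT (30 40)"))
        // prints roughly the following (gson may order keys differently):
        // {"version":"1.1.0","primary_column":"geom","columns":
        //   {"geom":{"encoding":"point","geometry_types":["Point"],"bbox":[10.0,20.0,30.0,40.0]}}}
        println(observer.metadata().get(GeoParquetMetadata.GeoParquetMetadataKey))
        observer.close()
      }
    }
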
diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/SimpleFeatureWriteSupport.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/SimpleFeatureWriteSupport.scala
index 4c89b5f9ec34..a8ddf6cf0c9d 100644
--- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/SimpleFeatureWriteSupport.scala
+++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/SimpleFeatureWriteSupport.scala
@@ -10,12 +10,14 @@
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.parquet.hadoop.api.WriteSupport
-import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
+import org.apache.parquet.hadoop.api.WriteSupport.{FinalizedWriteContext, WriteContext}
 import org.apache.parquet.io.api.{Binary, RecordConsumer}
 import org.geotools.api.feature.`type`.AttributeDescriptor
 import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
+import org.locationtech.geomesa.fs.storage.parquet.io.SimpleFeatureParquetSchema.GeoParquetMetadata.GeoParquetObserver
 import org.locationtech.geomesa.utils.geotools.ObjectType
 import org.locationtech.geomesa.utils.geotools.ObjectType.ObjectType
+import org.locationtech.geomesa.utils.io.CloseWithLogging
 import org.locationtech.geomesa.utils.text.WKBUtils
 import org.locationtech.jts.geom._
@@ -26,6 +28,8 @@ class SimpleFeatureWriteSupport extends WriteSupport[SimpleFeature] {
 
   private var writer: SimpleFeatureWriteSupport.SimpleFeatureWriter = _
   private var consumer: RecordConsumer = _
+  private var geoParquetObserver: GeoParquetObserver = _
+  private var baseMetadata: java.util.Map[String, String] = _
 
   override val getName: String = "SimpleFeatureWriteSupport"
@@ -35,6 +39,8 @@ class SimpleFeatureWriteSupport extends WriteSupport[SimpleFeature] {
       throw new IllegalArgumentException("Could not extract SimpleFeatureType from write context")
     }
     this.writer = SimpleFeatureWriteSupport.SimpleFeatureWriter(schema.sft)
+    this.geoParquetObserver = new GeoParquetObserver(schema.sft)
+    this.baseMetadata = schema.metadata
     new WriteContext(schema.schema, schema.metadata)
   }
@@ -43,7 +49,22 @@ class SimpleFeatureWriteSupport extends WriteSupport[SimpleFeature] {
   override def prepareForWrite(recordConsumer: RecordConsumer): Unit = consumer = recordConsumer
 
   // called per row
-  override def write(record: SimpleFeature): Unit = writer.write(consumer, record)
+  override def write(record: SimpleFeature): Unit = {
+    writer.write(consumer, record)
+    geoParquetObserver.write(record)
+  }
+
+  // called once at the end
+  override def finalizeWrite(): FinalizedWriteContext = {
+    try {
+      val metadata = new java.util.HashMap[String, String]()
+      metadata.putAll(baseMetadata)
+      metadata.putAll(geoParquetObserver.metadata())
+      new FinalizedWriteContext(metadata)
+    } finally {
+      CloseWithLogging(geoParquetObserver)
+    }
+  }
 }
 
 object SimpleFeatureWriteSupport {
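Because finalizeWrite attaches the entry to the Parquet file footer, any Parquet reader can see it without scanning data pages. A minimal read-back sketch follows (not part of the change; the object name and file path are illustrative). The test below exercises the same calls with assertions, inside the storage framework.

    // Sketch: inspect the "geo" footer entry of a written file
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.parquet.hadoop.ParquetFileReader
    import org.apache.parquet.hadoop.util.HadoopInputFile

    object FooterExample {
      def main(args: Array[String]): Unit = {
        val input = HadoopInputFile.fromPath(new Path("/tmp/example.parquet"), new Configuration())
        val reader = ParquetFileReader.open(input)
        try {
          // "geo" carries the GeoParquet JSON assembled in finalizeWrite above
          println(reader.getFileMetaData.getKeyValueMetaData.get("geo"))
        } finally {
          reader.close()
        }
      }
    }
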
diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/test/scala/org/locationtech/geomesa/parquet/ParquetStorageTest.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/test/scala/org/locationtech/geomesa/parquet/ParquetStorageTest.scala
index d70001e7a873..ae1e1eca05c7 100644
--- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/test/scala/org/locationtech/geomesa/parquet/ParquetStorageTest.scala
+++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/test/scala/org/locationtech/geomesa/parquet/ParquetStorageTest.scala
@@ -11,12 +11,16 @@
 import com.typesafe.scalalogging.LazyLogging
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.hadoop.ParquetFileReader
+import org.apache.parquet.hadoop.util.HadoopInputFile
+import org.everit.json.schema.loader.SchemaLoader
 import org.geotools.api.data.Query
 import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
 import org.geotools.api.filter.Filter
 import org.geotools.filter.text.ecql.ECQL
 import org.geotools.util.factory.Hints
+import org.json.{JSONObject, JSONTokener}
 import org.junit.runner.RunWith
 import org.locationtech.geomesa.features.ScalaSimpleFeature
 import org.locationtech.geomesa.fs.storage.api.FileSystemStorage.FileSystemWriter
@@ -24,8 +28,10 @@ import org.locationtech.geomesa.fs.storage.api._
 import org.locationtech.geomesa.fs.storage.common.StorageKeys
 import org.locationtech.geomesa.fs.storage.common.metadata.FileBasedMetadataFactory
 import org.locationtech.geomesa.fs.storage.parquet.ParquetFileSystemStorageFactory
+import org.locationtech.geomesa.fs.storage.parquet.io.SimpleFeatureParquetSchema.GeoParquetMetadata
 import org.locationtech.geomesa.utils.collection.SelfClosingIterator
 import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes
+import org.locationtech.geomesa.utils.io.WithClose
 import org.specs2.matcher.MatchResult
 import org.specs2.mutable.Specification
 import org.specs2.runner.JUnitRunner
@@ -106,6 +112,25 @@
       val loaded = new FileBasedMetadataFactory().load(context)
       loaded must beSome
       testQuery(new ParquetFileSystemStorageFactory().apply(context, loaded.get), sft)("INCLUDE", null, features)
+
+      // verify GeoParquet metadata
+      val geoParquetSchema = WithClose(getClass.getClassLoader.getResourceAsStream("geoparquet-1.1.0-schema.json")) { is =>
+        SchemaLoader.load(new JSONObject(new JSONTokener(is)))
+      }
+      foreach(storage.getPartitions) { partition =>
+        foreach(partition.files) { file =>
+          val path = new Path(dir, file.name)
+          WithClose(ParquetFileReader.open(HadoopInputFile.fromPath(path, context.conf))) { reader =>
+            val meta = reader.getFileMetaData.getKeyValueMetaData
+            val geo = Option(meta.get(GeoParquetMetadata.GeoParquetMetadataKey)).map(new JSONObject(_)).orNull
+            geo must not(beNull)
+            val col = geo.getJSONObject("columns").getJSONObject("geom")
+            col.getString("encoding") mustEqual "point"
+            col.getJSONArray("geometry_types").asScala.toSeq mustEqual Seq("Point")
+            geoParquetSchema.validate(geo) must not(throwAn[Exception])
+          }
+        }
+      }
     }
   }