Skip to content

Commit

Permalink
GEOMESA-3259 FSDS - Support GeoParquet for Point geometries (#3254)
Browse files Browse the repository at this point in the history
  • Loading branch information
elahrvivaz authored Jan 10, 2025
1 parent 2175eea commit 06c6b41
Show file tree
Hide file tree
Showing 5 changed files with 321 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@
<groupId>org.specs2</groupId>
<artifactId>specs2-junit_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>com.github.erosb</groupId>
<artifactId>everit-json-schema</artifactId>
<version>1.14.4</version>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "GeoParquet",
"description": "Parquet metadata included in the geo field.",
"type": "object",
"required": [
"version",
"primary_column",
"columns"
],
"properties": {
"version": {
"type": "string",
"const": "1.1.0"
},
"primary_column": {
"type": "string",
"minLength": 1
},
"columns": {
"type": "object",
"minProperties": 1,
"patternProperties": {
".+": {
"type": "object",
"required": [
"encoding",
"geometry_types"
],
"properties": {
"encoding": {
"type": "string",
"pattern": "^(WKB|point|linestring|polygon|multipoint|multilinestring|multipolygon)$"
},
"geometry_types": {
"type": "array",
"uniqueItems": true,
"items": {
"type": "string",
"pattern": "^(GeometryCollection|(Multi)?(Point|LineString|Polygon))( Z)?$"
}
},
"crs": {
"oneOf": [
{
"$ref": "https://proj.org/schemas/v0.7/projjson.schema.json"
},
{
"type": "null"
}
]
},
"edges": {
"type": "string",
"enum": [
"planar",
"spherical"
]
},
"orientation": {
"type": "string",
"const": "counterclockwise"
},
"bbox": {
"type": "array",
"items": {
"type": "number"
},
"oneOf": [
{
"description": "2D bbox consisting of (xmin, ymin, xmax, ymax)",
"minItems": 4,
"maxItems": 4
},
{
"description": "3D bbox consisting of (xmin, ymin, zmin, xmax, ymax, zmax)",
"minItems": 6,
"maxItems": 6
}
]
},
"epoch": {
"type": "number"
},
"covering": {
"type": "object",
"required": [
"bbox"
],
"properties": {
"bbox": {
"type": "object",
"required": [
"xmin",
"xmax",
"ymin",
"ymax"
],
"properties": {
"xmin": {
"type": "array",
"items": [
{
"type": "string",
"minLength": 1
},
{
"const": "xmin"
}
],
"minItems": 2,
"maxItems": 2
},
"xmax": {
"type": "array",
"items": [
{
"type": "string",
"minLength": 1
},
{
"const": "xmax"
}
],
"minItems": 2,
"maxItems": 2
},
"ymin": {
"type": "array",
"items": [
{
"type": "string",
"minLength": 1
},
{
"const": "ymin"
}
],
"minItems": 2,
"maxItems": 2
},
"ymax": {
"type": "array",
"items": [
{
"type": "string",
"minLength": 1
},
{
"const": "ymax"
}
],
"minItems": 2,
"maxItems": 2
}
}
}
}
}
}
}
},
"additionalProperties": false
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,25 @@

package org.locationtech.geomesa.fs.storage.parquet.io

import com.google.gson.GsonBuilder
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.api.InitContext
import org.apache.parquet.hadoop.metadata.FileMetaData
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.Type.Repetition
import org.apache.parquet.schema.Types.BasePrimitiveBuilder
import org.apache.parquet.schema._
import org.geotools.api.feature.`type`.AttributeDescriptor
import org.geotools.api.feature.simple.SimpleFeatureType
import org.geotools.api.feature.`type`.{AttributeDescriptor, GeometryDescriptor}
import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
import org.locationtech.geomesa.features.serialization.TwkbSerialization.GeometryBytes
import org.locationtech.geomesa.fs.storage.common.jobs.StorageConfiguration
import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserver
import org.locationtech.geomesa.utils.geotools.ObjectType.ObjectType
import org.locationtech.geomesa.utils.geotools.{ObjectType, SimpleFeatureTypes}
import org.locationtech.geomesa.utils.text.StringSerialization
import org.locationtech.jts.geom.{Envelope, Geometry, GeometryCollection}

import java.util.{Collections, Locale}

/**
* A paired simple feature type and parquet schema
Expand Down Expand Up @@ -256,4 +261,97 @@ object SimpleFeatureParquetSchema {
def apply(binding: ObjectType): Binding =
bindings.getOrElse(binding, throw new NotImplementedError(s"No mapping defined for type $binding"))
}

object GeoParquetMetadata {

val GeoParquetMetadataKey = "geo"

private val gson = new GsonBuilder().disableHtmlEscaping().create()

// TODO "covering" - per row bboxes can be used to accelerate spatial queries at the row group/page level

/**
* Indicates if an attribute is supported or not.
*
* Currently only mixed geometry (encoded as WKB) and point types are encoded correctly as GeoParquet.
*
* TODO GEOMESA-3259 support non-point geometries
*
* @param d descriptor
* @return
*/
private[parquet] def supported(d: GeometryDescriptor): Boolean = {
ObjectType.selectType(d).last match {
case ObjectType.POINT | ObjectType.GEOMETRY => true
case _ => false
}
}

/**
* Generate json describing the GeoParquet file, conforming to the 1.1.0 GeoParquet schema
*
* @param primaryGeometry name of the primary geometry in the file
* @param geometries pairs of geometries and optional bounds
* @return
*/
def apply(primaryGeometry: String, geometries: Seq[(GeometryDescriptor, Option[Envelope])]): String = {
val cols = geometries.map { case (descriptor, bounds) =>
val (encoding, types) = descriptor.getType.getBinding match {
case b if b == classOf[Geometry] => ("WKB", Collections.emptyList())
case b if b == classOf[GeometryCollection] => ("WKB", Collections.singletonList(classOf[GeometryCollection].getSimpleName))
case b => (b.getSimpleName.toLowerCase(Locale.US), Collections.singletonList(b.getSimpleName))
}
val metadata = new java.util.HashMap[String, AnyRef]
metadata.put("encoding", encoding)
metadata.put("geometry_types", types) // TODO add ' Z' for 3d points
bounds.filterNot(_.isNull).foreach { e =>
metadata.put("bbox", java.util.List.of(e.getMinX, e.getMinY, e.getMaxX, e.getMaxY)) // TODO add z for 3d points
}
StringSerialization.alphaNumericSafeString(descriptor.getLocalName) -> metadata
}

val model = java.util.Map.of(
"version", "1.1.0",
"primary_column", primaryGeometry,
"columns", cols.toMap.asJava
)

gson.toJson(model)
}

/**
* Observer class that generates GeoParquet metadata
*
* @param sft simple feature type
*/
class GeoParquetObserver(sft: SimpleFeatureType) extends FileSystemObserver {

import scala.collection.JavaConverters._

private val bounds =
sft.getAttributeDescriptors.asScala.zipWithIndex.collect {
case (d: GeometryDescriptor, i) if supported(d) => (d, i, new Envelope())
}

def metadata(): java.util.Map[String, String] = {
if (bounds.isEmpty) { Collections.emptyMap() } else {
val geoms = bounds.map { case (d, _, env) => (d, Some(env).filterNot(_.isNull)) }
val primary = bounds.find(_._1 == sft.getGeometryDescriptor).getOrElse(bounds.head)._1.getLocalName
Collections.singletonMap(GeoParquetMetadataKey, GeoParquetMetadata(primary, geoms.toSeq))
}
}

override def write(feature: SimpleFeature): Unit = {
bounds.foreach { case (_, i, envelope) =>
val geom = feature.getAttribute(i).asInstanceOf[Geometry]
if (geom != null) {
envelope.expandToInclude(geom.getEnvelopeInternal)
}
}
}

override def flush(): Unit = {}
override def close(): Unit = {}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@ package org.locationtech.geomesa.fs.storage.parquet.io

import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.api.WriteSupport
import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
import org.apache.parquet.hadoop.api.WriteSupport.{FinalizedWriteContext, WriteContext}
import org.apache.parquet.io.api.{Binary, RecordConsumer}
import org.geotools.api.feature.`type`.AttributeDescriptor
import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
import org.locationtech.geomesa.fs.storage.parquet.io.SimpleFeatureParquetSchema.GeoParquetMetadata.GeoParquetObserver
import org.locationtech.geomesa.utils.geotools.ObjectType
import org.locationtech.geomesa.utils.geotools.ObjectType.ObjectType
import org.locationtech.geomesa.utils.io.CloseWithLogging
import org.locationtech.geomesa.utils.text.WKBUtils
import org.locationtech.jts.geom._

Expand All @@ -26,6 +28,8 @@ class SimpleFeatureWriteSupport extends WriteSupport[SimpleFeature] {

private var writer: SimpleFeatureWriteSupport.SimpleFeatureWriter = _
private var consumer: RecordConsumer = _
private var geoParquetObserver: GeoParquetObserver = _
private var baseMetadata: java.util.Map[String, String] = _

override val getName: String = "SimpleFeatureWriteSupport"

Expand All @@ -35,6 +39,8 @@ class SimpleFeatureWriteSupport extends WriteSupport[SimpleFeature] {
throw new IllegalArgumentException("Could not extract SimpleFeatureType from write context")
}
this.writer = SimpleFeatureWriteSupport.SimpleFeatureWriter(schema.sft)
this.geoParquetObserver = new GeoParquetObserver(schema.sft)
this.baseMetadata = schema.metadata

new WriteContext(schema.schema, schema.metadata)
}
Expand All @@ -43,7 +49,22 @@ class SimpleFeatureWriteSupport extends WriteSupport[SimpleFeature] {
override def prepareForWrite(recordConsumer: RecordConsumer): Unit = consumer = recordConsumer

// called per row
override def write(record: SimpleFeature): Unit = writer.write(consumer, record)
override def write(record: SimpleFeature): Unit = {
writer.write(consumer, record)
geoParquetObserver.write(record)
}

// called once at the end
override def finalizeWrite(): FinalizedWriteContext = {
try {
val metadata = new java.util.HashMap[String, String]()
metadata.putAll(baseMetadata)
metadata.putAll(geoParquetObserver.metadata())
new FinalizedWriteContext(metadata)
} finally {
CloseWithLogging(geoParquetObserver)
}
}
}

object SimpleFeatureWriteSupport {
Expand Down
Loading

0 comments on commit 06c6b41

Please sign in to comment.