Skip to content

Commit

Permalink
GEOMESA-3406 Postgis - support filtering on list-type attributes (#3221)
Browse files Browse the repository at this point in the history
  • Loading branch information
elahrvivaz authored Oct 23, 2024
1 parent 7cefa85 commit add94b9
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 12 deletions.
6 changes: 6 additions & 0 deletions geomesa-gt/geomesa-gt-partitioning/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
</dependency>
<dependency>
<!-- 'works with' due to license issues -->
<groupId>javax.media</groupId>
<artifactId>jai_core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,13 @@ import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine, LoadingCache}
import org.geotools.api.feature.`type`.AttributeDescriptor
import org.geotools.api.feature.simple.SimpleFeatureType
import org.geotools.api.filter.Filter
import org.geotools.data.postgis.PostGISPSDialect
import org.geotools.jdbc.JDBCDataStore
import org.geotools.api.filter.expression.{Expression, PropertyName}
import org.geotools.data.postgis.{PostGISPSDialect, PostgisPSFilterToSql}
import org.geotools.feature.AttributeTypeBuilder
import org.geotools.feature.simple.SimpleFeatureTypeBuilder
import org.geotools.jdbc.{JDBCDataStore, PreparedFilterToSQL}
import org.geotools.util.Version
import org.locationtech.geomesa.gt.partition.postgis.dialect.PartitionedPostgisPsDialect.PartitionedPostgisPsFilterToSql

import java.lang.invoke.{MethodHandle, MethodHandles, MethodType}
import java.sql.{Connection, DatabaseMetaData, PreparedStatement, Types}
Expand Down Expand Up @@ -47,6 +52,15 @@ class PartitionedPostgisPsDialect(store: JDBCDataStore, delegate: PartitionedPos
MethodHandles.lookup.findSpecial(classOf[PostGISPSDialect], "setValue", methodType, classOf[PartitionedPostgisPsDialect])
}

override def createPreparedFilterToSQL: PreparedFilterToSQL = {
val fts = new PartitionedPostgisPsFilterToSql(this, delegate.getPostgreSQLVersion(null))
fts.setFunctionEncodingEnabled(delegate.isFunctionEncodingEnabled)
fts.setLooseBBOXEnabled(delegate.isLooseBBOXEnabled)
fts.setEncodeBBOXFilterAsEnvelope(delegate.isEncodeBBOXFilterAsEnvelope)
fts.setEscapeBackslash(delegate.isEscapeBackslash)
fts
}

override def setValue(
value: AnyRef,
binding: Class[_],
Expand All @@ -57,7 +71,7 @@ class PartitionedPostgisPsDialect(store: JDBCDataStore, delegate: PartitionedPos
// json columns are string type in geotools, but we have to use setObject or else we get a binding error
if (binding == classOf[String] && jsonColumns.get(new PreparedStatementKey(ps, column))) {
ps.setObject(column, value, Types.OTHER)
} else if (binding == classOf[java.util.List[_]]) {
} else if (binding.isArray || binding == classOf[java.util.List[_]]) {
// handle bug in jdbc store not calling setArrayValue in update statements
value match {
case null =>
Expand All @@ -77,9 +91,8 @@ class PartitionedPostgisPsDialect(store: JDBCDataStore, delegate: PartitionedPos
setArray(array, ps, column, cx)
}

case _ =>
// this will almost certainly fail...
super.setValue(value, binding, att, ps, column, cx)
case singleton =>
setArray(Array(singleton), ps, column, cx)
}
} else {
super.setValue(value, binding, att, ps, column, cx)
Expand All @@ -92,7 +105,7 @@ class PartitionedPostgisPsDialect(store: JDBCDataStore, delegate: PartitionedPos
// json columns are string type in geotools, but we have to use setObject or else we get a binding error
if (binding == classOf[String] && jsonColumns.get(new PreparedStatementKey(ps, column))) {
ps.setObject(column, value, Types.OTHER)
} else if (binding == classOf[java.util.List[_]]) {
} else if (binding.isArray || binding == classOf[java.util.List[_]]) {
// handle bug in jdbc store not calling setArrayValue in update statements
value match {
case null =>
Expand All @@ -112,9 +125,8 @@ class PartitionedPostgisPsDialect(store: JDBCDataStore, delegate: PartitionedPos
setArray(array, ps, column, cx)
}

case _ =>
// this will almost certainly fail...
superSetValue.invoke(this, value, binding, ps, column, cx)
case singleton =>
setArray(Array(singleton), ps, column, cx)
}
} else {
superSetValue.invoke(this, value, binding, ps, column, cx)
Expand Down Expand Up @@ -164,6 +176,61 @@ class PartitionedPostgisPsDialect(store: JDBCDataStore, delegate: PartitionedPos

object PartitionedPostgisPsDialect {

class PartitionedPostgisPsFilterToSql(dialect: PartitionedPostgisPsDialect, pgVersion: Version)
extends PostgisPSFilterToSql(dialect, pgVersion) {

import org.locationtech.geomesa.utils.geotools.RichAttributeDescriptors.RichAttributeDescriptor

import scala.collection.JavaConverters._

override def setFeatureType(featureType: SimpleFeatureType): Unit = {
// convert List-type attributes to Array-types so that prepared statement bindings work correctly
if (featureType.getAttributeDescriptors.asScala.exists(_.getType.getBinding == classOf[java.util.List[_]])) {
val builder = new SimpleFeatureTypeBuilder() {
override def init(`type`: SimpleFeatureType): Unit = {
super.init(`type`)
attributes().clear()
}
}
builder.init(featureType)
featureType.getAttributeDescriptors.asScala.foreach { descriptor =>
val ab = new AttributeTypeBuilder(builder.getFeatureTypeFactory)
ab.init(descriptor)
if (descriptor.getType.getBinding == classOf[java.util.List[_]]) {
ab.setBinding(java.lang.reflect.Array.newInstance(Option(descriptor.getListType()).getOrElse(classOf[String]), 0).getClass)
}
builder.add(ab.buildDescriptor(descriptor.getLocalName))
}
this.featureType = builder.buildFeatureType()
this.featureType.getUserData.putAll(featureType.getUserData)
} else {
this.featureType = featureType
}
}

// note: this would be a cleaner solution, but it doesn't get invoked due to explicit calls to
// super.getExpressionType in PostgisPSFilterToSql :/
override def getExpressionType(expression: Expression): Class[_] = {
val result = Option(expression).collect { case p: PropertyName => p }.flatMap { p =>
Option(p.evaluate(featureType).asInstanceOf[AttributeDescriptor]).map { descriptor =>
val binding = descriptor.getType.getBinding
if (binding == classOf[java.util.List[_]]) {
val listType = descriptor.getListType()
if (listType == null) {
classOf[Array[String]]
} else {
java.lang.reflect.Array.newInstance(listType, 0).getClass
}
} else {
binding
}
}
}

result.getOrElse(super.getExpressionType(expression))
}
}

// uses eq on the prepared statement to ensure that we compute json fields exactly once per prepared statement/col
private class PreparedStatementKey(val ps: PreparedStatement, val column: Int) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ import com.typesafe.scalalogging.LazyLogging
import org.geotools.api.data._
import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
import org.geotools.api.filter.Filter
import org.geotools.api.filter.MultiValuedFilter.MatchAction
import org.geotools.data._
import org.geotools.factory.CommonFactoryFinder
import org.geotools.feature.simple.SimpleFeatureBuilder
import org.geotools.filter.text.ecql.ECQL
import org.geotools.jdbc.JDBCDataStore
Expand Down Expand Up @@ -67,7 +69,7 @@ class PartitionedPostgisDataStoreTest extends Specification with BeforeAfterAll

lazy val features = Seq.tabulate(10) { i =>
val builder = new SimpleFeatureBuilder(sft)
builder.set("name", Collections.singletonList(s"name$i"))
builder.set("name", java.util.List.of(s"name$i", s"alt$i"))
builder.set("age", i)
builder.set("props", s"""["name$i"]""")
builder.set("dtg", new java.util.Date(now - ((i + 1) * 20 * 60 * 1000))) // 20 minutes
Expand All @@ -91,6 +93,8 @@ class PartitionedPostgisDataStoreTest extends Specification with BeforeAfterAll
lazy val host = Option(container).map(_.getHost).getOrElse("localhost")
lazy val port = Option(container).map(_.getFirstMappedPort).getOrElse(5432).toString

lazy val fif = CommonFactoryFinder.getFilterFactory

override def beforeAll(): Unit = {
val image =
DockerImageName.parse("ghcr.io/geomesa/postgis-cron")
Expand All @@ -109,6 +113,7 @@ class PartitionedPostgisDataStoreTest extends Specification with BeforeAfterAll
}

"PartitionedPostgisDataStore" should {

"fail with a useful error message if type name is too long" in {
val ds = DataStoreFinder.getDataStore(params.asJava)
ds must not(beNull)
Expand Down Expand Up @@ -201,7 +206,64 @@ class PartitionedPostgisDataStoreTest extends Specification with BeforeAfterAll
} finally {
ds.dispose()
}
ok
}

"filter on list elements" in {
val ds = DataStoreFinder.getDataStore(params.asJava)
ds must not(beNull)

try {
ds must beAnInstanceOf[JDBCDataStore]

val sft = SimpleFeatureTypes.renameSft(this.sft, "list-filters")
ds.getTypeNames.toSeq must not(contain(sft.getTypeName))
ds.createSchema(sft)

val schema = Try(ds.getSchema(sft.getTypeName)).getOrElse(null)
schema must not(beNull)
schema.getUserData.asScala must containAllOf(sft.getUserData.asScala.toSeq)
logger.debug(s"Schema: ${SimpleFeatureTypes.encodeType(schema)}")

// write some data
WithClose(new DefaultTransaction()) { tx =>
WithClose(ds.getFeatureWriterAppend(sft.getTypeName, tx)) { writer =>
features.foreach { feature =>
FeatureUtils.write(writer, feature, useProvidedFid = true)
}
}
tx.commit()
}

val filters = Seq(
fif.equals(fif.property("name"), fif.literal("name0")),
fif.equal(fif.property("name"), fif.literal("name0"), false, MatchAction.ANY),
fif.equals(fif.property("name"), fif.literal(Collections.singletonList("name0"))),
fif.equal(fif.property("name"), fif.literal(Collections.singletonList("name0")), false, MatchAction.ANY),
fif.equal(fif.property("name"), fif.literal(java.util.List.of("name0", "alt0")), false, MatchAction.ANY),
fif.equal(fif.property("name"), fif.literal(java.util.List.of("name0", "alt0")), false, MatchAction.ALL),
ECQL.toFilter("name = 'name0'"),
)
foreach(filters) { filter =>
WithClose(ds.getFeatureReader(new Query(sft.getTypeName, filter), Transaction.AUTO_COMMIT)) { reader =>
val result = SelfClosingIterator(reader).toList
result must haveLength(1)
compFromDb(result.head) mustEqual compWithFid(features.head, sft)
}
}

val nonMatchingFilters = Seq(
fif.equal(fif.property("name"), fif.literal("name0"), false, MatchAction.ALL),
fif.equal(fif.property("name"), fif.literal(Collections.singletonList("name0")), false, MatchAction.ALL),
)
foreach(nonMatchingFilters) { filter =>
WithClose(ds.getFeatureReader(new Query(sft.getTypeName, filter), Transaction.AUTO_COMMIT)) { reader =>
val result = SelfClosingIterator(reader).toList
result must beEmpty
}
}
} finally {
ds.dispose()
}
}

"age-off" in {
Expand Down

0 comments on commit add94b9

Please sign in to comment.