Skip to content

Commit

Permalink
GEOMESA-3436 NiFi - Fix FSDS path cache classpath with reflection (#3259
Browse files Browse the repository at this point in the history
)
  • Loading branch information
elahrvivaz authored Jan 14, 2025
1 parent 601ffb9 commit 7e7aa24
Showing 1 changed file with 17 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,34 +8,37 @@

package org.locationtech.geomesa.fs.storage.common.utils

import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine}
import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine, LoadingCache}
import com.typesafe.scalalogging.LazyLogging
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, RemoteIterator}
import org.locationtech.geomesa.utils.conf.GeoMesaSystemProperties.SystemProperty

import java.io.FileNotFoundException
import java.lang
import java.lang.reflect.Method
import java.util.concurrent.{CompletableFuture, Executor, TimeUnit}
import java.util.function.BiConsumer

/**
* Caches file statuses to avoid repeated file system operations. Status expires after a
* configurable period, by default 10 minutes.
*/
object PathCache {
object PathCache extends LazyLogging {

val CacheDurationProperty: SystemProperty = SystemProperty("geomesa.fs.file.cache.duration", "15 minutes")

private val duration = CacheDurationProperty.toDuration.get.toMillis

// cache for checking existence of files
private val pathCache =
private val pathCache: LoadingCache[(FileSystem, Path), lang.Boolean] =
Caffeine.newBuilder().expireAfterWrite(duration, TimeUnit.MILLISECONDS).build(
new CacheLoader[(FileSystem, Path), java.lang.Boolean]() {
override def load(key: (FileSystem, Path)): java.lang.Boolean = key._1.exists(key._2)
}
)

// cache for checking directory contents
private val listCache =
private val listCache: LoadingCache[(FileSystem, Path), Stream[FileStatus]] =
Caffeine.newBuilder().expireAfterWrite(duration, TimeUnit.MILLISECONDS).build(
new CacheLoader[(FileSystem, Path), Stream[FileStatus]]() {
override def load(key: (FileSystem, Path)): Stream[FileStatus] =
Expand All @@ -44,7 +47,7 @@ object PathCache {
)

// cache for individual file status
private val statusCache =
private val statusCache: LoadingCache[(FileSystem, Path), FileStatus] =
Caffeine.newBuilder().expireAfterWrite(duration, TimeUnit.MILLISECONDS).build(
new CacheLoader[(FileSystem, Path), FileStatus]() {
override def load(key: (FileSystem, Path)): FileStatus = {
Expand All @@ -70,6 +73,12 @@ object PathCache {
}
)

// we use reflection to get around Caffeine 2.x/3.x API differences in some environments
private val refresh: Method = classOf[LoadingCache[_, _]].getMethods.find(_.getName == "refresh").getOrElse {
logger.warn("Could not get refresh cache method - cache operations will be less efficient")
null
}

/**
* Register a path as existing
*
Expand All @@ -78,7 +87,9 @@ object PathCache {
*/
def register(fs: FileSystem, path: Path): Unit = {
pathCache.put((fs, path), java.lang.Boolean.TRUE)
statusCache.refresh((fs, path)) // also triggers listCache update
if (refresh != null) {
refresh.invoke(statusCache, (fs, path)) // also triggers listCache update
}
}

/**
Expand Down

0 comments on commit 7e7aa24

Please sign in to comment.