Skip to content

Commit

Permalink
GEOMESA-3436 NiFi - Fix FSDS path cache classpath (#3258)
Browse files Browse the repository at this point in the history
* Use Caffeine 2.x-compatible API calls
  • Loading branch information
elahrvivaz authored Jan 14, 2025
1 parent 713b902 commit 601ffb9
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine}
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, RemoteIterator}
import org.locationtech.geomesa.utils.conf.GeoMesaSystemProperties.SystemProperty

import java.util.concurrent.TimeUnit
import java.io.FileNotFoundException
import java.util.concurrent.{CompletableFuture, Executor, TimeUnit}
import java.util.function.BiConsumer

/**
* Caches file statuses to avoid repeated file system operations. Status expires after a
Expand All @@ -32,14 +34,6 @@ object PathCache {
}
)

// cache for individual file status
private val statusCache =
Caffeine.newBuilder().expireAfterWrite(duration, TimeUnit.MILLISECONDS).build(
new CacheLoader[(FileSystem, Path), FileStatus]() {
override def load(key: (FileSystem, Path)): FileStatus = key._1.getFileStatus(key._2)
}
)

// cache for checking directory contents
private val listCache =
Caffeine.newBuilder().expireAfterWrite(duration, TimeUnit.MILLISECONDS).build(
Expand All @@ -49,6 +43,33 @@ object PathCache {
}
)

// cache for individual file status
private val statusCache =
Caffeine.newBuilder().expireAfterWrite(duration, TimeUnit.MILLISECONDS).build(
new CacheLoader[(FileSystem, Path), FileStatus]() {
override def load(key: (FileSystem, Path)): FileStatus = {
try { key._1.getFileStatus(key._2) } catch {
case _: FileNotFoundException => null
}
}

override def asyncLoad(key: (FileSystem, Path), executor: Executor): CompletableFuture[FileStatus] = {
super.asyncLoad(key, executor)
.whenCompleteAsync(new ListCacheRefresh(key), executor)
.asInstanceOf[CompletableFuture[FileStatus]]
}

override def asyncReload(
key: (FileSystem, Path),
oldValue: FileStatus,
executor: Executor): CompletableFuture[FileStatus] = {
super.asyncReload(key, oldValue, executor)
.whenCompleteAsync(new ListCacheRefresh(key), executor)
.asInstanceOf[CompletableFuture[FileStatus]]
}
}
)

/**
* Register a path as existing
*
Expand All @@ -57,14 +78,7 @@ object PathCache {
*/
def register(fs: FileSystem, path: Path): Unit = {
pathCache.put((fs, path), java.lang.Boolean.TRUE)
val status = statusCache.refresh((fs, path))
val parent = path.getParent
if (parent != null) {
listCache.getIfPresent((fs, parent)) match {
case null => // no-op
case list => listCache.put((fs, parent), list :+ status.get())
}
}
statusCache.refresh((fs, path)) // also triggers listCache update
}

/**
Expand All @@ -83,7 +97,7 @@ object PathCache {
}

/**
* Gets the file status for a path
* Gets the file status for a path. Path must exist.
*
* @param fs file system
* @param path path
Expand Down Expand Up @@ -116,12 +130,33 @@ object PathCache {
* @param fs file system
* @param path path
*/
def invalidate(fs: FileSystem, path: Path): Unit = Seq(pathCache, statusCache, listCache).foreach(_.invalidate((fs, path)))
def invalidate(fs: FileSystem, path: Path): Unit = {
Seq(pathCache, statusCache, listCache).foreach(_.invalidate((fs, path)))
if (path.getParent != null) {
listCache.invalidate((fs, path.getParent))
}
}

object RemoteIterator {
def apply[T](iter: RemoteIterator[T]): Iterator[T] = new Iterator[T] {
override def hasNext: Boolean = iter.hasNext
override def next(): T = iter.next
}
}

private class ListCacheRefresh(key: (FileSystem, Path)) extends BiConsumer[FileStatus, Throwable] {
override def accept(status: FileStatus, u: Throwable): Unit = {
if (status != null) { // could be null if load fails
val (fs, path) = key
val parent = path.getParent
if (parent != null) {
listCache.asMap().computeIfPresent((fs, parent), load(status)_)
}
}
}

// noinspection ScalaUnusedSymbol
private def load(status: FileStatus)(ignored: (FileSystem, Path), list: Stream[FileStatus]): Stream[FileStatus] =
list.filterNot(f => f.getPath == status.getPath) :+ status
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/***********************************************************************
* Copyright (c) 2013-2025 Commonwealth Computer Research, Inc.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Apache License, Version 2.0
* which accompanies this distribution and is available at
* http://www.opensource.org/licenses/apache2.0.php.
***********************************************************************/

package org.locationtech.geomesa.fs.storage.common.utils

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.junit.runner.RunWith
import org.specs2.mutable.Specification
import org.specs2.runner.JUnitRunner

import java.nio.file.Files
import scala.concurrent.duration.DurationInt

@RunWith(classOf[JUnitRunner])
class PathCacheTest extends Specification {

"PathCache" should {
"update list cache when registering a new file" >> {
val root = new Path(Files.createTempDirectory("geomesa").toFile.getPath)
val fs = FileSystem.get(root.toUri, new Configuration())
try {
val file = new Path(root, "test")
PathCache.exists(fs, file) must beFalse
PathCache.list(fs, root) must beEmpty
// create the file
fs.create(file).close()
fs.exists(file) must beTrue
// verify cache has not been updated
PathCache.exists(fs, file) must beFalse
PathCache.list(fs, root) must beEmpty
// register the file
PathCache.register(fs, file)
// verify cached values have been updated
PathCache.exists(fs, file) must beTrue
eventually(10, 100.millis)(PathCache.list(fs, root).toList must haveLength(1))
// note: it's hard to verify this is a cached value, since it doesn't cache if a file doesn't exist...
PathCache.status(fs, file) must not(beNull)
} finally {
fs.delete(root, true)
}
}
}
}

0 comments on commit 601ffb9

Please sign in to comment.