Skip to content

Commit

Permalink
HBASE-27658: Add max live time limit for cached connections
Browse files Browse the repository at this point in the history
  • Loading branch information
wForget committed Aug 3, 2023
1 parent f8ee8f6 commit 4415f28
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,15 @@ private[spark] object HBaseConnectionCache extends Logging {
// A hashmap of Spark-HBase connections. Key is HBaseConnectionKey.
val connectionMap = new mutable.HashMap[HBaseConnectionKey, SmartConnection]()

val toCloseConnectionMap = new mutable.HashMap[HBaseConnectionKey, SmartConnection]()

val cacheStat = HBaseConnectionCacheStat(0, 0, 0)

// in milliseconds
private final val DEFAULT_TIME_OUT: Long = HBaseSparkConf.DEFAULT_CONNECTION_CLOSE_DELAY
private var timeout = DEFAULT_TIME_OUT
private final val DEFAULT_MAX_LIFE_TIME: Long = HBaseSparkConf.DEFAULT_CONNECTION_MAX_LIFE_TIME
private var maxLifeTime = DEFAULT_MAX_LIFE_TIME
private var closed: Boolean = false

var housekeepingThread = new Thread(new Runnable {
Expand Down Expand Up @@ -104,7 +108,25 @@ private[spark] object HBaseConnectionCache extends Logging {
}
connectionMap.remove(x._1)
}

// max life time
if (maxLifeTime > 0 && tsNow - x._2.createTime > maxLifeTime) {
toCloseConnectionMap.put(x._1, x._2)
connectionMap.remove(x._1)
}
}
}
}

// clean toCloseConnectionMap
toCloseConnectionMap.foreach { x =>
if (forceClean || x._2.refCount <= 0) {
try {
x._2.connection.close()
} catch {
case e: IOException => logWarning(s"Fail to close connection ${x._2}", e)
}
toCloseConnectionMap.remove(x._1)
}
}
}
Expand All @@ -116,7 +138,7 @@ private[spark] object HBaseConnectionCache extends Logging {
return null
cacheStat.numTotalRequests += 1
val sc = connectionMap.getOrElseUpdate(key, {cacheStat.numActualConnectionsCreated += 1
new SmartConnection(conn)})
new SmartConnection(conn, System.currentTimeMillis())})
sc.refCount += 1
sc
}
Expand All @@ -134,11 +156,21 @@ private[spark] object HBaseConnectionCache extends Logging {
housekeepingThread.interrupt()
}
}

// For testing purpose only
def setMaxLifeTime(to: Long): Unit = {
connectionMap.synchronized {
if (closed)
return
maxLifeTime = to
housekeepingThread.interrupt()
}
}
}

@InterfaceAudience.Private
private[hbase] case class SmartConnection (
connection: Connection, var refCount: Int = 0, var timestamp: Long = 0) {
connection: Connection, createTime: Long, var refCount: Int = 0, var timestamp: Long = 0) {
def getTable(tableName: TableName): Table = connection.getTable(tableName)
def getRegionLocator(tableName: TableName): RegionLocator = connection.getRegionLocator(tableName)
def isClosed: Boolean = connection.isClosed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,6 @@ object HBaseSparkConf{
val MAX_VERSIONS = "hbase.spark.query.maxVersions"
/** Delayed time to close hbase-spark connection when no reference to this connection, in milliseconds. */
val DEFAULT_CONNECTION_CLOSE_DELAY = 10 * 60 * 1000
/** The maximum time to keep the cached connection alive, in milliseconds. */
val DEFAULT_CONNECTION_MAX_LIFE_TIME = 24 * 3600 * 1000
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class HBaseConnectionCacheSuite extends FunSuite with Logging {
testBasic()
testWithPressureWithoutClose()
testWithPressureWithClose()
testMaxLifeTime()
}

def cleanEnv() {
Expand Down Expand Up @@ -236,4 +237,27 @@ class HBaseConnectionCacheSuite extends FunSuite with Logging {
assert(HBaseConnectionCache.getStat.numActiveConnections === 0)
}
}

def testMaxLifeTime(): Unit = {
cleanEnv()

HBaseConnectionCache.setTimeout(1 * 1000)
HBaseConnectionCache.setMaxLifeTime(2 * 1000)
val coon = HBaseConnectionCache.getConnection(HBaseConnectionKeyMocker(1), new ConnectionMocker)
HBaseConnectionCache.connectionMap.synchronized {
assert(HBaseConnectionCache.connectionMap.size === 1)
assert(HBaseConnectionCache.toCloseConnectionMap.size === 0)
}
Thread.sleep(3 * 1000)
HBaseConnectionCache.connectionMap.synchronized {
assert(HBaseConnectionCache.connectionMap.size === 0)
assert(HBaseConnectionCache.toCloseConnectionMap.size === 1)
}
coon.close()
HBaseConnectionCache.performHousekeeping(false)
HBaseConnectionCache.connectionMap.synchronized {
assert(HBaseConnectionCache.connectionMap.size === 0)
assert(HBaseConnectionCache.toCloseConnectionMap.size === 0)
}
}
}

0 comments on commit 4415f28

Please sign in to comment.