Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-27658: Add max live time limit for cached connections #118

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,15 @@ private[spark] object HBaseConnectionCache extends Logging {
// A hashmap of Spark-HBase connections. Key is HBaseConnectionKey.
val connectionMap = new mutable.HashMap[HBaseConnectionKey, SmartConnection]()

val toCloseConnectionMap = new mutable.HashMap[HBaseConnectionKey, SmartConnection]()

val cacheStat = HBaseConnectionCacheStat(0, 0, 0)

// in milliseconds
private final val DEFAULT_TIME_OUT: Long = HBaseSparkConf.DEFAULT_CONNECTION_CLOSE_DELAY
private var timeout = DEFAULT_TIME_OUT
private final val DEFAULT_MAX_LIFE_TIME: Long = HBaseSparkConf.DEFAULT_CONNECTION_MAX_LIFE_TIME
private var maxLifeTime = DEFAULT_MAX_LIFE_TIME
private var closed: Boolean = false

var housekeepingThread = new Thread(new Runnable {
Expand Down Expand Up @@ -104,7 +108,25 @@ private[spark] object HBaseConnectionCache extends Logging {
}
connectionMap.remove(x._1)
}

// max life time
if (maxLifeTime > 0 && tsNow - x._2.createTime > maxLifeTime) {
toCloseConnectionMap.put(x._1, x._2)
connectionMap.remove(x._1)
}
}
}
}

// clean toCloseConnectionMap
toCloseConnectionMap.foreach { x =>
if (forceClean || x._2.refCount <= 0) {
try {
x._2.connection.close()
} catch {
case e: IOException => logWarning(s"Fail to close connection ${x._2}", e)
}
toCloseConnectionMap.remove(x._1)
}
}
}
Expand All @@ -116,7 +138,7 @@ private[spark] object HBaseConnectionCache extends Logging {
return null
cacheStat.numTotalRequests += 1
val sc = connectionMap.getOrElseUpdate(key, {cacheStat.numActualConnectionsCreated += 1
new SmartConnection(conn)})
new SmartConnection(conn, System.currentTimeMillis())})
sc.refCount += 1
sc
}
Expand All @@ -134,11 +156,21 @@ private[spark] object HBaseConnectionCache extends Logging {
housekeepingThread.interrupt()
}
}

// For testing purpose only
def setMaxLifeTime(to: Long): Unit = {
connectionMap.synchronized {
if (closed)
return
maxLifeTime = to
housekeepingThread.interrupt()
}
}
}

@InterfaceAudience.Private
private[hbase] case class SmartConnection (
connection: Connection, var refCount: Int = 0, var timestamp: Long = 0) {
connection: Connection, createTime: Long, var refCount: Int = 0, var timestamp: Long = 0) {
def getTable(tableName: TableName): Table = connection.getTable(tableName)
def getRegionLocator(tableName: TableName): RegionLocator = connection.getRegionLocator(tableName)
def isClosed: Boolean = connection.isClosed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,6 @@ object HBaseSparkConf{
val MAX_VERSIONS = "hbase.spark.query.maxVersions"
/** Delayed time to close hbase-spark connection when no reference to this connection, in milliseconds. */
val DEFAULT_CONNECTION_CLOSE_DELAY = 10 * 60 * 1000
/** The maximum time to keep the cached connection alive, in milliseconds. */
val DEFAULT_CONNECTION_MAX_LIFE_TIME = 24 * 3600 * 1000
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class HBaseConnectionCacheSuite extends FunSuite with Logging {
testBasic()
testWithPressureWithoutClose()
testWithPressureWithClose()
testMaxLifeTime()
}

def cleanEnv() {
Expand Down Expand Up @@ -236,4 +237,27 @@ class HBaseConnectionCacheSuite extends FunSuite with Logging {
assert(HBaseConnectionCache.getStat.numActiveConnections === 0)
}
}

def testMaxLifeTime(): Unit = {
cleanEnv()

HBaseConnectionCache.setTimeout(1 * 1000)
HBaseConnectionCache.setMaxLifeTime(2 * 1000)
val coon = HBaseConnectionCache.getConnection(HBaseConnectionKeyMocker(1), new ConnectionMocker)
HBaseConnectionCache.connectionMap.synchronized {
assert(HBaseConnectionCache.connectionMap.size === 1)
assert(HBaseConnectionCache.toCloseConnectionMap.size === 0)
}
Thread.sleep(3 * 1000)
HBaseConnectionCache.connectionMap.synchronized {
assert(HBaseConnectionCache.connectionMap.size === 0)
assert(HBaseConnectionCache.toCloseConnectionMap.size === 1)
}
coon.close()
HBaseConnectionCache.performHousekeeping(false)
HBaseConnectionCache.connectionMap.synchronized {
assert(HBaseConnectionCache.connectionMap.size === 0)
assert(HBaseConnectionCache.toCloseConnectionMap.size === 0)
}
}
}