diff --git a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala index 138c2247..71ee0cfd 100644 --- a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala +++ b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala @@ -39,11 +39,15 @@ private[spark] object HBaseConnectionCache extends Logging { // A hashmap of Spark-HBase connections. Key is HBaseConnectionKey. val connectionMap = new mutable.HashMap[HBaseConnectionKey, SmartConnection]() + val toCloseConnectionMap = new mutable.HashMap[HBaseConnectionKey, SmartConnection]() + val cacheStat = HBaseConnectionCacheStat(0, 0, 0) // in milliseconds private final val DEFAULT_TIME_OUT: Long = HBaseSparkConf.DEFAULT_CONNECTION_CLOSE_DELAY private var timeout = DEFAULT_TIME_OUT + private final val DEFAULT_MAX_LIFE_TIME: Long = HBaseSparkConf.DEFAULT_CONNECTION_MAX_LIFE_TIME + private var maxLifeTime = DEFAULT_MAX_LIFE_TIME private var closed: Boolean = false var housekeepingThread = new Thread(new Runnable { @@ -104,7 +108,25 @@ private[spark] object HBaseConnectionCache extends Logging { } connectionMap.remove(x._1) } + + // Retire connections past max life time; they are closed below once unreferenced + if (maxLifeTime > 0 && tsNow - x._2.createTime > maxLifeTime) { + toCloseConnectionMap.put(x._1, x._2) + connectionMap.remove(x._1) + } + } + } + } + + // Close retired connections that are unreferenced (all of them on forceClean) + toCloseConnectionMap.foreach { x => + if (forceClean || x._2.refCount <= 0) { + try { + x._2.connection.close() + } catch { + case e: IOException => logWarning(s"Fail to close connection ${x._2}", e) } + toCloseConnectionMap.remove(x._1) } } } @@ -116,7 +138,7 @@ private[spark] object HBaseConnectionCache extends Logging { return null cacheStat.numTotalRequests += 1 val sc = connectionMap.getOrElseUpdate(key, {cacheStat.numActualConnectionsCreated += 1 - new SmartConnection(conn)}) + new 
SmartConnection(conn, System.currentTimeMillis())}) sc.refCount += 1 sc } @@ -134,11 +156,21 @@ private[spark] object HBaseConnectionCache extends Logging { housekeepingThread.interrupt() } } + + // For testing purposes only + def setMaxLifeTime(to: Long): Unit = { + connectionMap.synchronized { + if (closed) + return + maxLifeTime = to + housekeepingThread.interrupt() + } + } } @InterfaceAudience.Private private[hbase] case class SmartConnection ( - connection: Connection, var refCount: Int = 0, var timestamp: Long = 0) { + connection: Connection, createTime: Long, var refCount: Int = 0, var timestamp: Long = 0) { def getTable(tableName: TableName): Table = connection.getTable(tableName) def getRegionLocator(tableName: TableName): RegionLocator = connection.getRegionLocator(tableName) def isClosed: Boolean = connection.isClosed diff --git a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala index dc497f94..7c9174eb 100644 --- a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala +++ b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala @@ -59,4 +59,6 @@ object HBaseSparkConf{ val MAX_VERSIONS = "hbase.spark.query.maxVersions" /** Delayed time to close hbase-spark connection when no reference to this connection, in milliseconds. */ val DEFAULT_CONNECTION_CLOSE_DELAY = 10 * 60 * 1000 + /** The maximum time to keep the cached connection alive, in milliseconds. 
*/ + val DEFAULT_CONNECTION_MAX_LIFE_TIME = 24 * 3600 * 1000 } diff --git a/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuite.scala b/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuite.scala index f07b1d6e..15f3794a 100644 --- a/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuite.scala +++ b/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuite.scala @@ -77,6 +77,7 @@ class HBaseConnectionCacheSuite extends FunSuite with Logging { testBasic() testWithPressureWithoutClose() testWithPressureWithClose() + testMaxLifeTime() } def cleanEnv() { @@ -236,4 +237,27 @@ class HBaseConnectionCacheSuite extends FunSuite with Logging { assert(HBaseConnectionCache.getStat.numActiveConnections === 0) } } + + def testMaxLifeTime(): Unit = { + cleanEnv() + + HBaseConnectionCache.setTimeout(1 * 1000) + HBaseConnectionCache.setMaxLifeTime(2 * 1000) + val coon = HBaseConnectionCache.getConnection(HBaseConnectionKeyMocker(1), new ConnectionMocker) + HBaseConnectionCache.connectionMap.synchronized { + assert(HBaseConnectionCache.connectionMap.size === 1) + assert(HBaseConnectionCache.toCloseConnectionMap.size === 0) + } + Thread.sleep(3 * 1000) + HBaseConnectionCache.connectionMap.synchronized { + assert(HBaseConnectionCache.connectionMap.size === 0) + assert(HBaseConnectionCache.toCloseConnectionMap.size === 1) + } + coon.close() + HBaseConnectionCache.performHousekeeping(false) + HBaseConnectionCache.connectionMap.synchronized { + assert(HBaseConnectionCache.connectionMap.size === 0) + assert(HBaseConnectionCache.toCloseConnectionMap.size === 0) + } + } }