Add rdd function for snapshot read
juke-mini666 committed Jul 18, 2019
1 parent 8f88efd commit b2e8d85
Showing 2 changed files with 85 additions and 8 deletions.
pom.xml — 2 changes: 1 addition & 1 deletion
@@ -116,7 +116,7 @@
<compileSource>1.8</compileSource>
<java.min.version>${compileSource}</java.min.version>
<maven.min.version>3.5.0</maven.min.version>
<hbase.version>2.1.0</hbase.version>
<hbase.version>2.1.4</hbase.version>
<maven.compiler.version>3.6.1</maven.compiler.version>
<exec.maven.version>1.6.0</exec.maven.version>
<audience-annotations.version>0.5.0</audience-annotations.version>
HBaseContext.scala — 91 changes: 84 additions & 7 deletions
@@ -22,33 +22,35 @@ import java.util
import java.util.UUID
import javax.management.openmbean.KeyAlreadyExistsException

import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceAudience
import org.apache.hadoop.hbase.fs.HFileSystem
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.io.compress.Compression
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
import org.apache.hadoop.hbase.io.hfile.{HFile, CacheConfig, HFileContextBuilder, HFileWriterImpl}
import org.apache.hadoop.hbase.regionserver.{HStore, HStoreFile, StoreFileWriter, BloomType}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.io.hfile.{CacheConfig, HFile, HFileContextBuilder, HFileWriterImpl}
import org.apache.hadoop.hbase.regionserver.{BloomType, HStore, HStoreFile, StoreFileWriter}
import org.apache.hadoop.hbase.util.{Bytes, RegionSplitter}
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
import org.apache.hadoop.hbase.client._

import scala.reflect.ClassTag
import org.apache.spark.{SerializableWritable, SparkContext}
import org.apache.hadoop.hbase.mapreduce.{TableMapReduceUtil,
TableInputFormat, IdentityTableMapper}
import org.apache.hadoop.hbase.mapreduce.{IdentityTableMapper, TableInputFormat, TableMapReduceUtil, TableSnapshotInputFormat}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.streaming.dstream.DStream
import java.io._

import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod
import org.apache.hadoop.fs.{Path, FileAlreadyExistsException, FileSystem}
import org.apache.hadoop.fs.{FileAlreadyExistsException, FileSystem, Path}

import scala.collection.mutable

/**
@@ -474,6 +476,81 @@ class HBaseContext(@transient val sc: SparkContext,
(r: (ImmutableBytesWritable, Result)) => r)
}

/**
* This function will use the native HBase TableSnapshotInputFormat with the
* given scan object to generate a new RDD
*
* @param snapshotName the name of the snapshot to scan
* @param scans the HBase scan object to use to read data from HBase
* @param restoreDir a temporary directory to restore the snapshot into. The current user
*                   should have write permissions to this directory, and it should not be
*                   a subdirectory of rootdir. After the job is finished, restoreDir can
*                   be deleted.
* @return New RDD with results from scan
*/
def hbaseRDDForSnapshot(snapshotName: String, scans: Scan, restoreDir: String):
RDD[(ImmutableBytesWritable, Result)]
= hbaseRDDForSnapshot(snapshotName, scans, restoreDir, null, 1)
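
A minimal usage sketch for this overload (illustrative only, not part of the commit; the SparkContext sc, the snapshot name "mySnapshot" and the restore path "/tmp/snapshot-restore" are assumed):

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Scan

val config = HBaseConfiguration.create()
val hbaseContext = new HBaseContext(sc, config)
val snapshotRDD = hbaseContext.hbaseRDDForSnapshot("mySnapshot", new Scan(), "/tmp/snapshot-restore")
println("rows read from snapshot: " + snapshotRDD.count())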

/**
* This function will use the native HBase TableSnapshotInputFormat with the
* given scan object to generate a new RDD
*
* @param snapshotName the name of the snapshot to scan
* @param scans the HBase scan object to use to read data from HBase
* @param restoreDir a temporary directory to restore the snapshot into. The current user
*                   should have write permissions to this directory, and it should not be
*                   a subdirectory of rootdir. After the job is finished, restoreDir can
*                   be deleted.
* @param splitAlgo SplitAlgorithm to be used when generating InputSplits
* @param numSplitsPerRegion how many input splits to generate per region
* @return New RDD with results from scan
*/
def hbaseRDDForSnapshot(snapshotName: String, scans: Scan, restoreDir: String,
splitAlgo: RegionSplitter.SplitAlgorithm, numSplitsPerRegion: Int):
RDD[(ImmutableBytesWritable, Result)] = {
hbaseRDDForSnapshot[(ImmutableBytesWritable, Result)](
snapshotName,
scans,
restoreDir,
splitAlgo,
numSplitsPerRegion,
(r: (ImmutableBytesWritable, Result)) => r)
}
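
A sketch of the split-aware overload (same hypothetical names): passing a RegionSplitter.UniformSplit together with numSplitsPerRegion = 4 asks the snapshot input format to divide each region into four input splits, which raises read parallelism when regions are large.

val parallelRDD = hbaseContext.hbaseRDDForSnapshot(
  "mySnapshot",
  new Scan(),
  "/tmp/snapshot-restore",
  new RegionSplitter.UniformSplit(),
  4)
parallelRDD.map { case (_, result) => Bytes.toString(result.getRow) }
  .take(10)
  .foreach(println)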

/**
* This function will use the native HBase TableSnapshotInputFormat with the
* given scan object to generate a new RDD
*
* @param snapshotName the name of the snapshot to scan
* @param scans the HBase scan object to use to read data from HBase
* @param restoreDir a temporary directory to restore the snapshot into. The current user
*                   should have write permissions to this directory, and it should not be
*                   a subdirectory of rootdir. After the job is finished, restoreDir can
*                   be deleted.
* @param splitAlgo SplitAlgorithm to be used when generating InputSplits
* @param numSplitsPerRegion how many input splits to generate per region
* @param f function to convert a Result object from HBase into
* what the user wants in the final generated RDD
* @return new RDD with results from scan
*/
def hbaseRDDForSnapshot[U: ClassTag](snapshotName: String, scans: Scan, restoreDir: String,
splitAlgo: RegionSplitter.SplitAlgorithm, numSplitsPerRegion: Int,
f: ((ImmutableBytesWritable, Result)) => U): RDD[U] = {
val job: Job = Job.getInstance(getConf(broadcastedConf))

TableMapReduceUtil.initCredentials(job)
TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scans,
classOf[IdentityTableMapper], null, null, job, true, new Path(restoreDir), splitAlgo, numSplitsPerRegion)

val jconf = new JobConf(job.getConfiguration)
SparkHadoopUtil.get.addCredentials(jconf)
new NewHBaseRDD(sc,
classOf[TableSnapshotInputFormat],
classOf[ImmutableBytesWritable],
classOf[Result],
job.getConfiguration,
this).map(f)
}
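
A sketch of the generic form, where the conversion function maps every (ImmutableBytesWritable, Result) pair to just its row key, so only the projected strings flow through the rest of the Spark job (the column family "cf" and the output path are illustrative):

val rowKeys: RDD[String] = hbaseContext.hbaseRDDForSnapshot[String](
  "mySnapshot",
  new Scan().addFamily(Bytes.toBytes("cf")),
  "/tmp/snapshot-restore",
  new RegionSplitter.UniformSplit(),
  2,
  { case (_, result) => Bytes.toString(result.getRow) })
rowKeys.saveAsTextFile("/tmp/snapshot-rowkeys")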

/**
* Underlying wrapper for all foreach functions in HBaseContext
*/
