Add OpenLineage reporting support for Spark connector
ddebowczyk92 committed Sep 2, 2024
1 parent 166b764 commit 1c8b91a
Showing 6 changed files with 318 additions and 3 deletions.
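With this change, reads and writes that go through the hbase-spark connector can be reported as OpenLineage datasets by the OpenLineage Spark listener. A minimal usage sketch, mirroring the configuration used by the new test suite below; the listener jar, file path, and table catalog are illustrative placeholders, not part of this commit:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog
import org.apache.spark.sql.SparkSession

// Register the OpenLineage listener and write events to a local file,
// the same way the new OpenLineageSuite does (assumes openlineage-spark is on the classpath).
val spark = SparkSession
  .builder()
  .master("local")
  .appName("hbase-lineage-example")
  .config("spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener")
  .config("spark.openlineage.transport.type", "file")
  .config("spark.openlineage.transport.location", "/tmp/openlineage.log") // placeholder path
  .getOrCreate()

// The connector picks up the most recently created HBaseContext.
new HBaseContext(spark.sparkContext, HBaseConfiguration.create())

// Placeholder catalog in the same shape the test suite uses.
val catalog =
  s"""{
     |"table":{"namespace":"default", "name":"t1"},
     |"rowkey":"key",
     |"columns":{
     |"KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"string"},
     |"A_FIELD":{"cf":"c", "col":"a", "type":"string"}
     |}
     |}""".stripMargin

// Reads (and writes) through the connector are reported as datasets named
// "<namespace>.<table>" under "hbase://<hbase.zookeeper.quorum>".
val df = spark.read
  .format("org.apache.hadoop.hbase.spark")
  .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
  .load()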
14 changes: 13 additions & 1 deletion pom.xml
@@ -173,6 +173,7 @@
<scoverage.version>1.4.11</scoverage.version>
<sbt-compiler.version>1.0.0</sbt-compiler.version>
<jacoco.version>0.8.8</jacoco.version>
<openlineage.version>1.20.5</openlineage.version>
</properties>
<dependencyManagement>
<dependencies>
@@ -243,6 +244,17 @@
<artifactId>audience-annotations</artifactId>
<version>${audience-annotations.version}</version>
</dependency>
<dependency>
<groupId>io.openlineage</groupId>
<artifactId>spark-extension-interfaces</artifactId>
<version>${openlineage.version}</version>
</dependency>
<dependency>
<groupId>io.openlineage</groupId>
<artifactId>spark-extension-entrypoint</artifactId>
<version>1.0.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-annotations</artifactId>
@@ -387,7 +399,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.1</version>
<version>3.4.1</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
35 changes: 35 additions & 0 deletions spark/hbase-spark/pom.xml
@@ -182,6 +182,21 @@
<artifactId>spark-catalyst_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>io.openlineage</groupId>
<artifactId>spark-extension-interfaces</artifactId>
</dependency>
<dependency>
<groupId>io.openlineage</groupId>
<artifactId>spark-extension-entrypoint</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.openlineage</groupId>
<artifactId>openlineage-spark_2.12</artifactId>
<version>${openlineage.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
@@ -265,6 +280,26 @@
</rules>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>shade</goal>
</goals>
<phase>package</phase>
<configuration>
<relocations>
<relocation>
<pattern>io.openlineage.spark.shade</pattern>
<shadedPattern>org.apache.hbase.thirdparty.io.openlineage.spark.shade</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

@@ -0,0 +1 @@
org.apache.hadoop.hbase.spark.SparkHBaseLineageProvider
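The single line above registers SparkHBaseLineageProvider as a service implementation so that the OpenLineage Spark integration can discover it at runtime, presumably through the standard java.util.ServiceLoader mechanism. A sketch of the lookup this registration enables (not part of this commit):

import java.util.ServiceLoader
import io.openlineage.spark.extension.OpenLineageExtensionProvider

// Assumption: OpenLineage locates extension providers via ServiceLoader.
// With the connector jar on the classpath, the new provider should be listed here.
val providers = ServiceLoader.load(classOf[OpenLineageExtensionProvider])
providers.forEach { p =>
  println(s"${p.getClass.getName} -> ${p.getVisitorClassName}")
}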
@@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hbase.spark

import io.openlineage.spark.shade.client.OpenLineage
import io.openlineage.spark.shade.client.utils.DatasetIdentifier
import io.openlineage.spark.shade.extension.v1.{LineageRelation, LineageRelationProvider}
import java.util
import java.util.concurrent.ConcurrentLinkedQueue
import org.apache.hadoop.hbase.CellUtil
@@ -53,7 +56,11 @@ import scala.collection.mutable
* Through the HBase Bytes object commands.
*/
@InterfaceAudience.Private
class DefaultSource extends RelationProvider with CreatableRelationProvider with Logging {
class DefaultSource
extends RelationProvider
with CreatableRelationProvider
with Logging
with LineageRelationProvider {

/**
* Is given input from SparkSQL to construct a BaseRelation
@@ -78,6 +85,19 @@ class DefaultSource extends RelationProvider with CreatableRelationProvider with
relation.insert(data, false)
relation
}

def getLineageDatasetIdentifier(
sparkListenerEventName: String,
openLineage: OpenLineage,
sqlContext: Any,
parameters: Any): DatasetIdentifier = {
val params = parameters.asInstanceOf[Map[String, String]]
val hbaseContext = LatestHBaseContextCache.latest
val catalog = HBaseTableCatalog(params)
val name = s"${catalog.namespace}.${catalog.name}"
val namespace = s"hbase://${hbaseContext.config.get("hbase.zookeeper.quorum")}"
new DatasetIdentifier(name, namespace)
}
}

/**
@@ -93,7 +113,8 @@ case class HBaseRelation(
extends BaseRelation
with PrunedFilteredScan
with InsertableRelation
with Logging {
with Logging
with LineageRelation {
val timestamp = parameters.get(HBaseSparkConf.TIMESTAMP).map(_.toLong)
val minTimestamp = parameters.get(HBaseSparkConf.TIMERANGE_START).map(_.toLong)
val maxTimestamp = parameters.get(HBaseSparkConf.TIMERANGE_END).map(_.toLong)
@@ -611,6 +632,14 @@ case class HBaseRelation(
new PassThroughLogicExpression
}
}

def getLineageDatasetIdentifier(
sparkListenerEventName: String,
openLineage: OpenLineage): DatasetIdentifier = {
val name = s"${this.catalog.namespace}.${this.catalog.name}"
val namespace = s"hbase://${this.hbaseConf.get("hbase.zookeeper.quorum")}"
new DatasetIdentifier(name, namespace)
}
}
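Both lineage hooks in this file build the identifier the same way: the dataset name is "<catalog namespace>.<table>" and the dataset namespace is derived from the cluster's ZooKeeper quorum. For example, with hbase.zookeeper.quorum set to 127.0.0.1 and a catalog targeting table t2 in the default namespace, the reported identifier is equivalent to:

// Matches what the new OpenLineageSuite asserts for its output table.
import io.openlineage.spark.shade.client.utils.DatasetIdentifier
val id = new DatasetIdentifier("default.t2", "hbase://127.0.0.1")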

/**
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.spark

import io.openlineage.spark.extension.OpenLineageExtensionProvider
import io.openlineage.spark.shade.extension.v1.lifecycle.plan.SparkOpenLineageExtensionVisitor

class SparkHBaseLineageProvider extends OpenLineageExtensionProvider {

def shadedPackage(): String =
"org.apache.hbase.thirdparty.io.openlineage.spark.shade"

override def getVisitorClassName: String =
classOf[SparkOpenLineageExtensionVisitor].getCanonicalName
}
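The value returned by shadedPackage() must match the maven-shade relocation added to spark/hbase-spark/pom.xml above; if the two drift apart, OpenLineage cannot resolve the relocated visitor class. A small sanity check one could run against the shaded hbase-spark jar (illustrative, not part of this commit):

// The relocated visitor class should load under the prefix the provider advertises.
val prefix = new SparkHBaseLineageProvider().shadedPackage()
val visitor = Class.forName(
  s"$prefix.extension.v1.lifecycle.plan.SparkOpenLineageExtensionVisitor")
assert(visitor != null)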
@@ -0,0 +1,208 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.spark

import io.openlineage.spark.agent.OpenLineageSparkListener
import java.io.File
import org.apache.hadoop.hbase.{HBaseTestingUtility, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.spark.datasources.{HBaseSparkConf, HBaseTableCatalog}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SparkSession, SQLContext}
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
import org.scalatest.Matchers.convertToAnyShouldWrapper
import org.scalatest.concurrent.Eventually
import scala.collection.mutable.ArrayBuffer
import scala.io.Source

class OpenLineageSuite
extends FunSuite
with Eventually
with BeforeAndAfterEach
with BeforeAndAfterAll
with Logging {
@transient var sc: SparkSession = null
var TEST_UTIL: HBaseTestingUtility = new HBaseTestingUtility

val t1TableName = "t1"
val t2TableName = "t2"
val columnFamily = "c"
var sqlContext: SQLContext = null

val timestamp = 1234567890000L
val lineageFile = File.createTempFile(s"openlineage_test_${System.nanoTime()}", ".log")

override def beforeAll() {

TEST_UTIL.startMiniCluster

logInfo(" - minicluster started")
try
TEST_UTIL.deleteTable(TableName.valueOf(t1TableName))
catch {
case e: Exception => logInfo(" - no table " + t1TableName + " found")
}
try
TEST_UTIL.deleteTable(TableName.valueOf(t2TableName))
catch {
case e: Exception => logInfo(" - no table " + t2TableName + " found")
}

logInfo(" - creating table " + t1TableName)
TEST_UTIL.createTable(TableName.valueOf(t1TableName), Bytes.toBytes(columnFamily))
logInfo(" - created table")
logInfo(" - creating table " + t2TableName)
TEST_UTIL.createTable(TableName.valueOf(t2TableName), Bytes.toBytes(columnFamily))
logInfo(" - created table")

val sparkConf = new SparkConf
sparkConf.set(HBaseSparkConf.QUERY_CACHEBLOCKS, "true")
sparkConf.set(HBaseSparkConf.QUERY_BATCHSIZE, "100")
sparkConf.set(HBaseSparkConf.QUERY_CACHEDROWS, "100")
sparkConf.set("spark.extraListeners", classOf[OpenLineageSparkListener].getCanonicalName)
sparkConf.set("spark.openlineage.transport.type", "file")
sparkConf.set("spark.openlineage.transport.location", lineageFile.getAbsolutePath)

sc = SparkSession
.builder()
.master("local")
.appName("openlineage-test")
.config(sparkConf)
.getOrCreate();
val connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration)
try {
val t1Table = connection.getTable(TableName.valueOf(t1TableName))

try {
var put = new Put(Bytes.toBytes("get1"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("1"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(1))
t1Table.put(put)
put = new Put(Bytes.toBytes("get2"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("4"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(4))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("z"), Bytes.toBytes("FOO"))
t1Table.put(put)
put = new Put(Bytes.toBytes("get3"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("8"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(8))
t1Table.put(put)
put = new Put(Bytes.toBytes("get4"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo4"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("10"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(10))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("z"), Bytes.toBytes("BAR"))
t1Table.put(put)
put = new Put(Bytes.toBytes("get5"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo5"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("8"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(8))
t1Table.put(put)
} finally {
t1Table.close()
}
} finally {
connection.close()
}

new HBaseContext(sc.sparkContext, TEST_UTIL.getConfiguration)
}

override def afterAll() {
TEST_UTIL.deleteTable(TableName.valueOf(t1TableName))
logInfo("shuting down minicluster")
TEST_UTIL.shutdownMiniCluster()

sc.stop()
}

override def beforeEach(): Unit = {
DefaultSourceStaticUtils.lastFiveExecutionRules.clear()
}

test("Test rowKey point only rowKey query") {
val hbaseTable1Catalog =
s"""{
|"table":{"namespace":"default", "name":"t1"},
|"rowkey":"key",
|"columns":{
|"KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"string"},
|"A_FIELD":{"cf":"c", "col":"a", "type":"string"},
|"B_FIELD":{"cf":"c", "col":"b", "type":"string"}
|}
|}""".stripMargin

val hbaseTable2Catalog =
s"""{
|"table":{"namespace":"default", "name":"t2"},
|"rowkey":"key",
|"columns":{
|"KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"string"},
|"OUTPUT_A_FIELD":{"cf":"c", "col":"a", "type":"string"},
|"OUTPUT_B_FIELD":{"cf":"c", "col":"b", "type":"string"}
|}
|}""".stripMargin

val results = sc.read
.options(Map(HBaseTableCatalog.tableCatalog -> hbaseTable1Catalog))
.format("org.apache.hadoop.hbase.spark")
.load()

results.createOrReplaceTempView("tempview");

val outputDf =
sc.sql("SELECT KEY_FIELD, A_FIELD AS OUTPUT_A_FIELD, B_FIELD AS OUTPUT_B_FIELD FROM tempview")

outputDf.write
.format("org.apache.hadoop.hbase.spark")
.options(Map(HBaseTableCatalog.tableCatalog -> hbaseTable2Catalog))
.save()

val events = eventually {
val eventLog = parseEventLog(lineageFile)
eventLog.size shouldBe 1
eventLog
}

val json = events.head
assert(((json \\ "inputs")(0) \ "name") == JString("default.t1"))
assert(((json \\ "inputs")(0) \ "namespace") == JString("hbase://127.0.0.1"))
assert(((json \\ "outputs")(0) \ "name") == JString("default.t2"))
assert(((json \\ "outputs")(0) \ "namespace") == JString("hbase://127.0.0.1"))
}

def parseEventLog(file: File): List[JValue] = {
val source = Source.fromFile(file)
val eventlist = ArrayBuffer.empty[JValue]
for (line <- source.getLines()) {
val event = parse(line)
for {
JObject(child) <- event
JField("inputs", JArray(inputs)) <- child
JField("outputs", JArray(outputs)) <- child
JField("eventType", JString(eventType)) <- child
if outputs.nonEmpty && inputs.nonEmpty && eventType == "COMPLETE"
} yield eventlist += event
}
eventlist.toList
}
}
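The suite uses OpenLineage's file transport so events can be read back from a temporary log. Outside of tests, the same listener is typically pointed at an HTTP backend instead; a sketch, assuming the standard OpenLineage Spark transport settings (endpoint and namespace below are placeholders):

import org.apache.spark.SparkConf

// Hypothetical production-style configuration (not part of this commit):
// ship lineage events to an OpenLineage-compatible HTTP endpoint instead of a local file.
val conf = new SparkConf()
  .set("spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener")
  .set("spark.openlineage.transport.type", "http")
  .set("spark.openlineage.transport.url", "http://lineage-backend:5000") // placeholder endpoint
  .set("spark.openlineage.namespace", "hbase_spark_jobs")                // placeholder namespace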
