diff --git a/src/main/scala/org/apache/spark/sql/pulsar/PulsarSource.scala b/src/main/scala/org/apache/spark/sql/pulsar/PulsarSource.scala
index 532472b..83e7985 100644
--- a/src/main/scala/org/apache/spark/sql/pulsar/PulsarSource.scala
+++ b/src/main/scala/org/apache/spark/sql/pulsar/PulsarSource.scala
@@ -15,8 +15,9 @@ package org.apache.spark.sql.pulsar
 
 import java.{util => ju}
+import java.util.Optional
 
-import scala.collection.JavaConverters.asScalaBufferConverter
+import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 import org.apache.pulsar.client.admin.PulsarAdmin
@@ -30,11 +31,12 @@ import org.apache.spark.sql.{DataFrame, SQLContext}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.json.JSONOptionsInRead
 import org.apache.spark.sql.connector.read.streaming
-import org.apache.spark.sql.connector.read.streaming.{ReadAllAvailable, ReadLimit, ReadMaxFiles, SupportsAdmissionControl}
+import org.apache.spark.sql.connector.read.streaming.{ReadAllAvailable, ReadLimit, ReadMaxFiles, ReportsSourceMetrics, SupportsAdmissionControl}
 import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset, Source}
 import org.apache.spark.sql.pulsar.PulsarOptions.ServiceUrlOptionKey
 import org.apache.spark.sql.pulsar.SpecificPulsarOffset.getTopicOffsets
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.LongAccumulator
 
 
 private[pulsar] class PulsarSource(
@@ -51,7 +53,8 @@ private[pulsar] class PulsarSource(
     jsonOptions: JSONOptionsInRead)
     extends Source
     with Logging
-    with SupportsAdmissionControl {
+    with SupportsAdmissionControl
+    with ReportsSourceMetrics {
 
   import PulsarSourceUtils._
 
@@ -67,6 +70,11 @@ private[pulsar] class PulsarSource(
 
   private var currentTopicOffsets: Option[Map[String, MessageId]] = None
 
+  private val rowsBytesAccumulator: LongAccumulator = {
+    val accumulator = new LongAccumulator
+    sqlContext.sparkContext.register(accumulator, "pulsarLatestMicroBatchRowsBytesCounter")
+    accumulator
+  }
 
   private lazy val pulsarSchema: SchemaInfo = pulsarHelper.getPulsarSchema
 
@@ -180,6 +188,7 @@ private[pulsar] class PulsarSource(
       failOnDataLoss,
       subscriptionNamePrefix,
       jsonOptions,
+      rowsBytesAccumulator,
       sqlContext.sparkContext.conf
         .getOption(PulsarClientFactory.PulsarClientFactoryClassOption))
 
@@ -195,13 +204,20 @@ private[pulsar] class PulsarSource(
     pulsarHelper.commitCursorToOffset(off)
   }
 
+  override def metrics(optional: Optional[streaming.Offset]): ju.Map[String, String] = {
+    // This is called during query progress reporting after a batch finishes.
+    val currBatchMetrics = Seq("numInputRows" -> rowsBytesAccumulator.count.toString,
+      "numInputBytes" -> rowsBytesAccumulator.value.toString).toMap.asJava
+    rowsBytesAccumulator.reset()
+    currBatchMetrics
+  }
+
   override def stop(): Unit = synchronized {
     if (!stopped) {
       pulsarHelper.removeCursor()
       pulsarHelper.close()
       stopped = true
     }
-
   }
 }
diff --git a/src/main/scala/org/apache/spark/sql/pulsar/PulsarSourceRDD.scala b/src/main/scala/org/apache/spark/sql/pulsar/PulsarSourceRDD.scala
index ae33d17..49461ec 100644
--- a/src/main/scala/org/apache/spark/sql/pulsar/PulsarSourceRDD.scala
+++ b/src/main/scala/org/apache/spark/sql/pulsar/PulsarSourceRDD.scala
@@ -24,7 +24,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.json.JSONOptionsInRead
 import org.apache.spark.sql.pulsar.PulsarSourceUtils._
-import org.apache.spark.util.{NextIterator, Utils}
+import org.apache.spark.util.{LongAccumulator, NextIterator, Utils}
 
 private[pulsar] case class PulsarSourceRDDPartition(index: Int, offsetRange: PulsarOffsetRange)
     extends Partition
@@ -54,7 +54,8 @@ private[pulsar] abstract class PulsarSourceRDDBase(
       topic: String,
       startOffset: MessageId,
       endOffset: MessageId,
-      context: TaskContext): Iterator[InternalRow] = {
+      context: TaskContext,
+      rowsBytesAccumulator: Option[LongAccumulator]): Iterator[InternalRow] = {
 
     val deserializer = new PulsarDeserializer(schemaInfo.si, jsonOptions)
     val schema: Schema[_] = SchemaUtils.getPSchema(schemaInfo.si)
@@ -137,6 +138,7 @@ private[pulsar] abstract class PulsarSourceRDDBase(
             return null
           }
 
+          rowsBytesAccumulator.foreach(_.add(currentMessage.size()))
          currentId = currentMessage.getMessageId
          finished = false
 
@@ -172,6 +174,7 @@ private[pulsar] class PulsarSourceRDD(
     failOnDataLoss: Boolean,
     subscriptionNamePrefix: String,
     jsonOptions: JSONOptionsInRead,
+    rowsBytesAccumulator: LongAccumulator,
     pulsarClientFactoryClassName: Option[String])
     extends PulsarSourceRDDBase(
       sc,
@@ -201,7 +204,7 @@ private[pulsar] class PulsarSourceRDD(
       return Iterator.empty
     }
 
-    computeInner(tp, start, end, context)
+    computeInner(tp, start, end, context, Some(rowsBytesAccumulator))
   }
 }
 
@@ -238,6 +241,6 @@ private[pulsar] class PulsarSourceRDD4Batch(
       return Iterator.empty
     }
 
-    computeInner(tp, start, end, context)
+    computeInner(tp, start, end, context, None)
   }
 }
diff --git a/src/test/scala/org/apache/spark/sql/pulsar/PulsarMicroBatchSourceSuite.scala b/src/test/scala/org/apache/spark/sql/pulsar/PulsarMicroBatchSourceSuite.scala
index 7b0b51a..9e2a7dd 100644
--- a/src/test/scala/org/apache/spark/sql/pulsar/PulsarMicroBatchSourceSuite.scala
+++ b/src/test/scala/org/apache/spark/sql/pulsar/PulsarMicroBatchSourceSuite.scala
@@ -15,15 +15,19 @@ package org.apache.spark.sql.pulsar
 
 import java.util.concurrent.ConcurrentLinkedQueue
 
+import scala.collection.JavaConverters._
+
 import org.apache.pulsar.client.admin.PulsarAdmin
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.ForeachWriter
-import org.apache.spark.sql.execution.streaming.StreamingExecutionRelation
+import org.apache.spark.sql.execution.streaming.{StreamExecution, StreamingExecutionRelation}
 import org.apache.spark.sql.functions.{count, window}
 import org.apache.spark.sql.pulsar.PulsarOptions.{ServiceUrlOptionKey, TopicPattern}
+import org.apache.spark.sql.streaming.StreamingQueryProgress
 import org.apache.spark.sql.streaming.Trigger.ProcessingTime
 import org.apache.spark.util.Utils
 
+
 class PulsarMicroBatchV1SourceSuite extends PulsarMicroBatchSourceSuiteBase {
   test("V1 Source is used by default") {
     val topic = newTopic()
@@ -63,8 +67,8 @@ abstract class PulsarMicroBatchSourceSuiteBase extends PulsarSourceSuiteBase {
 
   test("input row metrics") {
     val topic = newTopic()
+    createTopic(topic, 12)
     sendMessages(topic, Array("-1"))
-    require(getLatestOffsets(Set(topic)).size === 1)
 
     val pulsar = spark.readStream
       .format("pulsar")
@@ -74,6 +78,15 @@ abstract class PulsarMicroBatchSourceSuiteBase extends PulsarSourceSuiteBase {
       .selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
       .as[(String, String)]
 
+    def checkSourceMetrics(
+        progresses: Array[StreamingQueryProgress],
+        numInputRows: Long): Boolean = {
+      val sourceMetrics = progresses.map(_.sources.head.metrics)
+      sourceMetrics.map(_.get("numInputRows").toLong).sum == numInputRows &&
+      sourceMetrics.map(_.get("numInputBytes").toLong).sum >= numInputRows &&
+      progresses.map(_.numInputRows).sum == numInputRows
+    }
+
     val mapped = pulsar.map(kv => kv._2.toInt + 1)
     testStream(mapped)(
       StartStream(trigger = ProcessingTime(1)),
@@ -81,9 +94,20 @@ abstract class PulsarMicroBatchSourceSuiteBase extends PulsarSourceSuiteBase {
       AddPulsarData(Set(topic), 1, 2, 3),
       CheckAnswer(2, 3, 4),
       AssertOnQuery { query =>
-        val recordsRead = query.recentProgress.map(_.numInputRows).sum
-        recordsRead == 3
-      }
+        checkSourceMetrics(query.recentProgress, 3)
+      },
+      AddPulsarData(Set(topic), 4, 5),
+      CheckAnswer(2, 3, 4, 5, 6),
+      AssertOnQuery { query =>
+        checkSourceMetrics(query.recentProgress, 5)
+      },
+      StopStream,
+      StartStream(trigger = ProcessingTime(1)),
+      AddPulsarData(Set(topic), 6),
+      CheckAnswer(2, 3, 4, 5, 6, 7),
+      AssertOnQuery { query =>
+        checkSourceMetrics(query.recentProgress, 1)
+      },
     )
   }
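
Usage note (not part of the patch): once PulsarSource implements ReportsSourceMetrics, the map returned by metrics() is attached to each batch's SourceProgress, exactly as the updated test reads it via progresses.map(_.sources.head.metrics). The sketch below shows one way a caller might surface the new "numInputRows"/"numInputBytes" entries with a StreamingQueryListener; the object name, register helper, and println reporting are illustrative assumptions, only the metric keys come from this diff.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

// Illustrative sketch: log the per-batch Pulsar source metrics added in this patch.
// Object/method names and the output format are assumptions, not part of the diff.
object PulsarSourceMetricsReporter {
  def register(spark: SparkSession): Unit = {
    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(event: QueryStartedEvent): Unit = ()
      override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
      override def onQueryProgress(event: QueryProgressEvent): Unit = {
        // Each SourceProgress.metrics carries the map built in PulsarSource.metrics():
        // "numInputRows" (accumulator count) and "numInputBytes" (accumulator sum).
        // Values may be null for sources that do not report metrics.
        event.progress.sources.foreach { source =>
          println(s"source=${source.description} " +
            s"numInputRows=${source.metrics.get("numInputRows")} " +
            s"numInputBytes=${source.metrics.get("numInputBytes")}")
        }
      }
    })
  }
}

Because the accumulator is reset inside metrics(), each progress entry reflects only the most recent micro-batch rather than a running total, which is why the test sums the per-batch values across recentProgress.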