You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Describe the bug
While using connector 3.4.03 to read from pulsar topic, backlogs on pulsar topic pile up and don't get cleared up till TTL.
Spark Version- 3.5.0
Scala 2.12
Pulsar Version- 2.10.0.7
streamnative connector - pulsar-spark-connector_2.12-3.4.0.3.jar
To Reproduce
Steps to reproduce the behavior:
Running this script as a notebook process messages fine, but results in backlogs in pulsar that don't get cleared up till TTL
val pulsarDF = spark.readStream
.format("pulsar")
.option("service.url", serviceUrl)
.option("topic", topicName)
.option("predefinedSubscription", subscriptionName)
.option("pulsar.reader.receiverQueueSize", "10000")
.option("pulsar.client.operationTimeoutMs", "60000") // Set to 60 seconds
.option("failOnDataLoss", "false")
.load()
// Process the binary data and rename columns
val processedDF = pulsarDF.selectExpr(
"CAST(value AS STRING) as message",
"CAST(__publishTime AS STRING) as pulsarPublishTime"
)
val query = processedDF.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", checkpointLocation)
.table(tablePath) // This writes to managed table managed by Unity catalog
Expected behavior
Pulsar Backlogs are zero or close to zero
Additional context
It appears that stream native connector is not sending message acknowledgements to Pulsar broker causing the backlogs. Doesn't the stream native connector use Consumer API to read and ack?
The text was updated successfully, but these errors were encountered:
Describe the bug
While using connector 3.4.03 to read from pulsar topic, backlogs on pulsar topic pile up and don't get cleared up till TTL.
Spark Version- 3.5.0
Scala 2.12
Pulsar Version- 2.10.0.7
streamnative connector - pulsar-spark-connector_2.12-3.4.0.3.jar
To Reproduce
Steps to reproduce the behavior:
Running this script as a notebook process messages fine, but results in backlogs in pulsar that don't get cleared up till TTL
val pulsarDF = spark.readStream
.format("pulsar")
.option("service.url", serviceUrl)
.option("topic", topicName)
.option("predefinedSubscription", subscriptionName)
.option("pulsar.reader.receiverQueueSize", "10000")
.option("pulsar.client.operationTimeoutMs", "60000") // Set to 60 seconds
.option("failOnDataLoss", "false")
.load()
// Process the binary data and rename columns
val processedDF = pulsarDF.selectExpr(
"CAST(value AS STRING) as message",
"CAST(__publishTime AS STRING) as pulsarPublishTime"
)
val query = processedDF.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", checkpointLocation)
.table(tablePath) // This writes to managed table managed by Unity catalog
Expected behavior
Pulsar Backlogs are zero or close to zero
Additional context
It appears that stream native connector is not sending message acknowledgements to Pulsar broker causing the backlogs. Doesn't the stream native connector use Consumer API to read and ack?
The text was updated successfully, but these errors were encountered: