
[NU-1823] Kafka source without topic schema #7066

Open
wants to merge 9 commits into
base: staging

Conversation

@ForrestFairy (Contributor) commented Oct 23, 2024

Describe your changes

We want to enable users to choose and use topics that have no schema present in the schema registry.

Input won't be validated and will be passed downstream typed as Unknown.

Problems

  • Currently, ad-hoc testing creates a JSON object {"Value": message}, so it works differently than when the message is put directly on the topic.

  • Similarly, I'm not sure whether we should handle metadata from Hermes here or whether that is managed on the cloud side. If we create a topic locally and send messages through Hermes, we get:

{
  "metadata": {
    "externalMetadata":  {},
    "id": "9b7a791e-fdf8-4b50-882b-203e6228491c",
    "timestamp": 1729676736412
  },
  "_w": true,
  "message": "some message"
}

when we only need the message field; for now this can be worked around by using dynamic access (see the sketch below)
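As an illustration only, a hedged sketch of that dynamic-access workaround: when the whole Hermes envelope arrives in #input typed as Unknown, a SpEL expression can pull out just the payload field. The exact expression and whether dynamic access is permitted depend on configuration, so treat this as an assumption rather than the final approach.

  import pl.touk.nussknacker.engine.graph.expression.Expression
  import pl.touk.nussknacker.engine.spel.SpelExtension.SpelExpresion

  // Hypothetical: picks the "message" field out of the Hermes envelope shown above.
  val extractHermesPayload: Expression = "#input.message".spel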

Checklist before merge

  • Related issue ID is placed at the beginning of PR title in [brackets] (can be GH issue or Nu Jira issue)
  • Code is cleaned from temporary changes and commented out lines
  • Parts of the code that are not easy to understand are documented in the code
  • Changes are covered by automated tests
  • Showcase in dev-application.conf added to demonstrate the feature
  • Documentation added or updated
  • Added entry in Changelog.md describing the change from the perspective of a public distribution user
  • Added MigrationGuide.md entry in the appropriate subcategory if introducing a breaking change
  • Verify that PR will be squashed during merge

Summary by CodeRabbit

  • New Features

    • Introduced integration tests for Kafka message processing with JSON and plain message formats.
    • Enhanced schema handling in the Kafka components, allowing for dynamic content type management (JSON and PLAIN).
    • Added new methods to verify schema presence for topics and improved error handling related to schema mismatches.
  • Bug Fixes

    • Refined deserialization logic to handle content types more robustly.
  • Documentation

    • Updated imports and method signatures to reflect new content type handling capabilities.

@ForrestFairy force-pushed the kafka-source-without-topic-schema branch 2 times, most recently from 17b9df4 to 6ec935e on October 30, 2024 11:35
@ForrestFairy ForrestFairy marked this pull request as ready for review October 30, 2024 14:06
)
}).map(getVersionParam)
} else {
val versionValues = List(
ForrestFairy (PR author) commented:

I'm not sure whether we need two options, as I think we currently try to process every message as a byte array or a string.

At least a source with the "Json" option chosen seems to work with both regular JSON and a byte array when I test it locally with a Kafka producer.

@raphaelsolarski (Contributor) left a comment:

I left a few comments.
Let's start with a test using the scenario runner that shows the source works in the desired configuration.
After that we can figure out whether we can render a "content type" field instead of the "version" field for non-schemed topics.


case class DynamicSchemaVersion(typ: JsonTypes) extends SchemaVersionOption

sealed abstract class JsonTypes(val value: Int) extends IntEnumEntry
Contributor commented:

Do we need value here? Can we use just Enum[JsonTypes] instead of IntEnum[JsonTypes]?
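A minimal sketch of the suggested alternative, assuming the enumeratum library already in use here; with Enum[JsonTypes] no per-entry numeric value is needed:

  import enumeratum._

  sealed trait JsonTypes extends EnumEntry

  object JsonTypes extends Enum[JsonTypes] {
    case object Json  extends JsonTypes
    case object Plain extends JsonTypes

    // findValues collects the case objects above; no IntEnum-style value mapping is required
    val values = findValues
  }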

Contributor commented:

  • As I wrote in previous comments, IMO we should separate the representation of the selected version from the content type; a rough sketch follows below.
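A rough, hypothetical sketch of that separation (names are illustrative, not taken from the PR):

  // Keep "which schema version was selected" apart from "what content type a schemaless
  // topic carries", instead of encoding the content type as a pseudo-version.
  sealed trait SourceContentSpec

  object SourceContentSpec {
    final case class VersionedSchema(version: SchemaVersionOption)            extends SourceContentSpec
    final case class SchemalessContent(contentType: ContentTypes.ContentType) extends SourceContentSpec
  }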

Szymon Bogusz added 8 commits November 6, 2024 12:05
…andling when couldn't list kafka topics (previously it wasn't needed)
…compiles

Also a workaround in the same file for tests: it will now fetch all topics from the schema registry as before, plus the topics from Kafka, so tests should pass
When a topic without schema is selected, the user can now choose ContentType instead of a version (which for now doesn't change anything; handling of byte arrays still needs to be implemented)
@ForrestFairy force-pushed the kafka-source-without-topic-schema branch from cd3c797 to 4ef3b59 on November 6, 2024 11:05

class KafkaJsonItSpec extends FlinkWithKafkaSuite with PatientScalaFutures with LazyLogging {

private val givenMatchingAvroObjV2 = avroEncoder.encodeRecordOrError(
Contributor commented:

  1. IMO we don't need anything Avro-related in this test class: encoding the record with avroEncoder, using RecordSchemaV2, or decoding JSON with UniversalSchemaSupportDispatcher/RuntimeSchemaData/SchemaWithMetadata.

All of that can be done with the plain circe JSON parser and JSON AST, for example:

package pl.touk.nussknacker.defaultmodel

import io.circe.{Json, parser}
import pl.touk.nussknacker.engine.api.process.TopicName.ForSource
import pl.touk.nussknacker.engine.api.validation.ValidationMode
import pl.touk.nussknacker.engine.build.ScenarioBuilder
import pl.touk.nussknacker.engine.graph.expression.Expression
import pl.touk.nussknacker.engine.kafka.KafkaTestUtils.richConsumer
import pl.touk.nussknacker.engine.schemedkafka.KafkaUniversalComponentTransformer
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.ContentTypes
import pl.touk.nussknacker.engine.spel.SpelExtension.SpelExpresion

import java.nio.charset.StandardCharsets
import java.time.Instant

class KafkaJsonItSpec extends FlinkWithKafkaSuite {

  test("should round-trip json message without provided schema") {
    val jsonRecord = Json.obj(
      "first"  -> Json.fromString("Jan"),
      "middle" -> Json.fromString("Tomek"),
      "last"   -> Json.fromString("Kowalski")
    )

    val inputTopic  = "input-topic-without-schema"
    val outputTopic = "output-topic-without-schema"

    kafkaClient.createTopic(inputTopic, 1)
    kafkaClient.createTopic(outputTopic, 1)
    sendAsJson(jsonRecord.toString(), ForSource(inputTopic), Instant.now.toEpochMilli)

    val process =
      ScenarioBuilder
        .streaming("without-schema")
        .parallelism(1)
        .source(
          "start",
          "kafka",
          KafkaUniversalComponentTransformer.topicParamName.value       -> Expression.spel(s"'$inputTopic'"),
          KafkaUniversalComponentTransformer.contentTypeParamName.value -> s"'${ContentTypes.JSON.toString}'".spel
        )
        .emptySink(
          "end",
          "kafka",
          KafkaUniversalComponentTransformer.sinkKeyParamName.value       -> "".spel,
          KafkaUniversalComponentTransformer.sinkRawEditorParamName.value -> "true".spel,
          KafkaUniversalComponentTransformer.sinkValueParamName.value     -> "#input".spel,
          KafkaUniversalComponentTransformer.topicParamName.value         -> s"'$outputTopic'".spel,
          KafkaUniversalComponentTransformer.contentTypeParamName.value   -> s"'${ContentTypes.JSON.toString}'".spel,
          KafkaUniversalComponentTransformer.sinkValidationModeParamName.value -> s"'${ValidationMode.lax.name}'".spel
        )

    run(process) {
      val outputRecord = kafkaClient.createConsumer().consumeWithConsumerRecord(outputTopic).take(1).head
      val parsedOutput = parser
        .parse(new String(outputRecord.value(), StandardCharsets.UTF_8))
        .fold(throw _, identity)

      parsedOutput shouldBe jsonRecord
    }
  }

}

import java.time.Instant
import java.util

class KafkaJsonItSpec extends FlinkWithKafkaSuite with PatientScalaFutures with LazyLogging {
Contributor commented:

AFAIK we don't need LazyLogging here because we don't use loggers directly in the class.
PatientScalaFutures is also not necessary, as long as we don't have a place to pass PatienceConfig in the class for eventually/futureValue/whenReady usages (at least I can't find one).

@@ -0,0 +1,83 @@
package pl.touk.nussknacker.defaultmodel
Contributor commented:

IMO we need a test case or a test class for the PLAIN content type.
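For illustration, a hedged sketch of such a PLAIN round-trip test, modeled on the JSON test shown earlier in this thread and meant to sit next to it (reusing its imports). The raw-byte producer helper used below (sendPlainKafkaMessage) is an assumption, not an existing suite method, and since byte-array handling is still being implemented the final assertion may need adjusting:

  test("should round-trip plain message without provided schema") {
    val inputTopic  = "input-topic-plain-without-schema"
    val outputTopic = "output-topic-plain-without-schema"
    // Deliberately not valid JSON, so it can only pass through as PLAIN content.
    val payload = "some plain, non-json message".getBytes(StandardCharsets.UTF_8)

    kafkaClient.createTopic(inputTopic, 1)
    kafkaClient.createTopic(outputTopic, 1)
    sendPlainKafkaMessage(inputTopic, payload) // hypothetical helper: sends raw bytes to the input topic

    val process =
      ScenarioBuilder
        .streaming("without-schema-plain")
        .parallelism(1)
        .source(
          "start",
          "kafka",
          KafkaUniversalComponentTransformer.topicParamName.value       -> s"'$inputTopic'".spel,
          KafkaUniversalComponentTransformer.contentTypeParamName.value -> s"'${ContentTypes.PLAIN.toString}'".spel
        )
        .emptySink(
          "end",
          "kafka",
          KafkaUniversalComponentTransformer.sinkKeyParamName.value            -> "".spel,
          KafkaUniversalComponentTransformer.sinkRawEditorParamName.value      -> "true".spel,
          KafkaUniversalComponentTransformer.sinkValueParamName.value          -> "#input".spel,
          KafkaUniversalComponentTransformer.topicParamName.value              -> s"'$outputTopic'".spel,
          KafkaUniversalComponentTransformer.contentTypeParamName.value        -> s"'${ContentTypes.PLAIN.toString}'".spel,
          KafkaUniversalComponentTransformer.sinkValidationModeParamName.value -> s"'${ValidationMode.lax.name}'".spel
        )

    run(process) {
      val outputRecord = kafkaClient.createConsumer().consumeWithConsumerRecord(outputTopic).take(1).head
      // With PLAIN content the bytes are expected to pass through unchanged.
      outputRecord.value() shouldBe payload
    }
  }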

)
.asInstanceOf[util.HashMap[String, String]]

response.forEach((key, value) => givenMatchingAvroObjV2.get(key) shouldBe value)
Contributor commented:

The assertion inside forEach is tricky: it won't fail if the map is empty.
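For example, a hedged sketch of a stricter version (assuming Scala 2.13's CollectionConverters and ScalaTest matchers are in scope):

  import scala.jdk.CollectionConverters._

  val entries = response.asScala
  entries should not be empty // guards against the silently-passing empty-map case
  entries.foreach { case (key, value) =>
    givenMatchingAvroObjV2.get(key) shouldBe value
  }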

)
}).map(getVersionParam)
val topicsWithSchema = topicSelectionStrategy.getTopics(schemaRegistryClient)
if (topicsWithSchema.exists(_.contains(preparedTopic.prepared.topicName.toUnspecialized))) {
Contributor commented:

Maybe let's extract the condition and the topicsWithSchema computation into a method like isTopicWithoutSchema(topic, strategy), wdyt?
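Something like the following hedged sketch, built directly from the two lines above (name and placement are suggestions only):

  // Suggested extraction: answers "does this topic have a schema registered?" in one place.
  private def isTopicWithSchema(
      topic: UnspecializedTopicName,
      strategy: TopicSelectionStrategy
  ): Boolean =
    strategy.getTopics(schemaRegistryClient).exists(_.contains(topic))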

val valueBytes = readValueMessage(valueSchemaOpt, topic, value)
(keyBytes, valueBytes)

if (schemaRegistryClient.getAllTopics.exists(_.contains(UnspecializedTopicName(topic.name)))) {
Contributor commented:

I saw a similar condition in a few places in the change 😄 maybe we can extract it somewhere?

val valueSchemaOpt =
Option(
SchemaWithMetadata(
OpenAPIJsonSchema("""{"type": "object"}"""),
Contributor commented:

Does this value actually have to be a JSON object at the root? What about other valid JSON elements (strings, numbers, arrays, null)?
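For illustration, a hedged guess at a fully permissive alternative; whether OpenAPIJsonSchema accepts an empty schema string like this is an assumption to verify:

  // An empty JSON schema puts no constraint on the root element, so strings, numbers,
  // arrays, booleans and null would be accepted as well as objects.
  val anyJsonSchema = OpenAPIJsonSchema("{}")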

} else {
SchemaWithMetadata(
// I don't know how these schemas affect deserialization later
OpenAPIJsonSchema("""{"type": "object"}"""),
Contributor commented:

Like above: IMO other element types can also appear as the root element.

_
) =>
val preparedTopic = prepareTopic(topic)
val valueValidationResult = if (contentType.equals("JSON")) {
Contributor commented:

Please use a constant/enum here instead of the string literal.

Valid(
(
Some(
RuntimeSchemaData[ParsedSchema](
Contributor commented:

How does this work with non-JSON values, e.g. an unescaped string (someString) as the root element of the Kafka record value?

- created a function for repeated code
- removed use of avro in the test for json
- used more constants where applicable

coderabbitai bot commented Nov 18, 2024

Walkthrough

The changes introduce a comprehensive set of modifications across multiple files to enhance Kafka message processing capabilities, particularly focusing on JSON and plain message formats. A new integration test file, KafkaJsonItSpec.scala, is added to validate message round-tripping without provided schemas. The KafkaUniversalComponentTransformer class is updated to handle content type parameters and improve topic retrieval. Additionally, new content types are defined, and various components, including deserializers and formatters, are modified to support these enhancements, ensuring robust schema handling and error management.

Changes

  • engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/KafkaJsonItSpec.scala: New file added for integration tests on Kafka message processing with JSON and plain formats. Contains two tests for round-tripping messages without schemas.
  • utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/KafkaUniversalComponentTransformer.scala: Added contentTypeParamName variable, renamed getVersionParam to getVersionOrContentTypeParam, added getAllTopics method, and updated logic for handling topics and parameters based on schema presence.
  • utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/ContentTypes.scala: New file defining ContentTypes enumeration with JSON and PLAIN, and ContentTypesSchemas object with schemas for JSON and plain types.
  • utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/SchemaRegistryClient.scala: Added isTopicWithSchema method to check if a topic has an associated schema.
  • utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/formatter/AbstractSchemaBasedRecordFormatter.scala: Updated parseRecord method to include schema checks and handle content types, modifying serialization logic accordingly.
  • utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/ParsedSchemaSupport.scala: Modified formValueEncoder method to handle input values as maps, enhancing encoding logic for various formats.
  • utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/RecordFormatterSupport.scala: Enhanced readMessage method to conditionally handle string messages based on schema presence.
  • utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/UniversalKafkaDeserializer.scala: Updated deserialize method to check for schema presence before processing, refining error handling for content types.
  • utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/UniversalSchemaPayloadDeserializer.scala: Altered deserialize method to handle specific cases for JSON schemas, improving control flow.
  • utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/sink/UniversalKafkaSinkFactory.scala: Added support for JSON schemas in rawEditorParameterStep and valueEditorParamStep, enhancing parameter handling based on content types.
  • utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/source/UniversalKafkaSourceFactory.scala: Updated nextSteps and prepareSourceFinalResults methods to manage JSON and plain content types, improving schema handling in the source factory.




@coderabbitai (bot) left a comment:

Actionable comments posted: 9

🧹 Outside diff range and nitpick comments (14)
utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/UniversalSchemaPayloadDeserializer.scala (1)

85-89: Document the plain content type handling behavior

The special handling for plain content type should be documented to explain when raw bytes are returned versus when deserialization occurs. This aligns with the PR objective of supporting topics without schemas.

+  /**
+   * Deserializes the payload based on the schema type:
+   * - For plain content type (no schema): returns raw bytes without transformation
+   * - For other JSON schemas: uses the schema's deserializer
+   */
   override def deserialize(
engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/KafkaJsonItSpec.scala (3)

73-74: Remove unused variable

The longJsonInHex variable is defined but never used in the test.

-    val longJsonInHex =
-      "227b226669727374223a2022546f6d656b222c20226d6964646c65223a20224a616e222c20226c617374223a20224b6f77616c736b69227d22"

76-82: Remove debug code

Remove the debug-related code including the unused BigInteger conversion and print statements.

-    val big        = new BigInteger(shortJsonInHex, 16).toByteArray
-
-    val str = new String(byteString)
-    println(str)
-    println(byteString.mkString("Array(", ", ", ")"))
-    println(big.mkString("Array(", ", ", ")"))

66-114: Consider adding validation for message content

While the test verifies the byte-level equality, it would be valuable to also validate that the message content is a valid JSON string as expected.

     run(process) {
       val outputRecord = kafkaClient
         .createConsumer()
         .consumeWithConsumerRecord(outputTopic)
         .take(1)
         .head

       outputRecord.value() shouldBe byteString
+      // Verify the content is valid JSON
+      val outputJson = parser.parse(new String(outputRecord.value()))
+      outputJson.isRight shouldBe true
     }
utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/source/UniversalKafkaSourceFactory.scala (3)

75-101: Reduce code duplication in schema handling

The JSON and plain cases have similar structure. Consider extracting the common logic into a helper method.

private def createRuntimeSchemaData(contentType: ContentTypes, typingResult: TypingResult): (Option[RuntimeSchemaData[ParsedSchema]], TypingResult) = {
  val schema = contentType match {
    case ContentTypes.JSON => ContentTypesSchemas.schemaForJson
    case ContentTypes.PLAIN => ContentTypesSchemas.schemaForPlain
  }
  (
    Some(
      RuntimeSchemaData[ParsedSchema](
        new NkSerializableParsedSchema[ParsedSchema](schema),
        Some(SchemaId.fromString(contentType.toString))
      )
    ),
    typingResult
  )
}

97-98: Address TODO comment about Array[Byte] handling

The comment indicates incomplete implementation for plain content type handling. Please clarify the implementation timeline or requirements.

Would you like me to help implement the Array[Byte] handling or create a GitHub issue to track this task?


80-80: Avoid using content type strings as schema IDs

Using content type strings as schema IDs could lead to conflicts if the schema registry is used in the future. Consider using dedicated schema identifiers.

Consider:

  1. Using a dedicated schema ID generation mechanism
  2. Adding a prefix to distinguish these special cases (e.g., "internal-json-schema")
  3. Documenting the schema ID allocation strategy

Also applies to: 93-93

utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/UniversalKafkaDeserializer.scala (2)

56-56: Improve Error Message with Actual Content Type

When throwing the IllegalStateException, it's helpful to include the actual content type that was received. This provides clearer context for debugging.

Apply this diff to enhance the error message:

-            throw new IllegalStateException("Topic without schema should have ContentType Json or Plain, was neither")
+            throw new IllegalStateException(s"Topic without schema should have ContentType JSON or PLAIN, but was [$writerSchemaId]")

Line range hint 61-68: Add Tests for Mismatched Schema Types

The current code throws a MismatchReaderWriterSchemaException when the reader and writer schema types do not match. However, there is a TODO comment indicating that this case needs testing, especially when supporting JSON schema.

Consider adding unit tests to cover scenarios where the reader and writer schema types differ to ensure the deserializer handles these cases correctly. Would you like assistance in creating these test cases?

utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/formatter/AbstractSchemaBasedRecordFormatter.scala (1)

120-139: Consider adding unit tests for schemaless topic handling

It's important to ensure that the new logic for handling schemaless topics is thoroughly tested. Adding unit tests covering scenarios for JSON and plain content types can help prevent future regressions.

Would you like assistance in creating unit tests for these cases?

utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/sink/UniversalKafkaSinkFactory.scala (4)

171-175: Use named parameters in copy method to enhance clarity

In line 174, when calling the copy method on jsonSchema, it's advisable to use named parameters to explicitly indicate which field is being modified. This improves code readability and reduces potential errors if the class structure changes in the future.

Apply this diff to use named parameters:

- jsonSchema.copy(new NkSerializableParsedSchema[ParsedSchema](ContentTypesSchemas.schemaForPlain))
+ jsonSchema.copy(
+   schema = new NkSerializableParsedSchema[ParsedSchema](ContentTypesSchemas.schemaForPlain)
+ )

174-174: Consider defining a separate plainSchema variable for better clarity

Reusing jsonSchema.copy to represent a plain schema can be confusing. Defining a separate variable plainSchema enhances code clarity and maintainability.

Define plainSchema:

private val plainSchema = RuntimeSchemaData[ParsedSchema](
  new NkSerializableParsedSchema[ParsedSchema](ContentTypesSchemas.schemaForPlain),
  None
)

Update the code to use plainSchema:

      val runtimeSchemaData = if (contentType.equals(ContentTypes.JSON.toString)) {
        jsonSchema
      } else {
-       jsonSchema.copy(
-         schema = new NkSerializableParsedSchema[ParsedSchema](ContentTypesSchemas.schemaForPlain)
-       )
+       plainSchema
      }

171-172: Use case-insensitive comparison for content type

When comparing strings like contentType, it's safer to perform a case-insensitive comparison to handle input variations gracefully.

Apply this diff:

- val runtimeSchemaData = if (contentType.equals(ContentTypes.JSON.toString)) {
+ val runtimeSchemaData = if (contentType.equalsIgnoreCase(ContentTypes.JSON.toString)) {

254-255: Use case-insensitive comparison for contentType in valueEditorParamStep

Ensure that the comparison for contentType is case-insensitive to handle different input cases.

Apply this diff:

- (`contentTypeParamName`, DefinedEagerParameter(contentType: String, _)) ::
+ (`contentTypeParamName`, DefinedEagerParameter(contentTypeRaw: String, _)) ::
...
- if (contentType.equals(ContentTypes.JSON.toString)) {
+ val contentType = contentTypeRaw.toUpperCase
+ if (contentType == ContentTypes.JSON.toString) {
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 58f8185 and 65d38fb.

📒 Files selected for processing (11)
  • engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/KafkaJsonItSpec.scala (1 hunks)
  • utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/KafkaUniversalComponentTransformer.scala (6 hunks)
  • utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/ContentTypes.scala (1 hunks)
  • utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/SchemaRegistryClient.scala (2 hunks)
  • utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/formatter/AbstractSchemaBasedRecordFormatter.scala (2 hunks)
  • utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/ParsedSchemaSupport.scala (2 hunks)
  • utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/RecordFormatterSupport.scala (2 hunks)
  • utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/UniversalKafkaDeserializer.scala (2 hunks)
  • utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/UniversalSchemaPayloadDeserializer.scala (2 hunks)
  • utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/sink/UniversalKafkaSinkFactory.scala (6 hunks)
  • utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/source/UniversalKafkaSourceFactory.scala (2 hunks)
🔇 Additional comments (17)
utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/ContentTypes.scala (3)

1-4: LGTM: Package structure and imports are well-organized

The package hierarchy and import statement are appropriate for schema registry functionality.


11-14: Add documentation and consider schema validation implications

The current implementation uses extremely permissive schemas (empty object/string) which effectively bypass schema validation. While this aligns with the PR's goal of handling schema-less topics, it has several implications:

  1. There's no documentation explaining when these schemas should be used
  2. The empty schemas might mask potential data quality issues
  3. It's unclear how this relates to the metadata handling mentioned in the PR objectives

Let's check if these schemas are used appropriately:

#!/bin/bash
# Search for usages of these schemas to understand the context
rg -l "ContentTypesSchemas" | xargs rg "schemaFor(Json|Plain)"

Consider:

  1. Adding comprehensive documentation explaining the use cases and implications
  2. Implementing basic structural validation even for schema-less topics
  3. Adding logging when these permissive schemas are used

1-14: Verify handling of hermes metadata

The PR objectives mention issues with hermes metadata handling, but this implementation doesn't explicitly address how metadata fields will be handled with these content types.

Let's check how metadata is handled:

✅ Verification successful

Let me gather more specific information about metadata handling.


Let me check one more specific aspect of the implementation to understand how the content types interact with message handling.


Based on the gathered information, I can now provide the final response:

Implementation correctly handles metadata through content type distinction

The implementation properly addresses metadata handling through the content type system:

  • Plain and JSON content types are handled separately in UniversalKafkaDeserializer with appropriate schema assignments
  • The deserializer implementation maintains metadata through SchemaWithMetadata wrapper
  • The system correctly handles both schema-based and schema-less messages with proper content type detection
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for hermes metadata handling in the codebase
rg -A 5 "hermes.*metadata" 

# Look for any message transformation logic
ast-grep --pattern 'def transform$_($$$) {
  $$$
}'

Length of output: 2630


Script:

#!/bin/bash
# Look for metadata handling in UniversalKafkaDeserializer
rg -A 10 "SchemaWithMetadata" utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/UniversalKafkaDeserializer.scala

# Check for any specific hermes metadata handling
rg -A 5 "hermes" utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/

# Look for metadata-related schema definitions
ast-grep --pattern 'case class $_Metadata($$$)'

Length of output: 3538


Script:

#!/bin/bash
# Check UniversalSchemaPayloadDeserializer for message handling
rg -A 10 "deserialize" utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/UniversalSchemaPayloadDeserializer.scala

# Look for any schema-less topic handling
rg -A 5 "schemaless" utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/

Length of output: 2433

utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/SchemaRegistryClient.scala (1)

43-46: 🛠️ Refactor suggestion

Consider improving error handling and performance

The implementation has several areas that could be enhanced:

  1. Error Handling: The method silently handles potential errors from strategy.getTopics. Consider propagating errors to allow proper handling by callers.
  2. Performance: Fetching all topics for each check could be inefficient. Consider caching results if this method is called frequently.
  3. Input Validation: Add parameter validation for null safety.

Consider this improved implementation:

- def isTopicWithSchema(topic: String, strategy: TopicSelectionStrategy): Boolean = {
-   val topicsWithSchema = strategy.getTopics(this)
-   topicsWithSchema.exists(_.map(_.name).contains(topic))
- }
+ def isTopicWithSchema(topic: String, strategy: TopicSelectionStrategy): Validated[SchemaRegistryError, Boolean] = {
+   require(topic != null, "Topic cannot be null")
+   require(strategy != null, "Strategy cannot be null")
+   strategy.getTopics(this).map(topics => topics.exists(_.name == topic))
+ }

Let's verify the usage of this method to understand the impact:

utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/UniversalSchemaPayloadDeserializer.scala (2)

10-10: LGTM: Import statement is correctly placed

The new import for ContentTypesSchemas is appropriately placed with other schema-related imports and is necessary for the plain content type handling.


85-89: Verify plain content type handling across the codebase

The introduction of special handling for plain content type may affect other components that consume the deserialized data. We should verify that consumers can handle raw byte arrays.

✅ Verification successful

The previous search didn't yield results because we were looking for the wrong class name. Let's broaden our search to understand the content type handling and byte array processing in the codebase.


Based on the search results, I can now make a determination about the plain content type handling. The codebase shows that:

  1. The plain content type handling is properly integrated in the UniversalSchemaPayloadDeserializer with ContentTypesSchemas.schemaForPlain
  2. The byte array handling is widely supported throughout the codebase:
    • Kafka sources and sinks work with Array[Byte] as their base type
    • The deserialization/serialization infrastructure has proper byte array support
    • The Lite engine and other components handle raw byte arrays correctly

Raw byte array handling is properly supported across the codebase

The special handling for plain content type is consistent with the overall architecture where raw byte arrays are a fundamental data type. Consumers are already equipped to handle raw byte arrays as this is the base type for Kafka integration.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for places where JsonSchemaPayloadDeserializer.deserialize is used
rg -l "JsonSchemaPayloadDeserializer.*deserialize" | while read -r file; do
  echo "=== $file ==="
  rg -A 5 "JsonSchemaPayloadDeserializer.*deserialize" "$file"
done

Length of output: 67


Script:

#!/bin/bash
# Let's search for any usage of UniversalSchemaPayloadDeserializer and content type handling
rg -A 5 "ContentTypesSchemas.schemaForPlain|UniversalSchemaPayloadDeserializer" 

# Also search for byte array handling in the codebase
rg -A 5 "Array\[Byte\]|deserialize.*bytes"

Length of output: 235407

utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/RecordFormatterSupport.scala (2)

8-8: LGTM: Import statement is correctly updated

The addition of ContentTypesSchemas to the import statement is necessary for the schema-less topic handling functionality.


53-59: Verify string content validation requirements

The implementation allows any string content when no schema is provided. Based on the PR objectives, this aligns with the requirement to treat input as Unknown. However, please verify if any basic validation or sanitization is needed for security or data quality purposes.

Run the following script to check for any validation patterns in the codebase:

✅ Verification successful

Let me analyze the findings and generate additional verification scripts to better understand the validation context.


Let me generate one final verification to check the schema validation context:


Based on the analysis of the codebase, I can now generate the final response:

String content validation is appropriately handled through schema validation

The implementation is correct as-is. When no schema is provided, treating input as raw bytes is the intended behavior. For cases with schemas:

  • JSON content is validated against ContentTypesSchemas.schemaForJson
  • Plain text content is validated against ContentTypesSchemas.schemaForPlain
  • Schema validation is enforced through UniversalSchemaValidator and type-specific validators (AvroSchemaValidator, JsonSchemaValidator)
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for string validation patterns in Kafka-related files
rg -t scala "validate.*String|sanitize.*String" 

# Search for similar message handling patterns
ast-grep --pattern 'case j if j.isString => $$$'

Length of output: 12377


Script:

#!/bin/bash
# Check for schema validation patterns in the relevant files
rg -A 5 "schemaOpt match" utils/schemed-kafka-components-utils/

# Look for any string content validation in the schema registry related files
rg -t scala "validate.*content|validate.*message" utils/schemed-kafka-components-utils/

# Check the parent class/trait for any validation requirements
ast-grep --pattern 'trait RecordFormatterSupport {
  $$$
}'

Length of output: 2530


Script:

#!/bin/bash
# Check how schema validation is handled in the broader context
rg -t scala -B 5 -A 10 "ParsedSchema" utils/schemed-kafka-components-utils/

# Look for any content validation in the parent directories
rg -t scala "validate.*ParsedSchema|validate.*Schema" utils/

Length of output: 294921

engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/KafkaJsonItSpec.scala (1)

20-64: LGTM! Clean and focused test implementation

The JSON round-trip test is well-structured and follows best practices:

  • Clear test data setup
  • Proper topic naming
  • Explicit content type specification
  • Robust error handling in JSON parsing
utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/ParsedSchemaSupport.scala (2)

34-34: LGTM!

The import is necessary for the Try-based error handling in the formValueEncoder implementation.


161-176: ⚠️ Potential issue

Several issues need to be addressed in the formValueEncoder implementation.

  1. The code uses unsafe tuple accessors (_1, _2) which should be replaced with pattern matching as suggested in previous reviews.
  2. The type casting with asInstanceOf is unsafe and should include better error handling.
  3. Using "Failure" as a value is incorrect - it should be a Failure instance.
  4. The special case handling for "Value" key needs better documentation.

Here's a suggested refactoring that addresses these issues:

  (value: Any) => {
-    // In ad-hoc test without schema we create object `{ "Value" = userInputInAdHoc }`, so if present we should just take the input
-    Try {
-      val temp = value.asInstanceOf[Map[String, Map[String, Any]]].head
-      val key = temp._1
-      // Any try to create a variable with value temp._2 fails
-      if (key.equals("Value")) {
-        temp._2
-      } else Failure
-    } match {
-      // For normal usage
-      case Failure(_) => encoder.encodeOrError(value, rawSchema)
-      // If source with topic without schema
-      case Success(objectInside) => encoder.encodeOrError(objectInside, rawSchema)
-    }
+    /**
+     * Special handling for ad-hoc testing without schema:
+     * When testing topics without schema, the input is wrapped in {"Value": actualMessage}.
+     * This unwraps such messages to ensure consistent behavior with direct topic publishing.
+     */
+    value match {
+      case map: Map[String, Map[String, Any]] =>
+        map.headOption.flatMap {
+          case ("Value", innerMap) => Some(innerMap)
+          case _ => None
+        }.map(encoder.encodeOrError(_, rawSchema))
+         .getOrElse(encoder.encodeOrError(value, rawSchema))
+      case _ => encoder.encodeOrError(value, rawSchema)
+    }
  }

The refactored version:

  • Uses pattern matching instead of unsafe casting
  • Provides clear documentation for the special case
  • Handles the "Value" key case more safely
  • Removes the incorrect usage of Failure as a value
  • Simplifies the control flow

Let's verify if this pattern is used consistently across the codebase:

utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/source/UniversalKafkaSourceFactory.scala (1)

9-9: LGTM: Import for serializable schema wrapper

The added import for NkSerializableParsedSchema is appropriate for making schemas serializable, which is necessary for Flink's distributed execution.

utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/formatter/AbstractSchemaBasedRecordFormatter.scala (2)

115-119: Logic for handling topics with schemas looks good

The implementation correctly processes topics that have associated schemas.


120-139: Proper handling of schemaless topics based on content type

The code appropriately handles schemaless topics by checking the content type and processing JSON and plain text messages accordingly.

utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/KafkaUniversalComponentTransformer.scala (3)

39-39: Addition of contentTypeParamName aligns with existing conventions

The introduction of contentTypeParamName as a constant maintains consistency with the parameter naming strategy used throughout the transformer, facilitating clear and maintainable code.


117-146: Effective handling of schema-less topics in getVersionOrContentTypeParam

The getVersionOrContentTypeParam method correctly differentiates between topics with and without schemas. By providing content type options when a schema is absent, it enhances the component's flexibility and user experience.


229-235: Seamless integration of content type parameter in schemaParamStep

The updates in schemaParamStep method efficiently incorporate getVersionOrContentTypeParam, ensuring that the appropriate parameters are generated based on the topic's schema presence without disrupting the existing flow.

Comment on lines +5 to +9
object ContentTypes extends Enumeration {
type ContentType = Value

val JSON, PLAIN = Value
}

🛠️ Refactor suggestion

Consider using sealed trait pattern instead of Enumeration

Scala's Enumeration has several limitations that could impact maintainability and type safety. Consider using a sealed trait/case object pattern instead, which provides better type safety, pattern matching, and IDE support.

Additionally, consider adding scaladoc comments to document the purpose of each content type, especially since they're part of a public API.

Here's a suggested implementation:

-object ContentTypes extends Enumeration {
-  type ContentType = Value
-
-  val JSON, PLAIN = Value
-}
+sealed trait ContentType
+
+object ContentType {
+  /** Represents JSON formatted content */
+  case object JSON extends ContentType
+
+  /** Represents plain text content */
+  case object PLAIN extends ContentType
+}

Comment on lines +79 to +82
expectedSchemaData
.getOrElse(writerSchemaData)
.asInstanceOf[RuntimeSchemaData[OpenAPIJsonSchema]]
.schema

⚠️ Potential issue

Add error handling for schema type casting

The type casting to RuntimeSchemaData[OpenAPIJsonSchema] could throw a ClassCastException if the schema is of a different type. Consider adding proper error handling to provide a more informative error message.

-      expectedSchemaData
-        .getOrElse(writerSchemaData)
-        .asInstanceOf[RuntimeSchemaData[OpenAPIJsonSchema]]
-        .schema
+      try {
+        expectedSchemaData
+          .getOrElse(writerSchemaData)
+          .asInstanceOf[RuntimeSchemaData[OpenAPIJsonSchema]]
+          .schema
+      } catch {
+        case e: ClassCastException =>
+          throw new IllegalArgumentException(s"Expected OpenAPIJsonSchema but got ${writerSchemaData.schema.getClass.getSimpleName}", e)
+      }

Comment on lines +53 to +58
case j if j.isString =>
schemaOpt match {
case None => j.asString.get.getBytes()
case Some(ContentTypesSchemas.schemaForJson) => j.asString.get.getBytes(StandardCharsets.UTF_8)
case _ => j.asString.get.getBytes(StandardCharsets.UTF_8)
}

🛠️ Refactor suggestion

Simplify identical cases and consider charset handling

The current implementation has two concerns:

  1. Lines 56 and 57 perform identical operations, which can be simplified.
  2. When no schema is provided (line 55), using getBytes() without charset specification can lead to platform-dependent encoding issues.

Consider applying this refactoring:

  case j if j.isString =>
    schemaOpt match {
-      case None                                    => j.asString.get.getBytes()
-      case Some(ContentTypesSchemas.schemaForJson) => j.asString.get.getBytes(StandardCharsets.UTF_8)
-      case _                                       => j.asString.get.getBytes(StandardCharsets.UTF_8)
+      case None                                    => j.asString.get.getBytes(StandardCharsets.UTF_8)
+      case Some(ContentTypesSchemas.schemaForJson) | _ => j.asString.get.getBytes(StandardCharsets.UTF_8)
    }

package pl.touk.nussknacker.defaultmodel

import io.circe.{Json, parser}
import org.apache.kafka.shaded.com.google.protobuf.ByteString

🛠️ Refactor suggestion

Avoid using shaded dependencies directly

Using shaded dependencies directly (org.apache.kafka.shaded) can lead to versioning issues. Consider using the non-shaded protobuf library instead.

-import org.apache.kafka.shaded.com.google.protobuf.ByteString
+import com.google.protobuf.ByteString

_
) =>
val preparedTopic = prepareTopic(topic)
val valueValidationResult = if (contentType.equals(ContentTypes.JSON.toString)) {

🛠️ Refactor suggestion

Replace string comparison with enum/constant

Using string literals for content type comparison is error-prone. Consider using the ContentTypes enum values directly.

-if (contentType.equals(ContentTypes.JSON.toString)) {
+if (ContentTypes.JSON.toString.equals(contentType)) {

Comment on lines +68 to +102
case step @ TransformationStep(
(`topicParamName`, DefinedEagerParameter(topic: String, _)) ::
(`contentTypeParamName`, DefinedEagerParameter(contentType: String, _)) :: _,
_
) =>
val preparedTopic = prepareTopic(topic)
val valueValidationResult = if (contentType.equals(ContentTypes.JSON.toString)) {
Valid(
(
Some(
RuntimeSchemaData[ParsedSchema](
new NkSerializableParsedSchema[ParsedSchema](ContentTypesSchemas.schemaForJson),
Some(SchemaId.fromString(ContentTypes.JSON.toString))
)
),
// This is the type after it leaves source
Unknown
)
)
} else {
Valid(
(
Some(
RuntimeSchemaData[ParsedSchema](
new NkSerializableParsedSchema[ParsedSchema](ContentTypesSchemas.schemaForPlain),
Some(SchemaId.fromString(ContentTypes.PLAIN.toString))
)
),
// This is the type after it leaves source
// TODO: Should be Array[Byte] when handling is implemented
Typed[Array[java.lang.Byte]]
)
)
}
prepareSourceFinalResults(preparedTopic, valueValidationResult, context, dependencies, step.parameters, Nil)

💡 Codebase verification

Additional error handling and test coverage needed

The verification reveals several gaps that need to be addressed:

  1. No explicit error handling for invalid content types is implemented. The code only handles JSON and PLAIN types without validation for unsupported values.
  2. While integration tests exist for both JSON and PLAIN content types (KafkaJsonItSpec), they only cover the "happy path" with round-trip scenarios.
  3. Schema evolution handling is present in the codebase but lacks dedicated test coverage.

Required changes:

  • Add validation and error handling for unsupported content types in UniversalKafkaSourceFactory
  • Extend KafkaJsonItSpec with negative test cases for invalid content types
  • Add test coverage for schema evolution scenarios
🔗 Analysis chain

Verify impact on error handling and testing

The new content type handling introduces a significant change in message processing. Please ensure:

  1. Error cases are properly handled (e.g., invalid content types)
  2. Integration tests cover both content types
  3. Schema evolution scenarios are tested

Let me check the content of these files to better understand the test coverage and error handling.


Let me check one more aspect to verify error handling for invalid content types.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check for existing error handling tests
rg -l "contentType.*test" --type scala

# Check for integration tests covering both content types
rg -l "ContentTypes.(JSON|PLAIN)" --type scala

# Look for schema evolution tests
rg -l "schema.*evolution.*test" --type scala

Length of output: 1061


Script:

#!/bin/bash
# Check content of the Kafka JSON integration test
rg "ContentTypes" -A 5 -B 5 engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/KafkaJsonItSpec.scala

# Check error handling in UniversalKafkaSourceFactory
rg "contentType" -A 5 -B 5 utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/source/UniversalKafkaSourceFactory.scala

# Check schema evolution handling
rg "schema.*evolution" -A 5 -B 5 utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/source/UniversalKafkaSourceFactory.scala

Length of output: 4773


Script:

#!/bin/bash
# Check for content type validation or error handling
ast-grep --pattern 'case $_ if !contentType.equals(ContentTypes.JSON.toString) && !contentType.equals(ContentTypes.PLAIN.toString) => $_'

# Check for additional test cases in KafkaJsonItSpec
rg "test|should" engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/KafkaJsonItSpec.scala

Length of output: 468

Comment on lines +44 to +59
val schemaWithMetadata = {
if (schemaRegistryClient.isTopicWithSchema(topic, new AllTopicsSelectionStrategy)) {
schemaRegistryClient.getSchemaById(writerSchemaId.value)
} else {
writerSchemaId.value match {
case StringSchemaId(value) =>
if (value.equals(ContentTypes.PLAIN.toString)) {
SchemaWithMetadata(ContentTypesSchemas.schemaForPlain, SchemaId.fromString(ContentTypes.PLAIN.toString))
} else {
SchemaWithMetadata(ContentTypesSchemas.schemaForJson, SchemaId.fromString(ContentTypes.JSON.toString))
}
case _ =>
throw new IllegalStateException("Topic without schema should have ContentType Json or Plain, was neither")
}
}
}

⚠️ Potential issue

Fix Pattern Matching on writerSchemaId

The pattern matching on writerSchemaId.value is incorrect because writerSchemaId.value is likely a String, whereas StringSchemaId is a case class expecting to match against writerSchemaId itself. This will result in a match failure.

Apply this diff to correct the pattern matching:

-        writerSchemaId.value match {
+        writerSchemaId match {
           case StringSchemaId(value) =>
             if (value.equals(ContentTypes.PLAIN.toString)) {
               SchemaWithMetadata(ContentTypesSchemas.schemaForPlain, SchemaId.fromString(ContentTypes.PLAIN.toString))
             } else {
               SchemaWithMetadata(ContentTypesSchemas.schemaForJson, SchemaId.fromString(ContentTypes.JSON.toString))
             }
           case _ =>
             throw new IllegalStateException("Topic without schema should have ContentType Json or Plain, was neither")
         }

Comment on lines +253 to +282
case TransformationStep(
(`topicParamName`, DefinedEagerParameter(_: String, _)) ::
(`contentTypeParamName`, DefinedEagerParameter(contentType: String, _)) ::
(`sinkKeyParamName`, _) ::
(`sinkRawEditorParamName`, DefinedEagerParameter(false, _)) :: Nil,
_
) =>
val schemaData = jsonSchema

schemaSupportDispatcher
.forSchemaType(schemaData.schema.schemaType())
.extractParameter(
schemaData.schema,
rawMode = false,
validationMode = ValidationMode.lax,
rawValueParamDeclaration.createParameter(),
restrictedParamNames
)
.map[TransformationStepResult] { valueParam =>
val state = TransformationState(schemaData, valueParam)
// shouldn't happen except for empty schema, but it can lead to infinite loop...
if (valueParam.toParameters.isEmpty) {
FinalResults(context, Nil, Some(state))
} else {
NextParameters(valueParam.toParameters, state = Some(state))
}
}
.valueOr { errors =>
FinalResults(context, errors.toList)
}

⚠️ Potential issue

Ensure PLAIN content type is handled in valueEditorParamStep

In the valueEditorParamStep method, the contentType parameter is used, but only the JSON content type seems to be handled. This might lead to issues if PLAIN content type is selected, as schemaData would incorrectly use the JSON schema.

Update the code to handle PLAIN content type similarly to JSON:

      val schemaData = if (contentType.equalsIgnoreCase(ContentTypes.JSON.toString)) {
        jsonSchema
+     } else if (contentType.equalsIgnoreCase(ContentTypes.PLAIN.toString)) {
+       plainSchema
      } else {
        jsonSchema
      }

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines +260 to +261
val schemaData = jsonSchema


⚠️ Potential issue

Avoid hardcoding jsonSchema for all content types

Currently, schemaData is assigned jsonSchema regardless of the contentType. This may cause incorrect schema usage for other content types.

Assign schemaData based on contentType:

-     val schemaData = jsonSchema
+     val schemaData = if (contentType.equalsIgnoreCase(ContentTypes.JSON.toString)) {
+       jsonSchema
+     } else if (contentType.equalsIgnoreCase(ContentTypes.PLAIN.toString)) {
+       plainSchema
+     } else {
+       // Handle other content types or throw an error
+     }

Committable suggestion skipped: line range outside the PR's diff.

Contributor commented:

created: #7163
⚠️ Be careful! Snapshot changes are not necessarily the cause of the error. Check the logs.
