Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NU-1800] Add template lazy param #7162

Open
wants to merge 8 commits into
base: staging
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -1816,7 +1816,8 @@ lazy val flinkBaseComponentsTests = (project in flink("components/base-tests"))
)
.dependsOn(
flinkComponentsTestkit % Test,
flinkTableApiComponents % Test
flinkTableApiComponents % Test,
scenarioCompiler % "test->test"
arkadius marked this conversation as resolved.
Show resolved Hide resolved
)

lazy val flinkKafkaComponents = (project in flink("components/kafka"))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package pl.touk.nussknacker.engine.api

import pl.touk.nussknacker.engine.api.LazyParameter.TemplateLazyParameter.TemplateExpression
import pl.touk.nussknacker.engine.api.LazyParameter.{Evaluate, MappedLazyParameter, ProductLazyParameter}
import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypingResult}

Expand Down Expand Up @@ -68,6 +69,25 @@ object LazyParameter {

trait CustomLazyParameter[+T <: AnyRef] extends LazyParameter[T]

trait TemplateLazyParameter[T <: AnyRef] extends LazyParameter[T] {
def templateExpression: TemplateExpression
}

object TemplateLazyParameter {
case class TemplateExpression(parts: List[TemplateExpressionPart])
sealed trait TemplateExpressionPart

object TemplateExpressionPart {
case class Literal(value: String) extends TemplateExpressionPart

trait Placeholder extends TemplateExpressionPart {
val evaluate: Evaluate[String]
}

}

}

final class ProductLazyParameter[T <: AnyRef, Y <: AnyRef](
val arg1: LazyParameter[T],
val arg2: LazyParameter[Y]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package pl.touk.nussknacker.engine.flink

import com.typesafe.config.ConfigFactory
import org.apache.flink.api.connector.source.Boundedness
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
import org.scalatest.{Inside, LoneElement}
import pl.touk.nussknacker.engine.api.component.ComponentDefinition
import pl.touk.nussknacker.engine.build.ScenarioBuilder
import pl.touk.nussknacker.engine.flink.test.FlinkSpec
import pl.touk.nussknacker.engine.flink.util.test.FlinkTestScenarioRunner._
import pl.touk.nussknacker.engine.graph.expression.Expression
import pl.touk.nussknacker.engine.process.FlinkJobConfig.ExecutionMode
import pl.touk.nussknacker.engine.spel.SpelExtension._
import pl.touk.nussknacker.engine.testcomponents.SpelTemplateAstOperationService
import pl.touk.nussknacker.engine.util.test.TestScenarioRunner
import pl.touk.nussknacker.test.ValidatedValuesDetailedMessage

class TemplateLazyParameterTest
extends AnyFunSuite
with FlinkSpec
with Matchers
with Inside
with ValidatedValuesDetailedMessage
with LoneElement {

private lazy val runner = TestScenarioRunner
.flinkBased(ConfigFactory.empty(), flinkMiniCluster)
.withExecutionMode(ExecutionMode.Batch)
.withExtraComponents(
List(ComponentDefinition("templateAstOperationService", SpelTemplateAstOperationService))
)
.build()

test("should use spel template ast operation parameter") {
val scenario = ScenarioBuilder
.streaming("test")
.source("source", TestScenarioRunner.testDataSource)
.enricher(
"customService",
"output",
"templateAstOperationService",
"template" -> Expression.spelTemplate(s"Hello#{#input}")
)
.emptySink("end", TestScenarioRunner.testResultSink, "value" -> "#output".spel)

val result = runner.runWithData(scenario, List(1, 2, 3), Boundedness.BOUNDED)
result.validValue.successes shouldBe List(
"[Hello]-literal[1]-templated",
"[Hello]-literal[2]-templated",
"[Hello]-literal[3]-templated"
)
}

}
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
package pl.touk.nussknacker.engine.compile.nodecompilation

import pl.touk.nussknacker.engine.api.LazyParameter.{CustomLazyParameter, Evaluate}
import pl.touk.nussknacker.engine.api.typed.typing.TypingResult
import pl.touk.nussknacker.engine.api.{Context, JobData, MetaData, NodeId}
import pl.touk.nussknacker.engine.compiledgraph.{BaseCompiledParameter, CompiledParameter}
import pl.touk.nussknacker.engine.api.LazyParameter.TemplateLazyParameter.{TemplateExpression, TemplateExpressionPart}
import pl.touk.nussknacker.engine.api.LazyParameter.{CustomLazyParameter, Evaluate, TemplateLazyParameter}
import pl.touk.nussknacker.engine.api.definition.{Parameter => ParameterDef}
import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypingResult}
import pl.touk.nussknacker.engine.api.{Context, JobData, LazyParameter, NodeId}
import pl.touk.nussknacker.engine.compiledgraph.BaseCompiledParameter
import pl.touk.nussknacker.engine.definition.component.parameter.defaults.EditorBasedLanguageDeterminer
import pl.touk.nussknacker.engine.expression.ExpressionEvaluator
import pl.touk.nussknacker.engine.graph.expression.Expression.Language
import pl.touk.nussknacker.engine.spel.SpelExpression
import pl.touk.nussknacker.engine.spel.SpelTemplateExpressionPart.{Literal, Placeholder}

class EvaluableLazyParameter[T <: AnyRef](
compiledParameter: BaseCompiledParameter,
Expand All @@ -14,19 +20,88 @@ class EvaluableLazyParameter[T <: AnyRef](
override val returnType: TypingResult
) extends CustomLazyParameter[T] {

def this(
compiledParameter: CompiledParameter,
override val evaluate: Evaluate[T] =
LazyParameterEvaluator.evaluate(compiledParameter, expressionEvaluator, nodeId, jobData)

}

class SpelTemplateEvaluableLazyParameter[T <: AnyRef](
compiledParameter: BaseCompiledParameter,
expressionEvaluator: ExpressionEvaluator,
nodeId: NodeId,
jobData: JobData
) extends TemplateLazyParameter[T] {

override val evaluate: Evaluate[T] =
LazyParameterEvaluator.evaluate(compiledParameter, expressionEvaluator, nodeId, jobData)

override def templateExpression: TemplateExpression = compiledParameter.expression match {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't want to evaluate it during each invocation

case expression: SpelExpression =>
expression.templateSubexpressions match {
case Some(subexpressions) =>
val templateParts = subexpressions.map {
case Placeholder(expression) => {
new TemplateExpressionPart.Placeholder {
override val evaluate: Evaluate[String] = context => {
expressionEvaluator.evaluate[String](expression, "expressionId", nodeId.id, context)(jobData).value
}
Comment on lines +43 to +45
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add error handling for expression evaluation

The expression evaluation should handle potential errors to prevent runtime exceptions.

-                override val evaluate: Evaluate[String] = context => {
-                  expressionEvaluator.evaluate[String](expression, "expressionId", nodeId.id, context)(jobData).value
-                }
+                override val evaluate: Evaluate[String] = context => {
+                  val result = expressionEvaluator.evaluate[String](expression, "expressionId", nodeId.id, context)(jobData)
+                  if (result.isRight) result.value
+                  else throw new IllegalStateException(s"Failed to evaluate template expression: ${result.left.get}")
+                }

Committable suggestion skipped: line range outside the PR's diff.

}
}
case Literal(value) => TemplateExpressionPart.Literal(value)
}
TemplateExpression(templateParts)
case None =>
throw new IllegalStateException("Non SpEL-template expression received in SpelTemplateLazyParameter")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can get rid of this exceptions by moving this logic to factory method and check templateSubexpressions instead of expression langage

}
case _ => throw new IllegalStateException("Non SpEL expression received in SpelTemplateLazyParameter")
}

override def returnType: TypingResult = Typed[String]
}

private[this] object LazyParameterEvaluator {

def evaluate[T <: AnyRef](
compiledParameter: BaseCompiledParameter,
expressionEvaluator: ExpressionEvaluator,
nodeId: NodeId,
jobData: JobData
) =
this(compiledParameter, expressionEvaluator, nodeId, jobData, compiledParameter.typingInfo.typingResult)

override val evaluate: Evaluate[T] = { ctx: Context =>
): Evaluate[T] = { ctx: Context =>
expressionEvaluator
.evaluateParameter(compiledParameter, ctx)(nodeId, jobData)
.value
.asInstanceOf[T]
Comment on lines +62 to 71
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add error handling for type casting

The unsafe cast to type T could fail at runtime. Consider adding proper error handling.

   def evaluate[T <: AnyRef](
       compiledParameter: BaseCompiledParameter,
       expressionEvaluator: ExpressionEvaluator,
       nodeId: NodeId,
       jobData: JobData
   ): Evaluate[T] = { ctx: Context =>
     expressionEvaluator
       .evaluateParameter(compiledParameter, ctx)(nodeId, jobData)
       .value
-      .asInstanceOf[T]
+      match {
+        case value: T => value
+        case other => throw new ClassCastException(
+          s"Expected type ${compiledParameter.returnType} but got: ${other.getClass.getSimpleName}"
+        )
+      }
   }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def evaluate[T <: AnyRef](
compiledParameter: BaseCompiledParameter,
expressionEvaluator: ExpressionEvaluator,
nodeId: NodeId,
jobData: JobData
) =
this(compiledParameter, expressionEvaluator, nodeId, jobData, compiledParameter.typingInfo.typingResult)
override val evaluate: Evaluate[T] = { ctx: Context =>
): Evaluate[T] = { ctx: Context =>
expressionEvaluator
.evaluateParameter(compiledParameter, ctx)(nodeId, jobData)
.value
.asInstanceOf[T]
def evaluate[T <: AnyRef](
compiledParameter: BaseCompiledParameter,
expressionEvaluator: ExpressionEvaluator,
nodeId: NodeId,
jobData: JobData
): Evaluate[T] = { ctx: Context =>
expressionEvaluator
.evaluateParameter(compiledParameter, ctx)(nodeId, jobData)
.value
match {
case value: T => value
case other => throw new ClassCastException(
s"Expected type ${compiledParameter.returnType} but got: ${other.getClass.getSimpleName}"
)
}

}

}

object EvaluableLazyParameterFactory {

def build[T <: AnyRef](
compiledParameter: BaseCompiledParameter,
expressionEvaluator: ExpressionEvaluator,
nodeId: NodeId,
jobData: JobData,
parameterDefinition: ParameterDef,
typingResult: TypingResult
): LazyParameter[T] = {
EditorBasedLanguageDeterminer.determineLanguageOf(parameterDefinition.editor) match {
arkadius marked this conversation as resolved.
Show resolved Hide resolved
case Language.SpelTemplate =>
new SpelTemplateEvaluableLazyParameter[T](
compiledParameter,
expressionEvaluator,
nodeId,
jobData
)
case _ =>
new EvaluableLazyParameter[T](
compiledParameter,
expressionEvaluator,
nodeId,
jobData,
typingResult
)
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ final class EvaluableLazyParameterCreator[T <: AnyRef](
override val returnType: TypingResult
) extends CustomLazyParameter[T] {

def create(deps: EvaluableLazyParameterCreatorDeps): EvaluableLazyParameter[T] = {
def create(deps: EvaluableLazyParameterCreatorDeps): LazyParameter[T] = {
createEvaluableLazyParameter(deps)
}

Expand All @@ -42,13 +42,15 @@ final class EvaluableLazyParameterCreator[T <: AnyRef](
override val shouldBeWrappedWithScalaOption: Boolean = parameterDef.scalaOptionParameter
override val shouldBeWrappedWithJavaOptional: Boolean = parameterDef.javaOptionalParameter
}
new EvaluableLazyParameter[T](
compiledParameter,
deps.expressionEvaluator,
nodeId,
deps.jobData,
returnType
)
EvaluableLazyParameterFactory
.build[T](
compiledParameter = compiledParameter,
expressionEvaluator = deps.expressionEvaluator,
nodeId = nodeId,
jobData = deps.jobData,
parameterDefinition = parameterDef,
typingResult = returnType
)
}

}
Expand Down Expand Up @@ -92,7 +94,8 @@ class DefaultToEvaluateFunctionConverter(deps: EvaluableLazyParameterCreatorDeps
p.fun,
p.transformTypingResult
)
case p: CustomLazyParameter[T] => p
case p: CustomLazyParameter[T] => p
case p: TemplateLazyParameter[T] => p
Comment on lines +96 to +97
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Review pattern matching order for potential issues.

The case p: CustomLazyParameter[T] might catch instances of EvaluableLazyParameterCreator before they reach the more specific case above, potentially preventing proper evaluation of nested creators. Consider:

  1. Moving these cases before the EvaluableLazyParameterCreator case, or
  2. Making the pattern matching more specific to exclude EvaluableLazyParameterCreator

Here's a suggested fix:

-      case p: CustomLazyParameter[T]   => p
-      case p: TemplateLazyParameter[T] => p
+      case p: TemplateLazyParameter[T] => p
+      case p: CustomLazyParameter[T] if !p.isInstanceOf[EvaluableLazyParameterCreator[_]] => p
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
case p: CustomLazyParameter[T] => p
case p: TemplateLazyParameter[T] => p
case p: TemplateLazyParameter[T] => p
case p: CustomLazyParameter[T] if !p.isInstanceOf[EvaluableLazyParameterCreator[_]] => p

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,14 @@ class ParameterEvaluator(
): LazyParameter[Nothing] = {
lazyParameterCreationStrategy match {
case EvaluableLazyParameterStrategy =>
new EvaluableLazyParameter(
CompiledParameter(exprValue, definition),
runtimeExpressionEvaluator,
nodeId,
jobData
val compiledParameter = CompiledParameter(exprValue, definition)
EvaluableLazyParameterFactory.build(
compiledParameter = compiledParameter,
expressionEvaluator = runtimeExpressionEvaluator,
nodeId = nodeId,
jobData = jobData,
parameterDefinition = definition,
typingResult = compiledParameter.typingInfo.typingResult
)
case PostponedEvaluatorLazyParameterStrategy =>
new EvaluableLazyParameterCreator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ import pl.touk.nussknacker.engine.expression.parse.{CompiledExpression, Expressi
import pl.touk.nussknacker.engine.graph.expression.Expression.Language
import pl.touk.nussknacker.engine.graph.expression.{Expression => GraphExpression}
import pl.touk.nussknacker.engine.spel.SpelExpressionParseError.ExpressionCompilationError
import pl.touk.nussknacker.engine.spel.SpelExpressionParser.Flavour
import pl.touk.nussknacker.engine.spel.SpelExpressionParser.{Flavour, Standard}
import pl.touk.nussknacker.engine.spel.SpelTemplateExpressionPart.{Literal, Placeholder}
import pl.touk.nussknacker.engine.spel.internal.EvaluationContextPreparer

import scala.util.control.NonFatal
Expand Down Expand Up @@ -80,6 +81,13 @@ class SpelExpressionEvaluationException(val expression: String, val ctxId: Strin
cause = cause
)

sealed trait SpelTemplateExpressionPart

object SpelTemplateExpressionPart {
final case class Literal(value: String) extends SpelTemplateExpressionPart
final case class Placeholder(expression: SpelExpression) extends SpelTemplateExpressionPart
}

class SpelExpression(
parsed: ParsedSpelExpression,
expectedReturnType: TypingResult,
Expand All @@ -92,6 +100,37 @@ class SpelExpression(

override val language: Language = flavour.languageId

def templateSubexpressions: Option[List[SpelTemplateExpressionPart]] = {
def createEvaluablePlaceholder(expression: org.springframework.expression.spel.standard.SpelExpression) = {
val parsedTemplateExpr = ParsedSpelExpression(expression.getExpressionString, parsed.parser, expression)
val compiledExpr = new SpelExpression(
parsedTemplateExpr,
typing.Typed[String],
Standard,
evaluationContextPreparer
)
Placeholder(compiledExpr)
}
flavour.languageId match {
case Language.SpelTemplate =>
Some(parsed.parsed match {
case compositeExpr: CompositeStringExpression =>
compositeExpr.getExpressions.toList.map {
case lit: LiteralExpression => Literal(lit.getExpressionString)
case spelExpr: org.springframework.expression.spel.standard.SpelExpression =>
createEvaluablePlaceholder(spelExpr)
}
case singleEvaluableSpelExpr: org.springframework.expression.spel.standard.SpelExpression =>
List(createEvaluablePlaceholder(singleEvaluableSpelExpr))
case singleLiteralExpr: LiteralExpression => List(Literal(singleLiteralExpr.getExpressionString))
case other =>
throw new IllegalArgumentException(s"Unsupported expression type: [${other.getClass.getName}]")

})
case _ => None
}
}

private val expectedClass =
expectedReturnType match {
case r: SingleTypingResult =>
Expand Down
Loading
Loading