From d312b1a152c7b803a4fad69f00b7993a812b13b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mateusz=20S=C5=82abek?= Date: Mon, 18 Nov 2024 09:21:12 +0100 Subject: [PATCH 1/8] add template lazy param --- build.sbt | 3 +- .../engine/api/LazyParameter.scala | 20 ++++ .../flink/TemplateLazyParameterTest.scala | 55 ++++++++++ .../EvaluableLazyParameter.scala | 95 +++++++++++++++-- .../EvaluableLazyParameterCreator.scala | 21 ++-- .../nodecompilation/ParameterEvaluator.scala | 13 ++- .../engine/spel/SpelExpression.scala | 42 +++++++- .../nussknacker/engine/InterpreterSpec.scala | 100 +++++++++++++++++- .../SpelTemplateAstOperationService.scala | 79 ++++++++++++++ 9 files changed, 401 insertions(+), 27 deletions(-) create mode 100644 engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/TemplateLazyParameterTest.scala create mode 100644 scenario-compiler/src/test/scala/pl/touk/nussknacker/engine/testcomponents/SpelTemplateAstOperationService.scala diff --git a/build.sbt b/build.sbt index fbaf1a8e2fa..5a04df406f1 100644 --- a/build.sbt +++ b/build.sbt @@ -1816,7 +1816,8 @@ lazy val flinkBaseComponentsTests = (project in flink("components/base-tests")) ) .dependsOn( flinkComponentsTestkit % Test, - flinkTableApiComponents % Test + flinkTableApiComponents % Test, + scenarioCompiler % "test->test" ) lazy val flinkKafkaComponents = (project in flink("components/kafka")) diff --git a/components-api/src/main/scala/pl/touk/nussknacker/engine/api/LazyParameter.scala b/components-api/src/main/scala/pl/touk/nussknacker/engine/api/LazyParameter.scala index 90d756acf0d..6bfc75cebcf 100644 --- a/components-api/src/main/scala/pl/touk/nussknacker/engine/api/LazyParameter.scala +++ b/components-api/src/main/scala/pl/touk/nussknacker/engine/api/LazyParameter.scala @@ -1,5 +1,6 @@ package pl.touk.nussknacker.engine.api +import pl.touk.nussknacker.engine.api.LazyParameter.TemplateLazyParameter.TemplateExpression import pl.touk.nussknacker.engine.api.LazyParameter.{Evaluate, MappedLazyParameter, ProductLazyParameter} import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypingResult} @@ -68,6 +69,25 @@ object LazyParameter { trait CustomLazyParameter[+T <: AnyRef] extends LazyParameter[T] + trait TemplateLazyParameter[T <: AnyRef] extends LazyParameter[T] { + def templateExpression: TemplateExpression + } + + object TemplateLazyParameter { + case class TemplateExpression(parts: List[TemplateExpressionPart]) + sealed trait TemplateExpressionPart + + object TemplateExpressionPart { + case class NonTemplatedPart(value: String) extends TemplateExpressionPart + + trait TemplatedPart extends TemplateExpressionPart { + val evaluate: Evaluate[String] + } + + } + + } + final class ProductLazyParameter[T <: AnyRef, Y <: AnyRef]( val arg1: LazyParameter[T], val arg2: LazyParameter[Y] diff --git a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/TemplateLazyParameterTest.scala b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/TemplateLazyParameterTest.scala new file mode 100644 index 00000000000..29b218ef2ce --- /dev/null +++ b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/TemplateLazyParameterTest.scala @@ -0,0 +1,55 @@ +package pl.touk.nussknacker.engine.flink + +import com.typesafe.config.ConfigFactory +import org.apache.flink.api.connector.source.Boundedness +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers +import org.scalatest.{Inside, LoneElement} +import pl.touk.nussknacker.engine.api.component.ComponentDefinition +import pl.touk.nussknacker.engine.build.ScenarioBuilder +import pl.touk.nussknacker.engine.flink.test.FlinkSpec +import pl.touk.nussknacker.engine.flink.util.test.FlinkTestScenarioRunner._ +import pl.touk.nussknacker.engine.graph.expression.Expression +import pl.touk.nussknacker.engine.process.FlinkJobConfig.ExecutionMode +import pl.touk.nussknacker.engine.spel.SpelExtension._ +import pl.touk.nussknacker.engine.testcomponents.SpelTemplateAstOperationService +import pl.touk.nussknacker.engine.util.test.TestScenarioRunner +import pl.touk.nussknacker.test.ValidatedValuesDetailedMessage + +class TemplateLazyParameterTest + extends AnyFunSuite + with FlinkSpec + with Matchers + with Inside + with ValidatedValuesDetailedMessage + with LoneElement { + + private lazy val runner = TestScenarioRunner + .flinkBased(ConfigFactory.empty(), flinkMiniCluster) + .withExecutionMode(ExecutionMode.Batch) + .withExtraComponents( + List(ComponentDefinition("templateAstOperationService", SpelTemplateAstOperationService)) + ) + .build() + + test("should use spel template ast operation parameter") { + val scenario = ScenarioBuilder + .streaming("test") + .source("source", TestScenarioRunner.testDataSource) + .enricher( + "customService", + "output", + "templateAstOperationService", + "template" -> Expression.spelTemplate(s"Hello#{#input}") + ) + .emptySink("end", TestScenarioRunner.testResultSink, "value" -> "#output".spel) + + val result = runner.runWithData(scenario, List(1, 2, 3), Boundedness.BOUNDED) + result.validValue.successes shouldBe List( + "[Hello]-literal[1]-templated", + "[Hello]-literal[2]-templated", + "[Hello]-literal[3]-templated" + ) + } + +} diff --git a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/compile/nodecompilation/EvaluableLazyParameter.scala b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/compile/nodecompilation/EvaluableLazyParameter.scala index 1e441c02717..22d7a2fd018 100644 --- a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/compile/nodecompilation/EvaluableLazyParameter.scala +++ b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/compile/nodecompilation/EvaluableLazyParameter.scala @@ -1,10 +1,16 @@ package pl.touk.nussknacker.engine.compile.nodecompilation -import pl.touk.nussknacker.engine.api.LazyParameter.{CustomLazyParameter, Evaluate} -import pl.touk.nussknacker.engine.api.typed.typing.TypingResult -import pl.touk.nussknacker.engine.api.{Context, JobData, MetaData, NodeId} -import pl.touk.nussknacker.engine.compiledgraph.{BaseCompiledParameter, CompiledParameter} +import pl.touk.nussknacker.engine.api.LazyParameter.TemplateLazyParameter.{TemplateExpression, TemplateExpressionPart} +import pl.touk.nussknacker.engine.api.LazyParameter.{CustomLazyParameter, Evaluate, TemplateLazyParameter} +import pl.touk.nussknacker.engine.api.definition.{Parameter => ParameterDef} +import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypingResult} +import pl.touk.nussknacker.engine.api.{Context, JobData, LazyParameter, NodeId} +import pl.touk.nussknacker.engine.compiledgraph.BaseCompiledParameter +import pl.touk.nussknacker.engine.definition.component.parameter.defaults.EditorBasedLanguageDeterminer import pl.touk.nussknacker.engine.expression.ExpressionEvaluator +import pl.touk.nussknacker.engine.graph.expression.Expression.Language +import pl.touk.nussknacker.engine.spel.SpelExpression +import pl.touk.nussknacker.engine.spel.SpelTemplateSubexpression.{NonTemplatedValue, TemplatedExpression} class EvaluableLazyParameter[T <: AnyRef]( compiledParameter: BaseCompiledParameter, @@ -14,15 +20,53 @@ class EvaluableLazyParameter[T <: AnyRef]( override val returnType: TypingResult ) extends CustomLazyParameter[T] { - def this( - compiledParameter: CompiledParameter, + override val evaluate: Evaluate[T] = + LazyParmeterEvaluator.evaluate(compiledParameter, expressionEvaluator, nodeId, jobData) + +} + +class SpelTemplateEvaluableLazyParameter[T <: AnyRef]( + compiledParameter: BaseCompiledParameter, + expressionEvaluator: ExpressionEvaluator, + nodeId: NodeId, + jobData: JobData +) extends TemplateLazyParameter[T] { + + override val evaluate: Evaluate[T] = + LazyParmeterEvaluator.evaluate(compiledParameter, expressionEvaluator, nodeId, jobData) + + override def templateExpression: TemplateExpression = compiledParameter.expression match { + case expression: SpelExpression => + expression.subexpressions match { + case Some(subexpressions) => + val templateParts = subexpressions.map { + case TemplatedExpression(expression) => { + new TemplateExpressionPart.TemplatedPart { + override val evaluate: Evaluate[String] = context => { + expressionEvaluator.evaluate[String](expression, "expressionId", nodeId.id, context)(jobData).value + } + } + } + case NonTemplatedValue(value) => TemplateExpressionPart.NonTemplatedPart(value) + } + TemplateExpression(templateParts) + case None => + throw new IllegalStateException("Non SpEL-template expression received in SpelTemplateLazyParameter") + } + case _ => throw new IllegalStateException("Non SpEL expression received in SpelTemplateLazyParameter") + } + + override def returnType: TypingResult = Typed[String] +} + +private[this] object LazyParmeterEvaluator { + + def evaluate[T <: AnyRef]( + compiledParameter: BaseCompiledParameter, expressionEvaluator: ExpressionEvaluator, nodeId: NodeId, jobData: JobData - ) = - this(compiledParameter, expressionEvaluator, nodeId, jobData, compiledParameter.typingInfo.typingResult) - - override val evaluate: Evaluate[T] = { ctx: Context => + ): Evaluate[T] = { ctx: Context => expressionEvaluator .evaluateParameter(compiledParameter, ctx)(nodeId, jobData) .value @@ -30,3 +74,34 @@ class EvaluableLazyParameter[T <: AnyRef]( } } + +object EvaluableLazyParameterFactory { + + def build[T <: AnyRef]( + compiledParameter: BaseCompiledParameter, + expressionEvaluator: ExpressionEvaluator, + nodeId: NodeId, + jobData: JobData, + parameterDefinition: ParameterDef, + typingResult: TypingResult + ): LazyParameter[T] = { + EditorBasedLanguageDeterminer.determineLanguageOf(parameterDefinition.editor) match { + case Language.SpelTemplate => + new SpelTemplateEvaluableLazyParameter[T]( + compiledParameter, + expressionEvaluator, + nodeId, + jobData + ) + case _ => + new EvaluableLazyParameter[T]( + compiledParameter, + expressionEvaluator, + nodeId, + jobData, + typingResult + ) + } + } + +} diff --git a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/compile/nodecompilation/EvaluableLazyParameterCreator.scala b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/compile/nodecompilation/EvaluableLazyParameterCreator.scala index 1277c3b5de0..ce25fe92715 100644 --- a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/compile/nodecompilation/EvaluableLazyParameterCreator.scala +++ b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/compile/nodecompilation/EvaluableLazyParameterCreator.scala @@ -20,7 +20,7 @@ final class EvaluableLazyParameterCreator[T <: AnyRef]( override val returnType: TypingResult ) extends CustomLazyParameter[T] { - def create(deps: EvaluableLazyParameterCreatorDeps): EvaluableLazyParameter[T] = { + def create(deps: EvaluableLazyParameterCreatorDeps): LazyParameter[T] = { createEvaluableLazyParameter(deps) } @@ -42,13 +42,15 @@ final class EvaluableLazyParameterCreator[T <: AnyRef]( override val shouldBeWrappedWithScalaOption: Boolean = parameterDef.scalaOptionParameter override val shouldBeWrappedWithJavaOptional: Boolean = parameterDef.javaOptionalParameter } - new EvaluableLazyParameter[T]( - compiledParameter, - deps.expressionEvaluator, - nodeId, - deps.jobData, - returnType - ) + EvaluableLazyParameterFactory + .build[T]( + compiledParameter = compiledParameter, + expressionEvaluator = deps.expressionEvaluator, + nodeId = nodeId, + jobData = deps.jobData, + parameterDefinition = parameterDef, + typingResult = returnType + ) } } @@ -92,7 +94,8 @@ class DefaultToEvaluateFunctionConverter(deps: EvaluableLazyParameterCreatorDeps p.fun, p.transformTypingResult ) - case p: CustomLazyParameter[T] => p + case p: CustomLazyParameter[T] => p + case p: TemplateLazyParameter[T] => p } } diff --git a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/compile/nodecompilation/ParameterEvaluator.scala b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/compile/nodecompilation/ParameterEvaluator.scala index 76616515702..9a43f580b5c 100644 --- a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/compile/nodecompilation/ParameterEvaluator.scala +++ b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/compile/nodecompilation/ParameterEvaluator.scala @@ -95,11 +95,14 @@ class ParameterEvaluator( ): LazyParameter[Nothing] = { lazyParameterCreationStrategy match { case EvaluableLazyParameterStrategy => - new EvaluableLazyParameter( - CompiledParameter(exprValue, definition), - runtimeExpressionEvaluator, - nodeId, - jobData + val compiledParameter = CompiledParameter(exprValue, definition) + EvaluableLazyParameterFactory.build( + compiledParameter = compiledParameter, + expressionEvaluator = runtimeExpressionEvaluator, + nodeId = nodeId, + jobData = jobData, + parameterDefinition = definition, + typingResult = compiledParameter.typingInfo.typingResult ) case PostponedEvaluatorLazyParameterStrategy => new EvaluableLazyParameterCreator( diff --git a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/spel/SpelExpression.scala b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/spel/SpelExpression.scala index a521545c97f..eecca121547 100644 --- a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/spel/SpelExpression.scala +++ b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/spel/SpelExpression.scala @@ -28,7 +28,8 @@ import pl.touk.nussknacker.engine.expression.parse.{CompiledExpression, Expressi import pl.touk.nussknacker.engine.graph.expression.Expression.Language import pl.touk.nussknacker.engine.graph.expression.{Expression => GraphExpression} import pl.touk.nussknacker.engine.spel.SpelExpressionParseError.ExpressionCompilationError -import pl.touk.nussknacker.engine.spel.SpelExpressionParser.Flavour +import pl.touk.nussknacker.engine.spel.SpelExpressionParser.{Flavour, Standard} +import pl.touk.nussknacker.engine.spel.SpelTemplateSubexpression.{NonTemplatedValue, TemplatedExpression} import pl.touk.nussknacker.engine.spel.internal.EvaluationContextPreparer import scala.util.control.NonFatal @@ -80,6 +81,17 @@ class SpelExpressionEvaluationException(val expression: String, val ctxId: Strin cause = cause ) +sealed trait SpelTemplateSubexpression + +object SpelTemplateSubexpression { + final case class NonTemplatedValue(val value: String) extends SpelTemplateSubexpression + + final case class TemplatedExpression(expression: SpelExpression) extends SpelTemplateSubexpression { + def evaluate: (Context, Map[String, Any]) => String = expression.evaluate[String] + } + +} + class SpelExpression( parsed: ParsedSpelExpression, expectedReturnType: TypingResult, @@ -92,6 +104,34 @@ class SpelExpression( override val language: Language = flavour.languageId + def subexpressions: Option[List[SpelTemplateSubexpression]] = { + def createTemplatedExpression(expression: org.springframework.expression.spel.standard.SpelExpression) = { + val parsedTemplateExpr = ParsedSpelExpression(expression.getExpressionString, parsed.parser, expression) + val compiledExpr = new SpelExpression( + parsedTemplateExpr, + typing.Typed[String], + Standard, + evaluationContextPreparer + ) + TemplatedExpression(compiledExpr) + } + flavour.languageId match { + case Language.SpelTemplate => + Some(parsed.parsed match { + case compositeExpr: CompositeStringExpression => + compositeExpr.getExpressions.toList.map { + case lit: LiteralExpression => NonTemplatedValue(lit.getExpressionString) + case spelExpr: org.springframework.expression.spel.standard.SpelExpression => + createTemplatedExpression(spelExpr) + } + case spelExpr: org.springframework.expression.spel.standard.SpelExpression => + List(createTemplatedExpression(spelExpr)) + case litExpr: LiteralExpression => List(NonTemplatedValue(litExpr.getExpressionString)) + }) + case _ => None + } + } + private val expectedClass = expectedReturnType match { case r: SingleTypingResult => diff --git a/scenario-compiler/src/test/scala/pl/touk/nussknacker/engine/InterpreterSpec.scala b/scenario-compiler/src/test/scala/pl/touk/nussknacker/engine/InterpreterSpec.scala index e5135ece369..940908bbd4c 100644 --- a/scenario-compiler/src/test/scala/pl/touk/nussknacker/engine/InterpreterSpec.scala +++ b/scenario-compiler/src/test/scala/pl/touk/nussknacker/engine/InterpreterSpec.scala @@ -6,6 +6,8 @@ import cats.effect.IO import cats.effect.unsafe.IORuntime import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers +import org.scalatest.prop.TableDrivenPropertyChecks.forAll +import org.scalatest.prop.Tables.Table import org.springframework.expression.spel.standard.SpelExpression import pl.touk.nussknacker.engine.InterpreterSpec._ import pl.touk.nussknacker.engine.api._ @@ -33,8 +35,8 @@ import pl.touk.nussknacker.engine.canonicalgraph.canonicalnode.FlatNode import pl.touk.nussknacker.engine.canonicalgraph.{CanonicalProcess, canonicalnode} import pl.touk.nussknacker.engine.compile._ import pl.touk.nussknacker.engine.compiledgraph.part.{CustomNodePart, ProcessPart, SinkPart} +import pl.touk.nussknacker.engine.definition.component.Components import pl.touk.nussknacker.engine.definition.component.Components.ComponentDefinitionExtractionMode -import pl.touk.nussknacker.engine.definition.component.{ComponentDefinitionWithImplementation, Components} import pl.touk.nussknacker.engine.definition.model.{ModelDefinition, ModelDefinitionWithClasses} import pl.touk.nussknacker.engine.dict.SimpleDictRegistry import pl.touk.nussknacker.engine.graph.evaluatedparam.{Parameter => NodeParameter} @@ -47,6 +49,7 @@ import pl.touk.nussknacker.engine.graph.variable.Field import pl.touk.nussknacker.engine.modelconfig.ComponentsUiConfig import pl.touk.nussknacker.engine.resultcollector.ProductionServiceInvocationCollector import pl.touk.nussknacker.engine.spel.SpelExpressionRepr +import pl.touk.nussknacker.engine.testcomponents.SpelTemplateAstOperationService import pl.touk.nussknacker.engine.testing.ModelDefinitionBuilder import pl.touk.nussknacker.engine.util.service.{ EagerServiceWithStaticParametersAndReturnType, @@ -72,6 +75,7 @@ class InterpreterSpec extends AnyFunSuite with Matchers { ComponentDefinition("spelNodeService", SpelNodeService), ComponentDefinition("withExplicitMethod", WithExplicitDefinitionService), ComponentDefinition("spelTemplateService", ServiceUsingSpelTemplate), + ComponentDefinition("templateAstOperationService", SpelTemplateAstOperationService), ComponentDefinition("optionTypesService", OptionTypesService), ComponentDefinition("optionalTypesService", OptionalTypesService), ComponentDefinition("nullableTypesService", NullableTypesService), @@ -1020,6 +1024,100 @@ class InterpreterSpec extends AnyFunSuite with Matchers { interpretProcess(process, Transaction()) shouldBe "someKey" } + test("spel template ast operation parameter should work for template and literal value") { + val process = ScenarioBuilder + .streaming("test") + .source("start", "transaction-source") + .enricher( + "ex", + "out", + "templateAstOperationService", + "template" -> Expression.spelTemplate(s"Hello#{#input.msisdn}") + ) + .buildSimpleVariable("result-end", resultVariable, "#out".spel) + .emptySink("end-end", "dummySink") + + interpretProcess(process, Transaction(msisdn = "foo")) should equal("[Hello]-literal[foo]-templated") + } + + test("spel template AST operation parameter should handle multiple cases") { + val testCases = Seq( + ( + "templated value and literal value", + s"Hello#{#input.msisdn}", + Transaction(msisdn = "foo"), + "[Hello]-literal[foo]-templated" + ), + ( + "single literal value", + "Hello", + Transaction(msisdn = "foo"), + "[Hello]-literal" + ), + ( + "single templated function call expression", + "#{#input.msisdn.toString()}", + Transaction(msisdn = "foo"), + "[foo]-templated" + ), + ( + "empty value", + "", + Transaction(msisdn = "foo"), + "[]-literal" + ), + ) + for ((description, templateExpression, inputTransaction, expectedOutput) <- testCases) { + withClue(s"Test case: $description") { + val process = ScenarioBuilder + .streaming("test") + .source("start", "transaction-source") + .enricher( + "ex", + "out", + "templateAstOperationService", + "template" -> Expression.spelTemplate(templateExpression) + ) + .buildSimpleVariable("result-end", resultVariable, "#out".spel) + .emptySink("end-end", "dummySink") + + interpretProcess(process, inputTransaction) should equal(expectedOutput) + } + } + } + + test("spel template ast operation parameter should work for single literal value") { + val process = ScenarioBuilder + .streaming("test") + .source("start", "transaction-source") + .enricher( + "ex", + "out", + "templateAstOperationService", + "template" -> Expression.spelTemplate("Hello") + ) + .buildSimpleVariable("result-end", resultVariable, "#out".spel) + .emptySink("end-end", "dummySink") + + interpretProcess(process, Transaction(msisdn = "foo")) should equal("[Hello]-literal") + } + + test("spel template ast operation parameter should work for single templated function call expression") { + val process = ScenarioBuilder + .streaming("test") + .source("start", "transaction-source") + .enricher( + "ex", + "out", + "templateAstOperationService", + "template" -> Expression.spelTemplate("#{#input.msisdn.toString()}") + ) + .buildSimpleVariable("result-end", resultVariable, "#out".spel) + .emptySink("end-end", "dummySink") + + interpretProcess(process, Transaction(msisdn = "foo")) should equal("[foo]-templated") + } + } class ThrowingService extends Service { diff --git a/scenario-compiler/src/test/scala/pl/touk/nussknacker/engine/testcomponents/SpelTemplateAstOperationService.scala b/scenario-compiler/src/test/scala/pl/touk/nussknacker/engine/testcomponents/SpelTemplateAstOperationService.scala new file mode 100644 index 00000000000..7ce2da9bc3d --- /dev/null +++ b/scenario-compiler/src/test/scala/pl/touk/nussknacker/engine/testcomponents/SpelTemplateAstOperationService.scala @@ -0,0 +1,79 @@ +package pl.touk.nussknacker.engine.testcomponents + +import pl.touk.nussknacker.engine.api.LazyParameter.TemplateLazyParameter +import pl.touk.nussknacker.engine.api.LazyParameter.TemplateLazyParameter.TemplateExpressionPart.{ + NonTemplatedPart, + TemplatedPart +} +import pl.touk.nussknacker.engine.api.{Context, EagerService, NodeId, Params, ServiceInvoker} +import pl.touk.nussknacker.engine.api.context.{OutputVar, ValidationContext} +import pl.touk.nussknacker.engine.api.context.transformation.{ + DefinedLazyParameter, + NodeDependencyValue, + SingleInputDynamicComponent +} +import pl.touk.nussknacker.engine.api.definition.{ + NodeDependency, + OutputVariableNameDependency, + ParameterDeclaration, + SpelTemplateParameterEditor +} +import pl.touk.nussknacker.engine.api.parameter.ParameterName +import pl.touk.nussknacker.engine.api.process.ComponentUseCase +import pl.touk.nussknacker.engine.api.test.InvocationCollectors +import pl.touk.nussknacker.engine.api.typed.typing + +import scala.concurrent.{ExecutionContext, Future} + +object SpelTemplateAstOperationService extends EagerService with SingleInputDynamicComponent[ServiceInvoker] { + + private val spelTemplateParameter = ParameterDeclaration + .lazyOptional[String](ParameterName("template")) + .withCreator(modify = + _.copy( + editor = Some(SpelTemplateParameterEditor) + ) + ) + + override type State = Any + + override def contextTransformation(context: ValidationContext, dependencies: List[NodeDependencyValue])( + implicit nodeId: NodeId + ): SpelTemplateAstOperationService.ContextTransformationDefinition = { + case TransformationStep(Nil, _) => NextParameters(List(spelTemplateParameter.createParameter())) + case TransformationStep((ParameterName("template"), DefinedLazyParameter(_)) :: Nil, _) => + FinalResults.forValidation(context, List.empty)(validation = + ctx => + ctx.withVariable( + OutputVariableNameDependency.extract(dependencies), + typing.Typed[String], + Some(ParameterName(OutputVar.VariableFieldName)) + ) + ) + } + + override def implementation( + params: Params, + dependencies: List[NodeDependencyValue], + finalState: Option[Any] + ): ServiceInvoker = new ServiceInvoker { + + override def invoke(context: Context)( + implicit ec: ExecutionContext, + collector: InvocationCollectors.ServiceInvocationCollector, + componentUseCase: ComponentUseCase + ): Future[Any] = { + val lazyParam = spelTemplateParameter + .extractValueUnsafe(params) + .asInstanceOf[TemplateLazyParameter[String]] + val result = lazyParam.templateExpression.parts.map { + case NonTemplatedPart(value) => s"[$value]-literal" + case template: TemplatedPart => s"[${template.evaluate(context)}]-templated" + }.mkString + Future.successful(result) + } + + } + + override def nodeDependencies: List[NodeDependency] = List(OutputVariableNameDependency) +} From 01f949b71edc394bc875dab103860179cc79d100 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mateusz=20S=C5=82abek?= Date: Mon, 18 Nov 2024 11:51:00 +0100 Subject: [PATCH 2/8] coderabbit review fixes --- .../compile/nodecompilation/EvaluableLazyParameter.scala | 8 ++++---- .../pl/touk/nussknacker/engine/spel/SpelExpression.scala | 5 ++++- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/compile/nodecompilation/EvaluableLazyParameter.scala b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/compile/nodecompilation/EvaluableLazyParameter.scala index 22d7a2fd018..9939448c3e5 100644 --- a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/compile/nodecompilation/EvaluableLazyParameter.scala +++ b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/compile/nodecompilation/EvaluableLazyParameter.scala @@ -21,7 +21,7 @@ class EvaluableLazyParameter[T <: AnyRef]( ) extends CustomLazyParameter[T] { override val evaluate: Evaluate[T] = - LazyParmeterEvaluator.evaluate(compiledParameter, expressionEvaluator, nodeId, jobData) + LazyParameterEvaluator.evaluate(compiledParameter, expressionEvaluator, nodeId, jobData) } @@ -33,11 +33,11 @@ class SpelTemplateEvaluableLazyParameter[T <: AnyRef]( ) extends TemplateLazyParameter[T] { override val evaluate: Evaluate[T] = - LazyParmeterEvaluator.evaluate(compiledParameter, expressionEvaluator, nodeId, jobData) + LazyParameterEvaluator.evaluate(compiledParameter, expressionEvaluator, nodeId, jobData) override def templateExpression: TemplateExpression = compiledParameter.expression match { case expression: SpelExpression => - expression.subexpressions match { + expression.templateSubexpressions match { case Some(subexpressions) => val templateParts = subexpressions.map { case TemplatedExpression(expression) => { @@ -59,7 +59,7 @@ class SpelTemplateEvaluableLazyParameter[T <: AnyRef]( override def returnType: TypingResult = Typed[String] } -private[this] object LazyParmeterEvaluator { +private[this] object LazyParameterEvaluator { def evaluate[T <: AnyRef]( compiledParameter: BaseCompiledParameter, diff --git a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/spel/SpelExpression.scala b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/spel/SpelExpression.scala index eecca121547..d57b328bf5f 100644 --- a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/spel/SpelExpression.scala +++ b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/spel/SpelExpression.scala @@ -104,7 +104,7 @@ class SpelExpression( override val language: Language = flavour.languageId - def subexpressions: Option[List[SpelTemplateSubexpression]] = { + def templateSubexpressions: Option[List[SpelTemplateSubexpression]] = { def createTemplatedExpression(expression: org.springframework.expression.spel.standard.SpelExpression) = { val parsedTemplateExpr = ParsedSpelExpression(expression.getExpressionString, parsed.parser, expression) val compiledExpr = new SpelExpression( @@ -127,6 +127,9 @@ class SpelExpression( case spelExpr: org.springframework.expression.spel.standard.SpelExpression => List(createTemplatedExpression(spelExpr)) case litExpr: LiteralExpression => List(NonTemplatedValue(litExpr.getExpressionString)) + case other => + throw new IllegalArgumentException(s"Unsupported expression type: [${other.getClass.getName}]") + }) case _ => None } From b62efde12a10614da103552f321f90ccc7deeed3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mateusz=20S=C5=82abek?= Date: Mon, 18 Nov 2024 12:36:56 +0100 Subject: [PATCH 3/8] remove duplicated tests --- .../nussknacker/engine/InterpreterSpec.scala | 48 ------------------- 1 file changed, 48 deletions(-) diff --git a/scenario-compiler/src/test/scala/pl/touk/nussknacker/engine/InterpreterSpec.scala b/scenario-compiler/src/test/scala/pl/touk/nussknacker/engine/InterpreterSpec.scala index 940908bbd4c..aba2510a0bf 100644 --- a/scenario-compiler/src/test/scala/pl/touk/nussknacker/engine/InterpreterSpec.scala +++ b/scenario-compiler/src/test/scala/pl/touk/nussknacker/engine/InterpreterSpec.scala @@ -1024,22 +1024,6 @@ class InterpreterSpec extends AnyFunSuite with Matchers { interpretProcess(process, Transaction()) shouldBe "someKey" } - test("spel template ast operation parameter should work for template and literal value") { - val process = ScenarioBuilder - .streaming("test") - .source("start", "transaction-source") - .enricher( - "ex", - "out", - "templateAstOperationService", - "template" -> Expression.spelTemplate(s"Hello#{#input.msisdn}") - ) - .buildSimpleVariable("result-end", resultVariable, "#out".spel) - .emptySink("end-end", "dummySink") - - interpretProcess(process, Transaction(msisdn = "foo")) should equal("[Hello]-literal[foo]-templated") - } - test("spel template AST operation parameter should handle multiple cases") { val testCases = Seq( ( @@ -1086,38 +1070,6 @@ class InterpreterSpec extends AnyFunSuite with Matchers { } } - test("spel template ast operation parameter should work for single literal value") { - val process = ScenarioBuilder - .streaming("test") - .source("start", "transaction-source") - .enricher( - "ex", - "out", - "templateAstOperationService", - "template" -> Expression.spelTemplate("Hello") - ) - .buildSimpleVariable("result-end", resultVariable, "#out".spel) - .emptySink("end-end", "dummySink") - - interpretProcess(process, Transaction(msisdn = "foo")) should equal("[Hello]-literal") - } - - test("spel template ast operation parameter should work for single templated function call expression") { - val process = ScenarioBuilder - .streaming("test") - .source("start", "transaction-source") - .enricher( - "ex", - "out", - "templateAstOperationService", - "template" -> Expression.spelTemplate("#{#input.msisdn.toString()}") - ) - .buildSimpleVariable("result-end", resultVariable, "#out".spel) - .emptySink("end-end", "dummySink") - - interpretProcess(process, Transaction(msisdn = "foo")) should equal("[foo]-templated") - } - } class ThrowingService extends Service { From a7c4ee6e0b2c6440a68c05cd6fc3c5b492b7625c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mateusz=20S=C5=82abek?= Date: Mon, 18 Nov 2024 12:54:07 +0100 Subject: [PATCH 4/8] renames and remove unused def --- .../engine/api/LazyParameter.scala | 4 +-- .../EvaluableLazyParameter.scala | 8 ++--- .../engine/spel/SpelExpression.scala | 30 ++++++++----------- .../SpelTemplateAstOperationService.scala | 9 ++---- 4 files changed, 22 insertions(+), 29 deletions(-) diff --git a/components-api/src/main/scala/pl/touk/nussknacker/engine/api/LazyParameter.scala b/components-api/src/main/scala/pl/touk/nussknacker/engine/api/LazyParameter.scala index 6bfc75cebcf..92024e232cf 100644 --- a/components-api/src/main/scala/pl/touk/nussknacker/engine/api/LazyParameter.scala +++ b/components-api/src/main/scala/pl/touk/nussknacker/engine/api/LazyParameter.scala @@ -78,9 +78,9 @@ object LazyParameter { sealed trait TemplateExpressionPart object TemplateExpressionPart { - case class NonTemplatedPart(value: String) extends TemplateExpressionPart + case class Literal(value: String) extends TemplateExpressionPart - trait TemplatedPart extends TemplateExpressionPart { + trait Placeholder extends TemplateExpressionPart { val evaluate: Evaluate[String] } diff --git a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/compile/nodecompilation/EvaluableLazyParameter.scala b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/compile/nodecompilation/EvaluableLazyParameter.scala index 9939448c3e5..744317e8793 100644 --- a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/compile/nodecompilation/EvaluableLazyParameter.scala +++ b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/compile/nodecompilation/EvaluableLazyParameter.scala @@ -10,7 +10,7 @@ import pl.touk.nussknacker.engine.definition.component.parameter.defaults.Editor import pl.touk.nussknacker.engine.expression.ExpressionEvaluator import pl.touk.nussknacker.engine.graph.expression.Expression.Language import pl.touk.nussknacker.engine.spel.SpelExpression -import pl.touk.nussknacker.engine.spel.SpelTemplateSubexpression.{NonTemplatedValue, TemplatedExpression} +import pl.touk.nussknacker.engine.spel.SpelTemplateExpressionPart.{Literal, Placeholder} class EvaluableLazyParameter[T <: AnyRef]( compiledParameter: BaseCompiledParameter, @@ -40,14 +40,14 @@ class SpelTemplateEvaluableLazyParameter[T <: AnyRef]( expression.templateSubexpressions match { case Some(subexpressions) => val templateParts = subexpressions.map { - case TemplatedExpression(expression) => { - new TemplateExpressionPart.TemplatedPart { + case Placeholder(expression) => { + new TemplateExpressionPart.Placeholder { override val evaluate: Evaluate[String] = context => { expressionEvaluator.evaluate[String](expression, "expressionId", nodeId.id, context)(jobData).value } } } - case NonTemplatedValue(value) => TemplateExpressionPart.NonTemplatedPart(value) + case Literal(value) => TemplateExpressionPart.Literal(value) } TemplateExpression(templateParts) case None => diff --git a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/spel/SpelExpression.scala b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/spel/SpelExpression.scala index d57b328bf5f..95ba7da8073 100644 --- a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/spel/SpelExpression.scala +++ b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/spel/SpelExpression.scala @@ -29,7 +29,7 @@ import pl.touk.nussknacker.engine.graph.expression.Expression.Language import pl.touk.nussknacker.engine.graph.expression.{Expression => GraphExpression} import pl.touk.nussknacker.engine.spel.SpelExpressionParseError.ExpressionCompilationError import pl.touk.nussknacker.engine.spel.SpelExpressionParser.{Flavour, Standard} -import pl.touk.nussknacker.engine.spel.SpelTemplateSubexpression.{NonTemplatedValue, TemplatedExpression} +import pl.touk.nussknacker.engine.spel.SpelTemplateExpressionPart.{Literal, Placeholder} import pl.touk.nussknacker.engine.spel.internal.EvaluationContextPreparer import scala.util.control.NonFatal @@ -81,15 +81,11 @@ class SpelExpressionEvaluationException(val expression: String, val ctxId: Strin cause = cause ) -sealed trait SpelTemplateSubexpression - -object SpelTemplateSubexpression { - final case class NonTemplatedValue(val value: String) extends SpelTemplateSubexpression - - final case class TemplatedExpression(expression: SpelExpression) extends SpelTemplateSubexpression { - def evaluate: (Context, Map[String, Any]) => String = expression.evaluate[String] - } +sealed trait SpelTemplateExpressionPart +object SpelTemplateExpressionPart { + final case class Literal(value: String) extends SpelTemplateExpressionPart + final case class Placeholder(expression: SpelExpression) extends SpelTemplateExpressionPart } class SpelExpression( @@ -104,8 +100,8 @@ class SpelExpression( override val language: Language = flavour.languageId - def templateSubexpressions: Option[List[SpelTemplateSubexpression]] = { - def createTemplatedExpression(expression: org.springframework.expression.spel.standard.SpelExpression) = { + def templateSubexpressions: Option[List[SpelTemplateExpressionPart]] = { + def createEvaluablePlaceholder(expression: org.springframework.expression.spel.standard.SpelExpression) = { val parsedTemplateExpr = ParsedSpelExpression(expression.getExpressionString, parsed.parser, expression) val compiledExpr = new SpelExpression( parsedTemplateExpr, @@ -113,20 +109,20 @@ class SpelExpression( Standard, evaluationContextPreparer ) - TemplatedExpression(compiledExpr) + Placeholder(compiledExpr) } flavour.languageId match { case Language.SpelTemplate => Some(parsed.parsed match { case compositeExpr: CompositeStringExpression => compositeExpr.getExpressions.toList.map { - case lit: LiteralExpression => NonTemplatedValue(lit.getExpressionString) + case lit: LiteralExpression => Literal(lit.getExpressionString) case spelExpr: org.springframework.expression.spel.standard.SpelExpression => - createTemplatedExpression(spelExpr) + createEvaluablePlaceholder(spelExpr) } - case spelExpr: org.springframework.expression.spel.standard.SpelExpression => - List(createTemplatedExpression(spelExpr)) - case litExpr: LiteralExpression => List(NonTemplatedValue(litExpr.getExpressionString)) + case singleEvaluableSpelExpr: org.springframework.expression.spel.standard.SpelExpression => + List(createEvaluablePlaceholder(singleEvaluableSpelExpr)) + case singleLiteralExpr: LiteralExpression => List(Literal(singleLiteralExpr.getExpressionString)) case other => throw new IllegalArgumentException(s"Unsupported expression type: [${other.getClass.getName}]") diff --git a/scenario-compiler/src/test/scala/pl/touk/nussknacker/engine/testcomponents/SpelTemplateAstOperationService.scala b/scenario-compiler/src/test/scala/pl/touk/nussknacker/engine/testcomponents/SpelTemplateAstOperationService.scala index 7ce2da9bc3d..27eff4154cf 100644 --- a/scenario-compiler/src/test/scala/pl/touk/nussknacker/engine/testcomponents/SpelTemplateAstOperationService.scala +++ b/scenario-compiler/src/test/scala/pl/touk/nussknacker/engine/testcomponents/SpelTemplateAstOperationService.scala @@ -1,10 +1,7 @@ package pl.touk.nussknacker.engine.testcomponents import pl.touk.nussknacker.engine.api.LazyParameter.TemplateLazyParameter -import pl.touk.nussknacker.engine.api.LazyParameter.TemplateLazyParameter.TemplateExpressionPart.{ - NonTemplatedPart, - TemplatedPart -} +import pl.touk.nussknacker.engine.api.LazyParameter.TemplateLazyParameter.TemplateExpressionPart._ import pl.touk.nussknacker.engine.api.{Context, EagerService, NodeId, Params, ServiceInvoker} import pl.touk.nussknacker.engine.api.context.{OutputVar, ValidationContext} import pl.touk.nussknacker.engine.api.context.transformation.{ @@ -67,8 +64,8 @@ object SpelTemplateAstOperationService extends EagerService with SingleInputDyna .extractValueUnsafe(params) .asInstanceOf[TemplateLazyParameter[String]] val result = lazyParam.templateExpression.parts.map { - case NonTemplatedPart(value) => s"[$value]-literal" - case template: TemplatedPart => s"[${template.evaluate(context)}]-templated" + case Literal(value) => s"[$value]-literal" + case template: Placeholder => s"[${template.evaluate(context)}]-templated" }.mkString Future.successful(result) } From 7d06c80ef00cd3a6709cebbb8ebd0a0a08235daa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mateusz=20S=C5=82abek?= Date: Mon, 18 Nov 2024 13:01:58 +0100 Subject: [PATCH 5/8] refactor templateSubexpressions --- .../engine/spel/SpelExpression.scala | 40 +++++++------------ 1 file changed, 15 insertions(+), 25 deletions(-) diff --git a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/spel/SpelExpression.scala b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/spel/SpelExpression.scala index 95ba7da8073..f11916690d7 100644 --- a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/spel/SpelExpression.scala +++ b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/spel/SpelExpression.scala @@ -101,33 +101,23 @@ class SpelExpression( override val language: Language = flavour.languageId def templateSubexpressions: Option[List[SpelTemplateExpressionPart]] = { - def createEvaluablePlaceholder(expression: org.springframework.expression.spel.standard.SpelExpression) = { - val parsedTemplateExpr = ParsedSpelExpression(expression.getExpressionString, parsed.parser, expression) - val compiledExpr = new SpelExpression( - parsedTemplateExpr, - typing.Typed[String], - Standard, - evaluationContextPreparer - ) - Placeholder(compiledExpr) + def parseTemplate(expression: Expression): List[SpelTemplateExpressionPart] = expression match { + case lit: LiteralExpression => List(Literal(lit.getExpressionString)) + case spelExpr: org.springframework.expression.spel.standard.SpelExpression => + val parsedTemplateExpr = ParsedSpelExpression(spelExpr.getExpressionString, parsed.parser, spelExpr) + val compiledExpr = new SpelExpression( + parsedTemplateExpr, + typing.Typed[String], + Standard, + evaluationContextPreparer + ) + List(Placeholder(compiledExpr)) + case compositeExpr: CompositeStringExpression => compositeExpr.getExpressions.toList.flatMap(parseTemplate) + case other => throw new IllegalArgumentException(s"Unsupported expression type: [${other.getClass.getName}]") } flavour.languageId match { - case Language.SpelTemplate => - Some(parsed.parsed match { - case compositeExpr: CompositeStringExpression => - compositeExpr.getExpressions.toList.map { - case lit: LiteralExpression => Literal(lit.getExpressionString) - case spelExpr: org.springframework.expression.spel.standard.SpelExpression => - createEvaluablePlaceholder(spelExpr) - } - case singleEvaluableSpelExpr: org.springframework.expression.spel.standard.SpelExpression => - List(createEvaluablePlaceholder(singleEvaluableSpelExpr)) - case singleLiteralExpr: LiteralExpression => List(Literal(singleLiteralExpr.getExpressionString)) - case other => - throw new IllegalArgumentException(s"Unsupported expression type: [${other.getClass.getName}]") - - }) - case _ => None + case Language.SpelTemplate => Some(parseTemplate(parsed.parsed)) + case _ => None } } From 8d630c1fe3dd4daf4194f447c8af59791a21fcc5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mateusz=20S=C5=82abek?= Date: Mon, 18 Nov 2024 13:04:06 +0100 Subject: [PATCH 6/8] create lazy parameter based on expression language --- .../compile/nodecompilation/EvaluableLazyParameter.scala | 5 +---- .../nodecompilation/EvaluableLazyParameterCreator.scala | 1 - .../engine/compile/nodecompilation/ParameterEvaluator.scala | 1 - 3 files changed, 1 insertion(+), 6 deletions(-) diff --git a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/compile/nodecompilation/EvaluableLazyParameter.scala b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/compile/nodecompilation/EvaluableLazyParameter.scala index 744317e8793..8bc2e679af2 100644 --- a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/compile/nodecompilation/EvaluableLazyParameter.scala +++ b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/compile/nodecompilation/EvaluableLazyParameter.scala @@ -2,11 +2,9 @@ package pl.touk.nussknacker.engine.compile.nodecompilation import pl.touk.nussknacker.engine.api.LazyParameter.TemplateLazyParameter.{TemplateExpression, TemplateExpressionPart} import pl.touk.nussknacker.engine.api.LazyParameter.{CustomLazyParameter, Evaluate, TemplateLazyParameter} -import pl.touk.nussknacker.engine.api.definition.{Parameter => ParameterDef} import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypingResult} import pl.touk.nussknacker.engine.api.{Context, JobData, LazyParameter, NodeId} import pl.touk.nussknacker.engine.compiledgraph.BaseCompiledParameter -import pl.touk.nussknacker.engine.definition.component.parameter.defaults.EditorBasedLanguageDeterminer import pl.touk.nussknacker.engine.expression.ExpressionEvaluator import pl.touk.nussknacker.engine.graph.expression.Expression.Language import pl.touk.nussknacker.engine.spel.SpelExpression @@ -82,10 +80,9 @@ object EvaluableLazyParameterFactory { expressionEvaluator: ExpressionEvaluator, nodeId: NodeId, jobData: JobData, - parameterDefinition: ParameterDef, typingResult: TypingResult ): LazyParameter[T] = { - EditorBasedLanguageDeterminer.determineLanguageOf(parameterDefinition.editor) match { + compiledParameter.expression.language match { case Language.SpelTemplate => new SpelTemplateEvaluableLazyParameter[T]( compiledParameter, diff --git a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/compile/nodecompilation/EvaluableLazyParameterCreator.scala b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/compile/nodecompilation/EvaluableLazyParameterCreator.scala index ce25fe92715..bf52599eff8 100644 --- a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/compile/nodecompilation/EvaluableLazyParameterCreator.scala +++ b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/compile/nodecompilation/EvaluableLazyParameterCreator.scala @@ -48,7 +48,6 @@ final class EvaluableLazyParameterCreator[T <: AnyRef]( expressionEvaluator = deps.expressionEvaluator, nodeId = nodeId, jobData = deps.jobData, - parameterDefinition = parameterDef, typingResult = returnType ) } diff --git a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/compile/nodecompilation/ParameterEvaluator.scala b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/compile/nodecompilation/ParameterEvaluator.scala index 9a43f580b5c..1be3a45e5d7 100644 --- a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/compile/nodecompilation/ParameterEvaluator.scala +++ b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/compile/nodecompilation/ParameterEvaluator.scala @@ -101,7 +101,6 @@ class ParameterEvaluator( expressionEvaluator = runtimeExpressionEvaluator, nodeId = nodeId, jobData = jobData, - parameterDefinition = definition, typingResult = compiledParameter.typingInfo.typingResult ) case PostponedEvaluatorLazyParameterStrategy => From 8bc5ecd5b689c80da91ae993cf99a61cf3b58e37 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mateusz=20S=C5=82abek?= Date: Mon, 18 Nov 2024 13:05:49 +0100 Subject: [PATCH 7/8] dont extend redundant traits in test --- .../engine/flink/TemplateLazyParameterTest.scala | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/TemplateLazyParameterTest.scala b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/TemplateLazyParameterTest.scala index 29b218ef2ce..e33d5801c5f 100644 --- a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/TemplateLazyParameterTest.scala +++ b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/TemplateLazyParameterTest.scala @@ -4,7 +4,6 @@ import com.typesafe.config.ConfigFactory import org.apache.flink.api.connector.source.Boundedness import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers -import org.scalatest.{Inside, LoneElement} import pl.touk.nussknacker.engine.api.component.ComponentDefinition import pl.touk.nussknacker.engine.build.ScenarioBuilder import pl.touk.nussknacker.engine.flink.test.FlinkSpec @@ -16,13 +15,7 @@ import pl.touk.nussknacker.engine.testcomponents.SpelTemplateAstOperationService import pl.touk.nussknacker.engine.util.test.TestScenarioRunner import pl.touk.nussknacker.test.ValidatedValuesDetailedMessage -class TemplateLazyParameterTest - extends AnyFunSuite - with FlinkSpec - with Matchers - with Inside - with ValidatedValuesDetailedMessage - with LoneElement { +class TemplateLazyParameterTest extends AnyFunSuite with FlinkSpec with Matchers with ValidatedValuesDetailedMessage { private lazy val runner = TestScenarioRunner .flinkBased(ConfigFactory.empty(), flinkMiniCluster) From 9e917bc4b5aba417706820b9c13c6567dec8de0d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mateusz=20S=C5=82abek?= Date: Mon, 18 Nov 2024 15:41:44 +0100 Subject: [PATCH 8/8] wip non passing test --- .../flink/TemplateLazyParameterTest.scala | 99 +++++++++++++++++-- 1 file changed, 90 insertions(+), 9 deletions(-) diff --git a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/TemplateLazyParameterTest.scala b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/TemplateLazyParameterTest.scala index e33d5801c5f..17c5a8c3125 100644 --- a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/TemplateLazyParameterTest.scala +++ b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/TemplateLazyParameterTest.scala @@ -2,16 +2,39 @@ package pl.touk.nussknacker.engine.flink import com.typesafe.config.ConfigFactory import org.apache.flink.api.connector.source.Boundedness +import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink} +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment +import org.apache.flink.streaming.api.functions.sink +import org.apache.flink.streaming.api.functions.sink.SinkFunction +import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers -import pl.touk.nussknacker.engine.api.component.ComponentDefinition +import pl.touk.nussknacker.engine.api.LazyParameter.TemplateLazyParameter +import pl.touk.nussknacker.engine.api.LazyParameter.TemplateLazyParameter.TemplateExpressionPart.{Literal, Placeholder} +import pl.touk.nussknacker.engine.api.{Context, LazyParameter, MethodToInvoke, NodeId, Params, ValueWithContext} +import pl.touk.nussknacker.engine.api.component.{BoundedStreamComponent, Component, ComponentDefinition} +import pl.touk.nussknacker.engine.api.context.{OutputVar, ValidationContext} +import pl.touk.nussknacker.engine.api.context.transformation.{ + DefinedLazyParameter, + NodeDependencyValue, + SingleInputDynamicComponent +} +import pl.touk.nussknacker.engine.api.definition.{ + NodeDependency, + OutputVariableNameDependency, + ParameterDeclaration, + SpelTemplateParameterEditor +} +import pl.touk.nussknacker.engine.api.parameter.ParameterName +import pl.touk.nussknacker.engine.api.process.{Sink, SinkFactory, Source, SourceFactory} +import pl.touk.nussknacker.engine.api.typed.typing import pl.touk.nussknacker.engine.build.ScenarioBuilder +import pl.touk.nussknacker.engine.flink.api.process.{FlinkCustomNodeContext, FlinkSink, StandardFlinkSource} import pl.touk.nussknacker.engine.flink.test.FlinkSpec import pl.touk.nussknacker.engine.flink.util.test.FlinkTestScenarioRunner._ import pl.touk.nussknacker.engine.graph.expression.Expression import pl.touk.nussknacker.engine.process.FlinkJobConfig.ExecutionMode import pl.touk.nussknacker.engine.spel.SpelExtension._ -import pl.touk.nussknacker.engine.testcomponents.SpelTemplateAstOperationService import pl.touk.nussknacker.engine.util.test.TestScenarioRunner import pl.touk.nussknacker.test.ValidatedValuesDetailedMessage @@ -21,7 +44,7 @@ class TemplateLazyParameterTest extends AnyFunSuite with FlinkSpec with Matchers .flinkBased(ConfigFactory.empty(), flinkMiniCluster) .withExecutionMode(ExecutionMode.Batch) .withExtraComponents( - List(ComponentDefinition("templateAstOperationService", SpelTemplateAstOperationService)) + List(ComponentDefinition("templateAstOperationSink", SpelTemplateAstOperationSink)) ) .build() @@ -29,15 +52,14 @@ class TemplateLazyParameterTest extends AnyFunSuite with FlinkSpec with Matchers val scenario = ScenarioBuilder .streaming("test") .source("source", TestScenarioRunner.testDataSource) - .enricher( - "customService", - "output", - "templateAstOperationService", - "template" -> Expression.spelTemplate(s"Hello#{#input}") + .emptySink( + "end", + "templateAstOperationSink", + "templateOutput" -> Expression.spelTemplate(s"Hello#{#input}") ) - .emptySink("end", TestScenarioRunner.testResultSink, "value" -> "#output".spel) val result = runner.runWithData(scenario, List(1, 2, 3), Boundedness.BOUNDED) + println(result) result.validValue.successes shouldBe List( "[Hello]-literal[1]-templated", "[Hello]-literal[2]-templated", @@ -46,3 +68,62 @@ class TemplateLazyParameterTest extends AnyFunSuite with FlinkSpec with Matchers } } + +object SpelTemplateAstOperationSink + extends SingleInputDynamicComponent[Sink] + with SinkFactory + with BoundedStreamComponent { + + private val spelTemplateParameter = ParameterDeclaration + .lazyOptional[String](ParameterName("templateOutput")) + .withCreator(modify = + _.copy( + editor = Some(SpelTemplateParameterEditor) + ) + ) + + override type State = Unit + + override def contextTransformation(context: ValidationContext, dependencies: List[NodeDependencyValue])( + implicit nodeId: NodeId + ): SpelTemplateAstOperationSink.ContextTransformationDefinition = { + case TransformationStep(Nil, _) => NextParameters(List(spelTemplateParameter.createParameter())) + case TransformationStep((ParameterName("templateOutput"), DefinedLazyParameter(_)) :: Nil, _) => + FinalResults(context, List.empty) + } + + override def nodeDependencies: List[NodeDependency] = List.empty + + override def implementation(params: Params, dependencies: List[NodeDependencyValue], finalState: Option[Unit]): Sink = + new FlinkSink { + override type Value = String + + val valueFromParam: LazyParameter[String] = spelTemplateParameter.extractValueUnsafe(params) + + override def prepareValue( + dataStream: DataStream[Context], + flinkCustomNodeContext: FlinkCustomNodeContext + ): DataStream[ValueWithContext[Value]] = { + dataStream.flatMap( + flinkCustomNodeContext.lazyParameterHelper.lazyMapFunction(valueFromParam), + flinkCustomNodeContext.valueWithContextInfo.forType(valueFromParam.returnType) + ) + } + + override def registerSink( + dataStream: DataStream[ValueWithContext[String]], + flinkNodeContext: FlinkCustomNodeContext + ): DataStreamSink[_] = { + println(dataStream) + dataStream.addSink(new SinkFunction[ValueWithContext[String]] { + override def invoke(value: ValueWithContext[String], context: SinkFunction.Context): Unit = { + println(value) + println("debug") + + } + }) + } + + } + +}