Skip to content

Commit

Permalink
Fix deployments for scenarios with dict editors after model reload
Browse files Browse the repository at this point in the history
  • Loading branch information
Elmacioro committed Nov 6, 2024
1 parent 58f8185 commit 0c12933
Show file tree
Hide file tree
Showing 31 changed files with 348 additions and 48 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
package pl.touk.nussknacker.engine.api.parameter

import io.circe.generic.extras.semiauto.{deriveUnwrappedDecoder, deriveUnwrappedEncoder}
import io.circe.{Decoder, Encoder, KeyDecoder, KeyEncoder}

final case class ParameterName(value: String) {
def withBranchId(branchId: String): ParameterName = ParameterName(s"$value for branch $branchId")
}

object ParameterName {
implicit val encoder: Encoder[ParameterName] = deriveUnwrappedEncoder
implicit val decoder: Decoder[ParameterName] = deriveUnwrappedDecoder

implicit val keyEncoder: KeyEncoder[ParameterName] = KeyEncoder.encodeKeyString.contramap(_.value)
implicit val keyDecoder: KeyDecoder[ParameterName] = KeyDecoder.decodeKeyString.map(ParameterName(_))
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package pl.touk.nussknacker.engine.api.component

import io.circe.generic.JsonCodec
import pl.touk.nussknacker.engine.api.definition.FixedExpressionValue
import pl.touk.nussknacker.engine.api.parameter.{
ParameterName,
Expand All @@ -24,6 +25,7 @@ object AdditionalUIConfigProvider {
val empty = new DefaultAdditionalUIConfigProvider(Map.empty, Map.empty)
}

@JsonCodec
case class ComponentAdditionalConfig(
parameterConfigs: Map[ParameterName, ParameterAdditionalUIConfig],
icon: Option[String] = None,
Expand All @@ -32,6 +34,7 @@ case class ComponentAdditionalConfig(
disabled: Boolean = false
)

@JsonCodec
case class ParameterAdditionalUIConfig(
required: Boolean,
initialValue: Option[FixedExpressionValue],
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package pl.touk.nussknacker.engine.api.component

import io.circe.generic.extras.semiauto.{deriveUnwrappedDecoder, deriveUnwrappedEncoder}
import io.circe.{Decoder, Encoder}
import io.circe.{Decoder, Encoder, KeyDecoder, KeyEncoder}

// TODO This class is used as a work around for the problem that the components are duplicated across processing types.
// We plan to get rid of this. After that, we could replace usages of this class by usage of ComponentId
Expand All @@ -14,6 +14,10 @@ object DesignerWideComponentId {
implicit val encoder: Encoder[DesignerWideComponentId] = deriveUnwrappedEncoder
implicit val decoder: Decoder[DesignerWideComponentId] = deriveUnwrappedDecoder

implicit val keyEncoder: KeyEncoder[DesignerWideComponentId] = KeyEncoder.encodeKeyString.contramap(_.value)
implicit val keyDecoder: KeyDecoder[DesignerWideComponentId] =
KeyDecoder.decodeKeyString.map(DesignerWideComponentId(_))

def apply(value: String): DesignerWideComponentId = new DesignerWideComponentId(value.toLowerCase)

def forBuiltInComponent(componentId: ComponentId): DesignerWideComponentId = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import pl.touk.nussknacker.engine.api.deployment.{
ProcessingTypeDeployedScenariosProvider,
ScenarioActivityManager
}
import pl.touk.nussknacker.engine.api.component.{ComponentAdditionalConfig, DesignerWideComponentId}
import sttp.client3.SttpBackend

import scala.concurrent.{ExecutionContext, Future}
Expand All @@ -17,6 +18,7 @@ case class DeploymentManagerDependencies(
executionContext: ExecutionContext,
actorSystem: ActorSystem,
sttpBackend: SttpBackend[Future, Any],
configsFromProvider: Map[DesignerWideComponentId, ComponentAdditionalConfig] = Map.empty
) {
implicit def implicitExecutionContext: ExecutionContext = executionContext
implicit def implicitActorSystem: ActorSystem = actorSystem
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ class DefaultProcessingTypeDeployedScenariosProvider(
DeploymentId.fromActionId(lastDeployAction.id),
deployingUser,
Map.empty,
NodesDeploymentData.empty
NodesDeploymentData.empty,
Map.empty
)
val deployedScenarioDataTry =
scenarioResolver.resolveScenario(details.json).map { resolvedScenario =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,19 @@ import cats.syntax.functor._
import com.typesafe.scalalogging.LazyLogging
import db.util.DBIOActionInstances._
import pl.touk.nussknacker.engine.api.Comment
import pl.touk.nussknacker.engine.api.component.NodesDeploymentData
import pl.touk.nussknacker.engine.api.component.{
ComponentAdditionalConfig,
DesignerWideComponentId,
NodesDeploymentData
}
import pl.touk.nussknacker.engine.api.deployment.ScenarioActionName.{Cancel, Deploy}
import pl.touk.nussknacker.engine.api.deployment._
import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus.ProblemStateStatus
import pl.touk.nussknacker.engine.api.deployment.simple.{SimpleProcessStateDefinitionManager, SimpleStateStatus}
import pl.touk.nussknacker.engine.api.process._
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.deployment._
import pl.touk.nussknacker.engine.management.periodic.AdditionalDictComponentConfigsExtractor
import pl.touk.nussknacker.restmodel.scenariodetails.ScenarioWithDetails
import pl.touk.nussknacker.ui.api.{DeploymentCommentSettings, ListenerApiUser}
import pl.touk.nussknacker.ui.listener.ProcessChangeEvent.{OnActionExecutionFinished, OnActionFailed, OnActionSuccess}
Expand Down Expand Up @@ -55,6 +60,10 @@ class DeploymentService(
processChangeListener: ProcessChangeListener,
scenarioStateTimeout: Option[FiniteDuration],
deploymentCommentSettings: Option[DeploymentCommentSettings],
additionalComponentConfigs: ProcessingTypeDataProvider[
Map[DesignerWideComponentId, ComponentAdditionalConfig],
_
],
clock: Clock = Clock.systemUTC()
)(implicit system: ActorSystem)
extends ActionService
Expand Down Expand Up @@ -324,7 +333,10 @@ class DeploymentService(
DeploymentId.fromActionId(actionId),
user.toManagerUser,
additionalDeploymentData,
nodesDeploymentData
nodesDeploymentData,
AdditionalDictComponentConfigsExtractor.getAdditionalConfigsWithDictParametersEditors(
additionalComponentConfigs.forProcessingType(processDetails.processingType).getOrElse(Map.empty)
)
)
} yield DeployedScenarioData(processDetails.toEngineProcessVersion, deploymentData, resolvedCanonicalProcess)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,16 @@ import cats.Applicative
import cats.data.{EitherT, NonEmptyList}
import com.typesafe.scalalogging.LazyLogging
import db.util.DBIOActionInstances._
import pl.touk.nussknacker.engine.api.component.NodesDeploymentData
import pl.touk.nussknacker.engine.api.component.{
ComponentAdditionalConfig,
DesignerWideComponentId,
NodesDeploymentData
}
import pl.touk.nussknacker.engine.api.deployment._
import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessName, VersionId}
import pl.touk.nussknacker.engine.api.{ProcessVersion => RuntimeVersionData}
import pl.touk.nussknacker.engine.deployment.{DeploymentData, DeploymentId => LegacyDeploymentId}
import pl.touk.nussknacker.engine.management.periodic.AdditionalDictComponentConfigsExtractor
import pl.touk.nussknacker.engine.newdeployment.DeploymentId
import pl.touk.nussknacker.restmodel.validation.ValidationResults.ValidationErrors
import pl.touk.nussknacker.security.Permission
Expand All @@ -19,6 +24,7 @@ import pl.touk.nussknacker.ui.process.deployment.DeploymentManagerDispatcher
import pl.touk.nussknacker.ui.process.deployment.LoggedUserConversions.LoggedUserOps
import pl.touk.nussknacker.ui.process.newdeployment.DeploymentEntityFactory.{DeploymentEntityData, WithModifiedAt}
import pl.touk.nussknacker.ui.process.newdeployment.DeploymentService._
import pl.touk.nussknacker.ui.process.processingtype.provider.ProcessingTypeDataProvider
import pl.touk.nussknacker.ui.process.repository.{DBIOActionRunner, ScenarioMetadataRepository}
import pl.touk.nussknacker.ui.process.version.ScenarioGraphVersionService
import pl.touk.nussknacker.ui.security.api.LoggedUser
Expand All @@ -42,7 +48,11 @@ class DeploymentService(
deploymentRepository: DeploymentRepository,
dmDispatcher: DeploymentManagerDispatcher,
dbioRunner: DBIOActionRunner,
clock: Clock
clock: Clock,
additionalComponentConfigs: ProcessingTypeDataProvider[
Map[DesignerWideComponentId, ComponentAdditionalConfig],
_
],
)(implicit ec: ExecutionContext)
extends LazyLogging {

Expand Down Expand Up @@ -156,7 +166,10 @@ class DeploymentService(
LegacyDeploymentId(""),
user.toManagerUser,
Map.empty,
NodesDeploymentData.empty
NodesDeploymentData.empty,
AdditionalDictComponentConfigsExtractor.getAdditionalConfigsWithDictParametersEditors(
additionalComponentConfigs.forProcessingType(scenarioMetadata.processingType)(user).getOrElse(Map.empty)
)
)
for {
result <- EitherT[Future, RunDeploymentError, Unit](
Expand Down Expand Up @@ -189,7 +202,10 @@ class DeploymentService(
toLegacyDeploymentId(command.id),
command.user.toManagerUser,
additionalDeploymentData = Map.empty,
command.nodesDeploymentData
command.nodesDeploymentData,
AdditionalDictComponentConfigsExtractor.getAdditionalConfigsWithDictParametersEditors(
additionalComponentConfigs.forProcessingType(scenarioMetadata.processingType)(command.user).getOrElse(Map.empty)
)
)
dmDispatcher
.deploymentManagerUnsafe(scenarioMetadata.processingType)(command.user)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import pl.touk.nussknacker.engine.compile.ProcessValidator
import pl.touk.nussknacker.engine.definition.component.Components.ComponentDefinitionExtractionMode
import pl.touk.nussknacker.engine.definition.test.ModelDataTestInfoProvider
import pl.touk.nussknacker.engine.dict.ProcessDictSubstitutor
import pl.touk.nussknacker.engine.management.periodic.AdditionalDictComponentConfigsExtractor
import pl.touk.nussknacker.engine.util.loader.ScalaServiceLoader
import pl.touk.nussknacker.engine.util.multiplicity.{Empty, Many, Multiplicity, One}
import pl.touk.nussknacker.engine.{ConfigWithUnresolvedVersion, DeploymentManagerDependencies, ModelDependencies}
Expand Down Expand Up @@ -232,6 +233,10 @@ class AkkaHttpBasedRouteProvider(
futureProcessRepository
)

val additionalComponentConfigs = processingTypeDataProvider.mapValues { processingTypeData =>
processingTypeData.designerModelData.modelData.additionalConfigsFromProvider
}

val legacyDeploymentService = new LegacyDeploymentService(
dmDispatcher,
processRepository,
Expand All @@ -241,7 +246,8 @@ class AkkaHttpBasedRouteProvider(
scenarioResolver,
processChangeListener,
featureTogglesConfig.scenarioStateTimeout,
featureTogglesConfig.deploymentCommentSettings
featureTogglesConfig.deploymentCommentSettings,
additionalComponentConfigs
)
legacyDeploymentService.invalidateInProgressActions()

Expand Down Expand Up @@ -438,7 +444,8 @@ class AkkaHttpBasedRouteProvider(
deploymentRepository,
dmDispatcher,
dbioRunner,
Clock.systemDefaultZone()
Clock.systemDefaultZone(),
additionalComponentConfigs
)
val activityService =
new ActivityService(
Expand Down Expand Up @@ -707,6 +714,7 @@ class AkkaHttpBasedRouteProvider(
featureTogglesConfig.componentDefinitionExtractionMode
),
getDeploymentManagerDependencies(
additionalUIConfigProvider,
actionServiceProvider,
scenarioActivityRepository,
dbioActionRunner,
Expand All @@ -722,12 +730,14 @@ class AkkaHttpBasedRouteProvider(
}

private def getDeploymentManagerDependencies(
additionalUIConfigProvider: AdditionalUIConfigProvider,
actionServiceProvider: Supplier[ActionService],
scenarioActivityRepository: ScenarioActivityRepository,
dbioActionRunner: DBIOActionRunner,
sttpBackend: SttpBackend[Future, Any],
processingType: ProcessingType
)(implicit executionContext: ExecutionContext) = {
val additionalConfigsFromProvider = additionalUIConfigProvider.getAllForProcessingType(processingType)
DeploymentManagerDependencies(
DefaultProcessingTypeDeployedScenariosProvider(dbRef, processingType),
new DefaultProcessingTypeActionService(
Expand All @@ -740,7 +750,8 @@ class AkkaHttpBasedRouteProvider(
),
system.dispatcher,
system,
sttpBackend
sttpBackend,
additionalConfigsFromProvider
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,11 @@ import org.scalatest.matchers.should.Matchers
import org.scalatest.{Assertion, BeforeAndAfterEach, OptionValues, Suite}
import pl.touk.nussknacker.engine._
import pl.touk.nussknacker.engine.api.CirceUtil.humanReadablePrinter
import pl.touk.nussknacker.engine.api.Comment
import pl.touk.nussknacker.engine.api.deployment._
import pl.touk.nussknacker.engine.api.graph.ScenarioGraph
import pl.touk.nussknacker.engine.api.process.VersionId.initialVersionId
import pl.touk.nussknacker.engine.api.process._
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.definition.component.Components.ComponentDefinitionExtractionMode
import pl.touk.nussknacker.engine.definition.test.{ModelDataTestInfoProvider, TestInfoProvider}
import pl.touk.nussknacker.restmodel.CustomActionRequest
import pl.touk.nussknacker.restmodel.scenariodetails.ScenarioWithDetails
Expand Down Expand Up @@ -120,7 +118,8 @@ trait NuResourcesTest
scenarioResolverByProcessingType,
processChangeListener,
None,
deploymentCommentSettings
deploymentCommentSettings,
mapProcessingTypeDataProvider()
)

protected val processingTypeConfig: ProcessingTypeConfig =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import cats.effect.unsafe.IORuntime
import cats.instances.future._
import com.typesafe.config.ConfigFactory
import db.util.DBIOActionInstances._
import pl.touk.nussknacker.engine.api.component.{DesignerWideComponentId, ProcessingMode}
import pl.touk.nussknacker.engine.api.component.{ComponentAdditionalConfig, DesignerWideComponentId, ProcessingMode}
import pl.touk.nussknacker.engine.api.definition.FixedExpressionValue
import pl.touk.nussknacker.engine.api.deployment.{
NoOpScenarioActivityManager,
Expand Down Expand Up @@ -120,6 +120,10 @@ object TestFactory {
Streaming.stringify -> new ScenarioResolver(sampleResolver, Streaming.stringify)
)

def additionalComponentConfigsByProcessingType
: ProcessingTypeDataProvider[Map[DesignerWideComponentId, ComponentAdditionalConfig], _] =
mapProcessingTypeDataProvider()

val modelDependencies: ModelDependencies =
ModelDependencies(
TestAdditionalUIConfigProvider.componentAdditionalConfigMap,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ class NotificationServiceTest
mock[ProcessChangeListener],
None,
None,
TestFactory.additionalComponentConfigsByProcessingType,
clock
) {
override protected def validateBeforeDeploy(
Expand All @@ -216,7 +217,8 @@ class NotificationServiceTest
DeploymentId.fromActionId(actionId),
user.toManagerUser,
additionalDeploymentData,
nodesDeploymentData
nodesDeploymentData,
Map.empty
),
processDetails.json
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ class DeploymentServiceSpec
TestFactory.scenarioResolverByProcessingType,
listener,
scenarioStateTimeout,
deploymentCommentSettings
deploymentCommentSettings,
additionalComponentConfigsByProcessingType
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ class DeploymentServiceTest
TestFactory.newFutureFetchingScenarioRepository(testDbRef)
),
dbioRunner,
clock
clock,
TestFactory.additionalComponentConfigsByProcessingType
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package pl.touk.nussknacker.engine.process.compiler

import com.typesafe.config.Config
import pl.touk.nussknacker.engine.ModelData.ExtractDefinitionFun
import pl.touk.nussknacker.engine.api.component.DesignerWideComponentId
import pl.touk.nussknacker.engine.api.component.{ComponentAdditionalConfig, DesignerWideComponentId}
import pl.touk.nussknacker.engine.api.dict.EngineDictRegistry
import pl.touk.nussknacker.engine.api.namespaces.NamingStrategy
import pl.touk.nussknacker.engine.api.process.{ComponentUseCase, ProcessConfigCreator, ProcessObjectDependencies}
Expand Down Expand Up @@ -36,6 +36,7 @@ class FlinkProcessCompilerDataFactory(
modelConfig: Config,
namingStrategy: NamingStrategy,
componentUseCase: ComponentUseCase,
configsFromProviderWithDictionaryEditor: Map[DesignerWideComponentId, ComponentAdditionalConfig]
) extends Serializable {

import net.ceedubs.ficus.Ficus._
Expand All @@ -47,6 +48,7 @@ class FlinkProcessCompilerDataFactory(
modelData.modelConfig,
modelData.namingStrategy,
componentUseCase = ComponentUseCase.EngineRuntime,
modelData.additionalConfigsFromProvider
)

def prepareCompilerData(
Expand Down Expand Up @@ -119,12 +121,11 @@ class FlinkProcessCompilerDataFactory(
): (ModelDefinitionWithClasses, EngineDictRegistry) = {
val dictRegistryFactory = loadDictRegistry(userCodeClassLoader)
val modelDefinitionWithTypes = ModelDefinitionWithClasses(
// additionalConfigsFromProvider aren't provided, as it's not needed to run the process on flink
extractModelDefinition(
userCodeClassLoader,
modelDependencies,
id => DesignerWideComponentId(id.toString),
Map.empty
configsFromProviderWithDictionaryEditor
)
)
val dictRegistry = dictRegistryFactory.createEngineDictRegistry(
Expand Down
Loading

0 comments on commit 0c12933

Please sign in to comment.