Skip to content
This repository has been archived by the owner on Aug 25, 2024. It is now read-only.

Commit

Permalink
Base support for templating (#16)
Browse files Browse the repository at this point in the history
  • Loading branch information
nicoloboschi authored Jul 3, 2023
1 parent be541f1 commit 146df93
Show file tree
Hide file tree
Showing 10 changed files with 399 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@
*/
package com.datastax.oss.sga.api.model;

import lombok.Builder;
import lombok.Data;

import java.util.HashMap;
import java.util.Map;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,24 +27,26 @@ public class TopicDefinition extends Connection.Connectable {
public static final String CREATE_MODE_CREATE_IF_NOT_EXISTS = "create-if-not-exists";

public TopicDefinition() {
if (creationMode == null) {
creationMode = CREATE_MODE_NONE;
}
creationMode = CREATE_MODE_NONE;
connectableType = Connection.Connectables.TOPIC;
}

public TopicDefinition(String name, String creationMode, SchemaDefinition schema) {
this();
this.name = name;
this.creationMode = creationMode;
if (creationMode == null) {
this.creationMode = CREATE_MODE_NONE;
} else {
this.creationMode = creationMode;
}
this.schema = schema;
validateCreationMode();
}

private String name;

@JsonProperty("creation-mode")
private String creationMode = CREATE_MODE_NONE;
private String creationMode;
private SchemaDefinition schema;

private void validateCreationMode() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
*/
public class ClusterRuntimeRegistry {

private final Map<String, ClusterRuntime<?>> registry = new ConcurrentHashMap<>();
protected final Map<String, ClusterRuntime<?>> registry = new ConcurrentHashMap<>();

public ClusterRuntime getClusterRuntime(StreamingCluster streamingCluster) {
Objects.requireNonNull(streamingCluster);
Expand Down
4 changes: 4 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,9 @@
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.samskivert</groupId>
<artifactId>jmustache</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package com.datastax.oss.sga.impl.common;

import com.datastax.oss.sga.api.model.AgentConfiguration;
import com.datastax.oss.sga.api.model.ApplicationInstance;
import com.datastax.oss.sga.api.model.Instance;
import com.datastax.oss.sga.api.model.Module;
import com.datastax.oss.sga.api.model.Pipeline;
import com.datastax.oss.sga.api.model.Resource;
import com.datastax.oss.sga.api.model.StreamingCluster;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.samskivert.mustache.Mustache;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import lombok.SneakyThrows;

public class ApplicationInstancePlaceholderResolver {

private static final ObjectMapper mapper = new ObjectMapper();

private ApplicationInstancePlaceholderResolver() {
}

@SneakyThrows
public static ApplicationInstance resolvePlaceholders(ApplicationInstance instance) {
instance = deepCopy(instance);
final Map<String, Object> context = createContext(instance);


instance.setInstance(resolveInstance(instance, context));
instance.setResources(resolveResources(instance, context));
instance.setModules(resolveModules(instance, context));
return instance;
}

static Map<String, Object> createContext(ApplicationInstance application) throws IOException {
Map<String, Object> context = new HashMap<>();
final Instance instance = application.getInstance();
if (instance != null) {
context.put("cluster", instance.streamingCluster());
context.put("globals", instance.globals());
}

Map<String, Map<String, Object>> secrets = new HashMap<>();
if (application.getSecrets() != null && application.getSecrets().secrets() != null) {
application.getSecrets().secrets().forEach((k, v) -> secrets.put(k, v.data()));
}
context.put("secrets", secrets);
context = deepCopy(context);
return context;
}

private static Map<String, Module> resolveModules(ApplicationInstance instance, Map<String, Object> context) {
Map<String, Module> newModules = new LinkedHashMap<>();
for (Map.Entry<String, Module> moduleEntry : instance.getModules().entrySet()) {
final Module module = moduleEntry.getValue();
for (Map.Entry<String, Pipeline> pipelineEntry : module.getPipelines().entrySet()) {
final Pipeline pipeline = pipelineEntry.getValue();
Map<String, AgentConfiguration> newAgents = new LinkedHashMap<>();
for (Map.Entry<String, AgentConfiguration> stringAgentConfigurationEntry : pipeline.getAgents()
.entrySet()) {
final AgentConfiguration value = stringAgentConfigurationEntry.getValue();
value.setConfiguration(resolveMap(context, value.getConfiguration()));
newAgents.put(stringAgentConfigurationEntry.getKey(), value);
}
pipeline.setAgents(newAgents);
}
newModules.put(moduleEntry.getKey(), module);
}
return newModules;
}

private static Instance resolveInstance(ApplicationInstance applicationInstance, Map<String, Object> context) {
final StreamingCluster newCluster;
final Instance instance = applicationInstance.getInstance();
if (instance == null) {
return null;
}
final StreamingCluster cluster = instance.streamingCluster();
if (cluster != null) {
newCluster = new StreamingCluster(cluster.type(), resolveMap(context, cluster.configuration()));
} else {
newCluster = null;
}
return new Instance(
newCluster,
resolveMap(context, instance.globals())
);
}

private static Map<String, Resource> resolveResources(ApplicationInstance instance,
Map<String, Object> context) {
Map<String, Resource> newResources = new HashMap<>();
for (Map.Entry<String, Resource> resourceEntry : instance.getResources().entrySet()) {
final Resource resource = resourceEntry.getValue();
newResources.put(resourceEntry.getKey(),
new Resource(
resource.id(),
resource.name(),
resource.type(),
resolveMap(context, resource.configuration())
)
);
}
return newResources;
}

static Map<String, Object> resolveMap(Map<String, Object> context, Map<String, Object> config) {

Map<String, Object> resolvedConfig = new HashMap<>();
if (config == null) {
return resolvedConfig;
}
for (Map.Entry<String, Object> stringObjectEntry : config.entrySet()) {
resolvedConfig.put(stringObjectEntry.getKey(), resolveValue(context, stringObjectEntry.getValue() + ""));
}
return resolvedConfig;
}

static String resolveValue(Map<String, Object> context, String template) {
return Mustache.compiler()
.compile(template)
.execute(context);
}

private static ApplicationInstance deepCopy(ApplicationInstance instance) throws IOException {
return mapper.readValue(mapper.writeValueAsBytes(instance), ApplicationInstance.class);
}

private static Map<String, Object> deepCopy(Map<String, Object> context) throws IOException {
return mapper.readValue(mapper.writeValueAsBytes(context), Map.class);
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.datastax.oss.sga.impl.common;

import com.datastax.oss.sga.api.model.ApplicationInstance;

public class PlaceholderEvaluator {

public static ApplicationInstance evaluate(ApplicationInstance applicationInstance) {
new ApplicationInstance();
return applicationInstance;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@
import com.datastax.oss.sga.api.runtime.PhysicalApplicationInstance;
import com.datastax.oss.sga.api.runtime.PluginsRegistry;
import com.datastax.oss.sga.api.runtime.ClusterRuntimeRegistry;
import com.datastax.oss.sga.impl.common.ApplicationInstancePlaceholderResolver;
import lombok.Builder;

@Builder
public class ApplicationDeployer<T extends PhysicalApplicationInstance> {

private ClusterRuntimeRegistry registry = new ClusterRuntimeRegistry();
private PluginsRegistry pluginsRegistry = new PluginsRegistry();
private ClusterRuntimeRegistry registry;
private PluginsRegistry pluginsRegistry;

public T createImplementation(ApplicationInstance applicationInstance) {
ClusterRuntime<T> clusterRuntime = registry.getClusterRuntime(applicationInstance.getInstance().streamingCluster());
Expand All @@ -20,7 +21,9 @@ public T createImplementation(ApplicationInstance applicationInstance) {

public void deploy(ApplicationInstance applicationInstance, T physicalApplicationInstance) {
ClusterRuntime<T> clusterRuntime = registry.getClusterRuntime(applicationInstance.getInstance().streamingCluster());
clusterRuntime.deploy(applicationInstance, physicalApplicationInstance);
final ApplicationInstance resolvedApplicationInstance = ApplicationInstancePlaceholderResolver
.resolvePlaceholders(applicationInstance);
clusterRuntime.deploy(resolvedApplicationInstance, physicalApplicationInstance);
}

public void delete(ApplicationInstance applicationInstance, T physicalApplicationInstance) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package com.datastax.oss.sga.impl.common;

import com.datastax.oss.sga.api.model.ApplicationInstance;
import com.datastax.oss.sga.api.model.Resource;
import com.datastax.oss.sga.impl.parser.ModelBuilder;
import com.samskivert.mustache.MustacheException;
import java.util.Map;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

class ApplicationInstancePlaceholderResolverTest {

@Test
void testAvailablePlaceholders() throws Exception {

ApplicationInstance applicationInstance = ModelBuilder
.buildApplicationInstance(Map.of(
"secrets.yaml", """
secrets:
- name: "OpenAI Azure credentials"
id: "openai-credentials"
data:
accessKey: "my-access-key"
""",
"instance.yaml", """
instance:
streamingCluster:
type: pulsar
configuration:
webServiceUrl: http://mypulsar.localhost:8080
globals:
another-url: another-value
open-api-url: http://myurl.localhost:8080/endpoint
"""));

final Map<String, Object> context = ApplicationInstancePlaceholderResolver.createContext(applicationInstance);
Assertions.assertEquals("my-access-key",
ApplicationInstancePlaceholderResolver.resolveValue(context,
"{{secrets.openai-credentials.accessKey}}"));
Assertions.assertEquals("http://mypulsar.localhost:8080",
ApplicationInstancePlaceholderResolver.resolveValue(context,
"{{cluster.configuration.webServiceUrl}}"));
Assertions.assertEquals("http://myurl.localhost:8080/endpoint",
ApplicationInstancePlaceholderResolver.resolveValue(context, "{{globals.open-api-url}}"));
}

@Test
void testResolveSecretsInConfiguration() throws Exception {
ApplicationInstance applicationInstance = ModelBuilder
.buildApplicationInstance(Map.of("configuration.yaml",
"""
configuration:
resources:
- type: "openai-azure-config"
name: "OpenAI Azure configuration"
id: "openai-azure"
configuration:
credentials: "{{secrets.openai-credentials.accessKey}}"
url: "{{globals.open-api-url}}"
""",
"secrets.yaml", """
secrets:
- name: "OpenAI Azure credentials"
id: "openai-credentials"
data:
accessKey: "my-access-key"
""",
"instance.yaml", """
instance:
globals:
another-url: another-value
open-api-url: http://myurl.localhost:8080/endpoint
"""));

final ApplicationInstance resolved =
ApplicationInstancePlaceholderResolver.resolvePlaceholders(applicationInstance);
final Resource resource = resolved.getResources().get("openai-azure");
Assertions.assertEquals("my-access-key", resource.configuration().get("credentials"));
Assertions.assertEquals("http://myurl.localhost:8080/endpoint", resource.configuration().get("url"));
}

@Test
void testResolveInAgentConfiguration() throws Exception {
ApplicationInstance applicationInstance = ModelBuilder
.buildApplicationInstance(Map.of("module1.yaml",
"""
module: "module-1"
id: "pipeline-1"
topics:
- name: "input-topic"
pipeline:
- name: "sink1"
id: "sink1"
type: "generic-pulsar-sink"
input: "input-topic"
configuration:
sinkType: "some-sink-type-on-your-cluster"
access-key: "{{ secrets.ak.value }}"
""",
"secrets.yaml", """
secrets:
- name: "OpenAI Azure credentials"
id: "ak"
data:
value: "my-access-key"
"""));

final ApplicationInstance resolved =
ApplicationInstancePlaceholderResolver.resolvePlaceholders(applicationInstance);
Assertions.assertEquals("my-access-key",
resolved.getModule("module-1").getPipelines().values().iterator().next().getAgents().get("sink1")
.getConfiguration()
.get("access-key"));
}

@Test
void testErrorOnNotFound() throws Exception {
ApplicationInstance applicationInstance = ModelBuilder
.buildApplicationInstance(Map.of("configuration.yaml",
"""
configuration:
resources:
- type: "openai-azure-config"
name: "OpenAI Azure configuration"
id: "openai-azure"
configuration:
credentials: "{{secrets.openai-credentials.invalid}}"
"""));
Assertions.assertThrows(MustacheException.Context.class, () -> {
ApplicationInstancePlaceholderResolver.resolvePlaceholders(applicationInstance);
});
}
}
Loading

0 comments on commit 146df93

Please sign in to comment.