diff --git a/cdh_5_4-cluster-cdh_kafka_1_2_0-lib/src/main/resources/data-collector-library.properties b/cdh_5_4-cluster-cdh_kafka_1_2_0-lib/src/main/resources/data-collector-library.properties
index dd1ba073055..48d45ef62ef 100644
--- a/cdh_5_4-cluster-cdh_kafka_1_2_0-lib/src/main/resources/data-collector-library.properties
+++ b/cdh_5_4-cluster-cdh_kafka_1_2_0-lib/src/main/resources/data-collector-library.properties
@@ -20,4 +20,4 @@
 #
 cluster.jar.blacklist.regex_com.streamsets.pipeline.stage.origin.kafka.KafkaDSource=^spark-streaming-kafka.*
-execution.mode_com.streamsets.pipeline.stage.origin.kafka.KafkaDSource=CLUSTER
+execution.mode_com.streamsets.pipeline.stage.origin.kafka.KafkaDSource=CLUSTER_STREAMING
diff --git a/cluster-hdfs-protolib/src/main/java/com/streamsets/pipeline/stage/origin/hdfs/cluster/ClusterHdfsDSource.java b/cluster-hdfs-protolib/src/main/java/com/streamsets/pipeline/stage/origin/hdfs/cluster/ClusterHdfsDSource.java
index 123d17297eb..123fc369dd4 100644
--- a/cluster-hdfs-protolib/src/main/java/com/streamsets/pipeline/stage/origin/hdfs/cluster/ClusterHdfsDSource.java
+++ b/cluster-hdfs-protolib/src/main/java/com/streamsets/pipeline/stage/origin/hdfs/cluster/ClusterHdfsDSource.java
@@ -37,6 +37,7 @@
 import com.streamsets.pipeline.config.LogModeChooserValues;
 import com.streamsets.pipeline.configurablestage.DClusterSourceOffsetCommitter;
 import com.streamsets.pipeline.lib.parser.log.RegExConfig;
+
 import java.util.List;
 import java.util.Map;
@@ -45,7 +46,8 @@
     version = 2,
     label = "Hadoop FS",
     description = "Reads data from Hadoop file system",
-    execution = ExecutionMode.CLUSTER,
+    execution = ExecutionMode.CLUSTER_BATCH,
+    libJarsRegex = {"avro-\\d+.*", "avro-mapred.*"},
     icon = "hdfs.png",
     privateClassLoader = true,
     upgrader = ClusterHdfsSourceUpgrader.class
diff --git a/commonlib/src/main/java/com/streamsets/pipeline/configurablestage/DClusterSourceOffsetCommitter.java b/commonlib/src/main/java/com/streamsets/pipeline/configurablestage/DClusterSourceOffsetCommitter.java
index 3a3027451a8..3c9614cd336 100644
--- a/commonlib/src/main/java/com/streamsets/pipeline/configurablestage/DClusterSourceOffsetCommitter.java
+++ b/commonlib/src/main/java/com/streamsets/pipeline/configurablestage/DClusterSourceOffsetCommitter.java
@@ -46,14 +46,20 @@ Stage createStage() {

   @Override
   public String getName() {
-    initializeClusterSource();
-    return clusterSource.getName();
+    if (initializeClusterSource()) {
+      return clusterSource.getName();
+    } else {
+      return null;
+    }
   }

   @Override
   public boolean isInBatchMode() {
-    initializeClusterSource();
-    return clusterSource.isInBatchMode();
+    if (initializeClusterSource()) {
+      return clusterSource.isInBatchMode();
+    } else {
+      return false;
+    }
   }

   /**
@@ -63,8 +69,9 @@ public boolean isInBatchMode() {
    */
   @Override
   public void put(List<Record> batch) throws InterruptedException {
-    initializeClusterSource();
-    clusterSource.put(batch);
+    if (initializeClusterSource()) {
+      clusterSource.put(batch);
+    }
   }

   private boolean initializeClusterSource() {
diff --git a/container/src/main/java/com/streamsets/datacollector/cluster/ClusterModeConstants.java b/container/src/main/java/com/streamsets/datacollector/cluster/ClusterModeConstants.java
index 464a11bcddd..946a40b6912 100644
--- a/container/src/main/java/com/streamsets/datacollector/cluster/ClusterModeConstants.java
+++ b/container/src/main/java/com/streamsets/datacollector/cluster/ClusterModeConstants.java
@@ -24,13 +24,11 @@ public class ClusterModeConstants {

   public static final String USER_LIBS = "user-libs";
   public static final String NUM_EXECUTORS_KEY = "num-executors";
-  public static final String CLUSTER_SOURCE_NAME = "cluster.source.name";
   public static final String CLUSTER_PIPELINE_NAME = "cluster.pipeline.name";
   public static final String CLUSTER_PIPELINE_REV = "cluster.pipeline.rev";
   public static final String CLUSTER_PIPELINE_USER = "cluster.pipeline.user";
-  public static final String SPARK_KAFKA_JAR_PREFIX = "spark-streaming-kafka";
-  public static final String CLUSTER_SOURCE_BATCHMODE = "cluster.source.batchmode";
-  public static final String AVRO_MAPRED_JAR_PREFIX = "avro-mapred";
+  public static final String SPARK_KAFKA_JAR_REGEX = "spark-streaming-kafka.*";
+  public static final String AVRO_MAPRED_JAR_REGEX = "avro-mapred.*";
   public static final String AVRO_JAR_REGEX = "avro-\\d+.*";
 }
diff --git a/container/src/main/java/com/streamsets/datacollector/cluster/ClusterProviderImpl.java b/container/src/main/java/com/streamsets/datacollector/cluster/ClusterProviderImpl.java
index a078001a7e6..d5a17082a5c 100644
--- a/container/src/main/java/com/streamsets/datacollector/cluster/ClusterProviderImpl.java
+++ b/container/src/main/java/com/streamsets/datacollector/cluster/ClusterProviderImpl.java
@@ -18,7 +18,6 @@ package com.streamsets.datacollector.cluster;

 import static com.streamsets.datacollector.definition.StageLibraryDefinitionExtractor.DATA_COLLECTOR_LIBRARY_PROPERTIES;
-
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Splitter;
@@ -90,9 +89,9 @@ public class ClusterProviderImpl implements ClusterProvider {
   static final Pattern YARN_APPLICATION_ID_REGEX = Pattern.compile("\\s(application_[0-9]+_[0-9]+)(\\s|$)");
   static final Pattern NO_VALID_CREDENTIALS = Pattern.compile("(No valid credentials provided.*)");
-  private static final String CLUSTER_TYPE = "CLUSTER_TYPE";
-  private static final String CLUSTER_TYPE_SPARK = "spark";
-  private static final String CLUSTER_TYPE_MAPREDUCE = "mr";
+  public static final String CLUSTER_TYPE = "CLUSTER_TYPE";
+  public static final String CLUSTER_TYPE_SPARK = "spark";
+  public static final String CLUSTER_TYPE_MAPREDUCE = "mr";
   private static final String CLUSTER_TYPE_YARN = "yarn";
   private static final String KERBEROS_AUTH = "KERBEROS_AUTH";
   private static final String KERBEROS_KEYTAB = "KERBEROS_KEYTAB";
@@ -410,10 +409,7 @@ private ApplicationState startPipelineInternal(SystemProcessFactory systemProce
     // order is important here as we don't want error stage
     // configs overriding source stage configs
     String clusterToken = UUID.randomUUID().toString();
-    ClusterOrigin clusterOrigin = null;
-    String pathToSparkKafkaJar = null;
-    String pathToAvroMapredJar = null;
-    String pathToAvroJar = null;
+    List<String> jarsToShip = new ArrayList<>();
     List<Issue> errors = new ArrayList<>();
     PipelineBean pipelineBean = PipelineBeanCreator.get().create(false, stageLibrary, pipelineConfiguration, errors);
     if (!errors.isEmpty()) {
@@ -424,9 +420,10 @@ private ApplicationState startPipelineInternal(SystemProcessFactory systemProce
     for (StageBean stageBean : pipelineBean.getStages()) {
       pipelineConfigurations.add(stageBean.getConfiguration());
     }
+    boolean isBatch = false;
     for (StageConfiguration stageConf : pipelineConfigurations.build()) {
       StageDefinition stageDef = stageLibrary.getStage(stageConf.getLibrary(), stageConf.getStageName(),
-        false);
+          false);
       if (stageConf.getInputLanes().isEmpty()) {
         for (Config conf : stageConf.getConfiguration()) {
           if (conf.getValue() != null) {
@@ -458,37 +455,29 @@ private ApplicationState startPipelineInternal(SystemProcessFactory systemProce
           }
         }
       }
-      // find the spark-kafka jar
-      for (URL jarUrl : ((URLClassLoader)stageDef.getStageClassLoader()).getURLs()) {
-        File jarFile = new File(jarUrl.getPath());
-        if (jarFile.getName().startsWith(ClusterModeConstants.SPARK_KAFKA_JAR_PREFIX)) {
-          pathToSparkKafkaJar = jarFile.getAbsolutePath();
-        }
-      }
-
-      for (URL jarUrl : ((URLClassLoader) stageDef.getStageClassLoader()).getURLs()) {
-        File jarFile = new File(jarUrl.getPath());
-        if (jarFile.getName().startsWith(ClusterModeConstants.AVRO_MAPRED_JAR_PREFIX)) {
-          pathToAvroMapredJar = jarFile.getAbsolutePath();
-        }
+      ExecutionMode executionMode = PipelineBeanCreator.get().getExecutionMode(pipelineConfiguration, new ArrayList<Issue>());
+      if (executionMode == ExecutionMode.CLUSTER_BATCH) {
+        isBatch = true;
+      } else if (executionMode == ExecutionMode.CLUSTER_STREAMING) {
+        isBatch = false;
+      } else {
+        throw new IllegalStateException(Utils.format("Unsupported execution mode '{}' for cluster mode",
+          executionMode));
       }
-      for (URL jarUrl : ((URLClassLoader) stageDef.getStageClassLoader()).getURLs()) {
-        File jarFile = new File(jarUrl.getPath());
-        Pattern pattern = Pattern.compile(ClusterModeConstants.AVRO_JAR_REGEX);
-        Matcher matcher = pattern.matcher(jarFile.getName());
-        if (matcher.matches()) {
-          pathToAvroJar = jarFile.getAbsolutePath();
+      List<String> libJarsRegex = stageDef.getLibJarsRegex();
+      if (!libJarsRegex.isEmpty()) {
+        for (URL jarUrl : ((URLClassLoader) stageDef.getStageClassLoader()).getURLs()) {
+          File jarFile = new File(jarUrl.getPath());
+          for (String libJar : libJarsRegex) {
+            Pattern pattern = Pattern.compile(libJar);
+            Matcher matcher = pattern.matcher(jarFile.getName());
+            if (matcher.matches()) {
+              jarsToShip.add(jarFile.getAbsolutePath());
+            }
+          }
         }
       }
     }
-    try {
-      clusterOrigin = ClusterOrigin.valueOf(Strings.nullToEmpty(sourceInfo.get(ClusterModeConstants.
-        CLUSTER_SOURCE_NAME)).toUpperCase(Locale.ENGLISH));
-    } catch (IllegalArgumentException ex) {
-      String msg = Utils.format("Illegal value '{}' for '{}'", sourceInfo.
-        get(ClusterModeConstants.CLUSTER_SOURCE_NAME), ClusterModeConstants.CLUSTER_SOURCE_NAME);
-      throw new IllegalArgumentException(msg, ex);
-    }
     String type = StageLibraryUtils.getLibraryType(stageDef.getStageClassLoader());
     String name = StageLibraryUtils.getLibraryName(stageDef.getStageClassLoader());
     if (ClusterModeConstants.STREAMSETS_LIBS.equals(type)) {
@@ -500,10 +489,6 @@ private ApplicationState startPipelineInternal(SystemProcessFactory systemProce
       throw new IllegalStateException(Utils.format("Error unknown stage library type: '{}'", type));
     }
   }
-    if (clusterOrigin == ClusterOrigin.KAFKA) {
-      Utils.checkState(pathToSparkKafkaJar != null, "Could not find spark kafka jar");
-    }
-
   LOG.info("stagingDir = '{}'", stagingDir);
   LOG.info("bootstrapDir = '{}'", bootstrapDir);
   LOG.info("etcDir = '{}'", etcDir);
@@ -554,9 +539,6 @@ private ApplicationState startPipelineInternal(SystemProcessFactory systemProce
       String msg = errorString("serializing etc directory: {}", ex);
       throw new RuntimeException(msg, ex);
     }
-    boolean isBatch = Boolean.parseBoolean(Utils.checkNotNull(sourceInfo.get(
-      ClusterModeConstants.CLUSTER_SOURCE_BATCHMODE), ClusterModeConstants.CLUSTER_SOURCE_BATCHMODE)
-      .trim().toLowerCase(Locale.ENGLISH));
     File bootstrapJar = getBootstrapJar(new File(bootstrapDir, "main"), "streamsets-datacollector-bootstrap");
     File clusterBootstrapJar = getBootstrapJar(new File(bootstrapDir, "spark"),
       "streamsets-datacollector-spark-bootstrap");
@@ -564,8 +546,6 @@ private ApplicationState startPipelineInternal(SystemProcessFactory systemProce
     InputStream clusterLog4jProperties = null;
     try {
       if (isBatch) {
-        Utils.checkState(pathToAvroJar != null, "Could not find avro jar");
-        Utils.checkState(pathToAvroMapredJar != null, "Could not find avro-mapred jar");
         clusterLog4jProperties = Utils.checkNotNull(getClass().getResourceAsStream("/cluster-mr-log4j.properties"),
           "Cluster Log4J Properties");
       } else {
@@ -593,15 +573,14 @@ private ApplicationState startPipelineInternal(SystemProcessFactory systemProce
       args = generateMRArgs(clusterManager.getAbsolutePath(), String.valueOf(config.clusterSlaveMemory),
         config.clusterSlaveJavaOpts, libsTarGz.getAbsolutePath(), etcTarGz.getAbsolutePath(),
         resourcesTarGz.getAbsolutePath(), log4jProperties.getAbsolutePath(), bootstrapJar.getAbsolutePath(),
-        sdcPropertiesFile.getAbsolutePath(), clusterBootstrapJar.getAbsolutePath(), pathToAvroJar,
-        pathToAvroMapredJar);
+        sdcPropertiesFile.getAbsolutePath(), clusterBootstrapJar.getAbsolutePath(), jarsToShip);
     } else {
       LOG.info("Submitting Spark Job");
       environment.put(CLUSTER_TYPE, CLUSTER_TYPE_SPARK);
       args = generateSparkArgs(clusterManager.getAbsolutePath(), String.valueOf(config.clusterSlaveMemory),
         config.clusterSlaveJavaOpts, numExecutors, libsTarGz.getAbsolutePath(), etcTarGz.getAbsolutePath(),
         resourcesTarGz.getAbsolutePath(), log4jProperties.getAbsolutePath(), bootstrapJar.getAbsolutePath(),
-        pathToSparkKafkaJar, clusterBootstrapJar.getAbsolutePath());
+        jarsToShip, clusterBootstrapJar.getAbsolutePath());
     }
     SystemProcess process = systemProcessFactory.create(ClusterProviderImpl.class.getSimpleName(), outputDir, args);
     LOG.info("Starting: " + process);
@@ -656,7 +635,7 @@ private ApplicationState startPipelineInternal(SystemProcessFactory systemProce
   private List<String> generateMRArgs(String clusterManager, String slaveMemory, String javaOpts,
                                       String libsTarGz, String etcTarGz, String resourcesTarGz,
                                       String log4jProperties, String bootstrapJar, String sdcPropertiesFile,
-                                      String clusterBootstrapJar, String pathToAvroJar, String pathToAvroMapredJar) {
+                                      String clusterBootstrapJar, List<String> jarsToShip) {
     List<String> args = new ArrayList<>();
     args.add(clusterManager);
     args.add("start");
@@ -668,15 +647,20 @@ private List<String> generateMRArgs(String clusterManager, String slaveMemory, S
     args.add("-D");
     args.add("mapreduce.job.log4j-properties-file=" + log4jProperties);
     args.add("-libjars");
-    args.add(Joiner.on(",").join(bootstrapJar, pathToAvroMapredJar, pathToAvroJar));
+    String libJarString = bootstrapJar;
+    for (String jarToShip: jarsToShip) {
+      libJarString = libJarString + "," + jarToShip;
+    }
+    args.add(libJarString);
     args.add(sdcPropertiesFile);
     args.add(Joiner.on(" ").join(String.format("-Xmx%sm", slaveMemory), javaOpts,
       "-javaagent:./" + (new File(bootstrapJar)).getName()));
     return args;
   }

+
   private List<String> generateSparkArgs(String clusterManager, String slaveMemory, String javaOpts,
                                          String numExecutors, String libsTarGz, String etcTarGz, String resourcesTarGz,
-                                         String log4jProperties, String bootstrapJar, String pathToSparkKafkaJar,
+                                         String log4jProperties, String bootstrapJar, List<String> jarsToShip,
                                          String clusterBootstrapJar) {
     List<String> args = new ArrayList<>();
     args.add(clusterManager);
@@ -702,7 +686,11 @@ private List<String> generateSparkArgs(String clusterManager, String slaveMemory
     args.add("--files");
     args.add(log4jProperties);
     args.add("--jars");
-    args.add(Joiner.on(",").skipNulls().join(bootstrapJar, pathToSparkKafkaJar));
+    String libJarString = bootstrapJar;
+    for (String jarToShip: jarsToShip) {
+      libJarString = libJarString + "," + jarToShip;
+    }
+    args.add(libJarString);
     // use our javaagent and java opt configs
     args.add("--conf");
     args.add("spark.executor.extraJavaOptions=" + Joiner.on(" ").join("-javaagent:./" + (new File(bootstrapJar)).getName(),
diff --git a/container/src/main/java/com/streamsets/datacollector/config/ExecutionModeChooserValues.java b/container/src/main/java/com/streamsets/datacollector/config/ExecutionModeChooserValues.java
index 915dbb28c3c..568c8deeccd 100644
--- a/container/src/main/java/com/streamsets/datacollector/config/ExecutionModeChooserValues.java
+++ b/container/src/main/java/com/streamsets/datacollector/config/ExecutionModeChooserValues.java
@@ -23,6 +23,6 @@
 public class ExecutionModeChooserValues extends BaseEnumChooserValues<ExecutionMode> {

   public ExecutionModeChooserValues() {
-    super(ExecutionMode.STANDALONE, ExecutionMode.CLUSTER);
+    super(ExecutionMode.STANDALONE, ExecutionMode.CLUSTER_BATCH, ExecutionMode.CLUSTER_STREAMING);
   }
 }
diff --git a/container/src/main/java/com/streamsets/datacollector/config/StageDefinition.java b/container/src/main/java/com/streamsets/datacollector/config/StageDefinition.java
index 131fcfd486a..b60bad7cf71 100644
--- a/container/src/main/java/com/streamsets/datacollector/config/StageDefinition.java
+++ b/container/src/main/java/com/streamsets/datacollector/config/StageDefinition.java
@@ -47,17 +47,18 @@ public class StageDefinition {
   private final boolean preconditions;
   private final boolean onRecordError;
   private final RawSourceDefinition rawSourceDefinition;
-  private List<ConfigDefinition> configDefinitions;
-  private Map<String, ConfigDefinition> configDefinitionsMap;
+  private final List<ConfigDefinition> configDefinitions;
+  private final Map<String, ConfigDefinition> configDefinitionsMap;
   private final String icon;
   private final ConfigGroupDefinition configGroupDefinition;
   private final boolean variableOutputStreams;
   private final int outputStreams;
   private final String outputStreamLabelProviderClass;
   private List<String> outputStreamLabels;
-  private List<ExecutionMode> executionModes;
+  private final List<ExecutionMode> executionModes;
   private final boolean recordsByRef;
   private final StageUpgrader upgrader;
+  private final List<String> libJarsRegex;

   // localized version
   private StageDefinition(StageLibraryDefinition libraryDefinition, boolean privateClassLoader, ClassLoader classLoader,
@@ -66,7 +67,7 @@ private StageDefinition(StageLibraryDefinition libraryDefinition, boolean privat
                           boolean onRecordError, List<ConfigDefinition> configDefinitions, RawSourceDefinition rawSourceDefinition,
                           String icon, ConfigGroupDefinition configGroupDefinition, boolean variableOutputStreams,
                           int outputStreams, List<String> outputStreamLabels, List<ExecutionMode> executionModes, boolean recordsByRef,
-                          StageUpgrader upgrader) {
+                          StageUpgrader upgrader, List<String> libJarsRegex) {
     this.libraryDefinition = libraryDefinition;
     this.privateClassLoader = privateClassLoader;
     this.classLoader = classLoader;
@@ -103,6 +104,8 @@ private StageDefinition(StageLibraryDefinition libraryDefinition, boolean privat
     this.executionModes = executionModes;
     this.recordsByRef = recordsByRef;
     this.upgrader = upgrader;
+    this.libJarsRegex = libJarsRegex;
+
   }

   public StageDefinition(StageDefinition def, ClassLoader classLoader) {
@@ -133,6 +136,7 @@ public StageDefinition(StageDefinition def, ClassLoader classLoader) {
     executionModes = def.executionModes;
     recordsByRef = def.recordsByRef;
     upgrader = def.upgrader;
+    libJarsRegex = def.libJarsRegex;
   }

   public StageDefinition(StageLibraryDefinition libraryDefinition, boolean privateClassLoader, Class klass,
@@ -141,7 +145,7 @@ public StageDefinition(StageLibraryDefinition libraryDefinition, boolean private
                          List<ConfigDefinition> configDefinitions, RawSourceDefinition rawSourceDefinition, String icon,
                          ConfigGroupDefinition configGroupDefinition, boolean variableOutputStreams, int outputStreams,
                          String outputStreamLabelProviderClass, List<ExecutionMode> executionModes, boolean recordsByRef,
-                         StageUpgrader upgrader) {
+                         StageUpgrader upgrader, List<String> libJarsRegex) {
     this.libraryDefinition = libraryDefinition;
     this.privateClassLoader = privateClassLoader;
     this.classLoader = libraryDefinition.getClassLoader();
@@ -177,6 +181,7 @@ public StageDefinition(StageLibraryDefinition libraryDefinition, boolean private
     this.executionModes = executionModes;
     this.recordsByRef = recordsByRef;
     this.upgrader = upgrader;
+    this.libJarsRegex = libJarsRegex;
   }

   public List<ExecutionMode> getLibraryExecutionModes() {
@@ -298,6 +303,10 @@ public List<ExecutionMode> getExecutionModes() {
     return executionModes;
   }

+  public List<String> getLibJarsRegex() {
+    return libJarsRegex;
+  }
+
   public boolean getRecordsByRef() {
     return recordsByRef;
   }
@@ -400,7 +409,7 @@ public StageDefinition localize() {
                                getVersion(), label, description, getType(), isErrorStage(), hasPreconditions(),
                                hasOnRecordError(), configDefs, rawSourceDef, getIcon(), groupDefs,
                                isVariableOutputStreams(), getOutputStreams(), streamLabels, executionModes,
-                               recordsByRef, upgrader);
+                               recordsByRef, upgrader, libJarsRegex);
   }

   private List<String> _getOutputStreamLabels(ClassLoader classLoader, boolean localized) {
diff --git a/container/src/main/java/com/streamsets/datacollector/creation/PipelineConfigBean.java b/container/src/main/java/com/streamsets/datacollector/creation/PipelineConfigBean.java
index 33600f53481..70288b67bf7 100644
--- a/container/src/main/java/com/streamsets/datacollector/creation/PipelineConfigBean.java
+++ b/container/src/main/java/com/streamsets/datacollector/creation/PipelineConfigBean.java
@@ -170,7 +170,7 @@ public class PipelineConfigBean implements Stage {
     displayPosition = 100,
     group = "CLUSTER",
     dependsOn = "executionMode",
-    triggeredByValue = "CLUSTER"
+    triggeredByValue = {"CLUSTER_BATCH", "CLUSTER_STREAMING"}
   )
   public long clusterSlaveMemory;

@@ -184,7 +184,7 @@ public class PipelineConfigBean implements Stage {
     displayPosition = 110,
     group = "CLUSTER",
     dependsOn = "executionMode",
-    triggeredByValue = "CLUSTER"
+    triggeredByValue = {"CLUSTER_BATCH", "CLUSTER_STREAMING"}
   )
   public String clusterSlaveJavaOpts;

@@ -198,7 +198,7 @@ public class PipelineConfigBean implements Stage {
     displayPosition = 120,
     group = "CLUSTER",
     dependsOn = "executionMode",
-    triggeredByValue = "CLUSTER"
+    triggeredByValue = {"CLUSTER_BATCH", "CLUSTER_STREAMING"}
   )
   public Map<String, String> clusterLauncherEnv;
diff --git a/container/src/main/java/com/streamsets/datacollector/definition/StageDefinitionExtractor.java b/container/src/main/java/com/streamsets/datacollector/definition/StageDefinitionExtractor.java
index 7acaeea135f..26040ba5dec 100644
--- a/container/src/main/java/com/streamsets/datacollector/definition/StageDefinitionExtractor.java
+++ b/container/src/main/java/com/streamsets/datacollector/definition/StageDefinitionExtractor.java
@@ -41,6 +41,7 @@
 import com.streamsets.pipeline.api.impl.Utils;

 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -181,12 +182,11 @@ public StageDefinition extract(StageLibraryDefinition libraryDef, Class<? extend
     List<ExecutionMode> executionModes = ImmutableList.copyOf(sDef.execution());
-
     List<ExecutionMode> executionModesLibraryOverride = libraryDef.getStageExecutionModesOverride(klass);
     if (executionModesLibraryOverride != null) {
       executionModes = executionModesLibraryOverride;
     }
-
+    List<String> libJarsRegex = ImmutableList.copyOf(sDef.libJarsRegex());
     boolean recordsByRef = sDef.recordsByRef();

     List<ConfigDefinition> systemConfigs = ConfigDefinitionExtractor.get().extract(StageConfigBean.class,
@@ -228,7 +228,7 @@ public StageDefinition extract(StageLibraryDefinition libraryDef, Class<? extend
     return new StageDefinition(libraryDef, privateClassLoader, klass, name, version, label, description, type,
       errorStage, preconditions, onRecordError, configDefinitions, rawSourceDefinition, icon, configGroupDefinition,
       variableOutputStreams, outputStreams, outputStreamLabelProviderClass, executionModes, recordsByRef,
-      upgrader);
+      upgrader, libJarsRegex);
diff --git a/container/src/main/java/com/streamsets/datacollector/execution/runner/cluster/ClusterRunner.java b/container/src/main/java/com/streamsets/datacollector/execution/runner/cluster/ClusterRunner.java
--- a/container/src/main/java/com/streamsets/datacollector/execution/runner/cluster/ClusterRunner.java
+++ b/container/src/main/java/com/streamsets/datacollector/execution/runner/cluster/ClusterRunner.java
@@ -527,13 +524,8 @@
   static class ClusterSourceInfo {
     private final int parallelism;
-    private final String clusterSourceName;
-    private final boolean isInBatchMode;
     private final Map<String, String> configsToShip;

-    ClusterSourceInfo(String clusterSourceName, int parallelism, boolean isInBatchMode, Map<String, String> configsToShip) {
+    ClusterSourceInfo(int parallelism, Map<String, String> configsToShip) {
       this.parallelism = parallelism;
-      this.clusterSourceName = clusterSourceName;
-      this.isInBatchMode = isInBatchMode;
       this.configsToShip = configsToShip;
     }
@@ -543,19 +538,10 @@ int getParallelism() {
       return parallelism;
     }

-    String getClusterSourceName() {
-      return clusterSourceName;
-    }
-
-    boolean isInBatchMode() {
-      return isInBatchMode;
-    }
-
     Map<String, String> getConfigsToShip() {
       return configsToShip;
     }
-
-  }
+  }

   private ProductionPipeline createProductionPipeline(String name, String rev, Configuration configuration,
     PipelineConfiguration pipelineConfiguration) throws PipelineStoreException, PipelineRuntimeException,
@@ -664,8 +650,6 @@ private synchronized void doStart(PipelineConfiguration pipelineConf, ClusterSou
     File bootstrapDir = new File(this.runtimeInfo.getLibexecDir(), "bootstrap-libs");
     // create pipeline and get the parallelism info from the source
     sourceInfo.put(ClusterModeConstants.NUM_EXECUTORS_KEY, String.valueOf(clusterSourceInfo.getParallelism()));
-    sourceInfo.put(ClusterModeConstants.CLUSTER_SOURCE_NAME, clusterSourceInfo.getClusterSourceName());
-    sourceInfo.put(ClusterModeConstants.CLUSTER_SOURCE_BATCHMODE, String.valueOf(clusterSourceInfo.isInBatchMode()));
     sourceInfo.put(ClusterModeConstants.CLUSTER_PIPELINE_NAME, name);
     sourceInfo.put(ClusterModeConstants.CLUSTER_PIPELINE_REV, rev);
     sourceInfo.put(ClusterModeConstants.CLUSTER_PIPELINE_USER, user);
diff --git a/container/src/main/java/com/streamsets/datacollector/execution/runner/common/MetricsObserverRunner.java b/container/src/main/java/com/streamsets/datacollector/execution/runner/common/MetricsObserverRunner.java
index c679ed4bff2..54ed3402788 100644
--- a/container/src/main/java/com/streamsets/datacollector/execution/runner/common/MetricsObserverRunner.java
+++ b/container/src/main/java/com/streamsets/datacollector/execution/runner/common/MetricsObserverRunner.java
@@ -24,7 +24,6 @@
 import com.streamsets.datacollector.execution.alerts.MetricRuleEvaluator;
 import com.streamsets.datacollector.metrics.MetricsConfigurator;
 import com.streamsets.datacollector.runner.production.RulesConfigurationChangeRequest;
-
 import java.util.List;

 public class MetricsObserverRunner {
diff --git a/container/src/main/java/com/streamsets/datacollector/execution/runner/provider/StandaloneAndClusterRunnerProviderImpl.java b/container/src/main/java/com/streamsets/datacollector/execution/runner/provider/StandaloneAndClusterRunnerProviderImpl.java
index 9f9b0e0569c..7f4ee7687cd 100644
--- a/container/src/main/java/com/streamsets/datacollector/execution/runner/provider/StandaloneAndClusterRunnerProviderImpl.java
+++ b/container/src/main/java/com/streamsets/datacollector/execution/runner/provider/StandaloneAndClusterRunnerProviderImpl.java
@@ -50,7 +50,8 @@ public Runner createRunner(String user, String name, String rev, ObjectGraph obj
     List modules = new ArrayList<>();
     LOG.info(Utils.format("Pipeline execution mode is: {} ", executionMode));
     switch (executionMode) {
-      case CLUSTER:
+      case CLUSTER_BATCH:
+      case CLUSTER_STREAMING:
         objectGraph = objectGraph.plus(ClusterRunnerInjectorModule.class);
         modules.add(new ClusterRunnerModule(user, name, rev, objectGraph));
         break;
diff --git a/container/src/main/java/com/streamsets/datacollector/restapi/bean/BeanHelper.java b/container/src/main/java/com/streamsets/datacollector/restapi/bean/BeanHelper.java
index 9762599f18c..42801fbdec4 100644
--- a/container/src/main/java/com/streamsets/datacollector/restapi/bean/BeanHelper.java
+++ b/container/src/main/java/com/streamsets/datacollector/restapi/bean/BeanHelper.java
@@ -1110,8 +1110,10 @@ public static ExecutionModeJson wrapExecutionMode(ExecutionMode executionMode) {
       return null;
     }
     switch (executionMode) {
-      case CLUSTER:
-        return ExecutionModeJson.CLUSTER;
+      case CLUSTER_BATCH:
+        return ExecutionModeJson.CLUSTER_BATCH;
+      case CLUSTER_STREAMING:
+        return ExecutionModeJson.CLUSTER_STREAMING;
       case STANDALONE:
         return ExecutionModeJson.STANDALONE;
       case SLAVE:
@@ -1126,8 +1128,10 @@ public static ExecutionMode unwrapExecutionMode(ExecutionModeJson executionModeJ
       return null;
     }
     switch (executionModeJson) {
-      case CLUSTER:
-        return ExecutionMode.CLUSTER;
+      case CLUSTER_BATCH:
+        return ExecutionMode.CLUSTER_BATCH;
+      case CLUSTER_STREAMING:
+        return ExecutionMode.CLUSTER_STREAMING;
       case STANDALONE:
         return ExecutionMode.STANDALONE;
       case SLAVE:
diff --git a/container/src/main/java/com/streamsets/datacollector/restapi/bean/ExecutionModeJson.java b/container/src/main/java/com/streamsets/datacollector/restapi/bean/ExecutionModeJson.java
index 6ce596c2b26..0248da76337 100644
--- a/container/src/main/java/com/streamsets/datacollector/restapi/bean/ExecutionModeJson.java
+++ b/container/src/main/java/com/streamsets/datacollector/restapi/bean/ExecutionModeJson.java
@@ -19,7 +19,8 @@
 public enum ExecutionModeJson {
   STANDALONE,
-  CLUSTER,
+  CLUSTER_BATCH,
+  CLUSTER_STREAMING,
   SLAVE
   ;
 }
diff --git a/container/src/main/java/com/streamsets/datacollector/runner/Pipeline.java b/container/src/main/java/com/streamsets/datacollector/runner/Pipeline.java
index 8e4502c5a71..c8aac3be5ac 100644
--- a/container/src/main/java/com/streamsets/datacollector/runner/Pipeline.java
+++ b/container/src/main/java/com/streamsets/datacollector/runner/Pipeline.java
@@ -30,6 +30,7 @@
 import com.streamsets.datacollector.util.ContainerError;
 import com.streamsets.datacollector.validation.Issue;
 import com.streamsets.datacollector.validation.IssueCreator;
+import com.streamsets.pipeline.api.ExecutionMode;
 import com.streamsets.pipeline.api.OnRecordError;
 import com.streamsets.pipeline.api.Source;
 import com.streamsets.pipeline.api.Stage;
@@ -278,7 +279,8 @@ private boolean isClusterMode(PipelineConfiguration pipelineConf) {
     boolean clusterMode = false;
     if(pipelineConf.getConfiguration(EXECUTION_MODE_CONFIG_KEY) != null) {
       String executionMode = (String) pipelineConf.getConfiguration(EXECUTION_MODE_CONFIG_KEY).getValue();
-      if (executionMode != null && !executionMode.isEmpty() && executionMode.equals(EXECUTION_MODE_CLUSTER)) {
+      if (executionMode != null && !executionMode.isEmpty() && (executionMode.equalsIgnoreCase(ExecutionMode.CLUSTER_BATCH.name()) ||
+        executionMode.equalsIgnoreCase(ExecutionMode.CLUSTER_STREAMING.name()))) {
         clusterMode = true;
       }
     }
diff --git a/container/src/main/java/com/streamsets/datacollector/runner/preview/PreviewStageLibraryTask.java b/container/src/main/java/com/streamsets/datacollector/runner/preview/PreviewStageLibraryTask.java
index 47e1aa72324..c9ed63b2c8b 100644
--- a/container/src/main/java/com/streamsets/datacollector/runner/preview/PreviewStageLibraryTask.java
+++ b/container/src/main/java/com/streamsets/datacollector/runner/preview/PreviewStageLibraryTask.java
@@ -44,7 +44,7 @@ public class PreviewStageLibraryTask extends TaskWrapper implements StageLibrary
   private static final StageDefinition PLUG_STAGE = new StageDefinition(PREVIEW_LIB, false, PreviewPlugTarget.class,
     NAME, VERSION, "previewPlug", "Preview Plug", StageType.TARGET, false, false, false,
     Collections.<ConfigDefinition>emptyList(), null/*raw source definition*/, "", null, false, 0, null,
     Arrays.asList(ExecutionMode.STANDALONE), false,
-    new StageUpgrader.Default());
+    new StageUpgrader.Default(), Collections.<String>emptyList());

   private final StageLibraryTask library;
diff --git a/container/src/test/java/com/streamsets/datacollector/cluster/MockSystemProcess.java b/container/src/test/java/com/streamsets/datacollector/cluster/MockSystemProcess.java
index 5027148d3fa..96de6445ef3 100644
--- a/container/src/test/java/com/streamsets/datacollector/cluster/MockSystemProcess.java
+++ b/container/src/test/java/com/streamsets/datacollector/cluster/MockSystemProcess.java
@@ -32,6 +32,7 @@ public class MockSystemProcess implements SystemProcess {
   public static final List<String> output = new ArrayList<>();
   public static final List<String> error = new ArrayList<>();
   public static final List<String> args = new ArrayList<>();
+  public static Map<String, String> env;

   public static void reset() {
     isAlive = false;
@@ -56,6 +57,7 @@ public void start() throws IOException {

   @Override
   public void start(Map<String, String> env) throws IOException {
+    MockSystemProcess.env = env;
     start();
   }
diff --git a/container/src/test/java/com/streamsets/datacollector/cluster/TestClusterProviderImpl.java b/container/src/test/java/com/streamsets/datacollector/cluster/TestClusterProviderImpl.java
index 876357f7553..c29845aa6c2 100644
--- a/container/src/test/java/com/streamsets/datacollector/cluster/TestClusterProviderImpl.java
+++ b/container/src/test/java/com/streamsets/datacollector/cluster/TestClusterProviderImpl.java
@@ -30,11 +30,13 @@
 import com.streamsets.datacollector.store.PipelineInfo;
 import com.streamsets.datacollector.store.PipelineStoreTask;
 import com.streamsets.pipeline.api.Config;
+import com.streamsets.pipeline.api.ExecutionMode;

 import org.apache.commons.io.FileUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;

 import java.io.File;
@@ -103,14 +105,19 @@ public void setup() throws Exception {
     configs.add(new Config("clusterKerberos", false));
     configs.add(new Config("kerberosPrincipal", ""));
     configs.add(new Config("kerberosKeytab", ""));
+    configs.add(new Config("executionMode", ExecutionMode.CLUSTER_STREAMING));
     pipelineConf = new PipelineConfiguration(PipelineStoreTask.SCHEMA_VERSION, PipelineConfigBean.VERSION,
       UUID.randomUUID(), null, configs, null, new ArrayList<StageConfiguration>(), MockStages.getErrorStageConfig());
     pipelineConf.setPipelineInfo(new PipelineInfo("name", "desc", null, null, "aaa", null, null, null, true));
-    File sparkKafkaJar = new File(tempDir, ClusterModeConstants.SPARK_KAFKA_JAR_PREFIX + ".jar");
+    File sparkKafkaJar = new File(tempDir, "spark-streaming-kafka-1.2.jar");
+    File avroJar = new File(tempDir, "avro-1.7.7.jar");
+    File avroMapReduceJar = new File(tempDir, "avro-mapred-1.7.7.jar");
     Assert.assertTrue(sparkKafkaJar.createNewFile());
-    classLoader = new URLClassLoader(new URL[] {sparkKafkaJar.toURL()}) {
+    Assert.assertTrue(avroJar.createNewFile());
+    Assert.assertTrue(avroMapReduceJar.createNewFile());
+    classLoader = new URLClassLoader(new URL[] {sparkKafkaJar.toURL(), avroJar.toURL(), avroMapReduceJar.toURL()}) {
       public String getType() {
         return ClusterModeConstants.USER_LIBS;
       }
@@ -119,8 +126,6 @@ public String getType() {
     env = new HashMap<>();
     sourceInfo = new HashMap<>();
     sourceInfo.put(ClusterModeConstants.NUM_EXECUTORS_KEY, "64");
-    sourceInfo.put(ClusterModeConstants.CLUSTER_SOURCE_NAME, "kafka");
-    sourceInfo.put(ClusterModeConstants.CLUSTER_SOURCE_BATCHMODE, "false");
     sparkProvider = new ClusterProviderImpl();
   }

@@ -165,6 +170,47 @@ public void testMoreThanOneAppId() throws Throwable {
       new ArrayList<DataRuleDefinition>(), new ArrayList<String>(), UUID.randomUUID())).getId());
   }

+  @Test
+  public void testStreamingExecutionMode() throws Throwable {
+    MockSystemProcess.output.add(" application_1429587312661_0024 ");
+    List<Config> list = new ArrayList<Config>();
+    list.add(new Config("executionMode", ExecutionMode.CLUSTER_STREAMING.name()));
+    PipelineConfiguration pipelineConf = new PipelineConfiguration(PipelineStoreTask.SCHEMA_VERSION, PipelineConfigBean.VERSION,
+      UUID.randomUUID(), null, list, null, MockStages.getSourceStageConfig(),
+      MockStages.getErrorStageConfig());
+    pipelineConf.setPipelineInfo(new PipelineInfo("name", "desc", null, null,
+      "aaa", null, null, null, true));
+    Assert.assertNotNull(sparkProvider.startPipeline(new MockSystemProcessFactory(), sparkManagerShell,
+      providerTemp, env, sourceInfo, pipelineConf, MockStages.createClusterStreamingStageLibrary(classLoader), etcDir, resourcesDir,
+      webDir, bootstrapLibDir, classLoader, classLoader, 60,
+      new RuleDefinitions(new ArrayList<MetricsRuleDefinition>(), new ArrayList<DataRuleDefinition>(),
+        new ArrayList<String>(), UUID.randomUUID())).getId());
+    Assert.assertEquals(ClusterProviderImpl.CLUSTER_TYPE_SPARK,
+      MockSystemProcess.env.get(ClusterProviderImpl.CLUSTER_TYPE));
+    Assert.assertTrue(MockSystemProcess.args
+      .contains("/bootstrap-lib/main/streamsets-datacollector-bootstrap.jar," +
+        "/spark-streaming-kafka-1.2.jar"));
+  }
+
+  @Test
+  public void testBatchExecutionMode() throws Throwable {
+    MockSystemProcess.output.add(" application_1429587312661_0024 ");
+    List<Config> list = new ArrayList<Config>();
+    list.add(new Config("executionMode", ExecutionMode.CLUSTER_BATCH.name()));
+    PipelineConfiguration pipelineConf = new PipelineConfiguration(PipelineStoreTask.SCHEMA_VERSION, PipelineConfigBean.VERSION,
+      UUID.randomUUID(), null, list, null, MockStages.getSourceStageConfig(),
+      MockStages.getErrorStageConfig());
+    pipelineConf.setPipelineInfo(new PipelineInfo("name", "desc", null, null,
+      "aaa", null, null, null, true));
+    Assert.assertNotNull(sparkProvider.startPipeline(new MockSystemProcessFactory(), sparkManagerShell,
+      providerTemp, env, sourceInfo, pipelineConf, MockStages.createClusterBatchStageLibrary(classLoader), etcDir, resourcesDir, webDir,
+      bootstrapLibDir, classLoader, classLoader, 60, new RuleDefinitions(new ArrayList<MetricsRuleDefinition>(),
+      new ArrayList<DataRuleDefinition>(), new ArrayList<String>(), UUID.randomUUID())).getId());
+    Assert.assertEquals(ClusterProviderImpl.CLUSTER_TYPE_MAPREDUCE, MockSystemProcess.env.get(ClusterProviderImpl.CLUSTER_TYPE));
+    Assert.assertTrue(MockSystemProcess.args.contains("/bootstrap-lib/main/streamsets-datacollector-bootstrap.jar," +
+      "/avro-1.7.7.jar," + "/avro-mapred-1.7.7.jar"));
+  }
+
   @Test
   public void testKerberosError() throws Throwable {
     MockSystemProcess.output.add("Caused by: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]");
@@ -193,7 +239,7 @@ public void testSuccess() throws Throwable {
       "--executor-memory", "512m", "--executor-cores", "1", "--num-executors", "64", "--archives",
       "/provider-temp/staging/libs.tar.gz,/provider-temp/staging/etc.tar.gz,/provider-temp/staging/resources.tar.gz",
       "--files", "/provider-temp/staging/log4j.properties", "--jars",
-      "/bootstrap-lib/main/streamsets-datacollector-bootstrap.jar,/spark-streaming-kafka.jar",
+      "/bootstrap-lib/main/streamsets-datacollector-bootstrap.jar",
       "--conf", "spark.executor.extraJavaOptions=-javaagent:./streamsets-datacollector-bootstrap.jar ",
       "--class", "com.streamsets.pipeline.BootstrapClusterStreaming",
       "/bootstrap-lib/spark/streamsets-datacollector-spark-bootstrap.jar"}, MockSystemProcess.args.toArray());
diff --git a/container/src/test/java/com/streamsets/datacollector/config/TestStageDefinitionLocalization.java b/container/src/test/java/com/streamsets/datacollector/config/TestStageDefinitionLocalization.java
index 8b169f68725..0b2cdedaf41 100644
--- a/container/src/test/java/com/streamsets/datacollector/config/TestStageDefinitionLocalization.java
+++ b/container/src/test/java/com/streamsets/datacollector/config/TestStageDefinitionLocalization.java
@@ -85,8 +85,8 @@ private StageDefinition createStageDefinition() {
                                               "StageDescription", StageType.PROCESSOR, true, true, true, configs,
                                               rawSource, "", configGroup, false, 1, TOutput.class.getName(),
-                                              Arrays.asList(ExecutionMode.CLUSTER, ExecutionMode.STANDALONE), false,
-                                              new StageUpgrader.Default());
+                                              Arrays.asList(ExecutionMode.CLUSTER_BATCH, ExecutionMode.STANDALONE), false,
+                                              new StageUpgrader.Default(), Collections.<String>emptyList());
     return def;
   }
diff --git a/container/src/test/java/com/streamsets/datacollector/creation/TestPipelineBeanCreator.java b/container/src/test/java/com/streamsets/datacollector/creation/TestPipelineBeanCreator.java
index d224289477c..c436e5ecbc8 100644
--- a/container/src/test/java/com/streamsets/datacollector/creation/TestPipelineBeanCreator.java
+++ b/container/src/test/java/com/streamsets/datacollector/creation/TestPipelineBeanCreator.java
@@ -508,7 +508,7 @@ public void testCreatePipelineBean() {
     Mockito.when(libraryDef.getClassLoader()).thenReturn(Thread.currentThread().getContextClassLoader());

     List<Config> pipelineConfigs = ImmutableList.of(
-      new Config("executionMode", ExecutionMode.CLUSTER.name()),
+      new Config("executionMode", ExecutionMode.CLUSTER_BATCH.name()),
       new Config("memoryLimit", 1000)
     );
@@ -527,7 +527,7 @@ public void testCreatePipelineBean() {
     Assert.assertNotNull(bean);

     // pipeline configs
-    Assert.assertEquals(ExecutionMode.CLUSTER, bean.getConfig().executionMode);
+    Assert.assertEquals(ExecutionMode.CLUSTER_BATCH, bean.getConfig().executionMode);

     // stages
     Assert.assertEquals(1, bean.getStages().size());
@@ -580,7 +580,7 @@ public void testConfigsWithJavaDefaults() {
     Mockito.when(libraryDef.getClassLoader()).thenReturn(Thread.currentThread().getContextClassLoader());

     List<Config> pipelineConfigs = ImmutableList.of(
-      new Config("executionMode", ExecutionMode.CLUSTER.name()),
+      new Config("executionMode", ExecutionMode.CLUSTER_BATCH.name()),
       new Config("memoryLimit", 1000)
     );
diff --git a/container/src/test/java/com/streamsets/datacollector/definition/TestStageDefinitionExtractor.java b/container/src/test/java/com/streamsets/datacollector/definition/TestStageDefinitionExtractor.java
index c9ca0769976..871692db5d6 100644
--- a/container/src/test/java/com/streamsets/datacollector/definition/TestStageDefinitionExtractor.java
+++ b/container/src/test/java/com/streamsets/datacollector/definition/TestStageDefinitionExtractor.java
@@ -18,6 +18,7 @@
 package com.streamsets.datacollector.definition;

 import com.google.common.collect.ImmutableList;
+import com.streamsets.datacollector.cluster.ClusterModeConstants;
 import com.streamsets.datacollector.config.StageDefinition;
 import com.streamsets.datacollector.config.StageLibraryDefinition;
 import com.streamsets.datacollector.config.StageType;
@@ -75,7 +76,7 @@ public void setMimeType(String mimeType) {
     }
   }

-  @StageDef(version = 1, label = "L", description = "D", icon = "TargetIcon.svg")
+  @StageDef(version = 1, label = "L", description = "D", icon = "TargetIcon.svg", libJarsRegex = {ClusterModeConstants.AVRO_JAR_REGEX, ClusterModeConstants.AVRO_MAPRED_JAR_REGEX})
   public static class Source1 extends BaseSource {

     @ConfigDef(
@@ -118,7 +119,7 @@ public String getLabel() {
   }

   @StageDef(version = 2, label = "LL", description = "DD", icon = "TargetIcon.svg",
-    execution = ExecutionMode.STANDALONE, outputStreams = TwoOutputStreams.class, recordsByRef = true,
+    execution = {ExecutionMode.STANDALONE, ExecutionMode.CLUSTER_BATCH}, outputStreams = TwoOutputStreams.class, recordsByRef = true,
     privateClassLoader = true, upgrader = Source2Upgrader.class)
   @ConfigGroups(Group1.class)
   @RawSource(rawSourcePreviewer = Previewer.class)
@@ -209,7 +210,8 @@ public void testExtractSource1() {
     Assert.assertEquals(0, def.getConfigGroupDefinition().getGroupNames().size());
     Assert.assertEquals(3, def.getConfigDefinitions().size());
     Assert.assertEquals(1, def.getOutputStreams());
-    Assert.assertEquals(2, def.getExecutionModes().size());
+    Assert.assertEquals(3, def.getExecutionModes().size());
+    Assert.assertEquals(2, def.getLibJarsRegex().size());
     Assert.assertEquals("TargetIcon.svg", def.getIcon());
     Assert.assertEquals(StageDef.DefaultOutputStreams.class.getName(), def.getOutputStreamLabelProviderClass());
     Assert.assertEquals(null, def.getOutputStreamLabels());
@@ -234,7 +236,7 @@ public void testExtractSource2() {
     Assert.assertEquals(1, def.getConfigGroupDefinition().getGroupNames().size());
     Assert.assertEquals(3, def.getConfigDefinitions().size());
     Assert.assertEquals(2, def.getOutputStreams());
-    Assert.assertEquals(1, def.getExecutionModes().size());
+    Assert.assertEquals(2, def.getExecutionModes().size());
     Assert.assertEquals("TargetIcon.svg", def.getIcon());
     Assert.assertEquals(TwoOutputStreams.class.getName(), def.getOutputStreamLabelProviderClass());
     Assert.assertEquals(null, def.getOutputStreamLabels());
@@ -297,12 +299,12 @@ public void testExtractMissingIcon() {
   @Test
   public void testLibraryExecutionOverride() {
     Properties props = new Properties();
-    props.put(StageLibraryDefinition.EXECUTION_MODE_PREFIX + Source1.class.getName(), "CLUSTER");
+    props.put(StageLibraryDefinition.EXECUTION_MODE_PREFIX + Source1.class.getName(), "CLUSTER_BATCH");
     StageLibraryDefinition libDef = new StageLibraryDefinition(TestStageDefinitionExtractor.class.getClassLoader(),
       "mock", "MOCK", props, null, null, null);
     StageDefinition def = StageDefinitionExtractor.get().extract(libDef, Source1.class, "x");
-    Assert.assertEquals(ImmutableList.of(ExecutionMode.CLUSTER), def.getExecutionModes());
+    Assert.assertEquals(ImmutableList.of(ExecutionMode.CLUSTER_BATCH), def.getExecutionModes());
   }
 }
diff --git a/container/src/test/java/com/streamsets/datacollector/execution/manager/standalone/TestStandalonePipelineManager.java b/container/src/test/java/com/streamsets/datacollector/execution/manager/standalone/TestStandalonePipelineManager.java
index 1be37a97792..d92636482f1 100644
--- a/container/src/test/java/com/streamsets/datacollector/execution/manager/standalone/TestStandalonePipelineManager.java
+++ b/container/src/test/java/com/streamsets/datacollector/execution/manager/standalone/TestStandalonePipelineManager.java
@@ -307,10 +307,10 @@ public void testChangeExecutionModes() throws Exception {
     pipelineStoreTask.create("user1", "pipeline2", "blah");
     pipelineStateStore.saveState("user", "pipeline2", "0", PipelineStatus.EDITED, "blah", null, ExecutionMode.STANDALONE, null, 0, 0);
     Runner runner1 = pipelineManager.getRunner("user1", "pipeline2", "0");
-    pipelineStateStore.saveState("user", "pipeline2", "0", PipelineStatus.EDITED, "blah", null, ExecutionMode.CLUSTER, null, 0, 0);
+    pipelineStateStore.saveState("user", "pipeline2", "0", PipelineStatus.EDITED, "blah", null, ExecutionMode.CLUSTER_BATCH, null, 0, 0);
     Runner runner2 = pipelineManager.getRunner("user1", "pipeline2", "0");
     assertTrue(runner1 != runner2);
-    pipelineStateStore.saveState("user", "pipeline2", "0", PipelineStatus.STARTING, "blah", null, ExecutionMode.CLUSTER, null, 0, 0);
+    pipelineStateStore.saveState("user", "pipeline2", "0", PipelineStatus.STARTING, "blah", null, ExecutionMode.CLUSTER_BATCH, null, 0, 0);
     pipelineManager.getRunner("user1", "pipeline2", "0");
     pipelineStateStore.saveState("user", "pipeline2", "0", PipelineStatus.STARTING, "blah", null, ExecutionMode.STANDALONE, null, 0, 0);
     try {
diff --git a/container/src/test/java/com/streamsets/datacollector/execution/runner/cluster/TestClusterRunner.java b/container/src/test/java/com/streamsets/datacollector/execution/runner/cluster/TestClusterRunner.java
index 32e16c01813..b9826c697c2 100644
--- a/container/src/test/java/com/streamsets/datacollector/execution/runner/cluster/TestClusterRunner.java
+++ b/container/src/test/java/com/streamsets/datacollector/execution/runner/cluster/TestClusterRunner.java
@@ -121,14 +121,14 @@ public void setup() throws Exception {
       "description2");
     PipelineConfiguration mockPipelineConf = MockStages.createPipelineConfigurationSourceProcessorTargetHigherVersion();
     mockPipelineConf.getConfiguration().add(new Config("executionMode",
-      ExecutionMode.CLUSTER.name()));
+      ExecutionMode.CLUSTER_BATCH.name()));
     mockPipelineConf.setUuid(pipelineConfiguration.getUuid());
     pipelineStoreTask.save("user2", TestUtil.HIGHER_VERSION_PIPELINE, "0", "description"
       , mockPipelineConf);

     clusterHelper = new ClusterHelper(new MockSystemProcessFactory(), clusterProvider, tempDir, sparkManagerShell,
       emptyCL, emptyCL, null);

-    setExecMode(ExecutionMode.CLUSTER);
+    setExecMode(ExecutionMode.CLUSTER_BATCH);
   }

   @After
@@ -159,24 +159,24 @@ public void testPipelinePrepareDataCollectorStart() throws Exception {
     Runner clusterRunner = createClusterRunner();
     clusterRunner.prepareForDataCollectorStart();
     assertEquals(PipelineStatus.EDITED, clusterRunner.getState().getStatus());
-    pipelineStateStore.saveState("admin", NAME, "0", PipelineStatus.RUNNING, null, attributes, ExecutionMode.CLUSTER, null, 0, 0);
+    pipelineStateStore.saveState("admin", NAME, "0", PipelineStatus.RUNNING, null, attributes, ExecutionMode.CLUSTER_BATCH, null, 0, 0);
     clusterRunner.prepareForDataCollectorStart();
     assertEquals(PipelineStatus.DISCONNECTED, clusterRunner.getState().getStatus());
-    pipelineStateStore.saveState("admin", NAME, "0", PipelineStatus.STARTING, null, attributes, ExecutionMode.CLUSTER, null, 0, 0);
+    pipelineStateStore.saveState("admin", NAME, "0", PipelineStatus.STARTING, null, attributes, ExecutionMode.CLUSTER_BATCH, null, 0, 0);
     clusterRunner.prepareForDataCollectorStart();
     assertEquals(PipelineStatus.DISCONNECTED, clusterRunner.getState().getStatus());
     pipelineStateStore
-      .saveState("admin", NAME, "0", PipelineStatus.CONNECTING, null, attributes, ExecutionMode.CLUSTER, null, 0, 0);
+      .saveState("admin", NAME, "0", PipelineStatus.CONNECTING, null, attributes, ExecutionMode.CLUSTER_BATCH, null, 0, 0);
     clusterRunner.prepareForDataCollectorStart();
     assertEquals(PipelineStatus.DISCONNECTED, clusterRunner.getState().getStatus());
-    pipelineStateStore.saveState("admin", NAME, "0", PipelineStatus.STOPPING, null, attributes, ExecutionMode.CLUSTER, null, 0, 0);
+    pipelineStateStore.saveState("admin", NAME, "0", PipelineStatus.STOPPING, null, attributes, ExecutionMode.CLUSTER_BATCH, null, 0, 0);
     clusterRunner.prepareForDataCollectorStart();
     assertEquals(PipelineStatus.DISCONNECTED, clusterRunner.getState().getStatus());
-    pipelineStateStore.saveState("admin", NAME, "0", PipelineStatus.STOPPED, null, attributes, ExecutionMode.CLUSTER, null, 0, 0);
+    pipelineStateStore.saveState("admin", NAME, "0", PipelineStatus.STOPPED, null, attributes, ExecutionMode.CLUSTER_BATCH, null, 0, 0);
     clusterRunner.prepareForDataCollectorStart();
     assertEquals(PipelineStatus.STOPPED, clusterRunner.getState().getStatus());
     pipelineStateStore.saveState("admin", NAME, "0", PipelineStatus.RUNNING_ERROR, null, attributes,
-      ExecutionMode.CLUSTER, null, 0, 0);
+      ExecutionMode.CLUSTER_BATCH, null, 0, 0);
     try {
       clusterRunner.prepareForDataCollectorStart();
       fail("Expected exception but didn't get any");
@@ -193,18 +193,18 @@ public void testMetricsInStore() throws Exception {
       stageLibraryTask, executorService, clusterHelper, new ResourceManager(conf), eventListenerManager);
     assertEquals("My_dummy_metrics", clusterRunner.getMetrics().toString());
     assertNull(clusterRunner.getState().getMetrics());
-    pipelineStateStore.saveState("admin", NAME, "0", PipelineStatus.RUNNING, null, attributes, ExecutionMode.CLUSTER,
+    pipelineStateStore.saveState("admin", NAME, "0", PipelineStatus.RUNNING, null, attributes, ExecutionMode.CLUSTER_BATCH,
       null, 0, 0);
     clusterRunner.prepareForDataCollectorStart();
     assertEquals("\"My_dummy_metrics\"", clusterRunner.getState().getMetrics());
     pipelineStateStore.saveState("admin", NAME, "0", PipelineStatus.CONNECTING, null, attributes,
-      ExecutionMode.CLUSTER, null, 0, 0);
+      ExecutionMode.CLUSTER_BATCH, null, 0, 0);
     clusterRunner.prepareForStart();
     assertNull(clusterRunner.getState().getMetrics());
   }

   private void setState(PipelineStatus status) throws Exception {
-    pipelineStateStore.saveState("admin", NAME, "0", status, null, attributes, ExecutionMode.CLUSTER, null, 0, 0);
+    pipelineStateStore.saveState("admin", NAME, "0", status, null, attributes, ExecutionMode.CLUSTER_BATCH, null, 0, 0);
   }

   @Test
@@ -361,23 +361,22 @@ public void testGetParallelism() throws PipelineException, StageException {
     ClusterRunner clusterRunner = (ClusterRunner) createClusterRunner();
     ClusterSourceInfo clusterSourceInfo = clusterRunner.getClusterSourceInfo(NAME, REV,
-      MockStages.createPipelineConfigurationWithClusterOnlyStage(ExecutionMode.CLUSTER) // creates ClusterMSource
+      MockStages.createPipelineConfigurationWithClusterOnlyStage(ExecutionMode.CLUSTER_BATCH) // creates ClusterMSource
                                                                                         // which
                                                                                         // has parallelism 25
       );
     Assert.assertEquals(25, clusterSourceInfo.getParallelism());
-    Assert.assertEquals("ClusterMSource", clusterSourceInfo.getClusterSourceName());
   }

   @Test
   public void testPipelineWithValidationIssues() throws PipelineException, StageException {
     ClusterRunner clusterRunner = (ClusterRunner) createClusterRunner();
-    pipelineStateStore.saveState("admin", NAME, REV, PipelineStatus.STARTING, null, attributes, ExecutionMode.CLUSTER,
+    pipelineStateStore.saveState("admin", NAME, REV, PipelineStatus.STARTING, null, attributes, ExecutionMode.CLUSTER_BATCH,
       null, 0, 0);
     try {
       MockStages.ClusterMSource.MOCK_VALIDATION_ISSUES = true;
       clusterRunner.getClusterSourceInfo(NAME, REV,
-        MockStages.createPipelineConfigurationWithClusterOnlyStage(ExecutionMode.CLUSTER));
+        MockStages.createPipelineConfigurationWithClusterOnlyStage(ExecutionMode.CLUSTER_BATCH));
       fail("Expected PipelineRuntimeException but didn't get any");
     } catch (PipelineRuntimeException pe) {
       assertEquals(ContainerError.CONTAINER_0800, pe.getErrorCode());
@@ -392,6 +391,8 @@ public void testPipelineWithValidationIssues() throws PipelineException, StageEx
   @Test(timeout = 20000)
   public void testLoadingUnsupportedPipeline() throws Exception {
     Runner runner = createClusterRunnerForUnsupportedPipeline();
+    pipelineStateStore.saveState("admin", TestUtil.HIGHER_VERSION_PIPELINE, REV, PipelineStatus.EDITED, null, attributes, ExecutionMode.CLUSTER_BATCH,
+      null, 0, 0);
     runner.start();
     while(runner.getState().getStatus() != PipelineStatus.START_ERROR) {
       Thread.sleep(100);
@@ -404,7 +405,7 @@ public void testLoadingUnsupportedPipeline() throws Exception {
   @Test
   public void tesOnDataCollectorStartUnsupportedPipeline1() throws Exception {
     pipelineStateStore.saveState("admin", TestUtil.HIGHER_VERSION_PIPELINE, "0", PipelineStatus.STARTING, null,
-      attributes, ExecutionMode.CLUSTER, null, 0, 0);
+      attributes, ExecutionMode.CLUSTER_BATCH, null, 0, 0);
     Runner clusterRunner = createClusterRunnerForUnsupportedPipeline();
     clusterRunner.prepareForDataCollectorStart();
     clusterProvider.submitTimesOut = true;
@@ -420,7 +421,7 @@ public void tesOnDataCollectorStartUnsupportedPipeline1() throws Exception {
   @Test
   public void tesOnDataCollectorStartUnsupportedPipeline2() throws Exception {
     pipelineStateStore.saveState("admin", TestUtil.HIGHER_VERSION_PIPELINE, "0", PipelineStatus.RUNNING, null,
-      attributes, ExecutionMode.CLUSTER, null, 0, 0);
+      attributes, ExecutionMode.CLUSTER_BATCH, null, 0, 0);
     Runner clusterRunner = createClusterRunnerForUnsupportedPipeline();
     clusterRunner.prepareForDataCollectorStart();
     clusterProvider.submitTimesOut = true;
@@ -443,11 +444,23 @@ public void testRunningMaxPipelines() throws Exception {
       new LockCache());
     pipelineStoreTask.init();
     pipelineStoreTask.create("admin", "a", "some desc");
+    pipelineStateStore.saveState("admin", "a", "0", PipelineStatus.EDITED, null,
+      attributes, ExecutionMode.CLUSTER_BATCH, null, 0, 0);
     pipelineStoreTask.create("admin", "b", "some desc");
+    pipelineStateStore.saveState("admin", "b", "0", PipelineStatus.EDITED, null,
+      attributes, ExecutionMode.CLUSTER_BATCH, null, 0, 0);
     pipelineStoreTask.create("admin", "c", "some desc");
+    pipelineStateStore.saveState("admin", "c", "0", PipelineStatus.EDITED, null,
+      attributes, ExecutionMode.CLUSTER_BATCH, null, 0, 0);
     pipelineStoreTask.create("admin", "d", "some desc");
+    pipelineStateStore.saveState("admin", "d", "0", PipelineStatus.EDITED, null,
+      attributes, ExecutionMode.CLUSTER_BATCH, null, 0, 0);
     pipelineStoreTask.create("admin", "e", "some desc");
+    pipelineStateStore.saveState("admin", "e", "0", PipelineStatus.EDITED, null,
+      attributes, ExecutionMode.CLUSTER_BATCH, null, 0, 0);
     pipelineStoreTask.create("admin", "f", "some desc");
+    pipelineStateStore.saveState("admin", "f", "0", PipelineStatus.EDITED, null,
+      attributes, ExecutionMode.CLUSTER_BATCH, null, 0, 0);

     //Only one runner can start pipeline at the max since the runner thread pool size is 3
     Runner runner1 = createClusterRunner("a", pipelineStoreTask, resourceManager);
diff --git a/container/src/test/java/com/streamsets/datacollector/execution/store/TestPipelineStateStore.java b/container/src/test/java/com/streamsets/datacollector/execution/store/TestPipelineStateStore.java
index a40759dd39d..4c5b4925957 100644
--- a/container/src/test/java/com/streamsets/datacollector/execution/store/TestPipelineStateStore.java
+++ b/container/src/test/java/com/streamsets/datacollector/execution/store/TestPipelineStateStore.java
@@ -142,7 +142,7 @@ public void setUp() throws IOException {
   }

   private PipelineConfiguration createPipeline(UUID uuid) {
-    PipelineConfiguration pc = MockStages.createPipelineConfigurationWithClusterOnlyStage(ExecutionMode.CLUSTER);
+    PipelineConfiguration pc = MockStages.createPipelineConfigurationWithClusterOnlyStage(ExecutionMode.CLUSTER_BATCH);
     pc.setUuid(uuid);
     return pc;
   }
@@ -169,7 +169,7 @@ public void testCreatePipeline() throws Exception {
     assertEquals("user3", pipelineState.getUser());
     assertEquals("name1", pipelineState.getName());
     assertEquals("0", pipelineState.getRev());
-    assertEquals(ExecutionMode.CLUSTER, pipelineState.getExecutionMode());
+    assertEquals(ExecutionMode.CLUSTER_BATCH, pipelineState.getExecutionMode());

     pc0 = pipelineStoreTask.load("name1", "0");
     pc0 = createPipeline(pc0.getUuid());
@@ -228,9 +228,9 @@ public void stateHistory() throws Exception {

   @Test
   public void stateChangeExecutionMode() throws Exception {
-    pipelineStateStore.saveState("user1", "aaa", "0", PipelineStatus.STOPPED, "Pipeline stopped", null, ExecutionMode.CLUSTER, null, 0, 0);
+    pipelineStateStore.saveState("user1", "aaa", "0", PipelineStatus.STOPPED, "Pipeline stopped", null, ExecutionMode.CLUSTER_BATCH, null, 0, 0);
     PipelineState pipelineState = pipelineStateStore.getState("aaa", "0");
-    assertEquals(ExecutionMode.CLUSTER, pipelineState.getExecutionMode());
assertEquals(ExecutionMode.CLUSTER_BATCH, pipelineState.getExecutionMode()); pipelineStateStore.saveState("user1", "aaa", "0", PipelineStatus.STOPPED, "Pipeline stopped", null, ExecutionMode.STANDALONE, null, 0, 0); pipelineState = pipelineStateStore.getState("aaa", "0"); assertEquals(ExecutionMode.STANDALONE, pipelineState.getExecutionMode()); diff --git a/container/src/test/java/com/streamsets/datacollector/metrics/TestMetricsAggregation.java b/container/src/test/java/com/streamsets/datacollector/metrics/TestMetricsAggregation.java index 3b80cf7d365..1d0ff39089b 100644 --- a/container/src/test/java/com/streamsets/datacollector/metrics/TestMetricsAggregation.java +++ b/container/src/test/java/com/streamsets/datacollector/metrics/TestMetricsAggregation.java @@ -76,7 +76,7 @@ public void setup() throws Exception { pipelineStateStore = Mockito.mock(PipelineStateStore.class); Mockito.when(pipelineStateStore.getState(Matchers.anyString(), Matchers.anyString())).thenReturn( new PipelineStateImpl("aaa", "samplePipeline", "1.0.0", - PipelineStatus.RUNNING, "The pipeline is not running", System.currentTimeMillis(), null, ExecutionMode.CLUSTER, null, 0, 0) + PipelineStatus.RUNNING, "The pipeline is not running", System.currentTimeMillis(), null, ExecutionMode.CLUSTER_BATCH, null, 0, 0) ); diff --git a/container/src/test/java/com/streamsets/datacollector/restapi/TestUtil.java b/container/src/test/java/com/streamsets/datacollector/restapi/TestUtil.java index 1dad821f7ef..7209057fa51 100644 --- a/container/src/test/java/com/streamsets/datacollector/restapi/TestUtil.java +++ b/container/src/test/java/com/streamsets/datacollector/restapi/TestUtil.java @@ -138,13 +138,13 @@ public static StageLibraryTask createMockStageLibrary() { StageDefinition sourceDef = new StageDefinition( MOCK_LIB_DEF, false, TSource.class, "source", 1, "label", "description", StageType.SOURCE, false, true, true, configDefs, null/*raw source definition*/, "", null, false ,1, - null, Arrays.asList(ExecutionMode.CLUSTER, ExecutionMode.STANDALONE), false, new StageUpgrader.Default()); + null, Arrays.asList(ExecutionMode.CLUSTER_BATCH, ExecutionMode.STANDALONE), false, new StageUpgrader.Default(), Collections.emptyList()); StageDefinition targetDef = new StageDefinition( MOCK_LIB_DEF, false, TTarget.class, "target", 1, "label", "description", StageType.TARGET, false, true, true, Collections.emptyList(), null/*raw source definition*/, - "TargetIcon.svg", null, false, 0, null, Arrays.asList(ExecutionMode.CLUSTER, + "TargetIcon.svg", null, false, 0, null, Arrays.asList(ExecutionMode.CLUSTER_BATCH, ExecutionMode.STANDALONE), false, - new StageUpgrader.Default()); + new StageUpgrader.Default(), Collections.emptyList()); Mockito.when(lib.getStage(Mockito.eq("library"), Mockito.eq("source"), Mockito.eq(false))) .thenReturn(sourceDef); Mockito.when(lib.getStage(Mockito.eq("library"), Mockito.eq("target"), Mockito.eq(false))) diff --git a/container/src/test/java/com/streamsets/datacollector/runner/MockStages.java b/container/src/test/java/com/streamsets/datacollector/runner/MockStages.java index 819fff15de7..c9b3f04e703 100644 --- a/container/src/test/java/com/streamsets/datacollector/runner/MockStages.java +++ b/container/src/test/java/com/streamsets/datacollector/runner/MockStages.java @@ -19,6 +19,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import com.streamsets.datacollector.cluster.ClusterModeConstants; import com.streamsets.datacollector.config.ConfigDefinition; import 
diff --git a/container/src/test/java/com/streamsets/datacollector/runner/MockStages.java b/container/src/test/java/com/streamsets/datacollector/runner/MockStages.java
index 819fff15de7..c9b3f04e703 100644
--- a/container/src/test/java/com/streamsets/datacollector/runner/MockStages.java
+++ b/container/src/test/java/com/streamsets/datacollector/runner/MockStages.java
@@ -19,6 +19,7 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+import com.streamsets.datacollector.cluster.ClusterModeConstants;
 import com.streamsets.datacollector.config.ConfigDefinition;
 import com.streamsets.datacollector.config.ModelDefinition;
 import com.streamsets.datacollector.config.ModelType;
@@ -398,6 +399,16 @@ public void write(Batch batch) throws StageException {
   public static StageLibraryTask createStageLibrary(ClassLoader cl) {
     return new MockStageLibraryTask.Builder(cl).build();
   }
+
+  public static StageLibraryTask createClusterStreamingStageLibrary(ClassLoader cl) {
+    return new MockStageLibraryTask.ClusterStreamingBuilder(cl).build();
+  }
+
+  public static StageLibraryTask createClusterBatchStageLibrary(ClassLoader cl) {
+    return new MockStageLibraryTask.ClusterBatchBuilder(cl).build();
+  }
+
+
   public static StageLibraryTask createStageLibrary() {
     return createStageLibrary(Thread.currentThread().getContextClassLoader());
   }
@@ -506,13 +517,13 @@ public Builder(ClassLoader cl) {
         false, MSource.class, "sourceName", 1, "sourceLabel", "sourceDesc", StageType.SOURCE, false, true, true,
         Collections.emptyList(), rawSourceDefinition, "", null, false, 1, null,
-        Arrays.asList(ExecutionMode.CLUSTER, ExecutionMode.STANDALONE), false, new StageUpgrader.Default());
+        Arrays.asList(ExecutionMode.CLUSTER_STREAMING, ExecutionMode.STANDALONE, ExecutionMode.CLUSTER_BATCH), false, new StageUpgrader.Default(), Collections.emptyList());
       StageDefinition socDef = new StageDefinition(createLibraryDef(cl),
         false, MSourceOffsetCommitter.class, "sourceOffsetCommitterName", 1, "sourceOffsetCommitterLabel", "sourceDesc",
         StageType.SOURCE, false, true, true,
         Collections.emptyList(), null/*raw source definition*/, "", null, false, 1, null,
-        Arrays.asList(ExecutionMode.CLUSTER, ExecutionMode.STANDALONE), false, new StageUpgrader.Default()
+        Arrays.asList(ExecutionMode.CLUSTER_STREAMING, ExecutionMode.STANDALONE, ExecutionMode.CLUSTER_BATCH), false, new StageUpgrader.Default(), Collections.emptyList()
       );
       StageDefinition pDef = new StageDefinition(createLibraryDef(cl),
@@ -520,7 +531,7 @@ public Builder(ClassLoader cl) {
         "sourceDescription", StageType.PROCESSOR, false, true, true,
         Collections.emptyList(), null/*raw source definition*/, "", null, false, 1, null,
-        Arrays.asList(ExecutionMode.CLUSTER, ExecutionMode.STANDALONE), false, new StageUpgrader.Default());
+        Arrays.asList(ExecutionMode.CLUSTER_STREAMING, ExecutionMode.STANDALONE, ExecutionMode.CLUSTER_BATCH), false, new StageUpgrader.Default(), Collections.emptyList());
       ModelDefinition m = new ModelDefinition(ModelType.FIELD_SELECTOR_MULTI_VALUE, null, Collections.emptyList(),
         Collections.emptyList(), null, null);
@@ -533,7 +544,7 @@ public Builder(ClassLoader cl) {
         false, MTarget.class, "targetName", 1, "targetLabel", "targetDesc", StageType.TARGET, false, true, true,
         Arrays.asList(stageReqField), null/*raw source definition*/, "", null, false, 0, null,
-        Arrays.asList(ExecutionMode.CLUSTER, ExecutionMode.STANDALONE), false, new StageUpgrader.Default()
+        Arrays.asList(ExecutionMode.CLUSTER_STREAMING, ExecutionMode.STANDALONE, ExecutionMode.CLUSTER_BATCH), false, new StageUpgrader.Default(), Collections.emptyList()
       );
       ConfigDefinition reqField = new ConfigDefinition(
@@ -546,7 +557,7 @@ public Builder(ClassLoader cl) {
         false, MTarget.class, "targetWithReqField", 1, "targetWithReqField", "targetWithReqField", StageType.TARGET, false, true, true,
         Arrays.asList(reqField), null/*raw source definition*/, "", null, false, 0, null,
-        Arrays.asList(ExecutionMode.CLUSTER, ExecutionMode.STANDALONE), false, new StageUpgrader.Default()
+        Arrays.asList(ExecutionMode.CLUSTER_STREAMING, ExecutionMode.STANDALONE, ExecutionMode.CLUSTER_BATCH), false, new StageUpgrader.Default(), Collections.emptyList()
       );

       //error target configurations
@@ -560,7 +571,7 @@ public Builder(ClassLoader cl) {
         false, ETarget.class, "errorTarget", 1, "errorTarget", "Error Target", StageType.TARGET, true, false, true,
         Arrays.asList(errorTargetConf), null/*raw source definition*/, "", null, false, 0, null,
-        Arrays.asList(ExecutionMode.CLUSTER, ExecutionMode.STANDALONE), false, new StageUpgrader.Default()
+        Arrays.asList(ExecutionMode.CLUSTER_STREAMING, ExecutionMode.STANDALONE, ExecutionMode.CLUSTER_BATCH), false, new StageUpgrader.Default(), Collections.emptyList()
       );
       ConfigDefinition depConfDef = new ConfigDefinition(
@@ -579,25 +590,25 @@ public Builder(ClassLoader cl) {
         false, MSource.class, "sourceWithConfigsName", 1, "sourceWithConfigsLabel", "sourceWithConfigsDesc",
         StageType.SOURCE, false, true, true, Lists.newArrayList(depConfDef, triggeredConfDef),
         null/*raw source definition*/, "", null, false, 1, null,
-        Arrays.asList(ExecutionMode.CLUSTER, ExecutionMode.STANDALONE), false, new StageUpgrader.Default());
+        Arrays.asList(ExecutionMode.CLUSTER_STREAMING, ExecutionMode.STANDALONE, ExecutionMode.CLUSTER_BATCH), false, new StageUpgrader.Default(), Collections.emptyList());
       StageDefinition clusterStageDef = new StageDefinition(createLibraryDef(cl),
         false, ClusterMSource.class, "clusterSource", 1, "clusterSourceLabel", "clusterSourceDesc",
         StageType.SOURCE, false, true, true,
         Collections.emptyList(), null, "", null, false, 1, null,
-        Arrays.asList(ExecutionMode.CLUSTER), false, new StageUpgrader.Default());
+        Arrays.asList(ExecutionMode.CLUSTER_STREAMING, ExecutionMode.CLUSTER_BATCH), false, new StageUpgrader.Default(), Collections.emptyList());
       StageDefinition clusterLibraryStageDef = new StageDefinition(createLibraryDef(cl),
         false, ClusterMSource.class, "clusterLibrarySource", 1, "clusterSourceLabel", "clusterSourceDesc",
         StageType.SOURCE, false, true, true,
         Collections.emptyList(), null, "", null, false, 1, null,
-        Arrays.asList(ExecutionMode.CLUSTER, ExecutionMode.STANDALONE), false, new StageUpgrader.Default());
+        Arrays.asList(ExecutionMode.CLUSTER_STREAMING, ExecutionMode.CLUSTER_BATCH), false, new StageUpgrader.Default(), Collections.emptyList());
       StageDefinition commonLibraryTargetDef = new StageDefinition(createLibraryDef(cl),
         false, MTarget.class, "commonLibraryTarget", 1, "commonLibraryTargetLabel", "commonLibraryTargetDesc",
         StageType.TARGET, false, true, true,
         Collections.emptyList(), null, "", null, false, 1, null,
-        Arrays.asList(ExecutionMode.CLUSTER, ExecutionMode.STANDALONE), false, new StageUpgrader.Default());
+        Arrays.asList(ExecutionMode.CLUSTER_STREAMING, ExecutionMode.STANDALONE, ExecutionMode.CLUSTER_BATCH), false, new StageUpgrader.Default(), Collections.emptyList());
       ConfigDefinition regularConf = new ConfigDefinition(
         "regularConfName", ConfigDef.Type.NUMBER, "regularConfLabel", "regularConfDesc", 10, true,
@@ -620,7 +631,7 @@ public Builder(ClassLoader cl) {
         false, ComplexSource.class, "complexStageName", 1, "complexStageLabel", "complexStageDesc",
         StageType.SOURCE, false, true, true, Lists.newArrayList(complexConf),
         null/*raw source definition*/, "", null, false, 1, null,
-        Arrays.asList(ExecutionMode.CLUSTER, ExecutionMode.STANDALONE), false, new StageUpgrader.Default());
+        Arrays.asList(ExecutionMode.CLUSTER_STREAMING, ExecutionMode.STANDALONE, ExecutionMode.CLUSTER_BATCH), false, new StageUpgrader.Default(), Collections.emptyList());
       StageDefinition[] stageDefs =
         new StageDefinition[] { sDef, socDef, pDef, tDef,
          targetWithReqField, swcDef, eDef, clusterStageDef, complexStage,
@@ -651,7 +662,7 @@ public Builder overrideClass(String name, Class klass) {
           oldDef.hasOnRecordError(), oldDef.getConfigDefinitions(), oldDef.getRawSourceDefinition(),
           oldDef.getIcon(), oldDef.getConfigGroupDefinition(), oldDef.isVariableOutputStreams(),
           oldDef.getOutputStreams(), oldDef.getOutputStreamLabelProviderClass(),
-          Arrays.asList(ExecutionMode.CLUSTER, ExecutionMode.STANDALONE), false, new StageUpgrader.Default()
+          Arrays.asList(ExecutionMode.CLUSTER_STREAMING, ExecutionMode.STANDALONE), false, new StageUpgrader.Default(), Collections.emptyList()
         );
         stages.put(name, newDef);
       } else {
@@ -664,6 +675,91 @@ public StageLibraryTask build() {
       return new MockStageLibraryTask(stages.values());
     }
   }
+
+  public static RawSourceDefinition getRawSourceDefinition() {
+    ConfigDefinition brokerHostConfig = new ConfigDefinition("brokerHost", ConfigDef.Type.STRING, "brokerHost", "",
+      "", true, "", "brokerHost", null, "", null, 10, Collections.emptyList(),
+      Collections.emptyList(), 0, 0,
+      "", 0, Collections.emptyList(), ConfigDef.Evaluation.IMPLICIT, Collections.emptyMap());
+    ConfigDefinition brokerPortConfig = new ConfigDefinition("brokerPort", ConfigDef.Type.NUMBER, "brokerPort", "",
+      "", true, "", "brokerPort", null, "", null, 10, Collections.emptyList(),
+      Collections.emptyList(), 0, 0,
+      "", 0, Collections.emptyList(), ConfigDef.Evaluation.IMPLICIT, Collections.emptyMap());
+
+    RawSourceDefinition rawSourceDefinition = new RawSourceDefinition(MockRawSourcePreviewer.class.getName(), "*/*",
+      Arrays.asList(brokerHostConfig, brokerPortConfig));
+    return rawSourceDefinition;
+  }
+
+  public static StageDefinition getErrorStageDefinition(ClassLoader cl) {
+    //error target configurations
+    ConfigDefinition errorTargetConf = new ConfigDefinition(
+      "errorTargetConfName", ConfigDef.Type.STRING, "errorTargetConfLabel", "errorTargetConfDesc",
+      "/SDC_HOME/errorDir", true, "groupName", "errorTargetConfFieldName", null, "", null, 0,
+      Collections.emptyList(), Collections.emptyList(), Long.MIN_VALUE, Long.MAX_VALUE, "text/plain", 0,
+      Collections.emptyList(), ConfigDef.Evaluation.IMPLICIT, null);
+
+    StageDefinition errorTargetStageDef = new StageDefinition(createLibraryDef(cl),
+      false, ETarget.class, "errorTarget", 1, "errorTarget",
+      "Error Target", StageType.TARGET, true, false, true,
+      Arrays.asList(errorTargetConf), null/*raw source definition*/, "", null, false, 0, null,
+      Arrays.asList(ExecutionMode.CLUSTER_STREAMING, ExecutionMode.CLUSTER_BATCH, ExecutionMode.STANDALONE), false, new StageUpgrader.Default(), Collections.emptyList()
+    );
+    return errorTargetStageDef;
+  }
+
+  public static class ClusterStreamingBuilder {
+    private final StageDefinition clusterStageDef;
+    private final StageDefinition errorTargetStageDef;
+
+    public ClusterStreamingBuilder() {
+      this(Thread.currentThread().getContextClassLoader());
+    }
+
+    public ClusterStreamingBuilder(ClassLoader cl) {
+      clusterStageDef =
+        new StageDefinition(createLibraryDef(cl), false, MSource.class, "sourceName", 1, "sourceLabel", "sourceDesc",
+          StageType.SOURCE, false, true, true, Collections.emptyList(), getRawSourceDefinition(),
+          "", null, false, 1, null, Arrays.asList(ExecutionMode.CLUSTER_BATCH, ExecutionMode.CLUSTER_STREAMING, ExecutionMode.STANDALONE), false,
+          new StageUpgrader.Default(), Arrays.asList(ClusterModeConstants.SPARK_KAFKA_JAR_REGEX));
+      errorTargetStageDef = getErrorStageDefinition(cl);
+    }
+
+    public StageLibraryTask build() {
+      return new MockStageLibraryTask(ImmutableList.of(clusterStageDef, errorTargetStageDef));
+    }
+  }
+
+  public static class ClusterBatchBuilder {
+    private final StageDefinition clusterStageDef;
+    private final StageDefinition errorTargetStageDef;
+
+    public ClusterBatchBuilder() {
+      this(Thread.currentThread().getContextClassLoader());
+    }
+
+    public ClusterBatchBuilder(ClassLoader cl) {
+      clusterStageDef =
+        new StageDefinition(createLibraryDef(cl), false, MSource.class, "sourceName", 1, "sourceLabel", "sourceDesc",
+          StageType.SOURCE, false, true, true, Collections.emptyList(), getRawSourceDefinition(),
+          "", null, false, 1, null, Arrays.asList(ExecutionMode.CLUSTER_BATCH, ExecutionMode.STANDALONE), false,
+          new StageUpgrader.Default(), Arrays.asList(ClusterModeConstants.AVRO_JAR_REGEX, ClusterModeConstants.AVRO_MAPRED_JAR_REGEX));
+      errorTargetStageDef = getErrorStageDefinition(cl);
+    }
+
+    public StageLibraryTask build() {
+      return new MockStageLibraryTask(ImmutableList.of(clusterStageDef, errorTargetStageDef));
+    }
+  }
+  }
+
+  @SuppressWarnings("unchecked")
+  public static List getSourceStageConfig() {
+    StageConfiguration source = new StageConfiguration("s", "default", "sourceName", 1,
+      Collections.emptyList(), null, Collections.emptyList(), ImmutableList.of("s"));
+    List stages = new ArrayList<>();
+    stages.add(source);
+    return stages;
+  }

   public static void resetStageCaptures() {
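The two new builders give a test a stage library whose single source is pinned to one cluster flavor, together with the matching jars-to-ship regexes from ClusterModeConstants. A sketch of intended usage via the new MockStages helpers, assumed to run inside an existing container test:

// Source declares CLUSTER_STREAMING (plus the spark-streaming-kafka regex):
StageLibraryTask streamingLib =
    MockStages.createClusterStreamingStageLibrary(Thread.currentThread().getContextClassLoader());
// Source declares CLUSTER_BATCH (plus the avro and avro-mapred regexes):
StageLibraryTask batchLib =
    MockStages.createClusterBatchStageLibrary(Thread.currentThread().getContextClassLoader());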
diff --git a/container/src/test/java/com/streamsets/datacollector/validation/TestPipelineConfigurationValidator.java b/container/src/test/java/com/streamsets/datacollector/validation/TestPipelineConfigurationValidator.java
index f67f83a8311..20ce9db36f3 100644
--- a/container/src/test/java/com/streamsets/datacollector/validation/TestPipelineConfigurationValidator.java
+++ b/container/src/test/java/com/streamsets/datacollector/validation/TestPipelineConfigurationValidator.java
@@ -104,7 +104,7 @@ public void testExecutionModes() {
     Assert.assertTrue(validator.getIssues().hasIssues());

     // cluster only stage can preview and run as cluster
-    conf = MockStages.createPipelineConfigurationWithClusterOnlyStage(ExecutionMode.CLUSTER);
+    conf = MockStages.createPipelineConfigurationWithClusterOnlyStage(ExecutionMode.CLUSTER_BATCH);
     validator = new PipelineConfigurationValidator(lib, "name", conf);
     Assert.assertFalse(validator.validate().getIssues().hasIssues());
     Assert.assertTrue(validator.canPreview());
diff --git a/dev-support/ci-run-all-libs.sh b/dev-support/ci-run-all-libs.sh
index 79e71f34a9c..e2587d125a4 100755
--- a/dev-support/ci-run-all-libs.sh
+++ b/dev-support/ci-run-all-libs.sh
@@ -53,11 +53,11 @@ git show
 /opt/scripts/docker-delete-stopped-containers.sh || true
 /opt/scripts/docker-delete-local-images.sh || true

 # compile and install
-mvn clean install -Pdist,all-libs,ui,rpm,miniIT -Drelease -Dtest=DoesNotExist -DfailIfNoTests=false
+mvn clean install -U -Pdist,all-libs,ui,rpm,miniIT -Drelease -Dtest=DoesNotExist -DfailIfNoTests=false

 # package and run tests (if appropiate)
 set +e
 export JAVA_HOME=${TEST_JVM}
-mvn package -fae -Pdist,ui,rpm,miniIT -Dmaven.main.skip=true -DlastModGranularityMs=604800000 ${TEST_OPTS[@]}
+mvn package -U -fae -Pdist,ui,rpm,miniIT -Dmaven.main.skip=true -DlastModGranularityMs=604800000 ${TEST_OPTS[@]}
 exitCode=$?

 if [[ -n $PUBLISH_SDC_TAR ]] && [[ $exitCode -eq 0 ]]
 then
diff --git a/kafka_source-protolib/src/main/java/com/streamsets/pipeline/stage/origin/kafka/KafkaDSource.java b/kafka_source-protolib/src/main/java/com/streamsets/pipeline/stage/origin/kafka/KafkaDSource.java
index 0f34f5801f1..0f791e678c3 100644
--- a/kafka_source-protolib/src/main/java/com/streamsets/pipeline/stage/origin/kafka/KafkaDSource.java
+++ b/kafka_source-protolib/src/main/java/com/streamsets/pipeline/stage/origin/kafka/KafkaDSource.java
@@ -17,6 +17,7 @@
  */
 package com.streamsets.pipeline.stage.origin.kafka;

+import com.streamsets.pipeline.api.ExecutionMode;
 import com.streamsets.pipeline.api.ListBeanModel;
 import com.streamsets.pipeline.api.ConfigDef;
 import com.streamsets.pipeline.api.ConfigGroups;
@@ -52,6 +53,8 @@
   version = 2,
   label = "Kafka Consumer",
   description = "Reads data from Kafka",
+  execution = {ExecutionMode.CLUSTER_STREAMING, ExecutionMode.STANDALONE},
+  libJarsRegex = {"spark-streaming-kafka.*"},
   icon = "kafka.png",
   recordsByRef = true,
   upgrader = KafkaSourceUpgrader.class
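With execution and libJarsRegex declared directly on the @StageDef annotation, a stage now states both which execution modes it supports and which jars must ship to the cluster with it. A minimal sketch for a hypothetical stage; the attribute values mirror the KafkaDSource annotation above, but the class itself is invented:

@StageDef(
  version = 1,
  label = "My Streaming Source",
  description = "Illustration only",
  execution = {ExecutionMode.CLUSTER_STREAMING, ExecutionMode.STANDALONE},
  libJarsRegex = {"spark-streaming-kafka.*"},
  icon = "source.png"
)
public class MyStreamingDSource extends DSource {
  // configuration definitions would go here
}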
pipelineJson.replaceAll("localhost:9092", KafkaTestUtil.getMetadataBrokerURI()); pipelineJson = pipelineJson.replaceAll("localhost:2181", KafkaTestUtil.getZkConnect()); - pipelineJson = pipelineJson.replaceAll("STANDALONE", "CLUSTER"); + pipelineJson = pipelineJson.replaceAll("STANDALONE", "CLUSTER_STREAMING"); return pipelineJson; } diff --git a/miniIT/src/test/java/com/streamsets/datacollector/kafka/cluster/TestKafkaOriginSinglePartition.java b/miniIT/src/test/java/com/streamsets/datacollector/kafka/cluster/TestKafkaOriginSinglePartition.java index 5abe27859f9..cefc0fe024e 100644 --- a/miniIT/src/test/java/com/streamsets/datacollector/kafka/cluster/TestKafkaOriginSinglePartition.java +++ b/miniIT/src/test/java/com/streamsets/datacollector/kafka/cluster/TestKafkaOriginSinglePartition.java @@ -68,7 +68,7 @@ private static String getPipelineJson() throws Exception { pipelineJson = pipelineJson.replace("topicName", TOPIC); pipelineJson = pipelineJson.replaceAll("localhost:9092", KafkaTestUtil.getMetadataBrokerURI()); pipelineJson = pipelineJson.replaceAll("localhost:2181", KafkaTestUtil.getZkConnect()); - pipelineJson = pipelineJson.replaceAll("STANDALONE", "CLUSTER"); + pipelineJson = pipelineJson.replaceAll("STANDALONE", "CLUSTER_STREAMING"); return pipelineJson; } diff --git a/miniIT/src/test/resources/cluster_kafka_flume.json b/miniIT/src/test/resources/cluster_kafka_flume.json index 5c72888b246..a91ee13e938 100644 --- a/miniIT/src/test/resources/cluster_kafka_flume.json +++ b/miniIT/src/test/resources/cluster_kafka_flume.json @@ -4,7 +4,7 @@ "description" : "", "configuration" : [ { "name" : "executionMode", - "value" : "CLUSTER" + "value" : "CLUSTER_STREAMING" }, { "name" : "clusterSlaveMemory", "value" : 1024 diff --git a/miniIT/src/test/resources/cluster_kafka_hdfs.json b/miniIT/src/test/resources/cluster_kafka_hdfs.json index 530c34912ae..7f142685a4e 100644 --- a/miniIT/src/test/resources/cluster_kafka_hdfs.json +++ b/miniIT/src/test/resources/cluster_kafka_hdfs.json @@ -4,7 +4,7 @@ "description" : "", "configuration" : [ { "name" : "executionMode", - "value" : "CLUSTER" + "value" : "CLUSTER_STREAMING" }, { "name" : "clusterSlaveMemory", "value" : 1024 diff --git a/miniIT/src/test/resources/cluster_pipeline.json b/miniIT/src/test/resources/cluster_pipeline.json index f9a943f1d17..0ce7ea99da7 100644 --- a/miniIT/src/test/resources/cluster_pipeline.json +++ b/miniIT/src/test/resources/cluster_pipeline.json @@ -4,7 +4,7 @@ "description" : "", "configuration" : [ { "name" : "executionMode", - "value" : "CLUSTER" + "value" : "CLUSTER_STREAMING" }, { "name" : "clusterSlaveMemory", "value" : 1024 diff --git a/spark-bootstrap/src/main/java/com/streamsets/pipeline/spark/SparkStreamingBinding.java b/spark-bootstrap/src/main/java/com/streamsets/pipeline/spark/SparkStreamingBinding.java index 50df407a8fa..a4fc7c1fc4b 100644 --- a/spark-bootstrap/src/main/java/com/streamsets/pipeline/spark/SparkStreamingBinding.java +++ b/spark-bootstrap/src/main/java/com/streamsets/pipeline/spark/SparkStreamingBinding.java @@ -41,7 +41,7 @@ public class SparkStreamingBinding implements ClusterBinding { private static final Logger LOG = LoggerFactory.getLogger(SparkStreamingBinding.class); private JavaStreamingContext ssc; - private Properties properties; + private final Properties properties; public SparkStreamingBinding(Properties properties) { this.properties = Utils.checkNotNull(properties, "Properties"); @@ -70,12 +70,8 @@ public void run() { }; Runtime.getRuntime().addShutdownHook(shutdownHookThread); 
LOG.info("Making calls through spark context "); - if ("kafka".equalsIgnoreCase(getProperty("cluster.source.name"))) { - JavaPairInputDStream dStream = createDirectStreamForKafka(); - dStream.foreachRDD(new SparkDriverFunction()); - } else { - throw new IllegalStateException("Property value " + getProperty("cluster.source.name") + " is invalid"); - } + JavaPairInputDStream dStream = createDirectStreamForKafka(); + dStream.foreachRDD(new SparkDriverFunction()); ssc.start(); }