SDC-1715 Make cluster mode generic
Change-Id: I13a03fbe04da3bb2d25f6e387aaed30b6fa4b939
Reviewed-on: https://review.streamsets.net/1357
Tested-by: StreamSets CI <[email protected]>
Reviewed-by: Virag Kothari <[email protected]>
viragkothari committed Sep 18, 2015
1 parent fbd923f commit 6ec038d
Showing 39 changed files with 337 additions and 187 deletions.
@@ -20,4 +20,4 @@
#
cluster.jar.blacklist.regex_com.streamsets.pipeline.stage.origin.kafka.KafkaDSource=^spark-streaming-kafka.*

- execution.mode_com.streamsets.pipeline.stage.origin.kafka.KafkaDSource=CLUSTER
+ execution.mode_com.streamsets.pipeline.stage.origin.kafka.KafkaDSource=CLUSTER_STREAMING
@@ -37,6 +37,7 @@
import com.streamsets.pipeline.config.LogModeChooserValues;
import com.streamsets.pipeline.configurablestage.DClusterSourceOffsetCommitter;
import com.streamsets.pipeline.lib.parser.log.RegExConfig;
+
import java.util.List;
import java.util.Map;

@@ -45,7 +46,8 @@
version = 2,
label = "Hadoop FS",
description = "Reads data from Hadoop file system",
- execution = ExecutionMode.CLUSTER,
+ execution = ExecutionMode.CLUSTER_BATCH,
+ libJarsRegex = {"avro-\\d+.*", "avro-mapred.*"},
icon = "hdfs.png",
privateClassLoader = true,
upgrader = ClusterHdfsSourceUpgrader.class
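The new libJarsRegex attribute above is the heart of this commit: instead of ClusterProviderImpl hard-coding which jars (spark-streaming-kafka, avro, avro-mapred) get shipped to the cluster, each stage now declares regexes for the jars its own library must ship. As a hedged sketch of how a streaming origin would use the same attribute (the Kafka origin's source is not among the hunks shown here, so the exact values below are assumptions):

    @StageDef(
        version = 2,
        label = "Kafka Consumer",
        description = "Reads data from Kafka",
        execution = ExecutionMode.CLUSTER_STREAMING,
        libJarsRegex = {"spark-streaming-kafka.*"},  // assumed: mirrors the old SPARK_KAFKA_JAR_PREFIX lookup
        icon = "kafka.png"
    )
    public class KafkaDSource extends DClusterSourceOffsetCommitter {
      // ...
    }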
@@ -46,14 +46,20 @@ Stage<Source.Context> createStage() {

@Override
public String getName() {
- initializeClusterSource();
- return clusterSource.getName();
+ if (initializeClusterSource()) {
+ return clusterSource.getName();
+ } else {
+ return null;
+ }
}

@Override
public boolean isInBatchMode() {
- initializeClusterSource();
- return clusterSource.isInBatchMode();
+ if (initializeClusterSource()) {
+ return clusterSource.isInBatchMode();
+ } else {
+ return false;
+ }
}

/**
@@ -63,8 +69,9 @@ public boolean isInBatchMode() {
*/
@Override
public void put(List<Map.Entry> batch) throws InterruptedException {
- initializeClusterSource();
- clusterSource.put(batch);
+ if (initializeClusterSource()) {
+ clusterSource.put(batch);
+ }
}

private boolean initializeClusterSource() {
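The three delegate methods above no longer assume initialization succeeds: initializeClusterSource() now returns a boolean, and each caller falls back to a neutral value (null, false, or a silent no-op for put) when the wrapped stage is not actually a cluster source. The method body is collapsed in this view; a minimal sketch of the guard shape being relied on, assuming the wrapper exposes the underlying Source via a getSource() accessor (not shown in this diff):

    private boolean initializeClusterSource() {
      // Resolve the wrapped stage once and remember it only if it is a ClusterSource;
      // returning false lets callers degrade gracefully instead of throwing NPEs.
      if (clusterSource == null) {
        Source source = getSource();  // assumed accessor
        if (source instanceof ClusterSource) {
          clusterSource = (ClusterSource) source;
        }
      }
      return clusterSource != null;
    }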
@@ -24,13 +24,11 @@ public class ClusterModeConstants {
public static final String USER_LIBS = "user-libs";

public static final String NUM_EXECUTORS_KEY = "num-executors";
- public static final String CLUSTER_SOURCE_NAME = "cluster.source.name";
public static final String CLUSTER_PIPELINE_NAME = "cluster.pipeline.name";
public static final String CLUSTER_PIPELINE_REV = "cluster.pipeline.rev";
public static final String CLUSTER_PIPELINE_USER = "cluster.pipeline.user";

- public static final String SPARK_KAFKA_JAR_PREFIX = "spark-streaming-kafka";
- public static final String CLUSTER_SOURCE_BATCHMODE = "cluster.source.batchmode";
- public static final String AVRO_MAPRED_JAR_PREFIX = "avro-mapred";
+ public static final String SPARK_KAFKA_JAR_REGEX = "spark-streaming-kafka.*";
+ public static final String AVRO_MAPRED_JAR_REGEX = "avro-mapred.*";
+ public static final String AVRO_JAR_REGEX = "avro-\\d+.*";
}
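These constants switch from startsWith-style prefixes to full regexes matched with Matcher.matches() against the bare jar file name. The \d in AVRO_JAR_REGEX is what keeps the core avro jar distinct from avro-mapred: a digit must follow "avro-". A self-contained check:

    import java.util.regex.Pattern;

    public class JarRegexCheck {
      public static void main(String[] args) {
        Pattern avro = Pattern.compile("avro-\\d+.*");
        Pattern avroMapred = Pattern.compile("avro-mapred.*");
        System.out.println(avro.matcher("avro-1.7.7.jar").matches());              // true
        System.out.println(avro.matcher("avro-mapred-1.7.7.jar").matches());       // false: 'm' is not a digit
        System.out.println(avroMapred.matcher("avro-mapred-1.7.7.jar").matches()); // true
      }
    }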
@@ -18,7 +18,6 @@
package com.streamsets.datacollector.cluster;

import static com.streamsets.datacollector.definition.StageLibraryDefinitionExtractor.DATA_COLLECTOR_LIBRARY_PROPERTIES;
-
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
@@ -90,9 +89,9 @@
public class ClusterProviderImpl implements ClusterProvider {
static final Pattern YARN_APPLICATION_ID_REGEX = Pattern.compile("\\s(application_[0-9]+_[0-9]+)(\\s|$)");
static final Pattern NO_VALID_CREDENTIALS = Pattern.compile("(No valid credentials provided.*)");
- private static final String CLUSTER_TYPE = "CLUSTER_TYPE";
- private static final String CLUSTER_TYPE_SPARK = "spark";
- private static final String CLUSTER_TYPE_MAPREDUCE = "mr";
+ public static final String CLUSTER_TYPE = "CLUSTER_TYPE";
+ public static final String CLUSTER_TYPE_SPARK = "spark";
+ public static final String CLUSTER_TYPE_MAPREDUCE = "mr";
private static final String CLUSTER_TYPE_YARN = "yarn";
private static final String KERBEROS_AUTH = "KERBEROS_AUTH";
private static final String KERBEROS_KEYTAB = "KERBEROS_KEYTAB";
@@ -410,10 +409,7 @@ private ApplicationState startPipelineInternal(SystemProcessFactory systemProce
// order is important here as we don't want error stage
// configs overriding source stage configs
String clusterToken = UUID.randomUUID().toString();
- ClusterOrigin clusterOrigin = null;
- String pathToSparkKafkaJar = null;
- String pathToAvroMapredJar = null;
- String pathToAvroJar = null;
+ List<String> jarsToShip = new ArrayList<String>();
List<Issue> errors = new ArrayList<>();
PipelineBean pipelineBean = PipelineBeanCreator.get().create(false, stageLibrary, pipelineConfiguration, errors);
if (!errors.isEmpty()) {
@@ -424,9 +420,10 @@ private ApplicationState startPipelineInternal(SystemProcessFactory systemProce
for (StageBean stageBean : pipelineBean.getStages()) {
pipelineConfigurations.add(stageBean.getConfiguration());
}
+ boolean isBatch = false;
for (StageConfiguration stageConf : pipelineConfigurations.build()) {
StageDefinition stageDef = stageLibrary.getStage(stageConf.getLibrary(), stageConf.getStageName(),
- false);
+ false);
if (stageConf.getInputLanes().isEmpty()) {
for (Config conf : stageConf.getConfiguration()) {
if (conf.getValue() != null) {
@@ -458,37 +455,29 @@ private ApplicationState startPipelineInternal(SystemProcessFactory systemProce
}
}
}
- // find the spark-kafka jar
- for (URL jarUrl : ((URLClassLoader)stageDef.getStageClassLoader()).getURLs()) {
- File jarFile = new File(jarUrl.getPath());
- if (jarFile.getName().startsWith(ClusterModeConstants.SPARK_KAFKA_JAR_PREFIX)) {
- pathToSparkKafkaJar = jarFile.getAbsolutePath();
- }
- }
-
- for (URL jarUrl : ((URLClassLoader) stageDef.getStageClassLoader()).getURLs()) {
- File jarFile = new File(jarUrl.getPath());
- if (jarFile.getName().startsWith(ClusterModeConstants.AVRO_MAPRED_JAR_PREFIX)) {
- pathToAvroMapredJar = jarFile.getAbsolutePath();
- }
+ ExecutionMode executionMode = PipelineBeanCreator.get().getExecutionMode(pipelineConfiguration, new ArrayList<Issue>());
+ if (executionMode == ExecutionMode.CLUSTER_BATCH) {
+ isBatch = true;
+ } else if (executionMode == ExecutionMode.CLUSTER_STREAMING) {
+ isBatch = false;
+ } else {
+ throw new IllegalStateException(Utils.format("Unsupported execution mode '{}' for cluster mode",
+ executionMode));
+ }
- for (URL jarUrl : ((URLClassLoader) stageDef.getStageClassLoader()).getURLs()) {
- File jarFile = new File(jarUrl.getPath());
- Pattern pattern = Pattern.compile(ClusterModeConstants.AVRO_JAR_REGEX);
- Matcher matcher = pattern.matcher(jarFile.getName());
- if (matcher.matches()) {
- pathToAvroJar = jarFile.getAbsolutePath();
+ List<String> libJarsRegex = stageDef.getLibJarsRegex();
+ if (!libJarsRegex.isEmpty()) {
+ for (URL jarUrl : ((URLClassLoader) stageDef.getStageClassLoader()).getURLs()) {
+ File jarFile = new File(jarUrl.getPath());
+ for (String libJar : libJarsRegex) {
+ Pattern pattern = Pattern.compile(libJar);
+ Matcher matcher = pattern.matcher(jarFile.getName());
+ if (matcher.matches()) {
+ jarsToShip.add(jarFile.getAbsolutePath());
+ }
}
}
}
}
- try {
- clusterOrigin = ClusterOrigin.valueOf(Strings.nullToEmpty(sourceInfo.get(ClusterModeConstants.
- CLUSTER_SOURCE_NAME)).toUpperCase(Locale.ENGLISH));
- } catch (IllegalArgumentException ex) {
- String msg = Utils.format("Illegal value '{}' for '{}'", sourceInfo.
- get(ClusterModeConstants.CLUSTER_SOURCE_NAME), ClusterModeConstants.CLUSTER_SOURCE_NAME);
- throw new IllegalArgumentException(msg, ex);
- }
String type = StageLibraryUtils.getLibraryType(stageDef.getStageClassLoader());
String name = StageLibraryUtils.getLibraryName(stageDef.getStageClassLoader());
if (ClusterModeConstants.STREAMSETS_LIBS.equals(type)) {
@@ -500,10 +489,6 @@ private ApplicationState startPipelineInternal(SystemProcessFactory systemProce
throw new IllegalStateException(Utils.format("Error unknown stage library type: '{}'", type));
}
}
- if (clusterOrigin == ClusterOrigin.KAFKA) {
- Utils.checkState(pathToSparkKafkaJar != null, "Could not find spark kafka jar");
- }
-
LOG.info("stagingDir = '{}'", stagingDir);
LOG.info("bootstrapDir = '{}'", bootstrapDir);
LOG.info("etcDir = '{}'", etcDir);
@@ -554,18 +539,13 @@ private ApplicationState startPipelineInternal(SystemProcessFactory systemProce
String msg = errorString("serializing etc directory: {}", ex);
throw new RuntimeException(msg, ex);
}
- boolean isBatch = Boolean.parseBoolean(Utils.checkNotNull(sourceInfo.get(
- ClusterModeConstants.CLUSTER_SOURCE_BATCHMODE), ClusterModeConstants.CLUSTER_SOURCE_BATCHMODE)
- .trim().toLowerCase(Locale.ENGLISH));
File bootstrapJar = getBootstrapJar(new File(bootstrapDir, "main"), "streamsets-datacollector-bootstrap");
File clusterBootstrapJar = getBootstrapJar(new File(bootstrapDir, "spark"),
"streamsets-datacollector-spark-bootstrap");
File log4jProperties = new File(stagingDir, "log4j.properties");
InputStream clusterLog4jProperties = null;
try {
if (isBatch) {
- Utils.checkState(pathToAvroJar != null, "Could not find avro jar");
- Utils.checkState(pathToAvroMapredJar != null, "Could not find avro-mapred jar");
clusterLog4jProperties = Utils.checkNotNull(getClass().getResourceAsStream("/cluster-mr-log4j.properties"),
"Cluster Log4J Properties");
} else {
@@ -593,15 +573,14 @@ private ApplicationState startPipelineInternal(SystemProcessFactory systemProce
args = generateMRArgs(clusterManager.getAbsolutePath(), String.valueOf(config.clusterSlaveMemory),
config.clusterSlaveJavaOpts, libsTarGz.getAbsolutePath(), etcTarGz.getAbsolutePath(),
resourcesTarGz.getAbsolutePath(), log4jProperties.getAbsolutePath(), bootstrapJar.getAbsolutePath(),
- sdcPropertiesFile.getAbsolutePath(), clusterBootstrapJar.getAbsolutePath(), pathToAvroJar,
- pathToAvroMapredJar);
+ sdcPropertiesFile.getAbsolutePath(), clusterBootstrapJar.getAbsolutePath(), jarsToShip);
} else {
LOG.info("Submitting Spark Job");
environment.put(CLUSTER_TYPE, CLUSTER_TYPE_SPARK);
args = generateSparkArgs(clusterManager.getAbsolutePath(), String.valueOf(config.clusterSlaveMemory),
config.clusterSlaveJavaOpts, numExecutors, libsTarGz.getAbsolutePath(), etcTarGz.getAbsolutePath(),
resourcesTarGz.getAbsolutePath(), log4jProperties.getAbsolutePath(), bootstrapJar.getAbsolutePath(),
- pathToSparkKafkaJar, clusterBootstrapJar.getAbsolutePath());
+ jarsToShip, clusterBootstrapJar.getAbsolutePath());
}
SystemProcess process = systemProcessFactory.create(ClusterProviderImpl.class.getSimpleName(), outputDir, args);
LOG.info("Starting: " + process);
@@ -656,7 +635,7 @@ private ApplicationState startPipelineInternal(SystemProcessFactory systemProce
private List<String> generateMRArgs(String clusterManager, String slaveMemory, String javaOpts,
String libsTarGz, String etcTarGz, String resourcesTarGz, String log4jProperties,
String bootstrapJar, String sdcPropertiesFile,
- String clusterBootstrapJar, String pathToAvroJar, String pathToAvroMapredJar) {
+ String clusterBootstrapJar, List<String> jarsToShip) {
List<String> args = new ArrayList<>();
args.add(clusterManager);
args.add("start");
@@ -668,15 +647,20 @@ private List<String> generateMRArgs(String clusterManager, String slaveMemory, S
args.add("-D");
args.add("mapreduce.job.log4j-properties-file=" + log4jProperties);
args.add("-libjars");
args.add(Joiner.on(",").join(bootstrapJar, pathToAvroMapredJar, pathToAvroJar));
String libJarString = bootstrapJar;
for (String jarToShip: jarsToShip) {
libJarString = libJarString + "," + jarToShip;
}
args.add(libJarString);
args.add(sdcPropertiesFile);
args.add(Joiner.on(" ").join(String.format("-Xmx%sm", slaveMemory), javaOpts,
"-javaagent:./" + (new File(bootstrapJar)).getName()));
return args;
}

private List<String> generateSparkArgs(String clusterManager, String slaveMemory, String javaOpts,
String numExecutors, String libsTarGz, String etcTarGz, String resourcesTarGz,
- String log4jProperties, String bootstrapJar, String pathToSparkKafkaJar,
+ String log4jProperties, String bootstrapJar, List<String> jarsToShip,
String clusterBootstrapJar) {
List<String> args = new ArrayList<>();
args.add(clusterManager);
@@ -702,7 +686,11 @@ private List<String> generateSparkArgs(String clusterManager, String slaveMemory
args.add("--files");
args.add(log4jProperties);
args.add("--jars");
args.add(Joiner.on(",").skipNulls().join(bootstrapJar, pathToSparkKafkaJar));
String libJarString = bootstrapJar;
for (String jarToShip: jarsToShip) {
libJarString = libJarString + "," + jarToShip;
}
args.add(libJarString);
// use our javaagent and java opt configs
args.add("--conf");
args.add("spark.executor.extraJavaOptions=" + Joiner.on(" ").join("-javaagent:./" + (new File(bootstrapJar)).getName(),
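Net effect of the ClusterProviderImpl changes: the origin-specific plumbing (ClusterOrigin, CLUSTER_SOURCE_NAME, CLUSTER_SOURCE_BATCHMODE, and the three hard-coded jar lookups) is gone; batch vs. streaming now comes from the pipeline's ExecutionMode, and the jars to ship come from the stage's libJarsRegex metadata. A standalone sketch of the two pieces of new logic, with the inputs as stand-ins for the stage classloader URLs and annotation values:

    import java.io.File;
    import java.net.URL;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.regex.Pattern;

    public class JarShippingSketch {
      // Mirror of the collection loop: keep every classloader jar whose file name
      // matches one of the stage's declared regexes.
      static List<String> collectJarsToShip(URL[] classLoaderUrls, List<String> libJarsRegex) {
        List<String> jarsToShip = new ArrayList<>();
        for (URL jarUrl : classLoaderUrls) {
          File jarFile = new File(jarUrl.getPath());
          for (String libJar : libJarsRegex) {
            if (Pattern.compile(libJar).matcher(jarFile.getName()).matches()) {
              jarsToShip.add(jarFile.getAbsolutePath());
            }
          }
        }
        return jarsToShip;
      }

      // Mirror of the -libjars / --jars string building in generateMRArgs and
      // generateSparkArgs. Guava's Joiner.on(",").join(...) would do the same; the
      // old code needed skipNulls() only because pathToSparkKafkaJar could be null.
      static String libJarString(String bootstrapJar, List<String> jarsToShip) {
        StringBuilder sb = new StringBuilder(bootstrapJar);
        for (String jarToShip : jarsToShip) {
          sb.append(',').append(jarToShip);
        }
        return sb.toString();
      }
    }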
@@ -23,6 +23,6 @@
public class ExecutionModeChooserValues extends BaseEnumChooserValues {

public ExecutionModeChooserValues() {
- super(ExecutionMode.STANDALONE, ExecutionMode.CLUSTER);
+ super(ExecutionMode.STANDALONE, ExecutionMode.CLUSTER_BATCH, ExecutionMode.CLUSTER_STREAMING);
}
}
@@ -47,17 +47,18 @@ public class StageDefinition {
private final boolean preconditions;
private final boolean onRecordError;
private final RawSourceDefinition rawSourceDefinition;
- private List<ConfigDefinition> configDefinitions;
- private Map<String, ConfigDefinition> configDefinitionsMap;
+ private final List<ConfigDefinition> configDefinitions;
+ private final Map<String, ConfigDefinition> configDefinitionsMap;
private final String icon;
private final ConfigGroupDefinition configGroupDefinition;
private final boolean variableOutputStreams;
private final int outputStreams;
private final String outputStreamLabelProviderClass;
private List<String> outputStreamLabels;
- private List<ExecutionMode> executionModes;
+ private final List<ExecutionMode> executionModes;
private final boolean recordsByRef;
private final StageUpgrader upgrader;
+ private final List<String> libJarsRegex;

// localized version
private StageDefinition(StageLibraryDefinition libraryDefinition, boolean privateClassLoader, ClassLoader classLoader,
@@ -66,7 +67,7 @@ private StageDefinition(StageLibraryDefinition libraryDefinition, boolean privat
boolean onRecordError, List<ConfigDefinition> configDefinitions, RawSourceDefinition rawSourceDefinition,
String icon, ConfigGroupDefinition configGroupDefinition, boolean variableOutputStreams, int outputStreams,
List<String> outputStreamLabels, List<ExecutionMode> executionModes, boolean recordsByRef,
- StageUpgrader upgrader) {
+ StageUpgrader upgrader, List<String> libJarsRegex) {
this.libraryDefinition = libraryDefinition;
this.privateClassLoader = privateClassLoader;
this.classLoader = classLoader;
@@ -103,6 +104,8 @@ private StageDefinition(StageLibraryDefinition libraryDefinition, boolean privat
this.executionModes = executionModes;
this.recordsByRef = recordsByRef;
this.upgrader = upgrader;
+ this.libJarsRegex = libJarsRegex;
+
}

public StageDefinition(StageDefinition def, ClassLoader classLoader) {
@@ -133,6 +136,7 @@ public StageDefinition(StageDefinition def, ClassLoader classLoader) {
executionModes = def.executionModes;
recordsByRef = def.recordsByRef;
upgrader = def.upgrader;
+ libJarsRegex = def.libJarsRegex;
}

public StageDefinition(StageLibraryDefinition libraryDefinition, boolean privateClassLoader, Class klass,
@@ -141,7 +145,7 @@ public StageDefinition(StageLibraryDefinition libraryDefinition, boolean private
List<ConfigDefinition> configDefinitions, RawSourceDefinition rawSourceDefinition, String icon,
ConfigGroupDefinition configGroupDefinition, boolean variableOutputStreams, int outputStreams,
String outputStreamLabelProviderClass, List<ExecutionMode> executionModes, boolean recordsByRef,
- StageUpgrader upgrader) {
+ StageUpgrader upgrader, List<String> libJarsRegex) {
this.libraryDefinition = libraryDefinition;
this.privateClassLoader = privateClassLoader;
this.classLoader = libraryDefinition.getClassLoader();
@@ -177,6 +181,7 @@ public StageDefinition(StageLibraryDefinition libraryDefinition, boolean private
this.executionModes = executionModes;
this.recordsByRef = recordsByRef;
this.upgrader = upgrader;
+ this.libJarsRegex = libJarsRegex;
}

public List<ExecutionMode> getLibraryExecutionModes() {
@@ -298,6 +303,10 @@ public List<ExecutionMode> getExecutionModes() {
return executionModes;
}

+ public List<String> getLibJarsRegex() {
+ return libJarsRegex;
+ }
+
public boolean getRecordsByRef() {
return recordsByRef;
}
@@ -400,7 +409,7 @@ public StageDefinition localize() {
getVersion(), label, description, getType(), isErrorStage(),
hasPreconditions(), hasOnRecordError(), configDefs, rawSourceDef, getIcon(), groupDefs,
isVariableOutputStreams(), getOutputStreams(), streamLabels, executionModes,
- recordsByRef, upgrader);
+ recordsByRef, upgrader, libJarsRegex);
}

private List<String> _getOutputStreamLabels(ClassLoader classLoader, boolean localized) {
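Because several StageDefinition fields are promoted to final above, libJarsRegex has to be threaded through both public constructors, the classloader-copy constructor, and localize(), so the value survives every path that rebuilds a definition; ClusterProviderImpl then reads it back via getLibJarsRegex(). The extraction side is not in the visible hunks; a hedged sketch of what it presumably does, assuming the @StageDef annotation gained a String[] libJarsRegex() element with an empty-array default (names are assumptions, not shown in this diff):

    // Hypothetical extractor fragment, e.g. in StageDefinitionExtractor:
    static List<String> extractLibJarsRegex(Class<?> stageClass) {
      StageDef stageDef = stageClass.getAnnotation(StageDef.class);
      return (stageDef == null) ? java.util.Collections.<String>emptyList()
                                : java.util.Arrays.asList(stageDef.libJarsRegex());
    }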
@@ -170,7 +170,7 @@ public class PipelineConfigBean implements Stage {
displayPosition = 100,
group = "CLUSTER",
dependsOn = "executionMode",
triggeredByValue = "CLUSTER"
triggeredByValue = {"CLUSTER_BATCH", "CLUSTER_STREAMING"}
)
public long clusterSlaveMemory;

@@ -184,7 +184,7 @@
displayPosition = 110,
group = "CLUSTER",
dependsOn = "executionMode",
triggeredByValue = "CLUSTER"
triggeredByValue = {"CLUSTER_BATCH", "CLUSTER_STREAMING"}
)
public String clusterSlaveJavaOpts;

@@ -198,7 +198,7 @@
displayPosition = 120,
group = "CLUSTER",
dependsOn = "executionMode",
triggeredByValue = "CLUSTER"
triggeredByValue = {"CLUSTER_BATCH", "CLUSTER_STREAMING"}
)
public Map clusterLauncherEnv;
