Skip to content
This repository has been archived by the owner on Apr 4, 2021. It is now read-only.

FALCON-2168 Add functionality to user to rerun with user's own lib paths to an instance #288

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
4 changes: 0 additions & 4 deletions cli/src/main/java/org/apache/falcon/cli/FalconEntityCLI.java
Original file line number Diff line number Diff line change
Expand Up @@ -306,10 +306,6 @@ public void entityCommand(CommandLine commandLine, FalconClient client) throws I
validateNotEmpty(filePath, FILE_PATH_OPT);
validateColo(optionsList);
result = client.validate(entityType, filePath, skipDryRun, doAsUser).getMessage();
} else if (optionsList.contains(FalconCLIConstants.SUBMIT_AND_SCHEDULE_OPT)) {
validateNotEmpty(filePath, "file");
validateColo(optionsList);
result = client.submitAndSchedule(entityType, filePath, skipDryRun, doAsUser, userProps).getMessage();
} else if (optionsList.contains(FalconCLIConstants.SCHEDULE_OPT)) {
validateNotEmpty(entityName, FalconCLIConstants.ENTITY_NAME_OPT);
colo = getColo(colo);
Expand Down
29 changes: 17 additions & 12 deletions cli/src/main/java/org/apache/falcon/cli/FalconInstanceCLI.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.List;
import java.util.Set;

import static org.apache.falcon.client.FalconCLIConstants.LIB_OPT;
import static org.apache.falcon.client.FalconCLIConstants.RUNNING_OPT;
import static org.apache.falcon.client.FalconCLIConstants.RUNNING_OPT_DESCRIPTION;
import static org.apache.falcon.client.FalconCLIConstants.LIST_OPT;
Expand All @@ -53,7 +54,7 @@
import static org.apache.falcon.client.FalconCLIConstants.LOG_OPT;
import static org.apache.falcon.client.FalconCLIConstants.LOG_OPT_DESCRIPTION;
import static org.apache.falcon.client.FalconCLIConstants.PARARMS_OPT;
import static org.apache.falcon.client.FalconCLIConstants.PARARMS_OPT_DESCRIPTION;
import static org.apache.falcon.client.FalconCLIConstants.PARAMS_OPT_DESCRIPTION;
import static org.apache.falcon.client.FalconCLIConstants.LISTING_OPT;
import static org.apache.falcon.client.FalconCLIConstants.LISTING_OPT_DESCRIPTION;
import static org.apache.falcon.client.FalconCLIConstants.DEPENDENCY_OPT_DESCRIPTION;
Expand All @@ -69,8 +70,8 @@
import static org.apache.falcon.client.FalconCLIConstants.RUNID_OPT;
import static org.apache.falcon.client.FalconCLIConstants.CLUSTERS_OPT;
import static org.apache.falcon.client.FalconCLIConstants.CLUSTERS_OPT_DESCRIPTION;
import static org.apache.falcon.client.FalconCLIConstants.SOURCECLUSTER_OPT;
import static org.apache.falcon.client.FalconCLIConstants.SOURCECLUSTER_OPT_DESCRIPTION;
import static org.apache.falcon.client.FalconCLIConstants.SOURCE_CLUSTER_OPT;
import static org.apache.falcon.client.FalconCLIConstants.SOURCE_CLUSTER_OPT_DESCRIPTION;
import static org.apache.falcon.client.FalconCLIConstants.FILE_PATH_OPT;
import static org.apache.falcon.client.FalconCLIConstants.FILE_PATH_OPT_DESCRIPTION;
import static org.apache.falcon.client.FalconCLIConstants.TYPE_OPT;
Expand Down Expand Up @@ -130,7 +131,7 @@ public Options createInstanceOptions() {
Option resume = new Option(RESUME_OPT, false, RESUME_OPT_DESCRIPTION);
Option rerun = new Option(RERUN_OPT, false, RERUN_OPT_DESCRIPTION);
Option logs = new Option(LOG_OPT, false, LOG_OPT_DESCRIPTION);
Option params = new Option(PARARMS_OPT, false, PARARMS_OPT_DESCRIPTION);
Option params = new Option(PARARMS_OPT, false, PARAMS_OPT_DESCRIPTION);
Option listing = new Option(LISTING_OPT, false, LISTING_OPT_DESCRIPTION);
Option dependency = new Option(DEPENDENCY_OPT, false, DEPENDENCY_OPT_DESCRIPTION);
Option triage = new Option(TRIAGE_OPT, false, TRIAGE_OPT_DESCRIPTION);
Expand All @@ -156,9 +157,9 @@ public Options createInstanceOptions() {
Option url = new Option(URL_OPTION, true, URL_OPTION_DESCRIPTION);
Option start = new Option(START_OPT, true, START_OPT_DESCRIPTION);
Option end = new Option(END_OPT, true, END_OPT_DESCRIPTION);
Option runid = new Option(RUNID_OPT, true, RUNID_OPT_DESCRIPTION);
Option runId = new Option(RUNID_OPT, true, RUNID_OPT_DESCRIPTION);
Option clusters = new Option(CLUSTERS_OPT, true, CLUSTERS_OPT_DESCRIPTION);
Option sourceClusters = new Option(SOURCECLUSTER_OPT, true, SOURCECLUSTER_OPT_DESCRIPTION);
Option sourceClusters = new Option(SOURCE_CLUSTER_OPT, true, SOURCE_CLUSTER_OPT_DESCRIPTION);
Option filePath = new Option(FILE_PATH_OPT, true, FILE_PATH_OPT_DESCRIPTION);
Option entityType = new Option(TYPE_OPT, true, TYPE_OPT_DESCRIPTION);
Option entityName = new Option(ENTITY_NAME_OPT, true, ENTITY_NAME_OPT_DESCRIPTION);
Expand All @@ -177,7 +178,7 @@ public Options createInstanceOptions() {

Option allAttempts = new Option(ALL_ATTEMPTS, false, ALL_ATTEMPTS_DESCRIPTION);
Option instanceStatus = new Option(FalconCLIConstants.INSTANCE_STATUS_OPT, true, "Instance status");
Option nameSubsequence = new Option(FalconCLIConstants.NAMESEQ_OPT, true, "Subsequence of entity name");
Option nameSubSequence = new Option(FalconCLIConstants.NAMESEQ_OPT, true, "SubSequence of entity name");
Option tagKeywords = new Option(FalconCLIConstants.TAGKEYS_OPT, true, "Keywords in tags");

instanceOptions.addOption(url);
Expand All @@ -187,7 +188,7 @@ public Options createInstanceOptions() {
instanceOptions.addOption(filePath);
instanceOptions.addOption(entityType);
instanceOptions.addOption(entityName);
instanceOptions.addOption(runid);
instanceOptions.addOption(runId);
instanceOptions.addOption(clusters);
instanceOptions.addOption(sourceClusters);
instanceOptions.addOption(colo);
Expand All @@ -202,7 +203,7 @@ public Options createInstanceOptions() {
instanceOptions.addOption(debug);
instanceOptions.addOption(instanceTime);
instanceOptions.addOption(instanceStatus);
instanceOptions.addOption(nameSubsequence);
instanceOptions.addOption(nameSubSequence);
instanceOptions.addOption(tagKeywords);
instanceOptions.addOption(allAttempts);

Expand All @@ -221,14 +222,15 @@ public void instanceCommand(CommandLine commandLine, FalconClient client) throws
String instanceTime = commandLine.getOptionValue(INSTANCE_TIME_OPT);
String start = commandLine.getOptionValue(FalconCLIConstants.START_OPT);
String end = commandLine.getOptionValue(FalconCLIConstants.END_OPT);
String lib = commandLine.getOptionValue(FalconCLIConstants.LIB_OPT);
String status = commandLine.getOptionValue(FalconCLIConstants.INSTANCE_STATUS_OPT);
String nameSubsequence = commandLine.getOptionValue(FalconCLIConstants.NAMESEQ_OPT);
String tagKeywords = commandLine.getOptionValue(FalconCLIConstants.TAGKEYS_OPT);
String filePath = commandLine.getOptionValue(FalconCLIConstants.FILE_PATH_OPT);
String runId = commandLine.getOptionValue(RUNID_OPT);
String colo = commandLine.getOptionValue(FalconCLIConstants.COLO_OPT);
String clusters = commandLine.getOptionValue(CLUSTERS_OPT);
String sourceClusters = commandLine.getOptionValue(SOURCECLUSTER_OPT);
String sourceClusters = commandLine.getOptionValue(SOURCE_CLUSTER_OPT);
List<LifeCycle> lifeCycles = getLifeCycle(commandLine.getOptionValue(LIFECYCLE_OPT));
String filterBy = commandLine.getOptionValue(FalconCLIConstants.FILTER_BY_OPT);
String orderBy = commandLine.getOptionValue(FalconCLIConstants.ORDER_BY_OPT);
Expand All @@ -244,6 +246,9 @@ public void instanceCommand(CommandLine commandLine, FalconClient client) throws
if (!optionsList.contains(SEARCH_OPT)) {
validateInstanceCommands(optionsList, entity, type, colo);
}
if (optionsList.contains(LIB_OPT)) {
validateNotEmpty(lib, LIB_OPT);
}

if (optionsList.contains(TRIAGE_OPT)) {
validateNotEmpty(colo, FalconCLIConstants.COLO_OPT);
Expand Down Expand Up @@ -300,7 +305,7 @@ public void instanceCommand(CommandLine commandLine, FalconClient client) throws
isForced = true;
}
result = ResponseHelper.getString(client.rerunInstances(type, entity, start, end, filePath, colo,
clusters, sourceClusters, lifeCycles, isForced, doAsUser));
clusters, sourceClusters, lifeCycles, isForced, doAsUser, lib));
} else if (optionsList.contains(LOG_OPT)) {
ValidationUtil.validateOrderBy(orderBy, instanceAction);
ValidationUtil.validateFilterBy(filterBy, instanceAction);
Expand Down Expand Up @@ -341,7 +346,7 @@ private void validateInstanceCommands(Set<String> optionsList,
}
}

if (optionsList.contains(SOURCECLUSTER_OPT)) {
if (optionsList.contains(SOURCE_CLUSTER_OPT)) {
if (optionsList.contains(RUNNING_OPT)
|| optionsList.contains(LOG_OPT)
|| optionsList.contains(FalconCLIConstants.STATUS_OPT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,13 +323,14 @@ public abstract InstancesResult resumeInstances(String type, String entity, Stri
* process.
* @param isForced <optional param> can be used to forcefully rerun the entire instance.
* @param doAsUser proxy user
* @param lib can be used to rerun the instance with comma separated lib paths.
* @return Results of the rerun command.
* @throws IOException
*/
public abstract InstancesResult rerunInstances(String type, String entity, String start, String end,
String filePath, String colo, String clusters,
String sourceClusters, List<LifeCycle> lifeCycles, Boolean isForced,
String doAsUser) throws IOException;
String doAsUser, String lib) throws IOException;

/**
* Get summary of instance/instances of an entity.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,11 @@ private FalconCLIConstants(){
+ " process in the range start time and optional end time";
public static final String RERUN_OPT_DESCRIPTION = "Reruns process instances for a given process in the"
+ " range start time and optional end time and overrides properties present in job.properties file";
public static final String LIB_OPT_DESCRIPTION = "List of comma separated lib paths to be used for the rerun "
+ "of the given instances";
public static final String LOG_OPT_DESCRIPTION = "Logs print the logs for process instances for a given"
+ " process in the range start time and optional end time";
public static final String PARARMS_OPT_DESCRIPTION = "Displays the workflow parameters for a given instance"
public static final String PARAMS_OPT_DESCRIPTION = "Displays the workflow parameters for a given instance"
+ " of specified nominal time start time represents nominal time and end time is not considered";
public static final String LISTING_OPT_DESCRIPTION = "Displays feed listing and their status between a"
+ " start and end time range.";
Expand All @@ -145,7 +147,7 @@ private FalconCLIConstants(){
+ "runid, defaults to 0";
public static final String CLUSTERS_OPT_DESCRIPTION = "clusters is optional for commands kill, suspend and "
+ "resume, should not be specified for other commands";
public static final String SOURCECLUSTER_OPT_DESCRIPTION = " source cluster is optional for commands kill, "
public static final String SOURCE_CLUSTER_OPT_DESCRIPTION = " source cluster is optional for commands kill, "
+ "suspend and resume, should not be specified for other commands (required for only feed)";
public static final String FILE_PATH_OPT_DESCRIPTION = "Path to job.properties file is required for rerun "
+ "command, it should contain name=value pair for properties to override for rerun";
Expand All @@ -168,9 +170,10 @@ private FalconCLIConstants(){
public static final String RUNNING_OPT = "running";
public static final String KILL_OPT = "kill";
public static final String RERUN_OPT = "rerun";
public static final String LIB_OPT = "lib";
public static final String LOG_OPT = "logs";
public static final String CLUSTERS_OPT = "clusters";
public static final String SOURCECLUSTER_OPT = "sourceClusters";
public static final String SOURCE_CLUSTER_OPT = "sourceClusters";
public static final String LIFECYCLE_OPT = "lifecycle";
public static final String PARARMS_OPT = "params";
public static final String LISTING_OPT = "listing";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ public class FalconClient extends AbstractFalconClient {


public static final String PATH = "path";
public static final String LIB = "lib";
public static final String COLO = "colo";
private static final String KEY = "key";
private static final String VALUE = "value";
Expand Down Expand Up @@ -632,7 +633,7 @@ public InstancesResult resumeInstances(String type, String entity, String start,
public InstancesResult rerunInstances(String type, String entity, String start,
String end, String filePath, String colo,
String clusters, String sourceClusters, List<LifeCycle> lifeCycles,
Boolean isForced, String doAsUser) throws IOException {
Boolean isForced, String doAsUser, String lib) throws IOException {

StringBuilder buffer = new StringBuilder();
if (filePath != null) {
Expand All @@ -653,7 +654,7 @@ public InstancesResult rerunInstances(String type, String entity, String start,
ClientResponse clientResponse = new ResourceBuilder().path(Instances.RERUN.path, type, entity)
.addQueryParam(START, start).addQueryParam(END, end).addQueryParam(COLO, colo)
.addQueryParam(LIFECYCLE, lifeCycles, type).addQueryParam(FORCE, isForced)
.addQueryParam(USER, doAsUser).call(Instances.RERUN, props);
.addQueryParam(USER, doAsUser).addQueryParam(LIB, lib).call(Instances.RERUN, props);
return getResponse(InstancesResult.class, clientResponse);
}

Expand Down
1 change: 1 addition & 0 deletions docs/src/site/twiki/falconcli/RerunInstance.twiki
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

Rerun option is used to rerun instances of a given process. On issuing a rerun, by default the execution resumes from the last failed node in the workflow. This option is valid only for process instances in terminal state, i.e. SUCCEEDED, KILLED or FAILED.
If one wants to forcefully rerun the entire workflow, -force should be passed along with -rerun
If one wants to pass specific lib paths to be used for rerunning specified instances, -lib should be passed along with -rerun passing comma separated lib paths with -lib option.
Additionally, you can also specify properties to override via a properties file.

Usage:
Expand Down
3 changes: 2 additions & 1 deletion docs/src/site/twiki/restapi/InstanceRerun.twiki
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Rerun instances of an entity. On issuing a rerun, by default the execution resum
* lifecycle <optional param> can be Eviction/Replication(default) for feed and Execution(default) for process.
* force <optional param> can be used to forcefully rerun the entire instance.
* doAs <optional query param> allows the current user to impersonate the user passed in doAs when interacting with the Falcon system.
* lib <optional query param> allows the user to rerun the instance with specified comma separated lib paths.

---++ Results
Results of the rerun command.
Expand Down Expand Up @@ -44,7 +45,7 @@ POST http://localhost:15000/api/instance/rerun/process/SampleProcess?colo=*&star
</verbatim>

<verbatim>
POST http://localhost:15000/api/instance/rerun/process/SampleProcess?colo=*&start=2013-04-03T07:00Z&end=2014-04-03T07:00Z&force=true&doAs=joe
POST http://localhost:15000/api/instance/rerun/process/SampleProcess?colo=*&start=2013-04-03T07:00Z&end=2014-04-03T07:00Z&force=true&doAs=joe&lib=lib1,lib2
</verbatim>
---+++ Result
<verbatim>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.falcon.util.DeploymentUtil;
import org.apache.falcon.util.StartupProperties;
import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
import org.apache.oozie.client.OozieClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -908,15 +909,18 @@ private InstancesResult.WorkflowStatus getProcessInstanceStatus(Process process,

public InstancesResult reRunInstance(String type, String entity, String startStr, String endStr,
HttpServletRequest request, String colo, List<LifeCycle> lifeCycles,
Boolean isForced) {
Boolean isForced, String lib) {
Properties props = getProperties(request);
return reRunInstance(type, entity, startStr, endStr, props, colo, lifeCycles, isForced);
return reRunInstance(type, entity, startStr, endStr, props, colo, lifeCycles, isForced, lib);
}

public InstancesResult reRunInstance(String type, String entity, String startStr, String endStr, Properties props,
String colo, List<LifeCycle> lifeCycles, Boolean isForced) {
String colo, List<LifeCycle> lifeCycles, Boolean isForced, String lib) {
checkColo(colo);
checkType(type);
if (StringUtils.isNotBlank(lib)) {
props.put(OozieClient.LIBPATH, lib);
}
if (StartupProperties.isServerInSafeMode()) {
throwSafemodeException("RERUN");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,7 @@ protected InstancesResult doExecute(String colo) throws FalconException {
* @param lifeCycles <optional param> can be Eviction/Replication(default) for feed and Execution(default) for
* process.
* @param isForced <optional param> can be used to forcefully rerun the entire instance.
* @param lib <optional param> can be used to pass specific lib paths to rerun the instance.
* @return Results of the rerun command.
*/
@POST
Expand All @@ -535,14 +536,15 @@ public InstancesResult reRunInstance(
@Context HttpServletRequest request,
@Dimension("colo") @QueryParam("colo") String colo,
@Dimension("lifecycle") @QueryParam("lifecycle") final List<LifeCycle> lifeCycles,
@Dimension("force") @QueryParam("force") final Boolean isForced) {
@Dimension("force") @QueryParam("force") final Boolean isForced,
@Dimension("lib") @QueryParam("lib") final String lib) {

final HttpServletRequest bufferedRequest = new BufferedRequest(request);
return new InstanceProxy<InstancesResult>(InstancesResult.class) {
@Override
protected InstancesResult doExecute(String colo) throws FalconException {
return getInstanceManager(colo).invoke("reRunInstance",
type, entity, startStr, endStr, bufferedRequest, colo, lifeCycles, isForced);
type, entity, startStr, endStr, bufferedRequest, colo, lifeCycles, isForced, lib);
}
}.execute(colo, type, entity);
}
Expand Down
Loading