Skip to content
This repository has been archived by the owner on Apr 4, 2021. It is now read-only.

FALCON-2342 testDeRegistration timeout fixed using waitFor function #416

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
import org.apache.oozie.client.WorkflowJob;
import org.joda.time.DateTime;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
Expand All @@ -73,6 +75,7 @@ public class SchedulerServiceTest extends AbstractTestBase {
private static DAGEngine mockDagEngine;
private static Process process;
private volatile boolean failed = false;
private static final Logger LOG = LoggerFactory.getLogger(SchedulerServiceTest.class);

@BeforeMethod
public void setup() throws FalconException {
Expand Down Expand Up @@ -269,7 +272,9 @@ public void testDeRegistration() throws Exception {
stateStore.putExecutionInstance(new InstanceState(instance3));
scheduler.unregister(handler, instance3.getId());

Thread.sleep(100);
waitFor(500,instance1);
waitFor(500,instance2);
waitFor(500,instance3);
Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance1), new Integer(1));
Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance2), new Integer(1));
// Second instance should not run.
Expand Down Expand Up @@ -351,5 +356,48 @@ public PRIORITY getPriority() {
return PRIORITY.MEDIUM;
}
}

/**
* Wait for a condition, expressed via a evaluation of instance to become true.
*
* @param timeout maximum time in milliseconds to wait for the instance to become true.
* @param instance instance waiting on.
* @return the waited time.
*/
public long waitFor(int timeout, ExecutionInstance instance) {
long started = System.currentTimeMillis();
long mustEnd = System.currentTimeMillis() + timeout;
long lastEcho = 0;

try {
long waiting = mustEnd - System.currentTimeMillis();
LOG.info("Waiting up to [{}] msec", waiting);
while (!(evaluate(instance)) && System.currentTimeMillis() < mustEnd) {
if ((System.currentTimeMillis() - lastEcho) > 40) {
waiting = mustEnd - System.currentTimeMillis();
LOG.info("Waiting up to [{}] msec", waiting);
lastEcho = System.currentTimeMillis();
}
Thread.sleep(50);
}
if (!evaluate(instance)) {
LOG.info("Waiting timed out after [{}] msec", timeout);
}
return System.currentTimeMillis() - started;
}
catch (Exception ex) {
throw new RuntimeException(ex);
}
}

public boolean evaluate(ExecutionInstance instance) {
try {
((MockDAGEngine) mockDagEngine).getTotalRuns(instance).equals(1);
return true;
}
catch(Exception ex) {
return false;
}
}
}