-
Notifications
You must be signed in to change notification settings - Fork 9
Added test for Issue #18 that verifies each partition gets its own PUD #63
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,125 @@ | ||
/* | ||
* Copyright 2016 International Business Machines Corp. | ||
* | ||
* See the NOTICE file distributed with this work for additional information | ||
* regarding copyright ownership. Licensed under the Apache License, | ||
* Version 2.0 (the "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package com.ibm.jbatch.tck.artifacts.specialized; | ||
|
||
import java.io.Serializable; | ||
|
||
import javax.batch.api.BatchProperty; | ||
import javax.batch.api.chunk.AbstractItemReader; | ||
import javax.batch.runtime.context.JobContext; | ||
import javax.batch.runtime.context.StepContext; | ||
import javax.inject.Inject; | ||
|
||
import com.ibm.jbatch.tck.artifacts.basicchunk.BasicItem; | ||
|
||
/*NOTE: Code for this class is taken substantially from basicchunk.BasicReader*/ | ||
@javax.inject.Named("PUDPartitionReader") | ||
public class PUDPartitionReader extends AbstractItemReader { | ||
|
||
@Inject | ||
JobContext jobCtx; | ||
|
||
@Inject | ||
StepContext stepCtx; | ||
|
||
@Inject @BatchProperty(name="partition.number") | ||
String partitionNumber; | ||
|
||
@Inject @BatchProperty(name="execution.number") | ||
String executionNumber; | ||
|
||
@Inject | ||
@BatchProperty(name = "number.of.items.to.be.read") | ||
String injectedNumberOfItemsToBeRead; | ||
//Default: read 10 items | ||
private int numberOfItemsToBeRead = 10; | ||
|
||
@Inject | ||
@BatchProperty(name = "throw.reader.exception.for.these.items") | ||
String injectedThrowReaderExceptionForTheseItems; | ||
//Default: don't throw any exceptions | ||
private int[] throwReaderExceptionForTheseItems = {}; | ||
|
||
private String PUDString; | ||
private int currentItemId = -1; | ||
private BasicItem currentItem = null; | ||
|
||
@Override | ||
public void open(Serializable checkpoint) { | ||
|
||
PUDString = "PUD for Partition: " +partitionNumber; | ||
|
||
// Set on the first execution; on later executions it will need to be obtained | ||
// from the job repository's persistent store. | ||
if (checkpoint == null) { | ||
stepCtx.setPersistentUserData(PUDString); | ||
} | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I changed this since if you're going to set it every time doesn't it render the check in readItem() less interesting in exec #2? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a dup of the previous comment which I thought had been lost. |
||
if (injectedNumberOfItemsToBeRead != null) { | ||
numberOfItemsToBeRead = Integer.parseInt(injectedNumberOfItemsToBeRead); | ||
} | ||
|
||
if (injectedThrowReaderExceptionForTheseItems != null) { | ||
String[] exceptionsStringArray = injectedThrowReaderExceptionForTheseItems.split(","); | ||
throwReaderExceptionForTheseItems = new int[exceptionsStringArray.length]; | ||
for (int i = 0; i < exceptionsStringArray.length; i++) { | ||
throwReaderExceptionForTheseItems[i] = Integer.parseInt(exceptionsStringArray[i]); | ||
} | ||
} | ||
} | ||
|
||
@Override | ||
public BasicItem readItem() throws Exception { | ||
|
||
if (executionNumber.equals("2")) { | ||
if (!stepCtx.getPersistentUserData().equals(PUDString)) { | ||
throw new Exception("BadPersistentUserData: PUD for partition "+partitionNumber+" is expected to be "+PUDString+", but found "+stepCtx.getPersistentUserData()); | ||
} | ||
} | ||
|
||
//Code below is take from BasicReader | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe encapsulate all in another method... not essential, just a thought. |
||
|
||
/* Note that BasicReader has no concept of rolling back after a retryable exception is thrown. | ||
* Example: chunk size is 2, we plan to read 10 items (#0-#9), but a retryable exception is thrown while reading item #1 | ||
* In this case, the reader goes on to read #2 when it should be rolling back to #0, and so the writer will never receive | ||
* #0, even though it was previously read successfully */ | ||
|
||
currentItemId++; | ||
|
||
if (currentItemId < numberOfItemsToBeRead) { | ||
currentItem = new BasicItem(currentItemId); | ||
if (readerExceptionShouldBeThrownForCurrentItem()) { | ||
//set the job exit status so we can determine which exception was last thrown | ||
jobCtx.setExitStatus("Exception:Item#" + currentItem.getId()); | ||
throw new Exception("Exception thrown for item " + currentItem.getId()); | ||
} | ||
currentItem.setRead(true); | ||
return currentItem; | ||
} | ||
|
||
return null; | ||
} | ||
|
||
private boolean readerExceptionShouldBeThrownForCurrentItem() { | ||
|
||
for (int i: throwReaderExceptionForTheseItems) { | ||
if (currentItem.getId()==i) { return true; } | ||
} | ||
|
||
return false; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
/** | ||
* Copyright 2016 International Business Machines Corp. | ||
* | ||
* See the NOTICE file distributed with this work for additional information | ||
* regarding copyright ownership. Licensed under the Apache License, | ||
* Version 2.0 (the "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package com.ibm.jbatch.tck.artifacts.specialized; | ||
|
||
import javax.batch.api.partition.AbstractPartitionReducer; | ||
import javax.batch.runtime.context.StepContext; | ||
import javax.inject.Inject; | ||
|
||
@javax.inject.Named("PUDPartitionReducer") | ||
public class PUDPartitionReducer extends AbstractPartitionReducer { | ||
|
||
@Inject | ||
StepContext stepCtx; | ||
|
||
public final String TOP_LEVEL_PUD = "This is the Persistent User Data for the top-level stepCtx of the partitioned step!"; | ||
|
||
@Override | ||
public void beginPartitionedStep() throws Exception { | ||
stepCtx.setPersistentUserData(TOP_LEVEL_PUD); | ||
} | ||
|
||
@Override | ||
public void afterPartitionedStepCompletion(PartitionStatus status) throws Exception { | ||
if (!stepCtx.getPersistentUserData().equals(TOP_LEVEL_PUD)) { | ||
throw new Exception("Unexpected PUD at the top level of the Partitioned Step!"); | ||
} | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,6 +35,7 @@ | |
import org.testng.annotations.BeforeMethod; | ||
import org.testng.annotations.Test; | ||
|
||
import com.ibm.jbatch.tck.ann.*; | ||
import com.ibm.jbatch.tck.utils.JobOperatorBridge; | ||
import com.ibm.jbatch.tck.utils.TCKJobExecutionWrapper; | ||
|
||
|
@@ -598,6 +599,72 @@ public void testPartitionedMapperOverrideTrueSamePartitionNumOnRestart() throws | |
} | ||
} | ||
|
||
@TCKTest( | ||
versions = {"1.1.WORKING"}, | ||
assertions = {"Each partition of a partitioned step has its own unique Persistent User Data."}, | ||
specRefs = { | ||
@SpecRef( | ||
version = "1.0RevA", section = "9.4.1.1", | ||
citations = "For a partitioned step, there is one StepContext for the parent step/thread; there is a distinct StepContext for each sub-thread " | ||
+ "and each StepContext has its own distinct persistent user data for each sub-thread.", | ||
notes = "See 2. StepContext" | ||
), | ||
@SpecRef( | ||
version = "1.0", section = "10.9.2", | ||
citations = "The setPersistentUserData method stores a persistent data object into the current step. [...] This data is saved as part of a step's " | ||
+ "checkpoint. [...] It is available upon restart.", | ||
notes = "APIRef for StepContext" | ||
) | ||
}, | ||
apiRefs = { @APIRef(className="javax.batch.runtime.context.StepContext", methodNames={"setPersistentUserData", "getPersistentUserData"}) }, | ||
issueRefs = {"https://github.com/WASdev/standards.jsr352.tck/issues/18"}, | ||
strategy = "See comments in the code for this test" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just curious.. did you just thing it reads better visually or was it all the backslashing that steered you to doing it this way? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the last discussion we had about the annotations was that we should make them smaller, and that the test strategy aka implementation details don't need to show up in the TCK test coverage report |
||
) | ||
@Test | ||
@org.junit.Test | ||
public void testPartitionedStepPersistentUserData() throws Exception { | ||
String METHOD = "testPartitionedStepPersistentUserData"; | ||
begin(METHOD); | ||
|
||
try { | ||
/* Test Strategy: | ||
* - Job is made up of one chunk step, split into two partitions | ||
* - Each partition will process 3 items | ||
* - The item-count for the step is set to 1, so each partition should checkpoint after each | ||
* item it reads. | ||
* | ||
* - During Job Execution #1, set the PUD for each partition while reading item 1 | ||
* fail on purpose while reading item 2 (now that the partitions have already check-pointed) | ||
* - During Job Execution #2, check that the PUD has been persisted from previous job execution | ||
* - We also check that the partition level PUDs do not bubble up into the top-level of the step | ||
* by setting and verifying a top-level PUD with the PartitionReducer | ||
*/ | ||
|
||
Properties jobParams = new Properties(); | ||
|
||
//Job Execution #1 | ||
jobParams.setProperty("execution.number", "1"); | ||
jobParams.setProperty("number.of.items.to.be.read", "3"); | ||
jobParams.setProperty("throw.reader.exception.for.these.items", "1"); //The second item | ||
|
||
Reporter.log("Locate job XML file: partitioned_step_persistent_user_data.xml<p>"); | ||
Reporter.log("Invoke startJobAndWaitForResult<p>"); | ||
JobExecution jobExec1 = jobOp.startJobAndWaitForResult("partitioned_step_persistent_user_data", jobParams); | ||
assertWithMessage("Expected job execution 1 to fail", BatchStatus.FAILED, jobExec1.getBatchStatus()); | ||
|
||
//Job Execution #2 | ||
jobParams.setProperty("execution.number", "2"); | ||
jobParams.setProperty("throw.reader.exception.for.these.items", ""); | ||
|
||
Reporter.log("Invoke restartJobAndWaitForResult<p>"); | ||
JobExecution jobExec2 = jobOp.restartJobAndWaitForResult(jobExec1.getExecutionId(), jobParams); | ||
assertWithMessage("Expected job execution 2 to complete", BatchStatus.COMPLETED, jobExec2.getBatchStatus()); | ||
|
||
} catch (Exception e) { | ||
handleException(METHOD, e); | ||
} | ||
} | ||
|
||
private static void handleException(String methodName, Exception e) throws Exception { | ||
Reporter.log("Caught exception: " + e.getMessage()+"<p>"); | ||
Reporter.log(methodName + " failed<p>"); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
<!-- Copyright 2016 International Business Machines Corp. See the NOTICE file distributed with this work | ||
for additional information regarding copyright ownership. Licensed under the Apache License, Version | ||
2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain | ||
a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law | ||
or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language | ||
governing permissions and limitations under the License. --> | ||
<job id="partitioned_step_persistent_user_data" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0"> | ||
<step id="step1"> | ||
<chunk item-count="1"> | ||
<reader ref="PUDPartitionReader"> | ||
<properties> | ||
<property name="execution.number" value="#{jobParameters['execution.number']}"/> | ||
<property name="number.of.items.to.be.read" value="#{jobParameters['number.of.items.to.be.read']}"/> | ||
<property name="throw.reader.exception.for.these.items" value="#{jobParameters['throw.reader.exception.for.these.items']}"/> | ||
<property name="partition.number" value="#{partitionPlan['partition.number']}"/> | ||
</properties> | ||
</reader> | ||
<processor ref="basicProcessor"/> | ||
<writer ref="basicWriter"/> | ||
</chunk> | ||
<partition> | ||
<plan partitions="2"> | ||
<properties partition="0"> | ||
<property name="partition.number" value="0"/> | ||
</properties> | ||
<properties partition="1"> | ||
<property name="partition.number" value="1"/> | ||
</properties> | ||
</plan> | ||
<reducer ref="PUDPartitionReducer"/> | ||
</partition> | ||
</step> | ||
</job> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Initializing the PUDString in open() assures partitionNumber will have been injected. But if you're going to set it on the restart too it renders the later check not as interesting. You probably want to guard the set with if (checkpoint==null) {