diff --git a/com.ibm.jbatch.tck/src/main/java/com/ibm/jbatch/tck/artifacts/specialized/PUDPartitionReader.java b/com.ibm.jbatch.tck/src/main/java/com/ibm/jbatch/tck/artifacts/specialized/PUDPartitionReader.java new file mode 100644 index 0000000..5a61500 --- /dev/null +++ b/com.ibm.jbatch.tck/src/main/java/com/ibm/jbatch/tck/artifacts/specialized/PUDPartitionReader.java @@ -0,0 +1,125 @@ +/* + * Copyright 2016 International Business Machines Corp. + * + * See the NOTICE file distributed with this work for additional information + * regarding copyright ownership. Licensed under the Apache License, + * Version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package com.ibm.jbatch.tck.artifacts.specialized; + +import java.io.Serializable; + +import javax.batch.api.BatchProperty; +import javax.batch.api.chunk.AbstractItemReader; +import javax.batch.runtime.context.JobContext; +import javax.batch.runtime.context.StepContext; +import javax.inject.Inject; + +import com.ibm.jbatch.tck.artifacts.basicchunk.BasicItem; + +/*NOTE: Code for this class is taken substantially from basicchunk.BasicReader*/ +@javax.inject.Named("PUDPartitionReader") +public class PUDPartitionReader extends AbstractItemReader { + + @Inject + JobContext jobCtx; + + @Inject + StepContext stepCtx; + + @Inject @BatchProperty(name="partition.number") + String partitionNumber; + + @Inject @BatchProperty(name="execution.number") + String executionNumber; + + @Inject + @BatchProperty(name = "number.of.items.to.be.read") + String injectedNumberOfItemsToBeRead; + //Default: read 10 items + private int numberOfItemsToBeRead = 10; + + @Inject + @BatchProperty(name = "throw.reader.exception.for.these.items") + String injectedThrowReaderExceptionForTheseItems; + //Default: don't throw any exceptions + private int[] throwReaderExceptionForTheseItems = {}; + + private String PUDString; + private int currentItemId = -1; + private BasicItem currentItem = null; + + @Override + public void open(Serializable checkpoint) { + + PUDString = "PUD for Partition: " +partitionNumber; + + // Set on the first execution; on later executions it will need to be obtained + // from the job repository's persistent store. + if (checkpoint == null) { + stepCtx.setPersistentUserData(PUDString); + } + + if (injectedNumberOfItemsToBeRead != null) { + numberOfItemsToBeRead = Integer.parseInt(injectedNumberOfItemsToBeRead); + } + + if (injectedThrowReaderExceptionForTheseItems != null) { + String[] exceptionsStringArray = injectedThrowReaderExceptionForTheseItems.split(","); + throwReaderExceptionForTheseItems = new int[exceptionsStringArray.length]; + for (int i = 0; i < exceptionsStringArray.length; i++) { + throwReaderExceptionForTheseItems[i] = Integer.parseInt(exceptionsStringArray[i]); + } + } + } + + @Override + public BasicItem readItem() throws Exception { + + if (executionNumber.equals("2")) { + if (!stepCtx.getPersistentUserData().equals(PUDString)) { + throw new Exception("BadPersistentUserData: PUD for partition "+partitionNumber+" is expected to be "+PUDString+", but found "+stepCtx.getPersistentUserData()); + } + } + + //Code below is take from BasicReader + + /* Note that BasicReader has no concept of rolling back after a retryable exception is thrown. + * Example: chunk size is 2, we plan to read 10 items (#0-#9), but a retryable exception is thrown while reading item #1 + * In this case, the reader goes on to read #2 when it should be rolling back to #0, and so the writer will never receive + * #0, even though it was previously read successfully */ + + currentItemId++; + + if (currentItemId < numberOfItemsToBeRead) { + currentItem = new BasicItem(currentItemId); + if (readerExceptionShouldBeThrownForCurrentItem()) { + //set the job exit status so we can determine which exception was last thrown + jobCtx.setExitStatus("Exception:Item#" + currentItem.getId()); + throw new Exception("Exception thrown for item " + currentItem.getId()); + } + currentItem.setRead(true); + return currentItem; + } + + return null; + } + + private boolean readerExceptionShouldBeThrownForCurrentItem() { + + for (int i: throwReaderExceptionForTheseItems) { + if (currentItem.getId()==i) { return true; } + } + + return false; + } +} diff --git a/com.ibm.jbatch.tck/src/main/java/com/ibm/jbatch/tck/artifacts/specialized/PUDPartitionReducer.java b/com.ibm.jbatch.tck/src/main/java/com/ibm/jbatch/tck/artifacts/specialized/PUDPartitionReducer.java new file mode 100644 index 0000000..1004a6c --- /dev/null +++ b/com.ibm.jbatch.tck/src/main/java/com/ibm/jbatch/tck/artifacts/specialized/PUDPartitionReducer.java @@ -0,0 +1,43 @@ +/** + * Copyright 2016 International Business Machines Corp. + * + * See the NOTICE file distributed with this work for additional information + * regarding copyright ownership. Licensed under the Apache License, + * Version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ibm.jbatch.tck.artifacts.specialized; + +import javax.batch.api.partition.AbstractPartitionReducer; +import javax.batch.runtime.context.StepContext; +import javax.inject.Inject; + +@javax.inject.Named("PUDPartitionReducer") +public class PUDPartitionReducer extends AbstractPartitionReducer { + + @Inject + StepContext stepCtx; + + public final String TOP_LEVEL_PUD = "This is the Persistent User Data for the top-level stepCtx of the partitioned step!"; + + @Override + public void beginPartitionedStep() throws Exception { + stepCtx.setPersistentUserData(TOP_LEVEL_PUD); + } + + @Override + public void afterPartitionedStepCompletion(PartitionStatus status) throws Exception { + if (!stepCtx.getPersistentUserData().equals(TOP_LEVEL_PUD)) { + throw new Exception("Unexpected PUD at the top level of the Partitioned Step!"); + } + } + +} diff --git a/com.ibm.jbatch.tck/src/main/java/com/ibm/jbatch/tck/tests/jslxml/ParallelExecutionTests.java b/com.ibm.jbatch.tck/src/main/java/com/ibm/jbatch/tck/tests/jslxml/ParallelExecutionTests.java index 2f0f2b2..f20849f 100755 --- a/com.ibm.jbatch.tck/src/main/java/com/ibm/jbatch/tck/tests/jslxml/ParallelExecutionTests.java +++ b/com.ibm.jbatch.tck/src/main/java/com/ibm/jbatch/tck/tests/jslxml/ParallelExecutionTests.java @@ -35,6 +35,7 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import com.ibm.jbatch.tck.ann.*; import com.ibm.jbatch.tck.utils.JobOperatorBridge; import com.ibm.jbatch.tck.utils.TCKJobExecutionWrapper; @@ -598,6 +599,72 @@ public void testPartitionedMapperOverrideTrueSamePartitionNumOnRestart() throws } } + @TCKTest( + versions = {"1.1.WORKING"}, + assertions = {"Each partition of a partitioned step has its own unique Persistent User Data."}, + specRefs = { + @SpecRef( + version = "1.0RevA", section = "9.4.1.1", + citations = "For a partitioned step, there is one StepContext for the parent step/thread; there is a distinct StepContext for each sub-thread " + + "and each StepContext has its own distinct persistent user data for each sub-thread.", + notes = "See 2. StepContext" + ), + @SpecRef( + version = "1.0", section = "10.9.2", + citations = "The setPersistentUserData method stores a persistent data object into the current step. [...] This data is saved as part of a step's " + + "checkpoint. [...] It is available upon restart.", + notes = "APIRef for StepContext" + ) + }, + apiRefs = { @APIRef(className="javax.batch.runtime.context.StepContext", methodNames={"setPersistentUserData", "getPersistentUserData"}) }, + issueRefs = {"https://github.com/WASdev/standards.jsr352.tck/issues/18"}, + strategy = "See comments in the code for this test" + ) + @Test + @org.junit.Test + public void testPartitionedStepPersistentUserData() throws Exception { + String METHOD = "testPartitionedStepPersistentUserData"; + begin(METHOD); + + try { + /* Test Strategy: + * - Job is made up of one chunk step, split into two partitions + * - Each partition will process 3 items + * - The item-count for the step is set to 1, so each partition should checkpoint after each + * item it reads. + * + * - During Job Execution #1, set the PUD for each partition while reading item 1 + * fail on purpose while reading item 2 (now that the partitions have already check-pointed) + * - During Job Execution #2, check that the PUD has been persisted from previous job execution + * - We also check that the partition level PUDs do not bubble up into the top-level of the step + * by setting and verifying a top-level PUD with the PartitionReducer + */ + + Properties jobParams = new Properties(); + + //Job Execution #1 + jobParams.setProperty("execution.number", "1"); + jobParams.setProperty("number.of.items.to.be.read", "3"); + jobParams.setProperty("throw.reader.exception.for.these.items", "1"); //The second item + + Reporter.log("Locate job XML file: partitioned_step_persistent_user_data.xml

"); + Reporter.log("Invoke startJobAndWaitForResult

"); + JobExecution jobExec1 = jobOp.startJobAndWaitForResult("partitioned_step_persistent_user_data", jobParams); + assertWithMessage("Expected job execution 1 to fail", BatchStatus.FAILED, jobExec1.getBatchStatus()); + + //Job Execution #2 + jobParams.setProperty("execution.number", "2"); + jobParams.setProperty("throw.reader.exception.for.these.items", ""); + + Reporter.log("Invoke restartJobAndWaitForResult

"); + JobExecution jobExec2 = jobOp.restartJobAndWaitForResult(jobExec1.getExecutionId(), jobParams); + assertWithMessage("Expected job execution 2 to complete", BatchStatus.COMPLETED, jobExec2.getBatchStatus()); + + } catch (Exception e) { + handleException(METHOD, e); + } + } + private static void handleException(String methodName, Exception e) throws Exception { Reporter.log("Caught exception: " + e.getMessage()+"

"); Reporter.log(methodName + " failed

"); diff --git a/com.ibm.jbatch.tck/src/main/resources/META-INF/batch-jobs/partitioned_step_persistent_user_data.xml b/com.ibm.jbatch.tck/src/main/resources/META-INF/batch-jobs/partitioned_step_persistent_user_data.xml new file mode 100644 index 0000000..d936ed4 --- /dev/null +++ b/com.ibm.jbatch.tck/src/main/resources/META-INF/batch-jobs/partitioned_step_persistent_user_data.xml @@ -0,0 +1,35 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/com.ibm.jbatch.tck/src/main/resources/META-INF/batch.xml b/com.ibm.jbatch.tck/src/main/resources/META-INF/batch.xml index baf1e80..b7500d5 100755 --- a/com.ibm.jbatch.tck/src/main/resources/META-INF/batch.xml +++ b/com.ibm.jbatch.tck/src/main/resources/META-INF/batch.xml @@ -113,6 +113,8 @@ + +