Modify Jenkins Full E2E Integ Test to perform Transformations #1182

Merged: 21 commits, Jan 3, 2025
Changes from 19 commits
1 change: 1 addition & 0 deletions DocumentsFromSnapshotMigration/build.gradle
@@ -60,6 +60,7 @@ dependencies {
testImplementation group: 'org.testcontainers', name: 'toxiproxy'
testImplementation group: 'org.mockito', name: 'mockito-core'
testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter'
+testImplementation group: 'org.json', name: 'json'

testImplementation platform('io.projectreactor:reactor-bom:2023.0.5')
testRuntimeOnly group: 'org.junit.jupiter', name: 'junit-jupiter-engine'
@@ -0,0 +1,215 @@
package org.opensearch.migrations.bulkload;

import java.nio.file.Files;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;

import org.opensearch.migrations.CreateSnapshot;
import org.opensearch.migrations.bulkload.common.RestClient;
import org.opensearch.migrations.bulkload.common.http.ConnectionContextTestParams;
import org.opensearch.migrations.bulkload.framework.SearchClusterContainer;
import org.opensearch.migrations.bulkload.http.ClusterOperations;
import org.opensearch.migrations.bulkload.http.SearchClusterRequests;
import org.opensearch.migrations.reindexer.tracing.DocumentMigrationTestContext;
import org.opensearch.migrations.snapshot.creation.tracing.SnapshotTestContext;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.json.JSONArray;
import org.json.JSONObject;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.Network;


@Slf4j
@Tag("isolatedTest")
public class CustomRfsTransformationTest extends SourceTestBase {

public static final String TARGET_DOCKER_HOSTNAME = "target";
public static final String SNAPSHOT_NAME = "test_snapshot";

@Test
public void testCustomTransformationProducesDesiredTargetClusterState() {
String nameTransformation = createIndexNameTransformation("geonames", "geonames_transformed");
var expectedSourceMap = new HashMap<String, Integer>();
expectedSourceMap.put("geonames", 1);
var expectedTargetMap = new HashMap<String, Integer>();
expectedTargetMap.put("geonames_transformed", 1);
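// With the transform applied, the source keeps "geonames" while the target should contain only "geonames_transformed"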
String[] transformationArgs = {
"--doc-transformer-config",
nameTransformation,
};
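// --doc-transformer-config accepts the transformer configuration as an inline JSON string (built above)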
int totalSourceShards = 1;
Consumer<ClusterOperations> loadDataIntoSource = cluster -> {
// The default number of shards differs across ES/OS versions, so we set it explicitly.
String body = String.format(
"{" +
" \"settings\": {" +
" \"index\": {" +
" \"number_of_shards\": %d," +
" \"number_of_replicas\": 0" +
" }" +
" }" +
"}",
totalSourceShards
);
cluster.createIndex("geonames", body);
cluster.createDocument("geonames", "111", "{\"author\":\"Tobias Funke\", \"category\": \"cooking\"}");
};
runTestProcess(
transformationArgs,
expectedSourceMap,
expectedTargetMap,
loadDataIntoSource,
totalSourceShards,
SourceTestBase::runProcessAgainstTarget
);
}
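// For illustration: with the config above, a document whose metadata contains
// {"index": {"_index": "geonames"}} should be rewritten so _index becomes "geonames_transformed"
// before being reindexed into the target (a sketch of the intended effect, not of RFS internals).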

@SneakyThrows
private void runTestProcess(
String[] transformationArgs,
Map<String, Integer> expectedSourceDocs,
Map<String, Integer> expectedTargetDocs,
Consumer<ClusterOperations> preloadDataOperations,
Integer numberOfShards,
Function<String[], Integer> processRunner)
{
final var testSnapshotContext = SnapshotTestContext.factory().noOtelTracking();

var tempDirSnapshot = Files.createTempDirectory("opensearchMigrationReindexFromSnapshot_test_snapshot");
var tempDirLucene = Files.createTempDirectory("opensearchMigrationReindexFromSnapshot_test_lucene");

try (
var esSourceContainer = new SearchClusterContainer(SearchClusterContainer.ES_V7_10_2)
.withAccessToHost(true);
var network = Network.newNetwork();
var osTargetContainer = new SearchClusterContainer(SearchClusterContainer.OS_V2_14_0)
.withAccessToHost(true)
.withNetwork(network)
.withNetworkAliases(TARGET_DOCKER_HOSTNAME);
) {
CompletableFuture.allOf(
CompletableFuture.runAsync(esSourceContainer::start),
CompletableFuture.runAsync(osTargetContainer::start)
).join();

var sourceClusterOperations = new ClusterOperations(esSourceContainer.getUrl());
preloadDataOperations.accept(sourceClusterOperations);

// Create the snapshot from the source cluster
var args = new CreateSnapshot.Args();
args.snapshotName = SNAPSHOT_NAME;
args.fileSystemRepoPath = SearchClusterContainer.CLUSTER_SNAPSHOT_DIR;
args.sourceArgs.host = esSourceContainer.getUrl();

var snapshotCreator = new CreateSnapshot(args, testSnapshotContext.createSnapshotCreateContext());
snapshotCreator.run();
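// Copy the snapshot data out of the container so the RFS process can read it from the local filesystem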
esSourceContainer.copySnapshotData(tempDirSnapshot.toString());

String[] processArgs = {
"--snapshot-name",
SNAPSHOT_NAME,
"--snapshot-local-dir",
tempDirSnapshot.toString(),
"--lucene-dir",
tempDirLucene.toString(),
"--target-host",
osTargetContainer.getUrl(),
"--documents-per-bulk-request",
"5",
"--max-connections",
"4",
"--source-version",
"ES_7_10"
};
String[] completeArgs = Stream.concat(Arrays.stream(processArgs), Arrays.stream(transformationArgs)).toArray(String[]::new);

// Perform RFS process for each shard
for(int i = 0; i < numberOfShards; i++) {
int exitCode = processRunner.apply(completeArgs);
log.atInfo().setMessage("Process exited with code: {}").addArgument(exitCode).log();
}

// Assert that doc counts on the source and target clusters match expectations
validateFinalClusterDocs(
esSourceContainer,
osTargetContainer,
DocumentMigrationTestContext.factory().noOtelTracking(),
expectedSourceDocs,
expectedTargetDocs
);
} finally {
deleteTree(tempDirSnapshot);
}
}

Member
Non-blocking nit - typo (snpahost)

Collaborator Author
Fixed now 👍

// Create a simple Jolt transform that matches documents of a given index name in a snapshot and changes that
// index name to a desired index name when migrated to the target cluster
private static String createIndexNameTransformation(String existingIndexName, String newIndexName) {
JSONArray rootArray = new JSONArray();
JSONObject firstObject = new JSONObject();
JSONArray jsonConditionalTransformerProvider = new JSONArray();

// JsonJMESPathPredicateProvider object
JSONObject jsonJMESPathPredicateProvider = new JSONObject();
jsonJMESPathPredicateProvider.put("script", String.format("index._index == '%s'", existingIndexName));
JSONObject jsonJMESPathPredicateWrapper = new JSONObject();
jsonJMESPathPredicateWrapper.put("JsonJMESPathPredicateProvider", jsonJMESPathPredicateProvider);
jsonConditionalTransformerProvider.put(jsonJMESPathPredicateWrapper);

JSONArray transformerList = new JSONArray();

// First JsonJoltTransformerProvider
JSONObject firstJoltTransformer = new JSONObject();
JSONObject firstJoltScript = new JSONObject();
firstJoltScript.put("operation", "modify-overwrite-beta");
firstJoltScript.put("spec", new JSONObject().put("index", new JSONObject().put("\\_index", newIndexName)));
firstJoltTransformer.put("JsonJoltTransformerProvider", new JSONObject().put("script", firstJoltScript));
transformerList.put(firstJoltTransformer);

jsonConditionalTransformerProvider.put(transformerList);
firstObject.put("JsonConditionalTransformerProvider", jsonConditionalTransformerProvider);
rootArray.put(firstObject);
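// The serialized config produced below looks like this (expanded here for readability):
// [{"JsonConditionalTransformerProvider": [
//     {"JsonJMESPathPredicateProvider": {"script": "index._index == '<existingIndexName>'"}},
//     [{"JsonJoltTransformerProvider": {"script": {
//         "operation": "modify-overwrite-beta",
//         "spec": {"index": {"\\_index": "<newIndexName>"}}}}}]
// ]}]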
return rootArray.toString();
}

private static void validateFinalClusterDocs(
SearchClusterContainer esSourceContainer,
SearchClusterContainer osTargetContainer,
DocumentMigrationTestContext context,
Map<String, Integer> expectedSourceDocs,
Map<String, Integer> expectedTargetDocs
) {
var targetClient = new RestClient(ConnectionContextTestParams.builder()
.host(osTargetContainer.getUrl())
.build()
.toConnectionContext()
);
var sourceClient = new RestClient(ConnectionContextTestParams.builder()
.host(esSourceContainer.getUrl())
.build()
.toConnectionContext()
);

var requests = new SearchClusterRequests(context);
var sourceMap = requests.getMapOfIndexAndDocCount(sourceClient);
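// Refresh the target so all bulk-indexed documents are visible before counting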
var refreshResponse = targetClient.get("_refresh", context.createUnboundRequestContext());
Assertions.assertEquals(200, refreshResponse.statusCode);
var targetMap = requests.getMapOfIndexAndDocCount(targetClient);

MatcherAssert.assertThat(sourceMap, Matchers.equalTo(expectedSourceDocs));
MatcherAssert.assertThat(targetMap, Matchers.equalTo(expectedTargetDocs));
}

}
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
package org.opensearch.migrations.bulkload;

-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -27,7 +22,6 @@
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
-import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
@@ -124,8 +118,8 @@ private void testProcess(int expectedExitCode, Function<RunData, Integer> proces
var proxyContainer = new ToxiProxyWrapper(network)
) {
CompletableFuture.allOf(
-CompletableFuture.runAsync(() -> esSourceContainer.start()),
-CompletableFuture.runAsync(() -> osTargetContainer.start()),
+CompletableFuture.runAsync(esSourceContainer::start),
+CompletableFuture.runAsync(osTargetContainer::start),
CompletableFuture.runAsync(() -> proxyContainer.start(TARGET_DOCKER_HOSTNAME, OPENSEARCH_PORT))
).join();

@@ -180,36 +174,7 @@ private static int runProcessAgainstToxicTarget(
}

int timeoutSeconds = 90;
-ProcessBuilder processBuilder = setupProcess(tempDirSnapshot, tempDirLucene, targetAddress, failHow);
-
-var process = runAndMonitorProcess(processBuilder);
-boolean finished = process.waitFor(timeoutSeconds, TimeUnit.SECONDS);
-if (!finished) {
-log.atError().setMessage("Process timed out, attempting to kill it...").log();
-process.destroy(); // Try to be nice about things first...
-if (!process.waitFor(10, TimeUnit.SECONDS)) {
-log.atError().setMessage("Process still running, attempting to force kill it...").log();
-process.destroyForcibly();
-}
-Assertions.fail("The process did not finish within the timeout period (" + timeoutSeconds + " seconds).");
-}
-
-return process.exitValue();
-}
-
-
-@NotNull
-private static ProcessBuilder setupProcess(
-Path tempDirSnapshot,
-Path tempDirLucene,
-String targetAddress,
-FailHow failHow
-) {
-String classpath = System.getProperty("java.class.path");
-String javaHome = System.getProperty("java.home");
-String javaExecutable = javaHome + File.separator + "bin" + File.separator + "java";
-
-String[] args = {
+String[] processArgs = {
"--snapshot-name",
SNAPSHOT_NAME,
"--snapshot-local-dir",
@@ -228,51 +193,21 @@ private static ProcessBuilder setupProcess(
"ES_7_10",
"--initial-lease-duration",
failHow == FailHow.NEVER ? "PT10M" : "PT1S" };
+ProcessBuilder processBuilder = setupProcess(processArgs);

-// Kick off the doc migration process
-log.atInfo().setMessage("Running RfsMigrateDocuments with args: {}")
-.addArgument(() -> Arrays.toString(args))
-.log();
-ProcessBuilder processBuilder = new ProcessBuilder(
-javaExecutable,
-"-cp",
-classpath,
-"org.opensearch.migrations.RfsMigrateDocuments"
-);
-processBuilder.command().addAll(Arrays.asList(args));
-processBuilder.redirectErrorStream(true);
-processBuilder.redirectOutput();
-return processBuilder;
-}
-
-@NotNull
-private static Process runAndMonitorProcess(ProcessBuilder processBuilder) throws IOException {
-var process = processBuilder.start();
-
-log.atInfo().setMessage("Process started with ID: {}").addArgument(() -> process.toHandle().pid()).log();
-
-BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
-var readerThread = new Thread(() -> {
-String line;
-while (true) {
-try {
-if ((line = reader.readLine()) == null) break;
-} catch (IOException e) {
-log.atWarn().setCause(e).setMessage("Couldn't read next line from sub-process").log();
-return;
-}
-String finalLine = line;
-log.atInfo()
-.setMessage("from sub-process [{}]: {}")
-.addArgument(() -> process.toHandle().pid())
-.addArgument(finalLine)
-.log();
-}
-});
-
-// Kill the process and fail if we have to wait too long
-readerThread.start();
-return process;
+var process = runAndMonitorProcess(processBuilder);
+boolean finished = process.waitFor(timeoutSeconds, TimeUnit.SECONDS);
+if (!finished) {
+log.atError().setMessage("Process timed out, attempting to kill it...").log();
+process.destroy(); // Try to be nice about things first...
+if (!process.waitFor(10, TimeUnit.SECONDS)) {
+log.atError().setMessage("Process still running, attempting to force kill it...").log();
+process.destroyForcibly();
+}
+Assertions.fail("The process did not finish within the timeout period (" + timeoutSeconds + " seconds).");
+}
+
+return process.exitValue();
}

}