Commit b9b543b

Refactor based on PR comments
Signed-off-by: Andre Kurait <[email protected]>
AndreKurait committed Dec 12, 2024
1 parent 4ee028b commit b9b543b
Showing 10 changed files with 96 additions and 132 deletions.

@@ -25,7 +25,7 @@ public void generate(WorkloadOptions options) {
// This workload creates ALL documents in memory, schedules them and waits for completion.
// If larger scale is needed remove the toList() calls and stream all data.
var allDocs = new ArrayList<CompletableFuture<?>>();
for (var workload : options.workloads) {
for (var workload : options.getWorkloads()) {
var workloadInstance = workload.getNewInstance().get();
var docs = workloadInstance
.indexNames()
@@ -43,12 +43,12 @@ public void generate(WorkloadOptions options) {

private List<CompletableFuture<?>> generateDocs(String indexName, Workload workload, WorkloadOptions options) {
// This happens inline to be sure the index exists before docs are indexed on it
var indexRequestDoc = workload.createIndex(options.index.indexSettings.deepCopy());
var indexRequestDoc = workload.createIndex(options.getIndex().indexSettings.deepCopy());
log.atInfo().setMessage("Creating index {} with {}").addArgument(indexName).addArgument(indexRequestDoc).log();
client.createIndex(indexName, indexRequestDoc, null);

var docIdCounter = new AtomicInteger(0);
var allDocs = workload.createDocs(options.totalDocs)
var allDocs = workload.createDocs(options.getTotalDocs())
.map(doc -> {
log.atTrace().setMessage("Created doc for index {}: {}")
.addArgument(indexName)
@@ -59,14 +59,14 @@ private List<CompletableFuture<?>> generateDocs(String indexName, Workload workload, WorkloadOptions options) {
.collect(Collectors.toList());

var bulkDocGroups = new ArrayList<List<BulkDocSection>>();
for (int i = 0; i < allDocs.size(); i += options.maxBulkBatchSize) {
bulkDocGroups.add(allDocs.subList(i, Math.min(i + options.maxBulkBatchSize, allDocs.size())));
for (int i = 0; i < allDocs.size(); i += options.getMaxBulkBatchSize()) {
bulkDocGroups.add(allDocs.subList(i, Math.min(i + options.getMaxBulkBatchSize(), allDocs.size())));
}

return bulkDocGroups.stream()
.map(docs -> {
var sendFuture = client.sendBulkRequest(indexName, docs, null).toFuture();
if (options.refreshAfterEachWrite) {
if (options.isRefreshAfterEachWrite()) {
sendFuture.thenRun(() -> client.refresh(null));
// Requests will be sent in parallel unless we wait for completion
// This allows more segments to be created

@@ -6,18 +6,22 @@
import org.opensearch.migrations.data.workloads.Workloads;

import com.beust.jcommander.Parameter;
import lombok.Getter;
import lombok.Setter;

@Getter
@Setter
public class WorkloadOptions {
@Parameter(names = { "--workloads", "-w" }, description = "The list of workloads to run, defaults to all available workloads.", required = false)
public List<Workloads> workloads = Arrays.asList(Workloads.values());
private List<Workloads> workloads = Arrays.asList(Workloads.values());

@Parameter(names = { "--docs-per-workload-count" }, description = "The number of documents per workload")
public int totalDocs = 1000;
private int totalDocs = 1000;

@Parameter(names = { "--max-bulk-request-batch-count" }, description = "The maximum batch count for bulk requests")
public int maxBulkBatchSize = 50;
private int maxBulkBatchSize = 50;

public final IndexOptions index = new IndexOptions();
private final IndexOptions index = new IndexOptions();

public boolean refreshAfterEachWrite = false;
private boolean refreshAfterEachWrite = false;
}
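
A minimal usage sketch (not part of this commit) of driving WorkloadOptions through the Lombok-generated accessors shown above; the values are illustrative and the generator construction is elided since it does not appear in this diff:

    var options = new WorkloadOptions();
    options.setWorkloads(List.of(Workloads.GEONAMES));   // narrow from the default of all workloads
    options.setTotalDocs(500);                            // documents per workload
    options.setMaxBulkBatchSize(100);                     // documents per bulk request
    options.setRefreshAfterEachWrite(true);               // refresh after every bulk write
    options.getIndex().indexSettings.put(IndexOptions.PROP_NUMBER_OF_SHARDS, 3);
    generator.generate(options);                          // 'generator' as constructed in the integration test below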

@@ -410,7 +410,7 @@ private static void exitOnLeaseTimeout(
System.exit(PROCESS_TIMED_OUT_EXIT_CODE);
}

protected static int getSuccessorNextAcquisitionLeaseExponent(WorkItemTimeProvider workItemTimeProvider, Duration initialLeaseDuration,
public static int getSuccessorNextAcquisitionLeaseExponent(WorkItemTimeProvider workItemTimeProvider, Duration initialLeaseDuration,
Instant leaseExpirationTime) {
if (workItemTimeProvider.getLeaseAcquisitionTimeRef().get() == null ||
workItemTimeProvider.getDocumentMigraionStartTimeRef().get() == null) {

@@ -2,114 +2,47 @@

import java.time.Duration;
import java.time.Instant;
import java.util.stream.Stream;

import org.opensearch.migrations.bulkload.workcoordination.WorkItemTimeProvider;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.*;

class RfsMigrateDocumentsTest {

private static final Duration TEST_INITIAL_LEASE_DURATION = Duration.ofMinutes(1);
private static final double DECREASE_LEASE_DURATION_SHARD_SETUP_THRESHOLD = .025d;
private static final double INCREASE_LEASE_DURATION_SHARD_SETUP_THRESHOLD = .1d;

private static class TestClass extends RfsMigrateDocuments {
public static int getSuccessorNextAcquisitionLeaseExponent(WorkItemTimeProvider workItemTimeProvider, Duration initialLeaseDuration,
Instant leaseExpirationTime) {
return RfsMigrateDocuments.getSuccessorNextAcquisitionLeaseExponent(workItemTimeProvider, initialLeaseDuration, leaseExpirationTime);
}
}

@Test
public void testGetSuccessorNextAcquisitionLeaseExponent_LessThanLowerThreshold() {
WorkItemTimeProvider workItemTimeProvider = new WorkItemTimeProvider();

// Lease at 40 minutes, shard prep 59 seconds, successor lease should be decreased since shard prep is < 2.5%
// and exponent is > 0
var existingLeaseExponent = 2;
var shardPrepTime = Duration.ofSeconds(59);
Duration initialLeaseDuration = Duration.ofMinutes(10);
var initialLeaseMultiple = (int) Math.pow(2, existingLeaseExponent);

workItemTimeProvider.getLeaseAcquisitionTimeRef().set(Instant.EPOCH);
workItemTimeProvider.getDocumentMigraionStartTimeRef().set(Instant.EPOCH.plus(shardPrepTime));
Instant leaseExpirationTime = Instant.EPOCH.plus(initialLeaseDuration.multipliedBy(initialLeaseMultiple));

int successorNextAcquisitionLeaseExponent = TestClass.getSuccessorNextAcquisitionLeaseExponent(workItemTimeProvider, initialLeaseDuration, leaseExpirationTime);

Assertions.assertEquals(existingLeaseExponent - 1, successorNextAcquisitionLeaseExponent, "Should decrease successorExponent");
}


@Test
public void testGetSuccessorNextAcquisitionLeaseExponent_LessThanLowerThresholdWith0Exponent() {
WorkItemTimeProvider workItemTimeProvider = new WorkItemTimeProvider();

var shardPrepTime = Duration.ofSeconds(1);
var existingLeaseExponent = 0;
var initialLeaseMultiple = (int) Math.pow(2, existingLeaseExponent);

workItemTimeProvider.getLeaseAcquisitionTimeRef().set(Instant.EPOCH);
workItemTimeProvider.getDocumentMigraionStartTimeRef().set(Instant.EPOCH.plus(shardPrepTime));
Duration initialLeaseDuration = Duration.ofMinutes(10);
Instant leaseExpirationTime = Instant.EPOCH.plus(initialLeaseDuration.multipliedBy(initialLeaseMultiple));

int successorNextAcquisitionLeaseExponent = TestClass.getSuccessorNextAcquisitionLeaseExponent(workItemTimeProvider, initialLeaseDuration, leaseExpirationTime);

Assertions.assertEquals(0, successorNextAcquisitionLeaseExponent, "Should return 0 for successorExponent");
}


@Test
public void testGetSuccessorNextAcquisitionLeaseExponent_LessThanUpperThreshold() {
@ParameterizedTest
@MethodSource("provideTestParameters")
void testGetSuccessorNextAcquisitionLeaseExponent(int existingLeaseExponent, int expectedSuccessorExponent, double shardPrepFraction, String message) {
WorkItemTimeProvider workItemTimeProvider = new WorkItemTimeProvider();

var shardPrepTime = Duration.ofSeconds(59);
var existingLeaseExponent = 0;
var initialLeaseMultiple = (int) Math.pow(2, existingLeaseExponent);

workItemTimeProvider.getLeaseAcquisitionTimeRef().set(Instant.EPOCH);
workItemTimeProvider.getDocumentMigraionStartTimeRef().set(Instant.EPOCH.plus(shardPrepTime));
Duration initialLeaseDuration = Duration.ofMinutes(10);
Instant leaseExpirationTime = Instant.EPOCH.plus(initialLeaseDuration.multipliedBy(initialLeaseMultiple));
int initialLeaseMultiple = (int) Math.pow(2, existingLeaseExponent);
Duration leaseDuration = TEST_INITIAL_LEASE_DURATION.multipliedBy(initialLeaseMultiple);

int successorNextAcquisitionLeaseExponent = TestClass.getSuccessorNextAcquisitionLeaseExponent(workItemTimeProvider, initialLeaseDuration, leaseExpirationTime);

Assertions.assertEquals(existingLeaseExponent, successorNextAcquisitionLeaseExponent, "Should return existingLeaseExponent + 1 when shard prep time is less than 10% of lease duration");
}

@Test
public void testGetSuccessorNextAcquisitionLeaseExponent_EqualToUpperThreshold() {
WorkItemTimeProvider workItemTimeProvider = new WorkItemTimeProvider();

var shardPrepTime = Duration.ofSeconds(60);
var existingLeaseExponent = 0;
var initialLeaseMultiple = (int) Math.pow(2, existingLeaseExponent);
Duration shardPrepTime = Duration.ofNanos((long)(leaseDuration.toNanos() * shardPrepFraction));

workItemTimeProvider.getLeaseAcquisitionTimeRef().set(Instant.EPOCH);
workItemTimeProvider.getDocumentMigraionStartTimeRef().set(Instant.EPOCH.plus(shardPrepTime));
Duration initialLeaseDuration = Duration.ofMinutes(10);
Instant leaseExpirationTime = Instant.EPOCH.plus(initialLeaseDuration.multipliedBy(initialLeaseMultiple));
Instant leaseExpirationTime = Instant.EPOCH.plus(leaseDuration);

int successorNextAcquisitionLeaseExponent = TestClass.getSuccessorNextAcquisitionLeaseExponent(workItemTimeProvider, initialLeaseDuration, leaseExpirationTime);
int successorNextAcquisitionLeaseExponent = RfsMigrateDocuments.getSuccessorNextAcquisitionLeaseExponent(
workItemTimeProvider, TEST_INITIAL_LEASE_DURATION, leaseExpirationTime);

Assertions.assertEquals(existingLeaseExponent, successorNextAcquisitionLeaseExponent, "Should return existingLeaseExponent when shard prep time is equal to 10% of lease duration");
Assertions.assertEquals(expectedSuccessorExponent, successorNextAcquisitionLeaseExponent, message);
}

@Test
public void testGetSuccessorNextAcquisitionLeaseExponent_ExceedsUpperThreshold() {
WorkItemTimeProvider workItemTimeProvider = new WorkItemTimeProvider();

var shardPrepTime = Duration.ofSeconds(61);
var existingLeaseExponent = 0;
var initialLeaseMultiple = (int) Math.pow(2, existingLeaseExponent);

workItemTimeProvider.getLeaseAcquisitionTimeRef().set(Instant.EPOCH);
workItemTimeProvider.getDocumentMigraionStartTimeRef().set(Instant.EPOCH.plus(shardPrepTime));
Duration initialLeaseDuration = Duration.ofMinutes(10);
Instant leaseExpirationTime = Instant.EPOCH.plus(initialLeaseDuration.multipliedBy(initialLeaseMultiple));

int successorNextAcquisitionLeaseExponent = TestClass.getSuccessorNextAcquisitionLeaseExponent(workItemTimeProvider, initialLeaseDuration, leaseExpirationTime);

Assertions.assertEquals(existingLeaseExponent + 1, successorNextAcquisitionLeaseExponent, "Should return existingLeaseExponent + 1 when shard prep time is greater than to 10% of lease duration");
static Stream<Arguments> provideTestParameters() {
return Stream.of(
Arguments.of(2, 1, DECREASE_LEASE_DURATION_SHARD_SETUP_THRESHOLD - 0.001, "Should decrease successorExponent when shard prep time is less than decrease threshold for lease duration"),
Arguments.of(0, 0, DECREASE_LEASE_DURATION_SHARD_SETUP_THRESHOLD - 0.001, "Should return 0 for successorExponent when shard prep time is less than decrease threshold for lease duration and existingLeaseExponent is 0"),
Arguments.of(1, 1, INCREASE_LEASE_DURATION_SHARD_SETUP_THRESHOLD - 0.001, "Should return existingLeaseExponent when shard prep time is less than increase threshold for lease duration"),
Arguments.of(1, 1, INCREASE_LEASE_DURATION_SHARD_SETUP_THRESHOLD, "Should return existingLeaseExponent when shard prep time is equal to increase threshold for lease duration"),
Arguments.of(1, 2, INCREASE_LEASE_DURATION_SHARD_SETUP_THRESHOLD + 0.001, "Should return existingLeaseExponent + 1 when shard prep time is greater than increase threshold for lease duration")
);
}
}
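
Taken together, the parameter rows above imply roughly the following decision rule. This is a sketch reconstructed from the test expectations only (boundary behavior is pinned solely where the tests exercise it) and is not the production implementation in RfsMigrateDocuments:

    // Reconstructed from the parameterized cases above; not production code.
    static int successorExponentSketch(int existingLeaseExponent, double shardPrepFraction) {
        double decreaseThreshold = 0.025; // DECREASE_LEASE_DURATION_SHARD_SETUP_THRESHOLD
        double increaseThreshold = 0.1;   // INCREASE_LEASE_DURATION_SHARD_SETUP_THRESHOLD
        if (shardPrepFraction < decreaseThreshold && existingLeaseExponent > 0) {
            return existingLeaseExponent - 1; // shard prep was a tiny share of the lease; shrink the next lease
        }
        if (shardPrepFraction > increaseThreshold) {
            return existingLeaseExponent + 1; // shard prep consumed too much of the lease; grow the next lease
        }
        return existingLeaseExponent;         // otherwise keep the current lease length
    }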

@@ -109,12 +109,12 @@ private void runTestProcessWithCheckpoint(int expectedInitialExitCode, int expec
);
sourceClusterOperations.createIndex("geonames", body);

workloadOptions.totalDocs = indexDocCount;
workloadOptions.workloads = List.of(Workloads.GEONAMES);
workloadOptions.index.indexSettings.put(IndexOptions.PROP_NUMBER_OF_SHARDS, shards);
workloadOptions.setTotalDocs(indexDocCount);
workloadOptions.setWorkloads(List.of(Workloads.GEONAMES));
workloadOptions.getIndex().indexSettings.put(IndexOptions.PROP_NUMBER_OF_SHARDS, shards);
// Segments will be created on each refresh which tests segment ordering logic
workloadOptions.refreshAfterEachWrite = forceMoreSegments;
workloadOptions.maxBulkBatchSize = forceMoreSegments ? 10 : 1000;
workloadOptions.setRefreshAfterEachWrite(forceMoreSegments);
workloadOptions.setMaxBulkBatchSize(forceMoreSegments ? 10 : 1000);
generator.generate(workloadOptions);

// Create the snapshot from the source cluster

@@ -4,6 +4,7 @@
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

import org.opensearch.migrations.bulkload.common.BulkDocSection;
import org.opensearch.migrations.bulkload.common.DocumentReindexer;
@@ -73,9 +74,9 @@ protected DirectoryReader getReader() {
}

@Override
protected RfsLuceneDocument getDocument(IndexReader reader, int luceneDocId, boolean isLive, int segmentDocBase) {
protected RfsLuceneDocument getDocument(IndexReader reader, int luceneDocId, boolean isLive, int segmentDocBase, Supplier<String> getSegmentReaderDebugInfo) {
ingestedDocuments.incrementAndGet();
return super.getDocument(reader, luceneDocId, isLive, segmentDocBase);
return super.getDocument(reader, luceneDocId, isLive, segmentDocBase, () -> "TestReaderWrapper(" + getSegmentReaderDebugInfo.get() + ")");
}
};
