Skip to content
This repository has been archived by the owner on Aug 25, 2024. It is now read-only.

Commit

Permalink
Exponential backoff for retries (same thread)
Browse files Browse the repository at this point in the history
  • Loading branch information
cdbartholomew committed Apr 22, 2024
1 parent a685e71 commit c1366af
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -882,11 +882,13 @@ private static void runProcessorAgent(
sourceRecord, List.of(), null));
}
case RETRY -> {
long backoffTime =
((StandardErrorsHandler) errorsHandler)
.getBackoffTime();
log.error(
"Retryable error while processing the records, retrying",
error);
// retry the single record (this leads to out-of-order
// processing)
"Retryable error while processing the records, retrying in {} ms",
backoffTime);
Thread.sleep(backoffTime);
runProcessorAgent(
processor,
List.of(sourceRecord),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ class StandardErrorsHandler implements ErrorsHandler {

private final int retries;
private final String onFailureAction;
// Add base delay in milliseconds
private static final long BASE_DELAY = 50;

private final AtomicInteger failures = new AtomicInteger(0);

Expand All @@ -40,6 +42,12 @@ public StandardErrorsHandler(Map<String, Object> configuration) {
this.onFailureAction = configuration.getOrDefault("onFailure", FAIL).toString();
}

// Method to calculate backoff time
public long getBackoffTime() {
int attempt = failures.get();
return (long) (BASE_DELAY * Math.pow(2, attempt - 1));
}

@Override
public ErrorsProcessingOutcome handleErrors(Record sourceRecord, Throwable error) {
// no stacktrace here, it's too verbose
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import ai.langstream.api.runner.code.SimpleRecord;
import ai.langstream.api.runner.code.SingleRecordAgentProcessor;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -141,6 +142,24 @@ void someGoodSomeFailedWithSkip() throws Exception {
source.expectUncommitted(0);
}

@Test
void someGoodSomeFailedWithRetry() throws Exception {
SimpleSource source =
new SimpleSource(
List.of(
SimpleRecord.of("key", "process-me"),
SimpleRecord.of("key", "fail-me")));
AgentSink sink = new SimpleSink();
FailingAgentProcessor processor = new FailingAgentProcessor(Set.of("fail-me"), 3);
StandardErrorsHandler errorHandler =
new StandardErrorsHandler(Map.of("retries", 5, "onFailure", "fail"));
AgentContext context = createMockAgentContext();
AgentRunner.runMainLoop(
source, processor, sink, context, errorHandler, source::hasMoreRecords);
processor.expectExecutions(5);
source.expectUncommitted(0);
}

@Test
void someGoodSomeFailedWithSkipAndBatching() throws Exception {
SimpleSource source =
Expand Down Expand Up @@ -180,6 +199,26 @@ void someFailedSomeGoodWithSkipAndBatching() throws Exception {
source.expectUncommitted(0);
}

@Test
void someFailedSomeGoodWithRetryAndBatching() throws Exception {
SimpleSource source =
new SimpleSource(
2,
List.of(
SimpleRecord.of("key", "fail-me"),
SimpleRecord.of("key", "process-me")));
AgentSink sink = new SimpleSink();
FailingAgentProcessor processor = new FailingAgentProcessor(Set.of("fail-me"), 2);
StandardErrorsHandler errorHandler =
new StandardErrorsHandler(Map.of("retries", 3, "onFailure", "fail"));
AgentContext context = createMockAgentContext();
AgentRunner.runMainLoop(
source, processor, sink, context, errorHandler, source::hasMoreRecords);
// all the records are processed in one batch
processor.expectExecutions(4);
source.expectUncommitted(0);
}

private static class SimpleSink extends AbstractAgentCode implements AgentSink {
@Override
public CompletableFuture<?> write(Record record) {
Expand Down Expand Up @@ -234,6 +273,44 @@ synchronized void expectUncommitted(int count) {
}
}

public class FailingAgentProcessor extends SingleRecordAgentProcessor {
private final Set<String> failOnContent;
private final int maxFailures; // Maximum number of times to fail before succeeding
private final Map<String, Integer> failureCounts = new HashMap<>();
private int executionCount;

public FailingAgentProcessor(Set<String> failOnContent, int maxFailures) {
this.failOnContent = failOnContent;
this.maxFailures = maxFailures;
}

@Override
public List<Record> processRecord(Record record) {
log.info("Processing {}", record.value());
executionCount++;
String recordValue = (String) record.value();

// Update failure count for this record
int currentFailures = failureCounts.getOrDefault(recordValue, 0);
failureCounts.put(recordValue, currentFailures + 1);

// Check if the record should fail this time
if (failOnContent.contains(recordValue) && currentFailures < maxFailures) {
log.info(
"Record {} failed intentionally, attempt {}", recordValue, currentFailures);
throw new RuntimeException("Failed on " + recordValue);
}

// If it has failed the maximum times, or it's not set to fail, it processes
// successfully
return List.of(record);
}

void expectExecutions(int count) {
assertEquals(count, executionCount);
}
}

private static class SimpleAgentProcessor extends SingleRecordAgentProcessor {

private final Set<String> failOnContent;
Expand Down

0 comments on commit c1366af

Please sign in to comment.