Skip to content

Commit

Permalink
review changes
Browse files Browse the repository at this point in the history
  • Loading branch information
thoniTUB committed Aug 27, 2024
1 parent 00d0f79 commit 8ec01aa
Showing 1 changed file with 6 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ public class DistributedExecutionManager extends ExecutionManager {

public record DistributedState(Map<WorkerId, List<EntityResult>> results, CountDownLatch executingLock) implements InternalState {

public DistributedState() {
this(new ConcurrentHashMap<>(), new CountDownLatch(1));
}

@Override
public Stream<EntityResult> streamQueryResults() {
return results.values().stream().flatMap(Collection::stream);
Expand Down Expand Up @@ -66,10 +70,10 @@ protected <E extends ManagedExecution & InternalExecution> void doExecute(E exec

log.info("Executing Query[{}] in Dataset[{}]", execution.getQueryId(), execution.getDataset());

addState(execution.getId(), new DistributedState(new ConcurrentHashMap<>(), new CountDownLatch(1)));
addState(execution.getId(), new DistributedState());

if (execution instanceof ManagedInternalForm<?> form) {
form.getSubQueries().values().forEach((query) -> addState(query.getId(), new DistributedState(new ConcurrentHashMap<>(), new CountDownLatch(1))));
form.getSubQueries().values().forEach((query) -> addState(query.getId(), new DistributedState()));
}

final WorkerHandler workerHandler = getWorkerHandler(execution.getId().getDataset());
Expand Down

0 comments on commit 8ec01aa

Please sign in to comment.