Merge pull request #3519 from ingef/feature/extract-execution-synchronization

Feature/extract execution synchronization
thoniTUB authored Aug 27, 2024
2 parents 0854f74 + 8ec01aa commit c178dfa
Showing 48 changed files with 653 additions and 568 deletions.
@@ -21,6 +21,12 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import jakarta.inject.Inject;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.validation.Validator;
import jakarta.ws.rs.BadRequestException;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.UriBuilder;

import com.bakdata.conquery.apiv1.execution.ExecutionStatus;
import com.bakdata.conquery.apiv1.execution.FullExecutionStatus;
@@ -61,6 +67,7 @@
import com.bakdata.conquery.models.execution.ExecutionState;
import com.bakdata.conquery.models.execution.ManagedExecution;
import com.bakdata.conquery.models.i18n.I18n;
import com.bakdata.conquery.models.identifiable.ids.specific.DatasetId;
import com.bakdata.conquery.models.identifiable.ids.specific.GroupId;
import com.bakdata.conquery.models.identifiable.ids.specific.ManagedExecutionId;
import com.bakdata.conquery.models.identifiable.mapping.IdPrinter;
@@ -82,14 +89,6 @@
import com.bakdata.conquery.util.QueryUtils;
import com.bakdata.conquery.util.QueryUtils.NamespacedIdentifiableCollector;
import com.bakdata.conquery.util.io.IdColumnUtil;
import com.google.common.collect.ClassToInstanceMap;
import com.google.common.collect.MutableClassToInstanceMap;
import jakarta.inject.Inject;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.validation.Validator;
import jakarta.ws.rs.BadRequestException;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.UriBuilder;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -112,22 +111,23 @@ public class QueryProcessor {
public Stream<ExecutionStatus> getAllQueries(Dataset dataset, HttpServletRequest req, Subject subject, boolean allProviders) {
final Collection<ManagedExecution> allQueries = storage.getAllExecutions();

return getQueriesFiltered(dataset, RequestAwareUriBuilder.fromRequest(req), subject, allQueries, allProviders);
return getQueriesFiltered(dataset.getId(), RequestAwareUriBuilder.fromRequest(req), subject, allQueries, allProviders);
}

public Stream<ExecutionStatus> getQueriesFiltered(Dataset datasetId, UriBuilder uriBuilder, Subject subject, Collection<ManagedExecution> allQueries, boolean allProviders) {
public Stream<ExecutionStatus> getQueriesFiltered(DatasetId datasetId, UriBuilder uriBuilder, Subject subject, Collection<ManagedExecution> allQueries, boolean allProviders) {

return allQueries.stream()
// The following only checks the dataset under which the query was submitted, but a query can target more than
// one dataset.
.filter(q -> q.getDataset().equals(datasetId))
.filter(q -> q.getDataset().getId().equals(datasetId))
// to exclude subtypes from somewhere else
.filter(QueryProcessor::canFrontendRender)
.filter(Predicate.not(ManagedExecution::isSystem))
.filter(q -> q.getState().equals(ExecutionState.DONE) || q.getState().equals(ExecutionState.NEW))
.filter(q -> subject.isPermitted(q, Ability.READ))
.map(mq -> {
final OverviewExecutionStatus status = mq.buildStatusOverview(uriBuilder.clone(), subject);
Namespace namespace = datasetRegistry.get(mq.getDataset().getId());
final OverviewExecutionStatus status = mq.buildStatusOverview(uriBuilder.clone(), subject, namespace);
if (mq.isReadyToDownload()) {
status.setResultUrls(getResultAssets(config.getResultProviders(), mq, uriBuilder, allProviders));
}
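
The filter above now compares plain DatasetId values and builds the overview status with the Namespace resolved from the registry. A minimal usage sketch of the id-based signature follows; queryProcessor, uriBuilder and storage are assumed caller-side variables, not part of this diff.

// Hedged sketch: listing executions for a dataset via the new DatasetId-based API.
// All local variable names here are assumptions for illustration.
DatasetId datasetId = dataset.getId();
Stream<ExecutionStatus> statuses =
		queryProcessor.getQueriesFiltered(datasetId, uriBuilder, subject, storage.getAllExecutions(), false);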
@@ -207,7 +207,7 @@ public void cancel(Subject subject, Dataset dataset, ManagedExecution query) {
log.info("User[{}] cancelled Query[{}]", subject.getId(), query.getId());

final ExecutionManager executionManager = datasetRegistry.get(dataset.getId()).getExecutionManager();
executionManager.cancelQuery(dataset, query);
executionManager.cancelQuery(query);
}

public void patchQuery(Subject subject, ManagedExecution execution, MetaDataPatch patch) {
@@ -275,14 +279,19 @@ public void deleteQuery(Subject subject, ManagedExecution execution) {
storage.removeExecution(execution.getId());
}

public ExecutionState awaitDone(ManagedExecution query, int time, TimeUnit unit) {
final Namespace namespace = datasetRegistry.get(query.getDataset().getId());
return namespace.getExecutionManager().awaitDone(query, time, unit);
}

public FullExecutionStatus getQueryFullStatus(ManagedExecution query, Subject subject, UriBuilder url, Boolean allProviders) {
final Namespace namespace = datasetRegistry.get(query.getDataset().getId());

query.initExecutable(namespace, config);

final FullExecutionStatus status = query.buildStatusFull(subject);
final FullExecutionStatus status = query.buildStatusFull(subject, namespace);

if (query.isReadyToDownload() && subject.isPermitted(query.getDataset(), Ability.DOWNLOAD)) {
if (query.isReadyToDownload() && subject.isPermitted(namespace.getDataset(), Ability.DOWNLOAD)) {
status.setResultUrls(getResultAssets(config.getResultProviders(), query, url, allProviders));
}
return status;
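
awaitDone is now delegated to the Namespace's ExecutionManager instead of being handled by the execution itself, which is the point of extracting execution synchronization. Below is a rough sketch of how such manager-side waiting could be organized; the latch-per-execution map is an assumption for illustration only, not code from this commit.

// Hedged sketch: per-execution synchronization owned by a manager-like class.
// Only the awaitDone(execution, time, unit) entry point mirrors this diff; the
// internal CountDownLatch bookkeeping is assumed.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import com.bakdata.conquery.models.execution.ExecutionState;
import com.bakdata.conquery.models.execution.ManagedExecution;
import com.bakdata.conquery.models.identifiable.ids.specific.ManagedExecutionId;

public class ExecutionSynchronizationSketch {

	private final Map<ManagedExecutionId, CountDownLatch> running = new ConcurrentHashMap<>();

	public ExecutionState awaitDone(ManagedExecution execution, int time, TimeUnit unit) {
		final CountDownLatch latch = running.get(execution.getId());
		if (latch == null) {
			// Nothing registered: the execution already reached a final state.
			return execution.getState();
		}
		try {
			latch.await(time, unit);
		}
		catch (InterruptedException e) {
			Thread.currentThread().interrupt();
		}
		return execution.getState();
	}

	public void markDone(ManagedExecution execution) {
		final CountDownLatch latch = running.remove(execution.getId());
		if (latch != null) {
			latch.countDown(); // release every thread waiting in awaitDone
		}
	}
}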
Expand Down Expand Up @@ -317,7 +322,7 @@ public ExternalUploadResult uploadEntities(Subject subject, Dataset dataset, Ext
execution =
((ManagedQuery) namespace
.getExecutionManager()
.createExecution(query, subject.getUser(), dataset, false));
.createExecution(query, subject.getUser(), namespace, false));

execution.setLastResultCount((long) statistic.getResolved().size());

@@ -338,7 +343,8 @@ public FullExecutionStatus getSingleEntityExport(Subject subject, UriBuilder uri
subject.authorize(dataset, Ability.ENTITY_PREVIEW);
subject.authorize(dataset, Ability.PRESERVE_ID);

final PreviewConfig previewConfig = datasetRegistry.get(dataset.getId()).getPreviewConfig();
Namespace namespace = datasetRegistry.get(dataset.getId());
final PreviewConfig previewConfig = namespace.getPreviewConfig();
final EntityPreviewForm form =
EntityPreviewForm.create(entity, idKind, dateRange, sources, previewConfig.getSelects(), previewConfig.getTimeStratifiedSelects(), datasetRegistry);

@@ -350,7 +356,7 @@ public FullExecutionStatus getSingleEntityExport(Subject subject, UriBuilder uri
final EntityPreviewExecution execution = (EntityPreviewExecution) postQuery(dataset, form, subject, true);


if (execution.awaitDone(10, TimeUnit.SECONDS) == ExecutionState.RUNNING) {
if (namespace.getExecutionManager().awaitDone(execution, 10, TimeUnit.SECONDS) == ExecutionState.RUNNING) {
log.warn("Still waiting for {} after 10 Seconds.", execution.getId());
throw new ConqueryError.ExecutionProcessingTimeoutError();
}
@@ -361,7 +367,7 @@ public FullExecutionStatus getSingleEntityExport(Subject subject, UriBuilder uri
}


final FullExecutionStatus status = execution.buildStatusFull(subject);
final FullExecutionStatus status = execution.buildStatusFull(subject, namespace);
status.setResultUrls(getResultAssets(config.getResultProviders(), execution, uriBuilder, false));
return status;
}
@@ -377,21 +383,23 @@ public ManagedExecution postQuery(Dataset dataset, QueryDescription query, Subje
// This map works as long as we have query visitors that are not configured in any way.
// So adding a visitor twice would replace the previous one but both would have yielded the same result.
// For the future a better data structure might be desired that also regards similar QueryVisitors of different configuration
final ClassToInstanceMap<QueryVisitor> visitors = MutableClassToInstanceMap.create();
final List<QueryVisitor> visitors = new ArrayList<>();
query.addVisitors(visitors);

// Initialize checks that need to traverse the query tree
visitors.putInstance(QueryUtils.OnlyReusingChecker.class, new QueryUtils.OnlyReusingChecker());
visitors.putInstance(NamespacedIdentifiableCollector.class, new NamespacedIdentifiableCollector());
QueryUtils.OnlyReusingChecker onlyReusingChecker = new QueryUtils.OnlyReusingChecker();
visitors.add(onlyReusingChecker);
NamespacedIdentifiableCollector namespacedIdentifiableCollector = new NamespacedIdentifiableCollector();
visitors.add(namespacedIdentifiableCollector);

final String primaryGroupName = AuthorizationHelper.getPrimaryGroup(subject, storage).map(Group::getName).orElse("none");

visitors.putInstance(ExecutionMetrics.QueryMetricsReporter.class, new ExecutionMetrics.QueryMetricsReporter(primaryGroupName));
ExecutionMetrics.QueryMetricsReporter queryMetricsReporter = new ExecutionMetrics.QueryMetricsReporter(primaryGroupName);
visitors.add(queryMetricsReporter);


// Chain all Consumers
Consumer<Visitable> consumerChain = QueryUtils.getNoOpEntryPoint();
for (QueryVisitor visitor : visitors.values()) {
for (QueryVisitor visitor : visitors) {
consumerChain = consumerChain.andThen(visitor);
}

@@ -402,7 +410,8 @@ public ManagedExecution postQuery(Dataset dataset, QueryDescription query, Subje
query.authorize(subject, dataset, visitors, storage);
// After all authorization checks we can now use the actual subject to invoke the query and do not need to bubble down the Userish in methods

ExecutionMetrics.reportNamespacedIds(visitors.getInstance(NamespacedIdentifiableCollector.class).getIdentifiables(), primaryGroupName);

ExecutionMetrics.reportNamespacedIds(namespacedIdentifiableCollector.getIdentifiables(), primaryGroupName);

ExecutionMetrics.reportQueryClassUsage(query.getClass(), primaryGroupName);

@@ -412,7 +421,7 @@ public ManagedExecution postQuery(Dataset dataset, QueryDescription query, Subje

// If this is only a re-executing query, try to execute the underlying query instead.
{
final Optional<ManagedExecutionId> executionId = visitors.getInstance(QueryUtils.OnlyReusingChecker.class).getOnlyReused();
final Optional<ManagedExecutionId> executionId = onlyReusingChecker.getOnlyReused();

final Optional<ManagedExecution>
execution =
@@ -424,7 +433,7 @@ public ManagedExecution postQuery(Dataset dataset, QueryDescription query, Subje
}

// Execute the query
return executionManager.runQuery(namespace, query, subject.getUser(), dataset, config, system);
return executionManager.runQuery(namespace, query, subject.getUser(), config, system);
}
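
Everything the ExecutionManager needs is now resolved through the Namespace; the separate Dataset argument to createExecution and runQuery is gone. A short usage sketch under that assumption (local variable names are illustrative):

// Hedged sketch: submitting a query through the namespace's ExecutionManager.
Namespace namespace = datasetRegistry.get(dataset.getId());
ExecutionManager executionManager = namespace.getExecutionManager();
ManagedExecution execution = executionManager.createExecution(query, subject.getUser(), namespace, false);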

/**
Expand Down Expand Up @@ -458,7 +467,7 @@ private ManagedExecution tryReuse(QueryDescription query, ManagedExecutionId exe
if (!user.isOwner(execution)) {
final ManagedExecution
newExecution =
executionManager.createExecution(execution.getSubmitted(), user, execution.getDataset(), false);
executionManager.createExecution(execution.getSubmitted(), user, namespace, false);
newExecution.setLabel(execution.getLabel());
newExecution.setTags(execution.getTags().clone());
storage.updateExecution(newExecution);
@@ -511,7 +520,7 @@ public Stream<Map<String, String>> resolveEntities(Subject subject, List<FilterV

final ManagedExecution execution = postQuery(dataset, query, subject, true);

if (execution.awaitDone(10, TimeUnit.SECONDS) == ExecutionState.RUNNING) {
if (namespace.getExecutionManager().awaitDone(execution, 10, TimeUnit.SECONDS) == ExecutionState.RUNNING) {
log.warn("Still waiting for {} after 10 Seconds.", execution.getId());
throw new ConqueryError.ExecutionProcessingTimeoutError();
}
@@ -1,5 +1,6 @@
package com.bakdata.conquery.apiv1.forms;

import java.util.List;
import javax.annotation.Nullable;

import com.bakdata.conquery.apiv1.query.QueryDescription;
@@ -15,7 +16,6 @@
import com.bakdata.conquery.models.query.visitor.QueryVisitor;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ClassToInstanceMap;
import lombok.EqualsAndHashCode;
import lombok.NonNull;

@@ -41,7 +41,7 @@ public String getFormType() {


@Override
public void authorize(Subject subject, Dataset submittedDataset, @NonNull ClassToInstanceMap<QueryVisitor> visitors, MetaStorage storage) {
public void authorize(Subject subject, Dataset submittedDataset, @NonNull List<QueryVisitor> visitors, MetaStorage storage) {
QueryDescription.super.authorize(subject, submittedDataset, visitors, storage);
// Check if subject is allowed to create this form
final FormType formType = FormScanner.resolveFormType(getFormType());
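
With ClassToInstanceMap gone, query visitors are registered by adding them to a plain List. A hedged sketch of what an overriding addVisitors could look like under the new signature; the OnlyReusingChecker is only an example visitor borrowed from QueryProcessor, not part of this file.

// Hedged sketch: registering visitors with the List-based API in a QueryDescription implementation.
@Override
public void addVisitors(@NonNull List<QueryVisitor> visitors) {
	QueryDescription.super.addVisitors(visitors);
	visitors.add(new QueryUtils.OnlyReusingChecker()); // example visitor, assumed for illustration
}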
@@ -2,13 +2,12 @@

import java.time.LocalDateTime;
import java.util.UUID;

import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;

import com.bakdata.conquery.models.auth.entities.User;
import com.bakdata.conquery.models.datasets.Dataset;
import com.bakdata.conquery.models.forms.configs.FormConfig;
import com.bakdata.conquery.models.identifiable.ids.specific.DatasetId;
import com.bakdata.conquery.util.VariableDefaultValue;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.AllArgsConstructor;
@@ -37,8 +36,8 @@ public class FormConfigAPI {
private UUID formId = UUID.randomUUID();
@VariableDefaultValue @Builder.Default
private LocalDateTime creationTime = LocalDateTime.now();
public FormConfig intern(User owner, Dataset dataset) {

public FormConfig intern(User owner, DatasetId dataset) {
FormConfig intern = new FormConfig();
intern.setFormId(formId);
intern.setFormType(formType);
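
FormConfigAPI.intern now takes a DatasetId rather than a resolved Dataset. A one-line usage sketch (formConfigAPI and owner are assumed caller-side variables):

// Hedged sketch: interning a submitted form config against the dataset id.
FormConfig formConfig = formConfigAPI.intern(owner, dataset.getId());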
@@ -1,5 +1,6 @@
package com.bakdata.conquery.apiv1.query;

import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
@@ -25,9 +26,7 @@
import com.bakdata.conquery.util.QueryUtils.ExternalIdChecker;
import com.bakdata.conquery.util.QueryUtils.NamespacedIdentifiableCollector;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.collect.ClassToInstanceMap;
import lombok.NonNull;
import org.jetbrains.annotations.NotNull;

@JsonTypeInfo(use = JsonTypeInfo.Id.CUSTOM, property = "type")
@CPSBase
@@ -62,25 +61,22 @@ public interface QueryDescription extends Visitable {
* All visitors are concatenated so only a single traverse needs to be done.
* @param visitors The structure to which new visitors need to be added.
*/
default void addVisitors(@NonNull ClassToInstanceMap<QueryVisitor> visitors) {
default void addVisitors(@NonNull List<QueryVisitor> visitors) {
// Register visitors for permission checks
visitors.putInstance(NamespacedIdentifiableCollector.class, new NamespacedIdentifiableCollector());
visitors.putInstance(QueryUtils.ExternalIdChecker.class, new QueryUtils.ExternalIdChecker());
visitors.add(new QueryUtils.ExternalIdChecker());
}

/**
* Check implementation specific permissions. Is called after all visitors have been registered and executed.
*/
default void authorize(Subject subject, Dataset submittedDataset, @NonNull ClassToInstanceMap<QueryVisitor> visitors, MetaStorage storage) {
default void authorize(Subject subject, Dataset submittedDataset, List<QueryVisitor> visitors, MetaStorage storage) {
authorizeQuery(this, subject, submittedDataset, visitors, storage);
}

public static void authorizeQuery(QueryDescription queryDescription, Subject subject, Dataset submittedDataset, @NotNull ClassToInstanceMap<QueryVisitor> visitors, MetaStorage storage) {
static void authorizeQuery(QueryDescription queryDescription, Subject subject, Dataset submittedDataset, List<QueryVisitor> visitors, MetaStorage storage) {
NamespacedIdentifiableCollector nsIdCollector = QueryUtils.getVisitor(visitors, NamespacedIdentifiableCollector.class);
ExternalIdChecker externalIdChecker = QueryUtils.getVisitor(visitors, ExternalIdChecker.class);
if (nsIdCollector == null) {
throw new IllegalStateException();
}

// Generate DatasetPermissions
final Set<Dataset> datasets = nsIdCollector.getIdentifiables().stream()
.map(NamespacedIdentifiable::getDataset)
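
authorizeQuery now looks its visitors up in the list via QueryUtils.getVisitor, whose implementation is not part of this diff. A plausible shape for such a lookup helper, offered purely as an assumption:

// Hedged sketch: a list-based visitor lookup. The real QueryUtils.getVisitor is not
// shown in this commit; this is only one way it might work.
public static <T extends QueryVisitor> T getVisitor(List<QueryVisitor> visitors, Class<T> clazz) {
	return visitors.stream()
				   .filter(clazz::isInstance)
				   .map(clazz::cast)
				   .findFirst()
				   .orElse(null);
}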
@@ -1,25 +1,36 @@
package com.bakdata.conquery.io.result;

import java.util.List;
import java.util.function.Function;
import java.util.stream.Stream;

import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.UriBuilder;

import com.bakdata.conquery.apiv1.execution.ResultAsset;
import com.bakdata.conquery.models.execution.ManagedExecution;
import com.bakdata.conquery.io.external.form.ExternalFormBackendApi;
import com.bakdata.conquery.models.auth.entities.User;
import com.bakdata.conquery.models.query.ExecutionManager;
import com.fasterxml.jackson.annotation.JsonIgnore;
import it.unimi.dsi.fastutil.Pair;

/**
* Interface for executions, whose final result is produced externally.
*/
public interface ExternalResult {
public interface ExternalState extends ExecutionManager.State {

/**
* Returns the api object for the external form backend.
*/
ExternalFormBackendApi getApi();

/**
* Sets the map of results which reference the result assets of an external execution.
*/
void setResultsAssetMap(List<Pair<ResultAsset, AssetBuilder>> assetMap);

/**
* Implementations should use the {@link com.bakdata.eva.resources.ExternalResultResource#getDownloadURL(UriBuilder, ManagedExecution, String)}
* to build the url for the asset.
* The provided assetId is the one that is used by {@link ExternalResult#fetchExternalResult(String)} to retrieve the download.
* Returns asset builders for all results registered by an {@link com.bakdata.conquery.models.forms.managed.ExternalExecution} (see {@link AssetBuilder}).
* The provided assetId is the one that is used by {@link ExternalState#fetchExternalResult(String)} to retrieve the download.
*/
@JsonIgnore
Stream<AssetBuilder> getResultAssets();
@@ -37,6 +48,7 @@ public interface ExternalResult {
*/
@FunctionalInterface
interface AssetBuilder extends Function<UriBuilder, ResultAsset> {

}

User getServiceUser();
}
@@ -1,29 +1,28 @@
package com.bakdata.conquery.io.result.external;

import jakarta.inject.Inject;
import jakarta.ws.rs.WebApplicationException;
import jakarta.ws.rs.core.Response;

import com.bakdata.conquery.io.result.ExternalResult;
import com.bakdata.conquery.io.result.ExternalState;
import com.bakdata.conquery.io.result.ResultUtil;
import com.bakdata.conquery.models.auth.entities.Subject;
import com.bakdata.conquery.models.execution.ManagedExecution;
import com.bakdata.conquery.models.forms.managed.ExternalExecution;
import com.bakdata.conquery.models.query.ExecutionManager;
import com.bakdata.conquery.models.worker.DatasetRegistry;
import lombok.RequiredArgsConstructor;

@RequiredArgsConstructor(onConstructor_ = @Inject)
public class ExternalResultProcessor {

public <T extends ManagedExecution & ExternalResult> Response getResult(Subject subject, ManagedExecution execution, String fileName) {
private final DatasetRegistry<?> datasetRegistry;

ResultUtil.authorizeExecutable(subject, execution);

if (!(execution instanceof ExternalResult)) {
throw new WebApplicationException("The execution exists, but produces not a zipped result", Response.Status.CONFLICT);
public Response getResult(Subject subject, ExternalExecution execution, String fileName) {

}
ResultUtil.authorizeExecutable(subject, execution);

T externalExecution = (T) execution;
ExecutionManager executionManager = datasetRegistry.get(execution.getDataset().getId()).getExecutionManager();
ExternalState externalResult = executionManager.getResult(execution.getId());

return externalExecution.fetchExternalResult(fileName);
return externalResult.fetchExternalResult(fileName);
}
}
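
The processor now asks the dataset's ExecutionManager for the stored ExternalState instead of casting the execution. A hedged sketch of wiring it into a JAX-RS resource follows; the path, annotations and parameter conversion are assumptions, not taken from this commit.

// Hedged sketch: a resource delegating to ExternalResultProcessor.
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.core.Response;

import com.bakdata.conquery.models.auth.entities.Subject;
import com.bakdata.conquery.models.forms.managed.ExternalExecution;
import io.dropwizard.auth.Auth;

@Path("result/external/{execution}/{fileName}")
public class ExternalResultResourceSketch {

	@Inject
	private ExternalResultProcessor processor;

	@GET
	public Response get(@Auth Subject subject,
						@PathParam("execution") ExternalExecution execution,
						@PathParam("fileName") String fileName) {
		return processor.getResult(subject, execution, fileName);
	}
}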