diff --git a/backend/src/main/java/com/bakdata/conquery/apiv1/QueryProcessor.java b/backend/src/main/java/com/bakdata/conquery/apiv1/QueryProcessor.java index 7bb95fa2f2..7ea5ebe8b3 100644 --- a/backend/src/main/java/com/bakdata/conquery/apiv1/QueryProcessor.java +++ b/backend/src/main/java/com/bakdata/conquery/apiv1/QueryProcessor.java @@ -21,6 +21,12 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; +import jakarta.inject.Inject; +import jakarta.servlet.http.HttpServletRequest; +import jakarta.validation.Validator; +import jakarta.ws.rs.BadRequestException; +import jakarta.ws.rs.core.Response; +import jakarta.ws.rs.core.UriBuilder; import com.bakdata.conquery.apiv1.execution.ExecutionStatus; import com.bakdata.conquery.apiv1.execution.FullExecutionStatus; @@ -61,6 +67,7 @@ import com.bakdata.conquery.models.execution.ExecutionState; import com.bakdata.conquery.models.execution.ManagedExecution; import com.bakdata.conquery.models.i18n.I18n; +import com.bakdata.conquery.models.identifiable.ids.specific.DatasetId; import com.bakdata.conquery.models.identifiable.ids.specific.GroupId; import com.bakdata.conquery.models.identifiable.ids.specific.ManagedExecutionId; import com.bakdata.conquery.models.identifiable.mapping.IdPrinter; @@ -82,14 +89,6 @@ import com.bakdata.conquery.util.QueryUtils; import com.bakdata.conquery.util.QueryUtils.NamespacedIdentifiableCollector; import com.bakdata.conquery.util.io.IdColumnUtil; -import com.google.common.collect.ClassToInstanceMap; -import com.google.common.collect.MutableClassToInstanceMap; -import jakarta.inject.Inject; -import jakarta.servlet.http.HttpServletRequest; -import jakarta.validation.Validator; -import jakarta.ws.rs.BadRequestException; -import jakarta.ws.rs.core.Response; -import jakarta.ws.rs.core.UriBuilder; import lombok.AllArgsConstructor; import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -112,22 +111,23 @@ public class QueryProcessor { public Stream getAllQueries(Dataset dataset, HttpServletRequest req, Subject subject, boolean allProviders) { final Collection allQueries = storage.getAllExecutions(); - return getQueriesFiltered(dataset, RequestAwareUriBuilder.fromRequest(req), subject, allQueries, allProviders); + return getQueriesFiltered(dataset.getId(), RequestAwareUriBuilder.fromRequest(req), subject, allQueries, allProviders); } - public Stream getQueriesFiltered(Dataset datasetId, UriBuilder uriBuilder, Subject subject, Collection allQueries, boolean allProviders) { + public Stream getQueriesFiltered(DatasetId datasetId, UriBuilder uriBuilder, Subject subject, Collection allQueries, boolean allProviders) { return allQueries.stream() // The following only checks the dataset, under which the query was submitted, but a query can target more that // one dataset. - .filter(q -> q.getDataset().equals(datasetId)) + .filter(q -> q.getDataset().getId().equals(datasetId)) // to exclude subtypes from somewhere else .filter(QueryProcessor::canFrontendRender) .filter(Predicate.not(ManagedExecution::isSystem)) .filter(q -> q.getState().equals(ExecutionState.DONE) || q.getState().equals(ExecutionState.NEW)) .filter(q -> subject.isPermitted(q, Ability.READ)) .map(mq -> { - final OverviewExecutionStatus status = mq.buildStatusOverview(uriBuilder.clone(), subject); + Namespace namespace = datasetRegistry.get(mq.getDataset().getId()); + final OverviewExecutionStatus status = mq.buildStatusOverview(uriBuilder.clone(), subject, namespace); if (mq.isReadyToDownload()) { status.setResultUrls(getResultAssets(config.getResultProviders(), mq, uriBuilder, allProviders)); } @@ -207,7 +207,7 @@ public void cancel(Subject subject, Dataset dataset, ManagedExecution query) { log.info("User[{}] cancelled Query[{}]", subject.getId(), query.getId()); final ExecutionManager executionManager = datasetRegistry.get(dataset.getId()).getExecutionManager(); - executionManager.cancelQuery(dataset, query); + executionManager.cancelQuery(query); } public void patchQuery(Subject subject, ManagedExecution execution, MetaDataPatch patch) { @@ -275,14 +275,19 @@ public void deleteQuery(Subject subject, ManagedExecution execution) { storage.removeExecution(execution.getId()); } + public ExecutionState awaitDone(ManagedExecution query, int time, TimeUnit unit) { + final Namespace namespace = datasetRegistry.get(query.getDataset().getId()); + return namespace.getExecutionManager().awaitDone(query, time, unit); + } + public FullExecutionStatus getQueryFullStatus(ManagedExecution query, Subject subject, UriBuilder url, Boolean allProviders) { final Namespace namespace = datasetRegistry.get(query.getDataset().getId()); query.initExecutable(namespace, config); - final FullExecutionStatus status = query.buildStatusFull(subject); + final FullExecutionStatus status = query.buildStatusFull(subject, namespace); - if (query.isReadyToDownload() && subject.isPermitted(query.getDataset(), Ability.DOWNLOAD)) { + if (query.isReadyToDownload() && subject.isPermitted(namespace.getDataset(), Ability.DOWNLOAD)) { status.setResultUrls(getResultAssets(config.getResultProviders(), query, url, allProviders)); } return status; @@ -317,7 +322,7 @@ public ExternalUploadResult uploadEntities(Subject subject, Dataset dataset, Ext execution = ((ManagedQuery) namespace .getExecutionManager() - .createExecution(query, subject.getUser(), dataset, false)); + .createExecution(query, subject.getUser(), namespace, false)); execution.setLastResultCount((long) statistic.getResolved().size()); @@ -338,7 +343,8 @@ public FullExecutionStatus getSingleEntityExport(Subject subject, UriBuilder uri subject.authorize(dataset, Ability.ENTITY_PREVIEW); subject.authorize(dataset, Ability.PRESERVE_ID); - final PreviewConfig previewConfig = datasetRegistry.get(dataset.getId()).getPreviewConfig(); + Namespace namespace = datasetRegistry.get(dataset.getId()); + final PreviewConfig previewConfig = namespace.getPreviewConfig(); final EntityPreviewForm form = EntityPreviewForm.create(entity, idKind, dateRange, sources, previewConfig.getSelects(), previewConfig.getTimeStratifiedSelects(), datasetRegistry); @@ -350,7 +356,7 @@ public FullExecutionStatus getSingleEntityExport(Subject subject, UriBuilder uri final EntityPreviewExecution execution = (EntityPreviewExecution) postQuery(dataset, form, subject, true); - if (execution.awaitDone(10, TimeUnit.SECONDS) == ExecutionState.RUNNING) { + if (namespace.getExecutionManager().awaitDone(execution, 10, TimeUnit.SECONDS) == ExecutionState.RUNNING) { log.warn("Still waiting for {} after 10 Seconds.", execution.getId()); throw new ConqueryError.ExecutionProcessingTimeoutError(); } @@ -361,7 +367,7 @@ public FullExecutionStatus getSingleEntityExport(Subject subject, UriBuilder uri } - final FullExecutionStatus status = execution.buildStatusFull(subject); + final FullExecutionStatus status = execution.buildStatusFull(subject, namespace); status.setResultUrls(getResultAssets(config.getResultProviders(), execution, uriBuilder, false)); return status; } @@ -377,21 +383,23 @@ public ManagedExecution postQuery(Dataset dataset, QueryDescription query, Subje // This maps works as long as we have query visitors that are not configured in anyway. // So adding a visitor twice would replace the previous one but both would have yielded the same result. // For the future a better data structure might be desired that also regards similar QueryVisitors of different configuration - final ClassToInstanceMap visitors = MutableClassToInstanceMap.create(); + final List visitors = new ArrayList<>(); query.addVisitors(visitors); // Initialize checks that need to traverse the query tree - visitors.putInstance(QueryUtils.OnlyReusingChecker.class, new QueryUtils.OnlyReusingChecker()); - visitors.putInstance(NamespacedIdentifiableCollector.class, new NamespacedIdentifiableCollector()); + QueryUtils.OnlyReusingChecker onlyReusingChecker = new QueryUtils.OnlyReusingChecker(); + visitors.add(onlyReusingChecker); + NamespacedIdentifiableCollector namespacedIdentifiableCollector = new NamespacedIdentifiableCollector(); + visitors.add(namespacedIdentifiableCollector); final String primaryGroupName = AuthorizationHelper.getPrimaryGroup(subject, storage).map(Group::getName).orElse("none"); - - visitors.putInstance(ExecutionMetrics.QueryMetricsReporter.class, new ExecutionMetrics.QueryMetricsReporter(primaryGroupName)); + ExecutionMetrics.QueryMetricsReporter queryMetricsReporter = new ExecutionMetrics.QueryMetricsReporter(primaryGroupName); + visitors.add(queryMetricsReporter); // Chain all Consumers Consumer consumerChain = QueryUtils.getNoOpEntryPoint(); - for (QueryVisitor visitor : visitors.values()) { + for (QueryVisitor visitor : visitors) { consumerChain = consumerChain.andThen(visitor); } @@ -402,7 +410,8 @@ public ManagedExecution postQuery(Dataset dataset, QueryDescription query, Subje query.authorize(subject, dataset, visitors, storage); // After all authorization checks we can now use the actual subject to invoke the query and do not to bubble down the Userish in methods - ExecutionMetrics.reportNamespacedIds(visitors.getInstance(NamespacedIdentifiableCollector.class).getIdentifiables(), primaryGroupName); + + ExecutionMetrics.reportNamespacedIds(namespacedIdentifiableCollector.getIdentifiables(), primaryGroupName); ExecutionMetrics.reportQueryClassUsage(query.getClass(), primaryGroupName); @@ -412,7 +421,7 @@ public ManagedExecution postQuery(Dataset dataset, QueryDescription query, Subje // If this is only a re-executing query, try to execute the underlying query instead. { - final Optional executionId = visitors.getInstance(QueryUtils.OnlyReusingChecker.class).getOnlyReused(); + final Optional executionId = onlyReusingChecker.getOnlyReused(); final Optional execution = @@ -424,7 +433,7 @@ public ManagedExecution postQuery(Dataset dataset, QueryDescription query, Subje } // Execute the query - return executionManager.runQuery(namespace, query, subject.getUser(), dataset, config, system); + return executionManager.runQuery(namespace, query, subject.getUser(), config, system); } /** @@ -458,7 +467,7 @@ private ManagedExecution tryReuse(QueryDescription query, ManagedExecutionId exe if (!user.isOwner(execution)) { final ManagedExecution newExecution = - executionManager.createExecution(execution.getSubmitted(), user, execution.getDataset(), false); + executionManager.createExecution(execution.getSubmitted(), user, namespace, false); newExecution.setLabel(execution.getLabel()); newExecution.setTags(execution.getTags().clone()); storage.updateExecution(newExecution); @@ -511,7 +520,7 @@ public Stream> resolveEntities(Subject subject, List visitors, MetaStorage storage) { + public void authorize(Subject subject, Dataset submittedDataset, @NonNull List visitors, MetaStorage storage) { QueryDescription.super.authorize(subject, submittedDataset, visitors, storage); // Check if subject is allowed to create this form final FormType formType = FormScanner.resolveFormType(getFormType()); diff --git a/backend/src/main/java/com/bakdata/conquery/apiv1/forms/FormConfigAPI.java b/backend/src/main/java/com/bakdata/conquery/apiv1/forms/FormConfigAPI.java index dc105a6286..71fbff1ada 100644 --- a/backend/src/main/java/com/bakdata/conquery/apiv1/forms/FormConfigAPI.java +++ b/backend/src/main/java/com/bakdata/conquery/apiv1/forms/FormConfigAPI.java @@ -2,13 +2,12 @@ import java.time.LocalDateTime; import java.util.UUID; - import jakarta.validation.constraints.NotEmpty; import jakarta.validation.constraints.NotNull; import com.bakdata.conquery.models.auth.entities.User; -import com.bakdata.conquery.models.datasets.Dataset; import com.bakdata.conquery.models.forms.configs.FormConfig; +import com.bakdata.conquery.models.identifiable.ids.specific.DatasetId; import com.bakdata.conquery.util.VariableDefaultValue; import com.fasterxml.jackson.databind.JsonNode; import lombok.AllArgsConstructor; @@ -37,8 +36,8 @@ public class FormConfigAPI { private UUID formId = UUID.randomUUID(); @VariableDefaultValue @Builder.Default private LocalDateTime creationTime = LocalDateTime.now(); - - public FormConfig intern(User owner, Dataset dataset) { + + public FormConfig intern(User owner, DatasetId dataset) { FormConfig intern = new FormConfig(); intern.setFormId(formId); intern.setFormType(formType); diff --git a/backend/src/main/java/com/bakdata/conquery/apiv1/query/QueryDescription.java b/backend/src/main/java/com/bakdata/conquery/apiv1/query/QueryDescription.java index 091d78a5e3..73a0282b40 100644 --- a/backend/src/main/java/com/bakdata/conquery/apiv1/query/QueryDescription.java +++ b/backend/src/main/java/com/bakdata/conquery/apiv1/query/QueryDescription.java @@ -1,5 +1,6 @@ package com.bakdata.conquery.apiv1.query; +import java.util.List; import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; @@ -25,9 +26,7 @@ import com.bakdata.conquery.util.QueryUtils.ExternalIdChecker; import com.bakdata.conquery.util.QueryUtils.NamespacedIdentifiableCollector; import com.fasterxml.jackson.annotation.JsonTypeInfo; -import com.google.common.collect.ClassToInstanceMap; import lombok.NonNull; -import org.jetbrains.annotations.NotNull; @JsonTypeInfo(use = JsonTypeInfo.Id.CUSTOM, property = "type") @CPSBase @@ -62,25 +61,22 @@ public interface QueryDescription extends Visitable { * All visitors are concatenated so only a single traverse needs to be done. * @param visitors The structure to which new visitors need to be added. */ - default void addVisitors(@NonNull ClassToInstanceMap visitors) { + default void addVisitors(@NonNull List visitors) { // Register visitors for permission checks - visitors.putInstance(NamespacedIdentifiableCollector.class, new NamespacedIdentifiableCollector()); - visitors.putInstance(QueryUtils.ExternalIdChecker.class, new QueryUtils.ExternalIdChecker()); + visitors.add(new QueryUtils.ExternalIdChecker()); } /** * Check implementation specific permissions. Is called after all visitors have been registered and executed. */ - default void authorize(Subject subject, Dataset submittedDataset, @NonNull ClassToInstanceMap visitors, MetaStorage storage) { + default void authorize(Subject subject, Dataset submittedDataset, List visitors, MetaStorage storage) { authorizeQuery(this, subject, submittedDataset, visitors, storage); } - public static void authorizeQuery(QueryDescription queryDescription, Subject subject, Dataset submittedDataset, @NotNull ClassToInstanceMap visitors, MetaStorage storage) { + static void authorizeQuery(QueryDescription queryDescription, Subject subject, Dataset submittedDataset, List visitors, MetaStorage storage) { NamespacedIdentifiableCollector nsIdCollector = QueryUtils.getVisitor(visitors, NamespacedIdentifiableCollector.class); ExternalIdChecker externalIdChecker = QueryUtils.getVisitor(visitors, ExternalIdChecker.class); - if (nsIdCollector == null) { - throw new IllegalStateException(); - } + // Generate DatasetPermissions final Set datasets = nsIdCollector.getIdentifiables().stream() .map(NamespacedIdentifiable::getDataset) diff --git a/backend/src/main/java/com/bakdata/conquery/io/result/ExternalResult.java b/backend/src/main/java/com/bakdata/conquery/io/result/ExternalState.java similarity index 55% rename from backend/src/main/java/com/bakdata/conquery/io/result/ExternalResult.java rename to backend/src/main/java/com/bakdata/conquery/io/result/ExternalState.java index f52f1f6b28..65db0fc592 100644 --- a/backend/src/main/java/com/bakdata/conquery/io/result/ExternalResult.java +++ b/backend/src/main/java/com/bakdata/conquery/io/result/ExternalState.java @@ -1,25 +1,36 @@ package com.bakdata.conquery.io.result; +import java.util.List; import java.util.function.Function; import java.util.stream.Stream; - import jakarta.ws.rs.core.Response; import jakarta.ws.rs.core.UriBuilder; import com.bakdata.conquery.apiv1.execution.ResultAsset; -import com.bakdata.conquery.models.execution.ManagedExecution; +import com.bakdata.conquery.io.external.form.ExternalFormBackendApi; +import com.bakdata.conquery.models.auth.entities.User; +import com.bakdata.conquery.models.query.ExecutionManager; import com.fasterxml.jackson.annotation.JsonIgnore; +import it.unimi.dsi.fastutil.Pair; /** * Interface for executions, whose final result is produced externally. */ -public interface ExternalResult { +public interface ExternalState extends ExecutionManager.State { + + /** + * Returns the api object for the external form backend. + */ + ExternalFormBackendApi getApi(); + /** + * Sets the map of results which reference the result assets of an external execution. + */ + void setResultsAssetMap(List> assetMap); /** - * Implementations should use the {@link com.bakdata.eva.resources.ExternalResultResource#getDownloadURL(UriBuilder, ManagedExecution, String)} - * to build the url for the asset. - * The provided assetId is the one that is used by {@link ExternalResult#fetchExternalResult(String)} to retrieve the download. + * Returns assert builders for all results registered by an {@link com.bakdata.conquery.models.forms.managed.ExternalExecution} (see {@link AssetBuilder}). + * The provided assetId is the one that is used by {@link ExternalState#fetchExternalResult(String)} to retrieve the download. */ @JsonIgnore Stream getResultAssets(); @@ -37,6 +48,7 @@ public interface ExternalResult { */ @FunctionalInterface interface AssetBuilder extends Function { - } + + User getServiceUser(); } diff --git a/backend/src/main/java/com/bakdata/conquery/io/result/external/ExternalResultProcessor.java b/backend/src/main/java/com/bakdata/conquery/io/result/external/ExternalResultProcessor.java index 50eceee008..c8ed12696c 100644 --- a/backend/src/main/java/com/bakdata/conquery/io/result/external/ExternalResultProcessor.java +++ b/backend/src/main/java/com/bakdata/conquery/io/result/external/ExternalResultProcessor.java @@ -1,29 +1,28 @@ package com.bakdata.conquery.io.result.external; import jakarta.inject.Inject; -import jakarta.ws.rs.WebApplicationException; import jakarta.ws.rs.core.Response; -import com.bakdata.conquery.io.result.ExternalResult; +import com.bakdata.conquery.io.result.ExternalState; import com.bakdata.conquery.io.result.ResultUtil; import com.bakdata.conquery.models.auth.entities.Subject; -import com.bakdata.conquery.models.execution.ManagedExecution; +import com.bakdata.conquery.models.forms.managed.ExternalExecution; +import com.bakdata.conquery.models.query.ExecutionManager; +import com.bakdata.conquery.models.worker.DatasetRegistry; import lombok.RequiredArgsConstructor; @RequiredArgsConstructor(onConstructor_ = @Inject) public class ExternalResultProcessor { - public Response getResult(Subject subject, ManagedExecution execution, String fileName) { + private final DatasetRegistry datasetRegistry; - ResultUtil.authorizeExecutable(subject, execution); - - if (!(execution instanceof ExternalResult)) { - throw new WebApplicationException("The execution exists, but produces not a zipped result", Response.Status.CONFLICT); + public Response getResult(Subject subject, ExternalExecution execution, String fileName) { - } + ResultUtil.authorizeExecutable(subject, execution); - T externalExecution = (T) execution; + ExecutionManager executionManager = datasetRegistry.get(execution.getDataset().getId()).getExecutionManager(); + ExternalState externalResult = executionManager.getResult(execution.getId()); - return externalExecution.fetchExternalResult(fileName); + return externalResult.fetchExternalResult(fileName); } } diff --git a/backend/src/main/java/com/bakdata/conquery/mode/local/LocalNamespaceHandler.java b/backend/src/main/java/com/bakdata/conquery/mode/local/LocalNamespaceHandler.java index 79581354d0..11926f411c 100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/local/LocalNamespaceHandler.java +++ b/backend/src/main/java/com/bakdata/conquery/mode/local/LocalNamespaceHandler.java @@ -22,7 +22,6 @@ import com.bakdata.conquery.sql.conversion.dialect.SqlDialectFactory; import com.bakdata.conquery.sql.execution.ResultSetProcessor; import com.bakdata.conquery.sql.execution.ResultSetProcessorFactory; -import com.bakdata.conquery.sql.execution.SqlExecutionResult; import com.bakdata.conquery.sql.execution.SqlExecutionService; import io.dropwizard.core.setup.Environment; import lombok.RequiredArgsConstructor; @@ -54,7 +53,7 @@ public LocalNamespace createNamespace(NamespaceStorage namespaceStorage, MetaSto SqlExecutionService sqlExecutionService = new SqlExecutionService(dslContext, resultSetProcessor); NodeConversions nodeConversions = new NodeConversions(idColumns, sqlDialect, dslContext, databaseConfig, sqlExecutionService); SqlConverter sqlConverter = new SqlConverter(nodeConversions, config); - ExecutionManager executionManager = new SqlExecutionManager(sqlConverter, sqlExecutionService, metaStorage); + ExecutionManager executionManager = new SqlExecutionManager(sqlConverter, sqlExecutionService, metaStorage); SqlStorageHandler sqlStorageHandler = new SqlStorageHandler(sqlExecutionService); SqlEntityResolver sqlEntityResolver = new SqlEntityResolver(idColumns, dslContext, sqlDialect, sqlExecutionService); diff --git a/backend/src/main/java/com/bakdata/conquery/models/auth/permissions/DatasetPermission.java b/backend/src/main/java/com/bakdata/conquery/models/auth/permissions/DatasetPermission.java index db68212eae..d3e9d3719b 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/auth/permissions/DatasetPermission.java +++ b/backend/src/main/java/com/bakdata/conquery/models/auth/permissions/DatasetPermission.java @@ -4,6 +4,7 @@ import com.bakdata.conquery.io.cps.CPSType; import com.bakdata.conquery.models.identifiable.ids.specific.DatasetId; +import org.jetbrains.annotations.TestOnly; @CPSType(id = "DATASET", base = StringPermissionBuilder.class) public class DatasetPermission extends StringPermissionBuilder { @@ -38,7 +39,7 @@ public static ConqueryPermission onInstance(Set abilities, DatasetId in return INSTANCE.instancePermission(abilities, instance); } - @Deprecated + @TestOnly public static ConqueryPermission onInstance(Ability ability, DatasetId instance) { return INSTANCE.instancePermission(ability, instance); } diff --git a/backend/src/main/java/com/bakdata/conquery/models/auth/permissions/ExecutionPermission.java b/backend/src/main/java/com/bakdata/conquery/models/auth/permissions/ExecutionPermission.java index 83cfda3f87..4c0b55bb54 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/auth/permissions/ExecutionPermission.java +++ b/backend/src/main/java/com/bakdata/conquery/models/auth/permissions/ExecutionPermission.java @@ -5,6 +5,7 @@ import com.bakdata.conquery.io.cps.CPSType; import com.bakdata.conquery.models.identifiable.ids.specific.ManagedExecutionId; +import org.jetbrains.annotations.TestOnly; @CPSType(id = "EXECUTION", base = StringPermissionBuilder.class) public class ExecutionPermission extends StringPermissionBuilder { @@ -43,7 +44,7 @@ public Set getAllowedAbilities() { } //// Helper functions - @Deprecated + @TestOnly public static ConqueryPermission onInstance(Ability ability, ManagedExecutionId instance) { return INSTANCE.instancePermission(ability, instance); } diff --git a/backend/src/main/java/com/bakdata/conquery/models/auth/permissions/FormConfigPermission.java b/backend/src/main/java/com/bakdata/conquery/models/auth/permissions/FormConfigPermission.java index bbb683a939..519c0c0a2c 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/auth/permissions/FormConfigPermission.java +++ b/backend/src/main/java/com/bakdata/conquery/models/auth/permissions/FormConfigPermission.java @@ -5,6 +5,7 @@ import com.bakdata.conquery.io.cps.CPSType; import com.bakdata.conquery.models.identifiable.ids.specific.FormConfigId; +import org.jetbrains.annotations.TestOnly; @CPSType(id = "FORM_CONFIG", base = StringPermissionBuilder.class) public class FormConfigPermission extends StringPermissionBuilder { @@ -36,7 +37,7 @@ public Set getAllowedAbilities() { } //// Helper functions - @Deprecated + @TestOnly public static ConqueryPermission onInstance(Set abilities, FormConfigId instance) { return INSTANCE.instancePermission(abilities, instance); } diff --git a/backend/src/main/java/com/bakdata/conquery/models/auth/permissions/FormPermission.java b/backend/src/main/java/com/bakdata/conquery/models/auth/permissions/FormPermission.java index 242b17d3bf..91daa31c9c 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/auth/permissions/FormPermission.java +++ b/backend/src/main/java/com/bakdata/conquery/models/auth/permissions/FormPermission.java @@ -1,9 +1,10 @@ package com.bakdata.conquery.models.auth.permissions; -import com.bakdata.conquery.io.cps.CPSType; - import java.util.Set; +import com.bakdata.conquery.io.cps.CPSType; +import org.jetbrains.annotations.TestOnly; + /** * Permission to restrict the usage of a specific form type. * The forms are programmatically distinguished from their CPSType. @@ -34,7 +35,7 @@ public static ConqueryPermission onInstance(Set abilities, String insta return INSTANCE.instancePermission(abilities, instance); } - @Deprecated + @TestOnly public static ConqueryPermission onInstance(Ability ability, String instance) { return INSTANCE.instancePermission(ability, instance); } diff --git a/backend/src/main/java/com/bakdata/conquery/models/config/ExternalResultProvider.java b/backend/src/main/java/com/bakdata/conquery/models/config/ExternalResultProvider.java index ffef2f45ec..fa364c7fd6 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/config/ExternalResultProvider.java +++ b/backend/src/main/java/com/bakdata/conquery/models/config/ExternalResultProvider.java @@ -2,16 +2,15 @@ import java.util.Collection; import java.util.Collections; - import jakarta.ws.rs.core.UriBuilder; import com.bakdata.conquery.apiv1.execution.ResultAsset; import com.bakdata.conquery.commands.ManagerNode; import com.bakdata.conquery.io.cps.CPSType; -import com.bakdata.conquery.io.result.ExternalResult; import com.bakdata.conquery.io.result.ResultRender.ResultRendererProvider; import com.bakdata.conquery.io.result.external.ExternalResultProcessor; import com.bakdata.conquery.models.execution.ManagedExecution; +import com.bakdata.conquery.models.forms.managed.ExternalExecution; import com.bakdata.conquery.resources.api.ResultExternalResource; import io.dropwizard.jersey.DropwizardResourceConfig; import lombok.Getter; @@ -28,7 +27,7 @@ public class ExternalResultProvider implements ResultRendererProvider { @Override public Collection generateResultURLs(ManagedExecution exec, UriBuilder uriBuilder, boolean allProviders) { - if (!(exec instanceof ExternalResult)) { + if (!(exec instanceof ExternalExecution)) { return Collections.emptyList(); } @@ -36,7 +35,7 @@ public Collection generateResultURLs(ManagedExecution exec, UriBuil return Collections.emptyList(); } - return ((ExternalResult) exec).getResultAssets().map(assetBuilder -> assetBuilder.apply(uriBuilder.clone())).toList(); + return ((ExternalExecution) exec).getResultAssets().map(assetBuilder -> assetBuilder.apply(uriBuilder.clone())).toList(); } @Override diff --git a/backend/src/main/java/com/bakdata/conquery/models/execution/InternalExecution.java b/backend/src/main/java/com/bakdata/conquery/models/execution/InternalExecution.java index 2474583b01..f56322b3a4 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/execution/InternalExecution.java +++ b/backend/src/main/java/com/bakdata/conquery/models/execution/InternalExecution.java @@ -1,18 +1,11 @@ package com.bakdata.conquery.models.execution; -import com.bakdata.conquery.models.messages.namespaces.WorkerMessage; -import com.bakdata.conquery.models.query.results.ShardResult; - /** * This interface must be implemented if a {@link ManagedExecution} requires direct computation using the query engine on the shard nodes. * - * @param The type of result, the execution assumes to be returned from the shards. */ -public interface InternalExecution { +public interface InternalExecution { + - /** - * The message sent to shard nodes, to execute the query - */ - WorkerMessage createExecutionMessage(); } diff --git a/backend/src/main/java/com/bakdata/conquery/models/execution/ManagedExecution.java b/backend/src/main/java/com/bakdata/conquery/models/execution/ManagedExecution.java index ae1887bef7..3993f344b9 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/execution/ManagedExecution.java +++ b/backend/src/main/java/com/bakdata/conquery/models/execution/ManagedExecution.java @@ -8,11 +8,9 @@ import java.util.List; import java.util.Set; import java.util.UUID; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; - -import javax.annotation.Nullable; +import jakarta.validation.constraints.NotNull; +import jakarta.ws.rs.core.UriBuilder; import com.bakdata.conquery.apiv1.execution.ExecutionStatus; import com.bakdata.conquery.apiv1.execution.FullExecutionStatus; @@ -39,6 +37,7 @@ import com.bakdata.conquery.models.identifiable.IdentifiableImpl; import com.bakdata.conquery.models.identifiable.ids.specific.GroupId; import com.bakdata.conquery.models.identifiable.ids.specific.ManagedExecutionId; +import com.bakdata.conquery.models.query.ExecutionManager; import com.bakdata.conquery.models.query.PrintSettings; import com.bakdata.conquery.models.query.Visitable; import com.bakdata.conquery.models.worker.Namespace; @@ -51,12 +50,10 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.fasterxml.jackson.annotation.OptBoolean; import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.Uninterruptibles; -import jakarta.validation.constraints.NotNull; -import jakarta.ws.rs.core.UriBuilder; import lombok.AccessLevel; import lombok.EqualsAndHashCode; import lombok.Getter; +import lombok.NoArgsConstructor; import lombok.NonNull; import lombok.Setter; import lombok.ToString; @@ -71,6 +68,7 @@ @CPSBase @JsonTypeInfo(use = JsonTypeInfo.Id.CUSTOM, property = "type") @EqualsAndHashCode(callSuper = false) +@NoArgsConstructor(access = AccessLevel.PROTECTED) public abstract class ManagedExecution extends IdentifiableImpl implements Taggable, Shareable, Labelable, Owned, Visitable { /** @@ -85,7 +83,6 @@ public abstract class ManagedExecution extends IdentifiableImpl permittedGroups = new ArrayList<>(); - for (Group group : storage.getAllGroups()) { + for (Group group : getMetaStorage().getAllGroups()) { for (Permission perm : group.getPermissions()) { if (perm.implies(createPermission(Ability.READ.asSet()))) { permittedGroups.add(group.getId()); @@ -358,14 +327,14 @@ private void setAdditionalFieldsForStatusWithGroups(FullExecutionStatus status) status.setGroups(permittedGroups); } - protected void setAdditionalFieldsForStatusWithColumnDescription(Subject subject, FullExecutionStatus status) { + protected void setAdditionalFieldsForStatusWithColumnDescription(Subject subject, FullExecutionStatus status, Namespace namespace) { // Implementation specific } /** * Sets additional fields of an {@link ExecutionStatus} when a more specific status is requested. */ - protected void setAdditionalFieldsForStatusWithSource(Subject subject, FullExecutionStatus status) { + protected void setAdditionalFieldsForStatusWithSource(Subject subject, FullExecutionStatus status, Namespace namespace) { QueryDescription query = getSubmitted(); status.setCanExpand(canSubjectExpand(subject, query)); @@ -442,7 +411,7 @@ public ConqueryPermission createPermission(Set abilities) { return ExecutionPermission.onInstance(abilities, getId()); } - public void reset() { + public void reset(ExecutionManager executionManager) { // This avoids endless loops with already reset queries if(getState().equals(ExecutionState.NEW)){ return; diff --git a/backend/src/main/java/com/bakdata/conquery/models/execution/Owned.java b/backend/src/main/java/com/bakdata/conquery/models/execution/Owned.java index cc319cf49f..1c77dd5911 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/execution/Owned.java +++ b/backend/src/main/java/com/bakdata/conquery/models/execution/Owned.java @@ -1,7 +1,7 @@ package com.bakdata.conquery.models.execution; -import com.bakdata.conquery.models.auth.permissions.Authorized; import com.bakdata.conquery.models.auth.entities.User; +import com.bakdata.conquery.models.auth.permissions.Authorized; public interface Owned extends Authorized { User getOwner(); diff --git a/backend/src/main/java/com/bakdata/conquery/models/forms/configs/FormConfig.java b/backend/src/main/java/com/bakdata/conquery/models/forms/configs/FormConfig.java index 12299bd15c..baa4706bf9 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/forms/configs/FormConfig.java +++ b/backend/src/main/java/com/bakdata/conquery/models/forms/configs/FormConfig.java @@ -10,13 +10,11 @@ import java.util.Set; import java.util.UUID; import java.util.function.Consumer; - import jakarta.validation.constraints.NotEmpty; import jakarta.validation.constraints.NotNull; import com.bakdata.conquery.apiv1.FormConfigPatch; import com.bakdata.conquery.io.jackson.serializer.MetaIdRef; -import com.bakdata.conquery.io.jackson.serializer.NsIdRef; import com.bakdata.conquery.io.storage.MetaStorage; import com.bakdata.conquery.models.auth.entities.Group; import com.bakdata.conquery.models.auth.entities.Subject; @@ -24,12 +22,12 @@ import com.bakdata.conquery.models.auth.permissions.Ability; import com.bakdata.conquery.models.auth.permissions.ConqueryPermission; import com.bakdata.conquery.models.auth.permissions.FormConfigPermission; -import com.bakdata.conquery.models.datasets.Dataset; import com.bakdata.conquery.models.execution.Labelable; import com.bakdata.conquery.models.execution.Owned; import com.bakdata.conquery.models.execution.Shareable; import com.bakdata.conquery.models.execution.Taggable; import com.bakdata.conquery.models.identifiable.IdentifiableImpl; +import com.bakdata.conquery.models.identifiable.ids.specific.DatasetId; import com.bakdata.conquery.models.identifiable.ids.specific.FormConfigId; import com.bakdata.conquery.models.identifiable.ids.specific.GroupId; import com.bakdata.conquery.util.VariableDefaultValue; @@ -56,8 +54,7 @@ @FieldNameConstants public class FormConfig extends IdentifiableImpl implements Shareable, Labelable, Taggable, Owned { - @NsIdRef - protected Dataset dataset; + protected DatasetId dataset; @NotEmpty private String formType; @VariableDefaultValue @NonNull @@ -86,14 +83,14 @@ public FormConfig(String formType, JsonNode values) { @Override public FormConfigId createId() { - return new FormConfigId(dataset.getId(), formType, formId); + return new FormConfigId(dataset, formType, formId); } /** * Provides an overview (meta data) of this form configuration without the * actual form field values. */ - public FormConfigOverviewRepresentation overview(Subject subject) { + public FormConfigOverviewRepresentation overview(MetaStorage storage, Subject subject) { String ownerName = Optional.ofNullable(owner).map(User::getLabel).orElse(null); return FormConfigOverviewRepresentation.builder() @@ -124,7 +121,6 @@ public FormConfigFullRepresentation fullRepresentation(MetaStorage storage, Subj for(Permission perm : group.getPermissions()) { if(perm.implies(createPermission(Ability.READ.asSet()))) { permittedGroups.add(group.getId()); - continue; } } } @@ -192,7 +188,7 @@ public static class FormConfigFullRepresentation extends FormConfigOverviewRepre } public Consumer valueSetter() { - return (patch) -> {setValues(patch.getValues());}; + return (patch) -> setValues(patch.getValues()); } } diff --git a/backend/src/main/java/com/bakdata/conquery/models/forms/frontendconfiguration/FormConfigProcessor.java b/backend/src/main/java/com/bakdata/conquery/models/forms/frontendconfiguration/FormConfigProcessor.java index 4ade864697..6000c90819 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/forms/frontendconfiguration/FormConfigProcessor.java +++ b/backend/src/main/java/com/bakdata/conquery/models/forms/frontendconfiguration/FormConfigProcessor.java @@ -5,7 +5,6 @@ import java.util.List; import java.util.Set; import java.util.stream.Stream; - import jakarta.inject.Inject; import jakarta.validation.Validator; @@ -87,12 +86,12 @@ public Stream getConfigsByFormType(@NonNull Su final Set formTypesFinal = requestedFormType; final Stream stream = storage.getAllFormConfigs().stream() - .filter(c -> dataset.equals(c.getDataset())) + .filter(c -> dataset.getId().equals(c.getDataset())) .filter(c -> formTypesFinal.contains(c.getFormType())) .filter(c -> subject.isPermitted(c, Ability.READ)); - return stream.map(c -> c.overview(subject)); + return stream.map(c -> c.overview(storage, subject)); } /** @@ -117,7 +116,7 @@ public FormConfig addConfig(Subject subject, Dataset targetDataset, FormConfigAP subject.authorize(namespace.getDataset(), Ability.READ); - final FormConfig internalConfig = config.intern(storage.getUser(subject.getId()), targetDataset); + final FormConfig internalConfig = config.intern(storage.getUser(subject.getId()), targetDataset.getId()); // Add the config immediately to the submitted dataset addConfigToDataset(internalConfig); diff --git a/backend/src/main/java/com/bakdata/conquery/models/forms/managed/ExternalExecution.java b/backend/src/main/java/com/bakdata/conquery/models/forms/managed/ExternalExecution.java index f110eeeb87..f70e84f3fa 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/forms/managed/ExternalExecution.java +++ b/backend/src/main/java/com/bakdata/conquery/models/forms/managed/ExternalExecution.java @@ -3,20 +3,18 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.UUID; +import java.util.concurrent.CountDownLatch; import java.util.stream.Stream; -import jakarta.ws.rs.core.Response; - import com.bakdata.conquery.apiv1.execution.ExecutionStatus; import com.bakdata.conquery.apiv1.execution.ResultAsset; import com.bakdata.conquery.apiv1.forms.ExternalForm; import com.bakdata.conquery.io.cps.CPSType; import com.bakdata.conquery.io.external.form.ExternalFormBackendApi; import com.bakdata.conquery.io.external.form.ExternalTaskState; -import com.bakdata.conquery.io.result.ExternalResult; +import com.bakdata.conquery.io.result.ExternalState; import com.bakdata.conquery.io.storage.MetaStorage; import com.bakdata.conquery.models.auth.entities.Subject; import com.bakdata.conquery.models.auth.entities.User; @@ -25,16 +23,19 @@ import com.bakdata.conquery.models.error.ConqueryError; import com.bakdata.conquery.models.execution.ExecutionState; import com.bakdata.conquery.models.execution.ManagedExecution; +import com.bakdata.conquery.models.query.ExecutionManager; +import com.bakdata.conquery.models.query.ExternalStateImpl; +import com.bakdata.conquery.models.worker.Namespace; import com.bakdata.conquery.resources.api.ResultExternalResource; import com.bakdata.conquery.util.AuthUtil; -import com.fasterxml.jackson.annotation.JacksonInject; -import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; -import com.fasterxml.jackson.annotation.OptBoolean; import com.google.common.base.Preconditions; import com.google.common.collect.MoreCollectors; import it.unimi.dsi.fastutil.Pair; +import lombok.AccessLevel; import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; @@ -46,101 +47,90 @@ @Slf4j @CPSType(id = "EXTERNAL_EXECUTION", base = ManagedExecution.class) @EqualsAndHashCode(callSuper = true, doNotUseGetters = true) -public class ExternalExecution extends ManagedForm implements ExternalResult { +@NoArgsConstructor(access = AccessLevel.PROTECTED) +public class ExternalExecution extends ManagedForm { + @Getter private UUID externalTaskId; @JsonIgnore @EqualsAndHashCode.Exclude - private ExternalFormBackendApi api; - - @JsonIgnore - @EqualsAndHashCode.Exclude - private FormBackendConfig formBackendConfig; - - @JsonIgnore - @EqualsAndHashCode.Exclude - private User serviceUser; - + private ExecutionManager executionManager; - /** - * Pairs of external result assets (internal url) and their internal asset builder. - * The internal asset builder generates the asset url with the context of a user request. - */ - @JsonIgnore - private List> resultsAssetMap = Collections.emptyList(); - - @JsonCreator - protected ExternalExecution(@JacksonInject(useInput = OptBoolean.FALSE) MetaStorage storage) { - super(storage); - } - public ExternalExecution(ExternalForm form, User user, Dataset dataset, MetaStorage storage) { - super(form, user, dataset, storage); + public ExternalExecution(ExternalForm form, User user, Dataset dataset, MetaStorage metaStorage) { + super(form, user, dataset, metaStorage); } @Override - protected void doInitExecutable() { - formBackendConfig = getConfig().getPluginConfigs(FormBackendConfig.class) - .filter(c -> c.supportsFormType(getSubmittedForm().getFormType())) - .collect(MoreCollectors.onlyElement()); - - api = formBackendConfig.createApi(); + protected void doInitExecutable(Namespace namespace) { + executionManager = namespace.getExecutionManager(); } - @Override public void start() { synchronized (this) { if (externalTaskId != null) { - syncExternalState(); + syncExternalState(executionManager); } if (getState() == ExecutionState.RUNNING) { throw new ConqueryError.ExecutionProcessingError(); } - super.start(); // Create service user - serviceUser = formBackendConfig.createServiceUser(getOwner(), getDataset()); + Dataset dataset = getNamespace().getDataset(); + User originalUser = getOwner(); + FormBackendConfig formBackendConfig = getConfig().getPluginConfigs(FormBackendConfig.class) + .filter(c -> c.supportsFormType(getSubmittedForm().getFormType())) + .collect(MoreCollectors.onlyElement()); + User serviceUser = formBackendConfig.createServiceUser(originalUser, dataset); + ExternalFormBackendApi api = formBackendConfig.createApi(); - final ExternalTaskState externalTaskState = api.postForm(getSubmitted(), getOwner(), serviceUser, getDataset()); + final ExternalTaskState externalTaskState = api.postForm(getSubmitted(), originalUser, serviceUser, dataset); + + executionManager.addState(this.getId(), new ExternalStateImpl(new CountDownLatch(0), api, serviceUser)); externalTaskId = externalTaskState.getId(); + + super.start(); } } - private synchronized void syncExternalState() { + private synchronized void syncExternalState(ExecutionManager executionManager) { Preconditions.checkNotNull(externalTaskId, "Cannot check external task, because no Id is present"); - final ExternalTaskState formState = api.getFormState(externalTaskId); + ExternalState state = this.executionManager.getResult(this.getId()); + final ExternalTaskState formState = state.getApi().getFormState(externalTaskId); - updateStatus(formState); + updateStatus(formState, executionManager); } - private void updateStatus(ExternalTaskState formState) { + private void updateStatus(ExternalTaskState formState, ExecutionManager executionManager) { switch (formState.getStatus()) { case RUNNING -> { setState(ExecutionState.RUNNING); setProgress(formState.getProgress().floatValue()); } - case FAILURE -> fail(formState.getError()); + case FAILURE -> fail(formState.getError(), executionManager); case SUCCESS -> { - resultsAssetMap = registerResultAssets(formState); - finish(ExecutionState.DONE); + List> resultsAssetMap = registerResultAssets(formState); + ExternalState state = this.executionManager.getResult(this.getId()); + state.setResultsAssetMap(resultsAssetMap); + finish(ExecutionState.DONE, executionManager); } - case CANCELLED -> reset(); + case CANCELLED -> reset(executionManager); } } - private List> registerResultAssets(ExternalTaskState response) { - final List> assetMap = new ArrayList<>(); + private List> registerResultAssets(ExternalTaskState response) { + final List> assetMap = new ArrayList<>(); response.getResults().forEach(asset -> assetMap.add(Pair.of(asset, createResultAssetBuilder(asset)))); return assetMap; } @@ -148,7 +138,7 @@ private List> registerResultAssets(ExternalTaskS /** * The {@link ResultAsset} is request-dependent, so we can prepare only builder here which takes an url builder. */ - private AssetBuilder createResultAssetBuilder(ResultAsset asset) { + private ExternalState.AssetBuilder createResultAssetBuilder(ResultAsset asset) { return (uriBuilder) -> { try { final URI externalDownloadURL = ResultExternalResource.getDownloadURL(uriBuilder.clone(), this, asset.getAssetId()); @@ -161,10 +151,10 @@ private AssetBuilder createResultAssetBuilder(ResultAsset asset) { } @Override - public void setStatusBase(@NonNull Subject subject, @NonNull ExecutionStatus status) { - syncExternalState(); + public void setStatusBase(@NonNull Subject subject, @NonNull ExecutionStatus status, Namespace namespace) { + syncExternalState(namespace.getExecutionManager()); - super.setStatusBase(subject, status); + super.setStatusBase(subject, status, namespace); } @Override @@ -172,33 +162,29 @@ public void cancel() { //TODO this is no longer called as the ExecutionManager used to call this. Preconditions.checkNotNull(externalTaskId, "Cannot check external task, because no Id is present"); - updateStatus(api.cancelTask(externalTaskId)); - } - - @Override - public Stream getResultAssets() { - return resultsAssetMap.stream().map(Pair::value); + ExternalState state = executionManager.getResult(this.getId()); + updateStatus( state.getApi().cancelTask(externalTaskId), executionManager); } @Override - public Response fetchExternalResult(String assetId) { - final ResultAsset resultRef = resultsAssetMap.stream() - .map(Pair::key).filter(a -> a.getAssetId().equals(assetId)) - .collect(MoreCollectors.onlyElement()); - - return api.getResult(resultRef.url()); - } - - @Override - public void finish(ExecutionState executionState) { + public void finish(ExecutionState executionState, ExecutionManager executionManager) { if (getState().equals(executionState)) { return; } + ExternalState state = executionManager.getResult(this.getId()); + User serviceUser = state.getServiceUser(); + + super.finish(executionState, executionManager); - super.finish(executionState); synchronized (this) { - AuthUtil.cleanUpUserAndBelongings(serviceUser, getStorage()); - serviceUser = null; + AuthUtil.cleanUpUserAndBelongings(serviceUser, getMetaStorage()); } + + } + + @JsonIgnore + public Stream getResultAssets() { + ExternalState state = executionManager.getResult(this.getId()); + return state.getResultAssets(); } } diff --git a/backend/src/main/java/com/bakdata/conquery/models/forms/managed/ManagedForm.java b/backend/src/main/java/com/bakdata/conquery/models/forms/managed/ManagedForm.java index 410460a5ac..cf04d068e5 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/forms/managed/ManagedForm.java +++ b/backend/src/main/java/com/bakdata/conquery/models/forms/managed/ManagedForm.java @@ -12,12 +12,12 @@ import com.bakdata.conquery.models.forms.configs.FormConfig; import com.bakdata.conquery.models.query.PrintSettings; import com.bakdata.conquery.models.query.Visitable; -import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonIgnore; -import com.fasterxml.jackson.annotation.OptBoolean; import com.fasterxml.jackson.databind.DatabindContext; +import lombok.AccessLevel; import lombok.EqualsAndHashCode; import lombok.Getter; +import lombok.NoArgsConstructor; import lombok.ToString; import lombok.extern.slf4j.Slf4j; @@ -28,6 +28,7 @@ @Slf4j @EqualsAndHashCode(callSuper = true) @CPSType(id = "MANAGED_FORM", base = ManagedExecution.class) +@NoArgsConstructor(access = AccessLevel.PROTECTED) public abstract class ManagedForm extends ManagedExecution { /** @@ -44,10 +45,6 @@ public abstract class ManagedForm extends ManagedExecution { @Getter private Form submittedForm; - protected ManagedForm(@JacksonInject(useInput = OptBoolean.FALSE) MetaStorage storage) { - super(storage); - } - protected ManagedForm(F submittedForm, User owner, Dataset submittedDataset, MetaStorage storage) { super(owner, submittedDataset, storage); this.submittedForm = submittedForm; @@ -66,9 +63,9 @@ public void start() { .tags(this.getTags()) .values(getSubmittedForm().getValues()).build(); - final FormConfig formConfig = build.intern(getOwner(), getDataset()); + final FormConfig formConfig = build.intern(getOwner(), getDataset().getId()); - getStorage().addFormConfig(formConfig); + getMetaStorage().addFormConfig(formConfig); } } } diff --git a/backend/src/main/java/com/bakdata/conquery/models/forms/managed/ManagedInternalForm.java b/backend/src/main/java/com/bakdata/conquery/models/forms/managed/ManagedInternalForm.java index 44247be5b8..d8ca727319 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/forms/managed/ManagedInternalForm.java +++ b/backend/src/main/java/com/bakdata/conquery/models/forms/managed/ManagedInternalForm.java @@ -20,8 +20,6 @@ import com.bakdata.conquery.models.execution.ManagedExecution; import com.bakdata.conquery.models.identifiable.IdMap; import com.bakdata.conquery.models.identifiable.ids.specific.ManagedExecutionId; -import com.bakdata.conquery.models.messages.namespaces.WorkerMessage; -import com.bakdata.conquery.models.messages.namespaces.specific.ExecuteForm; import com.bakdata.conquery.models.query.ColumnDescriptor; import com.bakdata.conquery.models.query.ManagedQuery; import com.bakdata.conquery.models.query.PrintSettings; @@ -29,12 +27,12 @@ import com.bakdata.conquery.models.query.SingleTableResult; import com.bakdata.conquery.models.query.resultinfo.ResultInfo; import com.bakdata.conquery.models.query.results.EntityResult; -import com.bakdata.conquery.models.query.results.FormShardResult; -import com.fasterxml.jackson.annotation.JacksonInject; +import com.bakdata.conquery.models.worker.Namespace; import com.fasterxml.jackson.annotation.JsonIgnore; -import com.fasterxml.jackson.annotation.OptBoolean; +import lombok.AccessLevel; import lombok.EqualsAndHashCode; import lombok.Getter; +import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -46,7 +44,8 @@ @CPSType(base = ManagedExecution.class, id = "INTERNAL_FORM") @Getter @EqualsAndHashCode(callSuper = true) -public class ManagedInternalForm extends ManagedForm implements SingleTableResult, InternalExecution { +@NoArgsConstructor(access = AccessLevel.PROTECTED) +public class ManagedInternalForm extends ManagedForm implements SingleTableResult, InternalExecution { /** @@ -64,10 +63,6 @@ public class ManagedInternalForm extends ManagedF @EqualsAndHashCode.Exclude private final IdMap flatSubQueries = new IdMap<>(); - public ManagedInternalForm(@JacksonInject(useInput = OptBoolean.FALSE) MetaStorage storage) { - super(storage); - } - public ManagedInternalForm(F form, User user, Dataset submittedDataset, MetaStorage storage) { super(form, user, submittedDataset, storage); } @@ -78,9 +73,9 @@ public ManagedQuery getSubQuery(ManagedExecutionId subQueryId) { } @Override - public void doInitExecutable() { + public void doInitExecutable(Namespace namespace) { // Convert sub queries to sub executions - getSubmitted().resolve(new QueryResolveContext(getNamespace(), getConfig(), getStorage(), null)); + getSubmitted().resolve(new QueryResolveContext(getNamespace(), getConfig(), getMetaStorage(), null)); subQueries = createSubExecutions(); // Initialize sub executions @@ -93,7 +88,7 @@ private Map createSubExecutions() { .entrySet() .stream().collect(Collectors.toMap( Map.Entry::getKey, - e -> e.getValue().toManagedExecution(getOwner(), getDataset(), getStorage()) + e -> e.getValue().toManagedExecution(getOwner(), getDataset(), getMetaStorage()) )); } @@ -102,7 +97,7 @@ private Map createSubExecutions() { @Override public void start() { synchronized (this) { - subQueries.values().stream().forEach(flatSubQueries::add); + subQueries.values().forEach(flatSubQueries::add); } flatSubQueries.values().forEach(ManagedQuery::start); super.start(); @@ -114,7 +109,7 @@ public List generateColumnDescriptions(boolean isInitialized, } - protected void setAdditionalFieldsForStatusWithColumnDescription(Subject subject, FullExecutionStatus status) { + protected void setAdditionalFieldsForStatusWithColumnDescription(Subject subject, FullExecutionStatus status, Namespace namespace) { // Set the ColumnDescription if the Form only consits of a single subquery if (subQueries == null) { // If subqueries was not set the Execution was not initialized, do it manually @@ -164,13 +159,6 @@ public long resultRowCount() { return subQueries.values().iterator().next().resultRowCount(); } - @Override - public WorkerMessage createExecutionMessage() { - return new ExecuteForm(getId(), flatSubQueries.entrySet().stream() - .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().getQuery()))); - } - - public boolean allSubQueriesDone() { synchronized (this) { return flatSubQueries.values().stream().allMatch(q -> q.getState().equals(ExecutionState.DONE)); diff --git a/backend/src/main/java/com/bakdata/conquery/models/query/DistributedExecutionManager.java b/backend/src/main/java/com/bakdata/conquery/models/query/DistributedExecutionManager.java index 7db39b2938..091b26e91c 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/query/DistributedExecutionManager.java +++ b/backend/src/main/java/com/bakdata/conquery/models/query/DistributedExecutionManager.java @@ -5,6 +5,8 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.stream.Collectors; import java.util.stream.Stream; import com.bakdata.conquery.io.storage.MetaStorage; @@ -12,32 +14,46 @@ import com.bakdata.conquery.mode.cluster.ClusterState; import com.bakdata.conquery.models.auth.AuthorizationHelper; import com.bakdata.conquery.models.auth.entities.Group; -import com.bakdata.conquery.models.datasets.Dataset; import com.bakdata.conquery.models.execution.ExecutionState; import com.bakdata.conquery.models.execution.InternalExecution; import com.bakdata.conquery.models.execution.ManagedExecution; +import com.bakdata.conquery.models.forms.managed.ManagedInternalForm; +import com.bakdata.conquery.models.identifiable.ids.specific.DatasetId; import com.bakdata.conquery.models.identifiable.ids.specific.WorkerId; +import com.bakdata.conquery.models.messages.namespaces.WorkerMessage; import com.bakdata.conquery.models.messages.namespaces.specific.CancelQuery; +import com.bakdata.conquery.models.messages.namespaces.specific.ExecuteForm; +import com.bakdata.conquery.models.messages.namespaces.specific.ExecuteQuery; import com.bakdata.conquery.models.query.results.EntityResult; import com.bakdata.conquery.models.query.results.ShardResult; -import com.bakdata.conquery.models.worker.Namespace; import com.bakdata.conquery.models.worker.WorkerHandler; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.NotImplementedException; @Slf4j -public class DistributedExecutionManager extends ExecutionManager { +public class DistributedExecutionManager extends ExecutionManager { - public record DistributedResult(Map> results) implements Result { + public record DistributedState(Map> results, CountDownLatch executingLock) implements InternalState { - public DistributedResult() { - this(new ConcurrentHashMap<>()); + public DistributedState() { + this(new ConcurrentHashMap<>(), new CountDownLatch(1)); } @Override public Stream streamQueryResults() { return results.values().stream().flatMap(Collection::stream); } + + @Override + public CountDownLatch getExecutingLock() { + return executingLock; + } + + public boolean allResultsArrived(Set allWorkers) { + Set finishedWorkers = results.keySet(); + return finishedWorkers.equals(allWorkers); + } } private final ClusterState clusterState; @@ -50,19 +66,37 @@ public DistributedExecutionManager(MetaStorage storage, ClusterState state) { @Override - protected void doExecute(Namespace namespace, InternalExecution internalExecution) { - ManagedExecution execution = (ManagedExecution & InternalExecution) internalExecution; + protected void doExecute(E execution) { + + log.info("Executing Query[{}] in Dataset[{}]", execution.getQueryId(), execution.getDataset()); + + addState(execution.getId(), new DistributedState()); + + if (execution instanceof ManagedInternalForm form) { + form.getSubQueries().values().forEach((query) -> addState(query.getId(), new DistributedState())); + } + + final WorkerHandler workerHandler = getWorkerHandler(execution.getId().getDataset()); - log.info("Executing Query[{}] in Dataset[{}]", execution.getQueryId(), namespace.getDataset().getId()); + workerHandler.sendToAll(createExecutionMessage(execution)); + } - final WorkerHandler workerHandler = getWorkerHandler(execution); + private WorkerMessage createExecutionMessage(ManagedExecution execution) { + if (execution instanceof ManagedQuery mq) { + return new ExecuteQuery(mq.getId(), mq.getQuery()); + } + else if (execution instanceof ManagedInternalForm form) { + return new ExecuteForm(form.getId(), form.getFlatSubQueries() + .entrySet() + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().getQuery()))); + } + throw new NotImplementedException("Unable to build execution message for " + execution.getClass()); - workerHandler.sendToAll(internalExecution.createExecutionMessage()); } - private WorkerHandler getWorkerHandler(ManagedExecution execution) { - return clusterState.getWorkerHandlers() - .get(execution.getDataset().getId()); + private WorkerHandler getWorkerHandler(DatasetId datasetId) { + return clusterState.getWorkerHandlers().get(datasetId); } /** @@ -71,41 +105,43 @@ private WorkerHandler getWorkerHandler(ManagedExecution execution) { * @implNote subQueries of Forms are managed by the form itself, so need to be passed from outside. */ @SneakyThrows - public > void handleQueryResult(R result, E query) { + public void handleQueryResult(R result, E execution) { log.debug("Received Result[size={}] for Query[{}]", result.getResults().size(), result.getQueryId()); log.trace("Received Result\n{}", result.getResults()); - if (query.getState() != ExecutionState.RUNNING) { - log.warn("Received result for Query[{}] that is not RUNNING but {}", query.getId(), query.getState()); + if (execution.getState() != ExecutionState.RUNNING) { + log.warn("Received result for Query[{}] that is not RUNNING but {}", execution.getId(), execution.getState()); return; } if (result.getError().isPresent()) { - query.fail(result.getError().get()); + execution.fail(result.getError().get(), this); } else { // We don't collect all results together into a fat list as that would cause lots of huge re-allocations for little gain. - final DistributedResult results = getResult(query, DistributedResult::new); - results.results.put(result.getWorkerId(), result.getResults()); - - final Set finishedWorkers = results.results.keySet(); + State state = getResult(execution.getId()); + if (!(state instanceof DistributedState distributedState)) { + throw new IllegalStateException("Expected execution '%s' to be of type %s, but was %s".formatted(execution.getId(), DistributedState.class, state.getClass())); + } + distributedState.results.put(result.getWorkerId(), result.getResults()); // If all known workers have returned a result, the query is DONE. - if (finishedWorkers.equals(getWorkerHandler(query).getAllWorkerIds())) { - query.finish(ExecutionState.DONE); + if (distributedState.allResultsArrived(getWorkerHandler(execution.getDataset().getId()).getAllWorkerIds())) { + execution.finish(ExecutionState.DONE, this); + } } // State changed to DONE or FAILED - if (query.getState() != ExecutionState.RUNNING) { - final String primaryGroupName = AuthorizationHelper.getPrimaryGroup(query.getOwner(), getStorage()).map(Group::getName).orElse("none"); + if (execution.getState() != ExecutionState.RUNNING) { + final String primaryGroupName = AuthorizationHelper.getPrimaryGroup(execution.getOwner(), getStorage()).map(Group::getName).orElse("none"); ExecutionMetrics.getRunningQueriesCounter(primaryGroupName).dec(); - ExecutionMetrics.getQueryStateCounter(query.getState(), primaryGroupName).inc(); - ExecutionMetrics.getQueriesTimeHistogram(primaryGroupName).update(query.getExecutionTime().toMillis()); + ExecutionMetrics.getQueryStateCounter(execution.getState(), primaryGroupName).inc(); + ExecutionMetrics.getQueriesTimeHistogram(primaryGroupName).update(execution.getExecutionTime().toMillis()); /* This log is here to prevent an NPE which could occur when no strong reference to result.getResults() existed anymore after the query finished and immediately was reset */ @@ -115,11 +151,11 @@ public } @Override - public void cancelQuery(Dataset dataset, ManagedExecution query) { + public void doCancelQuery(ManagedExecution execution) { log.debug("Sending cancel message to all workers."); - query.cancel(); - getWorkerHandler(query).sendToAll(new CancelQuery(query.getId())); + execution.cancel(); + getWorkerHandler(execution.createId().getDataset()).sendToAll(new CancelQuery(execution.getId())); } } diff --git a/backend/src/main/java/com/bakdata/conquery/models/query/ExecutionManager.java b/backend/src/main/java/com/bakdata/conquery/models/query/ExecutionManager.java index 4f560a0cbf..d295f1561c 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/query/ExecutionManager.java +++ b/backend/src/main/java/com/bakdata/conquery/models/query/ExecutionManager.java @@ -1,8 +1,10 @@ package com.bakdata.conquery.models.query; +import java.util.NoSuchElementException; +import java.util.Objects; import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.stream.Stream; import com.bakdata.conquery.apiv1.query.QueryDescription; @@ -12,30 +14,48 @@ import com.bakdata.conquery.models.auth.entities.Group; import com.bakdata.conquery.models.auth.entities.User; import com.bakdata.conquery.models.config.ConqueryConfig; -import com.bakdata.conquery.models.datasets.Dataset; import com.bakdata.conquery.models.error.ConqueryError; +import com.bakdata.conquery.models.execution.ExecutionState; import com.bakdata.conquery.models.execution.InternalExecution; import com.bakdata.conquery.models.execution.ManagedExecution; +import com.bakdata.conquery.models.forms.managed.ExternalExecution; import com.bakdata.conquery.models.identifiable.ids.specific.ManagedExecutionId; import com.bakdata.conquery.models.query.results.EntityResult; import com.bakdata.conquery.models.worker.Namespace; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.cache.RemovalNotification; +import com.google.common.util.concurrent.Uninterruptibles; import lombok.Data; import lombok.extern.slf4j.Slf4j; @Data @Slf4j -public abstract class ExecutionManager { +public abstract class ExecutionManager { - public interface Result { + /** + * Holds all informations about an execution, which cannot/should not be serialized/cached in a store. + */ + public interface State { + + /** + * Synchronization barrier for web requests. + * Barrier is activated upon starting an execution so request can wait for execution completion. + * When the execution is finished the barrier is removed. + */ + CountDownLatch getExecutingLock(); + } + + public interface InternalState extends State{ Stream streamQueryResults(); } private final MetaStorage storage; - private final Cache executionResults = + /** + * Cache for execution states. + */ + private final Cache executionStates = CacheBuilder.newBuilder() .softValues() .removalListener(this::executionRemoved) @@ -44,7 +64,7 @@ public interface Result { /** * Manage state of evicted Queries, setting them to NEW. */ - private void executionRemoved(RemovalNotification removalNotification) { + private void executionRemoved(RemovalNotification removalNotification) { // If removal was done manually we assume it was also handled properly if (!removalNotification.wasEvicted()) { return; @@ -58,7 +78,7 @@ private void executionRemoved(RemovalNotification removal // The query might already be deleted if (execution != null) { - execution.reset(); + execution.reset(this); } } @@ -67,23 +87,28 @@ public ManagedExecution getExecution(ManagedExecutionId execution) { return storage.getExecution(execution); } - protected R getResult(ManagedExecution execution, Callable defaultProvider) throws ExecutionException { - return executionResults.get(execution.getId(), defaultProvider); + public R getResult(ManagedExecutionId id) { + State state = executionStates.getIfPresent(id); + if (state == null) { + throw new NoSuchElementException("No execution found for %s".formatted(id)); + } + return (R) state; } - protected void addResult(ManagedExecution execution, R result) { - executionResults.put(execution.getId(), result); + public void addState(ManagedExecutionId id, State result) { + executionStates.put(id, result); } - public final ManagedExecution runQuery(Namespace namespace, QueryDescription query, User user, Dataset submittedDataset, ConqueryConfig config, boolean system) { - final ManagedExecution execution = createExecution(query, user, submittedDataset, system); + public final ManagedExecution runQuery(Namespace namespace, QueryDescription query, User user, ConqueryConfig config, boolean system) { + final ManagedExecution execution = createExecution(query, user, namespace, system); + execute(namespace, execution, config); return execution; } - public final void execute(Namespace namespace, ManagedExecution execution, ConqueryConfig config) { + public final void execute(Namespace namespace, ManagedExecution execution, ConqueryConfig config) { clearQueryResults(execution); @@ -103,30 +128,37 @@ public final void execute(Namespace namespace, ManagedExecution execution, Conq throw e; } - log.info("Starting execution[{}]", execution.getQueryId()); - - execution.start(); + ManagedExecutionId executionId = execution.getId(); + log.info("Starting execution[{}]", executionId); + try { + execution.start(); - final String primaryGroupName = AuthorizationHelper.getPrimaryGroup(execution.getOwner(), storage).map(Group::getName).orElse("none"); - ExecutionMetrics.getRunningQueriesCounter(primaryGroupName).inc(); + final String primaryGroupName = AuthorizationHelper.getPrimaryGroup(execution.getOwner(), storage).map(Group::getName).orElse("none"); + ExecutionMetrics.getRunningQueriesCounter(primaryGroupName).inc(); - if (execution instanceof InternalExecution internalExecution) { - doExecute(namespace, internalExecution); + if (execution instanceof InternalExecution internalExecution) { + doExecute((ManagedExecution & InternalExecution) internalExecution); + } + } + catch (Exception e) { + log.warn("Failed to execute '{}'", executionId); + execution.fail(ConqueryError.asConqueryError(e), this); } } - protected abstract void doExecute(Namespace namespace, InternalExecution execution); + protected abstract void doExecute(E execution); // Visible for testing - public final ManagedExecution createExecution(QueryDescription query, User user, Dataset submittedDataset, boolean system) { - return createQuery(query, UUID.randomUUID(), user, submittedDataset, system); + public final ManagedExecution createExecution(QueryDescription query, User user, Namespace namespace, boolean system) { + return createExecution(query, UUID.randomUUID(), user, namespace, system); } - public final ManagedExecution createQuery(QueryDescription query, UUID queryId, User user, Dataset submittedDataset, boolean system) { + public final ManagedExecution createExecution(QueryDescription query, UUID queryId, User user, Namespace namespace, boolean system) { // Transform the submitted query into an initialized execution - ManagedExecution managed = query.toManagedExecution(user, submittedDataset, storage); + ManagedExecution managed = query.toManagedExecution(user, namespace.getDataset(), storage); managed.setSystem(system); managed.setQueryId(queryId); + managed.setMetaStorage(storage); // Store the execution storage.addExecution(managed); @@ -134,18 +166,55 @@ public final ManagedExecution createQuery(QueryDescription query, UUID queryId, return managed; } - public abstract void cancelQuery(final Dataset dataset, final ManagedExecution query); + public final void cancelQuery(final ManagedExecution execution) { + executionStates.invalidate(execution.getId()); + + if (execution instanceof ExternalExecution externalExecution) { + externalExecution.cancel(); + return; + } + doCancelQuery(execution); + } + + + public abstract void doCancelQuery(final ManagedExecution execution); public void clearQueryResults(ManagedExecution execution) { - executionResults.invalidate(execution.getId()); + executionStates.invalidate(execution.getId()); } - public Stream streamQueryResults(ManagedExecution execution) { - final R resultParts = executionResults.getIfPresent(execution.getId()); + public Stream streamQueryResults(E execution) { + final InternalState resultParts = (InternalState) executionStates.getIfPresent(execution.getId()); return resultParts == null ? Stream.empty() : resultParts.streamQueryResults(); } + + public void clearBarrier(ManagedExecutionId id) { + State result = Objects.requireNonNull(executionStates.getIfPresent(id), "Cannot clear lock on absent execution result"); + + result.getExecutingLock().countDown(); + } + + /** + * Blocks until an execution finished of the specified timeout is reached. Return immediately if the execution is not running + */ + public ExecutionState awaitDone(ManagedExecution execution, int time, TimeUnit unit) { + ManagedExecutionId id = execution.getId(); + ExecutionState state = execution.getState(); + if (state != ExecutionState.RUNNING) { + return state; + } + + State result = executionStates.getIfPresent(id); + + if (result == null) { + throw new IllegalStateException("Execution is running, but no result is registered"); + } + Uninterruptibles.awaitUninterruptibly(result.getExecutingLock(), time, unit); + + return execution.getState(); + } } diff --git a/backend/src/main/java/com/bakdata/conquery/models/query/ExternalStateImpl.java b/backend/src/main/java/com/bakdata/conquery/models/query/ExternalStateImpl.java new file mode 100644 index 0000000000..68911f1039 --- /dev/null +++ b/backend/src/main/java/com/bakdata/conquery/models/query/ExternalStateImpl.java @@ -0,0 +1,54 @@ +package com.bakdata.conquery.models.query; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.stream.Stream; +import jakarta.ws.rs.core.Response; + +import com.bakdata.conquery.apiv1.execution.ResultAsset; +import com.bakdata.conquery.io.external.form.ExternalFormBackendApi; +import com.bakdata.conquery.io.result.ExternalState; +import com.bakdata.conquery.models.auth.entities.User; +import com.google.common.collect.MoreCollectors; +import it.unimi.dsi.fastutil.Pair; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.Setter; + +@RequiredArgsConstructor +public class ExternalStateImpl implements ExternalState { + private final CountDownLatch latch; + + @Getter + private final ExternalFormBackendApi api; + + @Getter + private final User serviceUser; + + /** + * Pairs of external result assets (internal url) and their internal asset builder. + * The internal asset builder generates the asset url with the context of a user request. + */ + @Setter + private List> resultsAssetMap = Collections.emptyList(); + + @Override + public CountDownLatch getExecutingLock() { + return latch; + } + + @Override + public Stream getResultAssets() { + return resultsAssetMap.stream().map(Pair::value); + } + + @Override + public Response fetchExternalResult(String assetId) { + final ResultAsset resultRef = resultsAssetMap.stream() + .map(Pair::key).filter(a -> a.getAssetId().equals(assetId)) + .collect(MoreCollectors.onlyElement()); + + return api.getResult(resultRef.url()); + } +} diff --git a/backend/src/main/java/com/bakdata/conquery/models/query/ManagedQuery.java b/backend/src/main/java/com/bakdata/conquery/models/query/ManagedQuery.java index ba40879c5e..0c5d14a700 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/query/ManagedQuery.java +++ b/backend/src/main/java/com/bakdata/conquery/models/query/ManagedQuery.java @@ -22,16 +22,14 @@ import com.bakdata.conquery.models.execution.ExecutionState; import com.bakdata.conquery.models.execution.InternalExecution; import com.bakdata.conquery.models.execution.ManagedExecution; -import com.bakdata.conquery.models.messages.namespaces.WorkerMessage; -import com.bakdata.conquery.models.messages.namespaces.specific.ExecuteQuery; import com.bakdata.conquery.models.query.resultinfo.ResultInfo; import com.bakdata.conquery.models.query.results.EntityResult; -import com.bakdata.conquery.models.query.results.ShardResult; +import com.bakdata.conquery.models.worker.Namespace; import com.bakdata.conquery.util.QueryUtils; -import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonIgnore; -import com.fasterxml.jackson.annotation.OptBoolean; +import lombok.AccessLevel; import lombok.Getter; +import lombok.NoArgsConstructor; import lombok.NonNull; import lombok.Setter; import lombok.ToString; @@ -42,7 +40,8 @@ @ToString(callSuper = true) @Slf4j @CPSType(base = ManagedExecution.class, id = "MANAGED_QUERY") -public class ManagedQuery extends ManagedExecution implements SingleTableResult, InternalExecution { +@NoArgsConstructor(access = AccessLevel.PROTECTED) +public class ManagedQuery extends ManagedExecution implements SingleTableResult, InternalExecution { // Needs to be resolved externally before being executed private Query query; @@ -55,27 +54,23 @@ public class ManagedQuery extends ManagedExecution implements SingleTableResult, private transient List columnDescriptions; - protected ManagedQuery(@JacksonInject(useInput = OptBoolean.FALSE) MetaStorage storage) { - super(storage); - } - public ManagedQuery(Query query, User owner, Dataset submittedDataset, MetaStorage storage) { super(owner, submittedDataset, storage); this.query = query; } @Override - protected void doInitExecutable() { - query.resolve(new QueryResolveContext(getNamespace(), getConfig(), getStorage(), null)); + protected void doInitExecutable(Namespace namespace) { + query.resolve(new QueryResolveContext(getNamespace(), getConfig(), getMetaStorage(), null)); } @Override - public void finish(ExecutionState executionState) { + public void finish(ExecutionState executionState, ExecutionManager executionManager) { //TODO this is not optimal with SQLExecutionService as this might fully evaluate the query. lastResultCount = query.countResults(streamResults(OptionalLong.empty())); - super.finish(executionState); + super.finish(executionState, executionManager); } @@ -101,9 +96,9 @@ public long resultRowCount() { } @Override - public void setStatusBase(@NonNull Subject subject, @NonNull ExecutionStatus status) { + public void setStatusBase(@NonNull Subject subject, @NonNull ExecutionStatus status, Namespace namespace) { - super.setStatusBase(subject, status); + super.setStatusBase(subject, status, namespace); status.setNumberOfResults(getLastResultCount()); Query query = getQuery(); @@ -114,7 +109,8 @@ public void setStatusBase(@NonNull Subject subject, @NonNull ExecutionStatus sta } } - protected void setAdditionalFieldsForStatusWithColumnDescription(Subject subject, FullExecutionStatus status) { + @Override + protected void setAdditionalFieldsForStatusWithColumnDescription(Subject subject, FullExecutionStatus status, Namespace namespace) { if (columnDescriptions == null) { columnDescriptions = generateColumnDescriptions(isInitialized(), getConfig()); } @@ -127,8 +123,8 @@ public List getResultInfos(PrintSettings printSettings) { } @Override - public void reset() { - super.reset(); + public void reset(ExecutionManager executionManager) { + super.reset(executionManager); getNamespace().getExecutionManager().clearQueryResults(this); } @@ -156,11 +152,6 @@ protected String makeDefaultLabel(PrintSettings cfg) { return QueryUtils.makeQueryLabel(query, cfg, getId()); } - @Override - public WorkerMessage createExecutionMessage() { - return new ExecuteQuery(getId(), getQuery()); - } - @Override public void visit(Consumer visitor) { visitor.accept(this); diff --git a/backend/src/main/java/com/bakdata/conquery/models/query/preview/EntityPreviewExecution.java b/backend/src/main/java/com/bakdata/conquery/models/query/preview/EntityPreviewExecution.java index af4403f700..8ba4b4bed9 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/query/preview/EntityPreviewExecution.java +++ b/backend/src/main/java/com/bakdata/conquery/models/query/preview/EntityPreviewExecution.java @@ -29,8 +29,6 @@ import com.bakdata.conquery.models.forms.util.Resolution; import com.bakdata.conquery.models.i18n.I18n; import com.bakdata.conquery.models.identifiable.ids.specific.SelectId; -import com.bakdata.conquery.models.messages.namespaces.WorkerMessage; -import com.bakdata.conquery.models.messages.namespaces.specific.ExecuteForm; import com.bakdata.conquery.models.query.ColumnDescriptor; import com.bakdata.conquery.models.query.ManagedQuery; import com.bakdata.conquery.models.query.PrintSettings; @@ -42,14 +40,15 @@ import com.bakdata.conquery.models.query.results.MultilineEntityResult; import com.bakdata.conquery.models.types.ResultType; import com.bakdata.conquery.models.types.SemanticType; -import com.fasterxml.jackson.annotation.JacksonInject; +import com.bakdata.conquery.models.worker.Namespace; import com.fasterxml.jackson.annotation.JsonIgnore; -import com.fasterxml.jackson.annotation.OptBoolean; import com.fasterxml.jackson.databind.node.BooleanNode; import com.fasterxml.jackson.databind.node.DecimalNode; import com.fasterxml.jackson.databind.node.IntNode; import com.fasterxml.jackson.databind.node.TextNode; import com.google.common.collect.MoreCollectors; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; import lombok.ToString; import org.jetbrains.annotations.NotNull; @@ -59,23 +58,18 @@ */ @CPSType(id = "ENTITY_PREVIEW_EXECUTION", base = ManagedExecution.class) @ToString +@NoArgsConstructor(access = AccessLevel.PROTECTED) public class EntityPreviewExecution extends ManagedInternalForm { @ToString.Exclude private PreviewConfig previewConfig; - protected EntityPreviewExecution(@JacksonInject(useInput = OptBoolean.FALSE) MetaStorage storage) { - super(storage); - } - public EntityPreviewExecution(EntityPreviewForm entityPreviewQuery, User user, Dataset submittedDataset, MetaStorage storage) { super(entityPreviewQuery, user, submittedDataset, storage); } /** * Query contains both YEARS and QUARTERS lines: Group them. - * - * @return */ private static Map> getQuarterLines(EntityResult entityResult) { final Map> quarterLines = new HashMap<>(); @@ -99,8 +93,6 @@ private static Map> getQuarterLines(EntityResult /** * Query contains both YEARS and QUARTERS lines: Group them. - * - * @return */ private static Map getYearLines(EntityResult entityResult) { @@ -223,8 +215,8 @@ public boolean isSystem() { } @Override - public void doInitExecutable() { - super.doInitExecutable(); + public void doInitExecutable(Namespace namespace) { + super.doInitExecutable(namespace); previewConfig = getNamespace().getPreviewConfig(); } @@ -234,12 +226,12 @@ public void doInitExecutable() { * Most importantly to {@link EntityPreviewStatus#setInfos(List)} to for infos of entity. */ @Override - public FullExecutionStatus buildStatusFull(Subject subject) { + public FullExecutionStatus buildStatusFull(Subject subject, Namespace namespace) { initExecutable(getNamespace(), getConfig()); final EntityPreviewStatus status = new EntityPreviewStatus(); - setStatusFull(status, subject); + setStatusFull(status, subject, namespace); status.setQuery(getValuesQuery().getQuery()); @@ -396,17 +388,10 @@ private ManagedQuery getValuesQuery() { } @Override - protected void setAdditionalFieldsForStatusWithSource(Subject subject, FullExecutionStatus status) { + protected void setAdditionalFieldsForStatusWithSource(Subject subject, FullExecutionStatus status, Namespace namespace) { status.setColumnDescriptions(generateColumnDescriptions(isInitialized(), getConfig())); } - @Override - public WorkerMessage createExecutionMessage() { - return new ExecuteForm(getId(), getFlatSubQueries().entrySet() - .stream() - .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().getQuery()))); - } - @Override public List getResultInfos(PrintSettings printSettings) { return getValuesQuery().getResultInfos(printSettings); diff --git a/backend/src/main/java/com/bakdata/conquery/models/query/preview/EntityPreviewForm.java b/backend/src/main/java/com/bakdata/conquery/models/query/preview/EntityPreviewForm.java index 3ac2552767..fae63122e6 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/query/preview/EntityPreviewForm.java +++ b/backend/src/main/java/com/bakdata/conquery/models/query/preview/EntityPreviewForm.java @@ -8,6 +8,7 @@ import java.util.Set; import java.util.function.Consumer; import java.util.stream.Collectors; +import jakarta.validation.Valid; import com.bakdata.conquery.apiv1.forms.Form; import com.bakdata.conquery.apiv1.forms.InternalForm; @@ -39,8 +40,6 @@ import com.bakdata.conquery.models.worker.DatasetRegistry; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.databind.JsonNode; -import com.google.common.collect.ClassToInstanceMap; -import jakarta.validation.Valid; import lombok.Getter; import lombok.NonNull; import lombok.RequiredArgsConstructor; @@ -161,7 +160,7 @@ public Map createSubQueries() { } @Override - public void authorize(Subject subject, Dataset submittedDataset, @NonNull ClassToInstanceMap visitors, MetaStorage storage) { + public void authorize(Subject subject, Dataset submittedDataset, @NonNull List visitors, MetaStorage storage) { QueryDescription.authorizeQuery(this, subject, submittedDataset, visitors, storage); } diff --git a/backend/src/main/java/com/bakdata/conquery/models/query/results/FormShardResult.java b/backend/src/main/java/com/bakdata/conquery/models/query/results/FormShardResult.java index a07989bb0b..84bcdee3f7 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/query/results/FormShardResult.java +++ b/backend/src/main/java/com/bakdata/conquery/models/query/results/FormShardResult.java @@ -25,14 +25,16 @@ public FormShardResult(ManagedExecutionId formId, ManagedExecutionId subQueryId, /** * Distribute the result to a sub query. - * - * @param executionManager */ @Override public void addResult(DistributedExecutionManager executionManager) { final ManagedInternalForm managedInternalForm = (ManagedInternalForm) executionManager.getExecution(getFormId()); final ManagedQuery subQuery = managedInternalForm.getSubQuery(getQueryId()); + if (subQuery == null) { + throw new IllegalStateException("Subquery %s did not belong to form %s. Known subqueries: %s".formatted(getQueryId(), formId, managedInternalForm.getSubQueries())); + } + executionManager.handleQueryResult(this, subQuery); @@ -41,12 +43,13 @@ public void addResult(DistributedExecutionManager executionManager) { managedInternalForm.fail( getError().orElseThrow( () -> new IllegalStateException(String.format("Query[%s] failed but no error was set.", subQuery.getId())) - ) + ), + executionManager ); } if (managedInternalForm.allSubQueriesDone()) { - managedInternalForm.finish(ExecutionState.DONE); + managedInternalForm.finish(ExecutionState.DONE, executionManager); } } diff --git a/backend/src/main/java/com/bakdata/conquery/models/worker/LocalNamespace.java b/backend/src/main/java/com/bakdata/conquery/models/worker/LocalNamespace.java index 871e9eced7..58d00e6580 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/worker/LocalNamespace.java +++ b/backend/src/main/java/com/bakdata/conquery/models/worker/LocalNamespace.java @@ -15,7 +15,6 @@ import com.bakdata.conquery.models.query.ExecutionManager; import com.bakdata.conquery.models.query.FilterSearch; import com.bakdata.conquery.sql.DSLContextWrapper; -import com.bakdata.conquery.sql.execution.SqlExecutionResult; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -30,7 +29,7 @@ public class LocalNamespace extends Namespace { public LocalNamespace( ObjectMapper preprocessMapper, NamespaceStorage storage, - ExecutionManager executionManager, + ExecutionManager executionManager, DSLContextWrapper dslContextWrapper, SqlStorageHandler storageHandler, JobManager jobManager, diff --git a/backend/src/main/java/com/bakdata/conquery/models/worker/Namespace.java b/backend/src/main/java/com/bakdata/conquery/models/worker/Namespace.java index 73b16aada8..d2ad40c0c4 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/worker/Namespace.java +++ b/backend/src/main/java/com/bakdata/conquery/models/worker/Namespace.java @@ -38,7 +38,7 @@ public abstract class Namespace extends IdResolveContext { @ToString.Include private final NamespaceStorage storage; - private final ExecutionManager executionManager; + private final ExecutionManager executionManager; // TODO: 01.07.2020 FK: This is not used a lot, as NamespacedMessages are highly convoluted and hard to decouple as is. private final JobManager jobManager; @@ -129,7 +129,6 @@ final void updateFilterSearch() { * This collects the string values of the given {@link Column}s (each is a {@link com.bakdata.conquery.models.datasets.concepts.Searchable}) * and registers them in the namespace's {@link FilterSearch#registerValues(Searchable, Collection)}. * After value registration for a column is complete, {@link FilterSearch#shrinkSearch(Searchable)} should be called. - * */ abstract void registerColumnValuesInSearch(Set columns); diff --git a/backend/src/main/java/com/bakdata/conquery/resources/admin/rest/AdminResource.java b/backend/src/main/java/com/bakdata/conquery/resources/admin/rest/AdminResource.java index db5cbbc540..322af14f23 100644 --- a/backend/src/main/java/com/bakdata/conquery/resources/admin/rest/AdminResource.java +++ b/backend/src/main/java/com/bakdata/conquery/resources/admin/rest/AdminResource.java @@ -9,6 +9,17 @@ import java.util.Optional; import java.util.OptionalLong; import java.util.UUID; +import jakarta.inject.Inject; +import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.POST; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.PathParam; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.QueryParam; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.Response; +import jakarta.ws.rs.core.UriBuilder; import com.bakdata.conquery.apiv1.execution.FullExecutionStatus; import com.bakdata.conquery.io.jersey.ExtraMimeTypes; @@ -18,20 +29,10 @@ import com.bakdata.conquery.models.execution.ExecutionState; import com.bakdata.conquery.models.jobs.JobManagerStatus; import com.bakdata.conquery.models.messages.network.specific.CancelJobMessage; +import com.bakdata.conquery.models.worker.Namespace; import com.bakdata.conquery.models.worker.ShardNodeInformation; import com.bakdata.conquery.resources.admin.ui.AdminUIResource; import io.dropwizard.auth.Auth; -import jakarta.inject.Inject; -import jakarta.ws.rs.Consumes; -import jakarta.ws.rs.GET; -import jakarta.ws.rs.POST; -import jakarta.ws.rs.Path; -import jakarta.ws.rs.PathParam; -import jakarta.ws.rs.Produces; -import jakarta.ws.rs.QueryParam; -import jakarta.ws.rs.core.MediaType; -import jakarta.ws.rs.core.Response; -import jakarta.ws.rs.core.UriBuilder; import lombok.RequiredArgsConstructor; @Consumes({ExtraMimeTypes.JSON_STRING, ExtraMimeTypes.SMILE_STRING}) @@ -108,13 +109,14 @@ public FullExecutionStatus[] getQueries(@Auth Subject currentUser, @QueryParam(" .filter(t -> t.getCreationTime().toLocalDate().isAfter(since) || t.getCreationTime().toLocalDate().isEqual(since)) .limit(limit) .map(t -> { + Namespace namespace = processor.getDatasetRegistry().get(t.getDataset().getId()); try { - return t.buildStatusFull(currentUser); + return t.buildStatusFull(currentUser, namespace); } catch (ConqueryError e) { // Initialization of execution probably failed, so we construct a status based on the overview status final FullExecutionStatus fullExecutionStatus = new FullExecutionStatus(); - t.setStatusBase(currentUser, fullExecutionStatus); + t.setStatusBase(currentUser, fullExecutionStatus, namespace); fullExecutionStatus.setStatus(ExecutionState.FAILED); fullExecutionStatus.setError(e); return fullExecutionStatus; diff --git a/backend/src/main/java/com/bakdata/conquery/resources/api/QueryResource.java b/backend/src/main/java/com/bakdata/conquery/resources/api/QueryResource.java index 599c06500b..facc8705ee 100644 --- a/backend/src/main/java/com/bakdata/conquery/resources/api/QueryResource.java +++ b/backend/src/main/java/com/bakdata/conquery/resources/api/QueryResource.java @@ -4,19 +4,6 @@ import static com.bakdata.conquery.resources.ResourceConstants.QUERY; import java.util.concurrent.TimeUnit; - -import com.bakdata.conquery.apiv1.AdditionalMediaTypes; -import com.bakdata.conquery.apiv1.MetaDataPatch; -import com.bakdata.conquery.apiv1.QueryProcessor; -import com.bakdata.conquery.apiv1.RequestAwareUriBuilder; -import com.bakdata.conquery.apiv1.execution.FullExecutionStatus; -import com.bakdata.conquery.models.auth.entities.Subject; -import com.bakdata.conquery.models.auth.permissions.Ability; -import com.bakdata.conquery.models.execution.ExecutionState; -import com.bakdata.conquery.models.execution.ManagedExecution; -import com.bakdata.conquery.models.query.SingleTableResult; -import io.dropwizard.auth.Auth; -import io.dropwizard.jersey.PATCH; import jakarta.inject.Inject; import jakarta.servlet.http.HttpServletRequest; import jakarta.ws.rs.BadRequestException; @@ -31,6 +18,19 @@ import jakarta.ws.rs.QueryParam; import jakarta.ws.rs.core.Context; import jakarta.ws.rs.core.Response; + +import com.bakdata.conquery.apiv1.AdditionalMediaTypes; +import com.bakdata.conquery.apiv1.MetaDataPatch; +import com.bakdata.conquery.apiv1.QueryProcessor; +import com.bakdata.conquery.apiv1.RequestAwareUriBuilder; +import com.bakdata.conquery.apiv1.execution.FullExecutionStatus; +import com.bakdata.conquery.models.auth.entities.Subject; +import com.bakdata.conquery.models.auth.permissions.Ability; +import com.bakdata.conquery.models.execution.ExecutionState; +import com.bakdata.conquery.models.execution.ManagedExecution; +import com.bakdata.conquery.models.query.SingleTableResult; +import io.dropwizard.auth.Auth; +import io.dropwizard.jersey.PATCH; import lombok.RequiredArgsConstructor; @Path("queries") @@ -52,7 +52,7 @@ public FullExecutionStatus getStatus(@Auth Subject subject, @PathParam(QUERY) Ma subject.authorize(query.getDataset(), Ability.READ); subject.authorize(query, Ability.READ); - query.awaitDone(1, TimeUnit.SECONDS); + processor.awaitDone(query, 1, TimeUnit.SECONDS); return processor.getQueryFullStatus(query, subject, RequestAwareUriBuilder.fromRequest(servletRequest), allProviders); } @@ -68,7 +68,7 @@ public Response getDescription(@Auth Subject subject, @PathParam(QUERY) ManagedE subject.authorize(query.getDataset(), Ability.READ); subject.authorize(query, Ability.READ); - if(query.awaitDone(1, TimeUnit.SECONDS) != ExecutionState.DONE){ + if (processor.awaitDone(query, 1, TimeUnit.SECONDS) != ExecutionState.DONE) { return Response.status(Response.Status.CONFLICT.getStatusCode(), "Query is still running.").build(); // Request was submitted too early. } diff --git a/backend/src/main/java/com/bakdata/conquery/resources/api/ResultExternalResource.java b/backend/src/main/java/com/bakdata/conquery/resources/api/ResultExternalResource.java index 93732d3e1b..7736ee2ffb 100644 --- a/backend/src/main/java/com/bakdata/conquery/resources/api/ResultExternalResource.java +++ b/backend/src/main/java/com/bakdata/conquery/resources/api/ResultExternalResource.java @@ -6,13 +6,6 @@ import java.net.URI; import java.net.URISyntaxException; - -import com.bakdata.conquery.io.result.ExternalResult; -import com.bakdata.conquery.io.result.external.ExternalResultProcessor; -import com.bakdata.conquery.models.auth.entities.Subject; -import com.bakdata.conquery.models.execution.ManagedExecution; -import com.bakdata.conquery.resources.ResourceConstants; -import io.dropwizard.auth.Auth; import jakarta.inject.Inject; import jakarta.ws.rs.GET; import jakarta.ws.rs.HeaderParam; @@ -21,6 +14,12 @@ import jakarta.ws.rs.QueryParam; import jakarta.ws.rs.core.Response; import jakarta.ws.rs.core.UriBuilder; + +import com.bakdata.conquery.io.result.external.ExternalResultProcessor; +import com.bakdata.conquery.models.auth.entities.Subject; +import com.bakdata.conquery.models.forms.managed.ExternalExecution; +import com.bakdata.conquery.resources.ResourceConstants; +import io.dropwizard.auth.Auth; import lombok.extern.slf4j.Slf4j; @Path("result/external") @@ -32,7 +31,7 @@ public class ResultExternalResource { private ExternalResultProcessor processor; - public static URI getDownloadURL(UriBuilder uriBuilder, E exec, String filename) + public static URI getDownloadURL(UriBuilder uriBuilder, ExternalExecution exec, String filename) throws URISyntaxException { return uriBuilder .path(ResultExternalResource.class) @@ -57,12 +56,12 @@ public static URI getDownloadURL(U @Path("{" + QUERY + "}/{" + FILENAME + "}") public Response download( @Auth Subject subject, - @PathParam(QUERY) ManagedExecution execution, + @PathParam(QUERY) ExternalExecution execution, @PathParam(FILENAME) String fileName, @HeaderParam("user-agent") String userAgent, @QueryParam("charset") String queryCharset ) { - log.info("Result download for {} on dataset {} by user {} ({}).", execution, execution.getDataset().getId(), subject.getId(), subject.getName()); + log.info("Result download for {} on dataset {} by user {} ({}).", execution, execution.getDataset(), subject.getId(), subject.getName()); return processor.getResult(subject, execution, fileName); } } diff --git a/backend/src/main/java/com/bakdata/conquery/sql/conquery/SqlExecutionManager.java b/backend/src/main/java/com/bakdata/conquery/sql/conquery/SqlExecutionManager.java index 571ceb2d5c..9e405705e8 100644 --- a/backend/src/main/java/com/bakdata/conquery/sql/conquery/SqlExecutionManager.java +++ b/backend/src/main/java/com/bakdata/conquery/sql/conquery/SqlExecutionManager.java @@ -6,7 +6,6 @@ import java.util.concurrent.ConcurrentMap; import com.bakdata.conquery.io.storage.MetaStorage; -import com.bakdata.conquery.models.datasets.Dataset; import com.bakdata.conquery.models.error.ConqueryError; import com.bakdata.conquery.models.execution.ExecutionState; import com.bakdata.conquery.models.execution.InternalExecution; @@ -15,15 +14,14 @@ import com.bakdata.conquery.models.identifiable.ids.specific.ManagedExecutionId; import com.bakdata.conquery.models.query.ExecutionManager; import com.bakdata.conquery.models.query.ManagedQuery; -import com.bakdata.conquery.models.worker.Namespace; import com.bakdata.conquery.sql.conversion.SqlConverter; import com.bakdata.conquery.sql.conversion.model.SqlQuery; -import com.bakdata.conquery.sql.execution.SqlExecutionResult; import com.bakdata.conquery.sql.execution.SqlExecutionService; +import com.bakdata.conquery.sql.execution.SqlExecutionState; import lombok.extern.slf4j.Slf4j; @Slf4j -public class SqlExecutionManager extends ExecutionManager { +public class SqlExecutionManager extends ExecutionManager { private final SqlExecutionService executionService; private final SqlConverter converter; @@ -37,17 +35,23 @@ public SqlExecutionManager(SqlConverter sqlConverter, SqlExecutionService sqlExe } @Override - protected void doExecute(Namespace namespace, InternalExecution execution) { + protected void doExecute(E execution) { + + addState(execution.getId(), new SqlExecutionState()); if (execution instanceof ManagedQuery managedQuery) { - CompletableFuture sqlQueryExecution = executeAsync(managedQuery); + CompletableFuture sqlQueryExecution = executeAsync(managedQuery, this); runningExecutions.put(managedQuery.getId(), sqlQueryExecution); return; } if (execution instanceof ManagedInternalForm managedForm) { - CompletableFuture.allOf(managedForm.getSubQueries().values().stream().map(this::executeAsync).toArray(CompletableFuture[]::new)) - .thenRun(() -> managedForm.finish(ExecutionState.DONE)); + CompletableFuture.allOf(managedForm.getSubQueries().values().stream().map(managedQuery -> { + addState(managedQuery.getId(), new SqlExecutionState()); + return executeAsync(managedQuery, this); + + }).toArray(CompletableFuture[]::new)) + .thenRun(() -> managedForm.finish(ExecutionState.DONE, this)); return; } @@ -55,9 +59,9 @@ protected void doExecute(Namespace namespace, InternalExecution execution) { } @Override - public void cancelQuery(Dataset dataset, ManagedExecution query) { + public void doCancelQuery(ManagedExecution execution) { - CompletableFuture sqlQueryExecution = runningExecutions.remove(query.getId()); + CompletableFuture sqlQueryExecution = runningExecutions.remove(execution.getId()); // already finished/canceled if (sqlQueryExecution == null) { @@ -68,22 +72,29 @@ public void cancelQuery(Dataset dataset, ManagedExecution query) { sqlQueryExecution.cancel(true); } - query.cancel(); + execution.cancel(); } - private CompletableFuture executeAsync(ManagedQuery managedQuery) { + private CompletableFuture executeAsync(ManagedQuery managedQuery, SqlExecutionManager executionManager) { SqlQuery sqlQuery = converter.convert(managedQuery.getQuery(), managedQuery.getNamespace()); - return CompletableFuture.supplyAsync(() -> executionService.execute(sqlQuery)) .thenAccept(result -> { - addResult(managedQuery, result); + ManagedExecutionId id = managedQuery.getId(); + + // We need to transfer the columns and data from the query result together with the execution lock to a new result + SqlExecutionState startResult = getResult(id); + SqlExecutionState + finishResult = + new SqlExecutionState(result.getColumnNames(), result.getTable(), startResult.getExecutingLock()); + addState(id, finishResult); + managedQuery.setLastResultCount(((long) result.getRowCount())); - managedQuery.finish(ExecutionState.DONE); - runningExecutions.remove(managedQuery.getId()); + managedQuery.finish(ExecutionState.DONE, executionManager); + runningExecutions.remove(id); }) .exceptionally(e -> { - managedQuery.fail(new ConqueryError.SqlError(e)); + managedQuery.fail(ConqueryError.asConqueryError(e), this); runningExecutions.remove(managedQuery.getId()); return null; }); diff --git a/backend/src/main/java/com/bakdata/conquery/sql/execution/SqlExecutionService.java b/backend/src/main/java/com/bakdata/conquery/sql/execution/SqlExecutionService.java index 815c0d47d3..026d440907 100644 --- a/backend/src/main/java/com/bakdata/conquery/sql/execution/SqlExecutionService.java +++ b/backend/src/main/java/com/bakdata/conquery/sql/execution/SqlExecutionService.java @@ -7,6 +7,7 @@ import java.sql.Statement; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -38,14 +39,14 @@ public class SqlExecutionService { private final ResultSetProcessor resultSetProcessor; - public SqlExecutionResult execute(SqlQuery sqlQuery) { + public SqlExecutionState execute(SqlQuery sqlQuery) { - final SqlExecutionResult result = dslContext.connectionResult(connection -> createStatementAndExecute(sqlQuery, connection)); + final SqlExecutionState result = dslContext.connectionResult(connection -> createStatementAndExecute(sqlQuery, connection)); return result; } - private SqlExecutionResult createStatementAndExecute(SqlQuery sqlQuery, Connection connection) { + private SqlExecutionState createStatementAndExecute(SqlQuery sqlQuery, Connection connection) { final String sqlString = sqlQuery.getSql(); final List resultTypes = sqlQuery.getResultInfos().stream().map(ResultInfo::getType).collect(Collectors.toList()); @@ -58,7 +59,7 @@ private SqlExecutionResult createStatementAndExecute(SqlQuery sqlQuery, Connecti final List columnNames = getColumnNames(resultSet, columnCount); final List resultTable = createResultTable(resultSet, resultTypes, columnCount); - return new SqlExecutionResult(columnNames, resultTable); + return new SqlExecutionState(columnNames, resultTable, new CountDownLatch(1)); } // not all DB vendors throw SQLExceptions catch (SQLException | RuntimeException e) { diff --git a/backend/src/main/java/com/bakdata/conquery/sql/execution/SqlExecutionResult.java b/backend/src/main/java/com/bakdata/conquery/sql/execution/SqlExecutionState.java similarity index 53% rename from backend/src/main/java/com/bakdata/conquery/sql/execution/SqlExecutionResult.java rename to backend/src/main/java/com/bakdata/conquery/sql/execution/SqlExecutionState.java index fa9a313918..ee130c3b47 100644 --- a/backend/src/main/java/com/bakdata/conquery/sql/execution/SqlExecutionResult.java +++ b/backend/src/main/java/com/bakdata/conquery/sql/execution/SqlExecutionState.java @@ -1,6 +1,7 @@ package com.bakdata.conquery.sql.execution; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.stream.Stream; import com.bakdata.conquery.models.query.ExecutionManager; @@ -8,15 +9,24 @@ import lombok.Value; @Value -public class SqlExecutionResult implements ExecutionManager.Result { +public class SqlExecutionState implements ExecutionManager.InternalState { List columnNames; List table; int rowCount; + CountDownLatch executingLock; - public SqlExecutionResult(List columnNames, List table) { + public SqlExecutionState() { + this.columnNames = null; + this.table = null; + this.executingLock = new CountDownLatch(1); + rowCount = 0; + } + + public SqlExecutionState(List columnNames, List table, CountDownLatch executingLock) { this.columnNames = columnNames; this.table = table; + this.executingLock = executingLock; rowCount = table.size(); } diff --git a/backend/src/main/java/com/bakdata/conquery/util/QueryUtils.java b/backend/src/main/java/com/bakdata/conquery/util/QueryUtils.java index c14f53eaf9..901394f33a 100644 --- a/backend/src/main/java/com/bakdata/conquery/util/QueryUtils.java +++ b/backend/src/main/java/com/bakdata/conquery/util/QueryUtils.java @@ -7,7 +7,6 @@ import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @@ -43,7 +42,7 @@ import com.bakdata.conquery.models.query.queryplan.aggregators.Aggregator; import com.bakdata.conquery.models.query.visitor.QueryVisitor; import com.google.common.base.Strings; -import com.google.common.collect.ClassToInstanceMap; +import com.google.common.collect.MoreCollectors; import lombok.Getter; import lombok.NonNull; import lombok.experimental.UtilityClass; @@ -60,12 +59,12 @@ public class QueryUtils { public static Consumer getNoOpEntryPoint() { return (whatever) -> {}; } - + /** * Gets the specified visitor from the map. If none was found an exception is raised. */ - public static T getVisitor(ClassToInstanceMap visitors, Class clazz){ - return Objects.requireNonNull(visitors.getInstance(clazz),String.format("Among the visitor that traversed the query no %s could be found", clazz)); + public static T getVisitor(List visitors, Class clazz){ + return (T) visitors.stream().filter(clazz::isInstance).collect(MoreCollectors.onlyElement()); } public static String createDefaultMultiLabel(List elements, String delimiter, Locale locale) { diff --git a/backend/src/test/java/com/bakdata/conquery/api/StoredQueriesProcessorTest.java b/backend/src/test/java/com/bakdata/conquery/api/StoredQueriesProcessorTest.java index 10b3307d7d..0b5c173c20 100644 --- a/backend/src/test/java/com/bakdata/conquery/api/StoredQueriesProcessorTest.java +++ b/backend/src/test/java/com/bakdata/conquery/api/StoredQueriesProcessorTest.java @@ -10,6 +10,8 @@ import java.util.List; import java.util.UUID; import java.util.stream.Collectors; +import jakarta.validation.Validator; +import jakarta.ws.rs.core.UriBuilder; import com.bakdata.conquery.apiv1.QueryProcessor; import com.bakdata.conquery.apiv1.execution.ExecutionStatus; @@ -51,8 +53,6 @@ import com.google.common.collect.ImmutableList; import io.dropwizard.core.setup.Environment; import io.dropwizard.jersey.validation.Validators; -import jakarta.validation.Validator; -import jakarta.ws.rs.core.UriBuilder; import lombok.SneakyThrows; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -129,7 +129,7 @@ public static void beforeAll() { @Test public void getQueriesFiltered() { - List infos = processor.getQueriesFiltered(DATASET_0, URI_BUILDER, USERS[0], queries, true) + List infos = processor.getQueriesFiltered(DATASET_0.getId(), URI_BUILDER, USERS[0], queries, true) .collect(Collectors.toList()); assertThat(infos) @@ -156,8 +156,8 @@ private static User mockUser(int id, List allowedQueryIds) { } - private static ManagedForm mockManagedForm(User user, ManagedExecutionId id, ExecutionState execState, final Dataset dataset){ - return new ManagedInternalForm(new ExportForm(), user, dataset, STORAGE) { + private static ManagedForm mockManagedForm(User user, ManagedExecutionId id, ExecutionState execState, final Dataset dataset) { + return new ManagedInternalForm<>(new ExportForm(), user, dataset, STORAGE) { { setState(execState); setCreationTime(LocalDateTime.MIN); diff --git a/backend/src/test/java/com/bakdata/conquery/api/form/config/FormConfigTest.java b/backend/src/test/java/com/bakdata/conquery/api/form/config/FormConfigTest.java index f20effb7f5..83d96953ad 100644 --- a/backend/src/test/java/com/bakdata/conquery/api/form/config/FormConfigTest.java +++ b/backend/src/test/java/com/bakdata/conquery/api/form/config/FormConfigTest.java @@ -2,8 +2,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; import java.net.URL; import java.time.ZoneId; @@ -12,7 +11,6 @@ import java.util.Map; import java.util.UUID; import java.util.stream.Stream; - import jakarta.validation.Validator; import com.bakdata.conquery.apiv1.FormConfigPatch; @@ -20,6 +18,7 @@ import com.bakdata.conquery.apiv1.forms.export_form.AbsoluteMode; import com.bakdata.conquery.apiv1.forms.export_form.ExportForm; import com.bakdata.conquery.apiv1.forms.export_form.RelativeMode; +import com.bakdata.conquery.apiv1.query.Query; import com.bakdata.conquery.apiv1.query.concept.specific.CQConcept; import com.bakdata.conquery.io.cps.CPSType; import com.bakdata.conquery.io.jackson.MutableInjectableValues; @@ -52,15 +51,14 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.TextNode; -import io.dropwizard.jersey.validation.Validators; import io.dropwizard.core.setup.Environment; +import io.dropwizard.jersey.validation.Validators; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.TestInstance.Lifecycle; -import org.mockito.Mockito; /** @@ -70,17 +68,15 @@ @TestInstance(Lifecycle.PER_CLASS) public class FormConfigTest { - private ConqueryConfig config = new ConqueryConfig(); + private final ConqueryConfig config = new ConqueryConfig(); private MetaStorage storage; - private DatasetRegistry namespacesMock; private FormConfigProcessor processor; - private AuthorizationController controller; private Validator validator = Validators.newValidatorFactory().getValidator(); - private Dataset dataset = new Dataset("test"); - private Dataset dataset1 = new Dataset("test1"); + private final Dataset dataset = new Dataset("test"); + private final Dataset dataset1 = new Dataset("test1"); private DatasetId datasetId; private DatasetId datasetId1; private ExportForm form; @@ -94,7 +90,7 @@ public void setupTestClass() throws Exception { datasetId1 = dataset1.getId(); // Mock DatasetRegistry for translation - namespacesMock = Mockito.mock(DatasetRegistry.class); + DatasetRegistry namespacesMock = mock(DatasetRegistry.class); doAnswer(invocation -> { throw new UnsupportedOperationException("Not yet implemented"); @@ -102,7 +98,7 @@ public void setupTestClass() throws Exception { doAnswer(invocation -> { final DatasetId id = invocation.getArgument(0); - Namespace namespaceMock = Mockito.mock(LocalNamespace.class); + Namespace namespaceMock = mock(LocalNamespace.class); if (id.equals(datasetId)) { when(namespaceMock.getDataset()).thenReturn(dataset); } @@ -123,14 +119,18 @@ else if (id.equals(datasetId1)) { ((MutableInjectableValues) FormConfigProcessor.getMAPPER().getInjectableValues()) .add(IdResolveContext.class, namespacesMock); processor = new FormConfigProcessor(validator, storage, namespacesMock); - controller = new AuthorizationController(storage, config, new Environment(this.getClass().getSimpleName()), null); + AuthorizationController controller = new AuthorizationController(storage, config, new Environment(this.getClass().getSimpleName()), null); controller.start(); } @BeforeEach public void setupTest() { - final ManagedQuery managedQuery = new ManagedQuery(null, null, dataset, null); + + user = new User("test", "test", storage); + storage.addUser(user); + + final ManagedQuery managedQuery = new ManagedQuery(mock(Query.class), user, dataset, null); managedQuery.setQueryId(UUID.randomUUID()); form = new ExportForm(); @@ -138,10 +138,6 @@ public void setupTest() { form.setTimeMode(mode); form.setQueryGroupId(managedQuery.getId()); mode.setForm(form); - - - user = new User("test", "test", storage); - storage.addUser(user); } @AfterEach @@ -161,7 +157,7 @@ public void addConfigWithoutTranslation() { processor.addConfig(user, dataset, formConfig); - assertThat(storage.getAllFormConfigs()).containsExactly(formConfig.intern(user, dataset)); + assertThat(storage.getAllFormConfigs()).containsExactly(formConfig.intern(user, dataset.getId())); } @Test @@ -171,7 +167,7 @@ public void deleteConfig() { ObjectMapper mapper = FormConfigProcessor.getMAPPER(); FormConfig formConfig = new FormConfig(form.getClass().getAnnotation(CPSType.class).id(), mapper.valueToTree(form)); - formConfig.setDataset(dataset); + formConfig.setDataset(dataset.getId()); user.addPermission(formConfig.createPermission(AbilitySets.FORM_CONFIG_CREATOR)); storage.addFormConfig(formConfig); @@ -193,7 +189,7 @@ public void getConfig() { ObjectMapper mapper = FormConfigProcessor.getMAPPER(); JsonNode values = mapper.valueToTree(form); FormConfig formConfig = new FormConfig(form.getClass().getAnnotation(CPSType.class).id(), values); - formConfig.setDataset(dataset); + formConfig.setDataset(dataset.getId()); formConfig.setOwner(user); user.addPermission(formConfig.createPermission(Ability.READ.asSet())); storage.addFormConfig(formConfig); @@ -337,7 +333,7 @@ public void patchConfig() { // CHECK PART 1 FormConfig patchedFormExpected = new FormConfig(form.getClass().getAnnotation(CPSType.class).id(), values); - patchedFormExpected.setDataset(dataset); + patchedFormExpected.setDataset(dataset.getId()); patchedFormExpected.setFormId(config.getFormId()); patchedFormExpected.setLabel("newTestLabel"); patchedFormExpected.setShared(true); diff --git a/backend/src/test/java/com/bakdata/conquery/integration/common/LoadingUtil.java b/backend/src/test/java/com/bakdata/conquery/integration/common/LoadingUtil.java index ef12549871..fe452dc4b3 100644 --- a/backend/src/test/java/com/bakdata/conquery/integration/common/LoadingUtil.java +++ b/backend/src/test/java/com/bakdata/conquery/integration/common/LoadingUtil.java @@ -16,6 +16,10 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import jakarta.ws.rs.client.Entity; +import jakarta.ws.rs.client.Invocation; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.Response; import com.bakdata.conquery.ConqueryConstants; import com.bakdata.conquery.apiv1.query.ConceptQuery; @@ -48,10 +52,6 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; import com.univocity.parsers.csv.CsvParser; -import jakarta.ws.rs.client.Entity; -import jakarta.ws.rs.client.Invocation; -import jakarta.ws.rs.core.MediaType; -import jakarta.ws.rs.core.Response; import lombok.NonNull; import lombok.experimental.UtilityClass; import lombok.extern.slf4j.Slf4j; @@ -73,8 +73,8 @@ public static void importPreviousQueries(StandaloneSupport support, RequiredData ConceptQuery query = new ConceptQuery(new CQExternal(Arrays.asList("ID", "DATE_SET"), data, false)); - ExecutionManager executionManager = support.getNamespace().getExecutionManager(); - ManagedExecution managed = executionManager.createQuery(query, queryId, user, support.getNamespace().getDataset(), false); + ExecutionManager executionManager = support.getNamespace().getExecutionManager(); + ManagedExecution managed = executionManager.createExecution(query, queryId, user, support.getNamespace(), false); user.addPermission(managed.createPermission(AbilitySets.QUERY_CREATOR)); @@ -88,8 +88,8 @@ public static void importPreviousQueries(StandaloneSupport support, RequiredData Query query = ConqueryTestSpec.parseSubTree(support, queryNode, Query.class); UUID queryId = new UUID(0L, id++); - ExecutionManager executionManager = support.getNamespace().getExecutionManager(); - ManagedExecution managed = executionManager.createQuery(query, queryId, user, support.getNamespace().getDataset(), false); + ExecutionManager executionManager = support.getNamespace().getExecutionManager(); + ManagedExecution managed = executionManager.createExecution(query, queryId, user, support.getNamespace(), false); user.addPermission(ExecutionPermission.onInstance(AbilitySets.QUERY_CREATOR, managed.getId())); @@ -247,7 +247,7 @@ public static void uploadConcept(StandaloneSupport support, Dataset dataset, Con } - private static List> getConcepts(StandaloneSupport support, ArrayNode rawConcepts) throws JSONException, IOException { + private static List> getConcepts(StandaloneSupport support, ArrayNode rawConcepts) throws IOException { return ConqueryTestSpec.parseSubTreeList( support, rawConcepts, @@ -257,7 +257,7 @@ private static List> getConcepts(StandaloneSupport support, ArrayNode } public static void updateConcepts(StandaloneSupport support, ArrayNode rawConcepts, @NonNull Response.Status.Family expectedResponseFamily) - throws JSONException, IOException { + throws IOException { List> concepts = getConcepts(support, rawConcepts); for (Concept concept : concepts) { updateConcept(support, concept, expectedResponseFamily); diff --git a/backend/src/test/java/com/bakdata/conquery/integration/json/FormTest.java b/backend/src/test/java/com/bakdata/conquery/integration/json/FormTest.java index cfa9728123..8f7602777c 100644 --- a/backend/src/test/java/com/bakdata/conquery/integration/json/FormTest.java +++ b/backend/src/test/java/com/bakdata/conquery/integration/json/FormTest.java @@ -11,17 +11,13 @@ import java.util.Map; import java.util.OptionalLong; import java.util.concurrent.TimeUnit; - import com.bakdata.conquery.apiv1.forms.Form; -import com.bakdata.conquery.integration.common.LoadingUtil; import com.bakdata.conquery.integration.common.RequiredData; import com.bakdata.conquery.integration.common.ResourceFile; import com.bakdata.conquery.io.cps.CPSType; import com.bakdata.conquery.io.result.csv.CsvRenderer; import com.bakdata.conquery.models.auth.entities.User; import com.bakdata.conquery.models.config.ConqueryConfig; -import com.bakdata.conquery.models.datasets.Dataset; -import com.bakdata.conquery.models.datasets.concepts.Concept; import com.bakdata.conquery.models.exceptions.JSONException; import com.bakdata.conquery.models.execution.ExecutionState; import com.bakdata.conquery.models.forms.managed.ManagedForm; @@ -100,10 +96,10 @@ public void executeTest(StandaloneSupport support) throws Exception { ManagedInternalForm managedForm = (ManagedInternalForm) support .getNamespace() .getExecutionManager() - .runQuery(namespace, form, support.getTestUser(), support.getDataset(), support.getConfig(), false); + .runQuery(namespace, form, support.getTestUser(), support.getConfig(), false); - managedForm.awaitDone(10, TimeUnit.MINUTES); - if (managedForm.getState() != ExecutionState.DONE) { + ExecutionState executionState = namespace.getExecutionManager().awaitDone(managedForm, 10, TimeUnit.MINUTES); + if (executionState != ExecutionState.DONE) { if (managedForm.getState() == ExecutionState.FAILED) { fail(getLabel() + " Query failed"); } @@ -177,7 +173,7 @@ private void checkMultipleResult(Map> managedMapping, * * @see FormTest#checkMultipleResult(Map, ConqueryConfig, PrintSettings) */ - private void checkSingleResult(F managedForm, ConqueryConfig config, PrintSettings printSettings) + private & SingleTableResult> void checkSingleResult(F managedForm, ConqueryConfig config, PrintSettings printSettings) throws IOException { @@ -205,25 +201,6 @@ private void checkSingleResult(F man } - private static void importConcepts(StandaloneSupport support, ArrayNode rawConcepts) throws JSONException, IOException { - if (rawConcepts == null) { - return; - } - - Dataset dataset = support.getDataset(); - - List> concepts = parseSubTreeList( - support, - rawConcepts, - Concept.class, - c -> c.setDataset(support.getDataset()) - ); - - for (Concept concept : concepts) { - LoadingUtil.uploadConcept(support, dataset, concept); - } - } - private Form parseForm(StandaloneSupport support) throws JSONException, IOException { return parseSubTree(support, rawForm, Form.class); diff --git a/backend/src/test/java/com/bakdata/conquery/integration/tests/ExternalFormBackendTest.java b/backend/src/test/java/com/bakdata/conquery/integration/tests/ExternalFormBackendTest.java index e6cacdd25c..f3fab1bae2 100644 --- a/backend/src/test/java/com/bakdata/conquery/integration/tests/ExternalFormBackendTest.java +++ b/backend/src/test/java/com/bakdata/conquery/integration/tests/ExternalFormBackendTest.java @@ -10,13 +10,14 @@ import java.time.ZonedDateTime; import java.util.Collections; import java.util.List; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.UriBuilder; import com.bakdata.conquery.apiv1.execution.FullExecutionStatus; import com.bakdata.conquery.apiv1.execution.ResultAsset; import com.bakdata.conquery.apiv1.frontend.FrontendConfiguration; import com.bakdata.conquery.apiv1.frontend.VersionContainer; import com.bakdata.conquery.integration.common.IntegrationUtils; -import com.bakdata.conquery.io.result.ExternalResult; import com.bakdata.conquery.models.auth.entities.User; import com.bakdata.conquery.models.config.ConqueryConfig; import com.bakdata.conquery.models.config.FormBackendConfig; @@ -26,14 +27,13 @@ import com.bakdata.conquery.models.execution.ExecutionState; import com.bakdata.conquery.models.execution.ManagedExecution; import com.bakdata.conquery.models.forms.frontendconfiguration.FormScanner; +import com.bakdata.conquery.models.forms.managed.ExternalExecution; import com.bakdata.conquery.models.identifiable.ids.specific.ManagedExecutionId; import com.bakdata.conquery.resources.api.ConfigResource; import com.bakdata.conquery.resources.api.ResultExternalResource; import com.bakdata.conquery.resources.hierarchies.HierarchyHelper; import com.bakdata.conquery.util.support.StandaloneSupport; import com.bakdata.conquery.util.support.TestConquery; -import jakarta.ws.rs.core.MediaType; -import jakarta.ws.rs.core.UriBuilder; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.jetbrains.annotations.NotNull; @@ -97,19 +97,19 @@ public void execute(String name, TestConquery testConquery) throws Exception { // Generate asset urls and check them in the status final ManagedExecution storedExecution = testConquery.getSupport(name).getMetaStorage().getExecution(managedExecutionId); final URI - downloadURLasset1 = - ResultExternalResource.getDownloadURL(apiUriBuilder.clone(), (ManagedExecution & ExternalResult) storedExecution, executionStatus.getResultUrls() - .get(0) - .getAssetId()); + downloadUrlAsset1 = + ResultExternalResource.getDownloadURL(apiUriBuilder.clone(), (ExternalExecution) storedExecution, executionStatus.getResultUrls() + .get(0) + .getAssetId()); final URI - downloadURLasset2 = - ResultExternalResource.getDownloadURL(apiUriBuilder.clone(), (ManagedExecution & ExternalResult) storedExecution, executionStatus.getResultUrls() + downloadUrlAsset2 = + ResultExternalResource.getDownloadURL(apiUriBuilder.clone(), (ExternalExecution) storedExecution, executionStatus.getResultUrls() .get(1) .getAssetId()); assertThat(executionStatus.getStatus()).isEqualTo(ExecutionState.DONE); - assertThat(executionStatus.getResultUrls()).containsExactly(new ResultAsset("Result", downloadURLasset1), new ResultAsset("Another Result", downloadURLasset2)); + assertThat(executionStatus.getResultUrls()).containsExactly(new ResultAsset("Result", downloadUrlAsset1), new ResultAsset("Another Result", downloadUrlAsset2)); log.info("Download Result"); final String diff --git a/backend/src/test/java/com/bakdata/conquery/io/result/ResultTestUtil.java b/backend/src/test/java/com/bakdata/conquery/io/result/ResultTestUtil.java index 793beca635..baabe0f0be 100644 --- a/backend/src/test/java/com/bakdata/conquery/io/result/ResultTestUtil.java +++ b/backend/src/test/java/com/bakdata/conquery/io/result/ResultTestUtil.java @@ -1,5 +1,7 @@ package com.bakdata.conquery.io.result; +import static org.mockito.Mockito.mock; + import java.util.Collections; import java.util.List; import java.util.Locale; @@ -8,7 +10,9 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import com.bakdata.conquery.apiv1.query.Query; import com.bakdata.conquery.apiv1.query.concept.specific.CQConcept; +import com.bakdata.conquery.models.auth.entities.User; import com.bakdata.conquery.models.config.ConqueryConfig; import com.bakdata.conquery.models.datasets.Column; import com.bakdata.conquery.models.datasets.Dataset; @@ -58,7 +62,7 @@ public class ResultTestUtil { @NotNull public static ManagedQuery getTestQuery() { - return new ManagedQuery(null, null, null, null) { + return new ManagedQuery(mock(Query.class), mock(User.class), new Dataset(ResultTestUtil.class.getSimpleName()), null) { @Override public List getResultInfos(PrintSettings printSettings) { return getResultTypes().stream() diff --git a/backend/src/test/java/com/bakdata/conquery/io/result/excel/ExcelResultRenderTest.java b/backend/src/test/java/com/bakdata/conquery/io/result/excel/ExcelResultRenderTest.java index 58d1b710f4..9cda4e839d 100644 --- a/backend/src/test/java/com/bakdata/conquery/io/result/excel/ExcelResultRenderTest.java +++ b/backend/src/test/java/com/bakdata/conquery/io/result/excel/ExcelResultRenderTest.java @@ -3,6 +3,7 @@ import static com.bakdata.conquery.io.result.ResultTestUtil.getResultTypes; import static com.bakdata.conquery.io.result.ResultTestUtil.getTestEntityResults; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -17,10 +18,13 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import com.bakdata.conquery.apiv1.query.Query; import com.bakdata.conquery.apiv1.query.concept.specific.CQConcept; import com.bakdata.conquery.io.result.ResultTestUtil; +import com.bakdata.conquery.models.auth.entities.User; import com.bakdata.conquery.models.config.ConqueryConfig; import com.bakdata.conquery.models.config.ExcelConfig; +import com.bakdata.conquery.models.datasets.Dataset; import com.bakdata.conquery.models.i18n.I18n; import com.bakdata.conquery.models.identifiable.mapping.EntityPrintId; import com.bakdata.conquery.models.query.ManagedQuery; @@ -66,7 +70,7 @@ void writeAndRead() throws IOException { // The Shard nodes send Object[] but since Jackson is used for deserialization, nested collections are always a list because they are not further specialized List results = getTestEntityResults(); - ManagedQuery mquery = new ManagedQuery(null, null, null, null) { + ManagedQuery mquery = new ManagedQuery(mock(Query.class), mock(User.class), new Dataset(ExcelResultRenderTest.class.getSimpleName()), null) { public List getResultInfos(PrintSettings printSettings) { return getResultTypes().stream() .map(ResultTestUtil.TypedSelectDummy::new) @@ -112,7 +116,6 @@ private List readComputed(InputStream inputStream, PrintSettings setting XSSFSheet sheet = workbook.getSheetAt(0); List computed = new ArrayList<>(); - int i = 0; for (Row row : sheet) { StringJoiner sj = new StringJoiner("\t"); DataFormatter formatter = new DataFormatter(settings.getLocale()); @@ -128,7 +131,6 @@ private List readComputed(InputStream inputStream, PrintSettings setting sj.add(formatted); } computed.add(sj.toString()); - i++; } return computed; } @@ -165,9 +167,7 @@ private void joinValue(StringJoiner valueJoiner, Object val, ResultInfo info) { String printVal = info.printNullable(val); if (info.getType().equals(ResultType.Primitive.BOOLEAN)) { - /** - * Even though we set the locale to GERMAN, poi's {@link DataFormatter#formatCellValue(Cell)} hardcoded english booleans - */ + // Even though we set the locale to GERMAN, poi's {@link DataFormatter#formatCellValue(Cell)} hardcoded english booleans printVal = (Boolean) val ? "TRUE" : "FALSE"; } diff --git a/backend/src/test/java/com/bakdata/conquery/io/storage/xodus/stores/SerializingStoreDumpTest.java b/backend/src/test/java/com/bakdata/conquery/io/storage/xodus/stores/SerializingStoreDumpTest.java index 96b4c394e9..069cdcb456 100644 --- a/backend/src/test/java/com/bakdata/conquery/io/storage/xodus/stores/SerializingStoreDumpTest.java +++ b/backend/src/test/java/com/bakdata/conquery/io/storage/xodus/stores/SerializingStoreDumpTest.java @@ -1,12 +1,15 @@ package com.bakdata.conquery.io.storage.xodus.stores; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; import java.io.File; import java.io.IOException; import java.util.concurrent.Executors; +import jakarta.validation.Validator; import com.bakdata.conquery.apiv1.query.ConceptQuery; +import com.bakdata.conquery.apiv1.query.Query; import com.bakdata.conquery.apiv1.query.QueryDescription; import com.bakdata.conquery.apiv1.query.concept.specific.CQReusedQuery; import com.bakdata.conquery.io.jackson.Jackson; @@ -22,7 +25,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.io.Files; import io.dropwizard.jersey.validation.Validators; -import jakarta.validation.Validator; import jetbrains.exodus.env.Environment; import jetbrains.exodus.env.Environments; import lombok.extern.slf4j.Slf4j; @@ -42,7 +44,7 @@ public class SerializingStoreDumpTest { private ObjectMapper objectMapper; // Test data - private final ManagedQuery managedQuery = new ManagedQuery(null, null, new Dataset("dataset"), STORAGE); + private final ManagedQuery managedQuery = new ManagedQuery(mock(Query.class), mock(User.class), new Dataset("dataset"), STORAGE); private final ConceptQuery cQuery = new ConceptQuery( new CQReusedQuery(managedQuery.getId())); private final User user = new User("username", "userlabel", STORAGE); diff --git a/backend/src/test/java/com/bakdata/conquery/models/SerializationTests.java b/backend/src/test/java/com/bakdata/conquery/models/SerializationTests.java index f812282567..680712e45a 100644 --- a/backend/src/test/java/com/bakdata/conquery/models/SerializationTests.java +++ b/backend/src/test/java/com/bakdata/conquery/models/SerializationTests.java @@ -323,7 +323,7 @@ public void formConfig() throws JSONException, IOException { ObjectMapper mapper = FormConfigProcessor.getMAPPER(); JsonNode values = mapper.valueToTree(form); FormConfig formConfig = new FormConfig(form.getClass().getAnnotation(CPSType.class).id(), values); - formConfig.setDataset(dataset); + formConfig.setDataset(dataset.getId()); SerializationTestUtil .forType(FormConfig.class) diff --git a/backend/src/test/java/com/bakdata/conquery/models/execution/DefaultLabelTest.java b/backend/src/test/java/com/bakdata/conquery/models/execution/DefaultLabelTest.java index f2b1bece25..71c2be7212 100644 --- a/backend/src/test/java/com/bakdata/conquery/models/execution/DefaultLabelTest.java +++ b/backend/src/test/java/com/bakdata/conquery/models/execution/DefaultLabelTest.java @@ -3,6 +3,7 @@ import static com.bakdata.conquery.models.execution.ManagedExecution.AUTO_LABEL_SUFFIX; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; import java.time.LocalDateTime; import java.util.List; @@ -11,6 +12,7 @@ import com.bakdata.conquery.apiv1.forms.export_form.ExportForm; import com.bakdata.conquery.apiv1.query.ConceptQuery; +import com.bakdata.conquery.apiv1.query.Query; import com.bakdata.conquery.apiv1.query.concept.specific.CQAnd; import com.bakdata.conquery.apiv1.query.concept.specific.CQConcept; import com.bakdata.conquery.apiv1.query.concept.specific.CQReusedQuery; @@ -31,13 +33,12 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; -import org.mockito.Mockito; public class DefaultLabelTest { private final static MetaStorage STORAGE = new NonPersistentStoreFactory().createMetaStorage(); - private static final Namespace NAMESPACE = Mockito.mock(LocalNamespace.class); + private static final Namespace NAMESPACE = mock(LocalNamespace.class); private static final Dataset DATASET = new Dataset("dataset"); private static final User user = new User("user","user", STORAGE); @@ -126,7 +127,7 @@ void autoLabelConceptQueryFallback(Locale locale, String autoLabel) { void autoLabelReusedQuery(Locale locale, String autoLabel) { I18n.LOCALE.set(locale); - final ManagedQuery managedQuery = new ManagedQuery(null, null, DATASET, STORAGE); + final ManagedQuery managedQuery = new ManagedQuery(mock(Query.class), mock(User.class), DATASET, STORAGE); managedQuery.setQueryId(UUID.randomUUID()); CQReusedQuery reused = new CQReusedQuery(managedQuery.getId()); @@ -166,7 +167,7 @@ void autoLabelUploadQuery(Locale locale, String autoLabel) { void autoLabelComplexQuery(Locale locale, String autoLabel) { I18n.LOCALE.set(locale); - final ManagedQuery managedQuery = new ManagedQuery(null, null, DATASET, STORAGE); + final ManagedQuery managedQuery = new ManagedQuery(mock(Query.class), mock(User.class), DATASET, STORAGE); managedQuery.setQueryId(UUID.randomUUID()); CQAnd and = new CQAnd(); @@ -199,7 +200,7 @@ void autoLabelComplexQuery(Locale locale, String autoLabel) { void autoLabelComplexQueryNullLabels(Locale locale, String autoLabel) { I18n.LOCALE.set(locale); - final ManagedQuery managedQuery = new ManagedQuery(null, null, DATASET, STORAGE); + final ManagedQuery managedQuery = new ManagedQuery(mock(Query.class), mock(User.class), DATASET, STORAGE); managedQuery.setQueryId(UUID.randomUUID()); CQAnd and = new CQAnd(); diff --git a/backend/src/test/java/com/bakdata/conquery/tasks/PermissionCleanupTaskTest.java b/backend/src/test/java/com/bakdata/conquery/tasks/PermissionCleanupTaskTest.java index 301a2597dc..39202011ea 100644 --- a/backend/src/test/java/com/bakdata/conquery/tasks/PermissionCleanupTaskTest.java +++ b/backend/src/test/java/com/bakdata/conquery/tasks/PermissionCleanupTaskTest.java @@ -3,6 +3,7 @@ import static com.bakdata.conquery.tasks.PermissionCleanupTask.deletePermissionsOfOwnedInstances; import static com.bakdata.conquery.tasks.PermissionCleanupTask.deleteQueryPermissionsWithMissingRef; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; import java.time.Instant; import java.time.LocalDateTime; @@ -41,7 +42,7 @@ private ManagedQuery createManagedQuery() { ConceptQuery query = new ConceptQuery(root); - final ManagedQuery managedQuery = new ManagedQuery(query, null, new Dataset("test"), STORAGE); + final ManagedQuery managedQuery = new ManagedQuery(query, mock(User.class), new Dataset("test"), STORAGE); managedQuery.setCreationTime(LocalDateTime.now().minusDays(1)); diff --git a/backend/src/test/java/com/bakdata/conquery/tasks/QueryCleanupTaskTest.java b/backend/src/test/java/com/bakdata/conquery/tasks/QueryCleanupTaskTest.java index c8d9a47a17..34707104d6 100644 --- a/backend/src/test/java/com/bakdata/conquery/tasks/QueryCleanupTaskTest.java +++ b/backend/src/test/java/com/bakdata/conquery/tasks/QueryCleanupTaskTest.java @@ -1,6 +1,7 @@ package com.bakdata.conquery.tasks; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; import java.time.Duration; import java.time.LocalDateTime; @@ -13,6 +14,7 @@ import com.bakdata.conquery.apiv1.query.concept.specific.CQAnd; import com.bakdata.conquery.apiv1.query.concept.specific.CQReusedQuery; import com.bakdata.conquery.io.storage.MetaStorage; +import com.bakdata.conquery.models.auth.entities.User; import com.bakdata.conquery.models.datasets.Dataset; import com.bakdata.conquery.models.query.ManagedQuery; import com.bakdata.conquery.util.NonPersistentStoreFactory; @@ -34,7 +36,7 @@ private ManagedQuery createManagedQuery() { ConceptQuery query = new ConceptQuery(root); - final ManagedQuery managedQuery = new ManagedQuery(query, null, new Dataset("test"), STORAGE); + final ManagedQuery managedQuery = new ManagedQuery(query, mock(User.class), new Dataset("test"), STORAGE); managedQuery.setCreationTime(LocalDateTime.now().minus(queryExpiration).minusDays(1));