Commit 91f8f38

Merge pull request #3500 from ingef/feature/calculate-column-position-in-table

Trigger calculate column position from table
thoniTUB authored Aug 7, 2024
2 parents d163e0a + d4fc041 commit 91f8f38
Showing 16 changed files with 153 additions and 51 deletions.
@@ -10,7 +10,10 @@
* @JsonDeserialize(converter = Initializing.Converter.class )
* }
* </pre>
* @param <T>
* @param <T> the class to be initialized
*
* @implNote cannot be used on classes with a back reference. In this case make the reference managing class
* initializable and let it initialize its children.
*/
public interface Initializing<T extends Initializing<T>> {

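For orientation, a minimal sketch of how a class typically opts into this pattern. It mirrors the Table change further down in this commit; the Example class and its injected storage field are purely illustrative and not part of the repository:

import com.bakdata.conquery.io.jackson.Initializing;
import com.bakdata.conquery.io.storage.NamespacedStorage;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.OptBoolean;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;

// Register the converter so init() runs right after Jackson has populated the fields.
@JsonDeserialize(converter = Example.Initializer.class)
public class Example implements Initializing<Example> {

	// Injected by the ObjectMapper rather than read from the JSON payload.
	@JacksonInject(useInput = OptBoolean.FALSE)
	private NamespacedStorage storage;

	@Override
	public Example init() {
		// Resolve anything that needs the injected context, then hand back the finished instance.
		return this;
	}

	static class Initializer extends Initializing.Converter<Example> {
	}
}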
@@ -37,6 +37,7 @@ public class MetaStorage extends ConqueryStorage implements Injectable {
private IdentifiableStore<Group> authGroup;

public void openStores(ObjectMapper mapper) {

authUser = storageFactory.createUserStore(centralRegistry, "meta", this, mapper);
authRole = storageFactory.createRoleStore(centralRegistry, "meta", this, mapper);
authGroup = storageFactory.createGroupStore(centralRegistry, "meta", this, mapper);
@@ -92,7 +92,6 @@ private void decorateDatasetStore(SingletonStore<Dataset> store) {
private void decorateTableStore(IdentifiableStore<Table> store) {
store.onAdd(table -> {
for (Column column : table.getColumns()) {
column.init();
getCentralRegistry().register(column);
}
})
@@ -1,32 +1,18 @@
package com.bakdata.conquery.io.storage.xodus.stores;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.*;
import java.nio.file.Files;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import jakarta.validation.Validator;

import com.bakdata.conquery.io.jackson.Jackson;
import com.bakdata.conquery.io.jackson.JacksonUtil;
@@ -43,14 +29,9 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import jakarta.validation.Validator;
import jetbrains.exodus.ArrayByteIterable;
import jetbrains.exodus.ByteIterable;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.ToString;
import lombok.*;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.TestOnly;
@@ -460,7 +441,7 @@ private ByteIterable handle(StoreEntryConsumer<KEY, VALUE> consumer, IterationSt
}

/**
* Deserializes the gives serial value (either a key or a value of an store entry) to a concrete object. If that fails the entry-value is dumped if configured so to a file using the entry-key for the filename.
* Deserializes the given serial value (either a key or a value of a store entry) to a concrete object. If that fails, the entry-value is dumped to a file (if so configured), using the entry-key for the filename.
*
* @param <TYPE> The deserialized object type.
* @param serial The to be deserialized object (key or value of an entry)
@@ -476,7 +457,7 @@ private <TYPE> TYPE getDeserializedAndDumpFailed(ByteIterable serial, Function<B
}
catch (Exception e) {
// With trace also print the stacktrace
log.warn(onFailWarnMsgFmt, onFailKeyStringSupplier.get(), log.isTraceEnabled() ? e : null);
log.warn(onFailWarnMsgFmt + "(enable TRACE for exception logging)", onFailKeyStringSupplier.get(), log.isTraceEnabled() ? e : null);

if (shouldDumpUnreadables()) {
dumpToFile(onFailOrigValue.getBytesUnsafe(), onFailKeyStringSupplier.get(), e, unreadableValuesDumpDir, store.getName(), objectMapper);
@@ -4,6 +4,7 @@

import com.bakdata.conquery.io.cps.CPSType;
import com.bakdata.conquery.models.identifiable.ids.specific.DatasetId;
import org.jetbrains.annotations.TestOnly;

@CPSType(id = "DATASET", base = StringPermissionBuilder.class)
public class DatasetPermission extends StringPermissionBuilder {
@@ -38,7 +39,7 @@ public static ConqueryPermission onInstance(Set<Ability> abilities, DatasetId in
return INSTANCE.instancePermission(abilities, instance);
}

@Deprecated
@TestOnly
public static ConqueryPermission onInstance(Ability ability, DatasetId instance) {
return INSTANCE.instancePermission(ability, instance);
}
@@ -5,6 +5,7 @@

import com.bakdata.conquery.io.cps.CPSType;
import com.bakdata.conquery.models.identifiable.ids.specific.ManagedExecutionId;
import org.jetbrains.annotations.TestOnly;

@CPSType(id = "EXECUTION", base = StringPermissionBuilder.class)
public class ExecutionPermission extends StringPermissionBuilder {
@@ -43,7 +44,7 @@ public Set<Ability> getAllowedAbilities() {
}

//// Helper functions
@Deprecated
@TestOnly
public static ConqueryPermission onInstance(Ability ability, ManagedExecutionId instance) {
return INSTANCE.instancePermission(ability, instance);
}
@@ -1,6 +1,7 @@
package com.bakdata.conquery.models.datasets;

import javax.annotation.Nullable;
import jakarta.validation.constraints.NotNull;

import com.bakdata.conquery.apiv1.frontend.FrontendValue;
import com.bakdata.conquery.io.jackson.serializer.NsIdRef;
@@ -13,7 +14,6 @@
import com.bakdata.conquery.util.search.TrieSearch;
import com.fasterxml.jackson.annotation.JsonBackReference;
import com.fasterxml.jackson.annotation.JsonIgnore;
import jakarta.validation.constraints.NotNull;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
@@ -43,8 +43,12 @@ public class Column extends Labeled<ColumnId> implements NamespacedIdentifiable<
private boolean generateSuffixes;
private boolean searchDisabled = false;

/**
* @implNote the position is not stored in the {@link Table} as a mapping because the frequent map lookups could
* mean a large processing overhead.
*/
@JsonIgnore
private int position = -1;
private int position = UNKNOWN_POSITION;

/**
* if this is set this column counts as the secondary id of the given name for this
@@ -81,4 +85,5 @@ public TrieSearch<FrontendValue> createTrieSearch(IndexConfig config) {
public void init() {
position = ArrayUtils.indexOf(getTable().getColumns(), this);
}

}
@@ -4,31 +4,46 @@
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Stream;

import javax.annotation.CheckForNull;
import javax.annotation.Nullable;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;

import com.bakdata.conquery.io.jackson.Initializing;
import com.bakdata.conquery.io.jackson.serializer.NsIdRef;
import com.bakdata.conquery.io.storage.NamespacedStorage;
import com.bakdata.conquery.models.config.DatabaseConfig;
import com.bakdata.conquery.models.identifiable.Labeled;
import com.bakdata.conquery.models.identifiable.ids.NamespacedIdentifiable;
import com.bakdata.conquery.models.identifiable.ids.specific.TableId;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonManagedReference;
import com.fasterxml.jackson.annotation.OptBoolean;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import io.dropwizard.validation.ValidationMethod;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.TestOnly;

@Getter
@Setter
@Slf4j
public class Table extends Labeled<TableId> implements NamespacedIdentifiable<TableId> {
@JsonDeserialize(converter = Table.Initializer.class)
@EqualsAndHashCode(callSuper = true)
public class Table extends Labeled<TableId> implements NamespacedIdentifiable<TableId>, Initializing<Table> {

// TODO: 10.01.2020 fk: register imports here?

@JsonIgnore
@JacksonInject(useInput = OptBoolean.FALSE)
@NotNull
@Setter(onMethod_ = @TestOnly)
@EqualsAndHashCode.Exclude
private NamespacedStorage storage;

@NsIdRef
private Dataset dataset;
@NotNull
@@ -37,7 +52,7 @@ public class Table extends Labeled<TableId> implements NamespacedIdentifiable<Ta
private Column[] columns = new Column[0];
/**
* Defines the primary key/column of this table. Only required for SQL mode.
* If unset {@link ...SqlConnectorConfig#primaryColumn} is assumed.
* If unset {@link DatabaseConfig#getPrimaryColumn()} is assumed.
*/
@Nullable
@JsonManagedReference
@@ -104,4 +119,15 @@ public Column findSecondaryIdColumn(SecondaryIdDescription secondaryId) {
return null;
}

@Override
public Table init() {
if (dataset == null) {
dataset = storage.getDataset();
}
Arrays.stream(columns).forEach(Column::init);
return this;
}

static class Initializer extends Initializing.Converter<Table> {
}
}
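A short usage sketch of the effect of this change (illustrative only: objectMapper and tableJson are placeholder names, the mapper is assumed to carry the required injectables, and Column#position is assumed to have a Lombok-generated getter):

// After deserialization the registered Initializer has already run init(),
// so every Column knows its index within the table's column array.
Table table = objectMapper.readValue(tableJson, Table.class);
int thirdColumnPosition = table.getColumns()[2].getPosition(); // evaluates to 2; no explicit column.init() call needed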
@@ -3,6 +3,7 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

import com.bakdata.conquery.models.datasets.Column;
@@ -14,22 +15,19 @@
import com.bakdata.conquery.models.query.entity.Entity;
import com.bakdata.conquery.models.query.queryplan.aggregators.Aggregator;
import com.google.common.base.Functions;
import com.google.common.collect.ImmutableSet;
import lombok.ToString;

@ToString(callSuper = true, onlyExplicitlyIncluded = true)
public class ConceptValuesAggregator extends Aggregator<Set<Object>> {

private final Set<Object> entries = new HashSet<>();
private final TreeConcept concept;

private Column column;

private final Map<Table, Connector> tableConnectors;

public ConceptValuesAggregator(TreeConcept concept) {
super();
this.concept = concept;
tableConnectors = concept.getConnectors().stream()
.filter(conn -> conn.getColumn() != null)
.collect(Collectors.toMap(Connector::getTable, Functions.identity()));
@@ -68,7 +66,11 @@ public void consumeEvent(Bucket bucket, int event) {

@Override
public Set<Object> createAggregationResult() {
return entries.isEmpty() ? null : ImmutableSet.copyOf(entries);
if (entries.isEmpty()) {
return null;
}
// sort by natural order so we have a stable result
return new TreeSet<>(entries);
}


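A small self-contained illustration of why the switch to TreeSet gives a stable result (values are made up): ImmutableSet.copyOf preserved the backing HashSet's unspecified iteration order, whereas a TreeSet sorts by natural order, so equal inputs always produce the same output.

import java.util.Set;
import java.util.TreeSet;

class StableOrderDemo {
	public static void main(String[] args) {
		// HashSet iteration order is unspecified; TreeSet imposes natural ordering.
		Set<String> stable = new TreeSet<>(Set.of("b", "c", "a"));
		System.out.println(stable); // always prints [a, b, c]
	}
}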
@@ -12,6 +12,7 @@
import com.bakdata.conquery.io.jackson.Jackson;
import com.bakdata.conquery.io.jackson.View;
import com.bakdata.conquery.io.storage.MetaStorage;
import com.bakdata.conquery.io.storage.WorkerStorage;
import com.bakdata.conquery.mode.InternalObjectMapperCreator;
import com.bakdata.conquery.mode.cluster.ClusterNamespaceHandler;
import com.bakdata.conquery.mode.cluster.ClusterState;
@@ -22,23 +23,29 @@
import com.bakdata.conquery.models.worker.DistributedNamespace;
import com.bakdata.conquery.models.worker.Namespace;
import com.bakdata.conquery.util.NonPersistentStoreFactory;
import com.bakdata.conquery.util.extensions.WorkerStorageExtension;
import com.codahale.metrics.SharedMetricRegistries;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dropwizard.jersey.validation.Validators;
import lombok.Getter;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.RegisterExtension;

@Getter
public abstract class AbstractSerializationTest {

@RegisterExtension
private static final WorkerStorageExtension WORKER_STORAGE_EXTENSION = new WorkerStorageExtension();

private final Validator validator = Validators.newValidator();
private final ConqueryConfig config = new ConqueryConfig() {{
this.setStorage(new NonPersistentStoreFactory());
}};
private DatasetRegistry<DistributedNamespace> datasetRegistry;
private Namespace namespace;
private MetaStorage metaStorage;
private WorkerStorage workerStorage;


private ObjectMapper managerMetaInternalMapper;
@@ -57,7 +64,7 @@ public static void beforeAll() {

@BeforeEach
public void before() throws IOException {

workerStorage = WORKER_STORAGE_EXTENSION.getStorage();
metaStorage = new MetaStorage(new NonPersistentStoreFactory());
InternalObjectMapperCreator creator = new InternalObjectMapperCreator(config, metaStorage, validator);
final IndexService indexService = new IndexService(config.getCsv().createCsvParserSettings(), "emptyDefaultLabel");
@@ -66,6 +73,7 @@ public void before() throws IOException {
creator.init(datasetRegistry);

namespace = datasetRegistry.createNamespace(new Dataset("serialization_test"), metaStorage);
workerStorage.updateDataset(namespace.getDataset());

// Prepare manager meta internal mapper
managerMetaInternalMapper = creator.createInternalObjectMapper(View.Persistence.Manager.class);
@@ -83,6 +91,7 @@ public void before() throws IOException {

when(shardNode.createInternalObjectMapper(any())).thenCallRealMethod();
shardInternalMapper = shardNode.createInternalObjectMapper(View.Persistence.Shard.class);
getWorkerStorage().injectInto(shardInternalMapper);

// Prepare api mapper with a Namespace injected (usually done by PathParamInjector)
apiMapper = Jackson.copyMapperAndInjectables(Jackson.MAPPER);
@@ -13,15 +13,19 @@
import com.bakdata.conquery.models.datasets.Table;
import com.bakdata.conquery.models.identifiable.CentralRegistry;
import com.bakdata.conquery.models.worker.SingletonNamespaceCollection;
import com.bakdata.conquery.util.NonPersistentStoreFactory;
import com.bakdata.conquery.util.extensions.MetaStorageExtension;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

public class IdRefrenceTest {

@RegisterExtension
private final static MetaStorageExtension META_STORAGE_EXTENSION = new MetaStorageExtension();

@Test
public void testListReferences() throws IOException {
final ObjectMapper mapper = Jackson.copyMapperAndInjectables(Jackson.MAPPER);
@@ -35,10 +39,7 @@ public void testListReferences() throws IOException {
registry.register(dataset);
registry.register(table);

final MetaStorage metaStorage = new MetaStorage(new NonPersistentStoreFactory());

metaStorage.openStores(null);

MetaStorage metaStorage = META_STORAGE_EXTENSION.getMetaStorage();

User user = new User("usermail", "userlabel", metaStorage);
metaStorage.addUser(user);