Skip to content

Commit

Permalink
Merge pull request #3562 from ingef/fix/concurrent-mod-FilterSearch
Browse files Browse the repository at this point in the history
Use ConcurrentHashMap where concurrent access is possible.
Loading branch information
thoniTUB authored Sep 17, 2024
2 parents d426b67 + dc6289f commit 29a48b6
Show file tree
Hide file tree
Showing 12 changed files with 109 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
import com.bakdata.conquery.mode.cluster.ClusterConnectionShard;
import com.bakdata.conquery.mode.cluster.InternalMapperFactory;
import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.worker.ShardWorkers;
import com.bakdata.conquery.models.worker.Worker;
import com.bakdata.conquery.models.worker.Workers;
import com.bakdata.conquery.util.io.ConqueryMDC;
import io.dropwizard.core.ConfiguredBundle;
import io.dropwizard.core.setup.Environment;
Expand All @@ -33,7 +33,7 @@ public class ShardNode implements ConfiguredBundle<ConqueryConfig> {

private final String name;
@Setter
private Workers workers;
private ShardWorkers workers;
private ClusterConnectionShard clusterConnection;

public ShardNode() {
Expand All @@ -51,7 +51,7 @@ public void run(ConqueryConfig config, Environment environment) throws Exception


InternalMapperFactory internalMapperFactory = new InternalMapperFactory(config, environment.getValidator());
workers = new Workers(
workers = new ShardWorkers(
config.getQueries().getExecutionPool(),
internalMapperFactory,
config.getCluster().getEntityBucketSize(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import com.bakdata.conquery.io.jackson.JacksonUtil;
import com.fasterxml.jackson.databind.JsonNode;
Expand Down Expand Up @@ -107,7 +107,7 @@ private MessageManager getMessageManager(IoSession session) {
@Getter @RequiredArgsConstructor
public static class MessageManager {

private final Map<UUID, ChunkedMessage.List> messages = new HashMap<>();
private final ConcurrentMap<UUID, ChunkedMessage.List> messages = new ConcurrentHashMap<>();
private UUID lastId = null;
private ChunkedMessage.List lastMessage = null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import com.bakdata.conquery.models.messages.network.specific.RegisterWorker;
import com.bakdata.conquery.models.messages.network.specific.UpdateJobManagerStatus;
import com.bakdata.conquery.models.worker.IdResolveContext;
import com.bakdata.conquery.models.worker.ShardWorkers;
import com.bakdata.conquery.models.worker.Worker;
import com.bakdata.conquery.models.worker.WorkerInformation;
import com.bakdata.conquery.models.worker.Workers;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dropwizard.core.setup.Environment;
import io.dropwizard.lifecycle.Managed;
Expand All @@ -49,7 +49,7 @@ public class ClusterConnectionShard implements Managed, IoHandler {

private final ConqueryConfig config;
private final Environment environment;
private final Workers workers;
private final ShardWorkers workers;
private final InternalMapperFactory internalMapperFactory;

private JobManager jobManager;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,33 +1,32 @@
package com.bakdata.conquery.mode.cluster;

import jakarta.validation.Validator;

import com.bakdata.conquery.io.jackson.Jackson;
import com.bakdata.conquery.io.jackson.MutableInjectableValues;
import com.bakdata.conquery.io.jackson.View;
import com.bakdata.conquery.io.storage.MetaStorage;
import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.worker.DatasetRegistry;
import com.bakdata.conquery.models.worker.Workers;
import com.bakdata.conquery.models.worker.ShardWorkers;
import com.fasterxml.jackson.databind.DeserializationConfig;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationConfig;
import jakarta.validation.Validator;

public record InternalMapperFactory(ConqueryConfig config, Validator validator) {

public ObjectMapper createShardCommunicationMapper() {
return createInternalObjectMapper(View.InternalCommunication.class);
}

public ObjectMapper createWorkerCommunicationMapper(Workers workers) {
public ObjectMapper createWorkerCommunicationMapper(ShardWorkers workers) {
final ObjectMapper objectMapper = createInternalObjectMapper(View.InternalCommunication.class);

workers.injectInto(objectMapper);

return objectMapper;
}

public ObjectMapper createWorkerPersistenceMapper(Workers workers) {
public ObjectMapper createWorkerPersistenceMapper(ShardWorkers workers) {
final ObjectMapper objectMapper = createInternalObjectMapper(View.Persistence.Shard.class);

workers.injectInto(objectMapper);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -76,8 +76,7 @@ public void execute() throws Exception {
// Most computations are cheap but data intensive: we fork here to use as many cores as possible.
final ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() - 1);

final HashMap<Searchable, TrieSearch<FrontendValue>> searchCache = new HashMap<>();
final Map<Searchable, TrieSearch<FrontendValue>> synchronizedResult = Collections.synchronizedMap(searchCache);
final Map<Searchable, TrieSearch<FrontendValue>> searchCache = new ConcurrentHashMap<>();

log.debug("Found {} searchable Objects.", collectedSearchables.values().stream().mapToLong(Set::size).sum());

Expand All @@ -95,7 +94,7 @@ public void execute() throws Exception {
try {
final TrieSearch<FrontendValue> search = searchable.createTrieSearch(indexConfig);

synchronizedResult.put(searchable, search);
searchCache.put(searchable, search);

log.debug(
"DONE collecting {} entries for `{}`, within {}",
Expand Down Expand Up @@ -125,7 +124,7 @@ public void execute() throws Exception {
service.shutdownNow();
return;
}
log.debug("Still waiting for {} to finish.", Sets.difference(collectedSearchables.get(Searchable.class), synchronizedResult.keySet()));
log.debug("Still waiting for {} to finish.", Sets.difference(collectedSearchables.get(Searchable.class), searchCache.keySet()));
}

// Shrink searches before registering in the filter search
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.bakdata.conquery.models.messages.network;

import jakarta.validation.Validator;

import com.bakdata.conquery.commands.ManagerNode;
import com.bakdata.conquery.commands.ShardNode;
import com.bakdata.conquery.io.mina.MessageSender;
Expand All @@ -10,7 +8,8 @@
import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.worker.DatasetRegistry;
import com.bakdata.conquery.models.worker.DistributedNamespace;
import com.bakdata.conquery.models.worker.Workers;
import com.bakdata.conquery.models.worker.ShardWorkers;
import jakarta.validation.Validator;
import lombok.Getter;

@Getter
Expand All @@ -32,12 +31,12 @@ public boolean isConnected() {
@Getter
public static class ShardNodeNetworkContext extends NetworkMessageContext<MessageToManagerNode> {

private final Workers workers;
private final ShardWorkers workers;
private final ConqueryConfig config;
private final Validator validator;
private final NetworkSession rawSession;

public ShardNodeNetworkContext(NetworkSession session, Workers workers, ConqueryConfig config, Validator validator) {
public ShardNodeNetworkContext(NetworkSession session, ShardWorkers workers, ConqueryConfig config, Validator validator) {
super(session, config.getCluster().getBackpressure());
this.workers = workers;
this.config = config;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
package com.bakdata.conquery.models.query;

import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import c10n.C10N;
import lombok.experimental.UtilityClass;

@UtilityClass
public class C10nCache {
private static Map<Locale, Map<Class, Object>> cache = new HashMap<>();
private static ConcurrentMap<Locale, ConcurrentMap<Class, Object>> cache = new ConcurrentHashMap<>();

public <T> T getLocalized(Class<? super T> clazz, Locale locale) {
return (T) cache.computeIfAbsent(locale, (ignored) -> new HashMap<>())
return (T) cache.computeIfAbsent(locale, (ignored) -> new ConcurrentHashMap<>())
.computeIfAbsent(clazz, ignored -> C10N.get(clazz, locale));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

import com.bakdata.conquery.apiv1.frontend.FrontendValue;
Expand Down Expand Up @@ -34,8 +35,8 @@ public class FilterSearch {
* In the code below, the keys of this map will usually be called "reference".
*/
@JsonIgnore
private Map<Searchable, TrieSearch<FrontendValue>> searchCache = new HashMap<>();
private Map<SelectFilter<?>, Integer> totals = new HashMap<>();
private ConcurrentMap<Searchable, TrieSearch<FrontendValue>> searchCache = new ConcurrentHashMap<>();
private ConcurrentMap<SelectFilter<?>, Integer> totals = new ConcurrentHashMap<>();

/**
* From a given {@link FrontendValue} extract all relevant keywords.
Expand Down Expand Up @@ -127,7 +128,7 @@ public void shrinkSearch(Searchable searchable) {
}

public synchronized void clearSearch() {
totals = new HashMap<>();
searchCache = new HashMap<>();
totals = new ConcurrentHashMap<>();
searchCache = new ConcurrentHashMap<>();
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package com.bakdata.conquery.models.worker;

import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import jakarta.validation.Validator;

import com.bakdata.conquery.commands.ShardNode;
import com.bakdata.conquery.io.storage.WorkerStorage;
Expand All @@ -22,6 +20,7 @@
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dropwizard.lifecycle.Managed;
import jakarta.validation.Validator;
import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;
Expand All @@ -33,13 +32,13 @@
* Each Shard contains one {@link Worker} per {@link Dataset}.
*/
@Slf4j
public class Workers extends IdResolveContext implements Managed {
public class ShardWorkers extends IdResolveContext implements Managed {
@Getter @Setter
private AtomicInteger nextWorker = new AtomicInteger(0);
@Getter
private final ConcurrentHashMap<WorkerId, Worker> workers = new ConcurrentHashMap<>();
@JsonIgnore
private final transient Map<DatasetId, Worker> dataset2Worker = new HashMap<>();
private final transient ConcurrentMap<DatasetId, Worker> dataset2Worker = new ConcurrentHashMap<>();

/**
* Shared ExecutorService among Workers for Jobs.
Expand All @@ -54,7 +53,7 @@ public class Workers extends IdResolveContext implements Managed {
private final int secondaryIdSubPlanRetention;


public Workers(ThreadPoolDefinition queryThreadPoolDefinition, InternalMapperFactory internalMapperFactory, int entityBucketSize, int secondaryIdSubPlanRetention) {
public ShardWorkers(ThreadPoolDefinition queryThreadPoolDefinition, InternalMapperFactory internalMapperFactory, int entityBucketSize, int secondaryIdSubPlanRetention) {
this.queryThreadPoolDefinition = queryThreadPoolDefinition;

// TODO This shouldn't be coupled to the query thread pool definition
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

import static org.mockito.Mockito.mock;

import jakarta.validation.Validator;

import com.bakdata.conquery.io.jackson.Jackson;
import com.bakdata.conquery.io.storage.MetaStorage;
import com.bakdata.conquery.io.storage.NamespaceStorage;
Expand All @@ -15,10 +13,11 @@
import com.bakdata.conquery.models.index.IndexService;
import com.bakdata.conquery.models.worker.DatasetRegistry;
import com.bakdata.conquery.models.worker.DistributedNamespace;
import com.bakdata.conquery.models.worker.Workers;
import com.bakdata.conquery.models.worker.ShardWorkers;
import com.bakdata.conquery.util.NonPersistentStoreFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dropwizard.jersey.validation.Validators;
import jakarta.validation.Validator;
import lombok.Getter;
import org.junit.jupiter.api.BeforeEach;

Expand Down Expand Up @@ -62,7 +61,7 @@ public void before() {
namespaceStorage.updateDataset(new Dataset("serialization_test"));

// Prepare shard node internal mapper
final Workers workers = mock(Workers.class);
final ShardWorkers workers = mock(ShardWorkers.class);
shardInternalMapper = internalMapperFactory.createWorkerPersistenceMapper(workers);

// Prepare api mapper with a Namespace injected (usually done by PathParamInjector)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import com.bakdata.conquery.models.events.stores.specific.ScaledDecimalStore;
import com.bakdata.conquery.models.exceptions.JSONException;
import com.bakdata.conquery.models.identifiable.CentralRegistry;
import com.bakdata.conquery.models.worker.Workers;
import com.bakdata.conquery.models.worker.ShardWorkers;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
import io.dropwizard.jersey.validation.Validators;
Expand Down Expand Up @@ -62,7 +62,7 @@ public static void setupRegistry() {

// Prepare shard node internal mapper
InternalMapperFactory internalMapperFactory = new InternalMapperFactory(new ConqueryConfig(), Validators.newValidator());
shardInternalMapper = internalMapperFactory.createWorkerPersistenceMapper(mock(Workers.class));
shardInternalMapper = internalMapperFactory.createWorkerPersistenceMapper(mock(ShardWorkers.class));
}

@Test
Expand Down
Loading

0 comments on commit 29a48b6

Please sign in to comment.