Skip to content

Commit

Permalink
MET-6239 use transaction serialization level instead
Browse files Browse the repository at this point in the history
  • Loading branch information
jeortizquan committed Nov 18, 2024
1 parent 996591e commit 78b6f75
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,13 @@ public DeBiasStateService debiasMachine(DatasetDeBiasRepository datasetDeBiasRep
RecordLogRepository recordLogRepository,
RecordDeBiasPublishable recordDeBiasPublishable,
RecordDeBiasMainRepository recordDeBiasMainRepository,
RecordDeBiasDetailRepository recordDeBiasDetailRepository,
LockRegistry lockRegistry) {
RecordDeBiasDetailRepository recordDeBiasDetailRepository) {
return new DeBiasStateServiceImpl(datasetDeBiasRepository,
datasetRepository,
recordLogRepository,
recordDeBiasPublishable,
recordDeBiasMainRepository,
recordDeBiasDetailRepository,
lockRegistry);
recordDeBiasDetailRepository);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,11 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.support.locks.LockRegistry;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Transactional;

/**
Expand All @@ -45,8 +42,6 @@ public class DeBiasStateServiceImpl implements DeBiasStateService {
private final DatasetRepository datasetRepository;
private final RecordLogRepository recordLogRepository;
private final RecordDeBiasPublishable recordDeBiasPublishable;
private final Map<Integer, Lock> datasetIdLocksMap = new ConcurrentHashMap<>();
private final LockRegistry lockRegistry;

/**
* Instantiates a new DeBias detect service.
Expand All @@ -57,33 +52,26 @@ public class DeBiasStateServiceImpl implements DeBiasStateService {
* @param recordDeBiasPublishable the record publishable
* @param recordDeBiasMainRepository the record de bias main repository
* @param recordDeBiasDetailRepository the record de bias detail repository
* @param lockRegistry the lock registry
*/
public DeBiasStateServiceImpl(DatasetDeBiasRepository datasetDeBiasRepository,
DatasetRepository datasetRepository,
RecordLogRepository recordLogRepository,
RecordDeBiasPublishable recordDeBiasPublishable,
RecordDeBiasMainRepository recordDeBiasMainRepository,
RecordDeBiasDetailRepository recordDeBiasDetailRepository,
LockRegistry lockRegistry) {
RecordDeBiasDetailRepository recordDeBiasDetailRepository) {
this.datasetDeBiasRepository = datasetDeBiasRepository;
this.recordDeBiasMainRepository = recordDeBiasMainRepository;
this.recordDeBiasDetailRepository = recordDeBiasDetailRepository;
this.datasetRepository = datasetRepository;
this.recordLogRepository = recordLogRepository;
this.recordDeBiasPublishable = recordDeBiasPublishable;
this.lockRegistry = lockRegistry;
}

@Transactional
@Transactional(isolation = Isolation.SERIALIZABLE)
@Override
public boolean process(Integer datasetId) {
final Lock lock = datasetIdLocksMap.computeIfAbsent(datasetId, s -> lockRegistry.obtain("debiasProcess_" + datasetId));
LOGGER.info("{} {}", READY_STATE, datasetId);

try {
lock.lock();
LOGGER.info("DeBias processing: {} lock, Locked", datasetId);
DatasetDeBiasEntity dataset = getDatasetDeBiasEntity(datasetId);

processDatasetAndPublishToDeBiasReadyQueue(dataset);
Expand All @@ -92,9 +80,6 @@ public boolean process(Integer datasetId) {
} catch (RuntimeException e) {
LOGGER.warn("fail {} {}", READY_STATE, datasetId, e);
return false;
} finally {
lock.unlock();
LOGGER.info("DeBias processing: {} lock, Unlocked", datasetId);
}
return true;
}
Expand Down

0 comments on commit 78b6f75

Please sign in to comment.