Skip to content

Commit

Permalink
MET-6059 Implement indexing of a tombstone during record deletion
Browse files Browse the repository at this point in the history
  • Loading branch information
stzanakis committed Aug 29, 2024
1 parent 8419eab commit 0e675b1
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,11 +184,9 @@ private void publish(RdfWrapper rdf, Date recordDate, List<String> datasetIdsToR
final List<Pair<String, Date>> recordsForRedirection = performRedirection(rdf,
recordDate, datasetIdsToRedirectFrom, performRedirects);

final FullBeanImpl savedFullBean = publishToMongo(recordDate, fullBean, fullBeanPreprocessor,
final FullBeanImpl savedFullBean = publishToRecordMongo(recordDate, fullBean, fullBeanPreprocessor,
recordsForRedirection);

//Around here we need to use th new tombstone record dao.

publishToSolrFinal(rdf, savedFullBean);
}

Expand All @@ -210,7 +208,24 @@ public void publishMongo(RdfWrapper rdf, Date recordDate) throws IndexingExcepti

final TriConsumer<FullBeanImpl, FullBeanImpl, Pair<Date, Date>> fullBeanPreprocessor = providePreprocessor();

publishToMongo(recordDate, fullBean, fullBeanPreprocessor, Collections.emptyList());
publishToRecordMongo(recordDate, fullBean, fullBeanPreprocessor, Collections.emptyList());
}

/**
 * Publishes a full bean only to the tombstone Mongo database (no Solr, no live record database).
 *
 * @param fullBean the full bean to publish as a tombstone.
 * @param recordDate the date that will represent the created/updated date of the record
 * @throws IndexingException which can be one of:
 * <ul>
 * <li>{@link IndexerRelatedIndexingException} In case an error occurred during publication.</li>
 * <li>{@link SetupRelatedIndexingException} in case an error occurred during indexing setup</li>
 * <li>{@link RecordRelatedIndexingException} in case an error occurred related to record
 * contents</li>
 * </ul>
 */
public void publishTombstone(FullBeanImpl fullBean, Date recordDate) throws IndexingException {
final TriConsumer<FullBeanImpl, FullBeanImpl, Pair<Date, Date>> fullBeanPreprocessor = providePreprocessor();
publishToTombstoneMongo(recordDate, fullBean, fullBeanPreprocessor, Collections.emptyList());
}

/**
Expand Down Expand Up @@ -283,16 +298,29 @@ private void publishToSolrFinal(RdfWrapper rdf, FullBeanImpl savedFullBean) thro
}
}

private FullBeanImpl publishToMongo(Date recordDate, FullBeanImpl fullBean,
private FullBeanImpl publishToRecordMongo(Date recordDate, FullBeanImpl fullBean,
TriConsumer<FullBeanImpl, FullBeanImpl, Pair<Date, Date>> fullBeanPreprocessor,
List<Pair<String, Date>> recordsForRedirection)
throws SetupRelatedIndexingException, IndexerRelatedIndexingException, RecordRelatedIndexingException {
return publishToMongo(recordDate, fullBean, fullBeanPreprocessor, recordsForRedirection, recordDao);
}

/**
 * Publishes a full bean to the tombstone Mongo database by delegating to the generic
 * publishToMongo overload with the {@code tombstoneRecordDao}.
 *
 * @param recordDate the date that will represent the created/updated date of the record
 * @param fullBean the full bean to publish
 * @param fullBeanPreprocessor preprocessor applied to the new and existing beans before saving
 * @param recordsForRedirection pairs of (record id, date) for redirected records; the earliest
 *        date is used as the record's creation date
 * @return the saved full bean
 */
private FullBeanImpl publishToTombstoneMongo(Date recordDate, FullBeanImpl fullBean,
TriConsumer<FullBeanImpl, FullBeanImpl, Pair<Date, Date>> fullBeanPreprocessor,
List<Pair<String, Date>> recordsForRedirection)
throws SetupRelatedIndexingException, IndexerRelatedIndexingException, RecordRelatedIndexingException {
// Same publication flow as a live record, only the target dao (collection) differs.
return publishToMongo(recordDate, fullBean, fullBeanPreprocessor, recordsForRedirection, tombstoneRecordDao);
}

private FullBeanImpl publishToMongo(Date recordDate, FullBeanImpl fullBean,
TriConsumer<FullBeanImpl, FullBeanImpl, Pair<Date, Date>> fullBeanPreprocessor,
List<Pair<String, Date>> recordsForRedirection, RecordDao tombstoneRecordDao)
throws SetupRelatedIndexingException, IndexerRelatedIndexingException, RecordRelatedIndexingException {
final FullBeanImpl savedFullBean;
try {
savedFullBean = new FullBeanUpdater(fullBeanPreprocessor).update(fullBean, recordDate,
recordsForRedirection.stream().map(Pair::getValue).min(Comparator.naturalOrder())
.orElse(null), recordDao);
.orElse(null), tombstoneRecordDao);
} catch (MongoIncompatibleDriverException | MongoConfigurationException | MongoSecurityException e) {
throw new SetupRelatedIndexingException(MONGO_SERVER_PUBLISH_ERROR, e);
} catch (MongoSocketException | MongoClientException | MongoInternalException | MongoInterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,33 +26,31 @@
import org.apache.solr.client.solrj.util.ClientUtils;

/**
* This class provides functionality for accessing records that are already indexed from the Mongo
* and the Solr data stores. Note that this class does <b>NOT</b> contain functionality for indexing
* records.
* This class provides functionality for accessing records that are already indexed from the Mongo and the Solr data stores. Note
* that this class does <b>NOT</b> contain functionality for indexing records.
*/
public class IndexedRecordAccess {

private static final String ID_FIELD = "_id";
private static final String ABOUT_FIELD = "about";

private final RecordDao mongoServer;
private final RecordDao recordDao;
private final SolrClient solrServer;

/**
* Constructor.
*
* @param mongoServer The Mongo server connection.
* @param recordDao the mongo dao for connecting with the Mongo records database.
* @param solrServer The Solr server connection.
*/
IndexedRecordAccess(RecordDao mongoServer, SolrClient solrServer) {
this.mongoServer = mongoServer;
IndexedRecordAccess(RecordDao recordDao, SolrClient solrServer) {
// Keep references to the Mongo record dao and the Solr connection used by the queries below.
this.recordDao = recordDao;
this.solrServer = solrServer;
}

/**
* Counts the records in a given dataset. The criteria of whether a record belongs to a certain
* dataset is the same as that used in the method {@link #removeDataset(String, Date)}, i.e. it is
* based on the <code>rdf:about</code> values.
* Counts the records in a given dataset. The criteria of whether a record belongs to a certain dataset is the same as that used
* in the method {@link #removeDataset(String, Date)}, i.e. it is based on the <code>rdf:about</code> values.
*
* @param datasetId The ID of the dataset of which to count the records. Is not null.
* @return The number of records encountered for the given dataset.
Expand All @@ -62,9 +60,18 @@ public long countRecords(String datasetId) {
}

/**
* Removes the record with the given rdf:about value. Also removes any associated entities (i.e.
* those entities that are always part of only one record and the removal of which can not
* invalidate references from other records):
* Gets the full bean from the database, given the rdf:about value.
* @param rdfAbout the rdf about
* @return the fullbean
*/
/**
 * Retrieves the indexed full bean with the given rdf:about value.
 *
 * @param rdfAbout the rdf:about identifier of the record
 * @return the matching full bean, or null if no record with that identifier is indexed
 */
public FullBeanImpl getFullbean(String rdfAbout) {
  return recordDao.getDatastore()
                  .find(FullBeanImpl.class)
                  .filter(Filters.eq(ABOUT_FIELD, rdfAbout))
                  .first();
}

/**
* Removes the record with the given rdf:about value. Also removes any associated entities (i.e. those entities that are always
* part of only one record and the removal of which can not invalidate references from other records):
* <ul>
* <li>Aggregation</li>
* <li>EuropeanaAggregation</li>
Expand All @@ -75,20 +82,20 @@ public long countRecords(String datasetId) {
* not removed.
*
* @param rdfAbout The about value of the record to remove. Is not null.
* @return Whether or not the record was removed.
* @return Whether the record was removed.
* @throws IndexerRelatedIndexingException In case something went wrong.
*/
public boolean removeRecord(String rdfAbout) throws IndexerRelatedIndexingException {
try {

// Remove Solr record
final String queryValue = ClientUtils.escapeQueryChars(rdfAbout);
solrServer.deleteByQuery(EdmLabel.EUROPEANA_ID.toString() + ":" + queryValue);
solrServer.deleteByQuery(EdmLabel.EUROPEANA_ID + ":" + queryValue);

// Obtain the Mongo record
final Datastore datastore = mongoServer.getDatastore();
final Datastore datastore = recordDao.getDatastore();
final FullBeanImpl recordToDelete = datastore.find(FullBeanImpl.class)
.filter(Filters.eq(ABOUT_FIELD, rdfAbout)).first();
.filter(Filters.eq(ABOUT_FIELD, rdfAbout)).first();

// Remove mongo record and dependencies
if (recordToDelete != null) {
Expand All @@ -111,14 +118,13 @@ public boolean removeRecord(String rdfAbout) throws IndexerRelatedIndexingExcept
* <p>Removes all records that belong to a given dataset. For details on what parts of the record
* are removed, see the documentation of {@link #removeRecord(String)}.</p>
* <p><b>NOTE</b> that the rdf:about is
* used to find the dependencies, rather than the actual references in the records. While this is
* a reasonably safe way to go for now, eventually a more generic way along the lines of {@link
* #removeRecord(String)} should be found, in which the exact composition of the rdf:about is
* taken out of the equation.</p>
* used to find the dependencies, rather than the actual references in the records. While this is a reasonably safe way to go
* for now, eventually a more generic way along the lines of {@link #removeRecord(String)} should be found, in which the exact
* composition of the rdf:about is taken out of the equation.</p>
*
* @param datasetId The ID of the dataset to clear. Is not null.
* @param maxRecordDate The cutoff date: all records that have a lower timestampUpdated than this
* date will be removed. If null is provided then all records from that dataset will be removed.
* @param maxRecordDate The cutoff date: all records that have a lower timestampUpdated than this date will be removed. If null
* is provided then all records from that dataset will be removed.
* @return The number of records that were removed.
* @throws IndexerRelatedIndexingException In case something went wrong.
*/
Expand All @@ -136,31 +142,31 @@ public long removeDataset(String datasetId, Date maxRecordDate)
}

/**
* Return all record IDs that belong to the given dataset. For implementation details see {@link
* #removeDataset(String, Date)} as the selection is to be performed analogously.
* Return all record IDs that belong to the given dataset. For implementation details see {@link #removeDataset(String, Date)}
* as the selection is to be performed analogously.
*
* @param datasetId The ID of the dataset to search. Is not null.
* @param maxRecordDate The cutoff date: all records that have a lower timestampUpdated than this
* date will be included. If null is provided then all records from that dataset are included.
* @param maxRecordDate The cutoff date: all records that have a lower timestampUpdated than this date will be included. If null
* is provided then all records from that dataset are included.
* @return The record IDs in a stream.
*/
public Stream<String> getRecordIds(String datasetId, Date maxRecordDate) {
final FindOptions findOptions = new FindOptions()
.projection().exclude(ID_FIELD)
.projection().include(ABOUT_FIELD);
.projection().exclude(ID_FIELD)
.projection().include(ABOUT_FIELD);
final Iterator<FullBeanImpl> resultIterator = createMongoQuery(datasetId, maxRecordDate)
.iterator(findOptions);
.iterator(findOptions);
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(resultIterator, 0), false)
.map(FullBeanImpl::getAbout);
.map(FullBeanImpl::getAbout);
}

/**
* Count all records that belong to the given dataset. For implementation details see {@link
* #removeDataset(String, Date)} as the selection is to be performed analogously.
* Count all records that belong to the given dataset. For implementation details see {@link #removeDataset(String, Date)} as
* the selection is to be performed analogously.
*
* @param datasetId The ID of the dataset to search. Is not null.
* @param maxRecordDate The cutoff date: all records that have a lower timestampUpdated than this
* date will be counted. If null is provided then all records from that dataset will be counted.
* @param maxRecordDate The cutoff date: all records that have a lower timestampUpdated than this date will be counted. If null
* is provided then all records from that dataset will be counted.
* @return The record IDs in a stream.
*/
public long countRecords(String datasetId, Date maxRecordDate) {
Expand All @@ -180,19 +186,19 @@ private void removeDatasetFromSolr(String datasetId, Date maxRecordDate)
DateFormat dateFormat = new SimpleDateFormat(CommonStringValues.DATE_FORMAT_Z, Locale.US);
dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
solrQuery.append(" AND ").append(EdmLabel.TIMESTAMP_UPDATED).append(":[* TO ")
.append(dateFormat.format(maxRecordDate)).append('}');
.append(dateFormat.format(maxRecordDate)).append('}');
}
solrServer.deleteByQuery(solrQuery.toString());
}

private long removeDatasetFromMongo(String datasetId, Date maxRecordDate) {
return createMongoQuery(datasetId, maxRecordDate).delete(new DeleteOptions().multi(true))
.getDeletedCount();
.getDeletedCount();
}

private Query<FullBeanImpl> createMongoQuery(String datasetId, Date maxRecordDate) {
final Pattern pattern = Pattern.compile("^" + Pattern.quote(getRecordIdPrefix(datasetId)));
final Query<FullBeanImpl> query = mongoServer.getDatastore().find(FullBeanImpl.class);
final Query<FullBeanImpl> query = recordDao.getDatastore().find(FullBeanImpl.class);
query.filter(Filters.regex(ABOUT_FIELD).pattern(pattern));
if (maxRecordDate != null) {
query.filter(Filters.lt("timestampUpdated", maxRecordDate));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,15 @@ TierResults indexAndGetTierCalculations(InputStream recordContent,
*/
boolean remove(String rdfAbout) throws IndexingException;

/**
 * Creates and indexes a tombstone record for the record with the given identifier.
 *
 * @param rdfAbout the rdf:about identifier of the record to tombstone
 * @return whether a record with that identifier existed and was tombstoned
 * @throws IndexingException in case something went wrong.
 */
boolean indexTombstone(String rdfAbout) throws IndexingException;

/**
* <p>
* Removes all records that belong to a given dataset. This method also removes the associated
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
package eu.europeana.indexing;

import eu.europeana.corelib.definitions.edm.entity.ChangeLog;
import eu.europeana.corelib.definitions.edm.entity.EuropeanaAggregation;
import eu.europeana.corelib.solr.bean.impl.FullBeanImpl;
import eu.europeana.corelib.solr.entity.ChangeLogImpl;
import eu.europeana.corelib.solr.entity.EuropeanaAggregationImpl;
import eu.europeana.indexing.exception.IndexerRelatedIndexingException;
import eu.europeana.indexing.exception.IndexingException;
import eu.europeana.indexing.exception.SetupRelatedIndexingException;
Expand Down Expand Up @@ -124,6 +129,34 @@ public boolean remove(String rdfAbout) throws IndexerRelatedIndexingException {
return this.connectionProvider.getIndexedRecordAccess().removeRecord(rdfAbout);
}

@Override
public boolean indexTombstone(String rdfAbout) throws IndexerRelatedIndexingException {
// Look up the currently published record; if it does not exist there is nothing to tombstone.
final FullBeanImpl publishedFullbean = this.connectionProvider.getIndexedRecordAccess().getFullbean(rdfAbout);
if (publishedFullbean != null) {
final FullBeanPublisher publisher = connectionProvider.getFullBeanPublisher(true);
// NOTE(review): the prepared tombstone bean is discarded here — it is the published full bean
// itself that gets written to the tombstone store. Confirm this is intended (the TODO on
// prepareTombstoneFullbean states the prepared bean is unused for now).
prepareTombstoneFullbean();
try {
// The tombstone keeps the original record's creation timestamp as its record date.
publisher.publishTombstone(publishedFullbean, publishedFullbean.getTimestampCreated());
} catch (IndexingException e) {
throw new IndexerRelatedIndexingException("Could not create tombstone record '" + rdfAbout + "'.", e);
}
}
return publishedFullbean != null;
}

//TODO: 2024-08-29 - Once tombstones are working, we need to refine the fields we need. For now unused.
private FullBeanImpl prepareTombstoneFullbean() {
  // Record the deletion event as a change-log entry.
  final ChangeLog deletionEntry = new ChangeLogImpl();
  deletionEntry.setType("Delete");
  deletionEntry.setContext("http://data.europeana.eu/vocabulary/depublicationReason/noPermission");
  deletionEntry.setEndTime(new Date());

  // Attach the change log to a fresh Europeana aggregation.
  final EuropeanaAggregation aggregation = new EuropeanaAggregationImpl();
  aggregation.setChangeLog(List.of(deletionEntry));

  // Wrap it all in a minimal full bean representing the tombstone.
  final FullBeanImpl tombstone = new FullBeanImpl();
  tombstone.setEuropeanaAggregation(aggregation);
  return tombstone;
}

@Override
public int removeAll(String datasetId, Date maxRecordDate)
throws IndexerRelatedIndexingException {
Expand Down

0 comments on commit 0e675b1

Please sign in to comment.