Skip to content

Commit

Permalink
Merge pull request #601 from scireum/feature/ymo/SIRI-915-iterate
Browse files Browse the repository at this point in the history
Use streamBlockwise for cascadeDelete
  • Loading branch information
ymo-sci authored Dec 13, 2023
2 parents 4786811 + 67cb68a commit e942a75
Show file tree
Hide file tree
Showing 8 changed files with 27 additions and 60 deletions.
2 changes: 1 addition & 1 deletion src/main/java/sirius/db/es/ElasticQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -1424,7 +1424,7 @@ private void close() {

@Override
public void delete(@Nullable Consumer<E> entityCallback) {
iterateAll(entity -> {
streamBlockwise().forEach(entity -> {
if (entityCallback != null) {
entityCallback.accept(entity);
}
Expand Down
49 changes: 8 additions & 41 deletions src/main/java/sirius/db/jdbc/SmartQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,9 @@
import sirius.kernel.commons.Monoflop;
import sirius.kernel.commons.PullBasedSpliterator;
import sirius.kernel.commons.Strings;
import sirius.kernel.commons.Timeout;
import sirius.kernel.commons.Tuple;
import sirius.kernel.commons.Value;
import sirius.kernel.commons.Watch;
import sirius.kernel.di.std.ConfigValue;
import sirius.kernel.di.std.Part;
import sirius.kernel.health.Exceptions;
import sirius.kernel.health.HandledException;
Expand All @@ -39,7 +37,6 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -50,7 +47,6 @@
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Predicate;
Expand All @@ -64,9 +60,6 @@
*/
public class SmartQuery<E extends SQLEntity> extends Query<SmartQuery<E>, E, SQLConstraint> {

@ConfigValue("jdbc.queryIterateTimeout")
private static Duration queryIterateTimeout;

@Part
private static OMA oma;

Expand Down Expand Up @@ -254,41 +247,15 @@ public boolean exists() {
return copy().fields(SQLEntity.ID).first().isPresent();
}

/**
* Deletes all matches using the {@link OMA#delete(BaseEntity)}.
* <p>
* Note that for very large result sets, we perform a blockwise strategy. We therefore iterate over
* the results until the timeout ({@link #queryIterateTimeout} is reached). In this case, we abort the
* iteration, execute the query again and continue deleting until all entities are gone.
*
* @param entityCallback a callback to be invoked for each entity to be deleted
*/
@Override
public void delete(@Nullable Consumer<E> entityCallback) {
if (forceFail) {
return;
}
AtomicBoolean continueDeleting = new AtomicBoolean(true);
TaskContext taskContext = TaskContext.get();
while (continueDeleting.get() && taskContext.isActive()) {
continueDeleting.set(false);
Timeout timeout = new Timeout(queryIterateTimeout);
iterate(entity -> {
if (entityCallback != null) {
entityCallback.accept(entity);
}
oma.delete(entity);
if (timeout.isReached()) {
// Timeout has been reached, set the flag so that another delete query is attempted....
continueDeleting.set(true);
// and abort processing the results of this query...
return false;
} else {
// Timeout not yet reached, continue deleting...
return true;
}
});
}
streamBlockwise().forEach(entity -> {
if (entityCallback != null) {
entityCallback.accept(entity);
}

oma.delete(entity);
});
}

@Override
Expand Down Expand Up @@ -427,7 +394,7 @@ private BaseEntity<?> findParent(Mapping mapping, BaseEntity<?> entity) {
/**
* Creates a sql constraint for sorting purposes.
* </p>
* In MySQL/MariaDB, NULL is considered as a 'missing, unkonwn value'. Any arithmetic comparison with NULL
* In MySQL/MariaDB, NULL is considered as a 'missing, unknown value'. Any arithmetic comparison with NULL
* returns false e.g. NULL != 'any' returns false.
* Therefore, comparisons with NULL values must be treated specially.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ public void parseValues(Object e, Values values) {
/**
* Returns the {@link EntityDescriptor} of the referenced entity.
*
* @return the referenced entity drescriptor
* @return the referenced entity descriptor
*/
public EntityDescriptor getReferencedDescriptor() {
if (referencedDescriptor == null) {
Expand Down Expand Up @@ -241,7 +241,8 @@ protected void onDeleteSetNull(Object e) {
referenceInstance.getMapper()
.select(referenceInstance.getClass())
.eq(nameAsMapping, idBeingDeleted)
.iterateAll(other -> cascadeSetNull(taskContext, idBeingDeleted, other));
.streamBlockwise()
.forEach(other -> cascadeSetNull(taskContext, idBeingDeleted, other));
}
}

Expand All @@ -264,7 +265,8 @@ protected void onDeleteCascade(Object e) {
referenceInstance.getMapper()
.select(referenceInstance.getClass())
.eq(nameAsMapping, ((BaseEntity<?>) e).getId())
.iterateAll(other -> cascadeDelete(taskContext, other));
.streamBlockwise()
.forEach(other -> cascadeDelete(taskContext, other));
}

private void cascadeDelete(TaskContext taskContext, BaseEntity<?> other) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public Class<? extends BaseEntity<?>> getReferencedType() {
/**
* Returns the {@link EntityDescriptor} of the referenced entity.
*
* @return the referenced entity drescriptor
* @return the referenced entity descriptor
*/
public EntityDescriptor getReferencedDescriptor() {
if (referencedDescriptor == null) {
Expand Down Expand Up @@ -253,7 +253,7 @@ protected static void ensureLabelsArePresent(Property property,
EntityDescriptor referencedDescriptor,
BaseEntityRef.OnDelete deleteHandler) {
// If a cascade delete handler is present and the referenced entity is not explicitly marked as
// "non complex" and we're within the IDE or running as a test, we force the system to compute / lookup
// "non-complex" and we're within the IDE or running as a test, we force the system to compute / lookup
// the associated NLS keys which might be required to generate appropriate deletion logs or rejection
// errors (otherwise this might be missed while developing or testing the system).
if ((referencedDescriptor.getAnnotation(ComplexDelete.class).map(ComplexDelete::value).orElse(true)
Expand Down Expand Up @@ -286,7 +286,8 @@ protected void onDeleteSetNull(Object e) {
referenceInstance.getMapper()
.select(referenceInstance.getClass())
.eq(nameAsMapping, ((BaseEntity<?>) e).getId())
.iterateAll(other -> cascadeSetNull(taskContext, other));
.streamBlockwise()
.forEach(other -> cascadeSetNull(taskContext, other));
}

private void cascadeSetNull(TaskContext taskContext, BaseEntity<?> other) {
Expand All @@ -309,7 +310,8 @@ protected void onDeleteCascade(Object e) {
referenceInstance.getMapper()
.select(referenceInstance.getClass())
.eq(nameAsMapping, ((BaseEntity<?>) e).getId())
.iterateAll(other -> cascadeDelete(taskContext, other));
.streamBlockwise()
.forEach(other -> cascadeDelete(taskContext, other));
}

private void cascadeDelete(TaskContext taskContext, BaseEntity<?> other) {
Expand Down
5 changes: 3 additions & 2 deletions src/main/java/sirius/db/mixing/query/Query.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ protected Query(EntityDescriptor descriptor) {
}

/**
* Applies the given contraints to the query.
* Applies the given constraints to the query.
*
* @param constraint the constraint which has to be fullfilled
* @param constraint the constraint which has to be fulfilled
* @return the query itself for fluent method calls
*/
public abstract Q where(C constraint);
Expand Down Expand Up @@ -131,6 +131,7 @@ public Q queryString(String query, QueryField... fields) {
* Deletes all matches using the {@link BaseMapper#delete(BaseEntity)} of the appropriate mapper.
* <p>
* Be aware that this might be slow for very large result sets.
* Attention: Some implementations (especially Mongo) do not support deleting via streamBlockwise with a sort order.
*
* @param entityCallback a callback to be invoked for each entity to be deleted
*/
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/sirius/db/mongo/MongoQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public MongoQuery<E> orderDesc(Mapping field) {
* Adds a limit to the query.
*
* @param skip the number of items to skip (used for pagination).
* @param limit the max. number of items to return (exluding those who have been skipped).
* @param limit the max. number of items to return (excluding those who have been skipped).
* @return the builder itself for fluent method calls
*/
public MongoQuery<E> limit(int skip, int limit) {
Expand Down Expand Up @@ -282,7 +282,7 @@ public List<E> randomList() {
}

/**
* Aggregates the documents in the result of the given query with an sum operator.
* Aggregates the documents in the result of the given query with a sum operator.
* <p>
* Note that limits are ignored for this query.
*
Expand All @@ -294,7 +294,7 @@ public Value aggregateSum(@Nonnull Mapping field) {
}

/**
* Aggregates the documents in the result of the given query with an an average operator.
* Aggregates the documents in the result of the given query with an average operator.
* <p>
* Note that limits are ignored for this query.
*
Expand Down Expand Up @@ -331,7 +331,7 @@ public Value aggregateMin(@Nonnull Mapping field) {

@Override
public void delete(@Nullable Consumer<E> entityCallback) {
iterateAll(entity -> {
streamBlockwise().forEach(entity -> {
if (entityCallback != null) {
entityCallback.accept(entity);
}
Expand Down
3 changes: 0 additions & 3 deletions src/main/resources/component-060-db.conf
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,6 @@ jdbc {
# Every connection which lasts longer will be logged to "db-slow" on level INFO
logConnectionThreshold = 30 seconds

# The default timeout at which a running query will get interrupted and restarted at the current position
queryIterateTimeout = 15 minutes

# A profile provides a template for database connections.
# Each value of the profile serves as backup or default value for the one in the database section.
# Also a profile value can reference properties defined in one of both sections like this: ${name}.
Expand Down
2 changes: 0 additions & 2 deletions src/test/resources/test.conf
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
docker.file = ["src/test/resources/docker-db.yml"]

jdbc {
# a very aggressive timeout finds more potential problems
queryIterateTimeout = 1s

database {
test {
Expand Down

0 comments on commit e942a75

Please sign in to comment.