Skip to content

Commit

Permalink
Transaction test for empty collections hardened
Browse files Browse the repository at this point in the history
* The test now includes collections indirectly made empty by
  deleting/moving around resources being their children without updating
  the collection itself
* Other tests adjusted (there were many test creating empty collections
  :-) )
  • Loading branch information
zozlak committed Aug 6, 2024
1 parent b2b616a commit 1d2ea8f
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 105 deletions.
116 changes: 72 additions & 44 deletions src/acdhOeaw/arche/doorkeeper/Transaction.php
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ static public function onTxCommit(string $method, int $txId,
$pdo->commit();
}

/**
* Stores ids of all pre-transaction parents of resources affected by the current transaction
* @var array<int>
*/
private array $parentIds;

public function __construct(private int $txId, private PDO $pdo,
private Schema $schema,
private Ontology $ontology,
Expand Down Expand Up @@ -210,6 +216,11 @@ public function checkAutoCreatedResources(): void {
*/
#[CheckAttribute]
public function checkEmptyCollections(): void {
$parentIds = $this->fetchParentIds();
$idsReplace = '';
if (count($parentIds) > 0) {
$idsReplace = 'OR r.id IN (' . substr(str_repeat('?::bigint, ', count($parentIds)), 0, -2) . ')';
}
$query = "
SELECT string_agg(DISTINCT ids, ', ')
FROM
Expand All @@ -218,18 +229,21 @@ public function checkEmptyCollections(): void {
JOIN identifiers USING (id)
WHERE
r.state = ?
AND r.transaction_id = ?
AND (r.transaction_id = ? %ids%)
AND c.property = ?
AND c.value IN (?, ?)
AND NOT EXISTS (SELECT 1 FROM relations ch WHERE ch.property = ? AND ch.target_id = r.id)
";
$param = [
ArcheTransaction::STATE_ACTIVE,
$this->txId,
RDF::RDF_TYPE,
$this->schema->classes->topCollection, $this->schema->classes->collection,
$this->schema->parent
];
$query = str_replace('%ids%', $idsReplace, $query);
$param = array_merge(
[ArcheTransaction::STATE_ACTIVE, $this->txId],
$parentIds,
[
RDF::RDF_TYPE,
$this->schema->classes->topCollection, $this->schema->classes->collection,
$this->schema->parent,
]
);
$t = microtime(true);
$query = $this->pdo->prepare($query);
$query->execute($param);
Expand All @@ -247,40 +261,11 @@ public function checkEmptyCollections(): void {
* @throws DoorkeeperException
*/
public function updateCollections(): void {
$parentIds = $this->fetchParentIds();
$this->log?->info("\t\tupdating collections affected by the transaction");

$t0 = microtime(true);
$query = $this->pdo->prepare("
SELECT id
FROM resources
WHERE transaction_id = ?
");
$query->execute([$this->txId]);
$resIds = $query->fetchAll(PDO::FETCH_COLUMN);

// find all pre-transaction parents of resources affected by the current transaction
if (count($resIds) > 0) {
$pdoOld = RC::$transaction->getPreTransactionDbHandle();
$query = "
WITH RECURSIVE t(id, n) AS (
SELECT * FROM (VALUES %ids%) t1
UNION
SELECT target_id, 1
FROM t JOIN relations USING (id)
WHERE property = ?
)
SELECT DISTINCT id FROM t WHERE n > 0
";
$query = str_replace('%ids%', substr(str_repeat('(?::bigint, 0), ', count($resIds)), 0, -2), $query);
$query = $pdoOld->prepare($query);
$query->execute(array_merge($resIds, [$this->schema->parent]));
$parentIds = $query->fetchAll(PDO::FETCH_COLUMN);
} else {
$parentIds = [];
}

$t0 = microtime(true);
// find all affected parents (gr, pp) and their children
$query = "
$query = "
CREATE TEMPORARY TABLE _resources AS
SELECT p.id AS cid, (get_relatives(p.id, ?, 999999, 0)).*
FROM (
Expand All @@ -290,14 +275,14 @@ public function updateCollections(): void {
" . (count($parentIds) > 0 ? "UNION SELECT * FROM (VALUES %ids%) pp" : "") . "
) p
";
$query = str_replace('%ids%', substr(str_repeat('(?::bigint), ', count($parentIds)), 0, -2), $query);
$param = array_merge(
$query = str_replace('%ids%', substr(str_repeat('(?::bigint), ', count($parentIds)), 0, -2), $query);
$param = array_merge(
[$this->schema->parent, $this->schema->parent, $this->txId],
$parentIds,
);
$query = $this->pdo->prepare($query);
$query = $this->pdo->prepare($query);
$query->execute($param);
$t1 = microtime(true);
$t1 = microtime(true);

// try to lock resources to be updated
// TODO - how to lock them in a way which doesn't cause a deadlock
Expand Down Expand Up @@ -485,4 +470,47 @@ private function updateCollectionAggregates(): void {
");
$query->execute([RDF::XSD_STRING]);
}

/**
* Returns ids of all pre-transaction parents of resources affected by the current transaction
*
* @return array<int>
*/
private function fetchParentIds(): array {
if (!isset($this->parentIds)) {
$this->log?->info("\t\tfinding collections affected by the transaction");
$t0 = microtime(true);
$query = $this->pdo->prepare("
SELECT id
FROM resources
WHERE transaction_id = ?
");
$query->execute([$this->txId]);
$resIds = $query->fetchAll(PDO::FETCH_COLUMN);

// find all pre-transaction parents of resources affected by the current transaction
if (count($resIds) === 0) {
$this->parentIds = [];
} else {
$pdoOld = RC::$transaction->getPreTransactionDbHandle();
$query = "
WITH RECURSIVE t(id, n) AS (
SELECT * FROM (VALUES %ids%) t1
UNION
SELECT target_id, 1
FROM t JOIN relations USING (id)
WHERE property = ?
)
SELECT DISTINCT id FROM t WHERE n > 0
";
$query = str_replace('%ids%', substr(str_repeat('(?::bigint, 0), ', count($resIds)), 0, -2), $query);
$query = $pdoOld->prepare($query);
$query->execute(array_merge($resIds, [$this->schema->parent]));
$this->parentIds = $query->fetchAll(PDO::FETCH_COLUMN);
}
$t1 = microtime(true);
$this->log?->debug("\t\t\ttiming: " . ($t1 - $t0));
}
return $this->parentIds;
}
}
Loading

0 comments on commit 1d2ea8f

Please sign in to comment.