Skip to content

Commit

Permalink
Added reverse lookup timeout index
Browse files Browse the repository at this point in the history
  • Loading branch information
stidsborg committed Sep 14, 2024
1 parent 69cc1a1 commit 1006cd7
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ type VARCHAR(255),
instance VARCHAR(255),
timeout_id VARCHAR(255),
expires BIGINT,
PRIMARY KEY (type, instance, timeout_id)
PRIMARY KEY (type, instance, timeout_id),
INDEX (expires, type, instance, timeout_id)
)";
var command = new MySqlCommand(_initializeSql, conn);
await command.ExecuteNonQueryAsync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ instance VARCHAR(255),
timeout_id VARCHAR(255),
expires BIGINT,
PRIMARY KEY (type, instance, timeout_id)
)";
);
CREATE INDEX IF NOT EXISTS idx_{_tablePrefix}_timeouts
ON {_tablePrefix}_timeouts (expires, type, instance, timeout_id);
";
var command = new NpgsqlCommand(_initializeSql, conn);
await command.ExecuteNonQueryAsync();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ public async Task Initialize()

_initializeSql ??= @$"
CREATE TABLE {_tablePrefix}_Timeouts (
flowType NVARCHAR(255),
flowInstance NVARCHAR(255),
FlowType NVARCHAR(255),
FlowInstance NVARCHAR(255),
TimeoutId NVARCHAR(255),
Expires BIGINT,
PRIMARY KEY (flowType, flowInstance, TimeoutId)
PRIMARY KEY (FlowType, FlowInstance, TimeoutId),
INDEX {_tablePrefix}_Timeouts_idx NONCLUSTERED (Expires, FlowType, FlowInstance, TimeoutId)
);";
var command = new SqlCommand(_initializeSql, conn);
try
Expand Down Expand Up @@ -64,23 +65,23 @@ public async Task UpsertTimeout(StoredTimeout storedTimeout, bool overwrite)
_upsertTimeoutSql ??= @$"
IF EXISTS (
SELECT * FROM {_tablePrefix}_Timeouts
WHERE flowType = @flowType AND flowInstance = @flowInstance AND TimeoutId=@TimeoutId
WHERE FlowType = @FlowType AND FlowInstance = @FlowInstance AND TimeoutId=@TimeoutId
)
BEGIN
UPDATE {_tablePrefix}_Timeouts
SET Expires = @Expiry
WHERE flowType = @flowType AND flowInstance = @flowInstance AND TimeoutId = @TimeoutId AND @Overwrite = 1
WHERE FlowType = @FlowType AND FlowInstance = @FlowInstance AND TimeoutId = @TimeoutId AND @Overwrite = 1
END
ELSE
BEGIN
INSERT INTO {_tablePrefix}_Timeouts
(flowType, flowInstance, TimeoutId, Expires)
(FlowType, FlowInstance, TimeoutId, Expires)
VALUES
(@flowType, @flowInstance, @TimeoutId, @Expiry);
(@FlowType, @FlowInstance, @TimeoutId, @Expiry);
END";
await using var command = new SqlCommand(_upsertTimeoutSql, conn);
command.Parameters.AddWithValue("@flowType", functionId.Type.Value);
command.Parameters.AddWithValue("@flowInstance", functionId.Instance.Value);
command.Parameters.AddWithValue("@FlowType", functionId.Type.Value);
command.Parameters.AddWithValue("@FlowInstance", functionId.Instance.Value);
command.Parameters.AddWithValue("@TimeoutId", timeoutId);
command.Parameters.AddWithValue("@Expiry", expiry);
command.Parameters.AddWithValue("@Overwrite", overwrite ? 1 : 2);
Expand All @@ -95,12 +96,12 @@ public async Task RemoveTimeout(FlowId flowId, string timeoutId)
_removeTimeoutSql ??= @$"
DELETE FROM {_tablePrefix}_Timeouts
WHERE
flowType = @flowType AND
flowInstance = @flowInstance AND
FlowType = @FlowType AND
FlowInstance = @FlowInstance AND
TimeoutId = @TimeoutId";
await using var command = new SqlCommand(_removeTimeoutSql, conn);
command.Parameters.AddWithValue("@flowType", flowId.Type.Value);
command.Parameters.AddWithValue("@flowInstance", flowId.Instance.Value);
command.Parameters.AddWithValue("@FlowType", flowId.Type.Value);
command.Parameters.AddWithValue("@FlowInstance", flowId.Instance.Value);
command.Parameters.AddWithValue("@TimeoutId", timeoutId);
await command.ExecuteNonQueryAsync();
}
Expand All @@ -112,11 +113,11 @@ public async Task Remove(FlowId flowId)

_removeSql ??= @$"
DELETE FROM {_tablePrefix}_Timeouts
WHERE flowType = @flowType AND flowInstance = @flowInstance";
WHERE FlowType = @FlowType AND FlowInstance = @FlowInstance";

await using var command = new SqlCommand(_removeSql, conn);
command.Parameters.AddWithValue("@flowType", flowId.Type.Value);
command.Parameters.AddWithValue("@flowInstance", flowId.Instance.Value);
command.Parameters.AddWithValue("@FlowType", flowId.Type.Value);
command.Parameters.AddWithValue("@FlowInstance", flowId.Instance.Value);
await command.ExecuteNonQueryAsync();
}

Expand All @@ -125,7 +126,7 @@ public async Task<IEnumerable<StoredTimeout>> GetTimeouts(long expiresBefore)
{
await using var conn = await CreateConnection();
_getTimeoutsExpiresBeforeSql ??= @$"
SELECT flowType, flowInstance, TimeoutId, Expires
SELECT FlowType, FlowInstance, TimeoutId, Expires
FROM {_tablePrefix}_Timeouts
WHERE Expires <= @ExpiresBefore";

Expand Down Expand Up @@ -155,11 +156,11 @@ public async Task<IEnumerable<StoredTimeout>> GetTimeouts(FlowId flowId)
_getTimeoutsSql ??= @$"
SELECT TimeoutId, Expires
FROM {_tablePrefix}_Timeouts
WHERE flowType = @flowType AND flowInstance = @flowInstance";
WHERE FlowType = @FlowType AND FlowInstance = @FlowInstance";

await using var command = new SqlCommand(_getTimeoutsSql, conn);
command.Parameters.AddWithValue("@flowType", typeId.Value);
command.Parameters.AddWithValue("@flowInstance", instanceId.Value);
command.Parameters.AddWithValue("@FlowType", typeId.Value);
command.Parameters.AddWithValue("@FlowInstance", instanceId.Value);

var storedTimeouts = new List<StoredTimeout>();
await using var reader = await command.ExecuteReaderAsync();
Expand Down

0 comments on commit 1006cd7

Please sign in to comment.