Support queuing score by id #114

Merged 32 commits on Jul 22, 2022
Changes from 29 commits
Commits
4bfce33
add convert to schema
notbakaneko Jul 13, 2022
5cb8623
adding commands to index/delete specific score
notbakaneko Jul 13, 2022
e9b8026
figure out deserialization later
notbakaneko Jul 14, 2022
f8435d4
support looking up records by id
notbakaneko Jul 14, 2022
e334397
only need the id for deleting
notbakaneko Jul 14, 2022
0635423
consolidate to single command
notbakaneko Jul 14, 2022
56caed6
Required long arugment just puts 0 if it's missing...
notbakaneko Jul 14, 2022
fc49a8d
move the Action/ScoreId out of the score structure
notbakaneko Jul 14, 2022
12e5b7b
add command for pushing arbitrary commands to queue.
notbakaneko Jul 15, 2022
b40a6c6
sending empty payload is an error
notbakaneko Jul 15, 2022
ec9d810
helper method for organizing bulk descriptor payload
notbakaneko Jul 15, 2022
3c1450e
remove scores that don't exist via lookup
notbakaneko Jul 15, 2022
dac0f58
rely on ScoreItem.ToString()
notbakaneko Jul 15, 2022
e592b85
add some documentation
notbakaneko Jul 15, 2022
2c2840a
fix description
notbakaneko Jul 15, 2022
708546b
formatting
notbakaneko Jul 15, 2022
8579baa
unused
notbakaneko Jul 15, 2022
4118771
was missing the extra space before >_>
notbakaneko Jul 15, 2022
d7cfa68
shouldn't be sharing the buffer between tasks
notbakaneko Jul 15, 2022
4b995dc
use existing queue name
notbakaneko Jul 20, 2022
e0e829c
remove direct deletion support
notbakaneko Jul 20, 2022
25a5fc6
remove action
notbakaneko Jul 20, 2022
3e601b7
more accurate condition of indexing in readme
notbakaneko Jul 20, 2022
34af8ac
Simplify `ScoreItem` construction
peppy Jul 20, 2022
13ccc1d
Rename `IndexQueueItems` to `ProcessableItemsBuffer`
peppy Jul 20, 2022
1e8f036
Add xmldoc for `ProcessableItemsBuffer` and change deletions to use `…
peppy Jul 20, 2022
5745b00
Show deleted document count in list command
peppy Jul 20, 2022
4756205
Remove unused method
peppy Jul 20, 2022
cd149e0
Tidy up remaining flow a touch
peppy Jul 20, 2022
6887302
Merge branch 'master' into feature/queue-item-action
peppy Jul 20, 2022
0105420
handle network errors in dispatch instead of simply failing
notbakaneko Jul 21, 2022
8af1705
add convert lookup
notbakaneko Jul 21, 2022
39 changes: 37 additions & 2 deletions README.md
@@ -16,8 +16,8 @@ A string value is used to indicate the current schema version to be used.

## Adding items to be indexed

Scores with `preserve`=`true` will be added to the index,
scores with `preserve`=`false` will be removed from the index.
Scores with `preserve`=`true` belonging to a user with `user_warnings`=`0` will be added to the index,
scores where any of the previous conditions are false will be removed from the index.

Push items to `osu-queue:score-index-${schema}`

@@ -124,6 +124,14 @@ For testing purposes, we can add fake items to the queue:

It should be noted that these items will not exist or match the ones in the database.

## Queuing a specific score for indexing

schema=${schema} dotnet run index ${id}

will queue the score with `${id}` for indexing; the score will be added or deleted as necessary, according to the value of `SoloScore.ShouldIndex`.

See [Queuing items for processing from another client](#queuing-items-for-processing-from-another-client)
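
The indexing rule stated earlier in this README (`preserve`=`true` and `user_warnings`=`0`) can be read as a single predicate. The following is a minimal sketch under that assumption; `IndexRule` and its parameter names are hypothetical, and the real `SoloScore.ShouldIndex` is not shown in this diff and may differ.

```csharp
using System;

// Hypothetical illustration of the README rule; not the repository's implementation.
public static class IndexRule
{
    // A score is indexed only when it is preserved and its user has no warnings.
    // Any other combination means the score should be removed from the index.
    public static bool ShouldIndex(bool preserve, int userWarnings) =>
        preserve && userWarnings == 0;
}

public static class IndexRuleDemo
{
    public static void Main()
    {
        Console.WriteLine(IndexRule.ShouldIndex(preserve: true, userWarnings: 0));  // add to index
        Console.WriteLine(IndexRule.ShouldIndex(preserve: true, userWarnings: 1));  // remove from index
        Console.WriteLine(IndexRule.ShouldIndex(preserve: false, userWarnings: 0)); // remove from index
    }
}
```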

## Adding existing database records to the queue

schema=1 dotnet run all
@@ -149,3 +157,30 @@ Populating an index is done by pushing score items to a queue.
where `${cmd}` is the command to run, e.g. `dotnet osu.ElasticIndexer.dll queue`

# Typical use-cases

## Queuing items for processing from another client

Push items into the Redis queue `osu-queue:score-index-${schema}`, e.g.:

```csharp
ListLeftPush("osu-queue:score-index-1", "{ \"ScoreId\": 1 }");
```

or from redis-cli:
```
LPUSH "osu-queue:score-index-1" "{\"ScoreId\":1}"
```

### Indexing a score by `id`
```json
{ "ScoreId": 1 }
```

### Queuing a whole score

```json
{
"Score": {Solo.Score}
}
```
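
Hand-escaping the JSON payload, as in the examples above, is easy to get wrong. As a rough sketch, a client could build the queue name and the "index by id" payload with `System.Text.Json` instead. `QueuePayload` and its methods are hypothetical helpers introduced for illustration; only the queue naming pattern and the `ScoreId` key come from this README.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical helper for illustration; it is not part of osu.ElasticIndexer.
public static class QueuePayload
{
    // Queue naming pattern taken from the README: osu-queue:score-index-${schema}.
    public static string QueueName(string schema) => $"osu-queue:score-index-{schema}";

    // Serializes the "index by id" payload; the dictionary key is written as-is.
    public static string ForScoreId(long scoreId) =>
        JsonSerializer.Serialize(new Dictionary<string, long> { ["ScoreId"] = scoreId });
}

public static class QueuePayloadDemo
{
    public static void Main()
    {
        // Prints the queue name and payload a client would pass to LPUSH.
        Console.WriteLine(QueuePayload.QueueName("1"));
        Console.WriteLine(QueuePayload.ForScoreId(1));
    }
}
```

The resulting string can then be handed to whichever Redis client is in use, for example as the value argument of the `ListLeftPush` call shown above.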
32 changes: 32 additions & 0 deletions osu.ElasticIndexer/Commands/IndexCommand.cs
@@ -0,0 +1,32 @@
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
// See the LICENCE file in the repository root for full licence text.

using System;
using System.ComponentModel.DataAnnotations;
using System.Threading;
using McMaster.Extensions.CommandLineUtils;

namespace osu.ElasticIndexer.Commands
{
[Command("index", Description = "Queue a score for indexing by id.")]
public class IndexCommand : ProcessorCommandBase
{
[Argument(1)]
[Required]
public string ScoreId { get; set; } = string.Empty;

public int OnExecute(CancellationToken cancellationToken)
{
if (string.IsNullOrEmpty(AppSettings.Schema))
throw new MissingSchemaException();

var id = long.Parse(ScoreId);
var scoreItem = new ScoreItem { ScoreId = id };
Processor.PushToQueue(scoreItem);

Console.WriteLine(ConsoleColor.Cyan, $"Queued to {Processor.QueueName}: {scoreItem}");

return 0;
}
}
}
2 changes: 1 addition & 1 deletion osu.ElasticIndexer/Commands/ListIndicesCommand.cs
@@ -26,7 +26,7 @@ public int OnExecute()
var schema = indexState.Mappings.Meta?["schema"];
var aliased = indexState.Aliases.ContainsKey(client.AliasName);

Console.WriteLine($"{record.Index} schema:{schema} aliased:{aliased} {record.Status} {record.DocsCount} {record.PrimaryStoreSize}");
Console.WriteLine($"{record.Index} schema:{schema} aliased:{aliased} {record.Status} docs {record.DocsCount} deleted {record.DocsDeleted} {record.PrimaryStoreSize}");
}
}

2 changes: 1 addition & 1 deletion osu.ElasticIndexer/Commands/PumpAllScoresCommand.cs
@@ -53,7 +53,7 @@ public int OnExecute(CancellationToken cancellationToken)
if (Verbose)
Console.WriteLine($"Pushing {score}");

Processor.PushToQueue(new ScoreItem(score));
Processor.PushToQueue(new ScoreItem { Score = score });
}

if (!Verbose)
2 changes: 1 addition & 1 deletion osu.ElasticIndexer/Commands/PumpFakeScoresCommand.cs
@@ -54,7 +54,7 @@ public int OnExecute(CancellationToken cancellationToken)
preserve = true
};

Processor.PushToQueue(new ScoreItem(score));
Processor.PushToQueue(new ScoreItem { Score = score });

if (counter % 1000 == 0)
Console.WriteLine($"pushed to {Processor.QueueName}, current id: {counter}");
31 changes: 31 additions & 0 deletions osu.ElasticIndexer/Commands/PushFileCommand.cs
@@ -0,0 +1,31 @@
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
// See the LICENCE file in the repository root for full licence text.

using System.ComponentModel.DataAnnotations;
using System.IO;
using System.Threading;
using McMaster.Extensions.CommandLineUtils;

namespace osu.ElasticIndexer.Commands
{
[Command("push-file", Description = "Push contents of a file to the queue for testing.")]
public class PushFileCommand : ProcessorCommandBase
{
[Argument(0)]
[Required]
public string Filename { get; set; } = string.Empty;

public int OnExecute(CancellationToken cancellationToken)
{
if (string.IsNullOrEmpty(AppSettings.Schema))
throw new MissingSchemaException();

var value = File.ReadAllText(Filename);
var redis = new Redis();

redis.Connection.GetDatabase().ListLeftPush(Processor.QueueName, value);

return 0;
}
}
}
16 changes: 16 additions & 0 deletions osu.ElasticIndexer/ElasticModel.cs
@@ -61,5 +61,21 @@ public static IEnumerable<List<T>> Chunk<T>(string? where, int chunkSize = 10000

public static IEnumerable<List<T>> Chunk<T>(int chunkSize = 10000, long? resumeFrom = null) where T : ElasticModel =>
Chunk<T>(null, chunkSize, resumeFrom);

public static List<T> Find<T>(IEnumerable<long> ids) where T : ElasticModel
{
using (var dbConnection = new MySqlConnection(AppSettings.ConnectionString))
{
var attribute = typeof(T).GetCustomAttributes<ChunkOnAttribute>().First();
var cursorColumn = attribute.CursorColumn;
var selects = attribute.Query;

string query = $"select {selects} where {cursorColumn} in @ids";

dbConnection.Open();

return dbConnection.Query<T>(query, new { ids }).AsList();
}
}
}
}
78 changes: 51 additions & 27 deletions osu.ElasticIndexer/IndexQueueProcessor.cs
@@ -3,9 +3,9 @@

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using Elasticsearch.Net;
using Nest;
using osu.Server.QueueProcessor;

@@ -38,41 +38,44 @@ internal IndexQueueProcessor(string index, Client client, Action stopCallback)

protected override void ProcessResults(IEnumerable<ScoreItem> items)
{
var add = new List<SoloScore>();
var remove = new List<SoloScore>();
var buffer = new ProcessableItemsBuffer();

// Figure out what to do with the queue item.
foreach (var item in items)
{
if (item.Score.ShouldIndex)
add.Add(item.Score);
if (item.ScoreId != null)
{
buffer.ScoreIdsForLookup.Add(item.ScoreId.Value);
}
else if (item.Score != null)
{
if (item.Score.ShouldIndex)
buffer.Additions.Add(item.Score);
else
buffer.Deletions.Add(item.Score.id);
}
else
remove.Add(item.Score);
{
Console.WriteLine(ConsoleColor.Red, "queue item missing both data and action");
}
}

var bulkDescriptor = new BulkDescriptor()
.Index(index)
.IndexMany(add)
.DeleteMany(remove);
// Handle any scores that need a lookup from the database.
performDatabaseLookup(buffer);

var response = client.ElasticClient.Bulk(bulkDescriptor);
Debug.Assert(buffer.ScoreIdsForLookup.Count == 0);

handleResponse(response, items);
}

private BulkResponse dispatch(BulkDescriptor bulkDescriptor)

Contributor (Author): this isn't supposed to be unused; apparently I managed to test it and somehow not push the change using it 👀

Member: Did you want to add the usage back for this then?

Contributor (Author): I was going to update the calls to it but there was an issue I came across when checking ES8 support, except I can't seem to replicate it now 🤔

Member: The new logic means that if ES goes away, the queue processor will never exit. I don't think this should be the case? Queue processor itself already has fail and retry logic, so unless this is intended to handle actual intermittent errors during normal operation, I'd either leave the handling to that mechanism, or make it retry for a max of 1-5 seconds to ensure things don't get stuck.

Contributor (Author): Retrying queue items with minimal delay can easily deplete all the retries while a server restarts or network recovers; a better way may be to be able to just re-queue an item without it being marked as failed?

Member: Maybe a setting on QueueProcessor that ensures failed items are never dropped, for cases that matters.

Member: Will merge as-is for now and we can revisit when this becomes an issue. Tracking at ppy/osu-queue-processor#16.


{
try
{
return client.ElasticClient.Bulk(bulkDescriptor);
}
catch (ElasticsearchClientException ex)
if (buffer.Additions.Any() || buffer.Deletions.Any())
{
// Server disappeared, maybe network failure or it's restarting; spin until it's available again.
Console.WriteLine(ConsoleColor.Red, ex.Message);
Console.WriteLine(ConsoleColor.Yellow, ex.InnerException?.Message);
waitUntilActive();
var bulkDescriptor = new BulkDescriptor()
.Index(index)
.IndexMany(buffer.Additions)
// type is needed for ids https://github.com/elastic/elasticsearch-net/issues/3500
.DeleteMany<SoloScore>(buffer.Deletions);

return dispatch(bulkDescriptor);
var response = client.ElasticClient.Bulk(bulkDescriptor);

handleResponse(response, items);
}
}

@@ -95,7 +98,7 @@ private void handleResponse(BulkResponse response, IEnumerable<ScoreItem> items)
// Elasticsearch bulk thread pool is full.
if (response.ItemsWithErrors.Any(item => item.Status == 429 || item.Error.Type == "es_rejected_execution_exception"))
{
Console.WriteLine(ConsoleColor.Yellow, $"Server returned 429, re-queued chunk with lastId {items.Last().Score.id}");
Console.WriteLine(ConsoleColor.Yellow, $"Server returned 429, re-queued chunk with lastId {items.Last()}");

foreach (var item in items)
{
@@ -124,6 +127,27 @@ private void handleResponse(BulkResponse response, IEnumerable<ScoreItem> items)
// TODO: per-item errors?
}

private void performDatabaseLookup(ProcessableItemsBuffer buffer)
{
if (!buffer.ScoreIdsForLookup.Any()) return;

var scores = ElasticModel.Find<SoloScore>(buffer.ScoreIdsForLookup);

foreach (var score in scores)
{
if (score.ShouldIndex)
buffer.Additions.Add(score);
else
buffer.Deletions.Add(score.id);

buffer.ScoreIdsForLookup.Remove(score.id);
}

// Remaining scores do not exist and should be deleted.
buffer.Deletions.AddRange(buffer.ScoreIdsForLookup);
buffer.ScoreIdsForLookup.Clear();
}

private void waitUntilActive()
{
// Spin until valid response from elasticsearch.
26 changes: 26 additions & 0 deletions osu.ElasticIndexer/ProcessableItemsBuffer.cs
@@ -0,0 +1,26 @@
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
// See the LICENCE file in the repository root for full licence text.

using System.Collections.Generic;

namespace osu.ElasticIndexer
{
public class ProcessableItemsBuffer
{
/// <summary>
/// A set of all score IDs which have arrived but are not yet determined to be an addition or deletion.
/// These should be processed into either <see cref="Additions"/> or <see cref="Deletions"/>.
/// </summary>
public readonly HashSet<long> ScoreIdsForLookup = new HashSet<long>();

/// <summary>
/// New scores which should be indexed.
/// </summary>
public readonly List<SoloScore> Additions = new List<SoloScore>();

/// <summary>
/// Score IDs which should be purged from the index if they are present.
/// </summary>
public readonly List<long> Deletions = new List<long>();
}
}
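
The buffer's three collections are drained in two steps: whole scores are sorted into additions or deletions immediately, while bare ids wait in `ScoreIdsForLookup` until a database lookup resolves them. A self-contained sketch of that second step is shown below, assuming the same rule as `performDatabaseLookup` in this PR (ids with no matching row are treated as deletions). `FakeScore` and `LookupSketch` are hypothetical stand-ins, not types from the repository.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for SoloScore, carrying only the members used here.
public record FakeScore(long Id, bool ShouldIndex);

public static class LookupSketch
{
    // Splits requested ids into scores to index and ids to delete.
    // Ids that the lookup did not return are assumed to no longer exist
    // and are therefore appended to the deletions.
    public static (List<FakeScore> Additions, List<long> Deletions) Resolve(
        IEnumerable<long> requestedIds, IEnumerable<FakeScore> foundScores)
    {
        var pending = new HashSet<long>(requestedIds);
        var additions = new List<FakeScore>();
        var deletions = new List<long>();

        foreach (var score in foundScores)
        {
            if (score.ShouldIndex)
                additions.Add(score);
            else
                deletions.Add(score.Id);

            pending.Remove(score.Id);
        }

        // Whatever is still pending had no database row.
        deletions.AddRange(pending);
        return (additions, deletions);
    }
}

public static class LookupSketchDemo
{
    public static void Main()
    {
        var found = new[] { new FakeScore(1, true), new FakeScore(2, false) };
        var (additions, deletions) = LookupSketch.Resolve(new long[] { 1, 2, 3 }, found);

        // Score 1 is indexed; score 2 (not indexable) and id 3 (not found) are deleted.
        Console.WriteLine($"additions: {additions.Count}, deletions: {deletions.Count}");
    }
}
```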
2 changes: 2 additions & 0 deletions osu.ElasticIndexer/Program.cs
@@ -12,9 +12,11 @@ namespace osu.ElasticIndexer
[Subcommand(typeof(CloseIndexCommand))]
[Subcommand(typeof(DeleteIndexCommand))]
[Subcommand(typeof(ListIndicesCommand))]
[Subcommand(typeof(IndexCommand))]
[Subcommand(typeof(OpenIndexCommand))]
[Subcommand(typeof(PumpAllScoresCommand))]
[Subcommand(typeof(PumpFakeScoresCommand))]
[Subcommand(typeof(PushFileCommand))]
[Subcommand(typeof(SchemaVersionCommand))]
[Subcommand(typeof(UpdateAliasCommand))]
[Subcommand(typeof(WatchQueueCommand))]
11 changes: 5 additions & 6 deletions osu.ElasticIndexer/ScoreItem.cs
@@ -7,13 +7,12 @@ namespace osu.ElasticIndexer
{
public class ScoreItem : QueueItem
{
public SoloScore Score { get; }
public long? ScoreId { get; init; }

public ScoreItem(SoloScore score)
{
Score = score;
}
// ScoreId is always preferred if present (this property is ignored).
// Note that this is generally not used anymore. Consider removing in the future unless a use case comes up?
public SoloScore? Score { get; init; }

public override string ToString() => $"ScoreItem id: {Score.id}";
public override string ToString() => Score != null ? $"ScoreItem Score: {Score.id}" : $"ScoreItem ScoreId: {ScoreId}";
}
}
4 changes: 4 additions & 0 deletions osu.ElasticIndexer/SoloScore.cs
@@ -69,6 +69,10 @@ public string data
[Number(NumberType.Integer)]
public int? build_id => scoreData.build_id;

[Computed]
[Boolean]
public bool convert { get; set; }

[Computed]
[Boolean]
public bool passed => scoreData.passed;
3 changes: 3 additions & 0 deletions osu.ElasticIndexer/schemas/solo_scores.json
@@ -14,6 +14,9 @@
"build_id": {
"type": "integer"
},
"convert": {
"type": "boolean"
},
"country_code": {
"type": "keyword"
},