From 4bfce33ed3375266e61c5abcce35e43b634643da Mon Sep 17 00:00:00 2001 From: bakaneko Date: Wed, 13 Jul 2022 18:04:28 +0900 Subject: [PATCH 01/31] add convert to schema --- osu.ElasticIndexer/SoloScore.cs | 4 ++++ osu.ElasticIndexer/schemas/solo_scores.json | 3 +++ 2 files changed, 7 insertions(+) diff --git a/osu.ElasticIndexer/SoloScore.cs b/osu.ElasticIndexer/SoloScore.cs index 4f88d11b..7131e16a 100644 --- a/osu.ElasticIndexer/SoloScore.cs +++ b/osu.ElasticIndexer/SoloScore.cs @@ -69,6 +69,10 @@ public string data [Number(NumberType.Integer)] public int? build_id => scoreData.build_id; + [Computed] + [Boolean] + public bool convert { get; set; } + [Computed] [Boolean] public bool passed => scoreData.passed; diff --git a/osu.ElasticIndexer/schemas/solo_scores.json b/osu.ElasticIndexer/schemas/solo_scores.json index 9384f239..d4bda961 100644 --- a/osu.ElasticIndexer/schemas/solo_scores.json +++ b/osu.ElasticIndexer/schemas/solo_scores.json @@ -14,6 +14,9 @@ "build_id": { "type": "integer" }, + "convert": { + "type": "boolean" + }, "country_code": { "type": "keyword" }, From 5cb86235fdee6cad920595e931c78f434c0631ba Mon Sep 17 00:00:00 2001 From: bakaneko Date: Wed, 13 Jul 2022 20:58:32 +0900 Subject: [PATCH 02/31] adding commands to index/delete specific score --- osu.ElasticIndexer/Commands/ScoresCommand.cs | 21 +++++++++++ .../Commands/ScoresDeleteCommand.cs | 36 +++++++++++++++++++ .../Commands/ScoresIndexCommand.cs | 36 +++++++++++++++++++ osu.ElasticIndexer/Program.cs | 1 + osu.ElasticIndexer/ScoreItem.cs | 6 ++++ 5 files changed, 100 insertions(+) create mode 100644 osu.ElasticIndexer/Commands/ScoresCommand.cs create mode 100644 osu.ElasticIndexer/Commands/ScoresDeleteCommand.cs create mode 100644 osu.ElasticIndexer/Commands/ScoresIndexCommand.cs diff --git a/osu.ElasticIndexer/Commands/ScoresCommand.cs b/osu.ElasticIndexer/Commands/ScoresCommand.cs new file mode 100644 index 00000000..90bd72e1 --- /dev/null +++ b/osu.ElasticIndexer/Commands/ScoresCommand.cs @@ -0,0 +1,21 @@ +// Copyright (c) ppy Pty Ltd . Licensed under the MIT Licence. +// See the LICENCE file in the repository root for full licence text. + +using System; +using System.Linq; +using System.Threading; +using McMaster.Extensions.CommandLineUtils; + +namespace osu.ElasticIndexer.Commands +{ + [Command("scores", Description = "Namespace for score indexing commands")] + [Subcommand(typeof(ScoresDeleteCommand))] + [Subcommand(typeof(ScoresIndexCommand))] + public class ScoresCommand : ProcessorCommandBase + { + public int OnExecute(CancellationToken cancellationToken) + { + return 0; + } + } +} diff --git a/osu.ElasticIndexer/Commands/ScoresDeleteCommand.cs b/osu.ElasticIndexer/Commands/ScoresDeleteCommand.cs new file mode 100644 index 00000000..a875e9b5 --- /dev/null +++ b/osu.ElasticIndexer/Commands/ScoresDeleteCommand.cs @@ -0,0 +1,36 @@ +// Copyright (c) ppy Pty Ltd . Licensed under the MIT Licence. +// See the LICENCE file in the repository root for full licence text. + +using System; +using System.ComponentModel.DataAnnotations; +using System.Threading; +using McMaster.Extensions.CommandLineUtils; + +namespace osu.ElasticIndexer.Commands +{ + [Command("delete", Description = "Queues a score for deletion")] + public class ScoresDeleteCommand : ProcessorCommandBase + { + [Argument(0)] + [Required] + public long ScoreId { get; set; } + + public int OnExecute(CancellationToken cancellationToken) + { + if (string.IsNullOrEmpty(AppSettings.Schema)) + throw new MissingSchemaException(); + + var redis = new Redis(); + var currentSchema = redis.GetSchemaVersion(); + Console.WriteLine(ConsoleColor.Cyan, $"Current schema version is: {currentSchema}"); + Console.WriteLine(ConsoleColor.Cyan, $"Pushing to queue with schema: {AppSettings.Schema}"); + + var scoreItem = new ScoreItem(new SoloScore() { id = ScoreId }, "delete"); + Processor.PushToQueue(scoreItem); + + Console.WriteLine(ConsoleColor.Cyan, $"Queued delete: {scoreItem}"); + + return 0; + } + } +} diff --git a/osu.ElasticIndexer/Commands/ScoresIndexCommand.cs b/osu.ElasticIndexer/Commands/ScoresIndexCommand.cs new file mode 100644 index 00000000..4c5a6819 --- /dev/null +++ b/osu.ElasticIndexer/Commands/ScoresIndexCommand.cs @@ -0,0 +1,36 @@ +// Copyright (c) ppy Pty Ltd . Licensed under the MIT Licence. +// See the LICENCE file in the repository root for full licence text. + +using System; +using System.ComponentModel.DataAnnotations; +using System.Threading; +using McMaster.Extensions.CommandLineUtils; + +namespace osu.ElasticIndexer.Commands +{ + [Command("index", Description = "Queues a score for indexing")] + public class ScoresIndexCommand : ProcessorCommandBase + { + [Argument(0)] + [Required] + public long ScoreId { get; set; } + + public int OnExecute(CancellationToken cancellationToken) + { + if (string.IsNullOrEmpty(AppSettings.Schema)) + throw new MissingSchemaException(); + + var redis = new Redis(); + var currentSchema = redis.GetSchemaVersion(); + Console.WriteLine(ConsoleColor.Cyan, $"Current schema version is: {currentSchema}"); + Console.WriteLine(ConsoleColor.Cyan, $"Pushing to queue with schema: {AppSettings.Schema}"); + + var scoreItem = new ScoreItem(new SoloScore() { id = ScoreId }, "index"); + Processor.PushToQueue(scoreItem); + + Console.WriteLine(ConsoleColor.Cyan, $"Queued index: {scoreItem}"); + + return 0; + } + } +} diff --git a/osu.ElasticIndexer/Program.cs b/osu.ElasticIndexer/Program.cs index b9205fff..1d5c0442 100644 --- a/osu.ElasticIndexer/Program.cs +++ b/osu.ElasticIndexer/Program.cs @@ -15,6 +15,7 @@ namespace osu.ElasticIndexer [Subcommand(typeof(OpenIndexCommand))] [Subcommand(typeof(PumpAllScoresCommand))] [Subcommand(typeof(PumpFakeScoresCommand))] + [Subcommand(typeof(ScoresCommand))] [Subcommand(typeof(SchemaVersionCommand))] [Subcommand(typeof(UpdateAliasCommand))] [Subcommand(typeof(WatchQueueCommand))] diff --git a/osu.ElasticIndexer/ScoreItem.cs b/osu.ElasticIndexer/ScoreItem.cs index a1abdd06..4f7f3bfc 100644 --- a/osu.ElasticIndexer/ScoreItem.cs +++ b/osu.ElasticIndexer/ScoreItem.cs @@ -7,6 +7,7 @@ namespace osu.ElasticIndexer { public class ScoreItem : QueueItem { + public string? Action { get; } public SoloScore Score { get; } public ScoreItem(SoloScore score) @@ -14,6 +15,11 @@ public ScoreItem(SoloScore score) Score = score; } + public ScoreItem(SoloScore score, string action): this(score) + { + Action = action; + } + public override string ToString() => $"ScoreItem id: {Score.id}"; } } From e9b8026e016c4324af3e83d57629ac5144d4b8ca Mon Sep 17 00:00:00 2001 From: bakaneko Date: Thu, 14 Jul 2022 18:11:19 +0900 Subject: [PATCH 03/31] figure out deserialization later --- osu.ElasticIndexer/Commands/ScoresDeleteCommand.cs | 2 +- osu.ElasticIndexer/Commands/ScoresIndexCommand.cs | 2 +- osu.ElasticIndexer/ScoreItem.cs | 10 +++------- 3 files changed, 5 insertions(+), 9 deletions(-) diff --git a/osu.ElasticIndexer/Commands/ScoresDeleteCommand.cs b/osu.ElasticIndexer/Commands/ScoresDeleteCommand.cs index a875e9b5..83bc39d9 100644 --- a/osu.ElasticIndexer/Commands/ScoresDeleteCommand.cs +++ b/osu.ElasticIndexer/Commands/ScoresDeleteCommand.cs @@ -25,7 +25,7 @@ public int OnExecute(CancellationToken cancellationToken) Console.WriteLine(ConsoleColor.Cyan, $"Current schema version is: {currentSchema}"); Console.WriteLine(ConsoleColor.Cyan, $"Pushing to queue with schema: {AppSettings.Schema}"); - var scoreItem = new ScoreItem(new SoloScore() { id = ScoreId }, "delete"); + var scoreItem = new ScoreItem(new SoloScore() { id = ScoreId }) { Action = "delete" }; Processor.PushToQueue(scoreItem); Console.WriteLine(ConsoleColor.Cyan, $"Queued delete: {scoreItem}"); diff --git a/osu.ElasticIndexer/Commands/ScoresIndexCommand.cs b/osu.ElasticIndexer/Commands/ScoresIndexCommand.cs index 4c5a6819..26f9245e 100644 --- a/osu.ElasticIndexer/Commands/ScoresIndexCommand.cs +++ b/osu.ElasticIndexer/Commands/ScoresIndexCommand.cs @@ -25,7 +25,7 @@ public int OnExecute(CancellationToken cancellationToken) Console.WriteLine(ConsoleColor.Cyan, $"Current schema version is: {currentSchema}"); Console.WriteLine(ConsoleColor.Cyan, $"Pushing to queue with schema: {AppSettings.Schema}"); - var scoreItem = new ScoreItem(new SoloScore() { id = ScoreId }, "index"); + var scoreItem = new ScoreItem(new SoloScore() { id = ScoreId }) { Action = "index" }; Processor.PushToQueue(scoreItem); Console.WriteLine(ConsoleColor.Cyan, $"Queued index: {scoreItem}"); diff --git a/osu.ElasticIndexer/ScoreItem.cs b/osu.ElasticIndexer/ScoreItem.cs index 4f7f3bfc..c59937a9 100644 --- a/osu.ElasticIndexer/ScoreItem.cs +++ b/osu.ElasticIndexer/ScoreItem.cs @@ -7,7 +7,8 @@ namespace osu.ElasticIndexer { public class ScoreItem : QueueItem { - public string? Action { get; } + // TODO :figure out what's the deal with constructor not working? + public string? Action { get; set; } public SoloScore Score { get; } public ScoreItem(SoloScore score) @@ -15,11 +16,6 @@ public ScoreItem(SoloScore score) Score = score; } - public ScoreItem(SoloScore score, string action): this(score) - { - Action = action; - } - - public override string ToString() => $"ScoreItem id: {Score.id}"; + public override string ToString() => $"ScoreItem id: {Score.id}, action: {Action}"; } } From f8435d420b994f209edb70bcdea2938ee6d92fc1 Mon Sep 17 00:00:00 2001 From: bakaneko Date: Thu, 14 Jul 2022 19:20:50 +0900 Subject: [PATCH 04/31] support looking up records by id --- osu.ElasticIndexer/ElasticModel.cs | 16 ++++++++++++++ osu.ElasticIndexer/IndexQueueProcessor.cs | 26 +++++++++++++++++++++++ 2 files changed, 42 insertions(+) diff --git a/osu.ElasticIndexer/ElasticModel.cs b/osu.ElasticIndexer/ElasticModel.cs index b7f5a6fd..0e44a0ce 100644 --- a/osu.ElasticIndexer/ElasticModel.cs +++ b/osu.ElasticIndexer/ElasticModel.cs @@ -61,5 +61,21 @@ public static IEnumerable> Chunk(string? where, int chunkSize = 10000 public static IEnumerable> Chunk(int chunkSize = 10000, long? resumeFrom = null) where T : ElasticModel => Chunk(null, chunkSize, resumeFrom); + + public static List Find(IEnumerable ids) where T : ElasticModel + { + using (var dbConnection = new MySqlConnection(AppSettings.ConnectionString)) + { + var attribute = typeof(T).GetCustomAttributes().First(); + var cursorColumn = attribute.CursorColumn; + var selects = attribute.Query; + + string query = $"select {selects} where {cursorColumn} in @ids"; + + dbConnection.Open(); + + return dbConnection.Query(query, new { ids }).AsList(); + } + } } } diff --git a/osu.ElasticIndexer/IndexQueueProcessor.cs b/osu.ElasticIndexer/IndexQueueProcessor.cs index 928b460c..e7f3ab0e 100644 --- a/osu.ElasticIndexer/IndexQueueProcessor.cs +++ b/osu.ElasticIndexer/IndexQueueProcessor.cs @@ -40,15 +40,41 @@ protected override void ProcessResults(IEnumerable items) { var add = new List(); var remove = new List(); + var list = new List(); foreach (var item in items) { + var action = item.Action?.ToLowerInvariant(); + if (action == "delete") + { + remove.Add(item.Score); + continue; + } + + if (action == "index") + { + list.Add(item.Score.id); + continue; + } + if (item.Score.ShouldIndex) add.Add(item.Score); else remove.Add(item.Score); } + if (list.Any()) + { + var scores = ElasticModel.Find(list); + foreach (var score in scores) + { + if (score.ShouldIndex) + add.Add(score); + else + remove.Add(score); + } + } + var bulkDescriptor = new BulkDescriptor() .Index(index) .IndexMany(add) From e33439747636b94a17e5b01928b34cabe46f8e3f Mon Sep 17 00:00:00 2001 From: bakaneko Date: Thu, 14 Jul 2022 19:25:27 +0900 Subject: [PATCH 05/31] only need the id for deleting --- osu.ElasticIndexer/IndexQueueProcessor.cs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/osu.ElasticIndexer/IndexQueueProcessor.cs b/osu.ElasticIndexer/IndexQueueProcessor.cs index e7f3ab0e..630490a1 100644 --- a/osu.ElasticIndexer/IndexQueueProcessor.cs +++ b/osu.ElasticIndexer/IndexQueueProcessor.cs @@ -39,7 +39,7 @@ internal IndexQueueProcessor(string index, Client client, Action stopCallback) protected override void ProcessResults(IEnumerable items) { var add = new List(); - var remove = new List(); + var remove = new List(); var list = new List(); foreach (var item in items) @@ -47,7 +47,7 @@ protected override void ProcessResults(IEnumerable items) var action = item.Action?.ToLowerInvariant(); if (action == "delete") { - remove.Add(item.Score); + remove.Add(item.Score.id.ToString()); continue; } @@ -60,7 +60,7 @@ protected override void ProcessResults(IEnumerable items) if (item.Score.ShouldIndex) add.Add(item.Score); else - remove.Add(item.Score); + remove.Add(item.Score.id.ToString()); } if (list.Any()) @@ -71,14 +71,15 @@ protected override void ProcessResults(IEnumerable items) if (score.ShouldIndex) add.Add(score); else - remove.Add(score); + remove.Add(score.id.ToString()); } } var bulkDescriptor = new BulkDescriptor() .Index(index) .IndexMany(add) - .DeleteMany(remove); + // type is needed for string ids https://github.com/elastic/elasticsearch-net/issues/3500 + .DeleteMany(remove); var response = client.ElasticClient.Bulk(bulkDescriptor); From 0635423ffdc7570b2a102c98659607c597165494 Mon Sep 17 00:00:00 2001 From: bakaneko Date: Thu, 14 Jul 2022 20:48:39 +0900 Subject: [PATCH 06/31] consolidate to single command --- osu.ElasticIndexer/Commands/ScoresCommand.cs | 20 +++++++++-- .../Commands/ScoresDeleteCommand.cs | 36 ------------------- .../Commands/ScoresIndexCommand.cs | 36 ------------------- osu.ElasticIndexer/IndexQueueProcessor.cs | 2 +- osu.ElasticIndexer/ScoreItem.cs | 5 ++- 5 files changed, 23 insertions(+), 76 deletions(-) delete mode 100644 osu.ElasticIndexer/Commands/ScoresDeleteCommand.cs delete mode 100644 osu.ElasticIndexer/Commands/ScoresIndexCommand.cs diff --git a/osu.ElasticIndexer/Commands/ScoresCommand.cs b/osu.ElasticIndexer/Commands/ScoresCommand.cs index 90bd72e1..ebcb26e7 100644 --- a/osu.ElasticIndexer/Commands/ScoresCommand.cs +++ b/osu.ElasticIndexer/Commands/ScoresCommand.cs @@ -2,6 +2,7 @@ // See the LICENCE file in the repository root for full licence text. using System; +using System.ComponentModel.DataAnnotations; using System.Linq; using System.Threading; using McMaster.Extensions.CommandLineUtils; @@ -9,12 +10,27 @@ namespace osu.ElasticIndexer.Commands { [Command("scores", Description = "Namespace for score indexing commands")] - [Subcommand(typeof(ScoresDeleteCommand))] - [Subcommand(typeof(ScoresIndexCommand))] public class ScoresCommand : ProcessorCommandBase { + [Argument(0)] + [Required] + [AllowedValues("delete", "index", IgnoreCase = true)] + public string Action { get; set; } = string.Empty; + + [Argument(1)] + [Required] + public long ScoreId { get; set; } + public int OnExecute(CancellationToken cancellationToken) { + if (string.IsNullOrEmpty(AppSettings.Schema)) + throw new MissingSchemaException(); + + var scoreItem = new ScoreItem(new SoloScore() { id = ScoreId }) { Action = Action }; + Processor.PushToQueue(scoreItem); + + Console.WriteLine(ConsoleColor.Cyan, $"Queued to {Processor.QueueName}: {scoreItem}"); + return 0; } } diff --git a/osu.ElasticIndexer/Commands/ScoresDeleteCommand.cs b/osu.ElasticIndexer/Commands/ScoresDeleteCommand.cs deleted file mode 100644 index 83bc39d9..00000000 --- a/osu.ElasticIndexer/Commands/ScoresDeleteCommand.cs +++ /dev/null @@ -1,36 +0,0 @@ -// Copyright (c) ppy Pty Ltd . Licensed under the MIT Licence. -// See the LICENCE file in the repository root for full licence text. - -using System; -using System.ComponentModel.DataAnnotations; -using System.Threading; -using McMaster.Extensions.CommandLineUtils; - -namespace osu.ElasticIndexer.Commands -{ - [Command("delete", Description = "Queues a score for deletion")] - public class ScoresDeleteCommand : ProcessorCommandBase - { - [Argument(0)] - [Required] - public long ScoreId { get; set; } - - public int OnExecute(CancellationToken cancellationToken) - { - if (string.IsNullOrEmpty(AppSettings.Schema)) - throw new MissingSchemaException(); - - var redis = new Redis(); - var currentSchema = redis.GetSchemaVersion(); - Console.WriteLine(ConsoleColor.Cyan, $"Current schema version is: {currentSchema}"); - Console.WriteLine(ConsoleColor.Cyan, $"Pushing to queue with schema: {AppSettings.Schema}"); - - var scoreItem = new ScoreItem(new SoloScore() { id = ScoreId }) { Action = "delete" }; - Processor.PushToQueue(scoreItem); - - Console.WriteLine(ConsoleColor.Cyan, $"Queued delete: {scoreItem}"); - - return 0; - } - } -} diff --git a/osu.ElasticIndexer/Commands/ScoresIndexCommand.cs b/osu.ElasticIndexer/Commands/ScoresIndexCommand.cs deleted file mode 100644 index 26f9245e..00000000 --- a/osu.ElasticIndexer/Commands/ScoresIndexCommand.cs +++ /dev/null @@ -1,36 +0,0 @@ -// Copyright (c) ppy Pty Ltd . Licensed under the MIT Licence. -// See the LICENCE file in the repository root for full licence text. - -using System; -using System.ComponentModel.DataAnnotations; -using System.Threading; -using McMaster.Extensions.CommandLineUtils; - -namespace osu.ElasticIndexer.Commands -{ - [Command("index", Description = "Queues a score for indexing")] - public class ScoresIndexCommand : ProcessorCommandBase - { - [Argument(0)] - [Required] - public long ScoreId { get; set; } - - public int OnExecute(CancellationToken cancellationToken) - { - if (string.IsNullOrEmpty(AppSettings.Schema)) - throw new MissingSchemaException(); - - var redis = new Redis(); - var currentSchema = redis.GetSchemaVersion(); - Console.WriteLine(ConsoleColor.Cyan, $"Current schema version is: {currentSchema}"); - Console.WriteLine(ConsoleColor.Cyan, $"Pushing to queue with schema: {AppSettings.Schema}"); - - var scoreItem = new ScoreItem(new SoloScore() { id = ScoreId }) { Action = "index" }; - Processor.PushToQueue(scoreItem); - - Console.WriteLine(ConsoleColor.Cyan, $"Queued index: {scoreItem}"); - - return 0; - } - } -} diff --git a/osu.ElasticIndexer/IndexQueueProcessor.cs b/osu.ElasticIndexer/IndexQueueProcessor.cs index 630490a1..0e4088ca 100644 --- a/osu.ElasticIndexer/IndexQueueProcessor.cs +++ b/osu.ElasticIndexer/IndexQueueProcessor.cs @@ -44,7 +44,7 @@ protected override void ProcessResults(IEnumerable items) foreach (var item in items) { - var action = item.Action?.ToLowerInvariant(); + var action = item.ParsedAction; if (action == "delete") { remove.Add(item.Score.id.ToString()); diff --git a/osu.ElasticIndexer/ScoreItem.cs b/osu.ElasticIndexer/ScoreItem.cs index c59937a9..a240f09a 100644 --- a/osu.ElasticIndexer/ScoreItem.cs +++ b/osu.ElasticIndexer/ScoreItem.cs @@ -9,6 +9,9 @@ public class ScoreItem : QueueItem { // TODO :figure out what's the deal with constructor not working? public string? Action { get; set; } + + public string? ParsedAction => Action?.ToLowerInvariant(); + public SoloScore Score { get; } public ScoreItem(SoloScore score) @@ -16,6 +19,6 @@ public ScoreItem(SoloScore score) Score = score; } - public override string ToString() => $"ScoreItem id: {Score.id}, action: {Action}"; + public override string ToString() => $"ScoreItem id: {Score.id} {Action}"; } } From 56caed675416065bdf986ff979335dcbbe51ddad Mon Sep 17 00:00:00 2001 From: bakaneko Date: Thu, 14 Jul 2022 20:50:02 +0900 Subject: [PATCH 07/31] Required long arugment just puts 0 if it's missing... --- osu.ElasticIndexer/Commands/ScoresCommand.cs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/osu.ElasticIndexer/Commands/ScoresCommand.cs b/osu.ElasticIndexer/Commands/ScoresCommand.cs index ebcb26e7..d5bc2e2a 100644 --- a/osu.ElasticIndexer/Commands/ScoresCommand.cs +++ b/osu.ElasticIndexer/Commands/ScoresCommand.cs @@ -19,14 +19,15 @@ public class ScoresCommand : ProcessorCommandBase [Argument(1)] [Required] - public long ScoreId { get; set; } + public string ScoreId { get; set; } = string.Empty; public int OnExecute(CancellationToken cancellationToken) { if (string.IsNullOrEmpty(AppSettings.Schema)) throw new MissingSchemaException(); - var scoreItem = new ScoreItem(new SoloScore() { id = ScoreId }) { Action = Action }; + var Id = long.Parse(ScoreId); + var scoreItem = new ScoreItem(new SoloScore() { id = Id }) { Action = Action }; Processor.PushToQueue(scoreItem); Console.WriteLine(ConsoleColor.Cyan, $"Queued to {Processor.QueueName}: {scoreItem}"); From fc49a8d9f863d438ab51460ab2a07f810062c6f9 Mon Sep 17 00:00:00 2001 From: bakaneko Date: Thu, 14 Jul 2022 22:54:52 +0900 Subject: [PATCH 08/31] move the Action/ScoreId out of the score structure --- osu.ElasticIndexer/Commands/ScoresCommand.cs | 2 +- osu.ElasticIndexer/IndexQueueProcessor.cs | 35 ++++++++++++-------- osu.ElasticIndexer/ScoreItem.cs | 11 ++++-- 3 files changed, 31 insertions(+), 17 deletions(-) diff --git a/osu.ElasticIndexer/Commands/ScoresCommand.cs b/osu.ElasticIndexer/Commands/ScoresCommand.cs index d5bc2e2a..56a0a165 100644 --- a/osu.ElasticIndexer/Commands/ScoresCommand.cs +++ b/osu.ElasticIndexer/Commands/ScoresCommand.cs @@ -27,7 +27,7 @@ public int OnExecute(CancellationToken cancellationToken) throw new MissingSchemaException(); var Id = long.Parse(ScoreId); - var scoreItem = new ScoreItem(new SoloScore() { id = Id }) { Action = Action }; + var scoreItem = new ScoreItem() { Action = Action, ScoreId = Id }; Processor.PushToQueue(scoreItem); Console.WriteLine(ConsoleColor.Cyan, $"Queued to {Processor.QueueName}: {scoreItem}"); diff --git a/osu.ElasticIndexer/IndexQueueProcessor.cs b/osu.ElasticIndexer/IndexQueueProcessor.cs index 0e4088ca..b07d9ea4 100644 --- a/osu.ElasticIndexer/IndexQueueProcessor.cs +++ b/osu.ElasticIndexer/IndexQueueProcessor.cs @@ -40,32 +40,39 @@ protected override void ProcessResults(IEnumerable items) { var add = new List(); var remove = new List(); - var list = new List(); + var lookupIds = new List(); foreach (var item in items) { var action = item.ParsedAction; - if (action == "delete") + if (item.ScoreId != null && action != null) { - remove.Add(item.Score.id.ToString()); - continue; + var id = (long) item.ScoreId; // doesn't figure out id isn't nullable here... + if (action == "delete") + { + remove.Add(id.ToString()); + } + else if (action == "index") + { + lookupIds.Add(id); + } } - - if (action == "index") + else if (item.Score != null) { - list.Add(item.Score.id); - continue; + if (item.Score.ShouldIndex) + add.Add(item.Score); + else + remove.Add(item.Score.id.ToString()); } - - if (item.Score.ShouldIndex) - add.Add(item.Score); else - remove.Add(item.Score.id.ToString()); + { + Console.WriteLine(ConsoleColor.Red, "queue item missing both data and action"); + } } - if (list.Any()) + if (lookupIds.Any()) { - var scores = ElasticModel.Find(list); + var scores = ElasticModel.Find(lookupIds); foreach (var score in scores) { if (score.ShouldIndex) diff --git a/osu.ElasticIndexer/ScoreItem.cs b/osu.ElasticIndexer/ScoreItem.cs index a240f09a..a8aa846e 100644 --- a/osu.ElasticIndexer/ScoreItem.cs +++ b/osu.ElasticIndexer/ScoreItem.cs @@ -1,6 +1,7 @@ // Copyright (c) ppy Pty Ltd . Licensed under the MIT Licence. // See the LICENCE file in the repository root for full licence text. +using System.Runtime.Serialization; using osu.Server.QueueProcessor; namespace osu.ElasticIndexer @@ -9,16 +10,22 @@ public class ScoreItem : QueueItem { // TODO :figure out what's the deal with constructor not working? public string? Action { get; set; } + public long? ScoreId { get; set; } + [IgnoreDataMember] public string? ParsedAction => Action?.ToLowerInvariant(); - public SoloScore Score { get; } + public SoloScore? Score { get; set; } + + public ScoreItem() + { + } public ScoreItem(SoloScore score) { Score = score; } - public override string ToString() => $"ScoreItem id: {Score.id} {Action}"; + public override string ToString() => Score != null ? $"ScoreItem Score: {Score.id}" : $"ScoreItem ScoreId: {ScoreId} {Action} "; } } From 12e5b7b838abb831406835be8c153c37a40e4635 Mon Sep 17 00:00:00 2001 From: bakaneko Date: Fri, 15 Jul 2022 17:12:48 +0900 Subject: [PATCH 09/31] add command for pushing arbitrary commands to queue. makes it easier to test handling arbitrary data instead of making different QueueItem subclasses --- .../Commands/PushFileCommand.cs | 34 +++++++++++++++++++ osu.ElasticIndexer/Program.cs | 1 + 2 files changed, 35 insertions(+) create mode 100644 osu.ElasticIndexer/Commands/PushFileCommand.cs diff --git a/osu.ElasticIndexer/Commands/PushFileCommand.cs b/osu.ElasticIndexer/Commands/PushFileCommand.cs new file mode 100644 index 00000000..44a8d395 --- /dev/null +++ b/osu.ElasticIndexer/Commands/PushFileCommand.cs @@ -0,0 +1,34 @@ +// Copyright (c) ppy Pty Ltd . Licensed under the MIT Licence. +// See the LICENCE file in the repository root for full licence text. + +using System.ComponentModel.DataAnnotations; +using System.IO; +using System.Threading; +using McMaster.Extensions.CommandLineUtils; + +namespace osu.ElasticIndexer.Commands +{ + [Command("push-file", Description = "Push contents of a file to the queue for testing.")] + public class PushFileCommand : ProcessorCommandBase + { + [Argument(0)] + [Required] + public string Filename { get; set; } = string.Empty; + + public int OnExecute(CancellationToken cancellationToken) + { + if (string.IsNullOrEmpty(AppSettings.Schema)) + throw new MissingSchemaException(); + + var value = File.ReadAllText(Filename); + + var redis = new Redis(); + var database = redis.Connection.GetDatabase(); + var queueName = $"osu-queue:score-index-{AppSettings.Schema}"; + + database.ListLeftPush(queueName, value); + + return 0; + } + } +} diff --git a/osu.ElasticIndexer/Program.cs b/osu.ElasticIndexer/Program.cs index 1d5c0442..c314c01e 100644 --- a/osu.ElasticIndexer/Program.cs +++ b/osu.ElasticIndexer/Program.cs @@ -15,6 +15,7 @@ namespace osu.ElasticIndexer [Subcommand(typeof(OpenIndexCommand))] [Subcommand(typeof(PumpAllScoresCommand))] [Subcommand(typeof(PumpFakeScoresCommand))] + [Subcommand(typeof(PushFileCommand))] [Subcommand(typeof(ScoresCommand))] [Subcommand(typeof(SchemaVersionCommand))] [Subcommand(typeof(UpdateAliasCommand))] From b40a6c65302dbbb6ca7bd6609350c690ff61a480 Mon Sep 17 00:00:00 2001 From: bakaneko Date: Fri, 15 Jul 2022 17:17:34 +0900 Subject: [PATCH 10/31] sending empty payload is an error --- osu.ElasticIndexer/IndexQueueProcessor.cs | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/osu.ElasticIndexer/IndexQueueProcessor.cs b/osu.ElasticIndexer/IndexQueueProcessor.cs index b07d9ea4..0d353d85 100644 --- a/osu.ElasticIndexer/IndexQueueProcessor.cs +++ b/osu.ElasticIndexer/IndexQueueProcessor.cs @@ -82,15 +82,18 @@ protected override void ProcessResults(IEnumerable items) } } - var bulkDescriptor = new BulkDescriptor() - .Index(index) - .IndexMany(add) - // type is needed for string ids https://github.com/elastic/elasticsearch-net/issues/3500 - .DeleteMany(remove); + if (add.Any() || remove.Any()) + { + var bulkDescriptor = new BulkDescriptor() + .Index(index) + .IndexMany(add) + // type is needed for string ids https://github.com/elastic/elasticsearch-net/issues/3500 + .DeleteMany(remove); - var response = client.ElasticClient.Bulk(bulkDescriptor); + var response = client.ElasticClient.Bulk(bulkDescriptor); - handleResponse(response, items); + handleResponse(response, items); + } } private BulkResponse dispatch(BulkDescriptor bulkDescriptor) From ec9d810369a09ccf892c18157f7b1ea327fd2bd1 Mon Sep 17 00:00:00 2001 From: bakaneko Date: Fri, 15 Jul 2022 17:29:41 +0900 Subject: [PATCH 11/31] helper method for organizing bulk descriptor payload --- osu.ElasticIndexer/IndexQueueProcessor.cs | 34 +++++++++++++---------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/osu.ElasticIndexer/IndexQueueProcessor.cs b/osu.ElasticIndexer/IndexQueueProcessor.cs index 0d353d85..1a054acc 100644 --- a/osu.ElasticIndexer/IndexQueueProcessor.cs +++ b/osu.ElasticIndexer/IndexQueueProcessor.cs @@ -18,6 +18,9 @@ public class IndexQueueProcessor : QueueProcessor private readonly Client client; private readonly string index; + private readonly List scoresAdd = new List(); + private readonly List scoresRemove = new List(); + // QueueProcessor doesn't expose cancellation without overriding Run, // so we're making use of a supplied callback to stop processing. private readonly Action stop; @@ -38,8 +41,6 @@ internal IndexQueueProcessor(string index, Client client, Action stopCallback) protected override void ProcessResults(IEnumerable items) { - var add = new List(); - var remove = new List(); var lookupIds = new List(); foreach (var item in items) @@ -50,7 +51,7 @@ protected override void ProcessResults(IEnumerable items) var id = (long) item.ScoreId; // doesn't figure out id isn't nullable here... if (action == "delete") { - remove.Add(id.ToString()); + scoresRemove.Add(id.ToString()); } else if (action == "index") { @@ -59,10 +60,7 @@ protected override void ProcessResults(IEnumerable items) } else if (item.Score != null) { - if (item.Score.ShouldIndex) - add.Add(item.Score); - else - remove.Add(item.Score.id.ToString()); + addToBuffer(item.Score); } else { @@ -75,25 +73,33 @@ protected override void ProcessResults(IEnumerable items) var scores = ElasticModel.Find(lookupIds); foreach (var score in scores) { - if (score.ShouldIndex) - add.Add(score); - else - remove.Add(score.id.ToString()); + addToBuffer(score); } } - if (add.Any() || remove.Any()) + if (scoresAdd.Any() || scoresRemove.Any()) { var bulkDescriptor = new BulkDescriptor() .Index(index) - .IndexMany(add) + .IndexMany(scoresAdd) // type is needed for string ids https://github.com/elastic/elasticsearch-net/issues/3500 - .DeleteMany(remove); + .DeleteMany(scoresRemove); var response = client.ElasticClient.Bulk(bulkDescriptor); handleResponse(response, items); } + + scoresAdd.Clear(); + scoresRemove.Clear(); + } + + private void addToBuffer(SoloScore score) + { + if (score.ShouldIndex) + scoresAdd.Add(score); + else + scoresRemove.Add(score.id.ToString()); } private BulkResponse dispatch(BulkDescriptor bulkDescriptor) From 3c1450e59a1b22629f7d3204da3256d952ce4cd0 Mon Sep 17 00:00:00 2001 From: bakaneko Date: Fri, 15 Jul 2022 17:42:00 +0900 Subject: [PATCH 12/31] remove scores that don't exist via lookup --- osu.ElasticIndexer/IndexQueueProcessor.cs | 31 +++++++++++++++-------- 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/osu.ElasticIndexer/IndexQueueProcessor.cs b/osu.ElasticIndexer/IndexQueueProcessor.cs index 1a054acc..545fe8fd 100644 --- a/osu.ElasticIndexer/IndexQueueProcessor.cs +++ b/osu.ElasticIndexer/IndexQueueProcessor.cs @@ -18,6 +18,7 @@ public class IndexQueueProcessor : QueueProcessor private readonly Client client; private readonly string index; + private readonly HashSet lookupIds = new HashSet(); private readonly List scoresAdd = new List(); private readonly List scoresRemove = new List(); @@ -41,8 +42,7 @@ internal IndexQueueProcessor(string index, Client client, Action stopCallback) protected override void ProcessResults(IEnumerable items) { - var lookupIds = new List(); - + // Figure out what to do with the queue item. foreach (var item in items) { var action = item.ParsedAction; @@ -68,14 +68,8 @@ protected override void ProcessResults(IEnumerable items) } } - if (lookupIds.Any()) - { - var scores = ElasticModel.Find(lookupIds); - foreach (var score in scores) - { - addToBuffer(score); - } - } + // Handle any scores that need a lookup. + performLookup(); if (scoresAdd.Any() || scoresRemove.Any()) { @@ -167,6 +161,23 @@ private void handleResponse(BulkResponse response, IEnumerable items) // TODO: per-item errors? } + private void performLookup() + { + if (!lookupIds.Any()) return; + + var scores = ElasticModel.Find(lookupIds); + foreach (var score in scores) + { + addToBuffer(score); + lookupIds.Remove(score.id); + } + + // Remaining scores do not exist and should be deleted. + scoresRemove.AddRange(lookupIds.Select(id => id.ToString())); + + lookupIds.Clear(); + } + private void waitUntilActive() { // Spin until valid response from elasticsearch. From dac0f581fc31e175f269fc865470ee271c57ecba Mon Sep 17 00:00:00 2001 From: bakaneko Date: Fri, 15 Jul 2022 18:11:17 +0900 Subject: [PATCH 13/31] rely on ScoreItem.ToString() --- osu.ElasticIndexer/IndexQueueProcessor.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/osu.ElasticIndexer/IndexQueueProcessor.cs b/osu.ElasticIndexer/IndexQueueProcessor.cs index 545fe8fd..c89efc72 100644 --- a/osu.ElasticIndexer/IndexQueueProcessor.cs +++ b/osu.ElasticIndexer/IndexQueueProcessor.cs @@ -132,7 +132,7 @@ private void handleResponse(BulkResponse response, IEnumerable items) // Elasticsearch bulk thread pool is full. if (response.ItemsWithErrors.Any(item => item.Status == 429 || item.Error.Type == "es_rejected_execution_exception")) { - Console.WriteLine(ConsoleColor.Yellow, $"Server returned 429, re-queued chunk with lastId {items.Last().Score.id}"); + Console.WriteLine(ConsoleColor.Yellow, $"Server returned 429, re-queued chunk with lastId {items.Last()}"); foreach (var item in items) { From e592b852091dc579905df4776cad4372529b9ed0 Mon Sep 17 00:00:00 2001 From: bakaneko Date: Fri, 15 Jul 2022 18:11:44 +0900 Subject: [PATCH 14/31] add some documentation --- README.md | 46 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/README.md b/README.md index a049987d..9c03fe99 100644 --- a/README.md +++ b/README.md @@ -124,6 +124,20 @@ For testing purposes, we can add fake items to the queue: It should be noted that these items will not exist or match the ones in the database. +## Queuing a specific score for indexing + + schema=${schema} dotnet run scores index ${id} + +will queue the score with `${id}` for indexing; the score will be added or deleted as necessary, according to the value of `SoloScore.ShouldIndex`. + +See [Queuing items for processing from another client](#queuing-items-for-processing-from-another-client) + +## Queuing a specific score for deletion + + schema=${schema} dotnet run scores delete ${id} + +will queue the score with `${id}` for deletion, regardless of whether the score should be indexed or not. + ## Adding existing database records to the queue schema=1 dotnet run all @@ -149,3 +163,35 @@ Populating an index is done by pushing score items to a queue. where `${cmd}` is the command to run, e.g. `dotnet osu.ElasticIndexer.dll queue` # Typical use-cases + +## Queuing items for processing from another client + +Push items into the Redis queue "`osu-queue:score-index-${schema}`" +e.g. + +```csharp +ListLeftPush("osu-queue:score-index-1", "{ \"Action\": \"index\",\"ScoreId\": 1 }"); +``` + +or from redis-cli: +``` +LPUSH "osu-queue:score-index-1" "{\"Action\":\"index\",\"ScoreId\":1}" +``` + +### Indexing a score by `id` +```json +{ "Action": "index", "ScoreId": 1 } +``` + +### Deleting a score by `id` +```json +{ "Action": "delete", "ScoreId": 1 } +``` + +### Queuing a whole score + +```json +{ + "Score": {Solo.Score} +} +``` From 2c2840a2e2a16853e8f9d01c1b8f3c355ae42c10 Mon Sep 17 00:00:00 2001 From: bakaneko Date: Fri, 15 Jul 2022 18:26:53 +0900 Subject: [PATCH 15/31] fix description --- osu.ElasticIndexer/Commands/ScoresCommand.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/osu.ElasticIndexer/Commands/ScoresCommand.cs b/osu.ElasticIndexer/Commands/ScoresCommand.cs index 56a0a165..478efd37 100644 --- a/osu.ElasticIndexer/Commands/ScoresCommand.cs +++ b/osu.ElasticIndexer/Commands/ScoresCommand.cs @@ -9,7 +9,7 @@ namespace osu.ElasticIndexer.Commands { - [Command("scores", Description = "Namespace for score indexing commands")] + [Command("scores", Description = "Queue a score for indexing or deletion by id.")] public class ScoresCommand : ProcessorCommandBase { [Argument(0)] From 708546b5b3b0be81af6d37932f653f140c0c4b39 Mon Sep 17 00:00:00 2001 From: bakaneko Date: Fri, 15 Jul 2022 18:47:02 +0900 Subject: [PATCH 16/31] formatting --- osu.ElasticIndexer/Commands/ScoresCommand.cs | 4 ++-- osu.ElasticIndexer/IndexQueueProcessor.cs | 13 ++++++++----- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/osu.ElasticIndexer/Commands/ScoresCommand.cs b/osu.ElasticIndexer/Commands/ScoresCommand.cs index 478efd37..420559bc 100644 --- a/osu.ElasticIndexer/Commands/ScoresCommand.cs +++ b/osu.ElasticIndexer/Commands/ScoresCommand.cs @@ -26,8 +26,8 @@ public int OnExecute(CancellationToken cancellationToken) if (string.IsNullOrEmpty(AppSettings.Schema)) throw new MissingSchemaException(); - var Id = long.Parse(ScoreId); - var scoreItem = new ScoreItem() { Action = Action, ScoreId = Id }; + var id = long.Parse(ScoreId); + var scoreItem = new ScoreItem { Action = Action, ScoreId = id }; Processor.PushToQueue(scoreItem); Console.WriteLine(ConsoleColor.Cyan, $"Queued to {Processor.QueueName}: {scoreItem}"); diff --git a/osu.ElasticIndexer/IndexQueueProcessor.cs b/osu.ElasticIndexer/IndexQueueProcessor.cs index c89efc72..79d736fe 100644 --- a/osu.ElasticIndexer/IndexQueueProcessor.cs +++ b/osu.ElasticIndexer/IndexQueueProcessor.cs @@ -46,9 +46,11 @@ protected override void ProcessResults(IEnumerable items) foreach (var item in items) { var action = item.ParsedAction; + if (item.ScoreId != null && action != null) { - var id = (long) item.ScoreId; // doesn't figure out id isn't nullable here... + var id = (long)item.ScoreId; // doesn't figure out id isn't nullable here... + if (action == "delete") { scoresRemove.Add(id.ToString()); @@ -74,10 +76,10 @@ protected override void ProcessResults(IEnumerable items) if (scoresAdd.Any() || scoresRemove.Any()) { var bulkDescriptor = new BulkDescriptor() - .Index(index) - .IndexMany(scoresAdd) - // type is needed for string ids https://github.com/elastic/elasticsearch-net/issues/3500 - .DeleteMany(scoresRemove); + .Index(index) + .IndexMany(scoresAdd) + // type is needed for string ids https://github.com/elastic/elasticsearch-net/issues/3500 + .DeleteMany(scoresRemove); var response = client.ElasticClient.Bulk(bulkDescriptor); @@ -166,6 +168,7 @@ private void performLookup() if (!lookupIds.Any()) return; var scores = ElasticModel.Find(lookupIds); + foreach (var score in scores) { addToBuffer(score); From 8579baa295dff46ab0f4e1982ecd735b5d147814 Mon Sep 17 00:00:00 2001 From: bakaneko Date: Fri, 15 Jul 2022 18:52:55 +0900 Subject: [PATCH 17/31] unused --- osu.ElasticIndexer/Commands/ScoresCommand.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/osu.ElasticIndexer/Commands/ScoresCommand.cs b/osu.ElasticIndexer/Commands/ScoresCommand.cs index 420559bc..44ed3fe2 100644 --- a/osu.ElasticIndexer/Commands/ScoresCommand.cs +++ b/osu.ElasticIndexer/Commands/ScoresCommand.cs @@ -3,7 +3,6 @@ using System; using System.ComponentModel.DataAnnotations; -using System.Linq; using System.Threading; using McMaster.Extensions.CommandLineUtils; From 4118771cefeffaad0bb3a8a10713e88cea914b9e Mon Sep 17 00:00:00 2001 From: bakaneko Date: Fri, 15 Jul 2022 18:55:54 +0900 Subject: [PATCH 18/31] was missing the extra space before >_> --- osu.ElasticIndexer/IndexQueueProcessor.cs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/osu.ElasticIndexer/IndexQueueProcessor.cs b/osu.ElasticIndexer/IndexQueueProcessor.cs index 79d736fe..4484a465 100644 --- a/osu.ElasticIndexer/IndexQueueProcessor.cs +++ b/osu.ElasticIndexer/IndexQueueProcessor.cs @@ -76,10 +76,10 @@ protected override void ProcessResults(IEnumerable items) if (scoresAdd.Any() || scoresRemove.Any()) { var bulkDescriptor = new BulkDescriptor() - .Index(index) - .IndexMany(scoresAdd) - // type is needed for string ids https://github.com/elastic/elasticsearch-net/issues/3500 - .DeleteMany(scoresRemove); + .Index(index) + .IndexMany(scoresAdd) + // type is needed for string ids https://github.com/elastic/elasticsearch-net/issues/3500 + .DeleteMany(scoresRemove); var response = client.ElasticClient.Bulk(bulkDescriptor); From d7cfa684f5e2211a8307338745319cbdd754048b Mon Sep 17 00:00:00 2001 From: bakaneko Date: Fri, 15 Jul 2022 22:42:43 +0900 Subject: [PATCH 19/31] shouldn't be sharing the buffer between tasks --- osu.ElasticIndexer/IndexQueueItems.cs | 14 ++++++++ osu.ElasticIndexer/IndexQueueProcessor.cs | 43 ++++++++++------------- 2 files changed, 33 insertions(+), 24 deletions(-) create mode 100644 osu.ElasticIndexer/IndexQueueItems.cs diff --git a/osu.ElasticIndexer/IndexQueueItems.cs b/osu.ElasticIndexer/IndexQueueItems.cs new file mode 100644 index 00000000..282318f5 --- /dev/null +++ b/osu.ElasticIndexer/IndexQueueItems.cs @@ -0,0 +1,14 @@ +// Copyright (c) ppy Pty Ltd . Licensed under the MIT Licence. +// See the LICENCE file in the repository root for full licence text. + +using System.Collections.Generic; + +namespace osu.ElasticIndexer +{ + public class IndexQueueItems + { + public readonly HashSet LookupIds = new HashSet(); + public readonly List Add = new List(); + public readonly List Remove = new List(); + } +} diff --git a/osu.ElasticIndexer/IndexQueueProcessor.cs b/osu.ElasticIndexer/IndexQueueProcessor.cs index 4484a465..bd1201c0 100644 --- a/osu.ElasticIndexer/IndexQueueProcessor.cs +++ b/osu.ElasticIndexer/IndexQueueProcessor.cs @@ -18,10 +18,6 @@ public class IndexQueueProcessor : QueueProcessor private readonly Client client; private readonly string index; - private readonly HashSet lookupIds = new HashSet(); - private readonly List scoresAdd = new List(); - private readonly List scoresRemove = new List(); - // QueueProcessor doesn't expose cancellation without overriding Run, // so we're making use of a supplied callback to stop processing. private readonly Action stop; @@ -42,6 +38,8 @@ internal IndexQueueProcessor(string index, Client client, Action stopCallback) protected override void ProcessResults(IEnumerable items) { + var buffer = new IndexQueueItems(); + // Figure out what to do with the queue item. foreach (var item in items) { @@ -53,16 +51,16 @@ protected override void ProcessResults(IEnumerable items) if (action == "delete") { - scoresRemove.Add(id.ToString()); + buffer.Remove.Add(id.ToString()); } else if (action == "index") { - lookupIds.Add(id); + buffer.LookupIds.Add(id); } } else if (item.Score != null) { - addToBuffer(item.Score); + addToBuffer(item.Score, buffer); } else { @@ -71,31 +69,28 @@ protected override void ProcessResults(IEnumerable items) } // Handle any scores that need a lookup. - performLookup(); + performLookup(buffer); - if (scoresAdd.Any() || scoresRemove.Any()) + if (buffer.Add.Any() || buffer.Remove.Any()) { var bulkDescriptor = new BulkDescriptor() .Index(index) - .IndexMany(scoresAdd) + .IndexMany(buffer.Add) // type is needed for string ids https://github.com/elastic/elasticsearch-net/issues/3500 - .DeleteMany(scoresRemove); + .DeleteMany(buffer.Remove); var response = client.ElasticClient.Bulk(bulkDescriptor); handleResponse(response, items); } - - scoresAdd.Clear(); - scoresRemove.Clear(); } - private void addToBuffer(SoloScore score) + private void addToBuffer(SoloScore score, IndexQueueItems buffer) { if (score.ShouldIndex) - scoresAdd.Add(score); + buffer.Add.Add(score); else - scoresRemove.Add(score.id.ToString()); + buffer.Remove.Add(score.id.ToString()); } private BulkResponse dispatch(BulkDescriptor bulkDescriptor) @@ -163,22 +158,22 @@ private void handleResponse(BulkResponse response, IEnumerable items) // TODO: per-item errors? } - private void performLookup() + private void performLookup(IndexQueueItems buffer) { - if (!lookupIds.Any()) return; + if (!buffer.LookupIds.Any()) return; - var scores = ElasticModel.Find(lookupIds); + var scores = ElasticModel.Find(buffer.LookupIds); foreach (var score in scores) { - addToBuffer(score); - lookupIds.Remove(score.id); + addToBuffer(score, buffer); + buffer.LookupIds.Remove(score.id); } // Remaining scores do not exist and should be deleted. - scoresRemove.AddRange(lookupIds.Select(id => id.ToString())); + buffer.Remove.AddRange(buffer.LookupIds.Select(id => id.ToString())); - lookupIds.Clear(); + buffer.LookupIds.Clear(); } private void waitUntilActive() From 4b995dcd84e0e1fb4aee5bd000c1fb49b3c19d56 Mon Sep 17 00:00:00 2001 From: bakaneko Date: Wed, 20 Jul 2022 13:02:31 +0900 Subject: [PATCH 20/31] use existing queue name --- osu.ElasticIndexer/Commands/PushFileCommand.cs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/osu.ElasticIndexer/Commands/PushFileCommand.cs b/osu.ElasticIndexer/Commands/PushFileCommand.cs index 44a8d395..5763f215 100644 --- a/osu.ElasticIndexer/Commands/PushFileCommand.cs +++ b/osu.ElasticIndexer/Commands/PushFileCommand.cs @@ -23,10 +23,7 @@ public int OnExecute(CancellationToken cancellationToken) var value = File.ReadAllText(Filename); var redis = new Redis(); - var database = redis.Connection.GetDatabase(); - var queueName = $"osu-queue:score-index-{AppSettings.Schema}"; - - database.ListLeftPush(queueName, value); + redis.Connection.GetDatabase().ListLeftPush(Processor.QueueName, value); return 0; } From e0e829c22c9b74bca62750fcf757e494342d46ff Mon Sep 17 00:00:00 2001 From: bakaneko Date: Wed, 20 Jul 2022 16:06:55 +0900 Subject: [PATCH 21/31] remove direct deletion support --- README.md | 6 ------ osu.ElasticIndexer/Commands/PushFileCommand.cs | 2 +- osu.ElasticIndexer/Commands/ScoresCommand.cs | 2 +- osu.ElasticIndexer/IndexQueueProcessor.cs | 6 +----- 4 files changed, 3 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index 9c03fe99..8517c6da 100644 --- a/README.md +++ b/README.md @@ -132,12 +132,6 @@ will queue the score with `${id}` for indexing; the score will be added or delet See [Queuing items for processing from another client](#queuing-items-for-processing-from-another-client) -## Queuing a specific score for deletion - - schema=${schema} dotnet run scores delete ${id} - -will queue the score with `${id}` for deletion, regardless of whether the score should be indexed or not. - ## Adding existing database records to the queue schema=1 dotnet run all diff --git a/osu.ElasticIndexer/Commands/PushFileCommand.cs b/osu.ElasticIndexer/Commands/PushFileCommand.cs index 5763f215..a4e6211e 100644 --- a/osu.ElasticIndexer/Commands/PushFileCommand.cs +++ b/osu.ElasticIndexer/Commands/PushFileCommand.cs @@ -21,8 +21,8 @@ public int OnExecute(CancellationToken cancellationToken) throw new MissingSchemaException(); var value = File.ReadAllText(Filename); - var redis = new Redis(); + redis.Connection.GetDatabase().ListLeftPush(Processor.QueueName, value); return 0; diff --git a/osu.ElasticIndexer/Commands/ScoresCommand.cs b/osu.ElasticIndexer/Commands/ScoresCommand.cs index 44ed3fe2..4b26bab3 100644 --- a/osu.ElasticIndexer/Commands/ScoresCommand.cs +++ b/osu.ElasticIndexer/Commands/ScoresCommand.cs @@ -13,7 +13,7 @@ public class ScoresCommand : ProcessorCommandBase { [Argument(0)] [Required] - [AllowedValues("delete", "index", IgnoreCase = true)] + [AllowedValues("index", IgnoreCase = true)] public string Action { get; set; } = string.Empty; [Argument(1)] diff --git a/osu.ElasticIndexer/IndexQueueProcessor.cs b/osu.ElasticIndexer/IndexQueueProcessor.cs index bd1201c0..7fae110c 100644 --- a/osu.ElasticIndexer/IndexQueueProcessor.cs +++ b/osu.ElasticIndexer/IndexQueueProcessor.cs @@ -49,11 +49,7 @@ protected override void ProcessResults(IEnumerable items) { var id = (long)item.ScoreId; // doesn't figure out id isn't nullable here... - if (action == "delete") - { - buffer.Remove.Add(id.ToString()); - } - else if (action == "index") + if (action == "index") { buffer.LookupIds.Add(id); } From 25a5fc6692d82cb4486cf2b5f7d72b6c80de3f42 Mon Sep 17 00:00:00 2001 From: bakaneko Date: Wed, 20 Jul 2022 17:00:11 +0900 Subject: [PATCH 22/31] remove action --- README.md | 13 ++++--------- .../Commands/{ScoresCommand.cs => IndexCommand.cs} | 11 +++-------- osu.ElasticIndexer/IndexQueueProcessor.cs | 12 +++--------- osu.ElasticIndexer/Program.cs | 2 +- osu.ElasticIndexer/ScoreItem.cs | 7 +------ 5 files changed, 12 insertions(+), 33 deletions(-) rename osu.ElasticIndexer/Commands/{ScoresCommand.cs => IndexCommand.cs} (69%) diff --git a/README.md b/README.md index 8517c6da..4df4f375 100644 --- a/README.md +++ b/README.md @@ -126,7 +126,7 @@ It should be noted that these items will not exist or match the ones in the data ## Queuing a specific score for indexing - schema=${schema} dotnet run scores index ${id} + schema=${schema} dotnet run index ${id} will queue the score with `${id}` for indexing; the score will be added or deleted as necessary, according to the value of `SoloScore.ShouldIndex`. @@ -164,22 +164,17 @@ Push items into the Redis queue "`osu-queue:score-index-${schema}`" e.g. ```csharp -ListLeftPush("osu-queue:score-index-1", "{ \"Action\": \"index\",\"ScoreId\": 1 }"); +ListLeftPush("osu-queue:score-index-1", "{ \"ScoreId\": 1 }"); ``` or from redis-cli: ``` -LPUSH "osu-queue:score-index-1" "{\"Action\":\"index\",\"ScoreId\":1}" +LPUSH "osu-queue:score-index-1" "{\"ScoreId\":1}" ``` ### Indexing a score by `id` ```json -{ "Action": "index", "ScoreId": 1 } -``` - -### Deleting a score by `id` -```json -{ "Action": "delete", "ScoreId": 1 } +{ "ScoreId": 1 } ``` ### Queuing a whole score diff --git a/osu.ElasticIndexer/Commands/ScoresCommand.cs b/osu.ElasticIndexer/Commands/IndexCommand.cs similarity index 69% rename from osu.ElasticIndexer/Commands/ScoresCommand.cs rename to osu.ElasticIndexer/Commands/IndexCommand.cs index 4b26bab3..c5e1b0c6 100644 --- a/osu.ElasticIndexer/Commands/ScoresCommand.cs +++ b/osu.ElasticIndexer/Commands/IndexCommand.cs @@ -8,14 +8,9 @@ namespace osu.ElasticIndexer.Commands { - [Command("scores", Description = "Queue a score for indexing or deletion by id.")] - public class ScoresCommand : ProcessorCommandBase + [Command("index", Description = "Queue a score for indexing by id.")] + public class IndexCommand : ProcessorCommandBase { - [Argument(0)] - [Required] - [AllowedValues("index", IgnoreCase = true)] - public string Action { get; set; } = string.Empty; - [Argument(1)] [Required] public string ScoreId { get; set; } = string.Empty; @@ -26,7 +21,7 @@ public int OnExecute(CancellationToken cancellationToken) throw new MissingSchemaException(); var id = long.Parse(ScoreId); - var scoreItem = new ScoreItem { Action = Action, ScoreId = id }; + var scoreItem = new ScoreItem { ScoreId = id }; Processor.PushToQueue(scoreItem); Console.WriteLine(ConsoleColor.Cyan, $"Queued to {Processor.QueueName}: {scoreItem}"); diff --git a/osu.ElasticIndexer/IndexQueueProcessor.cs b/osu.ElasticIndexer/IndexQueueProcessor.cs index 7fae110c..566d6c4f 100644 --- a/osu.ElasticIndexer/IndexQueueProcessor.cs +++ b/osu.ElasticIndexer/IndexQueueProcessor.cs @@ -43,16 +43,10 @@ protected override void ProcessResults(IEnumerable items) // Figure out what to do with the queue item. foreach (var item in items) { - var action = item.ParsedAction; - - if (item.ScoreId != null && action != null) + if (item.ScoreId != null) { - var id = (long)item.ScoreId; // doesn't figure out id isn't nullable here... - - if (action == "index") - { - buffer.LookupIds.Add(id); - } + // doesn't figure out id isn't nullable here... + buffer.LookupIds.Add((long)item.ScoreId); } else if (item.Score != null) { diff --git a/osu.ElasticIndexer/Program.cs b/osu.ElasticIndexer/Program.cs index c314c01e..48a7b325 100644 --- a/osu.ElasticIndexer/Program.cs +++ b/osu.ElasticIndexer/Program.cs @@ -12,11 +12,11 @@ namespace osu.ElasticIndexer [Subcommand(typeof(CloseIndexCommand))] [Subcommand(typeof(DeleteIndexCommand))] [Subcommand(typeof(ListIndicesCommand))] + [Subcommand(typeof(IndexCommand))] [Subcommand(typeof(OpenIndexCommand))] [Subcommand(typeof(PumpAllScoresCommand))] [Subcommand(typeof(PumpFakeScoresCommand))] [Subcommand(typeof(PushFileCommand))] - [Subcommand(typeof(ScoresCommand))] [Subcommand(typeof(SchemaVersionCommand))] [Subcommand(typeof(UpdateAliasCommand))] [Subcommand(typeof(WatchQueueCommand))] diff --git a/osu.ElasticIndexer/ScoreItem.cs b/osu.ElasticIndexer/ScoreItem.cs index a8aa846e..2076413c 100644 --- a/osu.ElasticIndexer/ScoreItem.cs +++ b/osu.ElasticIndexer/ScoreItem.cs @@ -8,13 +8,8 @@ namespace osu.ElasticIndexer { public class ScoreItem : QueueItem { - // TODO :figure out what's the deal with constructor not working? - public string? Action { get; set; } public long? ScoreId { get; set; } - [IgnoreDataMember] - public string? ParsedAction => Action?.ToLowerInvariant(); - public SoloScore? Score { get; set; } public ScoreItem() @@ -26,6 +21,6 @@ public ScoreItem(SoloScore score) Score = score; } - public override string ToString() => Score != null ? $"ScoreItem Score: {Score.id}" : $"ScoreItem ScoreId: {ScoreId} {Action} "; + public override string ToString() => Score != null ? $"ScoreItem Score: {Score.id}" : $"ScoreItem ScoreId: {ScoreId}"; } } From 3e601b72f0ae912f2d7e33add70975a7028b7c58 Mon Sep 17 00:00:00 2001 From: bakaneko Date: Wed, 20 Jul 2022 17:02:16 +0900 Subject: [PATCH 23/31] more accurate condition of indexing in readme --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 4df4f375..7e5d57df 100644 --- a/README.md +++ b/README.md @@ -16,8 +16,8 @@ A string value is used to indicate the current schema version to be used. ## Adding items to be indexed -Scores with `preserve`=`true` will be added to the index, -scores with `preserve`=`false` will be removed from the index. +Scores with `preserve`=`true` belonging to a user with `user_warnings`=`0` will be added to the index, +scores where any of the previous conditions are false will be removed from the index. Push items to `osu-queue:score-index-${schema}` From 34af8ac0a3882e638d1aa76fcab61ddd905b3186 Mon Sep 17 00:00:00 2001 From: Dean Herbert Date: Wed, 20 Jul 2022 18:33:07 +0900 Subject: [PATCH 24/31] Simplify `ScoreItem` construction --- .../Commands/PumpAllScoresCommand.cs | 2 +- .../Commands/PumpFakeScoresCommand.cs | 2 +- osu.ElasticIndexer/ScoreItem.cs | 16 ++++------------ 3 files changed, 6 insertions(+), 14 deletions(-) diff --git a/osu.ElasticIndexer/Commands/PumpAllScoresCommand.cs b/osu.ElasticIndexer/Commands/PumpAllScoresCommand.cs index 5dc013ab..52dda46d 100644 --- a/osu.ElasticIndexer/Commands/PumpAllScoresCommand.cs +++ b/osu.ElasticIndexer/Commands/PumpAllScoresCommand.cs @@ -53,7 +53,7 @@ public int OnExecute(CancellationToken cancellationToken) if (Verbose) Console.WriteLine($"Pushing {score}"); - Processor.PushToQueue(new ScoreItem(score)); + Processor.PushToQueue(new ScoreItem { Score = score }); } if (!Verbose) diff --git a/osu.ElasticIndexer/Commands/PumpFakeScoresCommand.cs b/osu.ElasticIndexer/Commands/PumpFakeScoresCommand.cs index 15a4643d..82198581 100644 --- a/osu.ElasticIndexer/Commands/PumpFakeScoresCommand.cs +++ b/osu.ElasticIndexer/Commands/PumpFakeScoresCommand.cs @@ -54,7 +54,7 @@ public int OnExecute(CancellationToken cancellationToken) preserve = true }; - Processor.PushToQueue(new ScoreItem(score)); + Processor.PushToQueue(new ScoreItem { Score = score }); if (counter % 1000 == 0) Console.WriteLine($"pushed to {Processor.QueueName}, current id: {counter}"); diff --git a/osu.ElasticIndexer/ScoreItem.cs b/osu.ElasticIndexer/ScoreItem.cs index 2076413c..2c9e9f84 100644 --- a/osu.ElasticIndexer/ScoreItem.cs +++ b/osu.ElasticIndexer/ScoreItem.cs @@ -1,25 +1,17 @@ // Copyright (c) ppy Pty Ltd . Licensed under the MIT Licence. // See the LICENCE file in the repository root for full licence text. -using System.Runtime.Serialization; using osu.Server.QueueProcessor; namespace osu.ElasticIndexer { public class ScoreItem : QueueItem { - public long? ScoreId { get; set; } + public long? ScoreId { get; init; } - public SoloScore? Score { get; set; } - - public ScoreItem() - { - } - - public ScoreItem(SoloScore score) - { - Score = score; - } + // ScoreId is always preferred if present (this property is ignored). + // Note that this is generally not used anymore. Consider removing in the future unless a use case comes up? + public SoloScore? Score { get; init; } public override string ToString() => Score != null ? $"ScoreItem Score: {Score.id}" : $"ScoreItem ScoreId: {ScoreId}"; } From 13ccc1dfcd209a5061557c30cd6e4e0c0e359b8c Mon Sep 17 00:00:00 2001 From: Dean Herbert Date: Wed, 20 Jul 2022 18:37:04 +0900 Subject: [PATCH 25/31] Rename `IndexQueueItems` to `ProcessableItemsBuffer` --- osu.ElasticIndexer/IndexQueueProcessor.cs | 6 +++--- .../{IndexQueueItems.cs => ProcessableItemsBuffer.cs} | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) rename osu.ElasticIndexer/{IndexQueueItems.cs => ProcessableItemsBuffer.cs} (91%) diff --git a/osu.ElasticIndexer/IndexQueueProcessor.cs b/osu.ElasticIndexer/IndexQueueProcessor.cs index 566d6c4f..45ba035f 100644 --- a/osu.ElasticIndexer/IndexQueueProcessor.cs +++ b/osu.ElasticIndexer/IndexQueueProcessor.cs @@ -38,7 +38,7 @@ internal IndexQueueProcessor(string index, Client client, Action stopCallback) protected override void ProcessResults(IEnumerable items) { - var buffer = new IndexQueueItems(); + var buffer = new ProcessableItemsBuffer(); // Figure out what to do with the queue item. foreach (var item in items) @@ -75,7 +75,7 @@ protected override void ProcessResults(IEnumerable items) } } - private void addToBuffer(SoloScore score, IndexQueueItems buffer) + private void addToBuffer(SoloScore score, ProcessableItemsBuffer buffer) { if (score.ShouldIndex) buffer.Add.Add(score); @@ -148,7 +148,7 @@ private void handleResponse(BulkResponse response, IEnumerable items) // TODO: per-item errors? } - private void performLookup(IndexQueueItems buffer) + private void performLookup(ProcessableItemsBuffer buffer) { if (!buffer.LookupIds.Any()) return; diff --git a/osu.ElasticIndexer/IndexQueueItems.cs b/osu.ElasticIndexer/ProcessableItemsBuffer.cs similarity index 91% rename from osu.ElasticIndexer/IndexQueueItems.cs rename to osu.ElasticIndexer/ProcessableItemsBuffer.cs index 282318f5..661ab1ed 100644 --- a/osu.ElasticIndexer/IndexQueueItems.cs +++ b/osu.ElasticIndexer/ProcessableItemsBuffer.cs @@ -5,7 +5,7 @@ namespace osu.ElasticIndexer { - public class IndexQueueItems + public class ProcessableItemsBuffer { public readonly HashSet LookupIds = new HashSet(); public readonly List Add = new List(); From 1e8f036470b80a2fa6df28989a56cacb2bc8336d Mon Sep 17 00:00:00 2001 From: Dean Herbert Date: Wed, 20 Jul 2022 18:48:13 +0900 Subject: [PATCH 26/31] Add xmldoc for `ProcessableItemsBuffer` and change deletions to use `long` --- osu.ElasticIndexer/IndexQueueProcessor.cs | 27 +++++++++++--------- osu.ElasticIndexer/ProcessableItemsBuffer.cs | 18 ++++++++++--- 2 files changed, 30 insertions(+), 15 deletions(-) diff --git a/osu.ElasticIndexer/IndexQueueProcessor.cs b/osu.ElasticIndexer/IndexQueueProcessor.cs index 45ba035f..2eeb73ec 100644 --- a/osu.ElasticIndexer/IndexQueueProcessor.cs +++ b/osu.ElasticIndexer/IndexQueueProcessor.cs @@ -3,6 +3,7 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Threading.Tasks; using Elasticsearch.Net; @@ -46,7 +47,7 @@ protected override void ProcessResults(IEnumerable items) if (item.ScoreId != null) { // doesn't figure out id isn't nullable here... - buffer.LookupIds.Add((long)item.ScoreId); + buffer.ScoreIdsForLookup.Add((long)item.ScoreId); } else if (item.Score != null) { @@ -61,13 +62,15 @@ protected override void ProcessResults(IEnumerable items) // Handle any scores that need a lookup. performLookup(buffer); - if (buffer.Add.Any() || buffer.Remove.Any()) + Debug.Assert(buffer.ScoreIdsForLookup.Count == 0); + + if (buffer.Additions.Any() || buffer.Deletions.Any()) { var bulkDescriptor = new BulkDescriptor() .Index(index) - .IndexMany(buffer.Add) - // type is needed for string ids https://github.com/elastic/elasticsearch-net/issues/3500 - .DeleteMany(buffer.Remove); + .IndexMany(buffer.Additions) + // type is needed for ids https://github.com/elastic/elasticsearch-net/issues/3500 + .DeleteMany(buffer.Deletions); var response = client.ElasticClient.Bulk(bulkDescriptor); @@ -78,9 +81,9 @@ protected override void ProcessResults(IEnumerable items) private void addToBuffer(SoloScore score, ProcessableItemsBuffer buffer) { if (score.ShouldIndex) - buffer.Add.Add(score); + buffer.Additions.Add(score); else - buffer.Remove.Add(score.id.ToString()); + buffer.Deletions.Add(score.id); } private BulkResponse dispatch(BulkDescriptor bulkDescriptor) @@ -150,20 +153,20 @@ private void handleResponse(BulkResponse response, IEnumerable items) private void performLookup(ProcessableItemsBuffer buffer) { - if (!buffer.LookupIds.Any()) return; + if (!buffer.ScoreIdsForLookup.Any()) return; - var scores = ElasticModel.Find(buffer.LookupIds); + var scores = ElasticModel.Find(buffer.ScoreIdsForLookup); foreach (var score in scores) { addToBuffer(score, buffer); - buffer.LookupIds.Remove(score.id); + buffer.ScoreIdsForLookup.Remove(score.id); } // Remaining scores do not exist and should be deleted. - buffer.Remove.AddRange(buffer.LookupIds.Select(id => id.ToString())); + buffer.Deletions.AddRange(buffer.ScoreIdsForLookup); - buffer.LookupIds.Clear(); + buffer.ScoreIdsForLookup.Clear(); } private void waitUntilActive() diff --git a/osu.ElasticIndexer/ProcessableItemsBuffer.cs b/osu.ElasticIndexer/ProcessableItemsBuffer.cs index 661ab1ed..c0c1cf44 100644 --- a/osu.ElasticIndexer/ProcessableItemsBuffer.cs +++ b/osu.ElasticIndexer/ProcessableItemsBuffer.cs @@ -7,8 +7,20 @@ namespace osu.ElasticIndexer { public class ProcessableItemsBuffer { - public readonly HashSet LookupIds = new HashSet(); - public readonly List Add = new List(); - public readonly List Remove = new List(); + /// + /// A set of all score IDs which have arrived but are not yet determined to be an addition or deletion. + /// These should be processed into either or . + /// + public readonly HashSet ScoreIdsForLookup = new HashSet(); + + /// + /// New scores which should be indexed. + /// + public readonly List Additions = new List(); + + /// + /// Score IDs which should be purged from the index is they are present. + /// + public readonly List Deletions = new List(); } } From 5745b0049f645321ebc55ff197e790b885a97122 Mon Sep 17 00:00:00 2001 From: Dean Herbert Date: Wed, 20 Jul 2022 19:00:07 +0900 Subject: [PATCH 27/31] Show deleted document count in list command --- osu.ElasticIndexer/Commands/ListIndicesCommand.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/osu.ElasticIndexer/Commands/ListIndicesCommand.cs b/osu.ElasticIndexer/Commands/ListIndicesCommand.cs index 47f5261d..b02c519a 100644 --- a/osu.ElasticIndexer/Commands/ListIndicesCommand.cs +++ b/osu.ElasticIndexer/Commands/ListIndicesCommand.cs @@ -26,7 +26,7 @@ public int OnExecute() var schema = indexState.Mappings.Meta?["schema"]; var aliased = indexState.Aliases.ContainsKey(client.AliasName); - Console.WriteLine($"{record.Index} schema:{schema} aliased:{aliased} {record.Status} {record.DocsCount} {record.PrimaryStoreSize}"); + Console.WriteLine($"{record.Index} schema:{schema} aliased:{aliased} {record.Status} docs {record.DocsCount} deleted {record.DocsDeleted} {record.PrimaryStoreSize}"); } } From 47562059dc18be27080c44710da3c085395ec8db Mon Sep 17 00:00:00 2001 From: Dean Herbert Date: Wed, 20 Jul 2022 19:01:13 +0900 Subject: [PATCH 28/31] Remove unused method --- osu.ElasticIndexer/IndexQueueProcessor.cs | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/osu.ElasticIndexer/IndexQueueProcessor.cs b/osu.ElasticIndexer/IndexQueueProcessor.cs index 2eeb73ec..36204a3e 100644 --- a/osu.ElasticIndexer/IndexQueueProcessor.cs +++ b/osu.ElasticIndexer/IndexQueueProcessor.cs @@ -6,7 +6,6 @@ using System.Diagnostics; using System.Linq; using System.Threading.Tasks; -using Elasticsearch.Net; using Nest; using osu.Server.QueueProcessor; @@ -86,23 +85,6 @@ private void addToBuffer(SoloScore score, ProcessableItemsBuffer buffer) buffer.Deletions.Add(score.id); } - private BulkResponse dispatch(BulkDescriptor bulkDescriptor) - { - try - { - return client.ElasticClient.Bulk(bulkDescriptor); - } - catch (ElasticsearchClientException ex) - { - // Server disappeared, maybe network failure or it's restarting; spin until it's available again. - Console.WriteLine(ConsoleColor.Red, ex.Message); - Console.WriteLine(ConsoleColor.Yellow, ex.InnerException?.Message); - waitUntilActive(); - - return dispatch(bulkDescriptor); - } - } - private void handleResponse(BulkResponse response, IEnumerable items) { if (!response.IsValid) From cd149e037901520de01ecb480861840ca22c7a5d Mon Sep 17 00:00:00 2001 From: Dean Herbert Date: Wed, 20 Jul 2022 19:04:19 +0900 Subject: [PATCH 29/31] Tidy up remaining flow a touch --- osu.ElasticIndexer/IndexQueueProcessor.cs | 29 ++++++++++------------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/osu.ElasticIndexer/IndexQueueProcessor.cs b/osu.ElasticIndexer/IndexQueueProcessor.cs index 36204a3e..d72d6191 100644 --- a/osu.ElasticIndexer/IndexQueueProcessor.cs +++ b/osu.ElasticIndexer/IndexQueueProcessor.cs @@ -45,12 +45,14 @@ protected override void ProcessResults(IEnumerable items) { if (item.ScoreId != null) { - // doesn't figure out id isn't nullable here... - buffer.ScoreIdsForLookup.Add((long)item.ScoreId); + buffer.ScoreIdsForLookup.Add(item.ScoreId.Value); } else if (item.Score != null) { - addToBuffer(item.Score, buffer); + if (item.Score.ShouldIndex) + buffer.Additions.Add(item.Score); + else + buffer.Deletions.Add(item.Score.id); } else { @@ -58,8 +60,8 @@ protected override void ProcessResults(IEnumerable items) } } - // Handle any scores that need a lookup. - performLookup(buffer); + // Handle any scores that need a lookup from the database. + performDatabaseLookup(buffer); Debug.Assert(buffer.ScoreIdsForLookup.Count == 0); @@ -77,14 +79,6 @@ protected override void ProcessResults(IEnumerable items) } } - private void addToBuffer(SoloScore score, ProcessableItemsBuffer buffer) - { - if (score.ShouldIndex) - buffer.Additions.Add(score); - else - buffer.Deletions.Add(score.id); - } - private void handleResponse(BulkResponse response, IEnumerable items) { if (!response.IsValid) @@ -133,7 +127,7 @@ private void handleResponse(BulkResponse response, IEnumerable items) // TODO: per-item errors? } - private void performLookup(ProcessableItemsBuffer buffer) + private void performDatabaseLookup(ProcessableItemsBuffer buffer) { if (!buffer.ScoreIdsForLookup.Any()) return; @@ -141,13 +135,16 @@ private void performLookup(ProcessableItemsBuffer buffer) foreach (var score in scores) { - addToBuffer(score, buffer); + if (score.ShouldIndex) + buffer.Additions.Add(score); + else + buffer.Deletions.Add(score.id); + buffer.ScoreIdsForLookup.Remove(score.id); } // Remaining scores do not exist and should be deleted. buffer.Deletions.AddRange(buffer.ScoreIdsForLookup); - buffer.ScoreIdsForLookup.Clear(); } From 010542049e299de5072510d41a0e9f597894a4f7 Mon Sep 17 00:00:00 2001 From: bakaneko Date: Thu, 21 Jul 2022 20:24:59 +0900 Subject: [PATCH 30/31] handle network errors in dispatch instead of simply failing --- osu.ElasticIndexer/IndexQueueProcessor.cs | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/osu.ElasticIndexer/IndexQueueProcessor.cs b/osu.ElasticIndexer/IndexQueueProcessor.cs index d72d6191..68d55c25 100644 --- a/osu.ElasticIndexer/IndexQueueProcessor.cs +++ b/osu.ElasticIndexer/IndexQueueProcessor.cs @@ -6,6 +6,7 @@ using System.Diagnostics; using System.Linq; using System.Threading.Tasks; +using Elasticsearch.Net; using Nest; using osu.Server.QueueProcessor; @@ -73,12 +74,29 @@ protected override void ProcessResults(IEnumerable items) // type is needed for ids https://github.com/elastic/elasticsearch-net/issues/3500 .DeleteMany(buffer.Deletions); - var response = client.ElasticClient.Bulk(bulkDescriptor); + var response = dispatch(bulkDescriptor); handleResponse(response, items); } } + private BulkResponse dispatch(BulkDescriptor bulkDescriptor) + { + try + { + return client.ElasticClient.Bulk(bulkDescriptor); + } + catch (ElasticsearchClientException ex) + { + // Server disappeared, maybe network failure or it's restarting; spin until it's available again. + Console.WriteLine(ConsoleColor.Red, ex.Message); + Console.WriteLine(ConsoleColor.Yellow, ex.InnerException?.Message); + waitUntilActive(); + + return dispatch(bulkDescriptor); + } + } + private void handleResponse(BulkResponse response, IEnumerable items) { if (!response.IsValid) From 8af1705eec7d17bf8886ab4a8b0832e259437512 Mon Sep 17 00:00:00 2001 From: bakaneko Date: Thu, 21 Jul 2022 21:29:48 +0900 Subject: [PATCH 31/31] add convert lookup --- osu.ElasticIndexer/SoloScore.cs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/osu.ElasticIndexer/SoloScore.cs b/osu.ElasticIndexer/SoloScore.cs index 12662515..2b8aa214 100644 --- a/osu.ElasticIndexer/SoloScore.cs +++ b/osu.ElasticIndexer/SoloScore.cs @@ -15,9 +15,10 @@ namespace osu.ElasticIndexer [SuppressMessage("Style", "IDE1006")] [ElasticsearchType(IdProperty = nameof(id))] [ChunkOn( - Query = @"s.*, pp, country_acronym AS country_code, user_warnings FROM solo_scores s + Query = @"s.*, pp, country_acronym AS country_code, playmode, user_warnings FROM solo_scores s LEFT JOIN solo_scores_performance ON score_id = s.id - JOIN phpbb_users USING (user_id)", + JOIN phpbb_users USING (user_id) + JOIN osu_beatmaps USING (beatmap_id)", CursorColumn = "s.id", Max = "MAX(id) FROM solo_scores" )] @@ -71,12 +72,16 @@ public string data [Computed] [Boolean] - public bool convert { get; set; } + [JsonIgnore] + public bool convert => ruleset_id != playmode; [Computed] [Boolean] public bool passed => scoreData.passed; + [Ignore] + public int playmode { get; set; } + [Number(NumberType.Float)] public double? pp { get; set; }