diff --git a/CentralHub.Api/Services/LocalizationService.cs b/CentralHub.Api/Services/LocalizationService.cs index 7b257db..acd3463 100644 --- a/CentralHub.Api/Services/LocalizationService.cs +++ b/CentralHub.Api/Services/LocalizationService.cs @@ -3,6 +3,7 @@ using CentralHub.Api.Dtos; using CentralHub.Api.Model; using CentralHub.Api.Model.Responses.Room; +using CentralHub.Api.Threading; namespace CentralHub.Api.Services; @@ -37,34 +38,39 @@ public async Task DoWorkAsync(CancellationToken stoppingToken) public async Task AggregateMeasurementsAsync(CancellationToken stoppingToken) { var roomMeasurementGroups = await _aggregatorRepository.GetRoomMeasurementGroupsAsync(stoppingToken); - var measuredRooms = new List(); + var allRooms = await _roomRepository.GetRoomsAsync(stoppingToken); + using var measuredRoomsMutex = new CancellableMutex>(new List()); - foreach (var (roomId, measurementGroups) in roomMeasurementGroups) + await Parallel.ForEachAsync( + roomMeasurementGroups, + stoppingToken, + async (roomMeasurementGroup, stoppingToken) => { - var room = await _roomRepository.GetRoomByIdAsync( - roomId, - stoppingToken); + var room = allRooms.SingleOrDefault(r => r.RoomDtoId == roomMeasurementGroup.Key); if (room == null) { - _logger.LogWarning("Room with id {RoomId} was not found.", roomId); - continue; + _logger.LogWarning("Room with id {RoomId} was not found.", roomMeasurementGroup.Key); + return; } - measuredRooms.Add(room); + + await measuredRoomsMutex.Lock(m => m.Add(room), stoppingToken); await _aggregatorRepository.AddAggregatedMeasurementAsync( - CreateAggregatedMeasurement(room, measurementGroups), + CreateAggregatedMeasurement(room, roomMeasurementGroup.Value), stoppingToken); - } - - var allRooms = await _roomRepository.GetRoomsAsync(stoppingToken); + }); - foreach (var room in allRooms.Where(r => !measuredRooms.Contains(r))) + await measuredRoomsMutex.Lock(async m => { - await _aggregatorRepository.AddAggregatedMeasurementAsync( - CreateAggregatedMeasurement(room, new List()), - stoppingToken); - } + foreach (var room in allRooms.Where(r => !m.Contains(r))) + { + await _aggregatorRepository.AddAggregatedMeasurementAsync( + CreateAggregatedMeasurement(room, new List()), + stoppingToken); + } + }, + stoppingToken); } private static AggregatedMeasurementDto CreateAggregatedMeasurement(RoomDto room, IReadOnlyCollection measurementGroups)