Skip to content

Commit

Permalink
Misc fixes to FDM writer (#624)
Browse files Browse the repository at this point in the history
Co-authored-by: cognite-bulldozer[bot] <51074376+cognite-bulldozer[bot]@users.noreply.github.com>
  • Loading branch information
einarmo and cognite-bulldozer[bot] authored Apr 18, 2024
1 parent 4041216 commit 7d90aaf
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 13 deletions.
15 changes: 15 additions & 0 deletions Extractor/Config/CogniteConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -298,5 +298,20 @@ public ContainerIdentifier ContainerIdentifier(string externalId)
/// Types, and type-related nodes will not be deleted.
/// </summary>
public bool EnableDeletes { get; set; }

/// <summary>
/// Maximum number of parallel instance insertion requests.
/// </summary>
public int InstanceParallelism { get; set; } = 4;

/// <summary>
/// Number of instances per instance create request.
/// </summary>
public int InstanceChunk { get; set; } = 1000;

/// <summary>
/// Number of views and containers per create request.
/// </summary>
public int ModelChunk { get; set; } = 100;
}
}
37 changes: 25 additions & 12 deletions Extractor/Pushers/FDM/FDMWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,14 @@ public class FDMWriter
private ILogger<FDMWriter> log;
private NodeIdContext? context;
private FdmDestinationConfig.ModelInfo modelInfo;
private FdmDestinationConfig fdmConfig;
public FDMWriter(FullConfig config, CogniteDestination destination, ILogger<FDMWriter> log)
{
this.config = config;
this.destination = destination;
this.log = log;
modelInfo = new FdmDestinationConfig.ModelInfo(config.Cognite!.MetadataTargets!.DataModels!);
fdmConfig = config.Cognite!.MetadataTargets!.DataModels!;
}

private async Task IngestInstances(IEnumerable<BaseInstanceWrite> instances, int chunkSize, CancellationToken token)
Expand Down Expand Up @@ -82,7 +84,7 @@ private async Task IngestInstances(IEnumerable<BaseInstanceWrite> instances, int

int taskNum = 0;
await generators.RunThrottled(
4,
fdmConfig.InstanceParallelism,
(_) =>
{
if (chunks.Count > 1)
Expand Down Expand Up @@ -192,7 +194,7 @@ private async Task Initialize(FDMTypeBatch types, CancellationToken token)
var options = new JsonSerializerOptions(Oryx.Cognite.Common.jsonOptions) { WriteIndented = true };

var viewsToInsert = types.Views.Values.ToList();
if (config.Cognite!.MetadataTargets!.DataModels!.SkipSimpleTypes)
if (fdmConfig.SkipSimpleTypes)
{
viewsToInsert = viewsToInsert.Where(v => v.Properties.Count != 0 || types.ViewIsReferenced.GetValueOrDefault(v.ExternalId)).ToList();
}
Expand Down Expand Up @@ -241,15 +243,15 @@ private async Task Initialize(FDMTypeBatch types, CancellationToken token)

log.LogDebug("Creating {Count} containers and {Count2} views", containersToInsert.Count, viewsToInsert.Count);

foreach (var chunk in containersToInsert.ChunkBy(100))
foreach (var chunk in containersToInsert.ChunkBy(fdmConfig.ModelChunk))
{
log.LogDebug("Creating {Count} containers", chunk.Count());
await destination.CogniteClient.Beta.DataModels.UpsertContainers(chunk, token);
}

foreach (var level in viewsToInsert.ChunkByHierarchy(100, v => v.ExternalId, v => v.Implements?.FirstOrDefault()?.ExternalId!))
foreach (var level in viewsToInsert.ChunkByHierarchy(fdmConfig.ModelChunk, v => v.ExternalId, v => v.Implements?.FirstOrDefault()?.ExternalId!))
{
foreach (var chunk in level.ChunkBy(100))
foreach (var chunk in level.ChunkBy(fdmConfig.ModelChunk))
{
log.LogDebug("Creating {Count} views", chunk.Count());
await destination.CogniteClient.Beta.DataModels.UpsertViews(chunk, token);
Expand Down Expand Up @@ -297,7 +299,7 @@ public async Task<bool> PushNodes(
var typeCollector = new NodeTypeCollector(log,
nodes.SelectNonNull(n => n.TypeDefinition).Where(id => !id.IsNullNodeId).ToHashSet(),
typeHierarchy,
config.Cognite!.MetadataTargets!.DataModels!.TypesToMap);
fdmConfig.TypesToMap);

var typeResult = typeCollector.MapTypes();

Expand All @@ -322,6 +324,7 @@ public async Task<bool> PushNodes(
var skipped = new HashSet<NodeId>();
int skippedCount = 0;
log.LogInformation("Filtering {Count} references, removing any non-referenced", references.Count());
var seen = new HashSet<(NodeId, NodeId, NodeId)>();
foreach (var refr in references)
{
if (!refr.IsForward)
Expand All @@ -332,6 +335,8 @@ public async Task<bool> PushNodes(
continue;
}



if (!nodeIds.Contains(refr.Source.Id))
{
log.LogTrace("Missing source node {Node} ({Target})", refr.Source.Id, refr.Target.Id);
Expand All @@ -346,14 +351,22 @@ public async Task<bool> PushNodes(
skippedCount++;
continue;
}
if (!nodeIds.Contains(refr.Type?.Id ?? NodeId.Null))
if (refr.Type == null || !nodeIds.Contains(refr.Type.Id))
{
log.LogTrace("Missing type {Node} ({Source}, {Target})", refr.Type?.Id ?? NodeId.Null, refr.Source.Id, refr.Target.Id);
skipped.Add(refr.Type?.Id ?? NodeId.Null);
skippedCount++;
continue;
}

if (!seen.Add((refr.Source.Id, refr.Type.Id, refr.Target.Id)))
{
log.LogTrace("Skipping reference {Source} {Type} {Target} since it has already been added",
refr.Source.Id, refr.Type.Id, refr.Target.Id);
skippedCount++;
continue;
}

finalReferences.Add(refr);
}

Expand Down Expand Up @@ -405,19 +418,19 @@ public async Task<bool> PushNodes(
await IngestInstances(instanceBuilder.ObjectTypes
.Concat(instanceBuilder.ReferenceTypes)
.Concat(instanceBuilder.DataTypes)
.Concat(typeMeta), 1000, token);
.Concat(typeMeta), fdmConfig.InstanceChunk, token);

// Then ingest variable types
log.LogInformation("Ingesting {Count} variable types", instanceBuilder.VariableTypes.Count);
await IngestInstances(instanceBuilder.VariableTypes, 1000, token);
await IngestInstances(instanceBuilder.VariableTypes, fdmConfig.InstanceChunk, token);

// Ingest instances
log.LogInformation("Ingesting {Count1} objects, {Count2} variables", instanceBuilder.Objects.Count, instanceBuilder.Variables.Count);
await IngestInstances(instanceBuilder.Objects.Concat(instanceBuilder.Variables), 1000, token);
await IngestInstances(instanceBuilder.Objects.Concat(instanceBuilder.Variables), fdmConfig.InstanceChunk, token);

// Finally, ingest edges
log.LogInformation("Ingesting {Count} references", instanceBuilder.References.Count);
await IngestInstances(instanceBuilder.References, 1000, token);
await IngestInstances(instanceBuilder.References, fdmConfig.InstanceChunk, token);

return true;
}
Expand Down Expand Up @@ -587,7 +600,7 @@ await generators.RunThrottled(

public async Task DeleteInFdm(DeletedNodes deletes, SessionContext sessionContext, CancellationToken token)
{
if (!(config.Cognite?.MetadataTargets?.DataModels?.EnableDeletes ?? false)) return;
if (!fdmConfig.EnableDeletes) return;

// First find all edges pointing to or from the nodes we are deleting.
// We pretty much need to do this, since we don't know if anyone has added edges to the nodes.
Expand Down
2 changes: 1 addition & 1 deletion Extractor/Pushers/Writers/RawWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ CancellationToken token
{
if (!keys.Any()) return;
var keySet = new HashSet<string>(keys);
var rows = await WriterUtils.GetRawRows(dbName, tableName, destination, keys, log, token);
var rows = await WriterUtils.GetRawRows(dbName, tableName, destination, null, log, token);
var trueElem = JsonDocument.Parse("true").RootElement;
var toMark = rows.Where(r => keySet.Contains(r.Key)).ToList();
foreach (var row in toMark)
Expand Down
8 changes: 8 additions & 0 deletions manifest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,14 @@ schema:
- "https://raw.githubusercontent.com/"

versions:
"2.27.4":
description: Minor fixes to the FDM target prototype.
changelog:
fixed:
- Fixed issue causing errors on duplicate references in the FDM writer.
- Fixed bug causing requests to fail when marking many raw rows as deleted.
added:
- Added configuration for controlling FDM chunk sizes.
"2.27.3":
description: Documentation updates.
"2.27.2":
Expand Down

0 comments on commit 7d90aaf

Please sign in to comment.