Skip to content

Commit

Permalink
Make event usage a distributed cron job
Browse files Browse the repository at this point in the history
  • Loading branch information
ejsmith committed Oct 23, 2024
1 parent 61abb71 commit e9512dd
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 101 deletions.
33 changes: 17 additions & 16 deletions src/Exceptionless.Core/Bootstrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
using Exceptionless.Serializer;
using FluentValidation;
using Foundatio.Caching;
using Foundatio.Extensions.Hosting.Cronos;
using Foundatio.Extensions.Hosting.Jobs;
using Foundatio.Extensions.Hosting.Startup;
using Foundatio.Jobs;
Expand Down Expand Up @@ -252,22 +253,22 @@ private static async Task CreateSampleDataAsync(IServiceProvider container)

public static void AddHostedJobs(IServiceCollection services, ILoggerFactory loggerFactory)
{
services.AddJob<CloseInactiveSessionsJob>(o => o.WaitForStartupActions(true));
services.AddJob<DailySummaryJob>(o => o.WaitForStartupActions(true));
services.AddJob<EventNotificationsJob>(o => o.WaitForStartupActions(true));
services.AddJob<EventPostsJob>(o => o.WaitForStartupActions(true));
services.AddJob<EventUserDescriptionsJob>(o => o.WaitForStartupActions(true));
services.AddJob<MailMessageJob>(o => o.WaitForStartupActions(true));
services.AddJob<StackStatusJob>(o => o.WaitForStartupActions(true));
services.AddJob<StackEventCountJob>(o => o.WaitForStartupActions(true));
services.AddJob<WebHooksJob>(o => o.WaitForStartupActions(true));
services.AddJob<WorkItemJob>(o => o.WaitForStartupActions(true));
services.AddJob<EventUsageJob>(o => o.WaitForStartupActions(true));

services.AddCronJob<CleanupDataJob>("30 */4 * * *");
services.AddCronJob<CleanupOrphanedDataJob>("45 */8 * * *");
services.AddCronJob<DownloadGeoIPDatabaseJob>("0 1 * * *");
services.AddCronJob<MaintainIndexesJob>("10 */2 * * *");
services.AddJob<CloseInactiveSessionsJob>(o => o.WaitForStartupActions());
services.AddJob<DailySummaryJob>(o => o.WaitForStartupActions());
services.AddJob<EventNotificationsJob>(o => o.WaitForStartupActions());
services.AddJob<EventPostsJob>(o => o.WaitForStartupActions());
services.AddJob<EventUserDescriptionsJob>(o => o.WaitForStartupActions());
services.AddJob<MailMessageJob>(o => o.WaitForStartupActions());
services.AddJob<StackStatusJob>(o => o.WaitForStartupActions());
services.AddJob<StackEventCountJob>(o => o.WaitForStartupActions());
services.AddJob<WebHooksJob>(o => o.WaitForStartupActions());
services.AddJob<WorkItemJob>(o => o.WaitForStartupActions());

services.AddDistributedCronJob<EventUsageJob>(Cron.Minutely());
services.AddDistributedCronJob<CleanupDataJob>("30 */4 * * *");
services.AddDistributedCronJob<CleanupOrphanedDataJob>("45 */8 * * *");
services.AddDistributedCronJob<DownloadGeoIPDatabaseJob>(Cron.Daily(1));
services.AddDistributedCronJob<MaintainIndexesJob>("10 */2 * * *");

var logger = loggerFactory.CreateLogger<Bootstrapper>();
logger.LogWarning("Jobs running in process");
Expand Down
4 changes: 4 additions & 0 deletions src/Exceptionless.Core/Services/UsageService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ private async Task SavePendingOrganizationUsageAsync(DateTime utcNow)
if (lastUsageSaveCache.HasValue)
lastUsageSave = lastUsageSaveCache.Value.Add(_bucketSize);

_logger.LogInformation("Saving organization usage starting from: {LastUsageSave}...", lastUsageSave);

var bucketUtc = lastUsageSave;
var currentBucketUtc = utcNow.Floor(_bucketSize);

Expand Down Expand Up @@ -127,6 +129,8 @@ private async Task SavePendingProjectUsageAsync(DateTime utcNow)
if (lastUsageSaveCache.HasValue)
lastUsageSave = lastUsageSaveCache.Value.Add(_bucketSize);

_logger.LogInformation("Saving project usage starting from: {LastUsageSave}...", lastUsageSave);

var bucketUtc = lastUsageSave;
var currentBucketUtc = utcNow.Floor(_bucketSize);

Expand Down
86 changes: 1 addition & 85 deletions src/Exceptionless.Web/ApmExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,7 @@ public static IHostBuilder AddApm(this IHostBuilder builder, ApmConfig config)
b.AddConsoleExporter();

if (config.EnableExporter)
b.AddFilteredOtlpExporter(c =>
{
c.Filter = a => a.Duration > TimeSpan.FromMilliseconds(config.MinDurationMs) || a.GetTagItem("db.system") is not null;
});
b.AddOtlpExporter();
});

services.AddOpenTelemetry().WithMetrics(b =>
Expand Down Expand Up @@ -177,84 +174,3 @@ public ApmConfig(IConfigurationRoot config, string processName, string? serviceV
public bool Debug => _apmConfig.GetValue("Debug", false);
public bool Console => _apmConfig.GetValue("Console", false);
}

public sealed class CustomFilterProcessor : CompositeProcessor<Activity>
{
private readonly Func<Activity, bool>? _filter;

public CustomFilterProcessor(BaseProcessor<Activity> processor, Func<Activity, bool>? filter) : base(new[] { processor })
{
_filter = filter;
}

public override void OnEnd(Activity activity)
{
if (_filter is null || _filter(activity))
base.OnEnd(activity);
}
}

public static class CustomFilterProcessorExtensions
{
public static TracerProviderBuilder AddFilteredOtlpExporter(this TracerProviderBuilder builder, Action<FilteredOtlpExporterOptions>? configure = null)
{
ArgumentNullException.ThrowIfNull(builder);

if (builder is IDeferredTracerProviderBuilder deferredTracerProviderBuilder)
{
return deferredTracerProviderBuilder.Configure((sp, providerBuilder) =>
{
var oltpOptions = sp.GetService<IOptions<FilteredOtlpExporterOptions>>()?.Value ?? new FilteredOtlpExporterOptions();
AddFilteredOtlpExporter(providerBuilder, oltpOptions, configure, sp);
});
}

return AddFilteredOtlpExporter(builder, new FilteredOtlpExporterOptions(), configure, serviceProvider: null);
}

internal static TracerProviderBuilder AddFilteredOtlpExporter(
TracerProviderBuilder builder,
FilteredOtlpExporterOptions exporterOptions,
Action<FilteredOtlpExporterOptions>? configure,
IServiceProvider? serviceProvider,
Func<BaseExporter<Activity>, BaseExporter<Activity>>? configureExporterInstance = null)
{

configure?.Invoke(exporterOptions);

exporterOptions.TryEnableIHttpClientFactoryIntegration(serviceProvider, "OtlpTraceExporter");

BaseExporter<Activity> otlpExporter = new OtlpTraceExporter(exporterOptions);

if (configureExporterInstance is not null)
otlpExporter = configureExporterInstance(otlpExporter);

if (exporterOptions.ExportProcessorType == ExportProcessorType.Simple)
{
return builder.AddProcessor(new CustomFilterProcessor(new SimpleActivityExportProcessor(otlpExporter), exporterOptions.Filter));
}

var batchOptions = exporterOptions.BatchExportProcessorOptions ?? new();
return builder.AddProcessor(new CustomFilterProcessor(new BatchActivityExportProcessor(
otlpExporter,
batchOptions.MaxQueueSize,
batchOptions.ScheduledDelayMilliseconds,
batchOptions.ExporterTimeoutMilliseconds,
batchOptions.MaxExportBatchSize), exporterOptions.Filter));
}

public static void TryEnableIHttpClientFactoryIntegration(this OtlpExporterOptions options, IServiceProvider? serviceProvider, string httpClientName)
{
// use reflection to call the method
var exporterExtensionsType = typeof(OtlpExporterOptions).Assembly.GetType("OpenTelemetry.Exporter.OtlpExporterOptionsExtensions");
exporterExtensionsType?.GetMethod("TryEnableIHttpClientFactoryIntegration")?.Invoke(null, [options,
serviceProvider!,
httpClientName
]);
}
}

public class FilteredOtlpExporterOptions : OtlpExporterOptions
{
public Func<Activity, bool>? Filter { get; set; }
}

0 comments on commit e9512dd

Please sign in to comment.