An event bus abstraction over Dapr pub/sub.
- .NET Core SDK (6.0 or greater)
- Docker Desktop
- MongoDB Docker:
docker run --name mongo -d -p 27017:27017 -v /tmp/mongo/data:/data/db mongo
- MongoDB Client:
- Download Studio 3T.
- Add connection to localhost on port 27017.
- .NET Aspire Workload
dotnet workload update dotnet workload install aspire dotnet workload list
Dapr, which stands for Distributed Application Runtime, uses a sidecar pattern to provide a pub/sub abstraction over message brokers and queuing systems, including AWS SNS+SQS, GCP Pub/Sub, Azure Events Hub and several others.
The Dapr .NET SDK provides an API to perform pub/sub from an ASP.NET service, but it requires the application to be directly aware of Dapr. Publishers need to use DaprClient
to publish an event, and subscribers need to decorate controller actions with the Topic
attribute.
The purpose of the Dapr Event Bus project is to provide a thin abstraction layer over Dapr pub/sub so that applications may publish events and subscribe to topics without any knowledge of Dapr. This allows for better testability and flexibility, especially for worker services that do not natively include an HTTP stack.
- EventDriven.EventBus.Abstractions
- EventDriven.EventBus.Dapr
- EventDriven.EventBus.EventCache.Mongo
- EventDriven.SchemaRegistry.Mongo
-
In both the publisher and subscriber, you need to register the Dapr Event Bus with DI.
- First add the following to appsettings.json.
"DaprEventBusOptions": { "PubSubName": "pubsub" }, "MongoEventCacheOptions": { "AppName": "subscriber" }, "MongoStoreDatabaseSettings": { "ConnectionString": "mongodb://localhost:27017", "DatabaseName": "daprStore", "CollectionName": "daprCollection" }, "DaprEventBusSchemaOptions": { "UseSchemaRegistry": true, "SchemaValidatorType": "Json", "SchemaRegistryType": "Mongo", "AddSchemaOnPublish": true, "MongoStateStoreOptions": { "ConnectionString": "mongodb://localhost:27017", "DatabaseName": "schema-registry", "SchemasCollectionName": "schemas" } }
- Call
builder.Services.AddDaprEventBus
inProgram
. - Then call
builder.Services.AddRedisEventCache
.
builder.Services.AddDaprEventBus(builder.Configuration); builder.Services.AddRedisEventCache(builder.Configuration);
-
Define a C# record that extends
IntegrationEvent
. For example, the followingWeatherForecastEvent
record does so by adding aWeatherForecasts
property.public record WeatherForecastEvent(IEnumerable<WeatherForecast> WeatherForecasts) : IntegrationEvent;
-
In the publisher inject
IEventBus
into the constructor of a controller (Web API projects) or worker class (Worker Service projects). Then callEventBus.PublishAsync
, passing the event you defined in step 2.public class Worker : BackgroundService { public Worker(IEventBus eventBus) { _eventBus = eventBus; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { while (!stoppingToken.IsCancellationRequested) { // Publish event await _eventBus.PublishAsync(new WeatherForecastEvent(weathers)); // Pause await Task.Delay(5000, stoppingToken); } } }
-
In the subscriber create the same
IntegrationEvent
derived class as in the publisher. Then create an event handler that extendsIntegrationEventHandler<TIntegrationEvent>
whereTIntegrationEvent
is the event type you defined earlier.- Override
HandleAsync
to perform a task when an event is received. - For example,
WeatherForecastEventHandler
setsWeatherForecasts
onWeatherForecastRepository
to theWeatherForecasts
property ofWeatherForecastEvent
.
public class WeatherForecastEventHandler : IntegrationEventHandler<WeatherForecastEvent> { private readonly WeatherForecastRepository _weatherRepo; public WeatherForecastEventHandler(WeatherForecastRepository weatherRepo) { _weatherRepo = weatherRepo; } public override Task HandleAsync(WeatherForecastEvent @event) { _weatherRepo.WeatherForecasts = @event.WeatherForecasts; return Task.CompletedTask; } }
- Override
-
Lastly, in
app.UseEndpoints
callendpoints.MapDaprEventBus
, passing an action that subscribes toDaprEventBus
events with one or more event handlers.- Also call
app.UseRouting
,app.UseCloudEvents
,endpoints.MapSubscribeHandler
.
app.UseRouting(); app.UseCloudEvents(); #pragma warning disable ASP0014 // Need to use endpoints to map event bus app.UseEndpoints(endpoints => { endpoints.MapControllers(); endpoints.MapSubscribeHandler(); endpoints.MapDaprEventBus(eventBus => { // Subscribe with a handler var forecastEventHandler = app.Services.GetRequiredService<WeatherForecastEventHandler>(); eventBus.Subscribe(forecastEventHandler, null, "v1"); }); }); #pragma warning restore ASP0014
- Also call
When you enable Schema Registry for the Dapr Event Bus, messages sent to the Event Bus will be validated using schemas registered for a given topic. By default Json Schema will be used to validate messages (other schema types may be supported in the future). This helps ensure that message schemas will not change in a way that will cause deserialization errors when consumers receive messages for a specific topic.
Note: Schema evolution rules for Json allow the addition of fields, which are then ignored by consumers. If fields are not required, they can be omitted and consumers will get default values when they deserialize messages.
The Schema Registry only validates messages when they are published to the Event Bus. Therefore, it is only necessary to enable Schema Registry for publishers, not subscribers.
UseSchemaRegistry
enables use of the Schema Registry. SchemaValidatorType
specifies the type of schema validator to use (the default is Json
). AddSchemaOnPublish
will add a generated schema to the Schema Registry if no schema has been previously registered for a given topic.
Note: EventDriven.SchemaValidator.Json uses
JSchemaGenerator
from Newtonsoft.Json.Schema.Generation, which makes all fields required by default. To make fields optional, you need to use EventDriven.SchemaRegistry.Api to update the schema by removing required fields.
To view all the registered schemas you can connect to the schema datastore directly, for example, using a MongoDB client such as Robot 3T.
It is recommended you enable schema registry to so that subscribers may be protected from breakage should a publisher change an event schema in a way that is not backwards compatible.
However, if you wish to disable schema registry in a publisher, you may do so as follows.
- Update the
DaprEventBusSchemaOptions
section in your appsettings.json file by settingUseSchemaRegistry
tofalse
. - Or simply remove the
DaprEventBusSchemaOptions
section altogether.
It is recommended you keep event cache enabled to make your subscriber idempotent and filter out duplicate events. This is the default behavior.
However, if you wish to disable event cache in a subscriber, you may do so as follows.
- Add a
MongoEventCacheOptions
section in your appsettings.json file and setEnableEventCache
tofalse
.
The samples folder contains two sample applications which use the Dapr Event Bus: SimplePubSub and DuplexPubSub.
- The SimplePubSub sample contains two projects: Publisher and Subscriber. Every 5 seconds the Publisher creates a new set of weather forecasts and publishes them to the event bus. The Subscriber subscribes to the event by setting the
WeatherForecasts
property ofWeatherForecastRepository
, which is returned by theGet
method ofWeatherForecastController
. - The DuplexPubSub sample contains four projects: Frontend, Backend, WeatherGenerator and Common. The Backend publishes a
WeatherForecastRequestedEvent
to the event bus in theGet
method of theWeatherForecastController
. The WebGenerator handles the event by creating a set of weather forecasts and publishing them to the event bus with aWeatherForecastGeneratedEvent
, which is handled by the Backend by setting theWeatherForecasts
property of theWeatherForecastRepository
, so that new weather forecasts are returned by theWeatherForecastController
. The Frontend initiates the pub/sub process by using anHttpClient
to call the Backend when the user clicks the "Get Weather Forecasts" button.
The EventDriven.EventBus.Abstractions package includes interfaces and abstract classes which provide an abstraction layer for interacting with any messsaging subsystem. This allows you to potentially exchange the Dapr implementation with another one without altering application code.
This package contains an IEventBus
interface implemented by an EventBus
abstract class.
public interface IEventBus
{
Dictionary<string, List<IIntegrationEventHandler>> Topics { get; }
void Subscribe(
IIntegrationEventHandler handler,
string topic = null,
string prefix = null);
void UnSubscribe(
IIntegrationEventHandler handler,
string topic = null,
string prefix = null);
Task PublishAsync<TIntegrationEvent>(
TIntegrationEvent @event,
string topic = null,
string prefix = null)
where TIntegrationEvent : IIntegrationEvent;
}
When a subscriber calls Subscribe
, it passes a class that extends IntegrationEventHandler
, which implements IIntegrationEventHandler
. The event handler is added to a topic which can have one more handlers. A topic name may be specified explicitly, as well as a prefix which may contain a version number. There are non-generic and generic versions of IIntegrationEventHandler
.
public interface IIntegrationEventHandler
{
string Topic { get; set; }
Task HandleAsync(IIntegrationEvent @event);
}
public interface IIntegrationEventHandler<in TIntegrationEvent> : IIntegrationEventHandler
where TIntegrationEvent : IIntegrationEvent
{
Task HandleAsync(TIntegrationEvent @event);
}
The generic version of IntegrationEventHandler
includes a TIntegrationEvent
type argument that must implement IIntegrationEvent
. The IntegrationEvent
abstract record provides defaults for both Id
and CreationDate
properties.
public interface IIntegrationEvent
{
string Id { get; }
DateTime CreationDate { get; }
}
public abstract record IntegrationEvent : IIntegrationEvent
{
public string Id { get; init; } = Guid.NewGuid().ToString();
public DateTime CreationDate { get; init; } = DateTime.UtcNow;
}
The EventDriven.EventBus.Dapr package has a DaprEventBus
class that extends EventBus
by injecting DaprClient
. It also injects DaprEventBusOptions
for the pubsub component name needed by DaprClient.PublishAsync
. The event topic defaults to the type name of the the event, but it can also be supplied explicitly.
public class DaprEventBus : EventBus
{
private readonly IOptions<DaprEventBusOptions> _options;
private readonly DaprClient _dapr;
public DaprEventBus(IOptions<DaprEventBusOptions> options, DaprClient dapr)
{
_options = options ?? throw new ArgumentNullException(nameof(options));
_dapr = dapr ?? throw new ArgumentNullException(nameof(dapr));
}
public override async Task PublishAsync<TIntegrationEvent>(
TIntegrationEvent @event,
string topic = null,
string prefix = null)
{
if (@event is null) throw new ArgumentNullException(nameof(@event));
var topicName = GetTopicName(@event.GetType(), topic, prefix);
await _dapr.PublishEventAsync(_options.Value.PubSubName, topicName, @event);
}
}
The ServiceCollectionExtensions
class has a AddDaprEventBus
method that registers DaprClient
and DaprEventBus
, and it configures DaprEventBusOptions
for specifying the PubSubName
option.
The DaprEventBusEndpointRouteBuilderExtensions
class has a MapDaprEventBus
method that allows the caller to subscribe to DaprEventBus
by adding handlers. It maps an HTTP Post endpoint for each event bus topic that is called by Dapr when a message is sent to the registered pub/sub component. The default component Redis, but Dapr can be configured to use another message broker, such as AWS SNS+SQS.