Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support Aspire.Hosting.Apache.Pulsar #4146

Closed
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions Aspire.sln
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,14 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Consumer", "playground\kafk
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Producer", "playground\kafka\Producer\Producer.csproj", "{FEE2F9B0-F32D-41B3-8917-0C13DE4F5953}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Aspire.Hosting.Apache.Pulsar", "src\Aspire.Hosting.Apache.Pulsar\Aspire.Hosting.Apache.Pulsar.csproj", "{2BE6B31D-25E0-4641-BE98-BF40C1A43204}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "apache-pulsar", "apache-pulsar", "{3CF517C3-3F47-40F6-9330-10E0174A8800}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ApachePulsar.AppHost", "playground\apache-pulsar\ApachePulsar.AppHost\ApachePulsar.AppHost.csproj", "{BC17A942-37AB-4E5A-8C7B-70846AE8934C}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ApachePulsar.Api", "playground\apache-pulsar\ApachePulsar.Api\ApachePulsar.Api.csproj", "{484C6267-F5D1-45B4-B458-B12F0EC90884}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -1218,6 +1226,18 @@ Global
{FEE2F9B0-F32D-41B3-8917-0C13DE4F5953}.Debug|Any CPU.Build.0 = Debug|Any CPU
{FEE2F9B0-F32D-41B3-8917-0C13DE4F5953}.Release|Any CPU.ActiveCfg = Release|Any CPU
{FEE2F9B0-F32D-41B3-8917-0C13DE4F5953}.Release|Any CPU.Build.0 = Release|Any CPU
{2BE6B31D-25E0-4641-BE98-BF40C1A43204}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{2BE6B31D-25E0-4641-BE98-BF40C1A43204}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2BE6B31D-25E0-4641-BE98-BF40C1A43204}.Release|Any CPU.ActiveCfg = Release|Any CPU
{2BE6B31D-25E0-4641-BE98-BF40C1A43204}.Release|Any CPU.Build.0 = Release|Any CPU
{BC17A942-37AB-4E5A-8C7B-70846AE8934C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{BC17A942-37AB-4E5A-8C7B-70846AE8934C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{BC17A942-37AB-4E5A-8C7B-70846AE8934C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{BC17A942-37AB-4E5A-8C7B-70846AE8934C}.Release|Any CPU.Build.0 = Release|Any CPU
{484C6267-F5D1-45B4-B458-B12F0EC90884}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{484C6267-F5D1-45B4-B458-B12F0EC90884}.Debug|Any CPU.Build.0 = Debug|Any CPU
{484C6267-F5D1-45B4-B458-B12F0EC90884}.Release|Any CPU.ActiveCfg = Release|Any CPU
{484C6267-F5D1-45B4-B458-B12F0EC90884}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -1439,6 +1459,10 @@ Global
{A39389A0-E780-4B97-808B-DC95CF59B35C} = {920BB263-E68F-4FA2-93FC-2E385EEA405B}
{7AA4C56C-3BB2-4FF0-BB03-F3F0D6A4FDAB} = {920BB263-E68F-4FA2-93FC-2E385EEA405B}
{FEE2F9B0-F32D-41B3-8917-0C13DE4F5953} = {920BB263-E68F-4FA2-93FC-2E385EEA405B}
{2BE6B31D-25E0-4641-BE98-BF40C1A43204} = {B80354C7-BE58-43F6-8928-9F3A74AB7F47}
{3CF517C3-3F47-40F6-9330-10E0174A8800} = {D173887B-AF42-4576-B9C1-96B9E9B3D9C0}
{BC17A942-37AB-4E5A-8C7B-70846AE8934C} = {3CF517C3-3F47-40F6-9330-10E0174A8800}
{484C6267-F5D1-45B4-B458-B12F0EC90884} = {3CF517C3-3F47-40F6-9330-10E0174A8800}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {6DCEDFEC-988E-4CB3-B45B-191EB5086E0C}
Expand Down
1 change: 1 addition & 0 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
<PackageVersion Include="Microsoft.Extensions.Http.Resilience" Version="$(MicrosoftExtensionsHttpResiliencePackageVersion)" />
<PackageVersion Include="Microsoft.Extensions.TimeProvider.Testing" Version="$(MicrosoftExtensionsTimeProviderTestingVersion)" />
<!-- external dependencies -->
<PackageVersion Include="DotPulsar" Version="3.2.1" />
<PackageVersion Include="Confluent.Kafka" Version="2.4.0" />
<PackageVersion Include="Dapper" Version="2.1.37" />
<PackageVersion Include="DnsClient" Version="1.7.0" />
Expand Down
3 changes: 2 additions & 1 deletion NuGet.config
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<?xml version="1.0" encoding="utf-8"?>
<?xml version="1.0" encoding="utf-8"?>
<configuration>
<packageSources>
<clear />
Expand All @@ -17,6 +17,7 @@
<add key="dotnet8" value="https://pkgs.dev.azure.com/dnceng/public/_packaging/dotnet8/nuget/v3/index.json" />
<add key="dotnet-libraries" value="https://pkgs.dev.azure.com/dnceng/public/_packaging/dotnet-libraries/nuget/v3/index.json" />
<add key="dotnet9-transport" value="https://dnceng.pkgs.visualstudio.com/public/_packaging/dotnet9-transport/nuget/v3/index.json" />
<add key="ISeeIShouldNotBeDoingThisButHowDoIAddOtherPackagesToPlayground" value="https://api.nuget.org/v3/index.json" />
</packageSources>
<packageSourceMapping>
<packageSource key="dotnet9-transport">
Expand Down
20 changes: 20 additions & 0 deletions playground/apache-pulsar/ApachePulsar.Api/ApachePulsar.Api.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<Project Sdk="Microsoft.NET.Sdk.Web">

<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
<WarningsNotAsErrors>CS8002</WarningsNotAsErrors>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="DotPulsar" />
<PackageReference Include="Swashbuckle.AspNetCore" />
<PackageReference Include="Microsoft.AspNetCore.OpenApi" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\Playground.ServiceDefaults\Playground.ServiceDefaults.csproj" />
</ItemGroup>

</Project>
110 changes: 110 additions & 0 deletions playground/apache-pulsar/ApachePulsar.Api/Players.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using DotPulsar;
using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
using DotPulsar.Extensions;

public abstract class Player(
IConsumerBuilder<string> consumerB,
IProducerBuilder<string> producerB,
MatchCoordinator coordinator,
ILogger logger
) : BackgroundService
{
private uint _receivedBalls;
public uint ReceivedBalls => _receivedBalls;

protected abstract string Move { get; }

/// <summary>
/// Kick the ball (message) into opponent field (topic)
/// </summary>
public async Task SmackTheBall(CancellationToken cancellationToken = default)
{
if (coordinator.MatchHalt)
{
logger.LogWarning("Match halted");
return;
}

var producer = producerB.Create();

logger.LogInformation("Sending: {message}", Move);

await Task.Delay(700, cancellationToken); // add some sim...

await producer.Send(new MessageMetadata(), Move, cancellationToken);
}

/// <summary>
/// Observes your own field (topic) for players responses (messages) so you can respond back
/// </summary>
private async Task ReceiveBall(CancellationToken cancellation)
{
var consumer = consumerB.Create();
await foreach (var message in consumer.Messages(cancellation))
{
logger.LogInformation("Received: {message}", message);

Interlocked.Increment(ref _receivedBalls);

await SmackTheBall(cancellation);
}
}

// Listener
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var attempt = 0;
while (true)
{
try
{
await ReceiveBall(stoppingToken);
}
catch (DotPulsarException e)
{
logger.LogWarning("Pulsar is still warming up, retry connection attempt {attempt}.", ++attempt);
logger.LogDebug(e, "Pulsar faulted");

await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
}
}
}
}

public sealed class PingPlayer(
[FromKeyedServices(typeof(PingPlayer))] IConsumerBuilder<string> consumerB,
[FromKeyedServices(typeof(PingPlayer))] IProducerBuilder<string> producerB,
MatchCoordinator coordinator,
ILogger<PingPlayer> logger
) : Player(consumerB, producerB, coordinator, logger)
{
protected override string Move => "ping";
}

public sealed class PongPlayer(
[FromKeyedServices(typeof(PongPlayer))] IConsumerBuilder<string> consumerB,
[FromKeyedServices(typeof(PongPlayer))] IProducerBuilder<string> producerB,
MatchCoordinator coordinator,
ILogger<PongPlayer> logger
) : Player(consumerB, producerB, coordinator, logger)
{
protected override string Move => "pong";
}

public sealed class MatchCoordinator(ILogger<MatchCoordinator> logger)
{
public bool MatchHalt { get; private set; }

public async Task HaltMatch()
{
MatchHalt = true;
logger.LogWarning("Match halted, match will be able to resume after 3 seconds timeout");

await Task.Delay(TimeSpan.FromSeconds(3));
MatchHalt = false;
}
}
56 changes: 56 additions & 0 deletions playground/apache-pulsar/ApachePulsar.Api/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using DotPulsar;
using DotPulsar.Extensions;
using Microsoft.AspNetCore.Mvc;

var builder = WebApplication.CreateBuilder(args);
var services = builder.AddServiceDefaults().Services.AddEndpointsApiExplorer().AddSwaggerGen();

var pulsarConnection = new Uri(builder.Configuration.GetConnectionString("Pulsar")!);
var pulsarClient = PulsarClient
.Builder()
.ServiceUrl(pulsarConnection)
.Build();

Console.WriteLine($"Pulsar connection string {pulsarConnection}");

var pulsarNamespace = "persistent://public/default";
var pingTopic = $"{pulsarNamespace}/ping-field";
var pongTopic = $"{pulsarNamespace}/pong-field";

// Each player plays (produces) a move (message) into opponents field (topic)
// Each player then responds to opponent moves (messages) being played into their field (topic)

var pingProducerB = pulsarClient.NewProducer(Schema.String).Topic(pongTopic);
var pingConsumerB = pulsarClient.NewConsumer(Schema.String).Topic(pingTopic).SubscriptionName("ping-player");

var pongProducerB = pulsarClient.NewProducer(Schema.String).Topic(pingTopic);
var pongConsumerB = pulsarClient.NewConsumer(Schema.String).Topic(pongTopic).SubscriptionName("pong-player");

services.AddSingleton<MatchCoordinator>();

services
.AddSingleton<PingPlayer>()
.AddHostedService(sp => sp.GetRequiredService<PingPlayer>())
.AddKeyedSingleton(typeof(PingPlayer), (_, _) => pingProducerB)
.AddKeyedSingleton(typeof(PingPlayer), (_, _) => pingConsumerB);

services
.AddSingleton<PongPlayer>()
.AddHostedService(sp => sp.GetRequiredService<PongPlayer>())
.AddKeyedSingleton(typeof(PongPlayer), (_, _) => pongProducerB)
.AddKeyedSingleton(typeof(PongPlayer), (_, _) => pongConsumerB);

var app = builder.Build();

app.UseSwagger().UseSwaggerUI();

app.MapPost("/match/start", async ([FromServices] PingPlayer p) => await p.SmackTheBall()).WithOpenApi();
app.MapPost("/match/stop", async ([FromServices] MatchCoordinator mc) => await mc.HaltMatch()).WithOpenApi();

app.MapGet("/ping-player/received", ([FromServices] PingPlayer p) => Results.Ok(p.ReceivedBalls)).WithOpenApi();
app.MapGet("/pong-player/received", ([FromServices] PongPlayer p) => Results.Ok(p.ReceivedBalls)).WithOpenApi();

app.Run();
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"$schema": "http://json.schemastore.org/launchsettings.json",
"profiles": {
"http": {
"commandName": "Project",
"dotnetRunMessages": true,
"launchBrowser": false,
"applicationUrl": "http://localhost:5226",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning"
}
}
}
9 changes: 9 additions & 0 deletions playground/apache-pulsar/ApachePulsar.Api/appsettings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning"
}
},
"AllowedHosts": "*"
}
10 changes: 10 additions & 0 deletions playground/apache-pulsar/ApachePulsar.Api/requests.http
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
@Url=http://localhost:5226

POST {{Url}}/match/start
###
POST {{Url}}/match/stop
###
GET {{Url}}/ping-player/received
###
GET {{Url}}/pong-player/received
###
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<IsAspireHost>true</IsAspireHost>
</PropertyGroup>

<ItemGroup>
<Compile Include="$(SharedDir)KnownResourceNames.cs" Link="KnownResourceNames.cs" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\..\src\Aspire.Dashboard\Aspire.Dashboard.csproj" />
<ProjectReference Include="..\..\..\src\Aspire.Hosting.Azure\Aspire.Hosting.Azure.csproj" IsAspireProjectResource="false" />
<ProjectReference Include="..\..\..\src\Aspire.Hosting.AppHost\Aspire.Hosting.AppHost.csproj" IsAspireProjectResource="false" />
<ProjectReference Include="..\..\..\src\Aspire.Hosting.Apache.Pulsar\Aspire.Hosting.Apache.Pulsar.csproj" IsAspireProjectResource="false" />

<ProjectReference Include="..\ApachePulsar.Api\ApachePulsar.Api.csproj" />
</ItemGroup>

<ItemGroup>
<None Update="application.properties">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
<None Update="bkvm.conf">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
<Project>

<Import Project="$([MSBuild]::GetPathOfFileAbove('Directory.Build.props', '$(MSBuildThisFileDirectory)../'))" />

<!-- NOTE: This line is only required because we are using P2P references, not NuGet. It will not exist in real apps. -->
<Import Project="../../../src/Aspire.Hosting.AppHost/build/Aspire.Hosting.AppHost.props" />

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
<Project>

<Import Project="$([MSBuild]::GetPathOfFileAbove('Directory.Build.targets', '$(MSBuildThisFileDirectory)../'))" />

<!-- NOTE: These lines are only required because we are using P2P references, not NuGet. They will not exist in real apps. -->
<Import Project="..\..\..\src\Aspire.Hosting.AppHost\build\Aspire.Hosting.AppHost.targets" />
<Import Project="..\..\..\src\Aspire.Hosting.Sdk\SDK\Sdk.targets" />

</Project>
32 changes: 32 additions & 0 deletions playground/apache-pulsar/ApachePulsar.AppHost/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

var builder = DistributedApplication.CreateBuilder(args);

var pulsar = builder
.AddPulsar(
name: "pulsar",
targetPort: 8080,
port: 6650
)
.WithPulsarManager(
name: "pulsar-manager",
frontendPort: 9527,
backendPort: 7750,
configureContainer: c => c
.WithApplicationProperties()
.WithDefaultEnvironment("pulsar-playground")
);

builder.AddProject<Projects.ApachePulsar_Api>("api")
.WithExternalHttpEndpoints()
.WithReference(pulsar);

// This project is only added in playground projects to support development/debugging
// of the dashboard. It is not required in end developer code. Comment out this code
// to test end developer dashboard launch experience. Refer to Directory.Build.props
// for the path to the dashboard binary (defaults to the Aspire.Dashboard bin output
// in the artifacts dir).
builder.AddProject<Projects.Aspire_Dashboard>(KnownResourceNames.AspireDashboard);

builder.Build().Run();
Loading
Loading