Skip to content

Commit

Permalink
finalized workflow-module
Browse files Browse the repository at this point in the history
  • Loading branch information
bindsi committed Jun 26, 2024
1 parent 0470696 commit 06ce593
Show file tree
Hide file tree
Showing 9 changed files with 82 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@

// Additional configuration is required to successfully run gRPC on macOS.
// For instructions on how to configure Kestrel and gRPC clients on macOS, visit https://go.microsoft.com/fwlink/?linkid=2099682
// builder.WebHost.ConfigureKestrel(k => k.ListenLocalhost(5001, op => op.Protocols =
// HttpProtocols.Http2));
builder.WebHost.ConfigureKestrel(k => k.ListenLocalhost(5001, op => op.Protocols =
HttpProtocols.Http2));

// Add services to the container.
builder.Services.AddGrpc();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@
{
using System;
using System.Threading.Tasks;
using Dapr.Client;
using Dapr.Workflow;
using Distributed.IoT.Edge.WorkflowModule;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,13 @@
using Dapr.Workflow;
using Distributed.IoT.Edge.WorkflowModule;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

public class PublishActivity : WorkflowActivity<string, bool>
{
private readonly ILogger<PublishActivity> _logger;
private readonly DaprClient _daprClient;
private readonly string _senderPubsubName;
private readonly string _senderPubsubTopicName;
private readonly string? _senderPubsubName;
private readonly string? _senderPubsubTopicName;

public PublishActivity(
ILogger<PublishActivity> logger,
Expand All @@ -28,14 +27,8 @@ public PublishActivity(
throw new ArgumentNullException(nameof(parameters));
}

_senderPubsubName = parameters.SenderPubSubName ??
throw new ArgumentNullException(
nameof(parameters.SenderPubSubName),
"Parameter cannot be null.");
_senderPubsubTopicName = parameters.SenderPubSubTopicName ??
throw new ArgumentNullException(
nameof(parameters.SenderPubSubTopicName),
"Parameter cannot be null.");
_senderPubsubName = parameters.ReceiverPubSubName;
_senderPubsubTopicName = parameters.SenderPubSubTopicName;

_logger.LogTrace($"Entering: {nameof(PublishActivity)}");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: local-pub-sub
spec:
type: pubsub.mqtt3
version: v1
metadata:
- name: url
value: mqtt://localhost:1883
- name: retain
value: "false"
- name: cleanSession
value: "false"
- name: qos
value: "1"
- name: consumerID
value: "workflow-module"
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: statestore
spec:
type: state.redis
version: v1
initTimeout: 1m
metadata:
- name: redisHost
value: localhost:6379
- name: redisPassword
value: ""
- name: actorStateStore
value: "true"
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk.Web">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<DockerDefaultTargetOS>Linux</DockerDefaultTargetOS>
Expand All @@ -14,21 +14,23 @@
<ItemGroup>
<PackageReference Include="Azure.Messaging.EventHubs" Version="5.7.5" />
<PackageReference Include="CommandLineParser" Version="2.9.1" />
<PackageReference Include="Dapr.AspNetCore" Version="1.10.0" />
<PackageReference Include="Dapr.Workflow" Version="1.10.0" />
<PackageReference Include="Google.Api.CommonProtos" Version="2.8.0" />
<PackageReference Include="Google.Protobuf" Version="3.22.0" />
<PackageReference Include="Grpc.AspNetCore" Version="2.51.0" />
<PackageReference Include="Grpc.Net.Client" Version="2.51.0" />
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.17.0" />
<PackageReference Include="Dapr.AspNetCore" Version="1.13.*-*" />
<PackageReference Include="Dapr.Workflow" Version="1.13.*-*" />
<PackageReference Include="Google.Api.CommonProtos" Version="2.15.0" />
<PackageReference Include="Google.Protobuf" Version="3.27.1" />
<PackageReference Include="Grpc.AspNetCore" Version="2.63.0" />
<PackageReference Include="Grpc.Net.Client" Version="2.63.0" />
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets"
Version="1.20.1" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Distributed.IoT.Edge.Common\Distributed.IoT.Edge.Common.csproj" />
<ProjectReference
Include="..\Distributed.IoT.Edge.Common\Distributed.IoT.Edge.Common.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Update="Microsoft.CodeAnalysis.NetAnalyzers" Version="7.0.1 ">
<PackageReference Update="Microsoft.CodeAnalysis.NetAnalyzers" Version="8.0.0">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
Expand All @@ -39,4 +41,4 @@
</Content>
</ItemGroup>

</Project>
</Project>
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
// Local run cmd line.
// dapr run --app-id workflow-module --app-protocol grpc --app-port 5000 --resources-path=../../../deployment/helm/iot-edge-accelerator/templates/dapr -- dotnet run -- --receiverPubSubName "local-pub-sub" --receiverPubSubTopicName "telemetry" --senderPubSubName "local-pub-sub" --senderPubSubTopicName "enriched-telemetry"
// dapr run --app-id workflow-module --app-protocol grpc --app-port 5002 --resources-path=./Components --dapr-grpc-port 5005 -- dotnet run -- --receiverPubSubName "local-pub-sub-2" --receiverPubSubTopicName "telemetry" --senderPubSubName "local-pub-sub" --senderPubSubTopicName "enriched-telemetry"
// mosquitto_pub -h localhost -p 1883 -t telemetry -m '{"ambient":{"temperature":10}}'
// mosquitto_sub -h localhost -p 1883 -t enriched-telemetry
// dapr publish --publish-app-id workflow-module --pubsub local-pub-sub --topic telemetry --data '{"ambient":{"temperature":10}}' --metadata '{"rawPayload":"true"}'

using System.Collections.Immutable;
using CommandLine;
using Dapr.Client;
using Dapr.Workflow;
using Distributed.IoT.Edge.WorkflowModule;
using Distributed.IoT.Edge.WorkflowModule.Services;
using Distributed.IoT.Edge.WorkflowModule.Workflows;
using Microsoft.AspNetCore.Server.Kestrel.Core;
using Microsoft.Extensions.DependencyInjection.Extensions;
using WorkflowConsoleApp.Activities;

// Environment.SetEnvironmentVariable("DAPR_GRPC_PORT", "50001");
Expand All @@ -31,28 +31,27 @@
builder.Services.AddSingleton<WorkflowParameters>(sp => parameters);

// Already registered by AddDaprWorkflow extension
builder.Services.AddSingleton<DaprClient>(new DaprClientBuilder().Build());
//builder.Services.AddSingleton<DaprClient>(new DaprClientBuilder().Build());
builder.Services.AddTransient<SubscriptionService>(
sp => new SubscriptionService(
sp.GetRequiredService<ILogger<SubscriptionService>>(),
sp.GetRequiredService<DaprClient>(),
sp.GetRequiredService<WorkflowEngineClient>(),
sp.GetRequiredService<DaprWorkflowClient>(),
parameters));
})
.WithNotParsed(errors =>
{
Environment.Exit(1);
});

// builder.WebHost.ConfigureKestrel(k => k.ListenLocalhost(5001, op => op.Protocols =
// HttpProtocols.Http2));
builder.WebHost.ConfigureKestrel(k => k.ListenLocalhost(5002, op => op.Protocols =
HttpProtocols.Http2));

// Additional configuration is required to successfully run gRPC on macOS.
// For instructions on how to configure Kestrel and gRPC clients on macOS, visit https://go.microsoft.com/fwlink/?linkid=2099682
builder.Services.AddGrpc();

var app = builder.Build();

app.UseRouting();
// Configure the HTTP request pipeline.
app.MapGrpcService<SubscriptionService>();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,43 +1,33 @@
using Dapr.AppCallback.Autogen.Grpc.v1;
using Dapr.Client;
using Dapr.Client.Autogen.Grpc.v1;
using Dapr.Workflow;
using Distributed.IoT.Edge.WorkflowModule.Workflows;
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;

namespace Distributed.IoT.Edge.WorkflowModule.Services
{
using Dapr.AppCallback.Autogen.Grpc.v1;
using Dapr.Workflow;
using Distributed.IoT.Edge.WorkflowModule.Workflows;
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;

public class SubscriptionService : AppCallback.AppCallbackBase
{
private readonly ILogger<SubscriptionService> _logger;
private readonly DaprClient _daprClient;
private readonly WorkflowEngineClient _workflowClient;
private readonly DaprWorkflowClient _workflowClient;
private readonly string _receiverPubsubName;
private readonly string _receiverPubsubTopicName;

public SubscriptionService(
ILogger<SubscriptionService> logger,
DaprClient daprClient,
WorkflowEngineClient workflowClient,
DaprWorkflowClient workflowClient,
WorkflowParameters parameters)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_daprClient = daprClient ?? throw new ArgumentNullException(nameof(daprClient));
_workflowClient = workflowClient ?? throw new ArgumentNullException(nameof(workflowClient));
_workflowClient = workflowClient ?? throw new ArgumentNullException(nameof(workflowClient));
if (parameters == null)
{
throw new ArgumentNullException(nameof(parameters));
}

_receiverPubsubName = parameters.ReceiverPubSubName ??
throw new ArgumentNullException(
nameof(parameters.ReceiverPubSubName),
"Parameter cannot be null.");
_receiverPubsubTopicName = parameters.ReceiverPubSubTopicName ??
throw new ArgumentNullException(
nameof(parameters.ReceiverPubSubTopicName),
"Parameter cannot be null.");
_receiverPubsubName = parameters.ReceiverPubSubName;
_receiverPubsubTopicName = parameters.ReceiverPubSubTopicName;

}

public override Task<ListTopicSubscriptionsResponse> ListTopicSubscriptions(
Expand Down Expand Up @@ -68,30 +58,30 @@ public override async Task<TopicEventResponse> OnTopicEvent(
throw new ArgumentNullException(nameof(request));
}

var topicString = request.Data.ToStringUtf8();
_logger.LogTrace($"requestPath: {request.Path}");
var topicString = request.Extensions.ToString();
_logger.LogTrace($"Sending event to workflow, object json: {topicString}");

var instanceId = Guid.NewGuid().ToString();
// starting workflow to enrich and transform the data
await _workflowClient.ScheduleNewWorkflowAsync(
name: nameof(EnrichTelemetryWorkflow),
instanceId: request.Id,
instanceId: instanceId,
input: topicString);

// Wait a second to allow workflow to start
await Task.Delay(TimeSpan.FromSeconds(1));
WorkflowState state = await _workflowClient.GetWorkflowStateAsync(
instanceId: request.Id,
instanceId: instanceId,
getInputsAndOutputs: true);

_logger.LogTrace($"Your workflow {request.Id} has started. Here is the status of the workflow: {state.RuntimeStatus}");
while (!state.IsWorkflowCompleted)
{
await Task.Delay(TimeSpan.FromSeconds(5));
state = await _workflowClient.GetWorkflowStateAsync(
instanceId: request.Id,
instanceId: instanceId,
getInputsAndOutputs: true);
_logger.LogTrace($"State of workflow {request.Id} is: {state.RuntimeStatus}");
_logger.LogTrace($"State of workflow {instanceId} is: {state.RuntimeStatus}");
}

// Depending on the status return dapr side will either retry or drop the message from underlying pubsub.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
using CommandLine;

namespace Distributed.IoT.Edge.WorkflowModule
namespace Distributed.IoT.Edge.WorkflowModule
{
using CommandLine;

public class WorkflowParameters
{
[Option(
Expand Down

0 comments on commit 06ce593

Please sign in to comment.