Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TSPS-140 Send email notifications for succeeded and failed jobs #181

Merged
merged 28 commits into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
8974dda
wip
mmorgantaylor Dec 11, 2024
d35f5fe
resolve merge conflicts
mmorgantaylor Dec 19, 2024
8ee74b7
resolve merge conflicts
mmorgantaylor Dec 19, 2024
a72e8e1
add failed notification hook
mmorgantaylor Dec 16, 2024
6a8751c
add log message for topic notification
mmorgantaylor Dec 16, 2024
11ad24c
add catch for error in PubsubService
mmorgantaylor Dec 16, 2024
b40ce69
add another log
mmorgantaylor Dec 16, 2024
104e73d
don't try to create topic
mmorgantaylor Dec 16, 2024
8c708bc
add back @PostConstruct
mmorgantaylor Dec 16, 2024
3a24d4b
remove createTopic PostConstruct
mmorgantaylor Dec 16, 2024
ae3ea1b
shut down pubsub publisher after sending message
mmorgantaylor Dec 17, 2024
c6b9cba
add log for projectId and topicId
mmorgantaylor Dec 17, 2024
8a08658
use the correct env var :facepalm:
mmorgantaylor Dec 17, 2024
5ffb6e4
update gha versions, some misc cleanup
mmorgantaylor Dec 18, 2024
3b6f617
things working, some tests
mmorgantaylor Dec 18, 2024
392cb7b
resolve conflicts
mmorgantaylor Dec 19, 2024
f60ac84
resolve conflicts
mmorgantaylor Dec 19, 2024
ed60cb6
resolve conflicts
mmorgantaylor Dec 19, 2024
c7fb87b
dumb test don't even try
mmorgantaylor Dec 18, 2024
eff5b8a
update db changelog, try creating publisher once
mmorgantaylor Dec 19, 2024
e10a1f3
refactor notification classes, other pr comments
mmorgantaylor Dec 19, 2024
7d35e9b
add step expectations of input params and working map
mmorgantaylor Dec 19, 2024
140e677
add test for initPublisher
mmorgantaylor Dec 19, 2024
61f3525
add test, try to get logging working properly with ApiFutureCallback
mmorgantaylor Dec 19, 2024
34815b6
try another logging thing
mmorgantaylor Dec 19, 2024
8bb9531
suppress unboxing warning
mmorgantaylor Dec 19, 2024
7fd29a3
wait up to 30 seconds to retrieve pubsub message publishing result
mmorgantaylor Dec 19, 2024
6e92a56
improve tests by removing some any() inputs to mocks
mmorgantaylor Dec 20, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/tag-publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -176,15 +176,15 @@ jobs:

- name: Auth to GCP
id: 'auth'
uses: google-github-actions/auth@v0
uses: google-github-actions/auth@v2
with:
token_format: 'access_token'
workload_identity_provider: 'projects/1038484894585/locations/global/workloadIdentityPools/github-wi-pool/providers/github-wi-provider'
service_account: 'dsp-artifact-registry-push@dsp-artifact-registry.iam.gserviceaccount.com'

# Install gcloud, `setup-gcloud` automatically picks up authentication from `auth`.
- name: 'Set up Cloud SDK'
uses: 'google-github-actions/setup-gcloud@v0'
uses: google-github-actions/setup-gcloud@v2

- name: Explicitly auth Docker for Artifact Registry
run: gcloud auth configure-docker $GOOGLE_DOCKER_REPOSITORY --quiet
Expand Down
1 change: 1 addition & 0 deletions service/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ dependencies {
// gcs
implementation platform('com.google.cloud:libraries-bom:26.44.0')
implementation 'com.google.cloud:google-cloud-storage'
implementation 'com.google.cloud:google-cloud-pubsub'

liquibaseRuntime 'info.picocli:picocli:4.6.1'
liquibaseRuntime 'org.postgresql:postgresql:42.6.1'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package bio.terra.pipelines.app.configuration.internal;

import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties("pipelines.notifications")
public record NotificationConfiguration(String projectId, String topicId) {}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import bio.terra.pipelines.dependencies.sam.SamService;
import bio.terra.pipelines.dependencies.wds.WdsService;
import bio.terra.pipelines.dependencies.workspacemanager.WorkspaceManagerService;
import bio.terra.pipelines.notifications.NotificationService;
import bio.terra.pipelines.service.PipelineInputsOutputsService;
import bio.terra.pipelines.service.PipelineRunsService;
import bio.terra.pipelines.service.PipelinesService;
Expand Down Expand Up @@ -38,6 +39,7 @@ public class FlightBeanBag {
private final WorkspaceManagerService workspaceManagerService;
private final RawlsService rawlsService;
private final QuotasService quotasService;
private final NotificationService notificationService;
private final ImputationConfiguration imputationConfiguration;
private final CbasConfiguration cbasConfiguration;
private final WdlPipelineConfiguration wdlPipelineConfiguration;
Expand All @@ -54,6 +56,7 @@ public FlightBeanBag(
CbasService cbasService,
RawlsService rawlsService,
QuotasService quotasService,
NotificationService notificationService,
WorkspaceManagerService workspaceManagerService,
ImputationConfiguration imputationConfiguration,
CbasConfiguration cbasConfiguration,
Expand All @@ -68,6 +71,7 @@ public FlightBeanBag(
this.workspaceManagerService = workspaceManagerService;
this.rawlsService = rawlsService;
this.quotasService = quotasService;
this.notificationService = notificationService;
this.imputationConfiguration = imputationConfiguration;
this.cbasConfiguration = cbasConfiguration;
this.wdlPipelineConfiguration = wdlPipelineConfiguration;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package bio.terra.pipelines.common.utils;

import static bio.terra.pipelines.common.utils.FlightUtils.flightMapKeyIsTrue;

import bio.terra.pipelines.dependencies.stairway.JobMapKeys;
import bio.terra.pipelines.notifications.NotificationService;
import bio.terra.stairway.FlightContext;
import bio.terra.stairway.FlightMap;
import bio.terra.stairway.FlightStatus;
import bio.terra.stairway.HookAction;
import bio.terra.stairway.StairwayHook;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
* A {@link StairwayHook} that sends a Job Failed Notification email via pubsub/Thurloe upon flight
* failure.
*
* <p>This hook action will only run if the flight's input parameters contain the JobMapKeys key for
* DO_SEND_JOB_FAILURE_NOTIFICATION_HOOK and the flight's status is not SUCCESS.
*
* <p>The JobMapKeys key for PIPELINE_NAME is required to send the notification.
*/
@Component
public class StairwaySendFailedJobNotificationHook implements StairwayHook {
private final NotificationService notificationService;
private static final Logger logger =
LoggerFactory.getLogger(StairwaySendFailedJobNotificationHook.class);

public StairwaySendFailedJobNotificationHook(NotificationService notificationService) {
this.notificationService = notificationService;
}

@Override
public HookAction endFlight(FlightContext context) {

FlightMap inputParameters = context.getInputParameters();

if (flightMapKeyIsTrue(inputParameters, JobMapKeys.DO_SEND_JOB_FAILURE_NOTIFICATION_HOOK)
&& context.getFlightStatus() != FlightStatus.SUCCESS) {
logger.info(
"Flight has status {}, sending failed job notification email", context.getFlightStatus());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we log the flight id as well here

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

chatted with faces, this already has the flight id associated with the log itself


FlightUtils.validateRequiredEntries(inputParameters, JobMapKeys.USER_ID);

UUID jobId = UUID.fromString(context.getFlightId());
String userId = inputParameters.get(JobMapKeys.USER_ID, String.class);

// send email notification
notificationService.configureAndSendPipelineRunFailedNotification(jobId, userId, context);
}
return HookAction.CONTINUE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ public class PipelineRun {
@Column(name = "description")
private String description;

@Column(name = "quota_consumed")
private Integer quotaConsumed;

/** Constructor for in progress or complete PipelineRun. */
public PipelineRun(
UUID jobId,
Expand All @@ -83,7 +86,8 @@ public PipelineRun(
Instant created,
Instant updated,
CommonPipelineRunStatusEnum status,
String description) {
String description,
Integer quotaConsumed) {
this.jobId = jobId;
this.userId = userId;
this.pipelineId = pipelineId;
Expand All @@ -97,6 +101,7 @@ public PipelineRun(
this.updated = updated;
this.status = status;
this.description = description;
this.quotaConsumed = quotaConsumed;
}

/** Constructor for creating a new GCP pipeline run. Timestamps are auto-generated. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ public class JobMapKeys {
"do_set_pipeline_run_status_failed_hook";
public static final String DO_INCREMENT_METRICS_FAILED_COUNTER_HOOK =
"do_increment_metrics_failed_counter_hook";
public static final String DO_SEND_JOB_FAILURE_NOTIFICATION_HOOK =
"do_send_job_failure_notification_hook";

JobMapKeys() {
throw new IllegalStateException("Attempted to instantiate utility class JobMapKeys");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import bio.terra.pipelines.common.utils.FlightBeanBag;
import bio.terra.pipelines.common.utils.PipelinesEnum;
import bio.terra.pipelines.common.utils.StairwayFailedMetricsCounterHook;
import bio.terra.pipelines.common.utils.StairwaySendFailedJobNotificationHook;
import bio.terra.pipelines.common.utils.StairwaySetPipelineRunStatusHook;
import bio.terra.pipelines.dependencies.stairway.exception.*;
import bio.terra.pipelines.dependencies.stairway.model.EnumeratedJob;
Expand Down Expand Up @@ -112,6 +113,8 @@ public void initialize() {
.addHook(new StairwayLoggingHook())
.addHook(new MonitoringHook(openTelemetry))
.addHook(new StairwayFailedMetricsCounterHook())
.addHook(
new StairwaySendFailedJobNotificationHook(flightBeanBag.getNotificationService()))
.addHook(new StairwaySetPipelineRunStatusHook(flightBeanBag.getPipelineRunsService()))
.exceptionSerializer(new StairwayExceptionSerializer(objectMapper)));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package bio.terra.pipelines.notifications;

import lombok.Getter;

/** Base class for Teaspoons job notifications. Contains common fields for all job notifications. */
@Getter
public abstract class BaseTeaspoonsJobNotification {
protected String notificationType;
protected String recipientUserId;
protected String pipelineDisplayName;
protected String jobId;
protected String timeSubmitted;
protected String timeCompleted;
protected String quotaRemaining;
protected String quotaConsumedByJob;
protected String userDescription;

protected BaseTeaspoonsJobNotification(
String recipientUserId,
String pipelineDisplayName,
String jobId,
String timeSubmitted,
String timeCompleted,
String quotaRemaining,
String userDescription) {
this.recipientUserId = recipientUserId;
this.pipelineDisplayName = pipelineDisplayName;
this.jobId = jobId;
this.timeSubmitted = timeSubmitted;
this.timeCompleted = timeCompleted;
this.quotaRemaining = quotaRemaining;
this.userDescription = userDescription;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package bio.terra.pipelines.notifications;

import static bio.terra.pipelines.app.controller.JobApiUtils.buildApiErrorReport;

import bio.terra.pipelines.app.configuration.internal.NotificationConfiguration;
import bio.terra.pipelines.db.entities.Pipeline;
import bio.terra.pipelines.db.entities.PipelineRun;
import bio.terra.pipelines.db.entities.UserQuota;
import bio.terra.pipelines.generated.model.ApiErrorReport;
import bio.terra.pipelines.service.PipelineRunsService;
import bio.terra.pipelines.service.PipelinesService;
import bio.terra.pipelines.service.QuotasService;
import bio.terra.stairway.FlightContext;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Optional;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

/**
* Service to encapsulate the logic for composing and sending email notifications to users about
* completed pipeline runs. Works with the Terra Thurloe service via PubSub messages.
*/
@Service
public class NotificationService {
private static final Logger logger = LoggerFactory.getLogger(NotificationService.class);

private final PipelineRunsService pipelineRunsService;
private final PipelinesService pipelinesService;
private final QuotasService quotasService;
private final PubsubService pubsubService;
private final NotificationConfiguration notificationConfiguration;
private final ObjectMapper objectMapper;

public NotificationService(
PipelineRunsService pipelineRunsService,
PipelinesService pipelinesService,
QuotasService quotasService,
PubsubService pubsubService,
NotificationConfiguration notificationConfiguration,
ObjectMapper objectMapper) {
this.pipelineRunsService = pipelineRunsService;
this.pipelinesService = pipelinesService;
this.quotasService = quotasService;
this.pubsubService = pubsubService;
this.notificationConfiguration = notificationConfiguration;
this.objectMapper = objectMapper;
}

/**
* Pull together the common fields for a notification.
*
* @param jobId the job id
* @param userId the user id
* @param context the flight context (only needed for failed notifications)
* @param isSuccess whether the notification is for a succeeded job; if false, creates a failed
* notification
* @return the base notification object
*/
private BaseTeaspoonsJobNotification createTeaspoonsJobNotification(
UUID jobId, String userId, FlightContext context, boolean isSuccess) {
PipelineRun pipelineRun = pipelineRunsService.getPipelineRun(jobId, userId);
Pipeline pipeline = pipelinesService.getPipelineById(pipelineRun.getPipelineId());
String pipelineDisplayName = pipeline.getDisplayName();

// if flight fails before quota steps on user's first run, there won't be a row for them yet
// in the quotas table
UserQuota userQuota =
quotasService.getOrCreateQuotaForUserAndPipeline(userId, pipeline.getName());
String quotaRemaining = String.valueOf(userQuota.getQuota() - userQuota.getQuotaConsumed());

if (isSuccess) { // succeeded
return new TeaspoonsJobSucceededNotification(
userId,
pipelineDisplayName,
jobId.toString(),
formatInstantToReadableString(pipelineRun.getCreated()),
formatInstantToReadableString(pipelineRun.getUpdated()),
pipelineRun.getQuotaConsumed().toString(),
quotaRemaining,
pipelineRun.getDescription());
} else { // failed
// get exception
Optional<Exception> exception = context.getResult().getException();
String errorMessage;
if (exception.isPresent()) {
ApiErrorReport errorReport =
buildApiErrorReport(exception.get()); // use same logic that the status endpoint uses
errorMessage = errorReport.getMessage();
} else {
logger.error(
"No exception found in flight result for flight {} with status {}",
context.getFlightId(),
context.getFlightStatus());
errorMessage = "Unknown error";
}
return new TeaspoonsJobFailedNotification(
userId,
pipelineDisplayName,
jobId.toString(),
errorMessage,
formatInstantToReadableString(pipelineRun.getCreated()),
formatInstantToReadableString(pipelineRun.getUpdated()),
quotaRemaining,
pipelineRun.getDescription());
}
}

/**
* Format an Instant as a date time string in UTC using the RFC-1123 date-time formatter, such as
* 'Tue, 3 Jun 2008 11:05:30 GMT'.
*
* @param dateTime the Instant to format
* @return the formatted date time string
*/
protected String formatInstantToReadableString(Instant dateTime) {
return dateTime.atZone(ZoneId.of("UTC")).format(DateTimeFormatter.RFC_1123_DATE_TIME);
}

/**
* Configure and send a notification that a job has succeeded.
*
* @param jobId the job id
* @param userId the user id
*/
public void configureAndSendPipelineRunSucceededNotification(UUID jobId, String userId) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it looks liek this function and the failure equivalent are really the ones that should be called outside of this class, can we maek the rest of the functions private?

try {
pubsubService.publishMessage(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what information does thurloe use in the message to figure out what template to use and what values to put where?

Copy link
Collaborator Author

@mmorgantaylor mmorgantaylor Dec 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the notificationType field is recognized by Thurloe and converted into the SendGrid template ID (mapping defined in terra-helmfile: https://github.com/broadinstitute/terra-helmfile/blob/master/charts/thurloe/values.yaml#L47)

notificationConfiguration.projectId(),
notificationConfiguration.topicId(),
objectMapper.writeValueAsString(
createTeaspoonsJobNotification(jobId, userId, null, true)));
} catch (IOException e) {
logger.error("Error sending pipelineRunSucceeded notification", e);
}
}

/**
* Configure and send a notification that a job has failed.
*
* @param jobId the job id
* @param userId the user id
* @param context the flight context
*/
public void configureAndSendPipelineRunFailedNotification(
UUID jobId, String userId, FlightContext context) {
try {
pubsubService.publishMessage(
notificationConfiguration.projectId(),
notificationConfiguration.topicId(),
objectMapper.writeValueAsString(
createTeaspoonsJobNotification(jobId, userId, context, false)));
} catch (IOException e) {
logger.error("Error sending pipelineRunFailed notification", e);
}
}
}
Loading
Loading