Skip to content

Commit

Permalink
feat: add support for cloudwatch alarms
Browse files Browse the repository at this point in the history
This commit adds support for CloudWatch alarms to monitor the state of MSK clusters. It introduces a new input variable `enable_cloudwatch_alarms` which, when set to true, enables the creation of separate alarms for each cluster state.
  • Loading branch information
stefanfreitag committed Nov 3, 2023
1 parent 8ab63ac commit d61aced
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 26 deletions.
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ No modules.
| [aws_cloudwatch_event_rule.msk_health_lambda_schedule](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/cloudwatch_event_rule) | resource |
| [aws_cloudwatch_event_target.msk_health_lambda_target](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/cloudwatch_event_target) | resource |
| [aws_cloudwatch_log_group.msk_health_lambda_log_groups](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/cloudwatch_log_group) | resource |
| [aws_cloudwatch_metric_alarm.this](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/cloudwatch_metric_alarm) | resource |
| [aws_iam_policy.msk_health_lambda_role_policy](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_policy) | resource |
| [aws_iam_role.msk_health_lambda_role](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_role) | resource |
| [aws_iam_role_policy_attachment.msk_health_permissions](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_role_policy_attachment) | resource |
Expand All @@ -50,11 +51,14 @@ No modules.

| Name | Description | Type | Default | Required |
|------|-------------|------|---------|:--------:|
| <a name="input_cluster_arns"></a> [cluster\_arns](#input\_cluster\_arns) | List of MSK cluster ARNs. Default is empty list. | `list(string)` | `[]` | no |
| <a name="input_email"></a> [email](#input\_email) | List of e-mail addresses subscribing to the SNS topic. Default is empty list. | `list(string)` | `[]` | no |
| <a name="input_enable_cloudwatch_alarms"></a> [enable\_cloudwatch\_alarms](#input\_enable\_cloudwatch\_alarms) | Setup CloudWatch alarms for the MSK clusters state. For each state a separate alarm will be created. Default is false. | `bool` | `false` | no |
| <a name="input_enable_sns_notifications"></a> [enable\_sns\_notifications](#input\_enable\_sns\_notifications) | Setup SNS notifications for the MSK clusters state. Default is false. | `bool` | `false` | no |
| <a name="input_ignore_states"></a> [ignore\_states](#input\_ignore\_states) | Suppress warnings for the listed MSK states. Default: ['MAINTENANCE'] | `list(string)` | <pre>[<br> "MAINTENANCE"<br>]</pre> | no |
| <a name="input_log_retion_period_in_days"></a> [log\_retion\_period\_in\_days](#input\_log\_retion\_period\_in\_days) | Number of days logs will be retained. Default is 365 days. | `number` | `365` | no |
| <a name="input_memory_size"></a> [memory\_size](#input\_memory\_size) | Amount of memory in MByte that the Lambda Function can use at runtime. Default is 160. | `number` | `160` | no |
| <a name="input_schedule_expression"></a> [schedule\_expression](#input\_schedule\_expression) | The schedule expression for the CloudWatch event rule. Default is 'rate(15 minutes)'. | `string` | `"rate(15 minutes)"` | no |
| <a name="input_schedule_expression"></a> [schedule\_expression](#input\_schedule\_expression) | The schedule expression for the CloudWatch event rule. Default is 'rate(5 minutes)'. | `string` | `"rate(5 minutes)"` | no |
| <a name="input_tags"></a> [tags](#input\_tags) | A map of tags to add to all resources. Default is empty map. | `map(string)` | `{}` | no |

## Outputs
Expand Down
5 changes: 4 additions & 1 deletion examples/01_default_configuration/main.tf
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
module "msk_monitor" {
source = "../.."
source = "../.."
cluster_arns = []
enable_cloudwatch_alarms = true
schedule_expression = "rate(2 minutes)"
tags = {
"Name" = "msk-monitor"
}
Expand Down
81 changes: 63 additions & 18 deletions functions/check-msk-status/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,21 @@


def lambda_handler(event, context):
CLUSTER_ARNS = os.environ["CLUSTER_ARNS"].split(",")
ENABLE_CLOUDWATCH_METRICS = os.environ["ENABLE_CLOUDWATCH_METRICS"]
ENABLE_SNS_NOTIFICATIONS = os.environ["ENABLE_SNS_NOTIFICATIONS"]
LAMBDASNSTOPIC = os.environ["SNS_TOPIC_ARN"]
SUPPRESS_STATES = os.environ["SUPPRESS_STATES"].split(",")

region = "eu-central-1"
# Create an MSK client
client = boto3.client("kafka", region_name=region)

# Create boto clients
kafka = boto3.client("kafka", region_name=region)
cloudwatch = boto3.client("cloudwatch")
sns = boto3.client("sns")

# Retrieve a list of clusters
response = client.list_clusters_v2()
response = kafka.list_clusters_v2()
# Extract the cluster ARNs from the response
cluster_arns = response["ClusterInfoList"]

Expand All @@ -20,28 +28,65 @@ def lambda_handler(event, context):
)
)

for cluster in cluster_arns:
arn = cluster["ClusterArn"]
response = client.describe_cluster_v2(ClusterArn=arn)
for arn in CLUSTER_ARNS:
try:
response = kafka.describe_cluster_v2(ClusterArn=arn)
except Exception as e:
print(f"An error occurred when trying to describe the cluster {arn}: {e}")
continue

status = response["ClusterInfo"]["State"]
print("The cluster {} is in state {}.".format(arn,status))
sns_client = boto3.client("sns")
if status not in valid_states:
print("The MSK cluster: {} needs attention.".format(arn))
sns_client.publish(
TopicArn=LAMBDASNSTOPIC,
Message="MSK cluster: "
+ arn
+ " needs attention. The status is: "
+ status,
Subject="MSK Health Warning!",
cluster_name = response["ClusterInfo"]["ClusterName"]
arn_parts = arn.split(":")
account_id = arn_parts[4]
print(
"The cluster {} in account {} is in state {}.".format(
cluster_name, account_id, status
)
)

# Cover situation where cluster has been deleted.
if ENABLE_CLOUDWATCH_METRICS:
x = 1 if status not in valid_states else 0
put_custom_metric(cloudwatch=cloudwatch, cluster_name=cluster_name, value=x)
print(
"Put custom metric for cluster: {} with value: {}".format(
cluster_name, x
)
)
if ENABLE_SNS_NOTIFICATIONS:
if status not in valid_states:
print("The MSK cluster: {} needs attention.".format(arn))
sns.publish(
TopicArn=LAMBDASNSTOPIC,
Message="MSK cluster "
+ cluster_name
+ " needs attention. The status is "
+ status,
Subject="MSK Health Warning!",
)
else:
print(
"The MSK cluster: {} is in a healthy state, and is reachable and available for use.".format(
"The MSK cluster {} is in a healthy state, and is reachable and available for use.".format(
arn
)
)

# Return the status
return {"statusCode": 200, "body": "OK"}


def put_custom_metric(cloudwatch, cluster_name: str, value: int):
return cloudwatch.put_metric_data(
MetricData=[
{
"MetricName": "Status",
"Dimensions": [
{"Name": "ClusterName", "Value": cluster_name},
],
"Unit": "None",
"Value": value,
},
],
Namespace="Custom/Kafka",
)
39 changes: 35 additions & 4 deletions main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@ resource "aws_iam_role_policy_attachment" "msk_health_permissions" {
aws_iam_role.msk_health_lambda_role]
}

### TODO: check describe ClusterV2 permissions
# iam policy for lambda role
resource "aws_iam_policy" "msk_health_lambda_role_policy" {
name = "msk-health-lambda-role-policy-${random_id.id.hex}"
path = "/"
Expand Down Expand Up @@ -80,6 +78,13 @@ resource "aws_iam_policy" "msk_health_lambda_role_policy" {
"Resource": "arn:aws:kafka:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:cluster/*",
"Effect": "Allow"
},
{
"Action": [
"cloudwatch:PutMetricData"
],
"Resource": "*",
"Effect": "Allow"
},
{
"Action": [
"sns:Publish"
Expand Down Expand Up @@ -110,8 +115,11 @@ resource "aws_lambda_function" "msk_health_lambda" {
}
environment {
variables = {
SNS_TOPIC_ARN = aws_sns_topic.msk_health_sns_topic.arn
SUPPRESS_STATES = join(",", var.ignore_states)
CLUSTER_ARNS = join(",", var.cluster_arns)
SNS_TOPIC_ARN = aws_sns_topic.msk_health_sns_topic.arn
ENABLE_CLOUDWATCH_METRICS = var.enable_cloudwatch_alarms
ENABLE_SNS_NOTIFICATIONS = var.enable_sns_notifications
SUPPRESS_STATES = join(",", var.ignore_states)
}
}

Expand Down Expand Up @@ -146,3 +154,26 @@ resource "aws_cloudwatch_log_group" "msk_health_lambda_log_groups" {
retention_in_days = var.log_retion_period_in_days
tags = var.tags
}


resource "aws_cloudwatch_metric_alarm" "this" {
for_each = toset(local.cluster_names)
namespace = "Custom/Kafka"
period = 300
metric_name = "Status"
alarm_name = "msk_status_monitor-${each.key}-${random_id.id.hex}"
comparison_operator = "GreaterThanThreshold"
alarm_description = "This alarm triggers on MSK cluster status"
evaluation_periods = 2
statistic = "Average"
threshold = 0
insufficient_data_actions = []
dimensions = {
ClusterName = each.key
}
tags = var.tags
}

locals {
cluster_names = var.enable_cloudwatch_alarms ? sort([for arn in var.cluster_arns : element(split("/", arn), 1)]) : []
}
22 changes: 20 additions & 2 deletions variables.tf
Original file line number Diff line number Diff line change
@@ -1,9 +1,27 @@
variable "cluster_arns" {
description = "List of MSK cluster ARNs. Default is empty list."
type = list(string)
default = []
}

variable "email" {
description = "List of e-mail addresses subscribing to the SNS topic. Default is empty list."
type = list(string)
default = []
}

variable "enable_cloudwatch_alarms" {
description = "Setup CloudWatch alarms for the MSK clusters state. For each state a separate alarm will be created. Default is false."
type = bool
default = false
}

variable "enable_sns_notifications" {
description = "Setup SNS notifications for the MSK clusters state. Default is false."
type = bool
default = false
}

variable "ignore_states" {
description = "Suppress warnings for the listed MSK states. Default: ['MAINTENANCE']"
type = list(string)
Expand Down Expand Up @@ -35,9 +53,9 @@ variable "memory_size" {
}

variable "schedule_expression" {
description = "The schedule expression for the CloudWatch event rule. Default is 'rate(15 minutes)'."
description = "The schedule expression for the CloudWatch event rule. Default is 'rate(5 minutes)'."
type = string
default = "rate(15 minutes)"
default = "rate(5 minutes)"
}

variable "tags" {
Expand Down

0 comments on commit d61aced

Please sign in to comment.