diff --git a/README.md b/README.md index 198b3b6..5dd967f 100644 --- a/README.md +++ b/README.md @@ -34,6 +34,7 @@ No modules. | [aws_cloudwatch_event_rule.msk_health_lambda_schedule](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/cloudwatch_event_rule) | resource | | [aws_cloudwatch_event_target.msk_health_lambda_target](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/cloudwatch_event_target) | resource | | [aws_cloudwatch_log_group.msk_health_lambda_log_groups](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/cloudwatch_log_group) | resource | +| [aws_cloudwatch_metric_alarm.this](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/cloudwatch_metric_alarm) | resource | | [aws_iam_policy.msk_health_lambda_role_policy](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_policy) | resource | | [aws_iam_role.msk_health_lambda_role](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_role) | resource | | [aws_iam_role_policy_attachment.msk_health_permissions](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_role_policy_attachment) | resource | @@ -50,11 +51,14 @@ No modules. | Name | Description | Type | Default | Required | |------|-------------|------|---------|:--------:| +| [cluster\_arns](#input\_cluster\_arns) | List of MSK cluster ARNs. Default is empty list. | `list(string)` | `[]` | no | | [email](#input\_email) | List of e-mail addresses subscribing to the SNS topic. Default is empty list. | `list(string)` | `[]` | no | +| [enable\_cloudwatch\_alarms](#input\_enable\_cloudwatch\_alarms) | Setup CloudWatch alarms for the MSK clusters state. For each state a separate alarm will be created. Default is false. | `bool` | `false` | no | +| [enable\_sns\_notifications](#input\_enable\_sns\_notifications) | Setup SNS notifications for the MSK clusters state. Default is false. | `bool` | `false` | no | | [ignore\_states](#input\_ignore\_states) | Suppress warnings for the listed MSK states. Default: ['MAINTENANCE'] | `list(string)` |
[
"MAINTENANCE"
]
| no | | [log\_retion\_period\_in\_days](#input\_log\_retion\_period\_in\_days) | Number of days logs will be retained. Default is 365 days. | `number` | `365` | no | | [memory\_size](#input\_memory\_size) | Amount of memory in MByte that the Lambda Function can use at runtime. Default is 160. | `number` | `160` | no | -| [schedule\_expression](#input\_schedule\_expression) | The schedule expression for the CloudWatch event rule. Default is 'rate(15 minutes)'. | `string` | `"rate(15 minutes)"` | no | +| [schedule\_expression](#input\_schedule\_expression) | The schedule expression for the CloudWatch event rule. Default is 'rate(5 minutes)'. | `string` | `"rate(5 minutes)"` | no | | [tags](#input\_tags) | A map of tags to add to all resources. Default is empty map. | `map(string)` | `{}` | no | ## Outputs diff --git a/examples/01_default_configuration/main.tf b/examples/01_default_configuration/main.tf index 754cf3d..d812308 100644 --- a/examples/01_default_configuration/main.tf +++ b/examples/01_default_configuration/main.tf @@ -1,5 +1,8 @@ module "msk_monitor" { - source = "../.." + source = "../.." + cluster_arns = [] + enable_cloudwatch_alarms = true + schedule_expression = "rate(2 minutes)" tags = { "Name" = "msk-monitor" } diff --git a/functions/check-msk-status/index.py b/functions/check-msk-status/index.py index cf8eb20..df2543b 100644 --- a/functions/check-msk-status/index.py +++ b/functions/check-msk-status/index.py @@ -3,13 +3,21 @@ def lambda_handler(event, context): + CLUSTER_ARNS = os.environ["CLUSTER_ARNS"].split(",") + ENABLE_CLOUDWATCH_METRICS = os.environ["ENABLE_CLOUDWATCH_METRICS"] + ENABLE_SNS_NOTIFICATIONS = os.environ["ENABLE_SNS_NOTIFICATIONS"] LAMBDASNSTOPIC = os.environ["SNS_TOPIC_ARN"] SUPPRESS_STATES = os.environ["SUPPRESS_STATES"].split(",") + region = "eu-central-1" - # Create an MSK client - client = boto3.client("kafka", region_name=region) + + # Create boto clients + kafka = boto3.client("kafka", region_name=region) + cloudwatch = boto3.client("cloudwatch") + sns = boto3.client("sns") + # Retrieve a list of clusters - response = client.list_clusters_v2() + response = kafka.list_clusters_v2() # Extract the cluster ARNs from the response cluster_arns = response["ClusterInfoList"] @@ -20,28 +28,65 @@ def lambda_handler(event, context): ) ) - for cluster in cluster_arns: - arn = cluster["ClusterArn"] - response = client.describe_cluster_v2(ClusterArn=arn) + for arn in CLUSTER_ARNS: + try: + response = kafka.describe_cluster_v2(ClusterArn=arn) + except Exception as e: + print(f"An error occurred when trying to describe the cluster {arn}: {e}") + continue + status = response["ClusterInfo"]["State"] - print("The cluster {} is in state {}.".format(arn,status)) - sns_client = boto3.client("sns") - if status not in valid_states: - print("The MSK cluster: {} needs attention.".format(arn)) - sns_client.publish( - TopicArn=LAMBDASNSTOPIC, - Message="MSK cluster: " - + arn - + " needs attention. The status is: " - + status, - Subject="MSK Health Warning!", + cluster_name = response["ClusterInfo"]["ClusterName"] + arn_parts = arn.split(":") + account_id = arn_parts[4] + print( + "The cluster {} in account {} is in state {}.".format( + cluster_name, account_id, status ) + ) + + # Cover situation where cluster has been deleted. + if ENABLE_CLOUDWATCH_METRICS: + x = 1 if status not in valid_states else 0 + put_custom_metric(cloudwatch=cloudwatch, cluster_name=cluster_name, value=x) + print( + "Put custom metric for cluster: {} with value: {}".format( + cluster_name, x + ) + ) + if ENABLE_SNS_NOTIFICATIONS: + if status not in valid_states: + print("The MSK cluster: {} needs attention.".format(arn)) + sns.publish( + TopicArn=LAMBDASNSTOPIC, + Message="MSK cluster " + + cluster_name + + " needs attention. The status is " + + status, + Subject="MSK Health Warning!", + ) else: print( - "The MSK cluster: {} is in a healthy state, and is reachable and available for use.".format( + "The MSK cluster {} is in a healthy state, and is reachable and available for use.".format( arn ) ) # Return the status return {"statusCode": 200, "body": "OK"} + + +def put_custom_metric(cloudwatch, cluster_name: str, value: int): + return cloudwatch.put_metric_data( + MetricData=[ + { + "MetricName": "Status", + "Dimensions": [ + {"Name": "ClusterName", "Value": cluster_name}, + ], + "Unit": "None", + "Value": value, + }, + ], + Namespace="Custom/Kafka", + ) diff --git a/main.tf b/main.tf index b8883cb..5f3c1d6 100644 --- a/main.tf +++ b/main.tf @@ -48,8 +48,6 @@ resource "aws_iam_role_policy_attachment" "msk_health_permissions" { aws_iam_role.msk_health_lambda_role] } -### TODO: check describe ClusterV2 permissions -# iam policy for lambda role resource "aws_iam_policy" "msk_health_lambda_role_policy" { name = "msk-health-lambda-role-policy-${random_id.id.hex}" path = "/" @@ -80,6 +78,13 @@ resource "aws_iam_policy" "msk_health_lambda_role_policy" { "Resource": "arn:aws:kafka:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:cluster/*", "Effect": "Allow" }, + { + "Action": [ + "cloudwatch:PutMetricData" + ], + "Resource": "*", + "Effect": "Allow" + }, { "Action": [ "sns:Publish" @@ -110,8 +115,11 @@ resource "aws_lambda_function" "msk_health_lambda" { } environment { variables = { - SNS_TOPIC_ARN = aws_sns_topic.msk_health_sns_topic.arn - SUPPRESS_STATES = join(",", var.ignore_states) + CLUSTER_ARNS = join(",", var.cluster_arns) + SNS_TOPIC_ARN = aws_sns_topic.msk_health_sns_topic.arn + ENABLE_CLOUDWATCH_METRICS = var.enable_cloudwatch_alarms + ENABLE_SNS_NOTIFICATIONS = var.enable_sns_notifications + SUPPRESS_STATES = join(",", var.ignore_states) } } @@ -146,3 +154,26 @@ resource "aws_cloudwatch_log_group" "msk_health_lambda_log_groups" { retention_in_days = var.log_retion_period_in_days tags = var.tags } + + +resource "aws_cloudwatch_metric_alarm" "this" { + for_each = toset(local.cluster_names) + namespace = "Custom/Kafka" + period = 300 + metric_name = "Status" + alarm_name = "msk_status_monitor-${each.key}-${random_id.id.hex}" + comparison_operator = "GreaterThanThreshold" + alarm_description = "This alarm triggers on MSK cluster status" + evaluation_periods = 2 + statistic = "Average" + threshold = 0 + insufficient_data_actions = [] + dimensions = { + ClusterName = each.key + } + tags = var.tags +} + +locals { + cluster_names = var.enable_cloudwatch_alarms ? sort([for arn in var.cluster_arns : element(split("/", arn), 1)]) : [] +} diff --git a/variables.tf b/variables.tf index a0e1a43..71d8697 100644 --- a/variables.tf +++ b/variables.tf @@ -1,9 +1,27 @@ +variable "cluster_arns" { + description = "List of MSK cluster ARNs. Default is empty list." + type = list(string) + default = [] +} + variable "email" { description = "List of e-mail addresses subscribing to the SNS topic. Default is empty list." type = list(string) default = [] } +variable "enable_cloudwatch_alarms" { + description = "Setup CloudWatch alarms for the MSK clusters state. For each state a separate alarm will be created. Default is false." + type = bool + default = false +} + +variable "enable_sns_notifications" { + description = "Setup SNS notifications for the MSK clusters state. Default is false." + type = bool + default = false +} + variable "ignore_states" { description = "Suppress warnings for the listed MSK states. Default: ['MAINTENANCE']" type = list(string) @@ -35,9 +53,9 @@ variable "memory_size" { } variable "schedule_expression" { - description = "The schedule expression for the CloudWatch event rule. Default is 'rate(15 minutes)'." + description = "The schedule expression for the CloudWatch event rule. Default is 'rate(5 minutes)'." type = string - default = "rate(15 minutes)" + default = "rate(5 minutes)" } variable "tags" {