sns-alert-hub/app.py

309 lines
11 KiB
Python

#!/usr/bin/env python
import os
import json
import logging
import boto3
import dateutil.parser
import humanize
import urllib
import apprise
__author__ = "Stefan Reimer"
__author_email__ = "stefan@zero-downtime.net"
__version__ = "latest"
# Global alias lookup cache
account_aliases = {}
logger = logging.getLogger(__name__)
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("boto3").setLevel(logging.WARNING)
logging.getLogger("botocore").setLevel(logging.WARNING)
def boolean(value):
if value in ("t", "T", "true", "True", "TRUE", "1", 1, True):
return True
return False
DEBUG = boolean(os.getenv("DEBUG", default=False))
RESOLVE_ACCOUNT = boolean(os.getenv("RESOLVE_ACCOUNT", default=False))
WEBHOOK_URL = os.environ.get("WEBHOOK_URL", "dbus://")
if DEBUG:
logging.getLogger().setLevel(logging.DEBUG)
else:
logging.getLogger().setLevel(logging.INFO)
# Ensure slack URLs use ?blocks=yes
if "slack.com" in WEBHOOK_URL:
scheme, netloc, path, query_string, fragment = urllib.parse.urlsplit(
WEBHOOK_URL)
query_params = urllib.parse.parse_qs(query_string)
query_params["blocks"] = ["yes"]
new_query_string = urllib.parse.urlencode(query_params, doseq=True)
WEBHOOK_URL = urllib.parse.urlunsplit(
(scheme, netloc, path, new_query_string, fragment)
)
# Setup apprise
asset = apprise.AppriseAsset()
# Set our app_id which is also used as User-Agent
asset.app_desc = "SNSAlertHub part of ZeroDownTime CloudBender"
asset.app_url = "https://zero-downtime.net"
asset.image_url_mask = (
"https://cdn.zero-downtime.net/assets/zdt/apprise/{TYPE}-{XY}{EXTENSION}"
)
asset.app_id = "{} / {} {}".format("cloudbender",
__version__, "zero-downtime.net")
apobj = apprise.Apprise(asset=asset)
apobj.add(WEBHOOK_URL)
def get_alias(account_id):
"""resolves aws account_id to account alias and caches for lifetime of lambda function"""
if RESOLVE_ACCOUNT:
try:
if account_id not in account_aliases:
iam = boto3.client("iam")
account_aliases[account_id] = iam.list_account_aliases()[
"AccountAliases"
][0]
return account_aliases[account_id]
except (KeyError, IndexError):
logger.warning("Could not resolve IAM account alias")
pass
return account_id
def handler(event, context):
logger.debug(json.dumps({"aws.event": event})) # sort_keys=True, indent=4
(region, account_id) = context.invoked_function_arn.split(":")[3:5]
sns = event["Records"][0]["Sns"]
# Guess what we have, try to parse as json first
try:
msg = json.loads(sns["Message"])
except json.decoder.JSONDecodeError:
msg = {}
pass
body = ""
title = ""
msg_type = apprise.NotifyType.INFO
# CloudWatch Alarm ?
if "AlarmName" in msg:
title = "AWS Cloudwatch Alarm"
# Discard NewStateValue == OK && OldStateValue == INSUFFICIENT_DATA as
# these are triggered by installing new Alarms and only cause confusion
if msg["NewStateValue"] == "OK" and msg["OldStateValue"] == "INSUFFICIENT_DATA":
logger.info(
"Discarding Cloudwatch Metrics Alarm as state is OK and previous state was insufficient data, most likely new alarm being installed"
)
return 0
body = msg["AlarmDescription"]
msg_context = "{account} - {region} -> <https://{region}.console.aws.amazon.com/cloudwatch/home?region={region}#alarmsV2:alarm/{alarm}|Alarm Details>".format(
region=region,
alarm=msg["AlarmName"],
account=get_alias(msg["AWSAccountId"]),
)
msg_type = apprise.NotifyType.WARNING
try:
notify_map = {
"ok": apprise.NotifyType.SUCCESS,
"alarm": apprise.NotifyType.FAILURE,
"insuffcient_data": apprise.NotifyType.INFO,
}
msg_type = notify_map[msg["NewStateValue"].lower()]
# Reduce severtity for CPUCredit Alarms to Warning
if msg_type == apprise.NotifyType.FAILURE:
if msg["Trigger"]["MetricName"] == "CPUSurplusCreditBalance":
msg_type = apprise.NotifyType.WARNING
except KeyError:
pass
body = body + "\n\n_{}_".format(msg_context)
elif "Source" in msg and msg["Source"] == "CloudBender":
title = "AWS EC2 - CloudBender"
try:
msg_context = "{account} - {region} - {host} ({instance}) <https://{region}.console.aws.amazon.com/ec2/home?region={region}#AutoScalingGroupDetails:id={asg};view=activity|{artifact} ASG>".format(
account=get_alias(msg["AWSAccountId"]),
region=msg["Region"],
asg=msg["Asg"],
instance=msg["Instance"],
host=msg["Hostname"],
artifact=msg["Artifact"],
)
except KeyError:
msg_context = "{account} - {region}".format(
account=get_alias(msg["AWSAccountId"]), region=msg["Region"]
)
msg_type = apprise.NotifyType.WARNING
try:
notify_map = {
"warning": apprise.NotifyType.WARNING,
"error": apprise.NotifyType.FAILURE,
"info": apprise.NotifyType.INFO,
"success": apprise.NotifyType.SUCCESS,
}
msg_type = notify_map[msg["Level"].lower()]
except KeyError:
pass
if "Subject" in msg and msg["Subject"]:
title = msg["Subject"]
body = ""
if "Message" in msg and msg["Message"]:
body = msg["Message"]
if "Attachment" in msg and msg["Attachment"]:
body = body + "\n```{}```".format(msg["Attachment"])
body = body + "\n\n_{}_".format(msg_context)
elif "receiver" in msg and msg["receiver"] == "alerthub-notifications":
for alert in msg["alerts"]:
# First msg_type
msg_type = apprise.NotifyType.WARNING
try:
if alert["status"] == "resolved":
msg_type = apprise.NotifyType.SUCCESS
else:
if alert["labels"]["severity"] == "critical":
msg_type = apprise.NotifyType.FAILURE
except KeyError:
pass
# set title to Alertname
try:
title = alert["labels"]["alertname"]
except KeyError:
title = "Alertmanager"
# assemble message body
try:
body = "{}\n{}".format(
alert["annotations"]["summary"], alert["annotations"]["description"]
)
if alert["status"] == "resolved":
body = body + "\nDuration: {}".format(
humanize.time.precisedelta(
dateutil.parser.parse(alert["startsAt"])
- dateutil.parser.parse(alert["endsAt"])
)
)
else:
if "runbook_url" in alert["annotations"]:
body = body + " <{}|Runbook>".format(
alert["annotations"]["runbook_url"]
)
if "generatorURL" in alert["annotations"]:
body = body + " <{}|Source>".format(
alert["annotations"]["generatorURL"]
)
except KeyError:
body = "Unknown Alert:\n{}".format(alert)
try:
msg_context = "{account} - {region} - <{alert_manager_link}/#/alerts?receiver=alerthub-notifications|{cluster}>".format(
cluster=alert["labels"]["clusterName"],
region=alert["labels"]["awsRegion"],
account=get_alias(alert["labels"]["awsAccount"]),
alert_manager_link=msg["externalURL"],
)
body = body + "\n\n_{}_".format(msg_context)
except KeyError:
pass
# ElasticCache snapshot notifications
elif "ElastiCache:SnapshotComplete" in msg:
title = "ElastiCache Snapshot complete."
body = "Snapshot taken on {}".format(
msg["ElastiCache:SnapshotComplete"])
# ElasticCache replacement notifications
elif "ElastiCache:NodeReplacementScheduled" in msg:
title = "ElastiCache node replacement scheduled"
body = "{} will be replaced between {} and {}".format(
msg["ElastiCache:NodeReplacementScheduled"], msg["Start Time"], msg["End Time"])
# ElasticCache replacement notifications
elif "ElastiCache:CacheNodeReplaceStarted" in msg:
title = "ElastiCache fail over stareted"
body = "for node {}".format(msg["ElastiCache:CacheNodeReplaceStarted"])
# ElasticCache replacement notifications
elif "ElastiCache:FailoverComplete" in msg:
title = "ElastiCache fail over complete"
body = "for node {}".format(msg["ElastiCache:FailoverComplete"])
# ElasticCache update notifications
elif "ElastiCache:ServiceUpdateAvailableForNode" in msg:
title = "ElastiCache update available"
body = "for node {}".format(msg["ElastiCache:ServiceUpdateAvailableForNode"])
# known RDS events
elif "Event Source" in msg and msg['Event Source'] in ["db-instance", "db-cluster-snapshot", "db-snapshot"]:
try:
title = msg["Event Message"]
try:
name = " ({}).".format(
msg["Tags"]["Name"])
except (KeyError, IndexError):
name = ""
body = "RDS {}: <{}|{}>{}\n<{}|Event docs>".format(msg["Event Source"].replace("db-", ""),
msg["Identifier Link"], msg["Source ID"], name, msg["Event ID"])
except KeyError:
msg_type = apprise.NotifyType.WARNING
body = sns["Message"]
pass
# Basic ASG events
elif "Event" in msg and msg["Event"] in ["autoscaling:EC2_INSTANCE_TERMINATE", "autoscaling:EC2_INSTANCE_LAUNCH"]:
title = msg["Description"]
body = msg["Cause"]
try:
msg_context = "{account} - {region} - <https://{region}.console.aws.amazon.com/ec2/home?region={region}#AutoScalingGroupDetails:id={asg};view=activity|{asg} ASG>".format(
region=region,
account=get_alias(msg["AccountId"]),
asg=msg["AutoScalingGroupName"],
)
body = body + "\n\n_{}_".format(msg_context)
except KeyError:
pass
else:
title = "Unknown message type"
msg_type = apprise.NotifyType.WARNING
body = sns["Message"]
if not apobj.notify(body=body, title=title, notify_type=msg_type):
logger.error("Error during notify!")