sns-alert-hub/app.py

247 lines
8.4 KiB
Python

#!/usr/bin/env python
import os
import json
import logging
import boto3
import dateutil.parser
import humanize
import urllib
import apprise
__author__ = "Stefan Reimer"
__author_email__ = "stefan@zero-downtime.net"
__version__ = "latest"
# Global alias lookup cache
account_aliases = {}
logger = logging.getLogger(__name__)
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("boto3").setLevel(logging.WARNING)
logging.getLogger("botocore").setLevel(logging.WARNING)
def boolean(value):
if value in ("t", "T", "true", "True", "TRUE", "1", 1, True):
return True
return False
DEBUG = boolean(os.getenv("DEBUG", default=False))
RESOLVE_ACCOUNT = boolean(os.getenv("RESOLVE_ACCOUNT", default=False))
WEBHOOK_URL = os.environ.get("WEBHOOK_URL", "dbus://")
if DEBUG:
logging.getLogger().setLevel(logging.DEBUG)
else:
logging.getLogger().setLevel(logging.INFO)
# Ensure slack URLs use ?blocks=yes
if "slack.com" in WEBHOOK_URL:
scheme, netloc, path, query_string, fragment = urllib.parse.urlsplit(WEBHOOK_URL)
query_params = urllib.parse.parse_qs(query_string)
query_params["blocks"] = ["yes"]
new_query_string = urllib.parse.urlencode(query_params, doseq=True)
WEBHOOK_URL = urllib.parse.urlunsplit(
(scheme, netloc, path, new_query_string, fragment)
)
# Setup apprise
asset = apprise.AppriseAsset()
# Set our app_id which is also used as User-Agent
asset.app_desc = "SNSAlertHub part of ZeroDownTime CloudBender"
asset.app_url = "https://zero-downtime.net"
asset.image_url_mask = (
"https://cdn.zero-downtime.net/assets/zdt/apprise/{TYPE}-{XY}{EXTENSION}"
)
asset.app_id = "{} / {} {}".format("cloudbender", __version__, "zero-downtime.net")
apobj = apprise.Apprise(asset=asset)
apobj.add(WEBHOOK_URL)
def get_alias(account_id):
"""resolves aws account_id to account alias and caches for lifetime of lambda function"""
if RESOLVE_ACCOUNT:
try:
if account_id not in account_aliases:
iam = boto3.client("iam")
account_aliases[account_id] = iam.list_account_aliases()[
"AccountAliases"
][0]
return account_aliases[account_id]
except (KeyError, IndexError):
logger.warning("Could not resolve IAM account alias")
pass
return account_id
def handler(event, context):
logger.debug(json.dumps({"aws.event": event})) # sort_keys=True, indent=4
(region, account_id) = context.invoked_function_arn.split(":")[3:5]
sns = event["Records"][0]["Sns"]
# Guess what we have, try to parse as json first
try:
msg = json.loads(sns["Message"])
except json.decoder.JSONDecodeError:
msg = {}
pass
# CloudWatch Alarm ?
if "AlarmName" in msg:
title = "AWS Cloudwatch Alarm"
# Discard NewStateValue == OK && OldStateValue == INSUFFICIENT_DATA as these are triggered by installing new Alarms and only cause confusion
if msg["NewStateValue"] == "OK" and msg["OldStateValue"] == "INSUFFICIENT_DATA":
logger.info(
"Discarding Cloudwatch Metrics Alarm as state is OK and previous state was insufficient data, most likely new alarm being installed"
)
return 0
body = msg["AlarmDescription"]
msg_context = "{account} - {region} -> <https://{region}.console.aws.amazon.com/cloudwatch/home?region={region}#alarmsV2:alarm/{alarm}|Alarm Details>".format(
region=region,
alarm=msg["AlarmName"],
account=get_alias(msg["AWSAccountId"]),
)
msg_type = apprise.NotifyType.WARNING
try:
notify_map = {
"ok": apprise.NotifyType.SUCCESS,
"alarm": apprise.NotifyType.FAILURE,
"insuffcient_data": apprise.NotifyType.INFO,
}
msg_type = notify_map[msg["NewStateValue"].lower()]
# Reduce severtity for CPUCredit Alarms to Warning
if msg_type == apprise.NotifyType.FAILURE:
if msg["Trigger"]["MetricName"] == "CPUSurplusCreditBalance":
msg_type = apprise.NotifyType.WARNING
except KeyError:
pass
body = body + "\n\n_{}_".format(msg_context)
apobj.notify(body=body, title=title, notify_type=msg_type)
elif "Source" in msg and msg["Source"] == "CloudBender":
title = "AWS EC2 - CloudBender"
try:
msg_context = "{account} - {region} - {host} ({instance}) <https://{region}.console.aws.amazon.com/ec2/autoscaling/home?region={region}#AutoScalingGroups:id={asg};view=history|{artifact} ASG>".format(
account=get_alias(msg["AWSAccountId"]),
region=msg["Region"],
asg=msg["Asg"],
instance=msg["Instance"],
host=msg["Hostname"],
artifact=msg["Artifact"],
)
except KeyError:
msg_context = "{account} - {region}".format(
account=get_alias(msg["AWSAccountId"]), region=msg["Region"]
)
msg_type = apprise.NotifyType.WARNING
try:
notify_map = {
"warning": apprise.NotifyType.WARNING,
"error": apprise.NotifyType.FAILURE,
"info": apprise.NotifyType.INFO,
"success": apprise.NotifyType.SUCCESS,
}
msg_type = notify_map[msg["Level"].lower()]
except KeyError:
pass
if "Subject" in msg and msg["Subject"]:
title = msg["Subject"]
body = ""
if "Message" in msg and msg["Message"]:
body = msg["Message"]
if "Attachment" in msg and msg["Attachment"]:
body = body + "\n```{}```".format(msg["Attachment"])
body = body + "\n\n_{}_".format(msg_context)
apobj.notify(body=body, title=title, notify_type=msg_type)
elif "receiver" in msg and msg["receiver"] == "alerthub-notifications":
for alert in msg["alerts"]:
# First msg_type
msg_type = apprise.NotifyType.WARNING
try:
if alert["status"] == "resolved":
msg_type = apprise.NotifyType.SUCCESS
else:
if alert["labels"]["severity"] == "critical":
msg_type = apprise.NotifyType.FAILURE
except KeyError:
pass
# set title to Alertname
try:
title = alert["labels"]["alertname"]
except KeyError:
title = "Alertmanager"
# assemble message body
try:
body = "{}\n{}".format(
alert["annotations"]["summary"], alert["annotations"]["description"]
)
if alert["status"] == "resolved":
body = body + "\nDuration: {}".format(
humanize.time.precisedelta(
dateutil.parser.parse(alert["startsAt"])
- dateutil.parser.parse(alert["endsAt"])
)
)
else:
if "runbook_url" in alert["annotations"]:
body = body + " <{}|Runbook>".format(
alert["annotations"]["runbook_url"]
)
if "generatorURL" in alert["annotations"]:
body = body + " <{}|Source>".format(
alert["annotations"]["generatorURL"]
)
except KeyError:
body = "Unknown Alert:\n{}".format(alert)
try:
msg_context = "{account} - {region} - <{alert_manager_link}/#/alerts?receiver=alerthub-notifications|{cluster}>".format(
cluster=alert["labels"]["clusterName"],
region=alert["labels"]["awsRegion"],
account=get_alias(alert["labels"]["awsAccount"]),
alert_manager_link=msg["externalURL"],
)
body = body + "\n\n_{}_".format(msg_context)
except KeyError:
pass
# Finally send each parsed alert
apobj.notify(body=body, title=title, notify_type=msg_type)
else:
body = sns["Message"]
apobj.notify(
body=body,
title="Unknown message type",
notify_type=apprise.NotifyType.WARNING,
)