#!/usr/bin/env python
import os
import json
import logging
import urllib.parse

import boto3
import dateutil.parser
import humanize
import apprise

__author__ = "Stefan Reimer"
__author_email__ = "stefan@zero-downtime.net"
__version__ = "latest"

# Global alias lookup cache
account_aliases = {}

logger = logging.getLogger(__name__)
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("boto3").setLevel(logging.WARNING)
logging.getLogger("botocore").setLevel(logging.WARNING)


def boolean(value):
    if value in ("t", "T", "true", "True", "TRUE", "1", 1, True):
        return True
    return False


DEBUG = boolean(os.getenv("DEBUG", default=False))
RESOLVE_ACCOUNT = boolean(os.getenv("RESOLVE_ACCOUNT", default=False))
WEBHOOK_URL = os.environ.get("WEBHOOK_URL", "dbus://")

if DEBUG:
    logging.getLogger().setLevel(logging.DEBUG)
else:
    logging.getLogger().setLevel(logging.INFO)

# Ensure Slack URLs use ?blocks=yes
if "slack.com" in WEBHOOK_URL:
    scheme, netloc, path, query_string, fragment = urllib.parse.urlsplit(WEBHOOK_URL)
    query_params = urllib.parse.parse_qs(query_string)
    query_params["blocks"] = ["yes"]
    new_query_string = urllib.parse.urlencode(query_params, doseq=True)
    WEBHOOK_URL = urllib.parse.urlunsplit(
        (scheme, netloc, path, new_query_string, fragment)
    )

# Setup apprise
asset = apprise.AppriseAsset()

# Set our app_id which is also used as User-Agent
asset.app_desc = "SNSAlertHub part of ZeroDownTime CloudBender"
asset.app_url = "https://zero-downtime.net"
asset.image_url_mask = (
    "https://cdn.zero-downtime.net/assets/zdt/apprise/{TYPE}-{XY}{EXTENSION}"
)
asset.app_id = "{} / {} {}".format("cloudbender", __version__, "zero-downtime.net")

apobj = apprise.Apprise(asset=asset)
apobj.add(WEBHOOK_URL)


def get_alias(account_id):
    """Resolve an AWS account_id to its account alias, cached for the lifetime of the Lambda container"""
    if RESOLVE_ACCOUNT:
        try:
            if account_id not in account_aliases:
                iam = boto3.client("iam")
                account_aliases[account_id] = iam.list_account_aliases()[
                    "AccountAliases"
                ][0]
            return account_aliases[account_id]
        except (KeyError, IndexError):
            logger.warning("Could not resolve IAM account alias")

    return account_id


def handler(event, context):
    """Parse an incoming SNS message and forward it as an Apprise notification"""
    logger.debug(json.dumps({"aws.event": event}))  # sort_keys=True, indent=4

    (region, account_id) = context.invoked_function_arn.split(":")[3:5]

    sns = event["Records"][0]["Sns"]

    # Guess what we have, try to parse as JSON first
    try:
        msg = json.loads(sns["Message"])
    except json.decoder.JSONDecodeError:
        msg = {}

    body = ""
    title = ""
    msg_type = apprise.NotifyType.INFO

    # CloudWatch Alarm ?
if "AlarmName" in msg: title = "AWS Cloudwatch Alarm" # Discard NewStateValue == OK && OldStateValue == INSUFFICIENT_DATA as these are triggered by installing new Alarms and only cause confusion if msg["NewStateValue"] == "OK" and msg["OldStateValue"] == "INSUFFICIENT_DATA": logger.info( "Discarding Cloudwatch Metrics Alarm as state is OK and previous state was insufficient data, most likely new alarm being installed" ) return 0 body = msg["AlarmDescription"] msg_context = "{account} - {region} -> ".format( region=region, alarm=msg["AlarmName"], account=get_alias(msg["AWSAccountId"]), ) msg_type = apprise.NotifyType.WARNING try: notify_map = { "ok": apprise.NotifyType.SUCCESS, "alarm": apprise.NotifyType.FAILURE, "insuffcient_data": apprise.NotifyType.INFO, } msg_type = notify_map[msg["NewStateValue"].lower()] # Reduce severtity for CPUCredit Alarms to Warning if msg_type == apprise.NotifyType.FAILURE: if msg["Trigger"]["MetricName"] == "CPUSurplusCreditBalance": msg_type = apprise.NotifyType.WARNING except KeyError: pass body = body + "\n\n_{}_".format(msg_context) elif "Source" in msg and msg["Source"] == "CloudBender": title = "AWS EC2 - CloudBender" try: msg_context = "{account} - {region} - {host} ({instance}) ".format( account=get_alias(msg["AWSAccountId"]), region=msg["Region"], asg=msg["Asg"], instance=msg["Instance"], host=msg["Hostname"], artifact=msg["Artifact"], ) except KeyError: msg_context = "{account} - {region}".format( account=get_alias(msg["AWSAccountId"]), region=msg["Region"] ) msg_type = apprise.NotifyType.WARNING try: notify_map = { "warning": apprise.NotifyType.WARNING, "error": apprise.NotifyType.FAILURE, "info": apprise.NotifyType.INFO, "success": apprise.NotifyType.SUCCESS, } msg_type = notify_map[msg["Level"].lower()] except KeyError: pass if "Subject" in msg and msg["Subject"]: title = msg["Subject"] body = "" if "Message" in msg and msg["Message"]: body = msg["Message"] if "Attachment" in msg and msg["Attachment"]: body = body + "\n```{}```".format(msg["Attachment"]) body = body + "\n\n_{}_".format(msg_context) elif "receiver" in msg and msg["receiver"] == "alerthub-notifications": for alert in msg["alerts"]: # First msg_type msg_type = apprise.NotifyType.WARNING try: if alert["status"] == "resolved": msg_type = apprise.NotifyType.SUCCESS else: if alert["labels"]["severity"] == "critical": msg_type = apprise.NotifyType.FAILURE except KeyError: pass # set title to Alertname try: title = alert["labels"]["alertname"] except KeyError: title = "Alertmanager" # assemble message body try: body = "{}\n{}".format( alert["annotations"]["summary"], alert["annotations"]["description"] ) if alert["status"] == "resolved": body = body + "\nDuration: {}".format( humanize.time.precisedelta( dateutil.parser.parse(alert["startsAt"]) - dateutil.parser.parse(alert["endsAt"]) ) ) else: if "runbook_url" in alert["annotations"]: body = body + " <{}|Runbook>".format( alert["annotations"]["runbook_url"] ) if "generatorURL" in alert["annotations"]: body = body + " <{}|Source>".format( alert["annotations"]["generatorURL"] ) except KeyError: body = "Unknown Alert:\n{}".format(alert) try: msg_context = "{account} - {region} - <{alert_manager_link}/#/alerts?receiver=alerthub-notifications|{cluster}>".format( cluster=alert["labels"]["clusterName"], region=alert["labels"]["awsRegion"], account=get_alias(alert["labels"]["awsAccount"]), alert_manager_link=msg["externalURL"], ) body = body + "\n\n_{}_".format(msg_context) except KeyError: pass # ElasticCache snapshot notifications elif 
"ElastiCache:SnapshotComplete" in msg: title = "ElastiCache Snapshot complete." body = "Snapshot taken on {}".format( msg["ElastiCache:SnapshotComplete"]) # ElasticCache replacement notifications elif "ElastiCache:NodeReplacementScheduled" in msg: title = "ElastiCache node replacement scheduled" body = "{} will be replaced between {} and {}".format( msg["ElastiCache:NodeReplacementScheduled"], msg["Start Time"], msg["End Time"]) # ElasticCache replacement notifications elif "ElastiCache:CacheNodeReplaceStarted" in msg: title = "ElastiCache fail over stareted" body = "for node {}".format(msg["ElastiCache:CacheNodeReplaceStarted"]) # ElasticCache replacement notifications elif "ElastiCache:FailoverComplete" in msg: title = "ElastiCache fail over complete" body = "for node {}".format(msg["ElastiCache:FailoverComplete"]) # RDS events elif "Event Source" in msg and msg['Event Source'] == "db-instance": try: title = msg["Event Message"] try: name = " ({}).".format( msg["Tags"]["aws:cloudformation:stack-name"]) except (KeyError, IndexError): name = "" body = "RDS Instance: <{}|{}>{}\n<{}|Event docs>".format( msg["Identifier Link"], msg["Source ID"], name, msg["Event ID"]) except KeyError: msg_type = apprise.NotifyType.WARNING body = sns["Message"] pass # Basic ASG events elif "Event" in msg and msg["Event"] in ["autoscaling:EC2_INSTANCE_TERMINATE", "autoscaling:EC2_INSTANCE_LAUNCH"]: title = msg["Description"] body = msg["Cause"] try: msg_context = "{account} - {region} - ".format( region=region, account=get_alias(msg["AccountId"]), asg=msg["AutoScalingGroupName"], ) body = body + "\n\n_{}_".format(msg_context) except KeyError: pass else: title = "Unknown message type" msg_type = apprise.NotifyType.WARNING body = sns["Message"] apobj.notify(body=body, title=title, notify_type=msg_type)