Add support for ElastiCache snapshot notifications, minor code reorg

This commit is contained in:
Stefan Reimer 2023-05-16 14:01:12 +01:00
parent e2d2a3bb89
commit da11514c5f
2 changed files with 16 additions and 9 deletions

View File

@ -3,6 +3,10 @@
## Abstract ## Abstract
AWS SNS/Lambda central alert hub taking SNS messages, parsing and formatting them before sending them to any messaging service, like Slack, Matrix, etc AWS SNS/Lambda central alert hub taking SNS messages, parsing and formatting them before sending them to any messaging service, like Slack, Matrix, etc
## Tests
All env variables are forwarded into the test container.
Simply set WEBHOOK_URL accordingly before running `make test`.
## Resources ## Resources
- https://gallery.ecr.aws/zero-downtime/sns-alert-hub - https://gallery.ecr.aws/zero-downtime/sns-alert-hub
- https://github.com/caronc/apprise - https://github.com/caronc/apprise

21
app.py
View File

@ -96,6 +96,10 @@ def handler(event, context):
msg = {} msg = {}
pass pass
body = ""
title = ""
msg_type = apprise.NotifyType.INFO
# CloudWatch Alarm ? # CloudWatch Alarm ?
if "AlarmName" in msg: if "AlarmName" in msg:
title = "AWS Cloudwatch Alarm" title = "AWS Cloudwatch Alarm"
@ -133,7 +137,6 @@ def handler(event, context):
pass pass
body = body + "\n\n_{}_".format(msg_context) body = body + "\n\n_{}_".format(msg_context)
apobj.notify(body=body, title=title, notify_type=msg_type)
elif "Source" in msg and msg["Source"] == "CloudBender": elif "Source" in msg and msg["Source"] == "CloudBender":
title = "AWS EC2 - CloudBender" title = "AWS EC2 - CloudBender"
@ -175,7 +178,6 @@ def handler(event, context):
body = body + "\n```{}```".format(msg["Attachment"]) body = body + "\n```{}```".format(msg["Attachment"])
body = body + "\n\n_{}_".format(msg_context) body = body + "\n\n_{}_".format(msg_context)
apobj.notify(body=body, title=title, notify_type=msg_type)
elif "receiver" in msg and msg["receiver"] == "alerthub-notifications": elif "receiver" in msg and msg["receiver"] == "alerthub-notifications":
@ -234,13 +236,14 @@ def handler(event, context):
except KeyError: except KeyError:
pass pass
# Finally send each parsed alert # New simple ElasticCache notifications
apobj.notify(body=body, title=title, notify_type=msg_type) elif "ElastiCache:SnapshotComplete" in msg:
title = "ElastiCache Snapshot complete."
body = "Taken on {}".format(msg["ElasticCache:SnapshotComplete"])
else: else:
title = "Unknown message type"
msg_type = apprise.NotifyType.WARNING
body = sns["Message"] body = sns["Message"]
apobj.notify(
body=body, apobj.notify(body=body, title=title, notify_type=msg_type)
title="Unknown message type",
notify_type=apprise.NotifyType.WARNING,
)