From 44818f4c9a8609c964207408c86ac3483aa17b57 Mon Sep 17 00:00:00 2001 From: Stefan Reimer Date: Tue, 23 Jul 2019 15:24:36 +0000 Subject: [PATCH] Add ENI metadata augmentation to VPC Flowlog parsing --- CHANGES.md | 3 +++ index.py | 37 ++++++++++++++++++++++++++++++++++++- tests/test_parse.py | 4 ++++ 3 files changed, 43 insertions(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 7186a68..383af65 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,8 @@ # Changelog +## 0.9.6 +- Augment VPC FlowLogs with ENI metadata incl. global cache + ## 0.9.5 - Added support for VPC FlowLogs diff --git a/index.py b/index.py index adfc218..75b5155 100644 --- a/index.py +++ b/index.py @@ -17,11 +17,14 @@ import boto3 __author__ = "Stefan Reimer" __author_email__ = "stefan@zero-downtime.net" -__version__ = "0.9.5" +__version__ = "0.9.6" # Global alias lookup cache account_aliases = {} +# Global eni lookup cache +enis = {} + logger = logging.getLogger(__name__) logging.getLogger("urllib3").setLevel(logging.WARNING) logging.getLogger('boto3').setLevel(logging.WARNING) @@ -47,6 +50,7 @@ CHUNK_SIZE = 128 DEBUG = boolean(os.getenv('DEBUG', default=False)) TEST = boolean(os.getenv('TEST', default=False)) RESOLVE_ACCOUNT = boolean(os.getenv('RESOLVE_ACCOUNT', default=True)) +RESOLVE_ENI = boolean(os.getenv('RESOLVE_ENI', default=True)) if DEBUG: logging.getLogger().setLevel(logging.DEBUG) @@ -95,6 +99,33 @@ def get_source(region, account_id): return source +def get_eni_data(eni): + """ returns additional ENI properties + and caches for lifetime of lambda function + """ + global RESOLVE_ENI + if RESOLVE_ENI and not TEST: + try: + if eni not in enis: + ec2 = boto3.client('ec2') + interface_iter = ec2.get_paginator('describe_network_interfaces').paginate() + for response in interface_iter: + for interface in response['NetworkInterfaces']: + enis[interface['NetworkInterfaceId']] = {'eni.az': interface['AvailabilityZone'], + 'eni.subnet': interface['SubnetId']} + if 'Association' in interface and 'PublicIp' in interface['Association']: + enis[interface['NetworkInterfaceId']]['eni.public_ip'] = interface['Association']['PublicIp'] + + return enis[eni] + + except(KeyError, IndexError): + logger.warning("Could not get additional data for ENI {}".format(eni)) + RESOLVE_ENI = False + pass + + return {} + + class Queue: url = urllib.parse.urlsplit(os.getenv('FLUENTD_URL', default=''), scheme='https') passwd = os.getenv('FLUENT_SHARED_KEY', default=None) @@ -291,6 +322,10 @@ def handler(event, context): parsed = {'interface-id': row[2], 'srcaddr': row[3], 'dstaddr': row[4], 'srcport': row[5], 'dstport': row[6], 'protocol': row[7], 'packets': row[8], 'bytes': row[9], 'start': row[10], 'end': row[11], 'action': row[12], 'log-status': row[13]} + eni_metadata = get_eni_data(parsed['interface-id']) + if eni_metadata: + parsed.update(eni_metadata) + # Fallback add raw message else: event['message'] = e['message'] diff --git a/tests/test_parse.py b/tests/test_parse.py index b37e80a..a2570b7 100755 --- a/tests/test_parse.py +++ b/tests/test_parse.py @@ -29,6 +29,10 @@ def test_parse(): event = json.loads('{"awslogs": {"data": "H4sICILSAl0AA3Rlc3QArVNdb9tGEHzvr7gSfWgB09z7vuObHDOuCpsqKAZoawnGkTy5RChSJY9N0iD/vSslLew2BlykLwSxu9idmZt5H+39NLl7X747+CiNLhfl4u4mW68XV1l0Fg1vej9imTIupNLGAmVY7ob7q3GYD9hJ3Jsp6dy+alwS/BTi3dzXoR36j2PrMHq3xzkG1CbAE8qT22+uF2W2LresrirHgVPhjagtOLPbWa00B23BUIkrprma6rE9HDe+bLvgxylKb6MX3TA3F75v/Hh3c78P0fZ0Lfvd9+E48D5qGzzKhWIMJAjJubVCAWgtlJLGaMmUEUpaC0IwARKPWsus5oIKgYdDi8IEt0eOVEomQQmOy8zZX4Lh+kVdD3Mf2iYlDwXa9NGHsy9DIJ+JoHiVkx9WF2SVk/L7IltcEiqAa4VfvCE4I7fhV3yBJj4hTYne/g/o1DPR3S7zl6vtJhzfPgYeU15SmwqdUnXOmP4FO05UNegq9sL6WNQUYsu9iiuEpAWvtdF6E9zhcN4Nrrn76DOyG4c9mXiaJBOv5vq1D8lD/R/7MKmHftfef5a2lpodoQOj1OJBJqnmilrKjZW4EJSmqAhVlINU/AnauMM+op0Vxar4m7c68dYpQCr5OQdA3sY606D1YwM7HgvXNLFzsIsVBTBm58EJ5F1k19linZHTvpRs+vnQuODJ6DvvJj8ln35Ii0F4m276cnS1r1z9mny7H6aAgzUGgtSu60jnpvDdvzWQQA1XqJwSRmlJgTF6RMHQEJYpqpg1EsCiKFZrgCc0MALgUTSuj278maBB82V+RRY5WebrcpG/yL4m2U/LEoufexIJDP3LjeRSCkXx+Q1DhdHOGAoNGijnWgqBdmQY1KdygnAePUmWX5LC/zbj4BLT+izbfTm6p3LyT3RF9uOqKP8zwHA5j+5ocZynhp5LRfbTJly0Xecb8qDJALBDNuHG74fxHVm3f3gs4/zNBRbdW/Kp8WryeNzqU/0owPbDV38Ce70icCIGAAA=" } }') index.handler(event, context) + # {"messageType":"DATA_MESSAGE","owner":"123456789012","logGroup":"vpcflowlog","logStream":"eni-123","subscriptionFilters":["CloudBender_Mgmt"],"logEvents":[{"id":"34622050453399460077466588752684659904424057309929734144","timestamp":1552506436228,"message":"2 747836459185 eni-033856bc201b3773b 10.10.0.227 10.3.9.111 443 52866 6 2 135 1563806853 1563806911 ACCEPT OK"}]} + event = json.loads('{"awslogs": {"data": "H4sIAC4iN10AAzWPTWsCMRCG7/0VIWcrSeYjibet3Xoo0oLeioirqSzsF7urUsT/3rFFmMPwvDPvzHvVdRqG3TGtf7qkZ/o1W2fbZb5aZYtcT3R7aVIv2DpAYh+isU5w1R4XfXvqRDl3+++qvQj556uxT7tahNSUz7ImdDgVw74vu7Fsm7eyGlM/6NmXnlft6fCSmkPqt8tjPerNn0F+Ts14H7jq8iA+gOycIYMEECOyMd4jM4XgyXFAphgNokNDHkyMLnpAiyiHx1LCjbta/rREjgwjiFmYPEKLvVMefQBGijaQun9tAAJxsXfGFuA9FMqaqZSZOufvPUzj1FqrEEGRC8yKlVMWSFliCIYDwaONMpfN5/nnWn2869vm9vQL/xeu7nMBAAA=" } }') + index.handler(event, context) + # Cloudfront Access Logs via S3 event = json.loads('{ "Records": [ { "eventVersion": "2.0", "eventTime": "1970-01-01T00:00:00.000Z", "requestParameters": { "sourceIPAddress": "127.0.0.1" }, "s3": { "configurationId": "testConfigRule", "object": { "eTag": "0123456789abcdef0123456789abcdef", "sequencer": "0A1B2C3D4E5F678901", "key": "tests/test_cloudfront.gz", "size": 1024 }, "bucket": { "arn": "arn:aws:s3:::mybucket", "name": "file://", "ownerIdentity": { "principalId": "EXAMPLE" } }, "s3SchemaVersion": "1.0" }, "responseElements": { "x-amz-id-2": "EXAMPLE123/5678abcdefghijklambdaisawesome/mnopqrstuvwxyzABCDEFGH", "x-amz-request-id": "EXAMPLE123456789" }, "awsRegion": "us-east-1", "eventName": "ObjectCreated:Put", "userIdentity": { "principalId": "EXAMPLE" }, "eventSource": "aws:s3" } ] }') index.handler(event, context)