diff --git a/CHANGES.md b/CHANGES.md index 383af65..705f964 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,9 @@ # Changelog +## 0.9.7 +- Fix for ALB AccessLog parser to handle spaces in request_url +- Improved VPC FlowLog metadata augmentation + ## 0.9.6 - Augment VPC FlowLogs with ENI metadata incl. global cache diff --git a/index.py b/index.py index 75b5155..6315dd1 100644 --- a/index.py +++ b/index.py @@ -17,14 +17,17 @@ import boto3 __author__ = "Stefan Reimer" __author_email__ = "stefan@zero-downtime.net" -__version__ = "0.9.6" +__version__ = "0.9.7" -# Global alias lookup cache +# IAM Alias lookup cache account_aliases = {} -# Global eni lookup cache +# ENI lookup cache enis = {} +# IP lookup cache +ips = {} + logger = logging.getLogger(__name__) logging.getLogger("urllib3").setLevel(logging.WARNING) logging.getLogger('boto3').setLevel(logging.WARNING) @@ -50,7 +53,7 @@ CHUNK_SIZE = 128 DEBUG = boolean(os.getenv('DEBUG', default=False)) TEST = boolean(os.getenv('TEST', default=False)) RESOLVE_ACCOUNT = boolean(os.getenv('RESOLVE_ACCOUNT', default=True)) -RESOLVE_ENI = boolean(os.getenv('RESOLVE_ENI', default=True)) +ENHANCE_FLOWLOG = boolean(os.getenv('ENHANCE_FLOWLOG', default=True)) if DEBUG: logging.getLogger().setLevel(logging.DEBUG) @@ -99,31 +102,64 @@ def get_source(region, account_id): return source -def get_eni_data(eni): - """ returns additional ENI properties - and caches for lifetime of lambda function +def add_flow_metadata(flow): + """ adds metadata to VPC flow: ENI, direction, type + caches the ENI and IP lookup tables for Lambda lifetime """ - global RESOLVE_ENI - if RESOLVE_ENI and not TEST: + global ENHANCE_FLOWLOG + if ENHANCE_FLOWLOG and not TEST: try: - if eni not in enis: + # Check cache and update if missed with all ENIs in one go + if flow['interface-id'] not in enis: ec2 = boto3.client('ec2') interface_iter = ec2.get_paginator('describe_network_interfaces').paginate() for response in interface_iter: for interface in response['NetworkInterfaces']: - enis[interface['NetworkInterfaceId']] = {'eni.az': interface['AvailabilityZone'], - 'eni.subnet': interface['SubnetId']} - if 'Association' in interface and 'PublicIp' in interface['Association']: - enis[interface['NetworkInterfaceId']]['eni.public_ip'] = interface['Association']['PublicIp'] + # Lookup table keyed bt ENI ID + enis[interface['NetworkInterfaceId']] = interface - return enis[eni] + # Lookup table by IP to classify traffic + ips[interface['PrivateIpAddress']] = interface + + eni = enis[flow['interface-id']] + metadata = {'eni.az': eni['AvailabilityZone'], + 'eni.sg': eni['Groups'][0]['GroupName'], + 'eni.subnet': eni['SubnetId']} + + # Add PublicIP if attached + if 'Association' in eni and 'PublicIp' in eni['Association']: + metadata['eni.public_ip'] = eni['Association']['PublicIp'] + + # Determine traffic direction + if eni['PrivateIpAddress'] == flow['srcaddr']: + metadata['direction'] = 'Out' + remote_ip = flow['dstaddr'] + elif eni['PrivateIpAddress'] == flow['dstaddr']: + metadata['direction'] = 'In' + remote_ip = flow['srcaddr'] + + # Try to classify traffic: + # Free,Regional,Out + if remote_ip in ips: + if ips[remote_ip]['AvailabilityZone'] == eni['AvailabilityZone'] and ips[remote_ip]['VpcId'] == eni['VpcId']: + metadata['traffic_class'] = 'Free' + else: + metadata['traffic_class'] = 'Regional' + else: + # Incoming traffic is free 90% of times + if metadata['direction'] == 'In': + metadata['traffic_class'] = 'Free' + else: + metadata['traffic_class'] = 'Out' + + flow.update(metadata) except(KeyError, IndexError): - logger.warning("Could not get additional data for ENI {}".format(eni)) - RESOLVE_ENI = False + logger.warning("Could not get additional data for ENI {}".format(flow['interface-id'])) + ENHANCE_FLOWLOG = False pass - return {} + return flow class Queue: @@ -319,12 +355,8 @@ def handler(event, context): if row[13] == 'NODATA': continue - parsed = {'interface-id': row[2], 'srcaddr': row[3], 'dstaddr': row[4], 'srcport': row[5], 'dstport': row[6], 'protocol': row[7], - 'packets': row[8], 'bytes': row[9], 'start': row[10], 'end': row[11], 'action': row[12], 'log-status': row[13]} - - eni_metadata = get_eni_data(parsed['interface-id']) - if eni_metadata: - parsed.update(eni_metadata) + parsed = add_flow_metadata({'interface-id': row[2], 'srcaddr': row[3], 'dstaddr': row[4], 'srcport': row[5], 'dstport': row[6], 'protocol': row[7], + 'packets': row[8], 'bytes': row[9], 'start': row[10], 'end': row[11], 'action': row[12], 'log-status': row[13]}) # Fallback add raw message else: @@ -364,7 +396,7 @@ def handler(event, context): source = get_source(region, account_id) source['s3_url'] = '{}/{}'.format(bucket, key) - alb_regex = re.compile(r"(?P[^ ]*) (?P[^ ]*) (?P[^ ]*) (?P[^ ]*):(?P[0-9]*) (?P[^ ]*)[:-](?P[0-9]*) (?P[-.0-9]*) (?P[-.0-9]*) (?P[-.0-9]*) (?P|[-0-9]*) (?P-|[-0-9]*) (?P[-0-9]*) (?P[-0-9]*) \"(?P[^ ]*) (?P[^ ]*) (?P- |[^ ]*)\" \"(?P[^\"]*)\" (?P[A-Z0-9-]+) (?P[A-Za-z0-9.-]*) (?P[^ ]*) \"(?P[^\"]*)\" \"(?P[^\"]*)\" \"(?P[^\"]*)\" (?P[-.0-9]*) (?P[^ ]*) \"(?P[^\"]*)\" \"(?P[^ ]*)\" \"(?P[^ ]*)\"") + alb_regex = re.compile(r"(?P[^ ]*) (?P[^ ]*) (?P[^ ]*) (?P[^ ]*):(?P[0-9]*) (?P[^ ]*)[:-](?P[0-9]*) (?P[-.0-9]*) (?P[-.0-9]*) (?P[-.0-9]*) (?P|[-0-9]*) (?P-|[-0-9]*) (?P[-0-9]*) (?P[-0-9]*) \"(?P[^ ]*) (?P[^\"]*) (?P- |[^ ]*)\" \"(?P[^\"]*)\" (?P[A-Z0-9-]+) (?P[A-Za-z0-9.-]*) (?P[^ ]*) \"(?P[^\"]*)\" \"(?P[^\"]*)\" \"(?P[^\"]*)\" (?P[-.0-9]*) (?P[^ ]*) \"(?P[^\"]*)\" \"(?P[^ ]*)\" \"(?P[^ ]*)\"") # try to identify file type by looking at first lines with gzip.open(file_path, mode='rt', newline='\n') as data: diff --git a/tests/test_parse.py b/tests/test_parse.py index a2570b7..de7a052 100755 --- a/tests/test_parse.py +++ b/tests/test_parse.py @@ -29,8 +29,8 @@ def test_parse(): event = json.loads('{"awslogs": {"data": "H4sICILSAl0AA3Rlc3QArVNdb9tGEHzvr7gSfWgB09z7vuObHDOuCpsqKAZoawnGkTy5RChSJY9N0iD/vSslLew2BlykLwSxu9idmZt5H+39NLl7X747+CiNLhfl4u4mW68XV1l0Fg1vej9imTIupNLGAmVY7ob7q3GYD9hJ3Jsp6dy+alwS/BTi3dzXoR36j2PrMHq3xzkG1CbAE8qT22+uF2W2LresrirHgVPhjagtOLPbWa00B23BUIkrprma6rE9HDe+bLvgxylKb6MX3TA3F75v/Hh3c78P0fZ0Lfvd9+E48D5qGzzKhWIMJAjJubVCAWgtlJLGaMmUEUpaC0IwARKPWsus5oIKgYdDi8IEt0eOVEomQQmOy8zZX4Lh+kVdD3Mf2iYlDwXa9NGHsy9DIJ+JoHiVkx9WF2SVk/L7IltcEiqAa4VfvCE4I7fhV3yBJj4hTYne/g/o1DPR3S7zl6vtJhzfPgYeU15SmwqdUnXOmP4FO05UNegq9sL6WNQUYsu9iiuEpAWvtdF6E9zhcN4Nrrn76DOyG4c9mXiaJBOv5vq1D8lD/R/7MKmHftfef5a2lpodoQOj1OJBJqnmilrKjZW4EJSmqAhVlINU/AnauMM+op0Vxar4m7c68dYpQCr5OQdA3sY606D1YwM7HgvXNLFzsIsVBTBm58EJ5F1k19linZHTvpRs+vnQuODJ6DvvJj8ln35Ii0F4m276cnS1r1z9mny7H6aAgzUGgtSu60jnpvDdvzWQQA1XqJwSRmlJgTF6RMHQEJYpqpg1EsCiKFZrgCc0MALgUTSuj278maBB82V+RRY5WebrcpG/yL4m2U/LEoufexIJDP3LjeRSCkXx+Q1DhdHOGAoNGijnWgqBdmQY1KdygnAePUmWX5LC/zbj4BLT+izbfTm6p3LyT3RF9uOqKP8zwHA5j+5ocZynhp5LRfbTJly0Xecb8qDJALBDNuHG74fxHVm3f3gs4/zNBRbdW/Kp8WryeNzqU/0owPbDV38Ce70icCIGAAA=" } }') index.handler(event, context) - # {"messageType":"DATA_MESSAGE","owner":"123456789012","logGroup":"vpcflowlog","logStream":"eni-123","subscriptionFilters":["CloudBender_Mgmt"],"logEvents":[{"id":"34622050453399460077466588752684659904424057309929734144","timestamp":1552506436228,"message":"2 747836459185 eni-033856bc201b3773b 10.10.0.227 10.3.9.111 443 52866 6 2 135 1563806853 1563806911 ACCEPT OK"}]} - event = json.loads('{"awslogs": {"data": "H4sIAC4iN10AAzWPTWsCMRCG7/0VIWcrSeYjibet3Xoo0oLeioirqSzsF7urUsT/3rFFmMPwvDPvzHvVdRqG3TGtf7qkZ/o1W2fbZb5aZYtcT3R7aVIv2DpAYh+isU5w1R4XfXvqRDl3+++qvQj556uxT7tahNSUz7ImdDgVw74vu7Fsm7eyGlM/6NmXnlft6fCSmkPqt8tjPerNn0F+Ts14H7jq8iA+gOycIYMEECOyMd4jM4XgyXFAphgNokNDHkyMLnpAiyiHx1LCjbta/rREjgwjiFmYPEKLvVMefQBGijaQun9tAAJxsXfGFuA9FMqaqZSZOufvPUzj1FqrEEGRC8yKlVMWSFliCIYDwaONMpfN5/nnWn2869vm9vQL/xeu7nMBAAA=" } }') + # {"messageType":"DATA_MESSAGE","owner":"123456789012","logGroup":"vpcflowlog","logStream":"eni-123","subscriptionFilters":["CloudBender_Mgmt"],"logEvents":[{"id":"34622050453399460077466588752684659904424057309929734144","timestamp":1552506436228,"message":"2 747836459185 eni-033856bc201b3773b 10.10.0.227 10.3.9.111 443 52866 6 2 135 1563806853 1563806911 ACCEPT OK"},{"id":"34622050453399460077466588752684659904424057309929734144","timestamp":1552506436228,"message":"2 747836459185 eni-033856bc201b3773b 10.10.9.48 10.10.0.227 24224 17234 6 10 1947 1563827256 1563827316 ACCEPT OK"}]} + event = json.loads('{"awslogs": {"data": "H4sIAHN1N10AA82R3UoDMRCF732KkOu6JPOXpHe1rr2QotDeiUh/Ylno7pbdrSLFd3eqFPQNhFwczkzOzMecbJ37frXLy49DtmN7O1lOXublYjGZlXZk2/cmd2p7QGIJMTkPau/b3axrjwetvB02r/v2XZ0ffzF0eVVrITfVtX5Ttz+u+01XHYaqbe6q/ZC73o6f7HTfHrc3udnm7mW+qwf7/B1QvuVmODecbLXVHCQBcOyIEVMicS4EEuEYA4NEEk7JEQE5DuhSghSQPJEOHiqFG1a17umZgZ0QalgcXaA1HkygEFGIk49szls7xMiy3oDzawwB18a7Qp8rAMJZY5EK770hQsMQRYwYMB7ZeBaMTiLjRSbtm0yn5ePSPNzbz9G/o0oFxT+AQABkfNCTK5d3xicKPzgQgOUi0ctvsufPqy8JYO/9TQIAAA==" } }') index.handler(event, context) # Cloudfront Access Logs via S3