Fix for ALB access log parser, improved vpc flowlog augmentation

This commit is contained in:
Stefan Reimer 2019-07-28 11:34:05 +00:00
parent 44818f4c9a
commit 2066e2ba11
3 changed files with 63 additions and 27 deletions

View File

@ -1,5 +1,9 @@
# Changelog # Changelog
## 0.9.7
- Fix for ALB AccessLog parser to handle spaces in request_url
- Improved VPC FlowLog metadata augmentation
## 0.9.6 ## 0.9.6
- Augment VPC FlowLogs with ENI metadata incl. global cache - Augment VPC FlowLogs with ENI metadata incl. global cache

View File

@ -17,14 +17,17 @@ import boto3
__author__ = "Stefan Reimer" __author__ = "Stefan Reimer"
__author_email__ = "stefan@zero-downtime.net" __author_email__ = "stefan@zero-downtime.net"
__version__ = "0.9.6" __version__ = "0.9.7"
# Global alias lookup cache # IAM Alias lookup cache
account_aliases = {} account_aliases = {}
# Global eni lookup cache # ENI lookup cache
enis = {} enis = {}
# IP lookup cache
ips = {}
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
logging.getLogger("urllib3").setLevel(logging.WARNING) logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger('boto3').setLevel(logging.WARNING) logging.getLogger('boto3').setLevel(logging.WARNING)
@ -50,7 +53,7 @@ CHUNK_SIZE = 128
DEBUG = boolean(os.getenv('DEBUG', default=False)) DEBUG = boolean(os.getenv('DEBUG', default=False))
TEST = boolean(os.getenv('TEST', default=False)) TEST = boolean(os.getenv('TEST', default=False))
RESOLVE_ACCOUNT = boolean(os.getenv('RESOLVE_ACCOUNT', default=True)) RESOLVE_ACCOUNT = boolean(os.getenv('RESOLVE_ACCOUNT', default=True))
RESOLVE_ENI = boolean(os.getenv('RESOLVE_ENI', default=True)) ENHANCE_FLOWLOG = boolean(os.getenv('ENHANCE_FLOWLOG', default=True))
if DEBUG: if DEBUG:
logging.getLogger().setLevel(logging.DEBUG) logging.getLogger().setLevel(logging.DEBUG)
@ -99,31 +102,64 @@ def get_source(region, account_id):
return source return source
def get_eni_data(eni): def add_flow_metadata(flow):
""" returns additional ENI properties """ adds metadata to VPC flow: ENI, direction, type
and caches for lifetime of lambda function caches the ENI and IP lookup tables for Lambda lifetime
""" """
global RESOLVE_ENI global ENHANCE_FLOWLOG
if RESOLVE_ENI and not TEST: if ENHANCE_FLOWLOG and not TEST:
try: try:
if eni not in enis: # Check cache and update if missed with all ENIs in one go
if flow['interface-id'] not in enis:
ec2 = boto3.client('ec2') ec2 = boto3.client('ec2')
interface_iter = ec2.get_paginator('describe_network_interfaces').paginate() interface_iter = ec2.get_paginator('describe_network_interfaces').paginate()
for response in interface_iter: for response in interface_iter:
for interface in response['NetworkInterfaces']: for interface in response['NetworkInterfaces']:
enis[interface['NetworkInterfaceId']] = {'eni.az': interface['AvailabilityZone'], # Lookup table keyed bt ENI ID
'eni.subnet': interface['SubnetId']} enis[interface['NetworkInterfaceId']] = interface
if 'Association' in interface and 'PublicIp' in interface['Association']:
enis[interface['NetworkInterfaceId']]['eni.public_ip'] = interface['Association']['PublicIp']
return enis[eni] # Lookup table by IP to classify traffic
ips[interface['PrivateIpAddress']] = interface
eni = enis[flow['interface-id']]
metadata = {'eni.az': eni['AvailabilityZone'],
'eni.sg': eni['Groups'][0]['GroupName'],
'eni.subnet': eni['SubnetId']}
# Add PublicIP if attached
if 'Association' in eni and 'PublicIp' in eni['Association']:
metadata['eni.public_ip'] = eni['Association']['PublicIp']
# Determine traffic direction
if eni['PrivateIpAddress'] == flow['srcaddr']:
metadata['direction'] = 'Out'
remote_ip = flow['dstaddr']
elif eni['PrivateIpAddress'] == flow['dstaddr']:
metadata['direction'] = 'In'
remote_ip = flow['srcaddr']
# Try to classify traffic:
# Free,Regional,Out
if remote_ip in ips:
if ips[remote_ip]['AvailabilityZone'] == eni['AvailabilityZone'] and ips[remote_ip]['VpcId'] == eni['VpcId']:
metadata['traffic_class'] = 'Free'
else:
metadata['traffic_class'] = 'Regional'
else:
# Incoming traffic is free 90% of times
if metadata['direction'] == 'In':
metadata['traffic_class'] = 'Free'
else:
metadata['traffic_class'] = 'Out'
flow.update(metadata)
except(KeyError, IndexError): except(KeyError, IndexError):
logger.warning("Could not get additional data for ENI {}".format(eni)) logger.warning("Could not get additional data for ENI {}".format(flow['interface-id']))
RESOLVE_ENI = False ENHANCE_FLOWLOG = False
pass pass
return {} return flow
class Queue: class Queue:
@ -319,12 +355,8 @@ def handler(event, context):
if row[13] == 'NODATA': if row[13] == 'NODATA':
continue continue
parsed = {'interface-id': row[2], 'srcaddr': row[3], 'dstaddr': row[4], 'srcport': row[5], 'dstport': row[6], 'protocol': row[7], parsed = add_flow_metadata({'interface-id': row[2], 'srcaddr': row[3], 'dstaddr': row[4], 'srcport': row[5], 'dstport': row[6], 'protocol': row[7],
'packets': row[8], 'bytes': row[9], 'start': row[10], 'end': row[11], 'action': row[12], 'log-status': row[13]} 'packets': row[8], 'bytes': row[9], 'start': row[10], 'end': row[11], 'action': row[12], 'log-status': row[13]})
eni_metadata = get_eni_data(parsed['interface-id'])
if eni_metadata:
parsed.update(eni_metadata)
# Fallback add raw message # Fallback add raw message
else: else:
@ -364,7 +396,7 @@ def handler(event, context):
source = get_source(region, account_id) source = get_source(region, account_id)
source['s3_url'] = '{}/{}'.format(bucket, key) source['s3_url'] = '{}/{}'.format(bucket, key)
alb_regex = re.compile(r"(?P<type>[^ ]*) (?P<timestamp>[^ ]*) (?P<elb>[^ ]*) (?P<client_ip>[^ ]*):(?P<client_port>[0-9]*) (?P<target_ip>[^ ]*)[:-](?P<target_port>[0-9]*) (?P<request_processing_time>[-.0-9]*) (?P<target_processing_time>[-.0-9]*) (?P<response_processing_time>[-.0-9]*) (?P<elb_status_code>|[-0-9]*) (?P<target_status_code>-|[-0-9]*) (?P<received_bytes>[-0-9]*) (?P<sent_bytes>[-0-9]*) \"(?P<request_verb>[^ ]*) (?P<request_url>[^ ]*) (?P<request_proto>- |[^ ]*)\" \"(?P<user_agent>[^\"]*)\" (?P<ssl_cipher>[A-Z0-9-]+) (?P<ssl_protocol>[A-Za-z0-9.-]*) (?P<target_group_arn>[^ ]*) \"(?P<trace_id>[^\"]*)\" \"(?P<domain_name>[^\"]*)\" \"(?P<chosen_cert_arn>[^\"]*)\" (?P<matched_rule_priority>[-.0-9]*) (?P<request_creation_time>[^ ]*) \"(?P<actions_executed>[^\"]*)\" \"(?P<redirect_url>[^ ]*)\" \"(?P<error_reason>[^ ]*)\"") alb_regex = re.compile(r"(?P<type>[^ ]*) (?P<timestamp>[^ ]*) (?P<elb>[^ ]*) (?P<client_ip>[^ ]*):(?P<client_port>[0-9]*) (?P<target_ip>[^ ]*)[:-](?P<target_port>[0-9]*) (?P<request_processing_time>[-.0-9]*) (?P<target_processing_time>[-.0-9]*) (?P<response_processing_time>[-.0-9]*) (?P<elb_status_code>|[-0-9]*) (?P<target_status_code>-|[-0-9]*) (?P<received_bytes>[-0-9]*) (?P<sent_bytes>[-0-9]*) \"(?P<request_verb>[^ ]*) (?P<request_url>[^\"]*) (?P<request_proto>- |[^ ]*)\" \"(?P<user_agent>[^\"]*)\" (?P<ssl_cipher>[A-Z0-9-]+) (?P<ssl_protocol>[A-Za-z0-9.-]*) (?P<target_group_arn>[^ ]*) \"(?P<trace_id>[^\"]*)\" \"(?P<domain_name>[^\"]*)\" \"(?P<chosen_cert_arn>[^\"]*)\" (?P<matched_rule_priority>[-.0-9]*) (?P<request_creation_time>[^ ]*) \"(?P<actions_executed>[^\"]*)\" \"(?P<redirect_url>[^ ]*)\" \"(?P<error_reason>[^ ]*)\"")
# try to identify file type by looking at first lines # try to identify file type by looking at first lines
with gzip.open(file_path, mode='rt', newline='\n') as data: with gzip.open(file_path, mode='rt', newline='\n') as data:

View File

@ -29,8 +29,8 @@ def test_parse():
event = json.loads('{"awslogs": {"data": "H4sICILSAl0AA3Rlc3QArVNdb9tGEHzvr7gSfWgB09z7vuObHDOuCpsqKAZoawnGkTy5RChSJY9N0iD/vSslLew2BlykLwSxu9idmZt5H+39NLl7X747+CiNLhfl4u4mW68XV1l0Fg1vej9imTIupNLGAmVY7ob7q3GYD9hJ3Jsp6dy+alwS/BTi3dzXoR36j2PrMHq3xzkG1CbAE8qT22+uF2W2LresrirHgVPhjagtOLPbWa00B23BUIkrprma6rE9HDe+bLvgxylKb6MX3TA3F75v/Hh3c78P0fZ0Lfvd9+E48D5qGzzKhWIMJAjJubVCAWgtlJLGaMmUEUpaC0IwARKPWsus5oIKgYdDi8IEt0eOVEomQQmOy8zZX4Lh+kVdD3Mf2iYlDwXa9NGHsy9DIJ+JoHiVkx9WF2SVk/L7IltcEiqAa4VfvCE4I7fhV3yBJj4hTYne/g/o1DPR3S7zl6vtJhzfPgYeU15SmwqdUnXOmP4FO05UNegq9sL6WNQUYsu9iiuEpAWvtdF6E9zhcN4Nrrn76DOyG4c9mXiaJBOv5vq1D8lD/R/7MKmHftfef5a2lpodoQOj1OJBJqnmilrKjZW4EJSmqAhVlINU/AnauMM+op0Vxar4m7c68dYpQCr5OQdA3sY606D1YwM7HgvXNLFzsIsVBTBm58EJ5F1k19linZHTvpRs+vnQuODJ6DvvJj8ln35Ii0F4m276cnS1r1z9mny7H6aAgzUGgtSu60jnpvDdvzWQQA1XqJwSRmlJgTF6RMHQEJYpqpg1EsCiKFZrgCc0MALgUTSuj278maBB82V+RRY5WebrcpG/yL4m2U/LEoufexIJDP3LjeRSCkXx+Q1DhdHOGAoNGijnWgqBdmQY1KdygnAePUmWX5LC/zbj4BLT+izbfTm6p3LyT3RF9uOqKP8zwHA5j+5ocZynhp5LRfbTJly0Xecb8qDJALBDNuHG74fxHVm3f3gs4/zNBRbdW/Kp8WryeNzqU/0owPbDV38Ce70icCIGAAA=" } }') event = json.loads('{"awslogs": {"data": "H4sICILSAl0AA3Rlc3QArVNdb9tGEHzvr7gSfWgB09z7vuObHDOuCpsqKAZoawnGkTy5RChSJY9N0iD/vSslLew2BlykLwSxu9idmZt5H+39NLl7X747+CiNLhfl4u4mW68XV1l0Fg1vej9imTIupNLGAmVY7ob7q3GYD9hJ3Jsp6dy+alwS/BTi3dzXoR36j2PrMHq3xzkG1CbAE8qT22+uF2W2LresrirHgVPhjagtOLPbWa00B23BUIkrprma6rE9HDe+bLvgxylKb6MX3TA3F75v/Hh3c78P0fZ0Lfvd9+E48D5qGzzKhWIMJAjJubVCAWgtlJLGaMmUEUpaC0IwARKPWsus5oIKgYdDi8IEt0eOVEomQQmOy8zZX4Lh+kVdD3Mf2iYlDwXa9NGHsy9DIJ+JoHiVkx9WF2SVk/L7IltcEiqAa4VfvCE4I7fhV3yBJj4hTYne/g/o1DPR3S7zl6vtJhzfPgYeU15SmwqdUnXOmP4FO05UNegq9sL6WNQUYsu9iiuEpAWvtdF6E9zhcN4Nrrn76DOyG4c9mXiaJBOv5vq1D8lD/R/7MKmHftfef5a2lpodoQOj1OJBJqnmilrKjZW4EJSmqAhVlINU/AnauMM+op0Vxar4m7c68dYpQCr5OQdA3sY606D1YwM7HgvXNLFzsIsVBTBm58EJ5F1k19linZHTvpRs+vnQuODJ6DvvJj8ln35Ii0F4m276cnS1r1z9mny7H6aAgzUGgtSu60jnpvDdvzWQQA1XqJwSRmlJgTF6RMHQEJYpqpg1EsCiKFZrgCc0MALgUTSuj278maBB82V+RRY5WebrcpG/yL4m2U/LEoufexIJDP3LjeRSCkXx+Q1DhdHOGAoNGijnWgqBdmQY1KdygnAePUmWX5LC/zbj4BLT+izbfTm6p3LyT3RF9uOqKP8zwHA5j+5ocZynhp5LRfbTJly0Xecb8qDJALBDNuHG74fxHVm3f3gs4/zNBRbdW/Kp8WryeNzqU/0owPbDV38Ce70icCIGAAA=" } }')
index.handler(event, context) index.handler(event, context)
# {"messageType":"DATA_MESSAGE","owner":"123456789012","logGroup":"vpcflowlog","logStream":"eni-123","subscriptionFilters":["CloudBender_Mgmt"],"logEvents":[{"id":"34622050453399460077466588752684659904424057309929734144","timestamp":1552506436228,"message":"2 747836459185 eni-033856bc201b3773b 10.10.0.227 10.3.9.111 443 52866 6 2 135 1563806853 1563806911 ACCEPT OK"}]} # {"messageType":"DATA_MESSAGE","owner":"123456789012","logGroup":"vpcflowlog","logStream":"eni-123","subscriptionFilters":["CloudBender_Mgmt"],"logEvents":[{"id":"34622050453399460077466588752684659904424057309929734144","timestamp":1552506436228,"message":"2 747836459185 eni-033856bc201b3773b 10.10.0.227 10.3.9.111 443 52866 6 2 135 1563806853 1563806911 ACCEPT OK"},{"id":"34622050453399460077466588752684659904424057309929734144","timestamp":1552506436228,"message":"2 747836459185 eni-033856bc201b3773b 10.10.9.48 10.10.0.227 24224 17234 6 10 1947 1563827256 1563827316 ACCEPT OK"}]}
event = json.loads('{"awslogs": {"data": "H4sIAC4iN10AAzWPTWsCMRCG7/0VIWcrSeYjibet3Xoo0oLeioirqSzsF7urUsT/3rFFmMPwvDPvzHvVdRqG3TGtf7qkZ/o1W2fbZb5aZYtcT3R7aVIv2DpAYh+isU5w1R4XfXvqRDl3+++qvQj556uxT7tahNSUz7ImdDgVw74vu7Fsm7eyGlM/6NmXnlft6fCSmkPqt8tjPerNn0F+Ts14H7jq8iA+gOycIYMEECOyMd4jM4XgyXFAphgNokNDHkyMLnpAiyiHx1LCjbta/rREjgwjiFmYPEKLvVMefQBGijaQun9tAAJxsXfGFuA9FMqaqZSZOufvPUzj1FqrEEGRC8yKlVMWSFliCIYDwaONMpfN5/nnWn2869vm9vQL/xeu7nMBAAA=" } }') event = json.loads('{"awslogs": {"data": "H4sIAHN1N10AA82R3UoDMRCF732KkOu6JPOXpHe1rr2QotDeiUh/Ylno7pbdrSLFd3eqFPQNhFwczkzOzMecbJ37frXLy49DtmN7O1lOXublYjGZlXZk2/cmd2p7QGIJMTkPau/b3axrjwetvB02r/v2XZ0ffzF0eVVrITfVtX5Ttz+u+01XHYaqbe6q/ZC73o6f7HTfHrc3udnm7mW+qwf7/B1QvuVmODecbLXVHCQBcOyIEVMicS4EEuEYA4NEEk7JEQE5DuhSghSQPJEOHiqFG1a17umZgZ0QalgcXaA1HkygEFGIk49szls7xMiy3oDzawwB18a7Qp8rAMJZY5EK770hQsMQRYwYMB7ZeBaMTiLjRSbtm0yn5ePSPNzbz9G/o0oFxT+AQABkfNCTK5d3xicKPzgQgOUi0ctvsufPqy8JYO/9TQIAAA==" } }')
index.handler(event, context) index.handler(event, context)
# Cloudfront Access Logs via S3 # Cloudfront Access Logs via S3