Parsing improvements

This commit is contained in:
Stefan Reimer 2019-06-24 16:14:40 +00:00
parent 5b3e04566b
commit b26205658e
2 changed files with 18 additions and 7 deletions

View File

@ -1,5 +1,10 @@
# Changelog # Changelog
## 0.9.3
- improved parsing of timestamps incl. subsecond resolution
- improved parsing of Lambda events to catch RequestIds
- skip over empty cloudwatch log events
## 0.9.2 ## 0.9.2
- fixed parser for multi line Lambda events - fixed parser for multi line Lambda events

View File

@ -17,7 +17,7 @@ import boto3
__author__ = "Stefan Reimer" __author__ = "Stefan Reimer"
__author_email__ = "stefan@zero-downtime.net" __author_email__ = "stefan@zero-downtime.net"
__version__ = "0.9.2" __version__ = "0.9.3"
# Global alias lookup cache # Global alias lookup cache
account_aliases = {} account_aliases = {}
@ -195,6 +195,11 @@ def handler(event, context):
source = get_source(region, account_id) source = get_source(region, account_id)
parsed = {} parsed = {}
# Remove whitespace / empty events & skip over empty events
e['message'] = e['message'].strip()
if re.match(r'^\s*$', e['message']):
continue
# inject existing data from subscrition filters # inject existing data from subscrition filters
if('extractedFields' in e.keys()): if('extractedFields' in e.keys()):
for key in e['extractedFields']: for key in e['extractedFields']:
@ -202,7 +207,6 @@ def handler(event, context):
# lambda ? # lambda ?
if logs.tag == 'aws.lambda': if logs.tag == 'aws.lambda':
# First look for the three AWS Lambda entries # First look for the three AWS Lambda entries
mg = re.match(r'(?P<type>(START|END|REPORT)) RequestId: (?P<request>\S*)', e['message']) mg = re.match(r'(?P<type>(START|END|REPORT)) RequestId: (?P<request>\S*)', e['message'])
if mg: if mg:
@ -228,10 +232,12 @@ def handler(event, context):
# This normalizes print vs. logging entries and allows requestid tracking # This normalizes print vs. logging entries and allows requestid tracking
# "[%(levelname)s]\t%(asctime)s.%(msecs)dZ\t%(aws_request_id)s\t%(message)s\n" # "[%(levelname)s]\t%(asctime)s.%(msecs)dZ\t%(aws_request_id)s\t%(message)s\n"
_msg = e['message'] _msg = e['message']
pattern = r'(?:\[(?P<level>[^\]]*)\])\t(?P<time>.*?)\t(?P<RequestId>\S*?)\t(?P<message>.*)' pattern = r'(?:\[(?P<level>[^\]]*)\]\s)?(?P<time>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{1,6}Z)\s(?P<RequestId>\S*?)\s(?P<message>.*)'
data = re.match(pattern, e['message'], flags=re.DOTALL) data = re.match(pattern, e['message'], flags=re.DOTALL)
if data: if data:
event['level'] = data.group('level') if data.group('level'):
event['level'] = data.group('level')
event['time'] = fluentd_time(datetime.datetime.strptime(data.group('time'), '%Y-%m-%dT%H:%M:%S.%fZ').timestamp())
parsed['RequestId'] = data.group('RequestId') parsed['RequestId'] = data.group('RequestId')
_msg = data.group('message') _msg = data.group('message')
@ -252,7 +258,7 @@ def handler(event, context):
parsed = json.loads(e['message']) parsed = json.loads(e['message'])
# use eventTime and eventID from the event itself # use eventTime and eventID from the event itself
event['time'] = fluentd_time(time.mktime(datetime.datetime.strptime(parsed['eventTime'], '%Y-%m-%dT%H:%M:%SZ').timetuple())) event['time'] = fluentd_time(datetime.datetime.strptime(parsed['eventTime'], '%Y-%m-%dT%H:%M:%SZ').timestamp())
event['id'] = parsed['eventID'] event['id'] = parsed['eventID']
# override region from cloudtrail event # override region from cloudtrail event
source['region'] = parsed['awsRegion'] source['region'] = parsed['awsRegion']
@ -338,7 +344,7 @@ def handler(event, context):
row = line.split('\t') row = line.split('\t')
# cloudfront events are logged to the second only, date and time are seperate # cloudfront events are logged to the second only, date and time are seperate
event['time'] = fluentd_time(time.mktime(datetime.datetime.strptime(row[0] + " " + row[1], '%Y-%m-%d %H:%M:%S').timetuple())) event['time'] = fluentd_time(datetime.datetime.strptime(row[0] + " " + row[1], '%Y-%m-%d %H:%M:%S').timestamp())
for n, c in enumerate(columns, 2): for n, c in enumerate(columns, 2):
value = row[n] value = row[n]
@ -378,7 +384,7 @@ def handler(event, context):
logger.warning("Could not parse ALB access log entry: {}".format(line)) logger.warning("Could not parse ALB access log entry: {}".format(line))
continue continue
event['time'] = fluentd_time(time.mktime(datetime.datetime.strptime(parsed['request_creation_time'], '%Y-%m-%dT%H:%M:%S.%fZ').timetuple())) event['time'] = fluentd_time(datetime.datetime.strptime(parsed['request_creation_time'], '%Y-%m-%dT%H:%M:%S.%fZ').timestamp())
# Copy to host to allow geoip upstream # Copy to host to allow geoip upstream
event['host'] = parsed['client_ip'] event['host'] = parsed['client_ip']