diff --git a/index.py b/index.py
index 719486c..1638c8c 100644
--- a/index.py
+++ b/index.py
@@ -17,7 +17,7 @@ import boto3
 
 __author__ = "Stefan Reimer"
 __author_email__ = "stefan@zero-downtime.net"
-__version__ = "0.9.3"
+__version__ = "0.9.4"
 
 # Global alias lookup cache
 account_aliases = {}
@@ -94,7 +94,7 @@ def get_source(region, account_id):
 
 
 class Queue:
-    url = urllib.parse.urlsplit(os.getenv('FLUENTD_URL', default=None))
+    url = urllib.parse.urlsplit(os.getenv('FLUENTD_URL', default=''), scheme='https')
     passwd = os.getenv('FLUENT_SHARED_KEY', default=None)
     verify_certs = os.getenv('FLUENTD_VERIFY_CERTS', default=1)
 
@@ -313,55 +313,25 @@ def handler(event, context):
     source = get_source(region, account_id)
     source['s3_url'] = '{}/{}'.format(bucket, key)
 
-    # try to identify file by looking at first two lines
-    first = ""
-    second = ""
-    try:
-        with gzip.open(file_path, mode='rt', newline='\n') as data:
-            first = next(data)
-            second = next(data)
-    except (OSError, StopIteration):
+    alb_regex = re.compile(r"(?P<type>[^ ]*) (?P<timestamp>[^ ]*) (?P<elb>[^ ]*) (?P<client_ip>[^ ]*):(?P<client_port>[0-9]*) (?P<target_ip>[^ ]*)[:-](?P<target_port>[0-9]*) (?P<request_processing_time>[-.0-9]*) (?P<target_processing_time>[-.0-9]*) (?P<response_processing_time>[-.0-9]*) (?P<elb_status_code>|[-0-9]*) (?P<target_status_code>-|[-0-9]*) (?P<received_bytes>[-0-9]*) (?P<sent_bytes>[-0-9]*) \"(?P<request_verb>[^ ]*) (?P<request_url>[^ ]*) (?P<request_proto>- |[^ ]*)\" \"(?P<user_agent>[^\"]*)\" (?P<ssl_cipher>[A-Z0-9-]+) (?P<ssl_protocol>[A-Za-z0-9.-]*) (?P<target_group_arn>[^ ]*) \"(?P<trace_id>[^\"]*)\" \"(?P<domain_name>[^\"]*)\" \"(?P<chosen_cert_arn>[^\"]*)\" (?P<matched_rule_priority>[-.0-9]*) (?P<request_creation_time>[^ ]*) \"(?P<actions_executed>[^\"]*)\" \"(?P<redirect_url>[^ ]*)\" \"(?P<error_reason>[^ ]*)\"")
+
+    # try to identify the file type by looking at the first lines
+    with gzip.open(file_path, mode='rt', newline='\n') as data:
+        header = data.readlines(2048)
+
+    # ALB Access ?
+    if alb_regex.match(header[0]):
+        logs = Queue("aws.alb_accesslog")
+
+    # cloudfront access logs
+    elif len(header) > 1 and re.match('#Version:', header[0]) and re.match('#Fields:', header[1]):
+        logs = Queue("aws.cloudfront_accesslog")
+
+    else:
         logger.warning("{}/{}: Unknown type!".format(bucket, key))
         return
 
-    alb_regex = re.compile(r"(?P<type>[^ ]*) (?P<timestamp>[^ ]*) (?P<elb>[^ ]*) (?P<client_ip>[^ ]*):(?P<client_port>[0-9]*) (?P<target_ip>[^ ]*)[:-](?P<target_port>[0-9]*) (?P<request_processing_time>[-.0-9]*) (?P<target_processing_time>[-.0-9]*) (?P<response_processing_time>[-.0-9]*) (?P<elb_status_code>|[-0-9]*) (?P<target_status_code>-|[-0-9]*) (?P<received_bytes>[-0-9]*) (?P<sent_bytes>[-0-9]*) \"(?P<request_verb>[^ ]*) (?P<request_url>[^ ]*) (?P<request_proto>- |[^ ]*)\" \"(?P<user_agent>[^\"]*)\" (?P<ssl_cipher>[A-Z0-9-]+) (?P<ssl_protocol>[A-Za-z0-9.-]*) (?P<target_group_arn>[^ ]*) \"(?P<trace_id>[^\"]*)\" \"(?P<domain_name>[^\"]*)\" \"(?P<chosen_cert_arn>[^\"]*)\" (?P<matched_rule_priority>[-.0-9]*) (?P<request_creation_time>[^ ]*) \"(?P<actions_executed>[^\"]*)\" \"(?P<redirect_url>[^ ]*)\" \"(?P<error_reason>[^ ]*)\"")
-
-    # cloudfront access logs
-    if re.match('#Version:', first) and re.match('#Fields:', second):
-        logs = Queue("aws.cloudfront_accesslog")
-
-        with gzip.open(file_path, mode='rt', newline='\n') as data:
-            next(data)
-            # columns are in second line: first is #Fields, next two are merged into time later
-            columns = next(data).split()[3:]
-
-            for line in data:
-                event = {}
-                parsed = {}
-
-                # Todo hash each line to create source['id']
-                # source['id'] = md5.of.line matching ES ids
-
-                row = line.split('\t')
-                # cloudfront events are logged to the second only, date and time are seperate
-                event['time'] = fluentd_time(datetime.datetime.strptime(row[0] + " " + row[1], '%Y-%m-%d %H:%M:%S').timestamp())
-
-                for n, c in enumerate(columns, 2):
-                    value = row[n]
-                    if value not in ['-', '-\n']:
-                        parsed[c] = row[n]
-                        # Copy c-ip to host to allow geoip upstream
-                        if c == 'c-ip':
-                            event['host'] = row[n]
-
-                event[logs.tag] = parsed
-                event['source'] = source
-
-                logs.send(event)
-
-    elif alb_regex.match(first):
-        logs = Queue("aws.alb_accesslog")
-
+    if logs.tag == 'aws.alb_accesslog':
         with gzip.open(file_path, mode='rt', newline='\n') as data:
             for line in data:
                 event = {}
@@ -392,8 +362,36 @@ def handler(event, context):
                 event['source'] = source
 
                 logs.send(event)
-    else:
-        logs = Queue("unknown")
+
+    elif logs.tag == 'aws.cloudfront_accesslog':
+        with gzip.open(file_path, mode='rt', newline='\n') as data:
+            next(data)
+            # columns are in second line: first is #Fields, next two are merged into time later
+            columns = next(data).split()[3:]
+
+            for line in data:
+                event = {}
+                parsed = {}
+
+                # Todo hash each line to create source['id']
+                # source['id'] = md5.of.line matching ES ids
+
+                row = line.split('\t')
+                # cloudfront events are logged to the second only, date and time are separate
+                event['time'] = fluentd_time(datetime.datetime.strptime(row[0] + " " + row[1], '%Y-%m-%d %H:%M:%S').timestamp())
+
+                for n, c in enumerate(columns, 2):
+                    value = row[n]
+                    if value not in ['-', '-\n']:
+                        parsed[c] = row[n]
+                        # Copy c-ip to host to allow geoip upstream
+                        if c == 'c-ip':
+                            event['host'] = row[n]
+
+                event[logs.tag] = parsed
+                event['source'] = source
+
+                logs.send(event)
 
     logs.flush()
     logs.info()
diff --git a/tests/test_parse.py b/tests/test_parse.py
index 02921b0..b37e80a 100755
--- a/tests/test_parse.py
+++ b/tests/test_parse.py
@@ -36,3 +36,7 @@ def test_parse():
     # alb Access Logs via S3
     event = json.loads('{ "Records": [ { "eventVersion": "2.0", "eventTime": "1970-01-01T00:00:00.000Z", "requestParameters": { "sourceIPAddress": "127.0.0.1" }, "s3": { "configurationId": "testConfigRule", "object": { "eTag": "0123456789abcdef0123456789abcdef", "sequencer": "0A1B2C3D4E5F678901", "key": "tests/test_alb_accesslogs.gz", "size": 1024 }, "bucket": { "arn": "arn:aws:s3:::mybucket", "name": "file://", "ownerIdentity": { "principalId": "EXAMPLE" } }, "s3SchemaVersion": "1.0" }, "responseElements": { "x-amz-id-2": "EXAMPLE123/5678abcdefghijklambdaisawesome/mnopqrstuvwxyzABCDEFGH", "x-amz-request-id": "EXAMPLE123456789" }, "awsRegion": "us-east-1", "eventName": "ObjectCreated:Put", "userIdentity": { "principalId": "EXAMPLE" }, "eventSource": "aws:s3" } ] }')
     index.handler(event, context)
+
+    # unknown file
+    event = json.loads('{ "Records": [ { "eventVersion": "2.0", "eventTime": "1970-01-01T00:00:00.000Z", "requestParameters": { "sourceIPAddress": "127.0.0.1" }, "s3": { "configurationId": "testConfigRule", "object": { "eTag": "0123456789abcdef0123456789abcdef", "sequencer": "0A1B2C3D4E5F678901", "key": "tests/test_s3_unknown.gz", "size": 1024 }, "bucket": { "arn": "arn:aws:s3:::mybucket", "name": "file://", "ownerIdentity": { "principalId": "EXAMPLE" } }, "s3SchemaVersion": "1.0" }, "responseElements": { "x-amz-id-2": "EXAMPLE123/5678abcdefghijklambdaisawesome/mnopqrstuvwxyzABCDEFGH", "x-amz-request-id": "EXAMPLE123456789" }, "awsRegion": "us-east-1", "eventName": "ObjectCreated:Put", "userIdentity": { "principalId": "EXAMPLE" }, "eventSource": "aws:s3" } ] }')
+    index.handler(event, context)
diff --git a/tests/test_s3_unknown.gz b/tests/test_s3_unknown.gz
new file mode 100644
index 0000000..8324fda
Binary files /dev/null and b/tests/test_s3_unknown.gz differ
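
Why the Queue.url change: urllib.parse.urlsplit() rejects None (it raises a TypeError), so with the old default an unset FLUENTD_URL broke the class body at import time instead of failing gracefully. Defaulting to an empty string keeps the split valid, and passing scheme='https' makes https the assumed scheme for any endpoint that does not name one. A minimal sketch of the new behavior, standard library only (the host name is a placeholder):

    from urllib.parse import urlsplit

    # An empty value no longer raises; it yields an empty SplitResult
    # that still carries the default scheme:
    print(urlsplit('', scheme='https'))
    # SplitResult(scheme='https', netloc='', path='', query='', fragment='')

    # A scheme-less endpoint now defaults to https. Note the leading '//',
    # which urlsplit needs in order to parse the value as a netloc:
    print(urlsplit('//fluentd.example.com:24224', scheme='https'))
    # SplitResult(scheme='https', netloc='fluentd.example.com:24224', path='', query='', fragment='')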
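The new detection block replaces the try/except around two next() calls with a single bounded read: readlines(2048) returns complete lines until roughly 2 KB has been consumed, so header holds at least the first full line of any non-empty file and usually both CloudFront header lines. One edge case to keep in mind: an entirely empty gzip member leaves header empty and makes header[0] raise IndexError, whereas the old code mapped that case to the unknown-type warning via StopIteration. A standalone sketch of the same dispatch, assuming alb_regex is the compiled pattern from the diff and the path is hypothetical:

    import gzip

    def sniff_log_type(file_path, alb_regex):
        # Return the Queue tag for a gzipped log file, or None if unknown.
        with gzip.open(file_path, mode='rt', newline='\n') as data:
            # whole lines are returned until ~2 KB has been read
            header = data.readlines(2048)

        if not header:              # empty file: nothing to match
            return None
        if alb_regex.match(header[0]):
            return "aws.alb_accesslog"
        if len(header) > 1 and header[0].startswith('#Version:') \
                and header[1].startswith('#Fields:'):
            return "aws.cloudfront_accesslog"
        return None

    # e.g. sniff_log_type('/tmp/test_alb_accesslogs.gz', alb_regex)

(startswith() stands in for re.match('#Version:', ...) here; for a fixed literal prefix the two are equivalent.)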
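The ALB pattern itself is unchanged; it is only hoisted above the detection block so the first header line can be matched against it. It follows the documented ALB access-log layout, so a sample entry of the kind shown in the AWS docs should match and expose each field under its group name:

    sample = ('http 2018-07-02T22:23:00.186641Z app/my-loadbalancer/50dc6c495c0c9188 '
              '192.168.131.39:2817 10.0.0.1:80 0.000 0.001 0.000 200 200 34 366 '
              '"GET http://www.example.com:80/ HTTP/1.1" "curl/7.46.0" - - '
              'arn:aws:elasticloadbalancing:us-east-2:123456789012:targetgroup/my-targets/73e2d6bc24d8a067 '
              '"Root=1-58337262-36d228ad5d99923122bbe354" "-" "-" 0 2018-07-02T22:22:48.364000Z '
              '"forward" "-" "-"')

    m = alb_regex.match(sample)
    print(m.group('request_verb'), m.group('request_url'), m.group('elb_status_code'))
    # GET http://www.example.com:80/ 200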
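One subtlety in the relocated CloudFront branch survives the move: the #Fields: header lists date and time as separate columns, which the parser merges into the single time value, so columns drops the first three tokens ('#Fields:', 'date', 'time') via the [3:] slice, and enumerate(columns, 2) re-aligns the remaining names with row[2:]. A compact illustration with a shortened field list (the values are made up):

    fields_line = "#Fields: date time x-edge-location sc-bytes c-ip cs-method\n"
    row = "2019-01-01\t00:00:01\tFRA2\t2048\t192.0.2.10\tGET\n".split('\t')

    columns = fields_line.split()[3:]   # ['x-edge-location', 'sc-bytes', 'c-ip', 'cs-method']
    for n, c in enumerate(columns, 2):  # row[0] and row[1] are date and time
        print(c, '=', row[n].rstrip())
    # x-edge-location = FRA2
    # sc-bytes = 2048
    # c-ip = 192.0.2.10
    # cs-method = GET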