Default to https scheme; improved S3 file detection now also handles single-line ALB access logs
parent b26205658e
commit b0bd435604

index.py (98 changed lines)
@@ -17,7 +17,7 @@ import boto3

__author__ = "Stefan Reimer"
__author_email__ = "stefan@zero-downtime.net"
__version__ = "0.9.3"
__version__ = "0.9.4"

# Global alias lookup cache
account_aliases = {}
@@ -94,7 +94,7 @@ def get_source(region, account_id):


class Queue:
    url = urllib.parse.urlsplit(os.getenv('FLUENTD_URL', default=None))
    url = urllib.parse.urlsplit(os.getenv('FLUENTD_URL', default=''), scheme='https')
    passwd = os.getenv('FLUENT_SHARED_KEY', default=None)

    verify_certs = os.getenv('FLUENTD_VERIFY_CERTS', default=1)
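Note on the FLUENTD_URL change above: a minimal sketch (not part of the commit) of why the new default behaves better; the host name below is a made-up example. With default='' the value passed to urlsplit() is always a string, and the scheme argument supplies https whenever the URL itself does not name one:

import os
import urllib.parse

# FLUENTD_URL unset: the empty string still parses, and the scheme falls back to https
url = urllib.parse.urlsplit(os.getenv('FLUENTD_URL', default=''), scheme='https')
print(url.scheme)                 # https

# Host given without an explicit scheme (note the leading //): the https default is kept
url = urllib.parse.urlsplit('//fluentd.example.com:24224', scheme='https')
print(url.scheme, url.netloc)     # https fluentd.example.com:24224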
@@ -313,55 +313,25 @@ def handler(event, context):
    source = get_source(region, account_id)
    source['s3_url'] = '{}/{}'.format(bucket, key)

    # try to identify file by looking at first two lines
    first = ""
    second = ""
    try:
        with gzip.open(file_path, mode='rt', newline='\n') as data:
            first = next(data)
            second = next(data)
    except (OSError, StopIteration):
    alb_regex = re.compile(r"(?P<type>[^ ]*) (?P<timestamp>[^ ]*) (?P<elb>[^ ]*) (?P<client_ip>[^ ]*):(?P<client_port>[0-9]*) (?P<target_ip>[^ ]*)[:-](?P<target_port>[0-9]*) (?P<request_processing_time>[-.0-9]*) (?P<target_processing_time>[-.0-9]*) (?P<response_processing_time>[-.0-9]*) (?P<elb_status_code>|[-0-9]*) (?P<target_status_code>-|[-0-9]*) (?P<received_bytes>[-0-9]*) (?P<sent_bytes>[-0-9]*) \"(?P<request_verb>[^ ]*) (?P<request_url>[^ ]*) (?P<request_proto>- |[^ ]*)\" \"(?P<user_agent>[^\"]*)\" (?P<ssl_cipher>[A-Z0-9-]+) (?P<ssl_protocol>[A-Za-z0-9.-]*) (?P<target_group_arn>[^ ]*) \"(?P<trace_id>[^\"]*)\" \"(?P<domain_name>[^\"]*)\" \"(?P<chosen_cert_arn>[^\"]*)\" (?P<matched_rule_priority>[-.0-9]*) (?P<request_creation_time>[^ ]*) \"(?P<actions_executed>[^\"]*)\" \"(?P<redirect_url>[^ ]*)\" \"(?P<error_reason>[^ ]*)\"")

    # try to identify file type by looking at first lines
    with gzip.open(file_path, mode='rt', newline='\n') as data:
        header = data.readlines(2048)

    # ALB Access ?
    if alb_regex.match(header[0]):
        logs = Queue("aws.alb_accesslog")

    # cloudfront access logs
    elif len(header) > 1 and re.match('#Version:', header[0]) and re.match('#Fields:', header[1]):
        logs = Queue("aws.cloudfront_accesslog")

    else:
        logger.warning("{}/{}: Unknown type!".format(bucket, key))
        return

    alb_regex = re.compile(r"(?P<type>[^ ]*) (?P<timestamp>[^ ]*) (?P<elb>[^ ]*) (?P<client_ip>[^ ]*):(?P<client_port>[0-9]*) (?P<target_ip>[^ ]*)[:-](?P<target_port>[0-9]*) (?P<request_processing_time>[-.0-9]*) (?P<target_processing_time>[-.0-9]*) (?P<response_processing_time>[-.0-9]*) (?P<elb_status_code>|[-0-9]*) (?P<target_status_code>-|[-0-9]*) (?P<received_bytes>[-0-9]*) (?P<sent_bytes>[-0-9]*) \"(?P<request_verb>[^ ]*) (?P<request_url>[^ ]*) (?P<request_proto>- |[^ ]*)\" \"(?P<user_agent>[^\"]*)\" (?P<ssl_cipher>[A-Z0-9-]+) (?P<ssl_protocol>[A-Za-z0-9.-]*) (?P<target_group_arn>[^ ]*) \"(?P<trace_id>[^\"]*)\" \"(?P<domain_name>[^\"]*)\" \"(?P<chosen_cert_arn>[^\"]*)\" (?P<matched_rule_priority>[-.0-9]*) (?P<request_creation_time>[^ ]*) \"(?P<actions_executed>[^\"]*)\" \"(?P<redirect_url>[^ ]*)\" \"(?P<error_reason>[^ ]*)\"")

    # cloudfront access logs
    if re.match('#Version:', first) and re.match('#Fields:', second):
        logs = Queue("aws.cloudfront_accesslog")

        with gzip.open(file_path, mode='rt', newline='\n') as data:
            next(data)
            # columns are in second line: first is #Fields, next two are merged into time later
            columns = next(data).split()[3:]

            for line in data:
                event = {}
                parsed = {}

                # Todo hash each line to create source['id']
                # source['id'] = md5.of.line matching ES ids

                row = line.split('\t')
                # cloudfront events are logged to the second only, date and time are seperate
                event['time'] = fluentd_time(datetime.datetime.strptime(row[0] + " " + row[1], '%Y-%m-%d %H:%M:%S').timestamp())

                for n, c in enumerate(columns, 2):
                    value = row[n]
                    if value not in ['-', '-\n']:
                        parsed[c] = row[n]
                        # Copy c-ip to host to allow geoip upstream
                        if c == 'c-ip':
                            event['host'] = row[n]

                event[logs.tag] = parsed
                event['source'] = source

                logs.send(event)

    elif alb_regex.match(first):
        logs = Queue("aws.alb_accesslog")

    if logs.tag == 'aws.alb_accesslog':
        with gzip.open(file_path, mode='rt', newline='\n') as data:
            for line in data:
                event = {}
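Note on the detection change above: a minimal sketch (not part of the commit) of the new header-based approach, using a shortened stand-in for the full ALB pattern and a hypothetical helper name. readlines(2048) returns whatever complete lines fit into roughly the first 2 KB, so a gzip that contains only a single ALB access log line still yields header[0], which the old next(data)/next(data) probe could not provide, and the CloudFront check is guarded by len(header) > 1:

import gzip
import re

# Simplified stand-in for the full ALB access log regex compiled in the handler
alb_like = re.compile(r'(?P<type>\S+) (?P<timestamp>\S+) (?P<elb>\S+) ')

def detect_type(file_path):
    with gzip.open(file_path, mode='rt', newline='\n') as data:
        header = data.readlines(2048)    # at most ~2 KB worth of lines, possibly just one

    if header and alb_like.match(header[0]):
        return "aws.alb_accesslog"
    if len(header) > 1 and header[0].startswith('#Version:') and header[1].startswith('#Fields:'):
        return "aws.cloudfront_accesslog"
    return None                          # unknown type: the handler logs a warning and returns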
@@ -392,8 +362,36 @@ def handler(event, context):
                event['source'] = source

                logs.send(event)
    else:
        logs = Queue("unknown")

    elif logs.tag == 'aws.cloudfront_accesslog':
        with gzip.open(file_path, mode='rt', newline='\n') as data:
            next(data)
            # columns are in second line: first is #Fields, next two are merged into time later
            columns = next(data).split()[3:]

            for line in data:
                event = {}
                parsed = {}

                # Todo hash each line to create source['id']
                # source['id'] = md5.of.line matching ES ids

                row = line.split('\t')
                # cloudfront events are logged to the second only, date and time are seperate
                event['time'] = fluentd_time(datetime.datetime.strptime(row[0] + " " + row[1], '%Y-%m-%d %H:%M:%S').timestamp())

                for n, c in enumerate(columns, 2):
                    value = row[n]
                    if value not in ['-', '-\n']:
                        parsed[c] = row[n]
                        # Copy c-ip to host to allow geoip upstream
                        if c == 'c-ip':
                            event['host'] = row[n]

                event[logs.tag] = parsed
                event['source'] = source

                logs.send(event)

    logs.flush()
    logs.info()
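Note on the CloudFront branch above: a worked example (not part of the commit) of how the column offsets and the date/time merge line up; the field names are taken from the standard CloudFront access log header, trimmed for brevity:

import datetime

# Second header line, for example: "#Fields: date time x-edge-location sc-bytes c-ip ..."
fields_line = "#Fields: date time x-edge-location sc-bytes c-ip"
columns = fields_line.split()[3:]        # drop '#Fields:', 'date', 'time'

row = "2019-05-06\t12:34:56\tFRA2\t1045\t203.0.113.10\n".split('\t')

# date and time columns are merged into one second-resolution timestamp
ts = datetime.datetime.strptime(row[0] + " " + row[1], '%Y-%m-%d %H:%M:%S').timestamp()

parsed = {}
for n, c in enumerate(columns, 2):       # payload starts at row[2]; row[0]/row[1] were consumed above
    if row[n] not in ['-', '-\n']:
        parsed[c] = row[n]
# parsed == {'x-edge-location': 'FRA2', 'sc-bytes': '1045', 'c-ip': '203.0.113.10\n'}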
@@ -36,3 +36,7 @@ def test_parse():
    # alb Access Logs via S3
    event = json.loads('{ "Records": [ { "eventVersion": "2.0", "eventTime": "1970-01-01T00:00:00.000Z", "requestParameters": { "sourceIPAddress": "127.0.0.1" }, "s3": { "configurationId": "testConfigRule", "object": { "eTag": "0123456789abcdef0123456789abcdef", "sequencer": "0A1B2C3D4E5F678901", "key": "tests/test_alb_accesslogs.gz", "size": 1024 }, "bucket": { "arn": "arn:aws:s3:::mybucket", "name": "file://", "ownerIdentity": { "principalId": "EXAMPLE" } }, "s3SchemaVersion": "1.0" }, "responseElements": { "x-amz-id-2": "EXAMPLE123/5678abcdefghijklambdaisawesome/mnopqrstuvwxyzABCDEFGH", "x-amz-request-id": "EXAMPLE123456789" }, "awsRegion": "us-east-1", "eventName": "ObjectCreated:Put", "userIdentity": { "principalId": "EXAMPLE" }, "eventSource": "aws:s3" } ] }')
    index.handler(event, context)

    # unknown file
    event = json.loads('{ "Records": [ { "eventVersion": "2.0", "eventTime": "1970-01-01T00:00:00.000Z", "requestParameters": { "sourceIPAddress": "127.0.0.1" }, "s3": { "configurationId": "testConfigRule", "object": { "eTag": "0123456789abcdef0123456789abcdef", "sequencer": "0A1B2C3D4E5F678901", "key": "tests/test_s3_unknown.gz", "size": 1024 }, "bucket": { "arn": "arn:aws:s3:::mybucket", "name": "file://", "ownerIdentity": { "principalId": "EXAMPLE" } }, "s3SchemaVersion": "1.0" }, "responseElements": { "x-amz-id-2": "EXAMPLE123/5678abcdefghijklambdaisawesome/mnopqrstuvwxyzABCDEFGH", "x-amz-request-id": "EXAMPLE123456789" }, "awsRegion": "us-east-1", "eventName": "ObjectCreated:Put", "userIdentity": { "principalId": "EXAMPLE" }, "eventSource": "aws:s3" } ] }')
    index.handler(event, context)
BIN  tests/test_s3_unknown.gz  (new file)
Binary file not shown.