Compare commits

...

7 Commits

Author SHA1 Message Date
Stefan Reimer b5fd5b9fe9 fix: reformat, run tests with Python 3.9 2022-04-18 23:57:51 +02:00
Stefan Reimer c04e5e8756 ci: remove aws cli pypi req 2022-04-14 20:28:19 +02:00
Stefan Reimer 358d1449d8 Add test for cloudtrail event 2021-03-04 14:47:36 +01:00
Stefan Reimer 1b59eb1628 Update aws.lamda parser 2020-03-20 14:17:18 +00:00
Stefan Reimer ce57cf70f4 Introduce shorter timeouts for AWS API calls to fail fast and prevent cost explosions due to high log volume
continuous-integration/drone/push Build encountered an error Details
2020-01-04 16:11:52 +00:00
Stefan Reimer f0fd0c1e07 Release 0.9.8
continuous-integration/drone/push Build is passing Details
2019-08-06 23:20:11 +00:00
Stefan Reimer 2066e2ba11 Fix for ALB access log parser, improved vpc flowlog augmentation
continuous-integration/drone/push Build is passing Details
2019-07-28 11:34:05 +00:00
6 changed files with 300 additions and 144 deletions

View File

@ -1,5 +1,18 @@
# Changelog
## 0.9.10
- Updates aws.lambda parser for Init Duration, incl. tests
## 0.9.9
- Improved error handling / descreased timouts calling AWS APIs to fail fast in case AWS throttles API calls
which causes each Lambda call to timout and run for 300s
- Skip over more invalid vpc flowlog entries
## 0.9.8
- Fix for ALB AccessLog parser to handle spaces in request_url
- Improved VPC FlowLog metadata augmentation
- better error handling for VPC FlowLog parsing
## 0.9.6
- Augment VPC FlowLogs with ENI metadata incl. global cache

View File

@ -11,7 +11,7 @@ PACKAGE_FILE := dist/$(PACKAGE)
all: test build
test:
flake8 --ignore=E501 index.py tests
flake8 --ignore=E501,W503 index.py tests
TEST=True pytest --log-cli-level=DEBUG
clean:
@ -22,7 +22,7 @@ build: $(PACKAGE_FILE)
$(PACKAGE_FILE):
rm -rf dist && mkdir dist
cp -r index.py dist/
pip install --target dist --no-compile msgpack requests
pip install --isolated --target dist --no-compile msgpack requests
cd dist && zip -q -r $(PACKAGE) *
upload: $(PACKAGE_FILE)

View File

@ -4,6 +4,6 @@ requests
flake8
pytest
cfnlambda
awscli
#awscli
#pytest-profiling
#tuna

399
index.py
View File

@ -14,43 +14,47 @@ import io
import urllib
import datetime
import boto3
import botocore
__author__ = "Stefan Reimer"
__author_email__ = "stefan@zero-downtime.net"
__version__ = "0.9.6"
__version__ = "0.9.10"
# Global alias lookup cache
# IAM Alias lookup cache
account_aliases = {}
# Global eni lookup cache
# ENI lookup cache
enis = {}
# IP lookup cache
ips = {}
logger = logging.getLogger(__name__)
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger('boto3').setLevel(logging.WARNING)
logging.getLogger('botocore').setLevel(logging.WARNING)
logging.getLogger("boto3").setLevel(logging.WARNING)
logging.getLogger("botocore").setLevel(logging.WARNING)
def boolean(value):
if value in ('t', 'T', 'true', 'True', 'TRUE', '1', 1, True):
if value in ("t", "T", "true", "True", "TRUE", "1", 1, True):
return True
return False
def decrypt(encrypted):
try:
kms = boto3.client('kms')
plaintext = kms.decrypt(CiphertextBlob=base64.b64decode(encrypted))['Plaintext']
kms = boto3.client("kms")
plaintext = kms.decrypt(CiphertextBlob=base64.b64decode(encrypted))["Plaintext"]
return plaintext.decode()
except Exception:
logging.exception("Failed to decrypt via KMS")
CHUNK_SIZE = 128
DEBUG = boolean(os.getenv('DEBUG', default=False))
TEST = boolean(os.getenv('TEST', default=False))
RESOLVE_ACCOUNT = boolean(os.getenv('RESOLVE_ACCOUNT', default=True))
RESOLVE_ENI = boolean(os.getenv('RESOLVE_ENI', default=True))
DEBUG = boolean(os.getenv("DEBUG", default=False))
TEST = boolean(os.getenv("TEST", default=False))
RESOLVE_ACCOUNT = boolean(os.getenv("RESOLVE_ACCOUNT", default=True))
ENHANCE_FLOWLOG = boolean(os.getenv("ENHANCE_FLOWLOG", default=True))
if DEBUG:
logging.getLogger().setLevel(logging.DEBUG)
@ -62,7 +66,7 @@ else:
class EventTime(msgpack.ExtType):
def __new__(cls, timestamp):
seconds = int(timestamp)
nanoseconds = int(timestamp % 1 * 10 ** 9)
nanoseconds = int(timestamp % 1 * 10**9)
return super(EventTime, cls).__new__(
cls,
code=0,
@ -78,60 +82,123 @@ def fluentd_time(timestamp):
def get_source(region, account_id):
""" returns a new base source object
resolves aws account_id to account alias and caches for lifetime of lambda function
"""returns a new base source object
resolves aws account_id to account alias and caches for lifetime of lambda function
"""
global RESOLVE_ACCOUNT
source = {'account': account_id, 'region': region}
source = {"account": account_id, "region": region}
if RESOLVE_ACCOUNT and not TEST:
try:
if account_id not in account_aliases:
iam = boto3.client('iam')
account_aliases[account_id] = iam.list_account_aliases()['AccountAliases'][0]
boto3_config = botocore.config.Config(
retries=dict(max_attempts=2), connect_timeout=3, read_timeout=5
)
iam = boto3.client("iam", config=boto3_config)
account_aliases[account_id] = iam.list_account_aliases()[
"AccountAliases"
][0]
source['account_alias'] = account_aliases[account_id]
source["account_alias"] = account_aliases[account_id]
except(KeyError, IndexError):
logger.warning("Could not resolve IAM account alias")
except (botocore.exceptions.ConnectTimeoutError, KeyError, IndexError):
logger.warning(
"Could not resolve IAM account alias, disabled for this session"
)
RESOLVE_ACCOUNT = False
pass
return source
def get_eni_data(eni):
""" returns additional ENI properties
and caches for lifetime of lambda function
def add_flow_metadata(flow):
"""adds metadata to VPC flow: ENI, direction, type
caches the ENI and IP lookup tables for Lambda lifetime
"""
global RESOLVE_ENI
if RESOLVE_ENI and not TEST:
global ENHANCE_FLOWLOG
if ENHANCE_FLOWLOG and not TEST:
try:
if eni not in enis:
ec2 = boto3.client('ec2')
interface_iter = ec2.get_paginator('describe_network_interfaces').paginate()
# Check cache and update if missed with all ENIs in one go
if flow["interface-id"] not in enis:
boto3_config = botocore.config.Config(
retries=dict(max_attempts=2), connect_timeout=3, read_timeout=5
)
ec2 = boto3.client("ec2", config=boto3_config)
interface_iter = ec2.get_paginator(
"describe_network_interfaces"
).paginate()
for response in interface_iter:
for interface in response['NetworkInterfaces']:
enis[interface['NetworkInterfaceId']] = {'eni.az': interface['AvailabilityZone'],
'eni.subnet': interface['SubnetId']}
if 'Association' in interface and 'PublicIp' in interface['Association']:
enis[interface['NetworkInterfaceId']]['eni.public_ip'] = interface['Association']['PublicIp']
for interface in response["NetworkInterfaces"]:
# Lookup table by ENI ID
enis[interface["NetworkInterfaceId"]] = interface
return enis[eni]
# Lookup table by IP to classify traffic
ips[interface["PrivateIpAddress"]] = interface
except (botocore.exceptions.ConnectTimeoutError, KeyError, IndexError):
logger.warning(
"Error trying to get metadata for ENIs, disabling ENHANCE_FLOWLOG"
)
ENHANCE_FLOWLOG = False
return flow
except(KeyError, IndexError):
logger.warning("Could not get additional data for ENI {}".format(eni))
RESOLVE_ENI = False
try:
eni = enis[flow["interface-id"]]
metadata = {
"eni.az": eni["AvailabilityZone"],
"eni.subnet": eni["SubnetId"],
}
remote_ip = None
if len(eni["Groups"]):
metadata["eni.sg"] = eni["Groups"][0]["GroupName"]
# Add PublicIP if attached
if "Association" in eni and "PublicIp" in eni["Association"]:
metadata["eni.public_ip"] = eni["Association"]["PublicIp"]
# Determine traffic direction
if eni["PrivateIpAddress"] == flow["srcaddr"]:
metadata["direction"] = "Out"
remote_ip = flow["dstaddr"]
elif eni["PrivateIpAddress"] == flow["dstaddr"]:
metadata["direction"] = "In"
remote_ip = flow["srcaddr"]
# Try to classify traffic:
# Free,Regional,Out
if remote_ip:
if remote_ip in ips:
if (
ips[remote_ip]["AvailabilityZone"] == eni["AvailabilityZone"]
and ips[remote_ip]["VpcId"] == eni["VpcId"]
):
metadata["traffic_class"] = "Free"
else:
metadata["traffic_class"] = "Regional"
else:
# Incoming traffic is free 90% of times
if metadata["direction"] == "In":
metadata["traffic_class"] = "Free"
else:
metadata["traffic_class"] = "Out"
flow.update(metadata)
except (KeyError, IndexError) as e:
logger.warning(
"Could not get additional data for ENI {} ({})".format(
flow["interface-id"], e
)
)
pass
return {}
return flow
class Queue:
url = urllib.parse.urlsplit(os.getenv('FLUENTD_URL', default=''), scheme='https')
passwd = os.getenv('FLUENT_SHARED_KEY', default=None)
url = urllib.parse.urlsplit(os.getenv("FLUENTD_URL", default=""), scheme="https")
passwd = os.getenv("FLUENT_SHARED_KEY", default=None)
verify_certs = os.getenv('FLUENTD_VERIFY_CERTS', default=1)
if verify_certs in ('f', 'F', 'false', 'False', 'FALSE', '0', 0, False):
verify_certs = os.getenv("FLUENTD_VERIFY_CERTS", default=1)
if verify_certs in ("f", "F", "false", "False", "FALSE", "0", 0, False):
verify_certs = False
else:
verify_certs = True
@ -159,15 +226,24 @@ class Queue:
if not events:
return
logger.debug("Sending {} events to {}/{} ({})".format(events, self.url.geturl(), self.tag, self.request))
logger.debug(
"Sending {} events to {}/{} ({})".format(
events, self.url.geturl(), self.tag, self.request
)
)
if not TEST:
# Send events via POSTs reusing the same https connection, retry couple of times
retries = 0
_url = '{}/{}'.format(self.url.geturl(), self.tag)
_url = "{}/{}".format(self.url.geturl(), self.tag)
while True:
try:
r = self.request.post(url=_url, data=msgpack.packb(self._queue), verify=self.verify_certs)
r = self.request.post(
url=_url,
data=msgpack.packb(self._queue),
verify=self.verify_certs,
timeout=(6, 30),
)
if r:
break
else:
@ -177,12 +253,13 @@ class Queue:
logger.warning("RequestException: {}".format(e))
pass
if retries >= 8:
raise Exception("Error sending {} events to {}. Giving up.".format(events, _url))
if retries >= 2:
raise Exception(
"Error sending {} events to {}. Giving up.".format(events, _url)
)
retries = retries + 1
logger.warning("Error sending {} events to {}. Retrying in {} seconds.".format(events, _url, retries**2))
time.sleep(retries**2)
time.sleep(1)
else:
logger.debug("Test mode, dump only: {}".format(msgpack.packb(self._queue)))
@ -190,7 +267,11 @@ class Queue:
self._queue = []
def info(self):
logger.info("Sent {} events to {}/{} ({})".format(self.sent, self.url.geturl(), self.tag, self.request))
logger.info(
"Sent {} events to {}/{} ({})".format(
self.sent, self.url.geturl(), self.tag, self.request
)
)
# Handler to handle CloudWatch logs.
@ -200,149 +281,171 @@ def handler(event, context):
(region, account_id) = context.invoked_function_arn.split(":")[3:5]
# Cloudwatch Logs event
if 'awslogs' in event:
if "awslogs" in event:
# Grab the base64-encoded data.
b64strg = event['awslogs']['data']
b64strg = event["awslogs"]["data"]
# Decode base64-encoded string, which should be a gzipped object.
zippedContent = io.BytesIO(base64.b64decode(b64strg))
# Decompress the content and load JSON.
with gzip.GzipFile(mode='rb', fileobj=zippedContent) as content:
with gzip.GzipFile(mode="rb", fileobj=zippedContent) as content:
for line in content:
awsLogsData = json.loads(line.decode())
# First determine type
if re.match("/aws/lambda/", awsLogsData['logGroup']):
if re.match("/aws/lambda/", awsLogsData["logGroup"]):
logs = Queue("aws.lambda")
elif re.search("cloudtrail", awsLogsData['logGroup'], flags=re.IGNORECASE):
elif re.search("cloudtrail", awsLogsData["logGroup"], flags=re.IGNORECASE):
logs = Queue("aws.cloudtrail")
elif re.match("RDSOSMetrics", awsLogsData['logGroup']):
elif re.match("RDSOSMetrics", awsLogsData["logGroup"]):
logs = Queue("aws.rdsosmetrics")
elif re.match("vpcflowlog", awsLogsData['logGroup'], flags=re.IGNORECASE):
elif re.match("vpcflowlog", awsLogsData["logGroup"], flags=re.IGNORECASE):
logs = Queue("aws.vpcflowlog")
else:
logs = Queue("aws.cloudwatch_logs")
# Build list of log events
for e in awsLogsData['logEvents']:
for e in awsLogsData["logEvents"]:
event = {}
source = get_source(region, account_id)
parsed = {}
# Remove whitespace / empty events & skip over empty events
e['message'] = e['message'].strip()
if re.match(r'^\s*$', e['message']):
e["message"] = e["message"].strip()
if re.match(r"^\s*$", e["message"]):
continue
# inject existing data from subscrition filters
if('extractedFields' in e.keys()):
for key in e['extractedFields']:
event[key] = e['extractedFields'][key]
if "extractedFields" in e.keys():
for key in e["extractedFields"]:
event[key] = e["extractedFields"][key]
# lambda ?
if logs.tag == 'aws.lambda':
if logs.tag == "aws.lambda":
# First look for the three AWS Lambda entries
mg = re.match(r'(?P<type>(START|END|REPORT)) RequestId: (?P<request>\S*)', e['message'])
mg = re.match(
r"(?P<type>(START|END|REPORT)) RequestId: (?P<request>\S*)",
e["message"],
)
if mg:
parsed['RequestId'] = mg.group('request')
if mg.group('type') == 'REPORT':
pattern = r'.*(?:\tDuration: (?P<duration>[\d\.\d]+) ms\s*)(?:\tBilled Duration: (?P<billed_duration>[\d\.\d]+) ms\s*)(?:\tMemory Size: (?P<memory_size>[\d\.\d]+) MB\s*)(?:\tMax Memory Used: (?P<max_memory_used>[\d\.\d]+) MB)'
parsed["RequestId"] = mg.group("request")
if mg.group("type") == "REPORT":
pattern = r".*(?:\tDuration: (?P<duration>[\d\.\d]+) ms\s*)(?:\tBilled Duration: (?P<billed_duration>[\d\.\d]+) ms\s*)(?:\tMemory Size: (?P<memory_size>[\d\.\d]+) MB\s*)(?:\tMax Memory Used: (?P<max_memory_used>[\d\.\d]+) MB)(?:\tInit Duration: (?P<init_duration>[\d\.\d]+) ms\s*)?"
elif mg.group('type') == 'START':
pattern = r'.*(?:Version: (?P<version>.*))'
elif mg.group("type") == "START":
pattern = r".*(?:Version: (?P<version>.*))"
else:
pattern = ''
pattern = ""
data = re.match(pattern, e['message'])
data = re.match(pattern, e["message"])
for key in data.groupdict().keys():
parsed[key] = data.group(key)
if data.group(key):
parsed[key] = data.group(key)
# All other info parsed, so just set type itself
event['message'] = mg.group('type')
event["message"] = mg.group("type")
else:
# Try to extract data from AWS default python logging format
# This normalizes print vs. logging entries and allows requestid tracking
# "[%(levelname)s]\t%(asctime)s.%(msecs)dZ\t%(aws_request_id)s\t%(message)s\n"
_msg = e['message']
pattern = r'(?:\[(?P<level>[^\]]*)\]\s)?(?P<time>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{1,6}Z)\s(?P<RequestId>\S*?)\s(?P<message>.*)'
data = re.match(pattern, e['message'], flags=re.DOTALL)
_msg = e["message"]
pattern = r"(?:\[(?P<level>[^\]]*)\]\s)?(?P<time>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{1,6}Z)\s(?P<RequestId>\S*?)\s(?P<message>.*)"
data = re.match(pattern, e["message"], flags=re.DOTALL)
if data:
if data.group('level'):
event['level'] = data.group('level')
event['time'] = fluentd_time(datetime.datetime.strptime(data.group('time'), '%Y-%m-%dT%H:%M:%S.%fZ').timestamp())
parsed['RequestId'] = data.group('RequestId')
_msg = data.group('message')
if data.group("level"):
event["level"] = data.group("level")
event["time"] = fluentd_time(
datetime.datetime.strptime(
data.group("time"), "%Y-%m-%dT%H:%M:%S.%fZ"
).timestamp()
)
parsed["RequestId"] = data.group("RequestId")
_msg = data.group("message")
# try to parse the remaining as json
try:
_json = json.loads(_msg)
# Make sure we have an actual object assigned to json field
if isinstance(_json, dict):
event['message_json'] = _json
event["message_json"] = _json
else:
event['message'] = _json
event["message"] = _json
except (ValueError, TypeError, KeyError):
event['message'] = _msg
event["message"] = _msg
# cloudtrail ?
elif logs.tag == 'aws.cloudtrail':
elif logs.tag == "aws.cloudtrail":
try:
parsed = json.loads(e['message'])
parsed = json.loads(e["message"])
# use eventTime and eventID from the event itself
event['time'] = fluentd_time(datetime.datetime.strptime(parsed['eventTime'], '%Y-%m-%dT%H:%M:%SZ').timestamp())
event['id'] = parsed['eventID']
event["time"] = fluentd_time(
datetime.datetime.strptime(
parsed["eventTime"], "%Y-%m-%dT%H:%M:%SZ"
).timestamp()
)
event["id"] = parsed["eventID"]
# override region from cloudtrail event
source['region'] = parsed['awsRegion']
source["region"] = parsed["awsRegion"]
except (ValueError, TypeError, KeyError):
event['message'] = e['message']
event["message"] = e["message"]
parsed.clear()
# RDS metrics ?
elif logs.tag == 'aws.rdsosmetrics':
elif logs.tag == "aws.rdsosmetrics":
try:
parsed = json.loads(e['message'])
parsed = json.loads(e["message"])
except (ValueError, TypeError, KeyError):
event['message'] = e['message']
event["message"] = e["message"]
# VPC FlowLog ?
# <version> <account-id> <interface-id> <srcaddr> <dstaddr> <srcport> <dstport> <protocol> <packets> <bytes> <start> <end> <action> <log-status>
elif logs.tag == 'aws.vpcflowlog':
row = e['message'].split(" ")
elif logs.tag == "aws.vpcflowlog":
row = e["message"].split(" ")
# Skip over NODATA entries, what would be the point having these in ES ?
if row[13] == 'NODATA':
# Skip over NODATA,SKIPDATA entries, what would be the point having these in ES ?
if row[13] != "OK":
continue
parsed = {'interface-id': row[2], 'srcaddr': row[3], 'dstaddr': row[4], 'srcport': row[5], 'dstport': row[6], 'protocol': row[7],
'packets': row[8], 'bytes': row[9], 'start': row[10], 'end': row[11], 'action': row[12], 'log-status': row[13]}
eni_metadata = get_eni_data(parsed['interface-id'])
if eni_metadata:
parsed.update(eni_metadata)
parsed = add_flow_metadata(
{
"interface-id": row[2],
"srcaddr": row[3],
"dstaddr": row[4],
"srcport": row[5],
"dstport": row[6],
"protocol": row[7],
"packets": row[8],
"bytes": row[9],
"start": row[10],
"end": row[11],
"action": row[12],
"log-status": row[13],
}
)
# Fallback add raw message
else:
event['message'] = e['message']
event["message"] = e["message"]
if parsed and logs.tag:
event[logs.tag] = parsed
# Forward cloudwatch logs event ID
source['log_group'] = awsLogsData['logGroup']
source['log_stream'] = awsLogsData['logStream']
event['source'] = source
source["log_group"] = awsLogsData["logGroup"]
source["log_stream"] = awsLogsData["logStream"]
event["source"] = source
# If time and id are not set yet use data from cloudwatch logs event
if 'time' not in event:
event['time'] = fluentd_time(e['timestamp'] / 1000)
if 'id' not in source:
event['id'] = e['id']
if "time" not in event:
event["time"] = fluentd_time(e["timestamp"] / 1000)
if "id" not in source:
event["id"] = e["id"]
logs.send(event)
@ -350,24 +453,26 @@ def handler(event, context):
logs.info()
# S3 Put event
elif 'Records' in event:
s3_client = boto3.client('s3')
elif "Records" in event:
s3_client = boto3.client("s3")
bucket = event['Records'][0]['s3']['bucket']['name']
key = event['Records'][0]['s3']['object']['key']
bucket = event["Records"][0]["s3"]["bucket"]["name"]
key = event["Records"][0]["s3"]["object"]["key"]
file_path = '/tmp/stream2fluentd.gz'
file_path = "/tmp/stream2fluentd.gz"
if TEST:
shutil.copyfile(key, file_path)
else:
s3_client.download_file(bucket, key, file_path)
source = get_source(region, account_id)
source['s3_url'] = '{}/{}'.format(bucket, key)
source["s3_url"] = "{}/{}".format(bucket, key)
alb_regex = re.compile(r"(?P<type>[^ ]*) (?P<timestamp>[^ ]*) (?P<elb>[^ ]*) (?P<client_ip>[^ ]*):(?P<client_port>[0-9]*) (?P<target_ip>[^ ]*)[:-](?P<target_port>[0-9]*) (?P<request_processing_time>[-.0-9]*) (?P<target_processing_time>[-.0-9]*) (?P<response_processing_time>[-.0-9]*) (?P<elb_status_code>|[-0-9]*) (?P<target_status_code>-|[-0-9]*) (?P<received_bytes>[-0-9]*) (?P<sent_bytes>[-0-9]*) \"(?P<request_verb>[^ ]*) (?P<request_url>[^ ]*) (?P<request_proto>- |[^ ]*)\" \"(?P<user_agent>[^\"]*)\" (?P<ssl_cipher>[A-Z0-9-]+) (?P<ssl_protocol>[A-Za-z0-9.-]*) (?P<target_group_arn>[^ ]*) \"(?P<trace_id>[^\"]*)\" \"(?P<domain_name>[^\"]*)\" \"(?P<chosen_cert_arn>[^\"]*)\" (?P<matched_rule_priority>[-.0-9]*) (?P<request_creation_time>[^ ]*) \"(?P<actions_executed>[^\"]*)\" \"(?P<redirect_url>[^ ]*)\" \"(?P<error_reason>[^ ]*)\"")
alb_regex = re.compile(
r"(?P<type>[^ ]*) (?P<timestamp>[^ ]*) (?P<elb>[^ ]*) (?P<client_ip>[^ ]*):(?P<client_port>[0-9]*) (?P<target_ip>[^ ]*)[:-](?P<target_port>[0-9]*) (?P<request_processing_time>[-.0-9]*) (?P<target_processing_time>[-.0-9]*) (?P<response_processing_time>[-.0-9]*) (?P<elb_status_code>|[-0-9]*) (?P<target_status_code>-|[-0-9]*) (?P<received_bytes>[-0-9]*) (?P<sent_bytes>[-0-9]*) \"(?P<request_verb>[^ ]*) (?P<request_url>[^\"]*) (?P<request_proto>- |[^ ]*)\" \"(?P<user_agent>[^\"]*)\" (?P<ssl_cipher>[A-Z0-9-]+) (?P<ssl_protocol>[A-Za-z0-9.-]*) (?P<target_group_arn>[^ ]*) \"(?P<trace_id>[^\"]*)\" \"(?P<domain_name>[^\"]*)\" \"(?P<chosen_cert_arn>[^\"]*)\" (?P<matched_rule_priority>[-.0-9]*) (?P<request_creation_time>[^ ]*) \"(?P<actions_executed>[^\"]*)\" \"(?P<redirect_url>[^ ]*)\" \"(?P<error_reason>[^ ]*)\""
)
# try to identify file type by looking at first lines
with gzip.open(file_path, mode='rt', newline='\n') as data:
with gzip.open(file_path, mode="rt", newline="\n") as data:
header = data.readlines(2048)
# ALB Access ?
@ -375,15 +480,19 @@ def handler(event, context):
logs = Queue("aws.alb_accesslog")
# cloudfront access logs
elif len(header) > 1 and re.match('#Version:', header[0]) and re.match('#Fields:', header[1]):
elif (
len(header) > 1
and re.match("#Version:", header[0])
and re.match("#Fields:", header[1])
):
logs = Queue("aws.cloudfront_accesslog")
else:
logger.warning("{}/{}: Unknown type!".format(bucket, key))
return
if logs.tag == 'aws.alb_accesslog':
with gzip.open(file_path, mode='rt', newline='\n') as data:
if logs.tag == "aws.alb_accesslog":
with gzip.open(file_path, mode="rt", newline="\n") as data:
for line in data:
event = {}
parsed = {}
@ -393,29 +502,39 @@ def handler(event, context):
value = data.group(key)
# Remove empty values
if value in ['-', '-\n']:
if value in ["-", "-\n"]:
continue
# Remove times of requests timed out
if key in ['request_processing_time', 'target_processing_time', 'response_processing_time'] and value in ['-1']:
if key in [
"request_processing_time",
"target_processing_time",
"response_processing_time",
] and value in ["-1"]:
continue
parsed[key] = data.group(key)
else:
logger.warning("Could not parse ALB access log entry: {}".format(line))
logger.warning(
"Could not parse ALB access log entry: {}".format(line)
)
continue
event['time'] = fluentd_time(datetime.datetime.strptime(parsed['request_creation_time'], '%Y-%m-%dT%H:%M:%S.%fZ').timestamp())
event["time"] = fluentd_time(
datetime.datetime.strptime(
parsed["request_creation_time"], "%Y-%m-%dT%H:%M:%S.%fZ"
).timestamp()
)
# Copy to host to allow geoip upstream
event['host'] = parsed['client_ip']
event["host"] = parsed["client_ip"]
event[logs.tag] = parsed
event['source'] = source
event["source"] = source
logs.send(event)
elif logs.tag == 'aws.cloudfront_accesslog':
with gzip.open(file_path, mode='rt', newline='\n') as data:
elif logs.tag == "aws.cloudfront_accesslog":
with gzip.open(file_path, mode="rt", newline="\n") as data:
next(data)
# columns are in second line: first is #Fields, next two are merged into time later
columns = next(data).split()[3:]
@ -427,20 +546,24 @@ def handler(event, context):
# Todo hash each line to create source['id']
# source['id'] = md5.of.line matching ES ids
row = line.split('\t')
row = line.split("\t")
# cloudfront events are logged to the second only, date and time are seperate
event['time'] = fluentd_time(datetime.datetime.strptime(row[0] + " " + row[1], '%Y-%m-%d %H:%M:%S').timestamp())
event["time"] = fluentd_time(
datetime.datetime.strptime(
row[0] + " " + row[1], "%Y-%m-%d %H:%M:%S"
).timestamp()
)
for n, c in enumerate(columns, 2):
value = row[n]
if value not in ['-', '-\n']:
if value not in ["-", "-\n"]:
parsed[c] = row[n]
# Copy c-ip to host to allow geoip upstream
if c == 'c-ip':
event['host'] = row[n]
if c == "c-ip":
event["host"] = row[n]
event[logs.tag] = parsed
event['source'] = source
event["source"] = source
logs.send(event)

16
test Normal file
View File

@ -0,0 +1,16 @@
{
"messageType": "DATA_MESSAGE",
"owner": "123456789012",
"logGroup": "Cloudtrail/DefaultTrail",
"logStream": "123456789012_CloudTrail_eu-central-1",
"subscriptionFilters": [
"CloudBender_Mgmt"
],
"logEvents": [
{
"id": "36010944938174877173576838392419674140970254593468989442",
"timestamp": 1614786618904,
"message": "{\"eventVersion\":\"1.08\",\"userIdentity\":{\"type\":\"AssumedRole\",\"principalId\":\"AROAIVUV7DYO7JM46FRLW:AutoScaling\",\"arn\":\"arn:aws:sts::123456789012:assumed-role/AWSServiceRoleForAutoScaling/AutoScaling\",\"accountId\":\"123456789012\",\"sessionContext\":{\"sessionIssuer\":{\"type\":\"Role\",\"principalId\":\"AROAIVUV7DYO7JM46FRLW\",\"arn\":\"arn:aws:iam::123456789012:role/aws-service-role/autoscaling.amazonaws.com/AWSServiceRoleForAutoScaling\",\"accountId\":\"123456789012\",\"userName\":\"AWSServiceRoleForAutoScaling\"},\"webIdFederationData\":{},\"attributes\":{\"mfaAuthenticated\":\"false\",\"creationDate\":\"2021-03-03T15:18:15Z\"}},\"invokedBy\":\"autoscaling.amazonaws.com\"},\"eventTime\":\"2021-03-03T15:38:23Z\",\"eventSource\":\"ec2.amazonaws.com\",\"eventName\":\"DescribeInstanceStatus\",\"awsRegion\":\"eu-central-1\",\"sourceIPAddress\":\"autoscaling.amazonaws.com\",\"userAgent\":\"autoscaling.amazonaws.com\",\"requestParameters\":{\"instancesSet\":{\"items\":[{\"instanceId\":\"i-0fc26d9df63c21276\"},{\"instanceId\":\"i-01f635f7dd4af7d03\"},{\"instanceId\":\"i-0f5e6610de3ceb673\"}]},\"filterSet\":{},\"includeAllInstances\":true},\"responseElements\":null,\"requestID\":\"d89cde42-9a72-421a-af6b-e642ca862055\",\"eventID\":\"1dc68ed9-6879-4874-a286-179604a26747\",\"readOnly\":true,\"eventType\":\"AwsApiCall\",\"managementEvent\":true,\"eventCategory\":\"Management\",\"recipientAccountId\":\"123456789012\"}"
}
]
}

View File

@ -26,11 +26,11 @@ logging.getLogger("index").setLevel(logging.DEBUG)
def test_parse():
# Test Cloudwatch Logs
event = json.loads('{"awslogs": {"data": "H4sICILSAl0AA3Rlc3QArVNdb9tGEHzvr7gSfWgB09z7vuObHDOuCpsqKAZoawnGkTy5RChSJY9N0iD/vSslLew2BlykLwSxu9idmZt5H+39NLl7X747+CiNLhfl4u4mW68XV1l0Fg1vej9imTIupNLGAmVY7ob7q3GYD9hJ3Jsp6dy+alwS/BTi3dzXoR36j2PrMHq3xzkG1CbAE8qT22+uF2W2LresrirHgVPhjagtOLPbWa00B23BUIkrprma6rE9HDe+bLvgxylKb6MX3TA3F75v/Hh3c78P0fZ0Lfvd9+E48D5qGzzKhWIMJAjJubVCAWgtlJLGaMmUEUpaC0IwARKPWsus5oIKgYdDi8IEt0eOVEomQQmOy8zZX4Lh+kVdD3Mf2iYlDwXa9NGHsy9DIJ+JoHiVkx9WF2SVk/L7IltcEiqAa4VfvCE4I7fhV3yBJj4hTYne/g/o1DPR3S7zl6vtJhzfPgYeU15SmwqdUnXOmP4FO05UNegq9sL6WNQUYsu9iiuEpAWvtdF6E9zhcN4Nrrn76DOyG4c9mXiaJBOv5vq1D8lD/R/7MKmHftfef5a2lpodoQOj1OJBJqnmilrKjZW4EJSmqAhVlINU/AnauMM+op0Vxar4m7c68dYpQCr5OQdA3sY606D1YwM7HgvXNLFzsIsVBTBm58EJ5F1k19linZHTvpRs+vnQuODJ6DvvJj8ln35Ii0F4m276cnS1r1z9mny7H6aAgzUGgtSu60jnpvDdvzWQQA1XqJwSRmlJgTF6RMHQEJYpqpg1EsCiKFZrgCc0MALgUTSuj278maBB82V+RRY5WebrcpG/yL4m2U/LEoufexIJDP3LjeRSCkXx+Q1DhdHOGAoNGijnWgqBdmQY1KdygnAePUmWX5LC/zbj4BLT+izbfTm6p3LyT3RF9uOqKP8zwHA5j+5ocZynhp5LRfbTJly0Xecb8qDJALBDNuHG74fxHVm3f3gs4/zNBRbdW/Kp8WryeNzqU/0owPbDV38Ce70icCIGAAA=" } }')
event = json.loads('{"awslogs": {"data": "H4sIAGPPdF4AA+VUTW/bRhC991dshR5awBRnv3cJ9CDHjKvClgOJAdpahrAkVwoRilTJZRI3yH/v0HYCu7ULF82tF4KYGcx78+bNfpzsfd+7nc+uD36STE5m2Wxznq5Ws9N0cjRp3ze+wzBlXEiljQXKMFy3u9OuHQ6Yid37Pq7dPi9dHHwfou3QFKFqm9uyVei822MdA2pj4DHl8eV3Z7MsXWVXrMhzx4FT4Y0oLDiz3VqtNAdtwVCJLfoh74uuOowdX1Z18F0/SS4nL+p2KI99U/puc77bh8nVDVr6zjdhLPg4qUoE5UIxBhKE5NxaoQC0FkpJY7RkygglrQUhmACJoNYyq7mgQiBwqFCY4PY4I5WSSVCCYzNz9FkwbD8rinZoQlUm5L5A62by6ei/MZDPZLB8vSA/XxyTiwXJflqmsxNCBXCt8IsYgjNyGd7gBsrohmlC9NVXYKeeye5yvnh5cbUO4+4j4BHlGbWJ0AlVU8b0b5hxIi9A55EX1keioBBZ7lWUIyUteKGN1uvgDodp3bpyc+szsu3aPel5Esc9z4firQ/xff0f+jAu2mZb7R4dW0vNRurAKLUIyCTVXFFLubESG4LSFBWhinKQij8xNvawD8ZOl8uL5Ze51c3cOgFIJJ9yAJzbWGdKtH5kYMsj4coycg62kaIAxmw9OIFzL9OzdLZKyU2/hKyb4VC64Enna+9638d3P6TCQ/iQrJusc4XPXfGWfL9v+4CFBR4EKVxdk9r14Ye/ayCBGq5QOSWM0pICY3RkwdAQlimqmDUSwKIoVmuAJzQwAuDBaZyNbvyVoEEX88UpmS3IfLHKZosX6bck/WWeYfCxlUhg6F9uJJdSKGq0MgwVRjvjUWjQQDnXUgi0I8NDfepOkI593IkMRicyyChNuEnATjlXuBEnvIdtbqKSexmJ3KMTDeURyBHQCMkKvg69D5vO97575zd7v2+7axJawoSawvkxKYauQ73razL0VbMjKOPUYnw31m7euK4kPxKYwudI327DXeQZSuivokS6OCFL//uAhXN8t551gM/Z0z+ze+rF+Cu7ZfrqYpn9a4LhZOjceOxYTw2dSkX2/TocV3XtS3IvyQAwQ9bh/HZ5q+oPj2GsPz/GoPtA7hKve4/gVt/E//cCzJsq3OtCrUKIsdGozNWnb/4EFS89L0YIAAA=" } }')
index.handler(event, context)
# {"messageType":"DATA_MESSAGE","owner":"123456789012","logGroup":"vpcflowlog","logStream":"eni-123","subscriptionFilters":["CloudBender_Mgmt"],"logEvents":[{"id":"34622050453399460077466588752684659904424057309929734144","timestamp":1552506436228,"message":"2 747836459185 eni-033856bc201b3773b 10.10.0.227 10.3.9.111 443 52866 6 2 135 1563806853 1563806911 ACCEPT OK"}]}
event = json.loads('{"awslogs": {"data": "H4sIAC4iN10AAzWPTWsCMRCG7/0VIWcrSeYjibet3Xoo0oLeioirqSzsF7urUsT/3rFFmMPwvDPvzHvVdRqG3TGtf7qkZ/o1W2fbZb5aZYtcT3R7aVIv2DpAYh+isU5w1R4XfXvqRDl3+++qvQj556uxT7tahNSUz7ImdDgVw74vu7Fsm7eyGlM/6NmXnlft6fCSmkPqt8tjPerNn0F+Ts14H7jq8iA+gOycIYMEECOyMd4jM4XgyXFAphgNokNDHkyMLnpAiyiHx1LCjbta/rREjgwjiFmYPEKLvVMefQBGijaQun9tAAJxsXfGFuA9FMqaqZSZOufvPUzj1FqrEEGRC8yKlVMWSFliCIYDwaONMpfN5/nnWn2869vm9vQL/xeu7nMBAAA=" } }')
# {"messageType":"DATA_MESSAGE","owner":"123456789012","logGroup":"vpcflowlog","logStream":"eni-123","subscriptionFilters":["CloudBender_Mgmt"],"logEvents":[{"id":"34622050453399460077466588752684659904424057309929734144","timestamp":1552506436228,"message":"2 747836459185 eni-033856bc201b3773b 10.10.0.227 10.3.9.111 443 52866 6 2 135 1563806853 1563806911 ACCEPT OK"},{"id":"34622050453399460077466588752684659904424057309929734144","timestamp":1552506436228,"message":"2 747836459185 eni-033856bc201b3773b 10.10.9.48 10.10.0.227 24224 17234 6 10 1947 1563827256 1563827316 ACCEPT OK"}]}
event = json.loads('{"awslogs": {"data": "H4sIAHN1N10AA82R3UoDMRCF732KkOu6JPOXpHe1rr2QotDeiUh/Ylno7pbdrSLFd3eqFPQNhFwczkzOzMecbJ37frXLy49DtmN7O1lOXublYjGZlXZk2/cmd2p7QGIJMTkPau/b3axrjwetvB02r/v2XZ0ffzF0eVVrITfVtX5Ttz+u+01XHYaqbe6q/ZC73o6f7HTfHrc3udnm7mW+qwf7/B1QvuVmODecbLXVHCQBcOyIEVMicS4EEuEYA4NEEk7JEQE5DuhSghSQPJEOHiqFG1a17umZgZ0QalgcXaA1HkygEFGIk49szls7xMiy3oDzawwB18a7Qp8rAMJZY5EK770hQsMQRYwYMB7ZeBaMTiLjRSbtm0yn5ePSPNzbz9G/o0oFxT+AQABkfNCTK5d3xicKPzgQgOUi0ctvsufPqy8JYO/9TQIAAA==" } }')
index.handler(event, context)
# Cloudfront Access Logs via S3
@ -41,6 +41,10 @@ def test_parse():
event = json.loads('{ "Records": [ { "eventVersion": "2.0", "eventTime": "1970-01-01T00:00:00.000Z", "requestParameters": { "sourceIPAddress": "127.0.0.1" }, "s3": { "configurationId": "testConfigRule", "object": { "eTag": "0123456789abcdef0123456789abcdef", "sequencer": "0A1B2C3D4E5F678901", "key": "tests/test_alb_accesslogs.gz", "size": 1024 }, "bucket": { "arn": "arn:aws:s3:::mybucket", "name": "file://", "ownerIdentity": { "principalId": "EXAMPLE" } }, "s3SchemaVersion": "1.0" }, "responseElements": { "x-amz-id-2": "EXAMPLE123/5678abcdefghijklambdaisawesome/mnopqrstuvwxyzABCDEFGH", "x-amz-request-id": "EXAMPLE123456789" }, "awsRegion": "us-east-1", "eventName": "ObjectCreated:Put", "userIdentity": { "principalId": "EXAMPLE" }, "eventSource": "aws:s3" } ] }')
index.handler(event, context)
# cloudtrail incl. _keyed
event = json.loads('{"awslogs": {"data": "H4sIAAAAAAAAA5VU227jNhB972foOUpEXajLmzZOFi6aZhG5WXTXi4AmRy5RSnJJKt40yL93KCpZw82mW8CQIc2ZOXMOh/MYdGAM28LqYQdBFSzqVX13ddE09fuL4CQY9j1o/EziJM1oXpQRifGzGrbv9TDuMHKuhlFYzaQ6W0DLRmVX7sWDGquBdUf5d1PKhLqDMeTQY7oKCaaYcWO4ljsrh/5SKgvaBNVnz/EOegH67mrb2eDLVP3iHlMd4DGQAkkSGpGoTNMyKUieFnlO8iTLaZEUSRmnpKR5StKozKM4S7MySWlRFgh3gqxEGyzrUBGhJM0LSgk2m54824PlH9cBOMZb7Ar7WwfVOiCnUbEOTtbBaEAvBUalfcAIYi0aOmFqY8YOxM2gYILutOy53DG1FD5+c10vb3+7zRe/X+c/X6X08uaXj1U92qHhTMl+O2Ux7Rnxv2J7UxlrqurQ1op5nlAj0Vn9sWlA30sOjvdy0Af1zv5Vm/Nh7O3cz2HRKWzQAhR8PvQWvlqvbv62RE7QR4L/n9JX1UnWHambVGEoNF6Wl8lQifFKTlnH/h56hJzyoXvTgB8R7Q70V9bNR/hmsSeE72GzFJeAE8rc8C6YZc4VF2LWarkZLRjvU9cyTP/DzQpnFjx/y5TxpnG8MnMJTx5HMQmjBH8rklWkqEj2CUldadnfD3+CePfgzfueGb7FaXhXsnutalJUcfJp4p9gzTBq7oHA4+Nqz6gXfxbgru0Glj1eop5DY5kdjbd5b25g+3xfDu+7n62JaPmhFkLjSP2Xjvlg6i0W+QGshr9GvNcfmMZO3TbxJyDnNk0D8zhLC50Lfj6IzpMhw6jlMRWlaGnCYxLn1Pn5KpAgJGtzIVKGzyj5LrDNADdMJCDhsKG5A35xZ9ROS29uy58wV6OAWqlnb12bVo/wNOkzu6E3cKGgc7sQQ/2o1Dfly8XEKIqSC0jjsGR5HKYxYSFr6SYEmsacFTSOsuzbsc5JRHBagChDWuRliPs0DVlc0JDkJY1SFuM6zWeXmbju1cPc2MuovSzAval38pwpNcE71uNCdf1OC/wo6xzHfjtoP9FXL9CZCLeJxLf6jbv75Kz86R9rVP9k2AYAAA==" } }')
index.handler(event, context)
# unknown file
event = json.loads('{ "Records": [ { "eventVersion": "2.0", "eventTime": "1970-01-01T00:00:00.000Z", "requestParameters": { "sourceIPAddress": "127.0.0.1" }, "s3": { "configurationId": "testConfigRule", "object": { "eTag": "0123456789abcdef0123456789abcdef", "sequencer": "0A1B2C3D4E5F678901", "key": "tests/test_s3_unknown.gz", "size": 1024 }, "bucket": { "arn": "arn:aws:s3:::mybucket", "name": "file://", "ownerIdentity": { "principalId": "EXAMPLE" } }, "s3SchemaVersion": "1.0" }, "responseElements": { "x-amz-id-2": "EXAMPLE123/5678abcdefghijklambdaisawesome/mnopqrstuvwxyzABCDEFGH", "x-amz-request-id": "EXAMPLE123456789" }, "awsRegion": "us-east-1", "eventName": "ObjectCreated:Put", "userIdentity": { "principalId": "EXAMPLE" }, "eventSource": "aws:s3" } ] }')
index.handler(event, context)