diff --git a/Makefile b/Makefile
index 93f5093..a7a4345 100644
--- a/Makefile
+++ b/Makefile
@@ -11,7 +11,7 @@ PACKAGE_FILE := dist/$(PACKAGE)
 all: test build

 test:
-	flake8 --ignore=E501 index.py tests
+	flake8 --ignore=E501,W503 index.py tests
 	TEST=True pytest --log-cli-level=DEBUG

 clean:
diff --git a/index.py b/index.py
index ab584d8..b9d3cf0 100644
--- a/index.py
+++ b/index.py
@@ -31,30 +31,30 @@ ips = {}

 logger = logging.getLogger(__name__)
 logging.getLogger("urllib3").setLevel(logging.WARNING)
-logging.getLogger('boto3').setLevel(logging.WARNING)
-logging.getLogger('botocore').setLevel(logging.WARNING)
+logging.getLogger("boto3").setLevel(logging.WARNING)
+logging.getLogger("botocore").setLevel(logging.WARNING)


 def boolean(value):
-    if value in ('t', 'T', 'true', 'True', 'TRUE', '1', 1, True):
+    if value in ("t", "T", "true", "True", "TRUE", "1", 1, True):
         return True
     return False


 def decrypt(encrypted):
     try:
-        kms = boto3.client('kms')
-        plaintext = kms.decrypt(CiphertextBlob=base64.b64decode(encrypted))['Plaintext']
+        kms = boto3.client("kms")
+        plaintext = kms.decrypt(CiphertextBlob=base64.b64decode(encrypted))["Plaintext"]
         return plaintext.decode()
     except Exception:
         logging.exception("Failed to decrypt via KMS")


 CHUNK_SIZE = 128
-DEBUG = boolean(os.getenv('DEBUG', default=False))
-TEST = boolean(os.getenv('TEST', default=False))
-RESOLVE_ACCOUNT = boolean(os.getenv('RESOLVE_ACCOUNT', default=True))
-ENHANCE_FLOWLOG = boolean(os.getenv('ENHANCE_FLOWLOG', default=True))
+DEBUG = boolean(os.getenv("DEBUG", default=False))
+TEST = boolean(os.getenv("TEST", default=False))
+RESOLVE_ACCOUNT = boolean(os.getenv("RESOLVE_ACCOUNT", default=True))
+ENHANCE_FLOWLOG = boolean(os.getenv("ENHANCE_FLOWLOG", default=True))

 if DEBUG:
     logging.getLogger().setLevel(logging.DEBUG)
@@ -66,7 +66,7 @@ else:
 class EventTime(msgpack.ExtType):
     def __new__(cls, timestamp):
         seconds = int(timestamp)
-        nanoseconds = int(timestamp % 1 * 10 ** 9)
+        nanoseconds = int(timestamp % 1 * 10**9)
         return super(EventTime, cls).__new__(
             cls,
             code=0,
@@ -82,22 +82,28 @@ def fluentd_time(timestamp):

 def get_source(region, account_id):
-    """ returns a new base source object
-        resolves aws account_id to account alias and caches for lifetime of lambda function
+    """returns a new base source object
+    resolves aws account_id to account alias and caches for lifetime of lambda function
     """
     global RESOLVE_ACCOUNT

-    source = {'account': account_id, 'region': region}
+    source = {"account": account_id, "region": region}

     if RESOLVE_ACCOUNT and not TEST:
         try:
             if account_id not in account_aliases:
-                boto3_config = botocore.config.Config(retries=dict(max_attempts=2), connect_timeout=3, read_timeout=5)
-                iam = boto3.client('iam', config=boto3_config)
-                account_aliases[account_id] = iam.list_account_aliases()['AccountAliases'][0]
+                boto3_config = botocore.config.Config(
+                    retries=dict(max_attempts=2), connect_timeout=3, read_timeout=5
+                )
+                iam = boto3.client("iam", config=boto3_config)
+                account_aliases[account_id] = iam.list_account_aliases()[
+                    "AccountAliases"
+                ][0]

-            source['account_alias'] = account_aliases[account_id]
+            source["account_alias"] = account_aliases[account_id]

-        except(botocore.exceptions.ConnectTimeoutError, KeyError, IndexError):
-            logger.warning("Could not resolve IAM account alias, disabled for this session")
+        except (botocore.exceptions.ConnectTimeoutError, KeyError, IndexError):
+            logger.warning(
+                "Could not resolve IAM account alias, disabled for this session"
+            )
             RESOLVE_ACCOUNT = False
             pass

@@ -105,79 +111,94 @@ def get_source(region, account_id):


 def add_flow_metadata(flow):
-    """ adds metadata to VPC flow: ENI, direction, type
-        caches the ENI and IP lookup tables for Lambda lifetime
+    """adds metadata to VPC flow: ENI, direction, type
+    caches the ENI and IP lookup tables for Lambda lifetime
     """
     global ENHANCE_FLOWLOG

     if ENHANCE_FLOWLOG and not TEST:
         try:
             # Check cache and update if missed with all ENIs in one go
-            if flow['interface-id'] not in enis:
-                boto3_config = botocore.config.Config(retries=dict(max_attempts=2), connect_timeout=3, read_timeout=5)
-                ec2 = boto3.client('ec2', config=boto3_config)
-                interface_iter = ec2.get_paginator('describe_network_interfaces').paginate()
+            if flow["interface-id"] not in enis:
+                boto3_config = botocore.config.Config(
+                    retries=dict(max_attempts=2), connect_timeout=3, read_timeout=5
+                )
+                ec2 = boto3.client("ec2", config=boto3_config)
+                interface_iter = ec2.get_paginator(
+                    "describe_network_interfaces"
+                ).paginate()
                 for response in interface_iter:
-                    for interface in response['NetworkInterfaces']:
+                    for interface in response["NetworkInterfaces"]:
                         # Lookup table by ENI ID
-                        enis[interface['NetworkInterfaceId']] = interface
+                        enis[interface["NetworkInterfaceId"]] = interface
                         # Lookup table by IP to classify traffic
-                        ips[interface['PrivateIpAddress']] = interface
-        except(botocore.exceptions.ConnectTimeoutError, KeyError, IndexError):
-            logger.warning("Error trying to get metadata for ENIs, disabling ENHANCE_FLOWLOG")
+                        ips[interface["PrivateIpAddress"]] = interface
+        except (botocore.exceptions.ConnectTimeoutError, KeyError, IndexError):
+            logger.warning(
+                "Error trying to get metadata for ENIs, disabling ENHANCE_FLOWLOG"
+            )
             ENHANCE_FLOWLOG = False
             return flow

         try:
-            eni = enis[flow['interface-id']]
-            metadata = {'eni.az': eni['AvailabilityZone'],
-                        'eni.subnet': eni['SubnetId']}
+            eni = enis[flow["interface-id"]]
+            metadata = {
+                "eni.az": eni["AvailabilityZone"],
+                "eni.subnet": eni["SubnetId"],
+            }
             remote_ip = None

-            if len(eni['Groups']):
-                metadata['eni.sg'] = eni['Groups'][0]['GroupName']
+            if len(eni["Groups"]):
+                metadata["eni.sg"] = eni["Groups"][0]["GroupName"]

             # Add PublicIP if attached
-            if 'Association' in eni and 'PublicIp' in eni['Association']:
-                metadata['eni.public_ip'] = eni['Association']['PublicIp']
+            if "Association" in eni and "PublicIp" in eni["Association"]:
+                metadata["eni.public_ip"] = eni["Association"]["PublicIp"]

             # Determine traffic direction
-            if eni['PrivateIpAddress'] == flow['srcaddr']:
-                metadata['direction'] = 'Out'
-                remote_ip = flow['dstaddr']
-            elif eni['PrivateIpAddress'] == flow['dstaddr']:
-                metadata['direction'] = 'In'
-                remote_ip = flow['srcaddr']
+            if eni["PrivateIpAddress"] == flow["srcaddr"]:
+                metadata["direction"] = "Out"
+                remote_ip = flow["dstaddr"]
+            elif eni["PrivateIpAddress"] == flow["dstaddr"]:
+                metadata["direction"] = "In"
+                remote_ip = flow["srcaddr"]

             # Try to classify traffic:
             # Free,Regional,Out
             if remote_ip:
                 if remote_ip in ips:
-                    if ips[remote_ip]['AvailabilityZone'] == eni['AvailabilityZone'] and ips[remote_ip]['VpcId'] == eni['VpcId']:
-                        metadata['traffic_class'] = 'Free'
+                    if (
+                        ips[remote_ip]["AvailabilityZone"] == eni["AvailabilityZone"]
+                        and ips[remote_ip]["VpcId"] == eni["VpcId"]
+                    ):
+                        metadata["traffic_class"] = "Free"
                     else:
-                        metadata['traffic_class'] = 'Regional'
+                        metadata["traffic_class"] = "Regional"
                 else:
                     # Incoming traffic is free 90% of times
-                    if metadata['direction'] == 'In':
-                        metadata['traffic_class'] = 'Free'
+                    if metadata["direction"] == "In":
+                        metadata["traffic_class"] = "Free"
                     else:
-                        metadata['traffic_class'] = 'Out'
+                        metadata["traffic_class"] = "Out"

             flow.update(metadata)
-        except(KeyError, IndexError) as e:
-            logger.warning("Could not get additional data for ENI {} ({})".format(flow['interface-id'], e))
+        except (KeyError, IndexError) as e:
+            logger.warning(
+                "Could not get additional data for ENI {} ({})".format(
+                    flow["interface-id"], e
+                )
+            )
             pass

     return flow


 class Queue:
-    url = urllib.parse.urlsplit(os.getenv('FLUENTD_URL', default=''), scheme='https')
-    passwd = os.getenv('FLUENT_SHARED_KEY', default=None)
+    url = urllib.parse.urlsplit(os.getenv("FLUENTD_URL", default=""), scheme="https")
+    passwd = os.getenv("FLUENT_SHARED_KEY", default=None)

-    verify_certs = os.getenv('FLUENTD_VERIFY_CERTS', default=1)
-    if verify_certs in ('f', 'F', 'false', 'False', 'FALSE', '0', 0, False):
+    verify_certs = os.getenv("FLUENTD_VERIFY_CERTS", default=1)
+    if verify_certs in ("f", "F", "false", "False", "FALSE", "0", 0, False):
         verify_certs = False
     else:
         verify_certs = True
@@ -205,15 +226,24 @@ class Queue:
         if not events:
             return

-        logger.debug("Sending {} events to {}/{} ({})".format(events, self.url.geturl(), self.tag, self.request))
+        logger.debug(
+            "Sending {} events to {}/{} ({})".format(
+                events, self.url.geturl(), self.tag, self.request
+            )
+        )

         if not TEST:
             # Send events via POSTs reusing the same https connection, retry couple of times
             retries = 0
-            _url = '{}/{}'.format(self.url.geturl(), self.tag)
+            _url = "{}/{}".format(self.url.geturl(), self.tag)
             while True:
                 try:
-                    r = self.request.post(url=_url, data=msgpack.packb(self._queue), verify=self.verify_certs, timeout=(6, 30))
+                    r = self.request.post(
+                        url=_url,
+                        data=msgpack.packb(self._queue),
+                        verify=self.verify_certs,
+                        timeout=(6, 30),
+                    )
                     if r:
                         break
                     else:
@@ -224,7 +254,9 @@
                     pass

                 if retries >= 2:
-                    raise Exception("Error sending {} events to {}. Giving up.".format(events, _url))
+                    raise Exception(
+                        "Error sending {} events to {}. Giving up.".format(events, _url)
+                    )

                 retries = retries + 1
                 time.sleep(1)
@@ -235,7 +267,11 @@
         self._queue = []

     def info(self):
-        logger.info("Sent {} events to {}/{} ({})".format(self.sent, self.url.geturl(), self.tag, self.request))
+        logger.info(
+            "Sent {} events to {}/{} ({})".format(
+                self.sent, self.url.geturl(), self.tag, self.request
+            )
+        )


 # Handler to handle CloudWatch logs.
@@ -245,146 +281,171 @@ def handler(event, context):
     (region, account_id) = context.invoked_function_arn.split(":")[3:5]

     # Cloudwatch Logs event
-    if 'awslogs' in event:
+    if "awslogs" in event:
         # Grab the base64-encoded data.
-        b64strg = event['awslogs']['data']
+        b64strg = event["awslogs"]["data"]

         # Decode base64-encoded string, which should be a gzipped object.
         zippedContent = io.BytesIO(base64.b64decode(b64strg))

         # Decompress the content and load JSON.
-        with gzip.GzipFile(mode='rb', fileobj=zippedContent) as content:
+        with gzip.GzipFile(mode="rb", fileobj=zippedContent) as content:
             for line in content:
                 awsLogsData = json.loads(line.decode())

         # First determine type
-        if re.match("/aws/lambda/", awsLogsData['logGroup']):
+        if re.match("/aws/lambda/", awsLogsData["logGroup"]):
             logs = Queue("aws.lambda")
-        elif re.search("cloudtrail", awsLogsData['logGroup'], flags=re.IGNORECASE):
+        elif re.search("cloudtrail", awsLogsData["logGroup"], flags=re.IGNORECASE):
             logs = Queue("aws.cloudtrail")
-        elif re.match("RDSOSMetrics", awsLogsData['logGroup']):
+        elif re.match("RDSOSMetrics", awsLogsData["logGroup"]):
             logs = Queue("aws.rdsosmetrics")
-        elif re.match("vpcflowlog", awsLogsData['logGroup'], flags=re.IGNORECASE):
+        elif re.match("vpcflowlog", awsLogsData["logGroup"], flags=re.IGNORECASE):
             logs = Queue("aws.vpcflowlog")
         else:
             logs = Queue("aws.cloudwatch_logs")

         # Build list of log events
-        for e in awsLogsData['logEvents']:
+        for e in awsLogsData["logEvents"]:
             event = {}
             source = get_source(region, account_id)
             parsed = {}

             # Remove whitespace / empty events & skip over empty events
-            e['message'] = e['message'].strip()
-            if re.match(r'^\s*$', e['message']):
+            e["message"] = e["message"].strip()
+            if re.match(r"^\s*$", e["message"]):
                 continue

             # inject existing data from subscrition filters
-            if('extractedFields' in e.keys()):
-                for key in e['extractedFields']:
-                    event[key] = e['extractedFields'][key]
+            if "extractedFields" in e.keys():
+                for key in e["extractedFields"]:
+                    event[key] = e["extractedFields"][key]

             # lambda ?
-            if logs.tag == 'aws.lambda':
+            if logs.tag == "aws.lambda":
                 # First look for the three AWS Lambda entries
-                mg = re.match(r'(?P<type>(START|END|REPORT)) RequestId: (?P<request>\S*)', e['message'])
+                mg = re.match(
+                    r"(?P<type>(START|END|REPORT)) RequestId: (?P<request>\S*)",
+                    e["message"],
+                )
                 if mg:
-                    parsed['RequestId'] = mg.group('request')
-                    if mg.group('type') == 'REPORT':
-                        pattern = r'.*(?:\tDuration: (?P[\d\.\d]+) ms\s*)(?:\tBilled Duration: (?P[\d\.\d]+) ms\s*)(?:\tMemory Size: (?P[\d\.\d]+) MB\s*)(?:\tMax Memory Used: (?P[\d\.\d]+) MB)(?:\tInit Duration: (?P[\d\.\d]+) ms\s*)?'
+                    parsed["RequestId"] = mg.group("request")
+                    if mg.group("type") == "REPORT":
+                        pattern = r".*(?:\tDuration: (?P[\d\.\d]+) ms\s*)(?:\tBilled Duration: (?P[\d\.\d]+) ms\s*)(?:\tMemory Size: (?P[\d\.\d]+) MB\s*)(?:\tMax Memory Used: (?P[\d\.\d]+) MB)(?:\tInit Duration: (?P[\d\.\d]+) ms\s*)?"

-                    elif mg.group('type') == 'START':
-                        pattern = r'.*(?:Version: (?P.*))'
+                    elif mg.group("type") == "START":
+                        pattern = r".*(?:Version: (?P.*))"

                     else:
-                        pattern = ''
+                        pattern = ""

-                    data = re.match(pattern, e['message'])
+                    data = re.match(pattern, e["message"])
                     for key in data.groupdict().keys():
                         if data.group(key):
                             parsed[key] = data.group(key)

                     # All other info parsed, so just set type itself
-                    event['message'] = mg.group('type')
+                    event["message"] = mg.group("type")
                 else:
                     # Try to extract data from AWS default python logging format
                     # This normalizes print vs. logging entries and allows requestid tracking
                     # "[%(levelname)s]\t%(asctime)s.%(msecs)dZ\t%(aws_request_id)s\t%(message)s\n"
-                    _msg = e['message']
-                    pattern = r'(?:\[(?P[^\]]*)\]\s)?(?P