#!/usr/bin/env python import base64 import requests import gzip import json import msgpack import struct import os import shutil import re import logging import time import io import urllib import datetime import boto3 import botocore __author__ = "Stefan Reimer" __author_email__ = "stefan@zero-downtime.net" __version__ = "0.9.10" # IAM Alias lookup cache account_aliases = {} # ENI lookup cache enis = {} # IP lookup cache ips = {} logger = logging.getLogger(__name__) logging.getLogger("urllib3").setLevel(logging.WARNING) logging.getLogger('boto3').setLevel(logging.WARNING) logging.getLogger('botocore').setLevel(logging.WARNING) def boolean(value): if value in ('t', 'T', 'true', 'True', 'TRUE', '1', 1, True): return True return False def decrypt(encrypted): try: kms = boto3.client('kms') plaintext = kms.decrypt(CiphertextBlob=base64.b64decode(encrypted))[ 'Plaintext'] return plaintext.decode() except Exception: logging.exception("Failed to decrypt via KMS") CHUNK_SIZE = 128 DEBUG = boolean(os.getenv('DEBUG', default=False)) TEST = boolean(os.getenv('TEST', default=False)) RESOLVE_ACCOUNT = boolean(os.getenv('RESOLVE_ACCOUNT', default=True)) ENHANCE_FLOWLOG = boolean(os.getenv('ENHANCE_FLOWLOG', default=True)) if DEBUG: logging.getLogger().setLevel(logging.DEBUG) else: logging.getLogger().setLevel(logging.INFO) # From fluent/fluent-logger-python class EventTime(msgpack.ExtType): def __new__(cls, timestamp): seconds = int(timestamp) nanoseconds = int(timestamp % 1 * 10 ** 9) return super(EventTime, cls).__new__( cls, code=0, data=struct.pack(">II", seconds, nanoseconds), ) def fluentd_time(timestamp): if isinstance(timestamp, float): return EventTime(timestamp) else: return int(timestamp) def get_source(region, account_id): """ returns a new base source object resolves aws account_id to account alias and caches for lifetime of lambda function """ global RESOLVE_ACCOUNT source = {'account': account_id, 'region': region} if RESOLVE_ACCOUNT and not TEST: try: if account_id not in account_aliases: boto3_config = botocore.config.Config(retries=dict( max_attempts=2), connect_timeout=3, read_timeout=5) iam = boto3.client('iam', config=boto3_config) account_aliases[account_id] = iam.list_account_aliases()[ 'AccountAliases'][0] source['account_alias'] = account_aliases[account_id] except (botocore.exceptions.ConnectTimeoutError, KeyError, IndexError): logger.warning( "Could not resolve IAM account alias, disabled for this session") RESOLVE_ACCOUNT = False pass return source def add_flow_metadata(flow): """ adds metadata to VPC flow: ENI, direction, type caches the ENI and IP lookup tables for Lambda lifetime """ global ENHANCE_FLOWLOG if ENHANCE_FLOWLOG and not TEST: try: # Check cache and update if missed with all ENIs in one go if flow['interface-id'] not in enis: boto3_config = botocore.config.Config(retries=dict( max_attempts=2), connect_timeout=3, read_timeout=5) ec2 = boto3.client('ec2', config=boto3_config) interface_iter = ec2.get_paginator( 'describe_network_interfaces').paginate() for response in interface_iter: for interface in response['NetworkInterfaces']: # Lookup table by ENI ID enis[interface['NetworkInterfaceId']] = interface # Lookup table by IP to classify traffic ips[interface['PrivateIpAddress']] = interface except (botocore.exceptions.ConnectTimeoutError, KeyError, IndexError): logger.warning( "Error trying to get metadata for ENIs, disabling ENHANCE_FLOWLOG") ENHANCE_FLOWLOG = False return flow try: eni = enis[flow['interface-id']] metadata = {'eni.az': eni['AvailabilityZone'], 'eni.subnet': eni['SubnetId']} remote_ip = None if len(eni['Groups']): metadata['eni.sg'] = eni['Groups'][0]['GroupName'] # Add PublicIP if attached if 'Association' in eni and 'PublicIp' in eni['Association']: metadata['eni.public_ip'] = eni['Association']['PublicIp'] # Determine traffic direction if eni['PrivateIpAddress'] == flow['srcaddr']: metadata['direction'] = 'Out' remote_ip = flow['dstaddr'] elif eni['PrivateIpAddress'] == flow['dstaddr']: metadata['direction'] = 'In' remote_ip = flow['srcaddr'] # Try to classify traffic: # Free,Regional,Out if remote_ip: if remote_ip in ips: if ips[remote_ip]['AvailabilityZone'] == eni['AvailabilityZone'] and ips[remote_ip]['VpcId'] == eni['VpcId']: metadata['traffic_class'] = 'Free' else: metadata['traffic_class'] = 'Regional' else: # Incoming traffic is free 90% of times if metadata['direction'] == 'In': metadata['traffic_class'] = 'Free' else: metadata['traffic_class'] = 'Out' flow.update(metadata) except (KeyError, IndexError) as e: logger.warning("Could not get additional data for ENI {} ({})".format( flow['interface-id'], e)) pass return flow class Queue: url = urllib.parse.urlsplit( os.getenv('FLUENTD_URL', default=''), scheme='https') passwd = os.getenv('FLUENT_SHARED_KEY', default=None) verify_certs = os.getenv('FLUENTD_VERIFY_CERTS', default=1) if verify_certs in ('f', 'F', 'false', 'False', 'FALSE', '0', 0, False): verify_certs = False else: verify_certs = True # cached request session request = requests.Session() request.headers = {"Content-type": "application/msgpack"} if passwd: request.auth = ("fluent", passwd) def __init__(self, tag): self._queue = [] self.tag = tag self.sent = 0 def send(self, event): self._queue.append(event) logger.debug("Queued {} event: {}".format(self.tag, event)) # Send events in chunks if len(self._queue) >= CHUNK_SIZE: self.flush() def flush(self): events = len(self._queue) if not events: return logger.debug("Sending {} events to {}/{} ({})".format(events, self.url.geturl(), self.tag, self.request)) if not TEST: # Send events via POSTs reusing the same https connection, retry couple of times retries = 0 _url = '{}/{}'.format(self.url.geturl(), self.tag) while True: try: r = self.request.post(url=_url, data=msgpack.packb( self._queue), verify=self.verify_certs, timeout=(6, 30)) if r: break else: logger.warning("HTTP Error: {}".format(r.status_code)) except requests.RequestException as e: logger.warning("RequestException: {}".format(e)) pass if retries >= 2: raise Exception( "Error sending {} events to {}. Giving up.".format(events, _url)) retries = retries + 1 time.sleep(1) else: logger.debug("Test mode, dump only: {}".format( msgpack.packb(self._queue))) self.sent = self.sent + events self._queue = [] def info(self): logger.info("Sent {} events to {}/{} ({})".format(self.sent, self.url.geturl(), self.tag, self.request)) # Handler to handle CloudWatch logs. def handler(event, context): logger.debug("Event received: {}".format(event)) (region, account_id) = context.invoked_function_arn.split(":")[3:5] # Cloudwatch Logs event if 'awslogs' in event: # Grab the base64-encoded data. b64strg = event['awslogs']['data'] # Decode base64-encoded string, which should be a gzipped object. zippedContent = io.BytesIO(base64.b64decode(b64strg)) # Decompress the content and load JSON. with gzip.GzipFile(mode='rb', fileobj=zippedContent) as content: for line in content: awsLogsData = json.loads(line.decode()) # First determine type if re.match("/aws/lambda/", awsLogsData['logGroup']): logs = Queue("aws.lambda") elif re.search("cloudtrail", awsLogsData['logGroup'], flags=re.IGNORECASE): logs = Queue("aws.cloudtrail") elif re.match("RDSOSMetrics", awsLogsData['logGroup']): logs = Queue("aws.rdsosmetrics") elif re.match("vpcflowlog", awsLogsData['logGroup'], flags=re.IGNORECASE): logs = Queue("aws.vpcflowlog") else: logs = Queue("aws.cloudwatch_logs") # Build list of log events for e in awsLogsData['logEvents']: event = {} source = get_source(region, account_id) parsed = {} # Remove whitespace / empty events & skip over empty events e['message'] = e['message'].strip() if re.match(r'^\s*$', e['message']): continue # inject existing data from subscrition filters if ('extractedFields' in e.keys()): for key in e['extractedFields']: event[key] = e['extractedFields'][key] # lambda ? if logs.tag == 'aws.lambda': # First look for the three AWS Lambda entries mg = re.match( r'(?P(START|END|REPORT)) RequestId: (?P\S*)', e['message']) if mg: parsed['RequestId'] = mg.group('request') if mg.group('type') == 'REPORT': pattern = r'.*(?:\tDuration: (?P[\d\.\d]+) ms\s*)(?:\tBilled Duration: (?P[\d\.\d]+) ms\s*)(?:\tMemory Size: (?P[\d\.\d]+) MB\s*)(?:\tMax Memory Used: (?P[\d\.\d]+) MB)(?:\tInit Duration: (?P[\d\.\d]+) ms\s*)?' elif mg.group('type') == 'START': pattern = r'.*(?:Version: (?P.*))' else: pattern = '' data = re.match(pattern, e['message']) for key in data.groupdict().keys(): if data.group(key): parsed[key] = data.group(key) # All other info parsed, so just set type itself event['message'] = mg.group('type') else: # Try to extract data from AWS default python logging format # This normalizes print vs. logging entries and allows requestid tracking # "[%(levelname)s]\t%(asctime)s.%(msecs)dZ\t%(aws_request_id)s\t%(message)s\n" _msg = e['message'] pattern = r'(?:\[(?P[^\]]*)\]\s)?(?P