diff --git a/README.md b/README.md
index f7e53f0..031541a 100644
--- a/README.md
+++ b/README.md
@@ -1,5 +1,3 @@
-[![Build Status](https://drone.zero-downtime.net/api/badges/ZeroDownTime/streamlogs2fluentd/status.svg)](https://drone.zero-downtime.net/ZeroDownTime/streamlogs2fluentd)
-
 # streamlogs2fluentd
 
 # About
diff --git a/index.py b/index.py
index ab584d8..55f139b 100644
--- a/index.py
+++ b/index.py
@@ -44,7 +44,8 @@ def boolean(value):
 def decrypt(encrypted):
     try:
         kms = boto3.client('kms')
-        plaintext = kms.decrypt(CiphertextBlob=base64.b64decode(encrypted))['Plaintext']
+        plaintext = kms.decrypt(CiphertextBlob=base64.b64decode(encrypted))[
+            'Plaintext']
         return plaintext.decode()
     except Exception:
         logging.exception("Failed to decrypt via KMS")
@@ -90,14 +91,17 @@ def get_source(region, account_id):
     if RESOLVE_ACCOUNT and not TEST:
         try:
             if account_id not in account_aliases:
-                boto3_config = botocore.config.Config(retries=dict(max_attempts=2), connect_timeout=3, read_timeout=5)
+                boto3_config = botocore.config.Config(retries=dict(
+                    max_attempts=2), connect_timeout=3, read_timeout=5)
                 iam = boto3.client('iam', config=boto3_config)
-                account_aliases[account_id] = iam.list_account_aliases()['AccountAliases'][0]
+                account_aliases[account_id] = iam.list_account_aliases()[
+                    'AccountAliases'][0]
 
             source['account_alias'] = account_aliases[account_id]
 
-        except(botocore.exceptions.ConnectTimeoutError, KeyError, IndexError):
-            logger.warning("Could not resolve IAM account alias, disabled for this session")
+        except (botocore.exceptions.ConnectTimeoutError, KeyError, IndexError):
+            logger.warning(
+                "Could not resolve IAM account alias, disabled for this session")
             RESOLVE_ACCOUNT = False
             pass
 
@@ -113,9 +117,11 @@ def add_flow_metadata(flow):
         try:
             # Check cache and update if missed with all ENIs in one go
             if flow['interface-id'] not in enis:
-                boto3_config = botocore.config.Config(retries=dict(max_attempts=2), connect_timeout=3, read_timeout=5)
+                boto3_config = botocore.config.Config(retries=dict(
+                    max_attempts=2), connect_timeout=3, read_timeout=5)
                 ec2 = boto3.client('ec2', config=boto3_config)
-                interface_iter = ec2.get_paginator('describe_network_interfaces').paginate()
+                interface_iter = ec2.get_paginator(
+                    'describe_network_interfaces').paginate()
                 for response in interface_iter:
                     for interface in response['NetworkInterfaces']:
                         # Lookup table by ENI ID
@@ -123,8 +129,9 @@ def add_flow_metadata(flow):
                         # Lookup table by IP to classify traffic
                         ips[interface['PrivateIpAddress']] = interface
 
-        except(botocore.exceptions.ConnectTimeoutError, KeyError, IndexError):
-            logger.warning("Error trying to get metadata for ENIs, disabling ENHANCE_FLOWLOG")
+        except (botocore.exceptions.ConnectTimeoutError, KeyError, IndexError):
+            logger.warning(
+                "Error trying to get metadata for ENIs, disabling ENHANCE_FLOWLOG")
             ENHANCE_FLOWLOG = False
             return flow
 
@@ -165,15 +172,17 @@ def add_flow_metadata(flow):
 
             flow.update(metadata)
 
-        except(KeyError, IndexError) as e:
-            logger.warning("Could not get additional data for ENI {} ({})".format(flow['interface-id'], e))
+        except (KeyError, IndexError) as e:
+            logger.warning("Could not get additional data for ENI {} ({})".format(
+                flow['interface-id'], e))
             pass
 
     return flow
 
 
 class Queue:
-    url = urllib.parse.urlsplit(os.getenv('FLUENTD_URL', default=''), scheme='https')
+    url = urllib.parse.urlsplit(
+        os.getenv('FLUENTD_URL', default=''), scheme='https')
 
     passwd = os.getenv('FLUENT_SHARED_KEY', default=None)
     verify_certs = os.getenv('FLUENTD_VERIFY_CERTS', default=1)
@@ -205,7 +214,8 @@ class Queue:
         if not events:
             return
 
-        logger.debug("Sending {} events to {}/{} ({})".format(events, self.url.geturl(), self.tag, self.request))
+        logger.debug("Sending {} events to {}/{} ({})".format(events,
+                     self.url.geturl(), self.tag, self.request))
 
         if not TEST:
             # Send events via POSTs reusing the same https connection, retry couple of times
@@ -213,7 +223,8 @@ class Queue:
             _url = '{}/{}'.format(self.url.geturl(), self.tag)
             while True:
                 try:
-                    r = self.request.post(url=_url, data=msgpack.packb(self._queue), verify=self.verify_certs, timeout=(6, 30))
+                    r = self.request.post(url=_url, data=msgpack.packb(
+                        self._queue), verify=self.verify_certs, timeout=(6, 30))
                     if r:
                         break
                     else:
@@ -224,18 +235,21 @@ class Queue:
                     pass
 
                 if retries >= 2:
-                    raise Exception("Error sending {} events to {}. Giving up.".format(events, _url))
+                    raise Exception(
+                        "Error sending {} events to {}. Giving up.".format(events, _url))
                 retries = retries + 1
                 time.sleep(1)
 
         else:
-            logger.debug("Test mode, dump only: {}".format(msgpack.packb(self._queue)))
+            logger.debug("Test mode, dump only: {}".format(
+                msgpack.packb(self._queue)))
 
         self.sent = self.sent + events
         self._queue = []
 
     def info(self):
-        logger.info("Sent {} events to {}/{} ({})".format(self.sent, self.url.geturl(), self.tag, self.request))
+        logger.info("Sent {} events to {}/{} ({})".format(self.sent,
+                    self.url.geturl(), self.tag, self.request))
 
 
 # Handler to handle CloudWatch logs.
@@ -281,14 +295,15 @@ def handler(event, context):
                     continue
 
                 # inject existing data from subscrition filters
-                if('extractedFields' in e.keys()):
+                if ('extractedFields' in e.keys()):
                     for key in e['extractedFields']:
                         event[key] = e['extractedFields'][key]
 
                 # lambda ?
                 if logs.tag == 'aws.lambda':
                     # First look for the three AWS Lambda entries
-                    mg = re.match(r'(?P<type>(START|END|REPORT)) RequestId: (?P<request>\S*)', e['message'])
+                    mg = re.match(
+                        r'(?P<type>(START|END|REPORT)) RequestId: (?P<request>\S*)', e['message'])
                     if mg:
                         parsed['RequestId'] = mg.group('request')
                         if mg.group('type') == 'REPORT':
@@ -318,7 +333,8 @@ def handler(event, context):
                     if data:
                         if data.group('level'):
                             event['level'] = data.group('level')
-                        event['time'] = fluentd_time(datetime.datetime.strptime(data.group('time'), '%Y-%m-%dT%H:%M:%S.%fZ').timestamp())
+                        event['time'] = fluentd_time(datetime.datetime.strptime(
+                            data.group('time'), '%Y-%m-%dT%H:%M:%S.%fZ').timestamp())
                         parsed['RequestId'] = data.group('RequestId')
                         _msg = data.group('message')
 
@@ -339,7 +355,8 @@ def handler(event, context):
                         parsed = json.loads(e['message'])
 
                         # use eventTime and eventID from the event itself
-                        event['time'] = fluentd_time(datetime.datetime.strptime(parsed['eventTime'], '%Y-%m-%dT%H:%M:%SZ').timestamp())
+                        event['time'] = fluentd_time(datetime.datetime.strptime(
+                            parsed['eventTime'], '%Y-%m-%dT%H:%M:%SZ').timestamp())
                         event['id'] = parsed['eventID']
                         # override region from cloudtrail event
                         source['region'] = parsed['awsRegion']
@@ -444,10 +461,12 @@ def handler(event, context):
                             parsed[key] = data.group(key)
 
                     else:
-                        logger.warning("Could not parse ALB access log entry: {}".format(line))
+                        logger.warning(
+                            "Could not parse ALB access log entry: {}".format(line))
                         continue
 
-                    event['time'] = fluentd_time(datetime.datetime.strptime(parsed['request_creation_time'], '%Y-%m-%dT%H:%M:%S.%fZ').timestamp())
+                    event['time'] = fluentd_time(datetime.datetime.strptime(
+                        parsed['request_creation_time'], '%Y-%m-%dT%H:%M:%S.%fZ').timestamp())
 
                     # Copy to host to allow geoip upstream
                     event['host'] = parsed['client_ip']
@@ -471,7 +490,8 @@ def handler(event, context):
                     row = line.split('\t')
 
                     # cloudfront events are logged to the second only, date and time are seperate
-                    event['time'] = fluentd_time(datetime.datetime.strptime(row[0] + " " + row[1], '%Y-%m-%d %H:%M:%S').timestamp())
+                    event['time'] = fluentd_time(datetime.datetime.strptime(
+                        row[0] + " " + row[1], '%Y-%m-%d %H:%M:%S').timestamp())
 
                     for n, c in enumerate(columns, 2):
                         value = row[n]