From ce57cf70f43a92ce136bd9bc423503b6b444837f Mon Sep 17 00:00:00 2001 From: Stefan Reimer Date: Sat, 4 Jan 2020 16:11:52 +0000 Subject: [PATCH] Introduce shorter timeouts for AWS API calls to fail fast and prevent cost explosions due to high log volume --- CHANGES.md | 5 +++++ index.py | 28 +++++++++++++++------------- 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 6900e78..a370c04 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,10 @@ # Changelog +## 0.9.9 +- Improved error handling / descreased timouts calling AWS APIs to fail fast in case AWS throttles API calls + which causes each Lambda call to timout and run for 300s +- Skip over more invalid vpc flowlog entries + ## 0.9.8 - Fix for ALB AccessLog parser to handle spaces in request_url - Improved VPC FlowLog metadata augmentation diff --git a/index.py b/index.py index 7d82144..89cb27e 100644 --- a/index.py +++ b/index.py @@ -14,10 +14,11 @@ import io import urllib import datetime import boto3 +import botocore __author__ = "Stefan Reimer" __author_email__ = "stefan@zero-downtime.net" -__version__ = "0.9.8" +__version__ = "0.9.9" # IAM Alias lookup cache account_aliases = {} @@ -89,13 +90,14 @@ def get_source(region, account_id): if RESOLVE_ACCOUNT and not TEST: try: if account_id not in account_aliases: - iam = boto3.client('iam') + boto3_config = botocore.config.Config(retries=dict(max_attempts=2), connect_timeout=3, read_timeout=5) + iam = boto3.client('iam', config=boto3_config) account_aliases[account_id] = iam.list_account_aliases()['AccountAliases'][0] source['account_alias'] = account_aliases[account_id] - except(KeyError, IndexError): - logger.warning("Could not resolve IAM account alias") + except(botocore.exceptions.ConnectTimeoutError, KeyError, IndexError): + logger.warning("Could not resolve IAM account alias, disabled for this session") RESOLVE_ACCOUNT = False pass @@ -111,16 +113,17 @@ def add_flow_metadata(flow): try: # Check cache and update if missed with all ENIs in one go if flow['interface-id'] not in enis: - ec2 = boto3.client('ec2') + boto3_config = botocore.config.Config(retries=dict(max_attempts=2), connect_timeout=3, read_timeout=5) + ec2 = boto3.client('ec2', config=boto3_config) interface_iter = ec2.get_paginator('describe_network_interfaces').paginate() for response in interface_iter: for interface in response['NetworkInterfaces']: - # Lookup table keyed bt ENI ID + # Lookup table by ENI ID enis[interface['NetworkInterfaceId']] = interface # Lookup table by IP to classify traffic ips[interface['PrivateIpAddress']] = interface - except(KeyError, IndexError): + except(botocore.exceptions.ConnectTimeoutError, KeyError, IndexError): logger.warning("Error trying to get metadata for ENIs, disabling ENHANCE_FLOWLOG") ENHANCE_FLOWLOG = False return flow @@ -210,7 +213,7 @@ class Queue: _url = '{}/{}'.format(self.url.geturl(), self.tag) while True: try: - r = self.request.post(url=_url, data=msgpack.packb(self._queue), verify=self.verify_certs) + r = self.request.post(url=_url, data=msgpack.packb(self._queue), verify=self.verify_certs, timeout=(6, 30)) if r: break else: @@ -220,12 +223,11 @@ class Queue: logger.warning("RequestException: {}".format(e)) pass - if retries >= 8: + if retries >= 2: raise Exception("Error sending {} events to {}. Giving up.".format(events, _url)) retries = retries + 1 - logger.warning("Error sending {} events to {}. Retrying in {} seconds.".format(events, _url, retries**2)) - time.sleep(retries**2) + time.sleep(1) else: logger.debug("Test mode, dump only: {}".format(msgpack.packb(self._queue))) @@ -358,8 +360,8 @@ def handler(event, context): elif logs.tag == 'aws.vpcflowlog': row = e['message'].split(" ") - # Skip over NODATA entries, what would be the point having these in ES ? - if row[13] == 'NODATA': + # Skip over NODATA,SKIPDATA entries, what would be the point having these in ES ? + if row[13] != 'OK': continue parsed = add_flow_metadata({'interface-id': row[2], 'srcaddr': row[3], 'dstaddr': row[4], 'srcport': row[5], 'dstport': row[6], 'protocol': row[7],