Introduce shorter timeouts for AWS API calls to fail fast and prevent cost explosions due to high log volume

Stefan Reimer 2020-01-04 16:11:52 +00:00
parent f0fd0c1e07
commit ce57cf70f4
2 changed files with 20 additions and 13 deletions
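
The fail-fast pattern this commit applies to each boto3 client, as a minimal standalone sketch (the timeout and retry values are the ones from the diff below; the client choice is illustrative):

import boto3
import botocore.config

# Limit automatic retries and shorten the connect/read timeouts so a
# throttled or unresponsive AWS API fails within seconds instead of
# stalling the Lambda until its own 300s timeout, keeping runtime and
# cost bounded even at high log volume.
boto3_config = botocore.config.Config(
    retries=dict(max_attempts=2),  # cap automatic retry attempts
    connect_timeout=3,             # seconds to establish the connection
    read_timeout=5)                # seconds to wait for a response

iam = boto3.client('iam', config=boto3_config)
alias = iam.list_account_aliases()['AccountAliases'][0]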

@@ -1,5 +1,10 @@
 # Changelog
+## 0.9.9
+- Improved error handling / decreased timeouts when calling AWS APIs to fail fast in case AWS throttles API calls,
+  which otherwise causes each Lambda call to time out and run for 300s
+- Skip over more invalid VPC FlowLog entries
 ## 0.9.8
 - Fix for ALB AccessLog parser to handle spaces in request_url
 - Improved VPC FlowLog metadata augmentation

@@ -14,10 +14,11 @@ import io
 import urllib
 import datetime
 import boto3
+import botocore
 
 __author__ = "Stefan Reimer"
 __author_email__ = "stefan@zero-downtime.net"
-__version__ = "0.9.8"
+__version__ = "0.9.9"
 
 # IAM Alias lookup cache
 account_aliases = {}
@@ -89,13 +90,14 @@ def get_source(region, account_id):
     if RESOLVE_ACCOUNT and not TEST:
         try:
             if account_id not in account_aliases:
-                iam = boto3.client('iam')
+                boto3_config = botocore.config.Config(retries=dict(max_attempts=2), connect_timeout=3, read_timeout=5)
+                iam = boto3.client('iam', config=boto3_config)
                 account_aliases[account_id] = iam.list_account_aliases()['AccountAliases'][0]
             source['account_alias'] = account_aliases[account_id]
-        except(KeyError, IndexError):
-            logger.warning("Could not resolve IAM account alias")
+        except(botocore.exceptions.ConnectTimeoutError, KeyError, IndexError):
+            logger.warning("Could not resolve IAM account alias, disabled for this session")
             RESOLVE_ACCOUNT = False
             pass
@@ -111,16 +113,17 @@ def add_flow_metadata(flow):
     try:
         # Check cache and update if missed with all ENIs in one go
        if flow['interface-id'] not in enis:
-            ec2 = boto3.client('ec2')
+            boto3_config = botocore.config.Config(retries=dict(max_attempts=2), connect_timeout=3, read_timeout=5)
+            ec2 = boto3.client('ec2', config=boto3_config)
             interface_iter = ec2.get_paginator('describe_network_interfaces').paginate()
             for response in interface_iter:
                 for interface in response['NetworkInterfaces']:
-                    # Lookup table keyed bt ENI ID
+                    # Lookup table by ENI ID
                     enis[interface['NetworkInterfaceId']] = interface
                     # Lookup table by IP to classify traffic
                     ips[interface['PrivateIpAddress']] = interface
-    except(KeyError, IndexError):
+    except(botocore.exceptions.ConnectTimeoutError, KeyError, IndexError):
         logger.warning("Error trying to get metadata for ENIs, disabling ENHANCE_FLOWLOG")
         ENHANCE_FLOWLOG = False
         return flow
@@ -210,7 +213,7 @@ class Queue:
            _url = '{}/{}'.format(self.url.geturl(), self.tag)
            while True:
                try:
-                    r = self.request.post(url=_url, data=msgpack.packb(self._queue), verify=self.verify_certs)
+                    r = self.request.post(url=_url, data=msgpack.packb(self._queue), verify=self.verify_certs, timeout=(6, 30))
                    if r:
                        break
                    else:
@@ -220,12 +223,11 @@ class Queue:
                    logger.warning("RequestException: {}".format(e))
                    pass
 
-                if retries >= 8:
+                if retries >= 2:
                    raise Exception("Error sending {} events to {}. Giving up.".format(events, _url))
 
                retries = retries + 1
-                logger.warning("Error sending {} events to {}. Retrying in {} seconds.".format(events, _url, retries**2))
-                time.sleep(retries**2)
+                time.sleep(1)
 
        else:
            logger.debug("Test mode, dump only: {}".format(msgpack.packb(self._queue)))
@@ -358,8 +360,8 @@ def handler(event, context):
                elif logs.tag == 'aws.vpcflowlog':
                    row = e['message'].split(" ")
 
-                    # Skip over NODATA entries, what would be the point having these in ES ?
-                    if row[13] == 'NODATA':
+                    # Skip over NODATA,SKIPDATA entries, what would be the point having these in ES ?
+                    if row[13] != 'OK':
                        continue
 
                    parsed = add_flow_metadata({'interface-id': row[2], 'srcaddr': row[3], 'dstaddr': row[4], 'srcport': row[5], 'dstport': row[6], 'protocol': row[7],
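
For context on the row[13] != 'OK' check: in the default VPC Flow Log record format the 14th field is log-status, which is OK, NODATA or SKIPDATA, so comparing against 'OK' skips both kinds of invalid entries rather than only NODATA. A small sketch with an illustrative record in the default layout:

# Default VPC Flow Log fields, space separated:
# version account-id interface-id srcaddr dstaddr srcport dstport
# protocol packets bytes start end action log-status
sample = ("2 123456789012 eni-1235b8ca 172.31.16.139 172.31.16.21 "
          "20641 22 6 20 4249 1418530010 1418530070 ACCEPT OK")
row = sample.split(" ")
assert row[2] == 'eni-1235b8ca'  # interface-id, as used by add_flow_metadata
assert row[13] == 'OK'           # NODATA / SKIPDATA records are skipped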