2019-05-08 11:51:48 +00:00
|
|
|
#!/usr/bin/env python
|
|
|
|
import base64
|
|
|
|
import requests
|
|
|
|
import gzip
|
|
|
|
import json
|
|
|
|
import msgpack
|
|
|
|
import struct
|
|
|
|
import os
|
|
|
|
import shutil
|
|
|
|
import re
|
|
|
|
import logging
|
|
|
|
import time
|
|
|
|
import io
|
|
|
|
import urllib
|
|
|
|
import datetime
|
|
|
|
import boto3
|
2020-01-04 16:11:52 +00:00
|
|
|
import botocore
|
2019-05-08 11:51:48 +00:00
|
|
|
|
|
|
|
# Package metadata
__author__ = "Stefan Reimer"
__author_email__ = "stefan@zero-downtime.net"
__version__ = "0.9.10"
|
2019-05-08 11:51:48 +00:00
|
|
|
|
2019-07-28 11:34:05 +00:00
|
|
|
# IAM Alias lookup cache (persists across invocations of a warm Lambda container)
account_aliases = {}

# ENI lookup cache
enis = {}

# IP lookup cache
ips = {}

logger = logging.getLogger(__name__)

# Quieten chatty third-party libraries; only warnings and above get through
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("boto3").setLevel(logging.WARNING)
logging.getLogger("botocore").setLevel(logging.WARNING)
|
2019-05-08 11:51:48 +00:00
|
|
|
|
|
|
|
|
|
|
|
def boolean(value):
    """Map common truthy spellings to True; everything else is False.

    Accepts the strings "t"/"T"/"true"/"True"/"TRUE"/"1" as well as the
    values 1 and True. Any other value (including None) yields False.
    """
    truthy = ("t", "T", "true", "True", "TRUE", "1", 1, True)
    return value in truthy
|
|
|
|
|
|
|
|
|
|
|
|
def decrypt(encrypted):
    """Decrypt a base64-encoded KMS ciphertext and return it as a str.

    Returns None if decryption fails; the failure is logged with a traceback
    but not raised, so callers must handle a None result.
    """
    try:
        kms = boto3.client("kms")
        plaintext = kms.decrypt(CiphertextBlob=base64.b64decode(encrypted))["Plaintext"]
        return plaintext.decode()
    except Exception:
        # Fix: use the module-level logger (as everywhere else in this file)
        # instead of the root logger via logging.exception()
        logger.exception("Failed to decrypt via KMS")
|
|
|
|
|
|
|
|
|
|
|
|
# Number of queued events before Queue.send() triggers an automatic flush
CHUNK_SIZE = 128

# Runtime configuration via environment variables
DEBUG = boolean(os.getenv("DEBUG", default=False))
TEST = boolean(os.getenv("TEST", default=False))
RESOLVE_ACCOUNT = boolean(os.getenv("RESOLVE_ACCOUNT", default=True))
ENHANCE_FLOWLOG = boolean(os.getenv("ENHANCE_FLOWLOG", default=True))

# Root logger level controls this module's own verbosity
if DEBUG:
    logging.getLogger().setLevel(logging.DEBUG)
else:
    logging.getLogger().setLevel(logging.INFO)
|
|
|
|
|
|
|
|
|
|
|
|
# Borrowed from fluent/fluent-logger-python
class EventTime(msgpack.ExtType):
    """msgpack ExtType (code 0) encoding a fluentd EventTime.

    The payload is two big-endian unsigned 32-bit integers: whole seconds
    followed by the fractional part expressed in nanoseconds.
    """

    def __new__(cls, timestamp):
        secs = int(timestamp)
        nsecs = int(timestamp % 1 * 10**9)
        payload = struct.pack(">II", secs, nsecs)
        return super().__new__(cls, code=0, data=payload)
|
|
|
|
|
|
|
|
|
|
|
|
def fluentd_time(timestamp):
    """Convert a timestamp into the form fluentd expects on the wire.

    Floats keep their sub-second precision as an EventTime ext value;
    any other input is coerced to an integer epoch timestamp.
    """
    return EventTime(timestamp) if isinstance(timestamp, float) else int(timestamp)
|
|
|
|
|
|
|
|
|
|
|
|
def get_source(region, account_id):
    """Return a new base "source" dict for an event: {"account", "region"}.

    When RESOLVE_ACCOUNT is enabled, also resolves the AWS account_id to its
    IAM account alias and adds it as "account_alias". Lookups are cached in
    the module-level account_aliases dict for the lifetime of the Lambda
    container.
    """
    global RESOLVE_ACCOUNT
    source = {"account": account_id, "region": region}
    if RESOLVE_ACCOUNT and not TEST:
        try:
            if account_id not in account_aliases:
                # Tight timeouts so a slow IAM endpoint cannot stall log shipping
                boto3_config = botocore.config.Config(
                    retries=dict(max_attempts=2), connect_timeout=3, read_timeout=5
                )
                iam = boto3.client("iam", config=boto3_config)
                account_aliases[account_id] = iam.list_account_aliases()[
                    "AccountAliases"
                ][0]

            source["account_alias"] = account_aliases[account_id]

        except (botocore.exceptions.ConnectTimeoutError, KeyError, IndexError):
            # IndexError also covers "account has no alias". Disable further
            # lookups for this container to avoid repeated slow/failing calls.
            logger.warning(
                "Could not resolve IAM account alias, disabled for this session"
            )
            RESOLVE_ACCOUNT = False
            pass

    return source
|
|
|
|
|
|
|
|
|
2019-07-28 11:34:05 +00:00
|
|
|
def add_flow_metadata(flow):
    """Enrich a parsed VPC flow log record with ENI metadata in place.

    Adds ENI attributes (AZ, subnet, first security group, public IP),
    a traffic "direction" (In/Out relative to the ENI's private IP) and a
    best-effort "traffic_class" (Free/Regional/Out). The ENI and private-IP
    lookup tables (module-level `enis` / `ips`) are populated once and
    cached for the Lambda container's lifetime.

    Returns the (possibly updated) flow dict; on any lookup failure the
    original flow is returned unchanged.
    """
    global ENHANCE_FLOWLOG
    if ENHANCE_FLOWLOG and not TEST:
        try:
            # Check cache and update if missed with all ENIs in one go
            if flow["interface-id"] not in enis:
                # Tight timeouts so a slow EC2 endpoint cannot stall log shipping
                boto3_config = botocore.config.Config(
                    retries=dict(max_attempts=2), connect_timeout=3, read_timeout=5
                )
                ec2 = boto3.client("ec2", config=boto3_config)
                interface_iter = ec2.get_paginator(
                    "describe_network_interfaces"
                ).paginate()
                for response in interface_iter:
                    for interface in response["NetworkInterfaces"]:
                        # Lookup table by ENI ID
                        enis[interface["NetworkInterfaceId"]] = interface

                        # Lookup table by IP to classify traffic
                        ips[interface["PrivateIpAddress"]] = interface

        except (botocore.exceptions.ConnectTimeoutError, KeyError, IndexError):
            # Describe call failed: disable enrichment for this container and
            # pass the flow through untouched
            logger.warning(
                "Error trying to get metadata for ENIs, disabling ENHANCE_FLOWLOG"
            )
            ENHANCE_FLOWLOG = False
            return flow

        try:
            eni = enis[flow["interface-id"]]
            metadata = {
                "eni.az": eni["AvailabilityZone"],
                "eni.subnet": eni["SubnetId"],
            }
            remote_ip = None
            # Only the first attached security group is recorded
            if len(eni["Groups"]):
                metadata["eni.sg"] = eni["Groups"][0]["GroupName"]

            # Add PublicIP if attached
            if "Association" in eni and "PublicIp" in eni["Association"]:
                metadata["eni.public_ip"] = eni["Association"]["PublicIp"]

            # Determine traffic direction relative to the ENI's private IP
            if eni["PrivateIpAddress"] == flow["srcaddr"]:
                metadata["direction"] = "Out"
                remote_ip = flow["dstaddr"]
            elif eni["PrivateIpAddress"] == flow["dstaddr"]:
                metadata["direction"] = "In"
                remote_ip = flow["srcaddr"]

            # Try to classify traffic:
            # Free,Regional,Out
            if remote_ip:
                if remote_ip in ips:
                    # Both endpoints known: same AZ + same VPC is free,
                    # otherwise it is regional (cross-AZ) traffic
                    if (
                        ips[remote_ip]["AvailabilityZone"] == eni["AvailabilityZone"]
                        and ips[remote_ip]["VpcId"] == eni["VpcId"]
                    ):
                        metadata["traffic_class"] = "Free"
                    else:
                        metadata["traffic_class"] = "Regional"
                else:
                    # Incoming traffic is free 90% of times
                    if metadata["direction"] == "In":
                        metadata["traffic_class"] = "Free"
                    else:
                        metadata["traffic_class"] = "Out"

            flow.update(metadata)

        except (KeyError, IndexError) as e:
            # Unknown ENI or unexpected record shape: log and keep the raw flow
            logger.warning(
                "Could not get additional data for ENI {} ({})".format(
                    flow["interface-id"], e
                )
            )
            pass

    return flow
|
2019-07-23 15:24:36 +00:00
|
|
|
|
|
|
|
|
2019-05-08 11:51:48 +00:00
|
|
|
class Queue:
    """Buffered msgpack event queue that POSTs batches to a fluentd endpoint.

    Configuration is read from the environment once, at class-definition
    time, and shared by all instances:
      FLUENTD_URL           base URL of the fluentd HTTP input (scheme
                            defaults to https)
      FLUENT_SHARED_KEY     optional basic-auth password (user "fluent")
      FLUENTD_VERIFY_CERTS  set to a falsy spelling to disable TLS
                            certificate verification
    """

    url = urllib.parse.urlsplit(os.getenv("FLUENTD_URL", default=""), scheme="https")
    passwd = os.getenv("FLUENT_SHARED_KEY", default=None)

    # Anything except an explicit falsy spelling enables verification
    verify_certs = os.getenv("FLUENTD_VERIFY_CERTS", default=1)
    if verify_certs in ("f", "F", "false", "False", "FALSE", "0", 0, False):
        verify_certs = False
    else:
        verify_certs = True

    # cached request session, shared across instances to reuse the
    # HTTPS connection between flushes
    request = requests.Session()
    request.headers = {"Content-type": "application/msgpack"}
    if passwd:
        request.auth = ("fluent", passwd)

    def __init__(self, tag):
        # tag becomes the URL path suffix and the per-event payload key
        self._queue = []
        self.tag = tag
        self.sent = 0

    def send(self, event):
        """Queue one event; flush automatically once CHUNK_SIZE is reached."""
        self._queue.append(event)
        logger.debug("Queued {} event: {}".format(self.tag, event))
        # Send events in chunks
        if len(self._queue) >= CHUNK_SIZE:
            self.flush()

    def flush(self):
        """POST all queued events as one msgpack batch; retry up to 2 times.

        Raises Exception when all attempts fail. In TEST mode the payload is
        only logged. The queue is cleared (and `sent` incremented) after a
        successful send.
        """
        events = len(self._queue)
        if not events:
            return

        logger.debug(
            "Sending {} events to {}/{} ({})".format(
                events, self.url.geturl(), self.tag, self.request
            )
        )

        if not TEST:
            # Send events via POSTs reusing the same https connection, retry couple of times
            retries = 0
            _url = "{}/{}".format(self.url.geturl(), self.tag)
            while True:
                try:
                    r = self.request.post(
                        url=_url,
                        data=msgpack.packb(self._queue),
                        verify=self.verify_certs,
                        timeout=(6, 30),
                    )
                    # requests.Response is truthy for 2xx/3xx status codes
                    if r:
                        break
                    else:
                        logger.warning("HTTP Error: {}".format(r.status_code))

                except requests.RequestException as e:
                    logger.warning("RequestException: {}".format(e))
                    pass

                if retries >= 2:
                    raise Exception(
                        "Error sending {} events to {}. Giving up.".format(events, _url)
                    )

                retries = retries + 1
                time.sleep(1)
        else:
            logger.debug("Test mode, dump only: {}".format(msgpack.packb(self._queue)))

        self.sent = self.sent + events
        self._queue = []

    def info(self):
        """Log a summary of how many events were sent to this queue's tag."""
        logger.info(
            "Sent {} events to {}/{} ({})".format(
                self.sent, self.url.geturl(), self.tag, self.request
            )
        )
|
2019-05-08 11:51:48 +00:00
|
|
|
|
|
|
|
|
|
|
|
# Lambda entry point: dispatches CloudWatch Logs subscription events and
# S3 Put events (ALB / CloudFront access logs) to fluentd via Queue.
def handler(event, context):
    """Parse incoming log events, enrich them, and ship them to fluentd.

    Supports two event shapes:
      * CloudWatch Logs subscription events ("awslogs" key): gzip+base64
        payload; log group name selects the parser (lambda, cloudtrail,
        RDS OS metrics, VPC flow logs, or raw fallback).
      * S3 Put events ("Records" key): gzipped ALB or CloudFront access
        log objects, identified by sniffing the first lines.
    """
    logger.debug("Event received: {}".format(event))

    # region and account id come from this function's own ARN
    (region, account_id) = context.invoked_function_arn.split(":")[3:5]

    # Cloudwatch Logs event
    if "awslogs" in event:
        # Grab the base64-encoded data.
        b64strg = event["awslogs"]["data"]

        # Decode base64-encoded string, which should be a gzipped object.
        zippedContent = io.BytesIO(base64.b64decode(b64strg))

        # Decompress the content and load JSON.
        with gzip.GzipFile(mode="rb", fileobj=zippedContent) as content:
            for line in content:
                awsLogsData = json.loads(line.decode())

                # First determine type from the log group name
                if re.match("/aws/lambda/", awsLogsData["logGroup"]):
                    logs = Queue("aws.lambda")
                elif re.search("cloudtrail", awsLogsData["logGroup"], flags=re.IGNORECASE):
                    logs = Queue("aws.cloudtrail")
                elif re.match("RDSOSMetrics", awsLogsData["logGroup"]):
                    logs = Queue("aws.rdsosmetrics")
                elif re.match("vpcflowlog", awsLogsData["logGroup"], flags=re.IGNORECASE):
                    logs = Queue("aws.vpcflowlog")
                else:
                    logs = Queue("aws.cloudwatch_logs")

                # Build list of log events
                for e in awsLogsData["logEvents"]:
                    event = {}
                    source = get_source(region, account_id)
                    parsed = {}

                    # Remove whitespace / empty events & skip over empty events
                    e["message"] = e["message"].strip()
                    if re.match(r"^\s*$", e["message"]):
                        continue

                    # inject existing data from subscription filters
                    if "extractedFields" in e.keys():
                        for key in e["extractedFields"]:
                            event[key] = e["extractedFields"][key]

                    # lambda ?
                    if logs.tag == "aws.lambda":
                        # First look for the three AWS Lambda entries
                        mg = re.match(
                            r"(?P<type>(START|END|REPORT)) RequestId: (?P<request>\S*)",
                            e["message"],
                        )
                        if mg:
                            parsed["RequestId"] = mg.group("request")
                            # REPORT lines carry duration/memory metrics,
                            # START lines carry the function version
                            if mg.group("type") == "REPORT":
                                pattern = r".*(?:\tDuration: (?P<duration>[\d\.\d]+) ms\s*)(?:\tBilled Duration: (?P<billed_duration>[\d\.\d]+) ms\s*)(?:\tMemory Size: (?P<memory_size>[\d\.\d]+) MB\s*)(?:\tMax Memory Used: (?P<max_memory_used>[\d\.\d]+) MB)(?:\tInit Duration: (?P<init_duration>[\d\.\d]+) ms\s*)?"

                            elif mg.group("type") == "START":
                                pattern = r".*(?:Version: (?P<version>.*))"

                            else:
                                pattern = ""

                            data = re.match(pattern, e["message"])
                            for key in data.groupdict().keys():
                                if data.group(key):
                                    parsed[key] = data.group(key)

                            # All other info parsed, so just set type itself
                            event["message"] = mg.group("type")

                        else:
                            # Try to extract data from AWS default python logging format
                            # This normalizes print vs. logging entries and allows requestid tracking
                            # "[%(levelname)s]\t%(asctime)s.%(msecs)dZ\t%(aws_request_id)s\t%(message)s\n"
                            _msg = e["message"]
                            pattern = r"(?:\[(?P<level>[^\]]*)\]\s)?(?P<time>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{1,6}Z)\s(?P<RequestId>\S*?)\s(?P<message>.*)"
                            data = re.match(pattern, e["message"], flags=re.DOTALL)
                            if data:
                                if data.group("level"):
                                    event["level"] = data.group("level")
                                event["time"] = fluentd_time(
                                    datetime.datetime.strptime(
                                        data.group("time"), "%Y-%m-%dT%H:%M:%S.%fZ"
                                    ).timestamp()
                                )
                                parsed["RequestId"] = data.group("RequestId")
                                _msg = data.group("message")

                            # try to parse the remaining as json
                            try:
                                _json = json.loads(_msg)
                                # Make sure we have an actual object assigned to json field
                                if isinstance(_json, dict):
                                    event["message_json"] = _json
                                else:
                                    event["message"] = _json
                            except (ValueError, TypeError, KeyError):
                                event["message"] = _msg

                    # cloudtrail ?
                    elif logs.tag == "aws.cloudtrail":
                        try:
                            parsed = json.loads(e["message"])

                            # use eventTime and eventID from the event itself
                            event["time"] = fluentd_time(
                                datetime.datetime.strptime(
                                    parsed["eventTime"], "%Y-%m-%dT%H:%M:%SZ"
                                ).timestamp()
                            )
                            event["id"] = parsed["eventID"]
                            # override region from cloudtrail event
                            source["region"] = parsed["awsRegion"]

                        except (ValueError, TypeError, KeyError):
                            # Not valid CloudTrail JSON: fall back to the raw message
                            event["message"] = e["message"]
                            parsed.clear()

                    # RDS metrics ?
                    elif logs.tag == "aws.rdsosmetrics":
                        try:
                            parsed = json.loads(e["message"])

                        except (ValueError, TypeError, KeyError):
                            event["message"] = e["message"]

                    # VPC FlowLog ?
                    # <version> <account-id> <interface-id> <srcaddr> <dstaddr> <srcport> <dstport> <protocol> <packets> <bytes> <start> <end> <action> <log-status>
                    elif logs.tag == "aws.vpcflowlog":
                        row = e["message"].split(" ")

                        # Skip over NODATA,SKIPDATA entries, what would be the point having these in ES ?
                        if row[13] != "OK":
                            continue

                        parsed = add_flow_metadata(
                            {
                                "interface-id": row[2],
                                "srcaddr": row[3],
                                "dstaddr": row[4],
                                "srcport": row[5],
                                "dstport": row[6],
                                "protocol": row[7],
                                "packets": row[8],
                                "bytes": row[9],
                                "start": row[10],
                                "end": row[11],
                                "action": row[12],
                                "log-status": row[13],
                            }
                        )

                    # Fallback add raw message
                    else:
                        event["message"] = e["message"]

                    if parsed and logs.tag:
                        event[logs.tag] = parsed

                    # Forward cloudwatch logs event ID
                    source["log_group"] = awsLogsData["logGroup"]
                    source["log_stream"] = awsLogsData["logStream"]
                    event["source"] = source

                    # If time and id are not set yet use data from cloudwatch logs event
                    if "time" not in event:
                        event["time"] = fluentd_time(e["timestamp"] / 1000)
                    if "id" not in source:
                        event["id"] = e["id"]

                    logs.send(event)

        logs.flush()
        logs.info()

    # S3 Put event
    elif "Records" in event:
        s3_client = boto3.client("s3")

        # NOTE: only the first record is processed — assumes one object per event
        bucket = event["Records"][0]["s3"]["bucket"]["name"]
        key = event["Records"][0]["s3"]["object"]["key"]

        file_path = "/tmp/stream2fluentd.gz"
        if TEST:
            # In TEST mode the key is a local file path
            shutil.copyfile(key, file_path)
        else:
            s3_client.download_file(bucket, key, file_path)
        source = get_source(region, account_id)
        source["s3_url"] = "{}/{}".format(bucket, key)

        alb_regex = re.compile(
            r"(?P<type>[^ ]*) (?P<timestamp>[^ ]*) (?P<elb>[^ ]*) (?P<client_ip>[^ ]*):(?P<client_port>[0-9]*) (?P<target_ip>[^ ]*)[:-](?P<target_port>[0-9]*) (?P<request_processing_time>[-.0-9]*) (?P<target_processing_time>[-.0-9]*) (?P<response_processing_time>[-.0-9]*) (?P<elb_status_code>|[-0-9]*) (?P<target_status_code>-|[-0-9]*) (?P<received_bytes>[-0-9]*) (?P<sent_bytes>[-0-9]*) \"(?P<request_verb>[^ ]*) (?P<request_url>[^\"]*) (?P<request_proto>- |[^ ]*)\" \"(?P<user_agent>[^\"]*)\" (?P<ssl_cipher>[A-Z0-9-]+) (?P<ssl_protocol>[A-Za-z0-9.-]*) (?P<target_group_arn>[^ ]*) \"(?P<trace_id>[^\"]*)\" \"(?P<domain_name>[^\"]*)\" \"(?P<chosen_cert_arn>[^\"]*)\" (?P<matched_rule_priority>[-.0-9]*) (?P<request_creation_time>[^ ]*) \"(?P<actions_executed>[^\"]*)\" \"(?P<redirect_url>[^ ]*)\" \"(?P<error_reason>[^ ]*)\""
        )

        # try to identify file type by looking at first lines
        with gzip.open(file_path, mode="rt", newline="\n") as data:
            header = data.readlines(2048)

        # ALB Access ?
        if alb_regex.match(header[0]):
            logs = Queue("aws.alb_accesslog")

        # cloudfront access logs
        elif (
            len(header) > 1
            and re.match("#Version:", header[0])
            and re.match("#Fields:", header[1])
        ):
            logs = Queue("aws.cloudfront_accesslog")

        else:
            logger.warning("{}/{}: Unknown type!".format(bucket, key))
            return

        if logs.tag == "aws.alb_accesslog":
            with gzip.open(file_path, mode="rt", newline="\n") as data:
                for line in data:
                    event = {}
                    parsed = {}
                    data = alb_regex.match(line)
                    if data:
                        for key in data.groupdict().keys():
                            value = data.group(key)

                            # Remove empty values
                            if value in ["-", "-\n"]:
                                continue

                            # Remove times of requests timed out
                            if key in [
                                "request_processing_time",
                                "target_processing_time",
                                "response_processing_time",
                            ] and value in ["-1"]:
                                continue

                            parsed[key] = data.group(key)
                    else:
                        logger.warning(
                            "Could not parse ALB access log entry: {}".format(line)
                        )
                        continue

                    event["time"] = fluentd_time(
                        datetime.datetime.strptime(
                            parsed["request_creation_time"], "%Y-%m-%dT%H:%M:%S.%fZ"
                        ).timestamp()
                    )

                    # Copy to host to allow geoip upstream
                    event["host"] = parsed["client_ip"]
                    event[logs.tag] = parsed
                    event["source"] = source

                    logs.send(event)

        elif logs.tag == "aws.cloudfront_accesslog":
            with gzip.open(file_path, mode="rt", newline="\n") as data:
                next(data)
                # columns are in second line: first is #Fields, next two are merged into time later
                columns = next(data).split()[3:]

                for line in data:
                    event = {}
                    parsed = {}

                    # Todo hash each line to create source['id']
                    # source['id'] = md5.of.line matching ES ids

                    row = line.split("\t")
                    # cloudfront events are logged to the second only, date and time are separate
                    event["time"] = fluentd_time(
                        datetime.datetime.strptime(
                            row[0] + " " + row[1], "%Y-%m-%d %H:%M:%S"
                        ).timestamp()
                    )

                    for n, c in enumerate(columns, 2):
                        value = row[n]
                        if value not in ["-", "-\n"]:
                            parsed[c] = row[n]
                            # Copy c-ip to host to allow geoip upstream
                            if c == "c-ip":
                                event["host"] = row[n]

                    event[logs.tag] = parsed
                    event["source"] = source

                    logs.send(event)

        logs.flush()
        logs.info()
|