elastalert-lambda/elastalert_rules.py

162 lines
5.6 KiB
Python
Executable File

#!/usr/bin/env python2.7
import os
import logging
import argparse
import time
import yaml
from elasticsearch import Elasticsearch, RequestsHttpConnection
logger = logging.getLogger(__name__)
class elastalert_rules:
    """Copy ElastAlert rule files between a local folder and an Elasticsearch index.

    Each rule is stored as one document holding the raw rule text, the file
    name, a creation timestamp and an ``active`` flag.  The public commands
    (``put``, ``get``, ``flush``, ``mapping``) log failures and return the
    caught exception instead of raising it; they return ``None`` on success.
    """

    # Lower-cased string spellings that mean "disabled" when a boolean setting
    # arrives as text (for example through an environment variable).
    _FALSE_STRINGS = ('', '0', 'false', 'no', 'off')

    def __init__(self, config='config.yaml', saveto=None):
        """Load the configuration and resolve the folders to work with.

        Args:
            config: Path of the YAML config file.  A relative path is resolved
                against the directory containing this module.
            saveto: Folder that ``get`` writes rules into.  Defaults to the
                configured ``rules_folder``; a relative path is resolved
                against the directory containing this module.

        Raises:
            yaml.YAMLError: The config file is not valid YAML.
        """
        self._root = os.path.dirname(os.path.realpath(__file__))
        fn = os.path.join(self._root, config)
        with open(fn, 'r') as stream:
            try:
                self._config = yaml.safe_load(stream)
            except yaml.YAMLError as e:
                logger.error("loading config file: {}".format(e))
                # Without a config nothing below can work; re-raise instead of
                # failing later with an unrelated AttributeError.
                raise
        rules_folder = self._config.get('rules_folder')
        if os.path.isabs(rules_folder):
            self._rules = rules_folder
        else:
            self._rules = os.path.join(self._root, rules_folder)
        self._saveto = self._rules
        if saveto:
            if os.path.isabs(saveto):
                self._saveto = saveto
            else:
                self._saveto = os.path.join(self._root, saveto)
        self._index = self._config.get('rules_index')

    @staticmethod
    def _as_bool(value):
        """Interpret a config or environment value as a boolean.

        Text values are compared case-insensitively against ``_FALSE_STRINGS``
        so that ``"false"`` or ``"0"`` disable a setting; any other text is
        treated as true.  Non-text values use normal truthiness.
        """
        if isinstance(value, (str, type(u''))):
            return value.strip().lower() not in elastalert_rules._FALSE_STRINGS
        return bool(value)

    def _connect(self):
        """Build an Elasticsearch client from the environment and the config.

        ``ES_HOST``, ``ES_PORT`` and ``ES_USE_SSL`` override the config keys
        ``es_host``, ``es_port`` and ``use_ssl``.  ``verify_certs`` is read
        from the config only and defaults to ``False``.

        Returns:
            The configured ``Elasticsearch`` client.
        """
        host = os.getenv('ES_HOST', default=self._config.get('es_host'))
        port = int(os.getenv('ES_PORT', default=self._config.get('es_port')))
        # Environment variables are always strings, so "false" must be parsed
        # rather than passed through as a (truthy) non-empty string.
        ssl = self._as_bool(os.getenv('ES_USE_SSL', default=self._config.get('use_ssl')))
        verify = self._config.get('verify_certs', False)
        retry_on_status = (502, 503, 504)
        logger.debug("Connecting to {}:{} ssl:{}".format(host, port, ssl))
        es = Elasticsearch(hosts=[host], port=port, use_ssl=ssl, verify_certs=verify,
                           connection_class=RequestsHttpConnection,
                           max_retries=5, retry_on_status=retry_on_status)
        return es

    def _load_rule(self, filename):
        """Read a rule file and check that it parses as YAML.

        Args:
            filename: Full path of the rule file.

        Returns:
            The raw file content, or ``None`` when the file cannot be read or
            is not valid YAML (the failure is logged).
        """
        try:
            with open(filename, 'r') as stream:
                rule = stream.read()
            # Parse only to validate; the raw text is what gets stored.
            yaml.safe_load(rule)
        except Exception as e:
            logger.error("ERROR loading rule: {} ({})".format(filename, e))
            return None
        logger.info("Loaded rule {}.".format(filename))
        return rule

    def _save_rule(self, filename, rule):
        """Validate a rule as YAML and write it to ``filename``.

        Args:
            filename: Full path of the file to write.
            rule: Rule text to store.

        Returns:
            ``None`` on success, otherwise the exception that was caught.
        """
        try:
            # Check type and YAML validity before opening the file, so a bad
            # rule never truncates or creates it.  ``type(u'')`` also covers
            # Python 2 ``unicode`` values.
            if not isinstance(rule, (str, type(u''))):
                raise TypeError("rule must be text, got {}".format(type(rule).__name__))
            yaml.safe_load(rule)
            with open(filename, 'w+') as outfile:
                outfile.write(rule)
            logger.info("Stored rule {}.".format(filename))
        except Exception as e:
            logger.error("ERROR saving rule: {} ({})".format(filename, e))
            return e
        return None

    def put(self):
        """Upload every valid rule file from the rules folder.

        All documents already in the index are first marked inactive, then
        each loaded rule is indexed as a new active document.  Files are keyed
        by bare file name, so when the same name occurs in several subfolders
        only the last one visited is uploaded.  Files that fail to load are
        skipped.

        Returns:
            ``None`` on success, otherwise the exception that was caught.
        """
        try:
            es = self._connect()
            rules = {}
            for folder, _dirs, files in os.walk(self._rules):
                for filename in files:
                    rules[filename] = os.path.join(folder, filename)
            loaded = {}
            for fn, fullpath in rules.items():
                rule = self._load_rule(fullpath)
                # _load_rule already logged the failure; do not index it.
                if rule is not None:
                    loaded[fn] = rule
            # mark every stored rule inactive
            update = {"query": {"match_all": {}}, "script": "ctx._source.active=false"}
            es.update_by_query(index=self._index, body=update, refresh=True)
            # index each record
            for fn, rule in loaded.items():
                doc = {
                    "rule": rule,
                    "active": True,
                    "created": int(time.time()),
                    "name": fn
                }
                es.index(index=self._index, doc_type='rules', body=doc)
        except Exception as e:
            logger.error("failed to put: {}".format(e))
            return e
        return None

    def flush(self):
        """Delete every document from the rules index.

        Returns:
            ``None`` on success, otherwise the exception that was caught.
        """
        es = self._connect()
        try:
            delete = {"query": {"match_all": {}}}
            es.delete_by_query(index=self._index, body=delete)
        except Exception as e:
            logger.error("failed to flush index: {}".format(e))
            return e
        return None

    def mapping(self):
        """Create the rules index using ``mapping.yaml`` from this module's folder.

        Returns:
            ``None`` on success, otherwise the exception that was caught.
        """
        es = self._connect()
        try:
            fn = os.path.join(self._root, "mapping.yaml")
            with open(fn, 'r') as stream:
                mapping = yaml.safe_load(stream)
            es.indices.create(self._index, body=mapping)
        except Exception as e:
            logger.error("failed to deploy mapping: {}".format(e))
            return e
        return None

    def get(self):
        """Replace the contents of the save folder with the active rules.

        At most 1000 active rules are fetched (the query's ``size``).  The
        index is queried before any local file is touched, so a failed search
        leaves the existing files in place.

        Returns:
            ``None`` on success, otherwise the exception that was caught.
        """
        es = self._connect()
        try:
            query = {"size": 1000, "query": {"bool": {"filter": [{"term": {"active": True}}]}}}
            result = es.search(index=self._index, body=query)
            hits = result['hits']['hits']
            if not os.path.exists(self._saveto):
                os.makedirs(self._saveto)
            for folder, _dirs, files in os.walk(self._saveto):
                for filename in files:
                    os.remove(os.path.join(folder, filename))
            for item in hits:
                doc = item['_source']
                # put() stores bare file names; dropping any directory part
                # keeps a crafted name from writing outside the save folder.
                fn = os.path.join(self._saveto, os.path.basename(doc['name']))
                self._save_rule(fn, doc['rule'])
        except Exception as e:
            logger.error("failed to get: {}".format(e))
            return e
        return None
def main():
    """Command-line entry point: parse the arguments and run the chosen command."""
    cli = argparse.ArgumentParser(
        description='Extends ElastAlert by storing its rules within Elasticsearch itself')
    cli.add_argument('command', choices=['get', 'put', 'mapping', 'flush'])
    cli.add_argument('--config', dest='config', default='config.yaml',
                     help='Custom ElasticAlert config file')
    options = cli.parse_args()
    # `choices` restricts the command to names that are methods of the class,
    # so the attribute lookup is safe.
    getattr(elastalert_rules(config=options.config), options.command)()
if __name__ == "__main__":
    # When run as a script, attach a stream handler to this module's logger
    # and emit everything from DEBUG upwards before dispatching the command.
    module_log = logging.getLogger(__name__)
    module_log.addHandler(logging.StreamHandler())
    module_log.setLevel(logging.DEBUG)
    main()