162 lines
5.6 KiB
Python
Executable File
162 lines
5.6 KiB
Python
Executable File
#!/usr/bin/env python2.7
|
|
import os
|
|
import logging
|
|
import argparse
|
|
import time
|
|
import yaml
|
|
|
|
from elasticsearch import Elasticsearch, RequestsHttpConnection
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class elastalert_rules:
|
|
def __init__(self, config='config.yaml', saveto=None):
|
|
self._root = os.path.dirname(os.path.realpath(__file__))
|
|
|
|
fn = os.path.join(self._root, config)
|
|
with open(fn, 'r') as stream:
|
|
try:
|
|
self._config = yaml.safe_load(stream)
|
|
except yaml.YAMLError as e:
|
|
logger.error("loading config file: {}".format(e))
|
|
|
|
if os.path.isabs(self._config.get('rules_folder')):
|
|
self._rules = self._config.get('rules_folder')
|
|
else:
|
|
self._rules = os.path.join(self._root, self._config.get('rules_folder'))
|
|
|
|
self._saveto = self._rules
|
|
if saveto:
|
|
if os.path.isabs(saveto):
|
|
self._saveto = saveto
|
|
else:
|
|
self._saveto = os.path.join(self._root, saveto)
|
|
self._index = self._config.get('rules_index')
|
|
|
|
def _connect(self):
|
|
host = os.getenv('ES_HOST', default=self._config.get('es_host'))
|
|
port = int(os.getenv('ES_PORT', default=self._config.get('es_port')))
|
|
ssl = os.getenv('ES_USE_SSL', default=self._config.get('use_ssl'))
|
|
verify = self._config.get('verify_certs', False)
|
|
retry_on_status = tuple({502, 503, 504})
|
|
|
|
logger.debug("Connecting to {}:{} ssl:{}".format(host, port, ssl))
|
|
es = Elasticsearch(hosts=[host], port=port, use_ssl=ssl, verify_certs=verify,
|
|
connection_class=RequestsHttpConnection,
|
|
max_retries=5, retry_on_status=retry_on_status)
|
|
|
|
return es
|
|
|
|
def _load_rule(self, filename):
|
|
rule = None
|
|
try:
|
|
with open(filename, 'r') as stream:
|
|
rule = stream.read()
|
|
stream.seek(0)
|
|
yaml.safe_load(stream)
|
|
logger.info("Loaded rule {}.".format(filename))
|
|
except Exception as e:
|
|
logger.error("ERRRO loading rule: {} ({})".format(filename, e))
|
|
return rule
|
|
|
|
def _save_rule(self, filename, rule):
|
|
try:
|
|
# verify the rule is valid YAML
|
|
if isinstance(rule, str):
|
|
yaml.safe_load(rule)
|
|
|
|
with open(filename, 'w+') as outfile:
|
|
outfile.write(rule)
|
|
logger.info("Stored rule {}.".format(filename))
|
|
except Exception as e:
|
|
logger.error("ERROR saving rule: {} ({})".format(filename, e))
|
|
return e
|
|
|
|
def put(self):
|
|
try:
|
|
es = self._connect()
|
|
rules = {}
|
|
for r, d, f in os.walk(self._rules):
|
|
for filename in f:
|
|
rules[filename] = os.path.join(r, filename)
|
|
|
|
loaded = {}
|
|
for fn, fullpath in rules.iteritems():
|
|
loaded[fn] = self._load_rule(fullpath)
|
|
|
|
# mark all as deleted
|
|
update = {"query": {"match_all": {}}, "script": "ctx._source.active=false"}
|
|
es.update_by_query(index=self._index, body=update, refresh=True)
|
|
|
|
# index each record
|
|
for fn, rule in loaded.iteritems():
|
|
doc = {
|
|
"rule": rule,
|
|
"active": True,
|
|
"created": int(time.time()),
|
|
"name": fn
|
|
}
|
|
es.index(index=self._index, doc_type='rules', body=doc)
|
|
|
|
except Exception as e:
|
|
logger.error("failed to get: {}".format(e))
|
|
return e
|
|
|
|
def flush(self):
|
|
es = self._connect()
|
|
try:
|
|
delete = {"query": {"match_all": {}}}
|
|
es.delete_by_query(index=self._index, body=delete)
|
|
except Exception as e:
|
|
logger.error("failed to flush index: {}".format(e))
|
|
return e
|
|
|
|
def mapping(self):
|
|
es = self._connect()
|
|
try:
|
|
fn = os.path.join(self._root, "mapping.yaml")
|
|
with open(fn, 'r') as stream:
|
|
mapping = yaml.safe_load(stream)
|
|
es.indices.create(self._index, body=mapping)
|
|
except Exception as e:
|
|
logger.error("failed to deploy mapping: {}".format(e))
|
|
return e
|
|
|
|
def get(self):
|
|
es = self._connect()
|
|
try:
|
|
if not os.path.exists(self._saveto):
|
|
os.makedirs(self._saveto)
|
|
for r, d, f in os.walk(self._saveto):
|
|
for filename in f:
|
|
os.remove(os.path.join(r, filename))
|
|
query = {"size": 1000, "query": {"bool": {"filter": [{"term": {"active": True}}]}}}
|
|
result = es.search(index=self._index, body=query)
|
|
for item in result['hits']['hits']:
|
|
doc = item['_source']
|
|
fn = os.path.join(self._saveto, doc['name'])
|
|
self._save_rule(fn, doc['rule'])
|
|
except Exception as e:
|
|
logger.error("failed to put: {}".format(e))
|
|
return e
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description='Extends ElastAlert by storing its rules within Elasticsearch itself')
|
|
parser.add_argument('command', choices=['get', 'put', 'mapping', 'flush'])
|
|
parser.add_argument('--config', dest='config', default='config.yaml', help='Custom ElasticAlert config file')
|
|
args = parser.parse_args()
|
|
|
|
alert = elastalert_rules(config=args.config)
|
|
|
|
command = getattr(alert, args.command)
|
|
command()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
ch = logging.StreamHandler()
|
|
logging.getLogger(__name__).addHandler(ch)
|
|
logging.getLogger(__name__).setLevel(logging.DEBUG)
|
|
|
|
main()
|