#!/usr/bin/env python2.7
"""Store ElastAlert rule files in an Elasticsearch index and fetch them back.

Commands (see ``main``):

* ``put``     - upload every file under the configured ``rules_folder``.
* ``get``     - download all active rules into the save folder.
* ``mapping`` - create the rules index using ``mapping.yaml``.
* ``flush``   - delete every document in the rules index.
"""
import argparse
import logging
import os
import sys
import time

import yaml
from elasticsearch import Elasticsearch, RequestsHttpConnection

logger = logging.getLogger(__name__)

# Text types for the running interpreter: ``basestring`` only exists on
# Python 2, where it covers both ``str`` and ``unicode``.
try:
    _STRING_TYPES = (basestring,)  # noqa: F821
except NameError:
    _STRING_TYPES = (str,)

# Spellings accepted as "true" when a flag arrives as text (environment).
_TRUE_STRINGS = ('1', 'true', 'yes', 'on')


def _as_bool(value):
    """Interpret a config/environment value as a boolean.

    Environment variables are always strings, so plain truthiness would
    treat "false" or "0" as enabled. Strings are compared against
    ``_TRUE_STRINGS`` (case-insensitive); anything else uses ``bool()``.
    """
    if isinstance(value, _STRING_TYPES):
        return value.strip().lower() in _TRUE_STRINGS
    return bool(value)


class elastalert_rules:
    """Synchronise a folder of rule files with an Elasticsearch index.

    The command methods (``put``, ``get``, ``flush``, ``mapping``) log any
    failure and return the exception instance; they return ``None`` on
    success.
    """

    def __init__(self, config='config.yaml', saveto=None):
        """Load the configuration and resolve the working folders.

        Args:
            config: Config file path; relative paths are resolved against
                the directory containing this script.
            saveto: Folder ``get`` writes rules into. Relative paths are
                resolved against the script directory. Defaults to the
                configured ``rules_folder``.

        Raises:
            yaml.YAMLError: The config file is not valid YAML.
            ValueError: The config is not a mapping or lacks
                ``rules_folder``.
        """
        self._root = os.path.dirname(os.path.realpath(__file__))
        fn = os.path.join(self._root, config)
        with open(fn, 'r') as stream:
            try:
                loaded = yaml.safe_load(stream)
            except yaml.YAMLError as e:
                logger.error("loading config file: {}".format(e))
                # Nothing below can work without a config, so do not
                # continue with a half-initialised object.
                raise
        if not isinstance(loaded, dict):
            raise ValueError(
                "config file {} does not contain a mapping".format(fn))
        self._config = loaded

        rules_folder = self._config.get('rules_folder')
        if not rules_folder:
            raise ValueError("'rules_folder' is not set in {}".format(fn))
        self._rules = self._resolve(rules_folder)
        self._saveto = self._resolve(saveto) if saveto else self._rules
        self._index = self._config.get('rules_index')

    def _resolve(self, path):
        """Return ``path`` unchanged if absolute, else relative to the script dir."""
        if os.path.isabs(path):
            return path
        return os.path.join(self._root, path)

    def _connect(self):
        """Build an Elasticsearch client from environment and config.

        ``ES_HOST``, ``ES_PORT`` and ``ES_USE_SSL`` take precedence over
        the config keys ``es_host``, ``es_port`` and ``use_ssl``.

        Raises:
            ValueError: No port is configured, or it is not an integer.
        """
        host = os.getenv('ES_HOST', default=self._config.get('es_host'))
        port = os.getenv('ES_PORT', default=self._config.get('es_port'))
        if port is None:
            raise ValueError(
                "no Elasticsearch port: set ES_PORT or 'es_port' in config")
        port = int(port)
        # ES_USE_SSL is text when it comes from the environment; parse it
        # so that "false" / "0" actually disable SSL.
        ssl = _as_bool(
            os.getenv('ES_USE_SSL', default=self._config.get('use_ssl')))
        verify = self._config.get('verify_certs', False)
        retry_on_status = (502, 503, 504)
        logger.debug("Connecting to {}:{} ssl:{}".format(host, port, ssl))
        es = Elasticsearch(hosts=[host],
                           port=port,
                           use_ssl=ssl,
                           verify_certs=verify,
                           connection_class=RequestsHttpConnection,
                           max_retries=5,
                           retry_on_status=retry_on_status)
        return es

    def _load_rule(self, filename):
        """Read a rule file and check that it parses as YAML.

        Returns:
            The raw file text, or ``None`` if the file cannot be read or
            is not valid YAML (the failure is logged).
        """
        try:
            with open(filename, 'r') as stream:
                rule = stream.read()
            # Parse only to validate; the raw text is what gets stored.
            yaml.safe_load(rule)
        except Exception as e:
            logger.error("ERROR loading rule: {} ({})".format(filename, e))
            return None
        logger.info("Loaded rule {}.".format(filename))
        return rule

    def _save_rule(self, filename, rule):
        """Validate ``rule`` as YAML text and write it to ``filename``.

        Returns:
            ``None`` on success, otherwise the exception that occurred
            (also logged).
        """
        try:
            # Validate before opening the file so a bad rule never
            # truncates or creates the target.
            if not isinstance(rule, _STRING_TYPES):
                raise TypeError("rule body is not text")
            yaml.safe_load(rule)
            with open(filename, 'w') as outfile:
                outfile.write(rule)
            logger.info("Stored rule {}.".format(filename))
        except Exception as e:
            logger.error("ERROR saving rule: {} ({})".format(filename, e))
            return e

    def put(self):
        """Upload every loadable rule file under the rules folder.

        All existing documents are first marked ``active=false``; each
        local rule is then indexed as a new active document. Files that
        fail to load are skipped.

        Returns:
            ``None`` on success, otherwise the exception (also logged).
        """
        try:
            es = self._connect()

            # Keyed by bare file name: files with the same name in
            # different sub-folders collapse to a single entry.
            rules = {}
            for r, _dirs, files in os.walk(self._rules):
                for filename in files:
                    rules[filename] = os.path.join(r, filename)

            loaded = {}
            for fn, fullpath in rules.items():
                rule = self._load_rule(fullpath)
                if rule is not None:
                    loaded[fn] = rule

            # mark all as deleted
            update = {"query": {"match_all": {}},
                      "script": "ctx._source.active=false"}
            es.update_by_query(index=self._index, body=update, refresh=True)

            # index each record
            for fn, rule in loaded.items():
                doc = {
                    "rule": rule,
                    "active": True,
                    "created": int(time.time()),
                    "name": fn
                }
                es.index(index=self._index, doc_type='rules', body=doc)
        except Exception as e:
            logger.error("failed to put: {}".format(e))
            return e

    def flush(self):
        """Delete every document in the rules index.

        Returns:
            ``None`` on success, otherwise the exception (also logged).
        """
        try:
            es = self._connect()
            delete = {"query": {"match_all": {}}}
            es.delete_by_query(index=self._index, body=delete)
        except Exception as e:
            logger.error("failed to flush index: {}".format(e))
            return e

    def mapping(self):
        """Create the rules index from ``mapping.yaml`` next to this script.

        Returns:
            ``None`` on success, otherwise the exception (also logged).
        """
        try:
            es = self._connect()
            fn = os.path.join(self._root, "mapping.yaml")
            with open(fn, 'r') as stream:
                mapping = yaml.safe_load(stream)
            es.indices.create(self._index, body=mapping)
        except Exception as e:
            logger.error("failed to deploy mapping: {}".format(e))
            return e

    def get(self):
        """Replace the files in the save folder with the active rules.

        The index is queried before anything is removed, so a failed
        search leaves the existing local files untouched.

        Returns:
            ``None`` on success, otherwise the exception (also logged).
            Failures to write individual rules are logged by
            ``_save_rule`` and do not abort the run.
        """
        try:
            es = self._connect()

            # NOTE: the request asks for at most 1000 hits.
            query = {"size": 1000,
                     "query": {"bool": {"filter": [
                         {"term": {"active": True}}]}}}
            result = es.search(index=self._index, body=query)
            hits = result['hits']['hits']

            if not os.path.exists(self._saveto):
                os.makedirs(self._saveto)

            for r, _dirs, files in os.walk(self._saveto):
                for filename in files:
                    os.remove(os.path.join(r, filename))

            for item in hits:
                doc = item['_source']
                # Names come from the index; keep only the final path
                # component so a document cannot write outside the folder.
                name = os.path.basename(doc['name'])
                if not name:
                    logger.error(
                        "skipping rule with unusable name: {!r}".format(
                            doc['name']))
                    continue
                self._save_rule(os.path.join(self._saveto, name), doc['rule'])
        except Exception as e:
            logger.error("failed to get: {}".format(e))
            return e


def main():
    """Parse the command line and run the requested command.

    Returns:
        ``0`` if the command succeeded, ``1`` if it returned an exception.
    """
    parser = argparse.ArgumentParser(
        description='Extends ElastAlert by storing its rules within '
                    'Elasticsearch itself')
    parser.add_argument('command', choices=['get', 'put', 'mapping', 'flush'])
    parser.add_argument('--config', dest='config', default='config.yaml',
                        help='Custom ElasticAlert config file')
    parser.add_argument('--saveto', dest='saveto', default=None,
                        help='Folder "get" writes rules into '
                             '(defaults to rules_folder)')
    args = parser.parse_args()

    alert = elastalert_rules(config=args.config, saveto=args.saveto)
    command = getattr(alert, args.command)
    # Command methods report failure by returning the exception.
    result = command()
    return 1 if isinstance(result, Exception) else 0


if __name__ == "__main__":
    ch = logging.StreamHandler()
    logging.getLogger(__name__).addHandler(ch)
    logging.getLogger(__name__).setLevel(logging.DEBUG)
    sys.exit(main())