From 5ad49d8ab15b56f3f6af8b8e5dc0c0fad68a855d Mon Sep 17 00:00:00 2001 From: Dmitry Vasiliev Date: Sun, 17 May 2020 18:34:32 +0300 Subject: [PATCH] add env variables --- .github/workflows/go.yml | 2 +- README.md | 29 ++++++++++++---- cmd/config.go | 72 +++++++++++++++++++++++++++++++++++++--- cmd/config_test.go | 43 ++++++++++++++++++++++++ cmd/main.go | 5 ++- example.yaml | 2 +- merger/merge.go | 9 +++-- merger/merger.go | 18 +++++----- merger/merger_test.go | 18 ++++++---- 9 files changed, 165 insertions(+), 33 deletions(-) create mode 100644 cmd/config_test.go diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 43cbf2c..19cd70a 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -30,4 +30,4 @@ jobs: run: go build -v . - name: Test - run: go test -v . + run: go test ./... -v -race diff --git a/README.md b/README.md index a493943..b9dbdd5 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,8 @@ Merges Prometheus metrics from multiple sources. ## But Why?! -> [prometheus/prometheus#3756](https://github.com/prometheus/prometheus/issues/3756) +Sometimes you need in Kubernetes to scrape Prometheus metrics from multiple containers in a single pod, +but you can't do this using annotations: [prometheus/prometheus#3756](https://github.com/prometheus/prometheus/issues/3756). To start the exporter: @@ -20,10 +21,20 @@ scrap_timeout: 20s sources: - url: http://127.0.0.1:8081/metrics labels: - key1: value1 + keyX: valueX + keyY: Y - url: http://127.0.0.1:8082/metrics labels: - key2: value2 + key2: Z +``` + +Another way to pass configuration by setting environment variables: + +```bash +export LISTEN=":8080" +export SCRAPE_TIMEOUT="20s" +export URL_1=http://127.0.0.1:801/api/v1/metrics/prometheus,keyX:valueX,keyY:Y +export URL_2=http://0.0.0.0:7070/api/v1/metrics/prometheus,key2:Z ``` ## Kubernetes @@ -44,8 +55,14 @@ By default, config must be available in the container by the path: `/config/prom ... - name: prometheus-exporter-merger image: vadv/prometheus-exporter-merger - volumeMounts: - - name: config - mountPath: /config + env: + - name: LISTEN + value: :8080 + - name: SCRAPE_TIMEOUT + value: 20s + - name: URL_COMMON + value: http://127.0.0.1:8081/api/v1/metrics/prometheus,type:common + - name: URL_AUDIT + value: http://127.0.0.1:8082/api/v1/metrics/prometheus,type:audit ... ``` diff --git a/cmd/config.go b/cmd/config.go index 9dfbad3..ed0ebf1 100644 --- a/cmd/config.go +++ b/cmd/config.go @@ -1,31 +1,93 @@ package cmd import ( + "fmt" "io/ioutil" + "os" + "strings" "time" + "github.com/pkg/errors" "gopkg.in/yaml.v2" ) +const ( + defaultListen = ":8080" + defaultScrapeTimeout = 15 * time.Second +) + type source struct { Url string `yaml:"url"` Labels map[string]string `yaml:"labels"` } type config struct { - Listen string `yaml:"listen"` - ScrapTimeout time.Duration `yaml:"scrap_timeout"` - Sources []*source `yaml:"sources"` + Listen string `yaml:"listen"` + ScrapeTimeout time.Duration `yaml:"scrape_timeout"` + Sources []*source `yaml:"sources"` } func parseConfig(filename string) (*config, error) { + _, err := os.Stat(filename) + if err == nil { + return parseConfigFromFile(filename) + } + if os.IsNotExist(err) { + return parseConfigFromEnv() + } + return nil, err +} + +func parseConfigFromFile(filename string) (*config, error) { data, err := ioutil.ReadFile(filename) if err != nil { return nil, err } result := &config{ - Listen: ":8080", - ScrapTimeout: 15 * time.Second, + Listen: defaultListen, + ScrapeTimeout: defaultScrapeTimeout, } return result, yaml.Unmarshal(data, result) } + +func parseConfigFromEnv() (*config, error) { + result := &config{ + Listen: defaultListen, + ScrapeTimeout: defaultScrapeTimeout, + } + if v := os.Getenv("LISTEN"); v != "" { + result.Listen = v + } + if v := os.Getenv("SCRAPE_TIMEOUT"); v != "" { + timeout, err := time.ParseDuration(v) + if err != nil { + return nil, errors.Wrap(err, "parse SCRAPE_TIMEOUT") + } + result.ScrapeTimeout = timeout + } + for _, env := range os.Environ() { + // URL_ONE=http://127.0.0.1:8080/metrics,k1:v1,k2:v2 + if strings.HasPrefix(env, "URL_") { + args := strings.Split(env, "=") + if len(args) != 2 { + return nil, fmt.Errorf("unable to parse env variable %s", env) + } + valuesArgs := strings.Split(args[1], ",") + s := &source{Url: valuesArgs[0], Labels: make(map[string]string)} + if len(valuesArgs) > 1 { + for i, v := range valuesArgs { + if i == 0 { + continue + } + labelArgs := strings.Split(v, ":") + if len(labelArgs) != 2 { + return nil, fmt.Errorf("unable to parse labels from env variable %s", env) + } + s.Labels[labelArgs[0]] = labelArgs[1] + } + } + result.Sources = append(result.Sources, s) + } + } + return result, nil +} diff --git a/cmd/config_test.go b/cmd/config_test.go new file mode 100644 index 0000000..c3c80b0 --- /dev/null +++ b/cmd/config_test.go @@ -0,0 +1,43 @@ +package cmd + +import ( + "os" + "testing" + "time" +) + +func Test_parseEnv(t *testing.T) { + os.Setenv("LISTEN", ":9090") + os.Setenv("SCRAPE_TIMEOUT", "120s") + os.Setenv("URL_8080", "http://127.0.0.1:8080/metrics,keyUrl1_1:valueUrl1_1,keyUrl1_2:valueUrl1_2") + os.Setenv("URL_8081", "http://127.0.0.1:8081/metrics,keyUrl2_1:valueUrl2_1") + os.Setenv("URL_8082", "http://127.0.0.1:8082/url3") + c, err := parseConfigFromEnv() + if err != nil { + t.Fatal(err) + } + if c.Listen != ":9090" { + t.Fatalf("listen: %s\n", c.Listen) + } + if c.ScrapeTimeout != 120*time.Second { + t.Fatalf("timeout: %s\n", c.ScrapeTimeout) + } + for _, s := range c.Sources { + switch s.Url { + case "http://127.0.0.1:8080/metrics": + if s.Labels[`keyUrl1_1`] != `valueUrl1_1` || s.Labels[`keyUrl1_2`] != `valueUrl1_2` { + t.Fatalf("labels: %v", s.Labels) + } + case "http://127.0.0.1:8081/metrics": + if s.Labels[`keyUrl2_1`] != `valueUrl2_1` { + t.Fatalf("labels: %v", s.Labels) + } + case "http://127.0.0.1:8082/url3": + if len(s.Labels) > 0 { + t.Fatalf("labels: %v", s.Labels) + } + default: + t.Fatalf("unknown url: %s", s.Url) + } + } +} diff --git a/cmd/main.go b/cmd/main.go index 7f8847b..ec877dc 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -3,6 +3,7 @@ package cmd import ( "context" "flag" + "log" "net/http" "os" "os/signal" @@ -24,17 +25,19 @@ func Execute() { panic(err) } - m := merger.New(c.ScrapTimeout) + m := merger.New(c.ScrapeTimeout) for _, s := range c.Sources { var labels []*prom.LabelPair for k, v := range s.Labels { k, v := k, v labels = append(labels, &prom.LabelPair{Name: &k, Value: &v}) } + log.Printf("[INFO] add url: %s with labels: %v\n", s.Url, s.Labels) m.AddSource(s.Url, labels) } srv := &http.Server{Addr: c.Listen, Handler: &handler{m: m}} + log.Printf("[INFO] starting listen %s\n", c.Listen) go srv.ListenAndServe() stop := make(chan os.Signal, 1) diff --git a/example.yaml b/example.yaml index 25a64a0..7a82f78 100644 --- a/example.yaml +++ b/example.yaml @@ -1,5 +1,5 @@ listen: :8080 -timeout: 20s +scrape_timeout: 20s sources: - url: http://127.0.0.1:8081/metrics labels: diff --git a/merger/merge.go b/merger/merge.go index 410de56..41cd668 100644 --- a/merger/merge.go +++ b/merger/merge.go @@ -35,10 +35,13 @@ func (m *merger) merge(ctx context.Context, w io.Writer) error { mu.Lock() defer mu.Unlock() for name, metricFamily := range out { - // append metrics - for _, metric := range metricFamily.Metric { - metric.Label = append(metric.Label, source.labels...) + // append labels + if len(source.labels) > 0 { + for _, metric := range metricFamily.Metric { + metric.Label = append(metric.Label, source.labels...) + } } + // append metrics if mfResult, ok := result[name]; ok { mfResult.Metric = append(mfResult.Metric, metricFamily.Metric...) } else { diff --git a/merger/merger.go b/merger/merger.go index c8710d5..6fcb2e4 100644 --- a/merger/merger.go +++ b/merger/merger.go @@ -16,10 +16,10 @@ type Merger interface { } type merger struct { - mu sync.Mutex - scrapTimeout time.Duration - client *http.Client - sources []*source + mu sync.Mutex + scrapeTimeout time.Duration + client *http.Client + sources []*source } type source struct { @@ -27,7 +27,7 @@ type source struct { labels []*prom.LabelPair } -func New(scrapTimeout time.Duration) Merger { +func New(scrapeTimeout time.Duration) Merger { client := &http.Client{ Transport: &http.Transport{ DisableKeepAlives: false, @@ -37,11 +37,11 @@ func New(scrapTimeout time.Duration) Merger { MaxConnsPerHost: 10, IdleConnTimeout: 5 * time.Minute, }, - Timeout: scrapTimeout, + Timeout: scrapeTimeout, } return &merger{ - scrapTimeout: scrapTimeout, - client: client, + scrapeTimeout: scrapeTimeout, + client: client, } } @@ -56,7 +56,7 @@ func (m *merger) AddSource(url string, labels []*prom.LabelPair) { func (m *merger) Merge(w io.Writer) error { m.mu.Lock() defer m.mu.Unlock() - ctx, cancel := context.WithTimeout(context.Background(), m.scrapTimeout) + ctx, cancel := context.WithTimeout(context.Background(), m.scrapeTimeout) defer cancel() return m.merge(ctx, w) } diff --git a/merger/merger_test.go b/merger/merger_test.go index dd97ff5..98efbe1 100644 --- a/merger/merger_test.go +++ b/merger/merger_test.go @@ -5,6 +5,8 @@ import ( "fmt" "net" "net/http" + "sort" + "strings" "testing" "time" @@ -27,15 +29,15 @@ fluentbit_filter_add_records_total{name="kubernetes.0"} 0 1589716338417 fluentbit_filter_add_records_total{name="lua.1"} 0 1589716338417 fluentbit_filter_add_records_total{name="rewrite_tag.2"} 0 1589716338417 ` - result = `# TYPE fluentbit_filter_add_records_total counter -fluentbit_filter_add_records_total{name="lua.1",url="value2"} 0 1589716338417 -fluentbit_filter_add_records_total{name="rewrite_tag.2",url="value2"} 0 1589716338417 + result = ` +# TYPE fluentbit_filter_add_records_total counter fluentbit_filter_add_records_total{name="kubernetes.0",url="value1"} 0 1589716338403 +fluentbit_filter_add_records_total{name="kubernetes.0",url="value1"} 0 1589716338417 fluentbit_filter_add_records_total{name="lua.1",url="value1"} 0 1589716338403 +fluentbit_filter_add_records_total{name="lua.1",url="value2"} 0 1589716338417 fluentbit_filter_add_records_total{name="lua.2",url="value1"} 0 1589716338403 fluentbit_filter_add_records_total{name="lua.3",url="value1"} 0 1589716338403 -fluentbit_filter_add_records_total{name="kubernetes.0",url="value1"} 0 1589716338417 -` +fluentbit_filter_add_records_total{name="rewrite_tag.2",url="value2"} 0 1589716338417` ) func Test_Merger(t *testing.T) { @@ -65,7 +67,9 @@ func Test_Merger(t *testing.T) { out := bytes.NewBuffer(make([]byte, 0)) m.Merge(out) - if out.String() != result { - t.Fatalf("get:\n%s\nexcept:\n%s\n", out.String(), result) + outStrs := strings.Split(out.String(), "\n") + sort.Strings(outStrs) + if result != strings.Join(outStrs, "\n") { + t.Fatalf("out:\n%s\n", strings.Join(outStrs, "\n")) } }