add env variables

This commit is contained in:
Dmitry Vasiliev 2020-05-17 18:34:32 +03:00
parent 98d624f868
commit 5ad49d8ab1
No known key found for this signature in database
GPG Key ID: C9A6FF8856B941E3
9 changed files with 165 additions and 33 deletions

View File

@ -30,4 +30,4 @@ jobs:
run: go build -v .
- name: Test
run: go test -v .
run: go test ./... -v -race

View File

@ -4,7 +4,8 @@ Merges Prometheus metrics from multiple sources.
## But Why?!
> [prometheus/prometheus#3756](https://github.com/prometheus/prometheus/issues/3756)
Sometimes you need in Kubernetes to scrape Prometheus metrics from multiple containers in a single pod,
but you can't do this using annotations: [prometheus/prometheus#3756](https://github.com/prometheus/prometheus/issues/3756).
To start the exporter:
@ -20,10 +21,20 @@ scrap_timeout: 20s
sources:
- url: http://127.0.0.1:8081/metrics
labels:
key1: value1
keyX: valueX
keyY: Y
- url: http://127.0.0.1:8082/metrics
labels:
key2: value2
key2: Z
```
Another way to pass configuration by setting environment variables:
```bash
export LISTEN=":8080"
export SCRAPE_TIMEOUT="20s"
export URL_1=http://127.0.0.1:801/api/v1/metrics/prometheus,keyX:valueX,keyY:Y
export URL_2=http://0.0.0.0:7070/api/v1/metrics/prometheus,key2:Z
```
## Kubernetes
@ -44,8 +55,14 @@ By default, config must be available in the container by the path: `/config/prom
...
- name: prometheus-exporter-merger
image: vadv/prometheus-exporter-merger
volumeMounts:
- name: config
mountPath: /config
env:
- name: LISTEN
value: :8080
- name: SCRAPE_TIMEOUT
value: 20s
- name: URL_COMMON
value: http://127.0.0.1:8081/api/v1/metrics/prometheus,type:common
- name: URL_AUDIT
value: http://127.0.0.1:8082/api/v1/metrics/prometheus,type:audit
...
```

View File

@ -1,31 +1,93 @@
package cmd
import (
"fmt"
"io/ioutil"
"os"
"strings"
"time"
"github.com/pkg/errors"
"gopkg.in/yaml.v2"
)
const (
defaultListen = ":8080"
defaultScrapeTimeout = 15 * time.Second
)
type source struct {
Url string `yaml:"url"`
Labels map[string]string `yaml:"labels"`
}
type config struct {
Listen string `yaml:"listen"`
ScrapTimeout time.Duration `yaml:"scrap_timeout"`
Sources []*source `yaml:"sources"`
Listen string `yaml:"listen"`
ScrapeTimeout time.Duration `yaml:"scrape_timeout"`
Sources []*source `yaml:"sources"`
}
func parseConfig(filename string) (*config, error) {
_, err := os.Stat(filename)
if err == nil {
return parseConfigFromFile(filename)
}
if os.IsNotExist(err) {
return parseConfigFromEnv()
}
return nil, err
}
func parseConfigFromFile(filename string) (*config, error) {
data, err := ioutil.ReadFile(filename)
if err != nil {
return nil, err
}
result := &config{
Listen: ":8080",
ScrapTimeout: 15 * time.Second,
Listen: defaultListen,
ScrapeTimeout: defaultScrapeTimeout,
}
return result, yaml.Unmarshal(data, result)
}
func parseConfigFromEnv() (*config, error) {
result := &config{
Listen: defaultListen,
ScrapeTimeout: defaultScrapeTimeout,
}
if v := os.Getenv("LISTEN"); v != "" {
result.Listen = v
}
if v := os.Getenv("SCRAPE_TIMEOUT"); v != "" {
timeout, err := time.ParseDuration(v)
if err != nil {
return nil, errors.Wrap(err, "parse SCRAPE_TIMEOUT")
}
result.ScrapeTimeout = timeout
}
for _, env := range os.Environ() {
// URL_ONE=http://127.0.0.1:8080/metrics,k1:v1,k2:v2
if strings.HasPrefix(env, "URL_") {
args := strings.Split(env, "=")
if len(args) != 2 {
return nil, fmt.Errorf("unable to parse env variable %s", env)
}
valuesArgs := strings.Split(args[1], ",")
s := &source{Url: valuesArgs[0], Labels: make(map[string]string)}
if len(valuesArgs) > 1 {
for i, v := range valuesArgs {
if i == 0 {
continue
}
labelArgs := strings.Split(v, ":")
if len(labelArgs) != 2 {
return nil, fmt.Errorf("unable to parse labels from env variable %s", env)
}
s.Labels[labelArgs[0]] = labelArgs[1]
}
}
result.Sources = append(result.Sources, s)
}
}
return result, nil
}

43
cmd/config_test.go Normal file
View File

@ -0,0 +1,43 @@
package cmd
import (
"os"
"testing"
"time"
)
func Test_parseEnv(t *testing.T) {
os.Setenv("LISTEN", ":9090")
os.Setenv("SCRAPE_TIMEOUT", "120s")
os.Setenv("URL_8080", "http://127.0.0.1:8080/metrics,keyUrl1_1:valueUrl1_1,keyUrl1_2:valueUrl1_2")
os.Setenv("URL_8081", "http://127.0.0.1:8081/metrics,keyUrl2_1:valueUrl2_1")
os.Setenv("URL_8082", "http://127.0.0.1:8082/url3")
c, err := parseConfigFromEnv()
if err != nil {
t.Fatal(err)
}
if c.Listen != ":9090" {
t.Fatalf("listen: %s\n", c.Listen)
}
if c.ScrapeTimeout != 120*time.Second {
t.Fatalf("timeout: %s\n", c.ScrapeTimeout)
}
for _, s := range c.Sources {
switch s.Url {
case "http://127.0.0.1:8080/metrics":
if s.Labels[`keyUrl1_1`] != `valueUrl1_1` || s.Labels[`keyUrl1_2`] != `valueUrl1_2` {
t.Fatalf("labels: %v", s.Labels)
}
case "http://127.0.0.1:8081/metrics":
if s.Labels[`keyUrl2_1`] != `valueUrl2_1` {
t.Fatalf("labels: %v", s.Labels)
}
case "http://127.0.0.1:8082/url3":
if len(s.Labels) > 0 {
t.Fatalf("labels: %v", s.Labels)
}
default:
t.Fatalf("unknown url: %s", s.Url)
}
}
}

View File

@ -3,6 +3,7 @@ package cmd
import (
"context"
"flag"
"log"
"net/http"
"os"
"os/signal"
@ -24,17 +25,19 @@ func Execute() {
panic(err)
}
m := merger.New(c.ScrapTimeout)
m := merger.New(c.ScrapeTimeout)
for _, s := range c.Sources {
var labels []*prom.LabelPair
for k, v := range s.Labels {
k, v := k, v
labels = append(labels, &prom.LabelPair{Name: &k, Value: &v})
}
log.Printf("[INFO] add url: %s with labels: %v\n", s.Url, s.Labels)
m.AddSource(s.Url, labels)
}
srv := &http.Server{Addr: c.Listen, Handler: &handler{m: m}}
log.Printf("[INFO] starting listen %s\n", c.Listen)
go srv.ListenAndServe()
stop := make(chan os.Signal, 1)

View File

@ -1,5 +1,5 @@
listen: :8080
timeout: 20s
scrape_timeout: 20s
sources:
- url: http://127.0.0.1:8081/metrics
labels:

View File

@ -35,10 +35,13 @@ func (m *merger) merge(ctx context.Context, w io.Writer) error {
mu.Lock()
defer mu.Unlock()
for name, metricFamily := range out {
// append metrics
for _, metric := range metricFamily.Metric {
metric.Label = append(metric.Label, source.labels...)
// append labels
if len(source.labels) > 0 {
for _, metric := range metricFamily.Metric {
metric.Label = append(metric.Label, source.labels...)
}
}
// append metrics
if mfResult, ok := result[name]; ok {
mfResult.Metric = append(mfResult.Metric, metricFamily.Metric...)
} else {

View File

@ -16,10 +16,10 @@ type Merger interface {
}
type merger struct {
mu sync.Mutex
scrapTimeout time.Duration
client *http.Client
sources []*source
mu sync.Mutex
scrapeTimeout time.Duration
client *http.Client
sources []*source
}
type source struct {
@ -27,7 +27,7 @@ type source struct {
labels []*prom.LabelPair
}
func New(scrapTimeout time.Duration) Merger {
func New(scrapeTimeout time.Duration) Merger {
client := &http.Client{
Transport: &http.Transport{
DisableKeepAlives: false,
@ -37,11 +37,11 @@ func New(scrapTimeout time.Duration) Merger {
MaxConnsPerHost: 10,
IdleConnTimeout: 5 * time.Minute,
},
Timeout: scrapTimeout,
Timeout: scrapeTimeout,
}
return &merger{
scrapTimeout: scrapTimeout,
client: client,
scrapeTimeout: scrapeTimeout,
client: client,
}
}
@ -56,7 +56,7 @@ func (m *merger) AddSource(url string, labels []*prom.LabelPair) {
func (m *merger) Merge(w io.Writer) error {
m.mu.Lock()
defer m.mu.Unlock()
ctx, cancel := context.WithTimeout(context.Background(), m.scrapTimeout)
ctx, cancel := context.WithTimeout(context.Background(), m.scrapeTimeout)
defer cancel()
return m.merge(ctx, w)
}

View File

@ -5,6 +5,8 @@ import (
"fmt"
"net"
"net/http"
"sort"
"strings"
"testing"
"time"
@ -27,15 +29,15 @@ fluentbit_filter_add_records_total{name="kubernetes.0"} 0 1589716338417
fluentbit_filter_add_records_total{name="lua.1"} 0 1589716338417
fluentbit_filter_add_records_total{name="rewrite_tag.2"} 0 1589716338417
`
result = `# TYPE fluentbit_filter_add_records_total counter
fluentbit_filter_add_records_total{name="lua.1",url="value2"} 0 1589716338417
fluentbit_filter_add_records_total{name="rewrite_tag.2",url="value2"} 0 1589716338417
result = `
# TYPE fluentbit_filter_add_records_total counter
fluentbit_filter_add_records_total{name="kubernetes.0",url="value1"} 0 1589716338403
fluentbit_filter_add_records_total{name="kubernetes.0",url="value1"} 0 1589716338417
fluentbit_filter_add_records_total{name="lua.1",url="value1"} 0 1589716338403
fluentbit_filter_add_records_total{name="lua.1",url="value2"} 0 1589716338417
fluentbit_filter_add_records_total{name="lua.2",url="value1"} 0 1589716338403
fluentbit_filter_add_records_total{name="lua.3",url="value1"} 0 1589716338403
fluentbit_filter_add_records_total{name="kubernetes.0",url="value1"} 0 1589716338417
`
fluentbit_filter_add_records_total{name="rewrite_tag.2",url="value2"} 0 1589716338417`
)
func Test_Merger(t *testing.T) {
@ -65,7 +67,9 @@ func Test_Merger(t *testing.T) {
out := bytes.NewBuffer(make([]byte, 0))
m.Merge(out)
if out.String() != result {
t.Fatalf("get:\n%s\nexcept:\n%s\n", out.String(), result)
outStrs := strings.Split(out.String(), "\n")
sort.Strings(outStrs)
if result != strings.Join(outStrs, "\n") {
t.Fatalf("out:\n%s\n", strings.Join(outStrs, "\n"))
}
}