From b3b54aaae7e62bf7d205f9e22df32dcc96f2c87d Mon Sep 17 00:00:00 2001 From: Dmitry Vasiliev Date: Sun, 17 May 2020 16:30:52 +0300 Subject: [PATCH] init --- .gitignore | 1 + cmd/config.go | 31 ++++++++++++++++++ cmd/handler.go | 25 +++++++++++++++ cmd/main.go | 49 +++++++++++++++++++++++++++++ example.yaml | 9 ++++++ go.mod | 11 +++++++ go.sum | 65 ++++++++++++++++++++++++++++++++++++++ merger/merge.go | 73 +++++++++++++++++++++++++++++++++++++++++++ merger/merger.go | 62 ++++++++++++++++++++++++++++++++++++ merger/merger_test.go | 71 +++++++++++++++++++++++++++++++++++++++++ 10 files changed, 397 insertions(+) create mode 100644 .gitignore create mode 100644 cmd/config.go create mode 100644 cmd/handler.go create mode 100644 cmd/main.go create mode 100644 example.yaml create mode 100644 go.mod create mode 100644 go.sum create mode 100644 merger/merge.go create mode 100644 merger/merger.go create mode 100644 merger/merger_test.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..485dee6 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.idea diff --git a/cmd/config.go b/cmd/config.go new file mode 100644 index 0000000..3b20551 --- /dev/null +++ b/cmd/config.go @@ -0,0 +1,31 @@ +package main + +import ( + "io/ioutil" + "time" + + "gopkg.in/yaml.v2" +) + +type source struct { + Url string `yaml:"url"` + Labels map[string]string `yaml:"labels"` +} + +type config struct { + Listen string `yaml:"listen"` + Timeout time.Duration `yaml:"timeout"` + Sources []*source `yaml:"sources"` +} + +func parseConfig(filename string) (*config, error) { + data, err := ioutil.ReadFile(filename) + if err != nil { + return nil, err + } + result := &config{ + Listen: ":8080", + Timeout: 15 * time.Second, + } + return result, yaml.Unmarshal(data, result) +} diff --git a/cmd/handler.go b/cmd/handler.go new file mode 100644 index 0000000..1740f23 --- /dev/null +++ b/cmd/handler.go @@ -0,0 +1,25 @@ +package main + +import ( + "log" + "net/http" + + "github.com/vadv/prometheus-exporter-merger/merger" +) + +type handler struct { + m merger.Merger +} + +func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/healthz": + w.WriteHeader(http.StatusOK) + default: + err := h.m.Merge(w) + if err != nil { + log.Printf("[ERROR] %s\n", err.Error()) + w.WriteHeader(http.StatusInternalServerError) + } + } +} diff --git a/cmd/main.go b/cmd/main.go new file mode 100644 index 0000000..8edad66 --- /dev/null +++ b/cmd/main.go @@ -0,0 +1,49 @@ +package main + +import ( + "context" + "flag" + "net/http" + "os" + "os/signal" + "time" + + prom "github.com/prometheus/client_model/go" + "github.com/vadv/prometheus-exporter-merger/merger" +) + +func main() { + + var ( + configPath = flag.String("config", "config.yaml", "Path to config") + ) + flag.Parse() + + c, err := parseConfig(*configPath) + if err != nil { + panic(err) + } + + m := merger.New(c.Timeout) + for _, s := range c.Sources { + var labels []*prom.LabelPair + for k, v := range s.Labels { + k, v := k, v + labels = append(labels, &prom.LabelPair{Name: &k, Value: &v}) + } + m.AddSource(s.Url, labels) + } + + srv := &http.Server{Addr: c.Listen, Handler: &handler{m: m}} + go srv.ListenAndServe() + + stop := make(chan os.Signal, 1) + signal.Notify(stop, os.Interrupt) + <-stop + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := srv.Shutdown(ctx); err != nil { + panic(err) + } +} diff --git a/example.yaml b/example.yaml new file mode 100644 index 0000000..25a64a0 --- /dev/null +++ b/example.yaml @@ -0,0 +1,9 @@ +listen: :8080 +timeout: 20s +sources: + - url: http://127.0.0.1:8081/metrics + labels: + key1: value1 + - url: http://127.0.0.1:8082/metrics + labels: + key2: value2 diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..a7a7712 --- /dev/null +++ b/go.mod @@ -0,0 +1,11 @@ +module github.com/vadv/prometheus-exporter-merger + +go 1.14 + +require ( + github.com/pkg/errors v0.8.1 + github.com/prometheus/client_model v0.2.0 + github.com/prometheus/common v0.10.0 + golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 + gopkg.in/yaml.v2 v2.2.4 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..46c96e3 --- /dev/null +++ b/go.sum @@ -0,0 +1,65 @@ +github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= +github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= +github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= +github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= +github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= +github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= +github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= +github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= +github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= +github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= +github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= +github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= +github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= +github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= +github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= +github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= +github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/prometheus/client_model v0.2.0 h1:uq5h0d+GuxiXLJLNABMgp2qUWDPiLvgCzz2dUR+/W/M= +github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= +github.com/prometheus/common v0.10.0 h1:RyRA7RzGXQZiW+tGMr7sxa85G1z0yOpM1qq5c8lNawc= +github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo= +github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= +github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= +github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 h1:YUO/7uOKsKeq9UokNS62b8FYywz3ker1l1vDZRCRefw= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I= +gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/merger/merge.go b/merger/merge.go new file mode 100644 index 0000000..410de56 --- /dev/null +++ b/merger/merge.go @@ -0,0 +1,73 @@ +package merger + +import ( + "context" + "fmt" + "io" + "sort" + "sync" + + "github.com/pkg/errors" + prom "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" + "golang.org/x/sync/errgroup" +) + +func (m *merger) merge(ctx context.Context, w io.Writer) error { + + mu := &sync.Mutex{} + result := map[string]*prom.MetricFamily{} + + g, ctx := errgroup.WithContext(ctx) + for _, source := range m.sources { + source := source + g.Go(func() error { + resp, err := m.client.Get(source.url) + if err != nil { + return errors.Wrap(err, fmt.Sprintf("get url: %s", source.url)) + } + defer resp.Body.Close() + tp := new(expfmt.TextParser) + out, err := tp.TextToMetricFamilies(resp.Body) + if err != nil { + return errors.Wrap(err, fmt.Sprintf("parse url: %s", source.url)) + } + mu.Lock() + defer mu.Unlock() + for name, metricFamily := range out { + // append metrics + for _, metric := range metricFamily.Metric { + metric.Label = append(metric.Label, source.labels...) + } + if mfResult, ok := result[name]; ok { + mfResult.Metric = append(mfResult.Metric, metricFamily.Metric...) + } else { + result[name] = metricFamily + } + } + return nil + }) + } + + // wait to process all routines + if err := g.Wait(); err != nil { + return err + } + + // sort names + var names []string + for n := range result { + names = append(names, n) + } + sort.Strings(names) + + // write result + enc := expfmt.NewEncoder(w, expfmt.FmtText) + for _, n := range names { + err := enc.Encode(result[n]) + if err != nil { + return err + } + } + return nil +} diff --git a/merger/merger.go b/merger/merger.go new file mode 100644 index 0000000..60d0b55 --- /dev/null +++ b/merger/merger.go @@ -0,0 +1,62 @@ +package merger + +import ( + "context" + "io" + "net/http" + "sync" + "time" + + prom "github.com/prometheus/client_model/go" +) + +type Merger interface { + Merge(w io.Writer) error + AddSource(url string, labels []*prom.LabelPair) +} + +type merger struct { + mu sync.Mutex + timeout time.Duration + client *http.Client + sources []*source +} + +type source struct { + url string + labels []*prom.LabelPair +} + +func New(timeout time.Duration) Merger { + client := &http.Client{ + Transport: &http.Transport{ + DisableKeepAlives: false, + DisableCompression: false, + MaxIdleConns: 1, + MaxIdleConnsPerHost: 1, + MaxConnsPerHost: 10, + IdleConnTimeout: 5 * time.Minute, + }, + Timeout: timeout, + } + return &merger{ + timeout: timeout, + client: client, + } +} + +// AddSource new source +func (m *merger) AddSource(url string, labels []*prom.LabelPair) { + m.mu.Lock() + defer m.mu.Unlock() + m.sources = append(m.sources, &source{url: url, labels: labels}) +} + +// Merge sources +func (m *merger) Merge(w io.Writer) error { + m.mu.Lock() + defer m.mu.Unlock() + ctx, cancel := context.WithTimeout(context.Background(), m.timeout) + defer cancel() + return m.merge(ctx, w) +} diff --git a/merger/merger_test.go b/merger/merger_test.go new file mode 100644 index 0000000..dd97ff5 --- /dev/null +++ b/merger/merger_test.go @@ -0,0 +1,71 @@ +package merger_test + +import ( + "bytes" + "fmt" + "net" + "net/http" + "testing" + "time" + + prom "github.com/prometheus/client_model/go" + + "github.com/vadv/prometheus-exporter-merger/merger" +) + +const ( + data1 = ` +# TYPE fluentbit_filter_add_records_total counter +fluentbit_filter_add_records_total{name="kubernetes.0"} 0 1589716338403 +fluentbit_filter_add_records_total{name="lua.1"} 0 1589716338403 +fluentbit_filter_add_records_total{name="lua.2"} 0 1589716338403 +fluentbit_filter_add_records_total{name="lua.3"} 0 1589716338403 +fluentbit_filter_add_records_total{name="kubernetes.0"} 0 1589716338417 +` + data2 = ` +# TYPE fluentbit_filter_add_records_total counter +fluentbit_filter_add_records_total{name="lua.1"} 0 1589716338417 +fluentbit_filter_add_records_total{name="rewrite_tag.2"} 0 1589716338417 +` + result = `# TYPE fluentbit_filter_add_records_total counter +fluentbit_filter_add_records_total{name="lua.1",url="value2"} 0 1589716338417 +fluentbit_filter_add_records_total{name="rewrite_tag.2",url="value2"} 0 1589716338417 +fluentbit_filter_add_records_total{name="kubernetes.0",url="value1"} 0 1589716338403 +fluentbit_filter_add_records_total{name="lua.1",url="value1"} 0 1589716338403 +fluentbit_filter_add_records_total{name="lua.2",url="value1"} 0 1589716338403 +fluentbit_filter_add_records_total{name="lua.3",url="value1"} 0 1589716338403 +fluentbit_filter_add_records_total{name="kubernetes.0",url="value1"} 0 1589716338417 +` +) + +func Test_Merger(t *testing.T) { + + listener, err := net.Listen("tcp", ":0") + if err != nil { + t.Fatal(err) + } + http.HandleFunc("/data_1", func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintf(w, data1) + }) + http.HandleFunc("/data_2", func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintf(w, data2) + }) + go http.Serve(listener, nil) + + urlPrefix := fmt.Sprintf("http://127.0.0.1:%d", listener.Addr().(*net.TCPAddr).Port) + + url, value1, value2 := "url", "value1", "value2" + label1 := []*prom.LabelPair{{Name: &url, Value: &value1}} + label2 := []*prom.LabelPair{{Name: &url, Value: &value2}} + + m := merger.New(time.Second) + m.AddSource(urlPrefix+"/data_1", label1) + m.AddSource(urlPrefix+"/data_2", label2) + + out := bytes.NewBuffer(make([]byte, 0)) + m.Merge(out) + + if out.String() != result { + t.Fatalf("get:\n%s\nexcept:\n%s\n", out.String(), result) + } +}