This commit is contained in:
Dmitry Vasiliev 2020-05-17 16:30:52 +03:00
commit b3b54aaae7
No known key found for this signature in database
GPG Key ID: C9A6FF8856B941E3
10 changed files with 397 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
.idea

31
cmd/config.go Normal file
View File

@ -0,0 +1,31 @@
package main
import (
"io/ioutil"
"time"
"gopkg.in/yaml.v2"
)
type source struct {
Url string `yaml:"url"`
Labels map[string]string `yaml:"labels"`
}
type config struct {
Listen string `yaml:"listen"`
Timeout time.Duration `yaml:"timeout"`
Sources []*source `yaml:"sources"`
}
func parseConfig(filename string) (*config, error) {
data, err := ioutil.ReadFile(filename)
if err != nil {
return nil, err
}
result := &config{
Listen: ":8080",
Timeout: 15 * time.Second,
}
return result, yaml.Unmarshal(data, result)
}

25
cmd/handler.go Normal file
View File

@ -0,0 +1,25 @@
package main
import (
"log"
"net/http"
"github.com/vadv/prometheus-exporter-merger/merger"
)
type handler struct {
m merger.Merger
}
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/healthz":
w.WriteHeader(http.StatusOK)
default:
err := h.m.Merge(w)
if err != nil {
log.Printf("[ERROR] %s\n", err.Error())
w.WriteHeader(http.StatusInternalServerError)
}
}
}

49
cmd/main.go Normal file
View File

@ -0,0 +1,49 @@
package main
import (
"context"
"flag"
"net/http"
"os"
"os/signal"
"time"
prom "github.com/prometheus/client_model/go"
"github.com/vadv/prometheus-exporter-merger/merger"
)
func main() {
var (
configPath = flag.String("config", "config.yaml", "Path to config")
)
flag.Parse()
c, err := parseConfig(*configPath)
if err != nil {
panic(err)
}
m := merger.New(c.Timeout)
for _, s := range c.Sources {
var labels []*prom.LabelPair
for k, v := range s.Labels {
k, v := k, v
labels = append(labels, &prom.LabelPair{Name: &k, Value: &v})
}
m.AddSource(s.Url, labels)
}
srv := &http.Server{Addr: c.Listen, Handler: &handler{m: m}}
go srv.ListenAndServe()
stop := make(chan os.Signal, 1)
signal.Notify(stop, os.Interrupt)
<-stop
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := srv.Shutdown(ctx); err != nil {
panic(err)
}
}

9
example.yaml Normal file
View File

@ -0,0 +1,9 @@
listen: :8080
timeout: 20s
sources:
- url: http://127.0.0.1:8081/metrics
labels:
key1: value1
- url: http://127.0.0.1:8082/metrics
labels:
key2: value2

11
go.mod Normal file
View File

@ -0,0 +1,11 @@
module github.com/vadv/prometheus-exporter-merger
go 1.14
require (
github.com/pkg/errors v0.8.1
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.10.0
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4
gopkg.in/yaml.v2 v2.2.4
)

65
go.sum Normal file
View File

@ -0,0 +1,65 @@
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.2.0 h1:uq5h0d+GuxiXLJLNABMgp2qUWDPiLvgCzz2dUR+/W/M=
github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.10.0 h1:RyRA7RzGXQZiW+tGMr7sxa85G1z0yOpM1qq5c8lNawc=
github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo=
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 h1:YUO/7uOKsKeq9UokNS62b8FYywz3ker1l1vDZRCRefw=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

73
merger/merge.go Normal file
View File

@ -0,0 +1,73 @@
package merger
import (
"context"
"fmt"
"io"
"sort"
"sync"
"github.com/pkg/errors"
prom "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"golang.org/x/sync/errgroup"
)
func (m *merger) merge(ctx context.Context, w io.Writer) error {
mu := &sync.Mutex{}
result := map[string]*prom.MetricFamily{}
g, ctx := errgroup.WithContext(ctx)
for _, source := range m.sources {
source := source
g.Go(func() error {
resp, err := m.client.Get(source.url)
if err != nil {
return errors.Wrap(err, fmt.Sprintf("get url: %s", source.url))
}
defer resp.Body.Close()
tp := new(expfmt.TextParser)
out, err := tp.TextToMetricFamilies(resp.Body)
if err != nil {
return errors.Wrap(err, fmt.Sprintf("parse url: %s", source.url))
}
mu.Lock()
defer mu.Unlock()
for name, metricFamily := range out {
// append metrics
for _, metric := range metricFamily.Metric {
metric.Label = append(metric.Label, source.labels...)
}
if mfResult, ok := result[name]; ok {
mfResult.Metric = append(mfResult.Metric, metricFamily.Metric...)
} else {
result[name] = metricFamily
}
}
return nil
})
}
// wait to process all routines
if err := g.Wait(); err != nil {
return err
}
// sort names
var names []string
for n := range result {
names = append(names, n)
}
sort.Strings(names)
// write result
enc := expfmt.NewEncoder(w, expfmt.FmtText)
for _, n := range names {
err := enc.Encode(result[n])
if err != nil {
return err
}
}
return nil
}

62
merger/merger.go Normal file
View File

@ -0,0 +1,62 @@
package merger
import (
"context"
"io"
"net/http"
"sync"
"time"
prom "github.com/prometheus/client_model/go"
)
type Merger interface {
Merge(w io.Writer) error
AddSource(url string, labels []*prom.LabelPair)
}
type merger struct {
mu sync.Mutex
timeout time.Duration
client *http.Client
sources []*source
}
type source struct {
url string
labels []*prom.LabelPair
}
func New(timeout time.Duration) Merger {
client := &http.Client{
Transport: &http.Transport{
DisableKeepAlives: false,
DisableCompression: false,
MaxIdleConns: 1,
MaxIdleConnsPerHost: 1,
MaxConnsPerHost: 10,
IdleConnTimeout: 5 * time.Minute,
},
Timeout: timeout,
}
return &merger{
timeout: timeout,
client: client,
}
}
// AddSource new source
func (m *merger) AddSource(url string, labels []*prom.LabelPair) {
m.mu.Lock()
defer m.mu.Unlock()
m.sources = append(m.sources, &source{url: url, labels: labels})
}
// Merge sources
func (m *merger) Merge(w io.Writer) error {
m.mu.Lock()
defer m.mu.Unlock()
ctx, cancel := context.WithTimeout(context.Background(), m.timeout)
defer cancel()
return m.merge(ctx, w)
}

71
merger/merger_test.go Normal file
View File

@ -0,0 +1,71 @@
package merger_test
import (
"bytes"
"fmt"
"net"
"net/http"
"testing"
"time"
prom "github.com/prometheus/client_model/go"
"github.com/vadv/prometheus-exporter-merger/merger"
)
const (
data1 = `
# TYPE fluentbit_filter_add_records_total counter
fluentbit_filter_add_records_total{name="kubernetes.0"} 0 1589716338403
fluentbit_filter_add_records_total{name="lua.1"} 0 1589716338403
fluentbit_filter_add_records_total{name="lua.2"} 0 1589716338403
fluentbit_filter_add_records_total{name="lua.3"} 0 1589716338403
fluentbit_filter_add_records_total{name="kubernetes.0"} 0 1589716338417
`
data2 = `
# TYPE fluentbit_filter_add_records_total counter
fluentbit_filter_add_records_total{name="lua.1"} 0 1589716338417
fluentbit_filter_add_records_total{name="rewrite_tag.2"} 0 1589716338417
`
result = `# TYPE fluentbit_filter_add_records_total counter
fluentbit_filter_add_records_total{name="lua.1",url="value2"} 0 1589716338417
fluentbit_filter_add_records_total{name="rewrite_tag.2",url="value2"} 0 1589716338417
fluentbit_filter_add_records_total{name="kubernetes.0",url="value1"} 0 1589716338403
fluentbit_filter_add_records_total{name="lua.1",url="value1"} 0 1589716338403
fluentbit_filter_add_records_total{name="lua.2",url="value1"} 0 1589716338403
fluentbit_filter_add_records_total{name="lua.3",url="value1"} 0 1589716338403
fluentbit_filter_add_records_total{name="kubernetes.0",url="value1"} 0 1589716338417
`
)
func Test_Merger(t *testing.T) {
listener, err := net.Listen("tcp", ":0")
if err != nil {
t.Fatal(err)
}
http.HandleFunc("/data_1", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, data1)
})
http.HandleFunc("/data_2", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, data2)
})
go http.Serve(listener, nil)
urlPrefix := fmt.Sprintf("http://127.0.0.1:%d", listener.Addr().(*net.TCPAddr).Port)
url, value1, value2 := "url", "value1", "value2"
label1 := []*prom.LabelPair{{Name: &url, Value: &value1}}
label2 := []*prom.LabelPair{{Name: &url, Value: &value2}}
m := merger.New(time.Second)
m.AddSource(urlPrefix+"/data_1", label1)
m.AddSource(urlPrefix+"/data_2", label2)
out := bytes.NewBuffer(make([]byte, 0))
m.Merge(out)
if out.String() != result {
t.Fatalf("get:\n%s\nexcept:\n%s\n", out.String(), result)
}
}