74 lines
1.5 KiB
Go
74 lines
1.5 KiB
Go
|
package merger
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"fmt"
|
||
|
"io"
|
||
|
"sort"
|
||
|
"sync"
|
||
|
|
||
|
"github.com/pkg/errors"
|
||
|
prom "github.com/prometheus/client_model/go"
|
||
|
"github.com/prometheus/common/expfmt"
|
||
|
"golang.org/x/sync/errgroup"
|
||
|
)
|
||
|
|
||
|
func (m *merger) merge(ctx context.Context, w io.Writer) error {
|
||
|
|
||
|
mu := &sync.Mutex{}
|
||
|
result := map[string]*prom.MetricFamily{}
|
||
|
|
||
|
g, ctx := errgroup.WithContext(ctx)
|
||
|
for _, source := range m.sources {
|
||
|
source := source
|
||
|
g.Go(func() error {
|
||
|
resp, err := m.client.Get(source.url)
|
||
|
if err != nil {
|
||
|
return errors.Wrap(err, fmt.Sprintf("get url: %s", source.url))
|
||
|
}
|
||
|
defer resp.Body.Close()
|
||
|
tp := new(expfmt.TextParser)
|
||
|
out, err := tp.TextToMetricFamilies(resp.Body)
|
||
|
if err != nil {
|
||
|
return errors.Wrap(err, fmt.Sprintf("parse url: %s", source.url))
|
||
|
}
|
||
|
mu.Lock()
|
||
|
defer mu.Unlock()
|
||
|
for name, metricFamily := range out {
|
||
|
// append metrics
|
||
|
for _, metric := range metricFamily.Metric {
|
||
|
metric.Label = append(metric.Label, source.labels...)
|
||
|
}
|
||
|
if mfResult, ok := result[name]; ok {
|
||
|
mfResult.Metric = append(mfResult.Metric, metricFamily.Metric...)
|
||
|
} else {
|
||
|
result[name] = metricFamily
|
||
|
}
|
||
|
}
|
||
|
return nil
|
||
|
})
|
||
|
}
|
||
|
|
||
|
// wait to process all routines
|
||
|
if err := g.Wait(); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
// sort names
|
||
|
var names []string
|
||
|
for n := range result {
|
||
|
names = append(names, n)
|
||
|
}
|
||
|
sort.Strings(names)
|
||
|
|
||
|
// write result
|
||
|
enc := expfmt.NewEncoder(w, expfmt.FmtText)
|
||
|
for _, n := range names {
|
||
|
err := enc.Encode(result[n])
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
return nil
|
||
|
}
|