288 lines
7.6 KiB
Go
288 lines
7.6 KiB
Go
package writer
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"net/http"
|
|
"net/url"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/benbjohnson/clock"
|
|
"github.com/grafana/dataplane/sdata/numeric"
|
|
"github.com/m3db/prometheus_remote_client_golang/promremote"
|
|
|
|
"github.com/grafana/grafana/pkg/infra/log"
|
|
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
|
|
"github.com/grafana/grafana/pkg/setting"
|
|
|
|
"github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
|
|
"github.com/grafana/grafana-plugin-sdk-go/data"
|
|
)
|
|
|
|
const backendType = "prometheus"
|
|
|
|
const (
|
|
// Fixed error messages
|
|
MimirDuplicateTimestampError = "err-mimir-sample-duplicate-timestamp"
|
|
MimirInvalidLabelError = "err-mimir-label-invalid"
|
|
MimirLabelValueTooLongError = "err-mimir-label-value-too-long"
|
|
MimirMaxLabelNamesPerSeriesError = "err-mimir-max-label-names-per-series"
|
|
MimirMaxSeriesPerUserError = "err-mimir-max-series-per-user"
|
|
|
|
// Best effort error messages
|
|
PrometheusDuplicateTimestampError = "duplicate sample for timestamp"
|
|
)
|
|
|
|
var (
|
|
// Unexpected, 500-like write errors.
|
|
ErrUnexpectedWriteFailure = errors.New("failed to write time series")
|
|
// Expected, user-level write errors like trying to write an invalid series.
|
|
ErrRejectedWrite = errors.New("series was rejected")
|
|
ErrBadFrame = errors.New("failed to read dataframe")
|
|
)
|
|
|
|
var DuplicateTimestampErrors = [...]string{
|
|
MimirDuplicateTimestampError,
|
|
PrometheusDuplicateTimestampError,
|
|
}
|
|
|
|
// Metric represents a Prometheus time series metric.
|
|
type Metric struct {
|
|
T time.Time
|
|
V float64
|
|
}
|
|
|
|
// Point is a logical representation of a single point in time for a Prometheus time series.
|
|
type Point struct {
|
|
Name string
|
|
Labels map[string]string
|
|
Metric Metric
|
|
}
|
|
|
|
func PointsFromFrames(name string, t time.Time, frames data.Frames, extraLabels map[string]string) ([]Point, error) {
|
|
cr, err := numeric.CollectionReaderFromFrames(frames)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
col, err := cr.GetCollection(false)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
points := make([]Point, 0, len(col.Refs))
|
|
for _, ref := range col.Refs {
|
|
fp, empty, err := ref.NullableFloat64Value()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("unable to read float64 value: %w", err)
|
|
}
|
|
if empty {
|
|
return nil, fmt.Errorf("empty frame")
|
|
}
|
|
if fp == nil {
|
|
return nil, fmt.Errorf("nil frame")
|
|
}
|
|
|
|
metric := Metric{
|
|
T: t,
|
|
V: *fp,
|
|
}
|
|
|
|
labels := ref.GetLabels().Copy()
|
|
if labels == nil {
|
|
labels = data.Labels{}
|
|
}
|
|
delete(labels, "__name__")
|
|
for k, v := range extraLabels {
|
|
labels[k] = v
|
|
}
|
|
|
|
points = append(points, Point{
|
|
Name: name,
|
|
Labels: labels,
|
|
Metric: metric,
|
|
})
|
|
}
|
|
|
|
return points, nil
|
|
}
|
|
|
|
type HttpClientProvider interface {
|
|
New(options ...httpclient.Options) (*http.Client, error)
|
|
}
|
|
|
|
type PrometheusWriter struct {
|
|
client promremote.Client
|
|
clock clock.Clock
|
|
logger log.Logger
|
|
metrics *metrics.RemoteWriter
|
|
}
|
|
|
|
func NewPrometheusWriter(
|
|
settings setting.RecordingRuleSettings,
|
|
httpClientProvider HttpClientProvider,
|
|
clock clock.Clock,
|
|
l log.Logger,
|
|
metrics *metrics.RemoteWriter,
|
|
) (*PrometheusWriter, error) {
|
|
if err := validateSettings(settings); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
headers := make(http.Header)
|
|
for k, v := range settings.CustomHeaders {
|
|
headers.Add(k, v)
|
|
}
|
|
|
|
cl, err := httpClientProvider.New(httpclient.Options{
|
|
BasicAuth: createAuthOpts(settings.BasicAuthUsername, settings.BasicAuthPassword),
|
|
Header: headers,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
clientCfg := promremote.NewConfig(
|
|
promremote.UserAgent("grafana-recording-rule"),
|
|
promremote.WriteURLOption(settings.URL),
|
|
promremote.HTTPClientTimeoutOption(settings.Timeout),
|
|
promremote.HTTPClientOption(cl),
|
|
)
|
|
|
|
client, err := promremote.NewClient(clientCfg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &PrometheusWriter{
|
|
client: client,
|
|
clock: clock,
|
|
logger: l,
|
|
metrics: metrics,
|
|
}, nil
|
|
}
|
|
|
|
func validateSettings(settings setting.RecordingRuleSettings) error {
|
|
if settings.BasicAuthUsername != "" && settings.BasicAuthPassword == "" {
|
|
return fmt.Errorf("basic auth password is required if username is set")
|
|
}
|
|
|
|
if _, err := url.Parse(settings.URL); err != nil {
|
|
return fmt.Errorf("invalid URL: %w", err)
|
|
}
|
|
|
|
if settings.Timeout <= 0 {
|
|
return fmt.Errorf("timeout must be greater than 0")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func createAuthOpts(username, password string) *httpclient.BasicAuthOptions {
|
|
// If username is empty, do not use basic auth and ignore password.
|
|
if username == "" {
|
|
return nil
|
|
}
|
|
|
|
return &httpclient.BasicAuthOptions{
|
|
User: username,
|
|
Password: password,
|
|
}
|
|
}
|
|
|
|
// Write writes the given frames to the Prometheus remote write endpoint.
|
|
func (w PrometheusWriter) Write(ctx context.Context, name string, t time.Time, frames data.Frames, orgID int64, extraLabels map[string]string) error {
|
|
l := w.logger.FromContext(ctx)
|
|
lvs := []string{fmt.Sprint(orgID), backendType}
|
|
|
|
points, err := PointsFromFrames(name, t, frames, extraLabels)
|
|
if err != nil {
|
|
return errors.Join(ErrBadFrame, err)
|
|
}
|
|
|
|
series := make([]promremote.TimeSeries, 0, len(points))
|
|
for _, p := range points {
|
|
series = append(series, promremote.TimeSeries{
|
|
Labels: promremoteLabelsFromPoint(p),
|
|
Datapoint: promremote.Datapoint{
|
|
Timestamp: p.Metric.T,
|
|
Value: p.Metric.V,
|
|
},
|
|
})
|
|
}
|
|
|
|
l.Debug("Writing metric", "name", name)
|
|
writeStart := w.clock.Now()
|
|
res, writeErr := w.client.WriteTimeSeries(ctx, series, promremote.WriteOptions{})
|
|
w.metrics.WriteDuration.WithLabelValues(lvs...).Observe(w.clock.Now().Sub(writeStart).Seconds())
|
|
|
|
lvs = append(lvs, fmt.Sprint(res.StatusCode))
|
|
w.metrics.WritesTotal.WithLabelValues(lvs...).Inc()
|
|
|
|
if writeErr != nil {
|
|
if err, ignored := checkWriteError(writeErr); err != nil {
|
|
return err
|
|
} else if ignored {
|
|
l.Debug("Ignored write error", "error", err, "status_code", res.StatusCode)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func promremoteLabelsFromPoint(point Point) []promremote.Label {
|
|
labels := make([]promremote.Label, 0, len(point.Labels))
|
|
labels = append(labels, promremote.Label{
|
|
Name: "__name__",
|
|
Value: point.Name,
|
|
})
|
|
for k, v := range point.Labels {
|
|
labels = append(labels, promremote.Label{
|
|
Name: k,
|
|
Value: v,
|
|
})
|
|
}
|
|
return labels
|
|
}
|
|
|
|
func checkWriteError(writeErr promremote.WriteError) (err error, ignored bool) {
|
|
if writeErr == nil {
|
|
return nil, false
|
|
}
|
|
|
|
// All 500-range statuses are automatically unexpected and not the fault of the data.
|
|
if writeErr.StatusCode()/100 == 5 {
|
|
return errors.Join(ErrUnexpectedWriteFailure, writeErr), false
|
|
}
|
|
|
|
// Special case for 400 status code. 400s may be ignorable in the event of HA writers, or the fault of the written data.
|
|
if writeErr.StatusCode() == 400 {
|
|
msg := writeErr.Error()
|
|
// HA may potentially write different values for the same timestamp, so we ignore this error
|
|
// TODO: this may not be needed, further testing needed
|
|
for _, e := range DuplicateTimestampErrors {
|
|
if strings.Contains(msg, e) {
|
|
return nil, true
|
|
}
|
|
}
|
|
|
|
// Check for expected user errors.
|
|
switch {
|
|
case strings.Contains(msg, MimirInvalidLabelError),
|
|
strings.Contains(msg, MimirMaxSeriesPerUserError),
|
|
strings.Contains(msg, MimirMaxLabelNamesPerSeriesError),
|
|
strings.Contains(msg, MimirLabelValueTooLongError):
|
|
return errors.Join(ErrRejectedWrite, writeErr), false
|
|
}
|
|
|
|
// For now, all 400s that are not previously known are considered unexpected.
|
|
// TODO: Consider blanket-converting all 400s to be known errors. This should only be done once we are confident this is not a problem with this client.
|
|
return errors.Join(ErrUnexpectedWriteFailure, writeErr), false
|
|
}
|
|
|
|
// All other errors which do not fit into the above categories are also unexpected.
|
|
return errors.Join(ErrUnexpectedWriteFailure, writeErr), false
|
|
}
|