Files
cloudpods/pkg/monitor/models/datasource.go
2025-09-05 18:26:33 +08:00

889 lines
26 KiB
Go

// Copyright 2019 Yunion
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package models
import (
"bytes"
"context"
"fmt"
"math"
"regexp"
"sort"
"strings"
"time"
"github.com/influxdata/promql/v2/pkg/labels"
"github.com/zexi/influxql-to-metricsql/converter/translator"
"golang.org/x/sync/errgroup"
"yunion.io/x/jsonutils"
"yunion.io/x/log"
"yunion.io/x/pkg/errors"
"yunion.io/x/pkg/tristate"
"yunion.io/x/pkg/util/sets"
"yunion.io/x/pkg/utils"
"yunion.io/x/onecloud/pkg/apis/monitor"
"yunion.io/x/onecloud/pkg/cloudcommon/db"
"yunion.io/x/onecloud/pkg/hostman/hostinfo/hostconsts"
"yunion.io/x/onecloud/pkg/httperrors"
"yunion.io/x/onecloud/pkg/mcclient"
"yunion.io/x/onecloud/pkg/mcclient/auth"
"yunion.io/x/onecloud/pkg/monitor/datasource"
merrors "yunion.io/x/onecloud/pkg/monitor/errors"
"yunion.io/x/onecloud/pkg/monitor/tsdb"
"yunion.io/x/onecloud/pkg/monitor/validators"
"yunion.io/x/onecloud/pkg/util/influxdb"
"yunion.io/x/onecloud/pkg/util/stringutils2"
)
const (
VICTORIA_METRICS_DB_TAG_KEY = "db"
VICTORIA_METRICS_DB_TAG_VAL_TELEGRAF = "telegraf"
)
var (
DataSourceManager *SDataSourceManager
compile = regexp.MustCompile(`\w{8}(-\w{4}){3}-\w{12}`)
)
func init() {
DataSourceManager = &SDataSourceManager{
SStandaloneResourceBaseManager: db.NewStandaloneResourceBaseManager(
SDataSource{},
"datasources_tbl",
"datasource",
"datasources",
),
}
DataSourceManager.SetVirtualObject(DataSourceManager)
}
// +onecloud:swagger-gen-model-singular=datasource
// +onecloud:swagger-gen-model-plural=datasources
type SDataSourceManager struct {
db.SStandaloneResourceBaseManager
}
type SDataSource struct {
db.SStandaloneResourceBase
Type string `nullable:"false" list:"user"`
Url string `nullable:"false" list:"user"`
User string `width:"64" charset:"utf8" nullable:"true"`
Password string `width:"64" charset:"utf8" nullable:"true"`
Database string `width:"64" charset:"utf8" nullable:"true"`
IsDefault tristate.TriState `default:"false" create:"optional"`
/*
TimeInterval string
BasicAuth bool
BasicAuthUser string
BasicAuthPassword string
*/
}
func (m *SDataSourceManager) GetSource(id string) (*SDataSource, error) {
ret, err := m.FetchById(id)
if err != nil {
return nil, err
}
return ret.(*SDataSource), nil
}
func (m *SDataSourceManager) GetDatabases() (jsonutils.JSONObject, error) {
ret := jsonutils.NewDict()
dataSource, err := datasource.GetDefaultSource("")
if err != nil {
return jsonutils.JSONNull, errors.Wrap(err, "s.GetDefaultSource")
}
db := influxdb.NewInfluxdb(dataSource.Url)
//db.SetDatabase("telegraf")
databases, err := db.GetDatabases()
if err != nil {
return jsonutils.JSONNull, errors.Wrap(err, "GetDatabases")
}
ret.Add(jsonutils.NewStringArray(databases), "databases")
return ret, nil
}
func (m *SDataSourceManager) GetMeasurements(query jsonutils.JSONObject,
measurementFilter, tagFilter string) (jsonutils.JSONObject,
error) {
ret := jsonutils.NewDict()
measurements, err := m.getMeasurementQueryInfluxdb(query, measurementFilter, tagFilter)
if err != nil {
return jsonutils.JSONNull, err
}
ret.Add(jsonutils.Marshal(&measurements), "measurements")
return ret, nil
}
func (m *SDataSourceManager) getMeasurementQueryInfluxdb(query jsonutils.JSONObject,
measurementFilter, tagFilter string) (rtnMeasurements []monitor.InfluxMeasurement, err error) {
database, _ := query.GetString("database")
if database == "" {
return rtnMeasurements, merrors.NewArgIsEmptyErr("database")
}
dataSource, err := datasource.GetDefaultSource("")
if err != nil {
return rtnMeasurements, errors.Wrap(err, "s.GetDefaultSource")
}
db := influxdb.NewInfluxdb(dataSource.Url)
db.SetDatabase(database)
var buffer bytes.Buffer
buffer.WriteString(" SHOW MEASUREMENTS ON ")
buffer.WriteString(database)
if len(measurementFilter) != 0 {
buffer.WriteString(" WITH ")
buffer.WriteString(measurementFilter)
}
if len(tagFilter) != 0 {
buffer.WriteString(" WHERE ")
buffer.WriteString(tagFilter)
}
dbRtn, err := db.Query(buffer.String())
if err != nil {
return rtnMeasurements, errors.Wrap(err, "SHOW MEASUREMENTS")
}
if len(dbRtn) > 0 && len(dbRtn[0]) > 0 {
res := dbRtn[0][0]
measurements := make([]monitor.InfluxMeasurement, len(res.Values))
for i := range res.Values {
tmpDict := jsonutils.NewDict()
tmpDict.Add(res.Values[i][0], "measurement")
err = tmpDict.Unmarshal(&measurements[i])
if err != nil {
return rtnMeasurements, errors.Wrap(err, "measurement unmarshal error")
}
}
rtnMeasurements = append(rtnMeasurements, measurements...)
}
return
}
func (m *SDataSourceManager) GetMeasurementsWithDescriptionInfos(query jsonutils.JSONObject, tagFilter *monitor.MetricQueryTag) (jsonutils.JSONObject, error) {
ret := jsonutils.NewDict()
rtnMeasurements := make([]monitor.InfluxMeasurement, 0)
measurements, err := MetricMeasurementManager.getMeasurementsFromDB()
if err != nil {
return jsonutils.JSONNull, errors.Wrap(err, "getMeasurementsFromDB")
}
/*filterMeasurements, err := m.filterMeasurementsByTime(measurements, query, tagFilter)
if err != nil {
return jsonutils.JSONNull, errors.Wrap(err, "filterMeasurementsByTime error")
}*/
filterMeasurements := m.getMetricDescriptions(measurements)
if len(filterMeasurements) != 0 {
rtnMeasurements = append(rtnMeasurements, filterMeasurements...)
}
ret.Add(jsonutils.Marshal(&rtnMeasurements), "measurements")
resTypeMap := make(map[string][]monitor.InfluxMeasurement, 0)
resTypes := make([]string, 0)
for _, measurement := range rtnMeasurements {
if typeMeasurements, ok := resTypeMap[measurement.ResType]; ok {
resTypeMap[measurement.ResType] = append(typeMeasurements, measurement)
continue
}
resTypes = append(resTypes, measurement.ResType)
resTypeMap[measurement.ResType] = []monitor.InfluxMeasurement{measurement}
}
sort.Slice(resTypes, func(i, j int) bool {
r1 := resTypes[i]
r2 := resTypes[j]
return monitor.ResTypeScoreMap[r1] < monitor.ResTypeScoreMap[r2]
})
for _, measures := range resTypeMap {
sort.Slice(measures, func(i, j int) bool {
return measures[i].Score < measures[j].Score
})
}
ret.Add(jsonutils.Marshal(&resTypes), "res_types")
ret.Add(jsonutils.Marshal(&resTypeMap), "res_type_measurements")
return ret, nil
}
func (m *SDataSourceManager) GetMeasurementsWithOutTimeFilter(query jsonutils.JSONObject,
measurementFilter, tagFilter string) (jsonutils.JSONObject,
error) {
ret := jsonutils.NewDict()
database, _ := query.GetString("database")
if database == "" {
return jsonutils.JSONNull, httperrors.NewInputParameterError("not support database")
}
dataSource, err := datasource.GetDefaultSource("")
if err != nil {
return jsonutils.JSONNull, errors.Wrap(err, "s.GetDefaultSource")
}
db := influxdb.NewInfluxdb(dataSource.Url)
db.SetDatabase(database)
var buffer bytes.Buffer
buffer.WriteString(" SHOW MEASUREMENTS ON ")
buffer.WriteString(database)
if len(measurementFilter) != 0 {
buffer.WriteString(" WITH ")
buffer.WriteString(measurementFilter)
}
if len(tagFilter) != 0 {
buffer.WriteString(" WHERE ")
buffer.WriteString(tagFilter)
}
dbRtn, err := db.Query(buffer.String())
if err != nil {
return jsonutils.JSONNull, errors.Wrap(err, "SHOW MEASUREMENTS")
}
if len(dbRtn) > 0 && len(dbRtn[0]) > 0 {
res := dbRtn[0][0]
measurements := make([]monitor.InfluxMeasurement, len(res.Values))
for i := range res.Values {
tmpDict := jsonutils.NewDict()
tmpDict.Add(res.Values[i][0], "measurement")
err := tmpDict.Unmarshal(&measurements[i])
if err != nil {
return jsonutils.JSONNull, errors.Wrap(err, "measurement unmarshal error")
}
}
ret.Add(jsonutils.Marshal(&measurements), "measurements")
}
return ret, nil
}
func (m *SDataSourceManager) getMetricDescriptions(influxdbMeasurements []monitor.InfluxMeasurement) (
descMeasurements []monitor.InfluxMeasurement) {
userCred := auth.AdminCredential()
listInput := new(monitor.MetricListInput)
for _, measurement := range influxdbMeasurements {
listInput.Measurement.Names = append(listInput.Measurement.Names, measurement.Measurement)
}
query, err := MetricMeasurementManager.ListItemFilter(context.Background(), MetricMeasurementManager.Query(), userCred,
*listInput)
if err != nil {
log.Errorln(errors.Wrap(err, "DataSourceManager getMetricDescriptions error"))
}
descriMeasurements, err := MetricMeasurementManager.getMeasurement(query)
if len(descriMeasurements) != 0 {
measurementsIns := make([]interface{}, len(descriMeasurements))
for i, _ := range descriMeasurements {
measurementsIns[i] = &descriMeasurements[i]
}
details := MetricMeasurementManager.FetchCustomizeColumns(context.Background(), userCred, jsonutils.NewDict(), measurementsIns,
stringutils2.NewSortedStrings([]string{}), true)
if err != nil {
log.Errorln(errors.Wrap(err, "DataSourceManager getMetricDescriptions error"))
}
for i, measureDes := range descriMeasurements {
for j, _ := range influxdbMeasurements {
if measureDes.Name == influxdbMeasurements[j].Measurement {
if len(measureDes.DisplayName) != 0 {
influxdbMeasurements[j].MeasurementDisplayName = measureDes.DisplayName
}
if len(measureDes.ResType) != 0 {
influxdbMeasurements[j].ResType = measureDes.ResType
}
if measureDes.Score != 0 {
influxdbMeasurements[j].Score = measureDes.Score
}
fieldDesMap := make(map[string]monitor.MetricFieldDetail, 0)
fields := make([]string, 0)
fieldKeys := stringutils2.NewSortedStrings(influxdbMeasurements[j].FieldKey)
for fieldIndex, fieldDes := range details[i].MetricFields {
if len(fieldDes.DisplayName) != 0 {
fieldDesMap[fieldDes.Name] = details[i].MetricFields[fieldIndex]
}
if fieldKeys.Contains(fieldDes.Name) {
fields = append(fields, fieldDes.Name)
}
}
influxdbMeasurements[j].FieldDescriptions = fieldDesMap
influxdbMeasurements[j].Database = measureDes.Database
influxdbMeasurements[j].FieldKey = fields
descMeasurements = append(descMeasurements, influxdbMeasurements[j])
}
}
}
}
return
}
func (m *SDataSourceManager) filterMeasurementsByTime(
measurements []monitor.InfluxMeasurement, query jsonutils.JSONObject, tagFilter *monitor.MetricQueryTag) ([]monitor.InfluxMeasurement, error) {
timeF, err := m.getFromAndToFromParam(query)
if err != nil {
return nil, err
}
filterMeasurements, err := m.getFilterMeasurementsParallel(timeF.From, timeF.To, measurements, tagFilter)
if err != nil {
return nil, err
}
return filterMeasurements, nil
}
type timeFilter struct {
From string
To string
}
func (m *SDataSourceManager) getFromAndToFromParam(query jsonutils.JSONObject) (timeFilter, error) {
timeF := timeFilter{}
from, _ := query.GetString("from")
if len(from) == 0 {
from = "6h"
}
to, _ := query.GetString("to")
if len(to) == 0 {
to = "now"
}
timeFilter := monitor.AlertQuery{
From: from,
To: to,
}
err := validators.ValidateFromAndToValue(timeFilter)
if err != nil {
return timeF, err
}
timeF.From = from
timeF.To = to
return timeF, nil
}
func (m *SDataSourceManager) getFilterMeasurementsParallel(from, to string,
measurements []monitor.InfluxMeasurement, tagFilter *monitor.MetricQueryTag) ([]monitor.InfluxMeasurement, error) {
filterMeasurements := make([]monitor.InfluxMeasurement, len(measurements))
ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
defer cancel()
measurementQueryGroup, _ := errgroup.WithContext(ctx)
for i := range measurements {
index := i
tmp := measurements[index]
measurementQueryGroup.Go(func() error {
errCh := make(chan error)
go func() {
ret, err := m.getFilterMeasurement(from, to, tmp, tagFilter)
if err != nil {
errCh <- errors.Wrapf(err, "getFilterMeasurement %d", index)
return
}
filterMeasurements[index] = *ret
errCh <- nil
}()
for {
select {
case <-ctx.Done():
return errors.Wrap(ctx.Err(), "filter measurement from TSDB")
case err := <-errCh:
if err != nil {
return err
}
return nil
}
}
})
}
if err := measurementQueryGroup.Wait(); err != nil {
return nil, errors.Wrap(err, "measuremetnQueryGroup.Wait()")
}
ret := make([]monitor.InfluxMeasurement, 0)
for _, fm := range filterMeasurements {
if len(fm.Measurement) != 0 {
tmp := fm
ret = append(ret, tmp)
}
}
return ret, nil
}
func (m *SDataSourceManager) GetTSDBDriver() (tsdb.TsdbQueryEndpoint, error) {
ep, err := datasource.GetDefaultQueryEndpoint()
if err != nil {
return nil, errors.Wrap(err, "GetDefaultQueryEndpoint")
}
return ep, nil
}
func (m *SDataSourceManager) getFilterMeasurement(from, to string, measurement monitor.InfluxMeasurement, tagFilter *monitor.MetricQueryTag) (*monitor.InfluxMeasurement, error) {
dds, _ := datasource.GetDefaultSource("")
ep, err := m.GetTSDBDriver()
if err != nil {
return nil, errors.Wrap(err, "GetDefaultQueryEndpoint")
}
retMs, err := ep.FilterMeasurement(context.Background(), dds, from, to, &measurement, tagFilter)
if err != nil {
return nil, errors.Wrap(err, "Get endpoint filtered measurement")
}
return retMs, nil
}
func renderTimeFilter(from, to string) string {
if strings.Contains(from, "now-") {
from = "now() - " + strings.Replace(from, "now-", "", 1)
} else {
from = "now() - " + from
}
tmp := ""
if to != "now" && to != "" {
tmp = " and time < now() - " + strings.Replace(to, "now-", "", 1)
}
return fmt.Sprintf("time > %s%s", from, tmp)
}
func (m *SDataSourceManager) GetMetricMeasurement(userCred mcclient.TokenCredential, query jsonutils.JSONObject, tagFilter *monitor.MetricQueryTag) (jsonutils.JSONObject, error) {
database, _ := query.GetString("database")
if database == "" {
return jsonutils.JSONNull, merrors.NewArgIsEmptyErr("database")
}
measurement, _ := query.GetString("measurement")
if measurement == "" {
return jsonutils.JSONNull, merrors.NewArgIsEmptyErr("measurement")
}
field, _ := query.GetString("field")
if field == "" {
return jsonutils.JSONNull, merrors.NewArgIsEmptyErr("field")
}
from, _ := query.GetString("from")
if len(from) == 0 {
return jsonutils.JSONNull, merrors.NewArgIsEmptyErr("from")
}
timeF, err := m.getFromAndToFromParam(query)
if err != nil {
return nil, errors.Wrap(err, "getFromAndToFromParam")
}
//skipCheckSeries := jsonutils.QueryBoolean(query, "skip_check_series", false)
output := new(monitor.InfluxMeasurement)
output.Measurement = measurement
output.Database = database
output.TagValue = make(map[string][]string, 0)
output.FieldKey = []string{field}
// 只查询过去 30m 的指标
if timeF.To == "now" {
timeF.From = "30m"
}
if err := getTagValues(userCred, output, timeF, tagFilter, true); err != nil {
return jsonutils.JSONNull, errors.Wrap(err, "getTagValues error")
}
m.filterRtnTags(output)
return jsonutils.Marshal(output), nil
}
func (m *SDataSourceManager) filterRtnTags(output *monitor.InfluxMeasurement) {
for _, tag := range []string{hostconsts.TELEGRAF_TAG_KEY_BRAND, hostconsts.TELEGRAF_TAG_KEY_PLATFORM,
hostconsts.TELEGRAF_TAG_KEY_HYPERVISOR} {
if val, ok := output.TagValue[tag]; ok {
output.TagValue[hostconsts.TELEGRAF_TAG_KEY_BRAND] = val
break
}
}
for _, tag := range []string{
"source", hostconsts.TELEGRAF_TAG_KEY_HOST_TYPE, hostconsts.TELEGRAF_TAG_KEY_RES_TYPE,
"is_vm", "os_type", hostconsts.TELEGRAF_TAG_KEY_PLATFORM, hostconsts.TELEGRAF_TAG_KEY_HYPERVISOR,
"domain_name", "region", "ips", "vip", "vip_eip", "eip", "eip_mode",
labels.MetricName, translator.UNION_RESULT_NAME,
} {
if _, ok := output.TagValue[tag]; ok {
delete(output.TagValue, tag)
}
}
// hide VictoriaMetrics telegraf db tag
if val, ok := output.TagValue[VICTORIA_METRICS_DB_TAG_KEY]; ok {
if len(val) == 1 && val[0] == VICTORIA_METRICS_DB_TAG_VAL_TELEGRAF {
delete(output.TagValue, VICTORIA_METRICS_DB_TAG_KEY)
}
}
repTag := make([]string, 0)
for tag, _ := range output.TagValue {
repTag = append(repTag, tag)
}
output.TagKey = repTag
}
func (m *SDataSourceManager) filterTagValue(measurement monitor.InfluxMeasurement, timeF timeFilter,
db *influxdb.SInfluxdb, tagValChan *influxdbTagValueChan, tagFilter string) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*15)
defer cancel()
tagValGroup2, _ := errgroup.WithContext(ctx)
tagValChan2 := influxdbTagValueChan{
rtnChan: make(chan map[string][]string, len(measurement.TagKey)),
count: len(measurement.TagKey),
}
for i, _ := range measurement.TagKey {
tmpkey := measurement.TagKey[i]
tagValGroup2.Go(func() error {
return m.getFilterMeasurementTagValue(&tagValChan2, timeF.From, timeF.To, measurement.FieldKey[0],
tmpkey, measurement, db, tagFilter)
})
}
tagValGroup2.Go(func() error {
valMaps := make(map[string][]string, 0)
for i := 0; i < tagValChan2.count; i++ {
select {
case valMap := <-tagValChan2.rtnChan:
for key, val := range valMap {
if _, ok := valMaps[key]; ok {
valMaps[key] = append(valMaps[key], val...)
continue
}
valMaps[key] = val
}
case <-ctx.Done():
return fmt.Errorf("filter getFilterMeasurementTagValue time out")
}
}
tagValChan.rtnChan <- valMaps
close(tagValChan2.rtnChan)
return nil
})
return tagValGroup2.Wait()
}
func tagValUnion(measurement *monitor.InfluxMeasurement, rtn map[string][]string) {
keys := make([]string, 0)
for _, tag := range measurement.TagKey {
if rtnTagVal, ok := rtn[tag]; ok {
keys = append(keys, tag)
if _, ok := measurement.TagValue[tag]; !ok {
measurement.TagValue[tag] = rtnTagVal
continue
}
measurement.TagValue[tag] = union(measurement.TagValue[tag], rtnTagVal)
}
}
measurement.TagKey = keys
}
func union(slice1, slice2 []string) []string {
m := make(map[string]int)
for _, v := range slice1 {
m[v]++
}
for _, v := range slice2 {
times, _ := m[v]
if times == 0 {
slice1 = append(slice1, v)
}
}
return slice1
}
type InfluxdbSubscription struct {
SubName string
DataBase string
//retention policy
Rc string
Url string
}
func (m *SDataSourceManager) AddSubscription(subscription InfluxdbSubscription) error {
query := fmt.Sprintf("CREATE SUBSCRIPTION %s ON %s.%s DESTINATIONS ALL %s",
jsonutils.NewString(subscription.SubName).String(),
jsonutils.NewString(subscription.DataBase).String(),
jsonutils.NewString(subscription.Rc).String(),
strings.ReplaceAll(jsonutils.NewString(subscription.Url).String(), "\"", "'"),
)
dataSource, err := datasource.GetDefaultSource("")
if err != nil {
return errors.Wrap(err, "s.GetDefaultSource")
}
db := influxdb.NewInfluxdbWithDebug(dataSource.Url, true)
db.SetDatabase(subscription.DataBase)
rtn, err := db.GetQuery(query)
if err != nil {
return err
}
for _, result := range rtn {
for _, obj := range result {
objJson := jsonutils.Marshal(&obj)
log.Errorln(objJson.String())
}
}
return nil
}
func (m *SDataSourceManager) DropSubscription(subscription InfluxdbSubscription) error {
query := fmt.Sprintf("DROP SUBSCRIPTION %s ON %s.%s", jsonutils.NewString(subscription.SubName).String(),
jsonutils.NewString(subscription.DataBase).String(),
jsonutils.NewString(subscription.Rc).String(),
)
dataSource, err := datasource.GetDefaultSource("")
if err != nil {
return errors.Wrap(err, "s.GetDefaultSource")
}
db := influxdb.NewInfluxdb(dataSource.Url)
db.SetDatabase(subscription.DataBase)
rtn, err := db.Query(query)
if err != nil {
return err
}
for _, result := range rtn {
for _, obj := range result {
objJson := jsonutils.Marshal(&obj)
log.Errorln(objJson.String())
}
}
return nil
}
/*func getAttributesOnMeasurement(database, tp string, output *monitor.InfluxMeasurement, db *influxdb.SInfluxdb) error {
query := fmt.Sprintf("SHOW %s KEYS ON %s FROM %s", tp, database, output.Measurement)
dbRtn, err := db.Query(query)
if err != nil {
return errors.Wrapf(err, "SHOW MEASUREMENTS: %s", query)
}
if len(dbRtn) == 0 || len(dbRtn[0]) == 0 {
return nil
}
res := dbRtn[0][0]
tmpDict := jsonutils.NewDict()
tmpArr := jsonutils.NewArray()
for i := range res.Values {
v, _ := res.Values[i][0].(*jsonutils.JSONString).GetString()
if filterTagKey(v) {
continue
}
tmpArr.Add(res.Values[i][0])
}
tmpDict.Add(tmpArr, res.Columns[0])
err = tmpDict.Unmarshal(output)
if err != nil {
return errors.Wrap(err, "measurement unmarshal error")
}
return nil
}*/
func getTagValues(userCred mcclient.TokenCredential, output *monitor.InfluxMeasurement, timeF timeFilter, tagFilter *monitor.MetricQueryTag, skipCheckSeries bool) error {
mq := monitor.MetricQuery{
Database: output.Database,
Measurement: output.Measurement,
Selects: []monitor.MetricQuerySelect{
{
{
Type: "field",
Params: []string{output.FieldKey[0]},
},
{
Type: "last",
},
},
},
GroupBy: []monitor.MetricQueryPart{
{
Type: "field",
Params: []string{"*"},
},
},
}
if tagFilter != nil {
mq.Tags = []monitor.MetricQueryTag{
{
Key: tagFilter.Key,
Operator: tagFilter.Operator,
Value: tagFilter.Value,
},
}
}
aq := &monitor.AlertQuery{
Model: mq,
From: timeF.From,
To: timeF.To,
}
q := monitor.MetricQueryInput{
From: timeF.From,
To: timeF.To,
Interval: "3m",
MetricQuery: []*monitor.AlertQuery{
aq,
},
SkipCheckSeries: skipCheckSeries,
}
ret, err := doQuery(userCred, q)
if err != nil {
return errors.Wrapf(err, "getTagValues query error %s", jsonutils.Marshal(q))
}
// 2. group tag and values
tagValMap := make(map[string][]string)
tagKeys := make([]string, 0)
if len(ret.Series) == 0 {
return nil
}
for _, s := range ret.Series {
tagMap := s.Tags
for key, valStr := range tagMap {
valStr = renderTagVal(valStr)
if len(valStr) == 0 || valStr == "null" || filterTagValue(valStr) {
continue
}
if filterTagKey(key) {
continue
}
if valArr, ok := tagValMap[key]; ok {
if !utils.IsInStringArray(valStr, valArr) {
tagValMap[key] = append(valArr, valStr)
}
continue
}
tagValMap[key] = []string{valStr}
tagKeys = append(tagKeys, key)
}
}
output.TagValue = tagValMap
sort.Strings(tagKeys)
output.TagKey = tagKeys
return nil
}
func getTagValue(database string, output *monitor.InfluxMeasurement, db *influxdb.SInfluxdb) error {
if len(output.TagKey) == 0 {
return nil
}
tagKeyStr := jsonutils.NewStringArray(output.TagKey).String()
tagKeyStr = tagKeyStr[1 : len(tagKeyStr)-1]
dbRtn, err := db.Query(fmt.Sprintf("SHOW TAG VALUES ON %s FROM %s WITH KEY IN (%s)", database, output.Measurement, tagKeyStr))
if err != nil {
return err
}
res := dbRtn[0][0]
tagValue := make(map[string][]string, 0)
keys := strings.Join(output.TagKey, ",")
for i := range res.Values {
val, _ := res.Values[i][0].(*jsonutils.JSONString).GetString()
if !strings.Contains(keys, val) {
continue
}
if _, ok := tagValue[val]; !ok {
tagValue[val] = make([]string, 0)
}
tag, _ := res.Values[i][1].(*jsonutils.JSONString).GetString()
if filterTagValue(tag) {
delete(tagValue, val)
continue
}
tagValue[val] = append(tagValue[val], tag)
}
output.TagValue = tagValue
//TagKey == TagValue.keys
tagK := make([]string, 0)
for tag, _ := range output.TagValue {
tagK = append(tagK, tag)
}
output.TagKey = tagK
return nil
}
type influxdbTagValueChan struct {
rtnChan chan map[string][]string
count int
}
func (m *SDataSourceManager) getFilterMeasurementTagValue(tagValueChan *influxdbTagValueChan, from string,
to string, field string, tagKey string,
measurement monitor.InfluxMeasurement, db *influxdb.SInfluxdb, tagFilter string) error {
var buffer bytes.Buffer
buffer.WriteString(fmt.Sprintf(`SELECT last("%s") FROM "%s" WHERE %s `, field, measurement.Measurement,
renderTimeFilter(from, to)))
if len(tagFilter) != 0 {
buffer.WriteString(fmt.Sprintf(` AND %s `, tagFilter))
}
buffer.WriteString(fmt.Sprintf(` GROUP BY %q`, tagKey))
log.Errorln(buffer.String())
rtn, err := db.Query(buffer.String())
if err != nil {
return errors.Wrap(err, "getFilterMeasurementTagValue query error")
}
tagValMap := make(map[string][]string)
if len(rtn) != 0 && len(rtn[0]) != 0 {
for rtnIndex, _ := range rtn {
for serieIndex, _ := range rtn[rtnIndex] {
tagMap, _ := rtn[rtnIndex][serieIndex].Tags.GetMap()
for key, valObj := range tagMap {
valStr, _ := valObj.GetString()
valStr = renderTagVal(valStr)
if len(valStr) == 0 || valStr == "null" || filterTagValue(valStr) {
continue
}
if !utils.IsInStringArray(key, measurement.TagKey) {
//measurement.TagKey = append(measurement.TagKey, key)
continue
}
if valArr, ok := tagValMap[key]; ok {
if !utils.IsInStringArray(valStr, valArr) {
tagValMap[key] = append(valArr, valStr)
}
continue
}
tagValMap[key] = []string{valStr}
}
}
}
measurement.TagValue = tagValMap
}
tagValueChan.rtnChan <- tagValMap
return nil
}
func renderTagVal(val string) string {
return strings.ReplaceAll(val, "+", " ")
}
func floatEquals(a, b float64) bool {
eps := 0.000000001
if math.Abs(a-b) < eps {
return true
}
return false
}
var filterKey = []string{"perf_instance", "res_type", "status", "cloudregion", "os_type", "is_vm"}
func filterTagKey(key string) bool {
whiteListIdKeys := sets.NewString("dev_id", "die_id")
if whiteListIdKeys.Has(key) {
return false
}
if strings.Contains(key, "_id") {
return true
}
if key == "perf_instance" {
return true
}
return false
}
func filterTagValue(val string) bool {
if compile.MatchString(val) {
return true
}
return false
}