mirror of
https://github.com/yunionio/cloudpods.git
synced 2026-05-07 22:24:32 +08:00
889 lines
26 KiB
Go
889 lines
26 KiB
Go
// Copyright 2019 Yunion
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package models
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"math"
|
|
"regexp"
|
|
"sort"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/influxdata/promql/v2/pkg/labels"
|
|
"github.com/zexi/influxql-to-metricsql/converter/translator"
|
|
"golang.org/x/sync/errgroup"
|
|
|
|
"yunion.io/x/jsonutils"
|
|
"yunion.io/x/log"
|
|
"yunion.io/x/pkg/errors"
|
|
"yunion.io/x/pkg/tristate"
|
|
"yunion.io/x/pkg/util/sets"
|
|
"yunion.io/x/pkg/utils"
|
|
|
|
"yunion.io/x/onecloud/pkg/apis/monitor"
|
|
"yunion.io/x/onecloud/pkg/cloudcommon/db"
|
|
"yunion.io/x/onecloud/pkg/hostman/hostinfo/hostconsts"
|
|
"yunion.io/x/onecloud/pkg/httperrors"
|
|
"yunion.io/x/onecloud/pkg/mcclient"
|
|
"yunion.io/x/onecloud/pkg/mcclient/auth"
|
|
"yunion.io/x/onecloud/pkg/monitor/datasource"
|
|
merrors "yunion.io/x/onecloud/pkg/monitor/errors"
|
|
"yunion.io/x/onecloud/pkg/monitor/tsdb"
|
|
"yunion.io/x/onecloud/pkg/monitor/validators"
|
|
"yunion.io/x/onecloud/pkg/util/influxdb"
|
|
"yunion.io/x/onecloud/pkg/util/stringutils2"
|
|
)
|
|
|
|
const (
|
|
VICTORIA_METRICS_DB_TAG_KEY = "db"
|
|
VICTORIA_METRICS_DB_TAG_VAL_TELEGRAF = "telegraf"
|
|
)
|
|
|
|
var (
|
|
DataSourceManager *SDataSourceManager
|
|
compile = regexp.MustCompile(`\w{8}(-\w{4}){3}-\w{12}`)
|
|
)
|
|
|
|
func init() {
|
|
DataSourceManager = &SDataSourceManager{
|
|
SStandaloneResourceBaseManager: db.NewStandaloneResourceBaseManager(
|
|
SDataSource{},
|
|
"datasources_tbl",
|
|
"datasource",
|
|
"datasources",
|
|
),
|
|
}
|
|
DataSourceManager.SetVirtualObject(DataSourceManager)
|
|
}
|
|
|
|
// +onecloud:swagger-gen-model-singular=datasource
|
|
// +onecloud:swagger-gen-model-plural=datasources
|
|
type SDataSourceManager struct {
|
|
db.SStandaloneResourceBaseManager
|
|
}
|
|
|
|
type SDataSource struct {
|
|
db.SStandaloneResourceBase
|
|
|
|
Type string `nullable:"false" list:"user"`
|
|
Url string `nullable:"false" list:"user"`
|
|
User string `width:"64" charset:"utf8" nullable:"true"`
|
|
Password string `width:"64" charset:"utf8" nullable:"true"`
|
|
Database string `width:"64" charset:"utf8" nullable:"true"`
|
|
IsDefault tristate.TriState `default:"false" create:"optional"`
|
|
/*
|
|
TimeInterval string
|
|
BasicAuth bool
|
|
BasicAuthUser string
|
|
BasicAuthPassword string
|
|
*/
|
|
}
|
|
|
|
func (m *SDataSourceManager) GetSource(id string) (*SDataSource, error) {
|
|
ret, err := m.FetchById(id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return ret.(*SDataSource), nil
|
|
}
|
|
|
|
func (m *SDataSourceManager) GetDatabases() (jsonutils.JSONObject, error) {
|
|
ret := jsonutils.NewDict()
|
|
dataSource, err := datasource.GetDefaultSource("")
|
|
if err != nil {
|
|
return jsonutils.JSONNull, errors.Wrap(err, "s.GetDefaultSource")
|
|
}
|
|
db := influxdb.NewInfluxdb(dataSource.Url)
|
|
//db.SetDatabase("telegraf")
|
|
databases, err := db.GetDatabases()
|
|
if err != nil {
|
|
return jsonutils.JSONNull, errors.Wrap(err, "GetDatabases")
|
|
}
|
|
ret.Add(jsonutils.NewStringArray(databases), "databases")
|
|
return ret, nil
|
|
}
|
|
|
|
func (m *SDataSourceManager) GetMeasurements(query jsonutils.JSONObject,
|
|
measurementFilter, tagFilter string) (jsonutils.JSONObject,
|
|
error) {
|
|
ret := jsonutils.NewDict()
|
|
measurements, err := m.getMeasurementQueryInfluxdb(query, measurementFilter, tagFilter)
|
|
if err != nil {
|
|
return jsonutils.JSONNull, err
|
|
}
|
|
ret.Add(jsonutils.Marshal(&measurements), "measurements")
|
|
return ret, nil
|
|
}
|
|
|
|
func (m *SDataSourceManager) getMeasurementQueryInfluxdb(query jsonutils.JSONObject,
|
|
measurementFilter, tagFilter string) (rtnMeasurements []monitor.InfluxMeasurement, err error) {
|
|
database, _ := query.GetString("database")
|
|
if database == "" {
|
|
return rtnMeasurements, merrors.NewArgIsEmptyErr("database")
|
|
}
|
|
dataSource, err := datasource.GetDefaultSource("")
|
|
if err != nil {
|
|
return rtnMeasurements, errors.Wrap(err, "s.GetDefaultSource")
|
|
}
|
|
db := influxdb.NewInfluxdb(dataSource.Url)
|
|
db.SetDatabase(database)
|
|
var buffer bytes.Buffer
|
|
buffer.WriteString(" SHOW MEASUREMENTS ON ")
|
|
buffer.WriteString(database)
|
|
if len(measurementFilter) != 0 {
|
|
buffer.WriteString(" WITH ")
|
|
buffer.WriteString(measurementFilter)
|
|
}
|
|
if len(tagFilter) != 0 {
|
|
buffer.WriteString(" WHERE ")
|
|
buffer.WriteString(tagFilter)
|
|
}
|
|
dbRtn, err := db.Query(buffer.String())
|
|
if err != nil {
|
|
return rtnMeasurements, errors.Wrap(err, "SHOW MEASUREMENTS")
|
|
}
|
|
if len(dbRtn) > 0 && len(dbRtn[0]) > 0 {
|
|
res := dbRtn[0][0]
|
|
measurements := make([]monitor.InfluxMeasurement, len(res.Values))
|
|
for i := range res.Values {
|
|
tmpDict := jsonutils.NewDict()
|
|
tmpDict.Add(res.Values[i][0], "measurement")
|
|
err = tmpDict.Unmarshal(&measurements[i])
|
|
if err != nil {
|
|
return rtnMeasurements, errors.Wrap(err, "measurement unmarshal error")
|
|
}
|
|
}
|
|
rtnMeasurements = append(rtnMeasurements, measurements...)
|
|
}
|
|
return
|
|
}
|
|
|
|
func (m *SDataSourceManager) GetMeasurementsWithDescriptionInfos(query jsonutils.JSONObject, tagFilter *monitor.MetricQueryTag) (jsonutils.JSONObject, error) {
|
|
ret := jsonutils.NewDict()
|
|
rtnMeasurements := make([]monitor.InfluxMeasurement, 0)
|
|
measurements, err := MetricMeasurementManager.getMeasurementsFromDB()
|
|
if err != nil {
|
|
return jsonutils.JSONNull, errors.Wrap(err, "getMeasurementsFromDB")
|
|
}
|
|
/*filterMeasurements, err := m.filterMeasurementsByTime(measurements, query, tagFilter)
|
|
if err != nil {
|
|
return jsonutils.JSONNull, errors.Wrap(err, "filterMeasurementsByTime error")
|
|
}*/
|
|
filterMeasurements := m.getMetricDescriptions(measurements)
|
|
if len(filterMeasurements) != 0 {
|
|
rtnMeasurements = append(rtnMeasurements, filterMeasurements...)
|
|
}
|
|
|
|
ret.Add(jsonutils.Marshal(&rtnMeasurements), "measurements")
|
|
resTypeMap := make(map[string][]monitor.InfluxMeasurement, 0)
|
|
resTypes := make([]string, 0)
|
|
for _, measurement := range rtnMeasurements {
|
|
if typeMeasurements, ok := resTypeMap[measurement.ResType]; ok {
|
|
resTypeMap[measurement.ResType] = append(typeMeasurements, measurement)
|
|
continue
|
|
}
|
|
resTypes = append(resTypes, measurement.ResType)
|
|
resTypeMap[measurement.ResType] = []monitor.InfluxMeasurement{measurement}
|
|
}
|
|
sort.Slice(resTypes, func(i, j int) bool {
|
|
r1 := resTypes[i]
|
|
r2 := resTypes[j]
|
|
return monitor.ResTypeScoreMap[r1] < monitor.ResTypeScoreMap[r2]
|
|
})
|
|
for _, measures := range resTypeMap {
|
|
sort.Slice(measures, func(i, j int) bool {
|
|
return measures[i].Score < measures[j].Score
|
|
})
|
|
}
|
|
ret.Add(jsonutils.Marshal(&resTypes), "res_types")
|
|
ret.Add(jsonutils.Marshal(&resTypeMap), "res_type_measurements")
|
|
return ret, nil
|
|
}
|
|
|
|
func (m *SDataSourceManager) GetMeasurementsWithOutTimeFilter(query jsonutils.JSONObject,
|
|
measurementFilter, tagFilter string) (jsonutils.JSONObject,
|
|
error) {
|
|
ret := jsonutils.NewDict()
|
|
database, _ := query.GetString("database")
|
|
if database == "" {
|
|
return jsonutils.JSONNull, httperrors.NewInputParameterError("not support database")
|
|
}
|
|
dataSource, err := datasource.GetDefaultSource("")
|
|
if err != nil {
|
|
return jsonutils.JSONNull, errors.Wrap(err, "s.GetDefaultSource")
|
|
}
|
|
db := influxdb.NewInfluxdb(dataSource.Url)
|
|
db.SetDatabase(database)
|
|
var buffer bytes.Buffer
|
|
buffer.WriteString(" SHOW MEASUREMENTS ON ")
|
|
buffer.WriteString(database)
|
|
if len(measurementFilter) != 0 {
|
|
buffer.WriteString(" WITH ")
|
|
buffer.WriteString(measurementFilter)
|
|
}
|
|
if len(tagFilter) != 0 {
|
|
buffer.WriteString(" WHERE ")
|
|
buffer.WriteString(tagFilter)
|
|
}
|
|
dbRtn, err := db.Query(buffer.String())
|
|
if err != nil {
|
|
return jsonutils.JSONNull, errors.Wrap(err, "SHOW MEASUREMENTS")
|
|
}
|
|
if len(dbRtn) > 0 && len(dbRtn[0]) > 0 {
|
|
res := dbRtn[0][0]
|
|
measurements := make([]monitor.InfluxMeasurement, len(res.Values))
|
|
for i := range res.Values {
|
|
tmpDict := jsonutils.NewDict()
|
|
tmpDict.Add(res.Values[i][0], "measurement")
|
|
err := tmpDict.Unmarshal(&measurements[i])
|
|
if err != nil {
|
|
return jsonutils.JSONNull, errors.Wrap(err, "measurement unmarshal error")
|
|
}
|
|
}
|
|
ret.Add(jsonutils.Marshal(&measurements), "measurements")
|
|
}
|
|
return ret, nil
|
|
}
|
|
|
|
func (m *SDataSourceManager) getMetricDescriptions(influxdbMeasurements []monitor.InfluxMeasurement) (
|
|
descMeasurements []monitor.InfluxMeasurement) {
|
|
userCred := auth.AdminCredential()
|
|
listInput := new(monitor.MetricListInput)
|
|
for _, measurement := range influxdbMeasurements {
|
|
listInput.Measurement.Names = append(listInput.Measurement.Names, measurement.Measurement)
|
|
}
|
|
query, err := MetricMeasurementManager.ListItemFilter(context.Background(), MetricMeasurementManager.Query(), userCred,
|
|
*listInput)
|
|
if err != nil {
|
|
log.Errorln(errors.Wrap(err, "DataSourceManager getMetricDescriptions error"))
|
|
}
|
|
descriMeasurements, err := MetricMeasurementManager.getMeasurement(query)
|
|
if len(descriMeasurements) != 0 {
|
|
|
|
measurementsIns := make([]interface{}, len(descriMeasurements))
|
|
for i, _ := range descriMeasurements {
|
|
measurementsIns[i] = &descriMeasurements[i]
|
|
}
|
|
details := MetricMeasurementManager.FetchCustomizeColumns(context.Background(), userCred, jsonutils.NewDict(), measurementsIns,
|
|
stringutils2.NewSortedStrings([]string{}), true)
|
|
if err != nil {
|
|
log.Errorln(errors.Wrap(err, "DataSourceManager getMetricDescriptions error"))
|
|
}
|
|
for i, measureDes := range descriMeasurements {
|
|
for j, _ := range influxdbMeasurements {
|
|
if measureDes.Name == influxdbMeasurements[j].Measurement {
|
|
if len(measureDes.DisplayName) != 0 {
|
|
influxdbMeasurements[j].MeasurementDisplayName = measureDes.DisplayName
|
|
}
|
|
if len(measureDes.ResType) != 0 {
|
|
influxdbMeasurements[j].ResType = measureDes.ResType
|
|
}
|
|
if measureDes.Score != 0 {
|
|
influxdbMeasurements[j].Score = measureDes.Score
|
|
}
|
|
fieldDesMap := make(map[string]monitor.MetricFieldDetail, 0)
|
|
fields := make([]string, 0)
|
|
fieldKeys := stringutils2.NewSortedStrings(influxdbMeasurements[j].FieldKey)
|
|
for fieldIndex, fieldDes := range details[i].MetricFields {
|
|
if len(fieldDes.DisplayName) != 0 {
|
|
fieldDesMap[fieldDes.Name] = details[i].MetricFields[fieldIndex]
|
|
}
|
|
if fieldKeys.Contains(fieldDes.Name) {
|
|
fields = append(fields, fieldDes.Name)
|
|
}
|
|
}
|
|
influxdbMeasurements[j].FieldDescriptions = fieldDesMap
|
|
influxdbMeasurements[j].Database = measureDes.Database
|
|
influxdbMeasurements[j].FieldKey = fields
|
|
descMeasurements = append(descMeasurements, influxdbMeasurements[j])
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
func (m *SDataSourceManager) filterMeasurementsByTime(
|
|
measurements []monitor.InfluxMeasurement, query jsonutils.JSONObject, tagFilter *monitor.MetricQueryTag) ([]monitor.InfluxMeasurement, error) {
|
|
timeF, err := m.getFromAndToFromParam(query)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
filterMeasurements, err := m.getFilterMeasurementsParallel(timeF.From, timeF.To, measurements, tagFilter)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return filterMeasurements, nil
|
|
}
|
|
|
|
type timeFilter struct {
|
|
From string
|
|
To string
|
|
}
|
|
|
|
func (m *SDataSourceManager) getFromAndToFromParam(query jsonutils.JSONObject) (timeFilter, error) {
|
|
timeF := timeFilter{}
|
|
from, _ := query.GetString("from")
|
|
if len(from) == 0 {
|
|
from = "6h"
|
|
}
|
|
to, _ := query.GetString("to")
|
|
if len(to) == 0 {
|
|
to = "now"
|
|
}
|
|
timeFilter := monitor.AlertQuery{
|
|
From: from,
|
|
To: to,
|
|
}
|
|
err := validators.ValidateFromAndToValue(timeFilter)
|
|
if err != nil {
|
|
return timeF, err
|
|
}
|
|
timeF.From = from
|
|
timeF.To = to
|
|
return timeF, nil
|
|
}
|
|
|
|
func (m *SDataSourceManager) getFilterMeasurementsParallel(from, to string,
|
|
measurements []monitor.InfluxMeasurement, tagFilter *monitor.MetricQueryTag) ([]monitor.InfluxMeasurement, error) {
|
|
filterMeasurements := make([]monitor.InfluxMeasurement, len(measurements))
|
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
|
|
defer cancel()
|
|
|
|
measurementQueryGroup, _ := errgroup.WithContext(ctx)
|
|
for i := range measurements {
|
|
index := i
|
|
tmp := measurements[index]
|
|
measurementQueryGroup.Go(func() error {
|
|
errCh := make(chan error)
|
|
go func() {
|
|
ret, err := m.getFilterMeasurement(from, to, tmp, tagFilter)
|
|
if err != nil {
|
|
errCh <- errors.Wrapf(err, "getFilterMeasurement %d", index)
|
|
return
|
|
}
|
|
filterMeasurements[index] = *ret
|
|
errCh <- nil
|
|
}()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return errors.Wrap(ctx.Err(), "filter measurement from TSDB")
|
|
case err := <-errCh:
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
}
|
|
})
|
|
}
|
|
if err := measurementQueryGroup.Wait(); err != nil {
|
|
return nil, errors.Wrap(err, "measuremetnQueryGroup.Wait()")
|
|
}
|
|
ret := make([]monitor.InfluxMeasurement, 0)
|
|
for _, fm := range filterMeasurements {
|
|
if len(fm.Measurement) != 0 {
|
|
tmp := fm
|
|
ret = append(ret, tmp)
|
|
}
|
|
}
|
|
return ret, nil
|
|
}
|
|
|
|
func (m *SDataSourceManager) GetTSDBDriver() (tsdb.TsdbQueryEndpoint, error) {
|
|
ep, err := datasource.GetDefaultQueryEndpoint()
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "GetDefaultQueryEndpoint")
|
|
}
|
|
return ep, nil
|
|
}
|
|
|
|
func (m *SDataSourceManager) getFilterMeasurement(from, to string, measurement monitor.InfluxMeasurement, tagFilter *monitor.MetricQueryTag) (*monitor.InfluxMeasurement, error) {
|
|
dds, _ := datasource.GetDefaultSource("")
|
|
ep, err := m.GetTSDBDriver()
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "GetDefaultQueryEndpoint")
|
|
}
|
|
retMs, err := ep.FilterMeasurement(context.Background(), dds, from, to, &measurement, tagFilter)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "Get endpoint filtered measurement")
|
|
}
|
|
return retMs, nil
|
|
}
|
|
|
|
func renderTimeFilter(from, to string) string {
|
|
if strings.Contains(from, "now-") {
|
|
from = "now() - " + strings.Replace(from, "now-", "", 1)
|
|
} else {
|
|
from = "now() - " + from
|
|
}
|
|
|
|
tmp := ""
|
|
if to != "now" && to != "" {
|
|
tmp = " and time < now() - " + strings.Replace(to, "now-", "", 1)
|
|
}
|
|
|
|
return fmt.Sprintf("time > %s%s", from, tmp)
|
|
|
|
}
|
|
|
|
func (m *SDataSourceManager) GetMetricMeasurement(userCred mcclient.TokenCredential, query jsonutils.JSONObject, tagFilter *monitor.MetricQueryTag) (jsonutils.JSONObject, error) {
|
|
database, _ := query.GetString("database")
|
|
if database == "" {
|
|
return jsonutils.JSONNull, merrors.NewArgIsEmptyErr("database")
|
|
}
|
|
measurement, _ := query.GetString("measurement")
|
|
if measurement == "" {
|
|
return jsonutils.JSONNull, merrors.NewArgIsEmptyErr("measurement")
|
|
}
|
|
field, _ := query.GetString("field")
|
|
if field == "" {
|
|
return jsonutils.JSONNull, merrors.NewArgIsEmptyErr("field")
|
|
}
|
|
from, _ := query.GetString("from")
|
|
if len(from) == 0 {
|
|
return jsonutils.JSONNull, merrors.NewArgIsEmptyErr("from")
|
|
}
|
|
timeF, err := m.getFromAndToFromParam(query)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "getFromAndToFromParam")
|
|
}
|
|
|
|
//skipCheckSeries := jsonutils.QueryBoolean(query, "skip_check_series", false)
|
|
|
|
output := new(monitor.InfluxMeasurement)
|
|
output.Measurement = measurement
|
|
output.Database = database
|
|
output.TagValue = make(map[string][]string, 0)
|
|
|
|
output.FieldKey = []string{field}
|
|
// 只查询过去 30m 的指标
|
|
if timeF.To == "now" {
|
|
timeF.From = "30m"
|
|
}
|
|
if err := getTagValues(userCred, output, timeF, tagFilter, true); err != nil {
|
|
return jsonutils.JSONNull, errors.Wrap(err, "getTagValues error")
|
|
}
|
|
|
|
m.filterRtnTags(output)
|
|
return jsonutils.Marshal(output), nil
|
|
|
|
}
|
|
|
|
func (m *SDataSourceManager) filterRtnTags(output *monitor.InfluxMeasurement) {
|
|
for _, tag := range []string{hostconsts.TELEGRAF_TAG_KEY_BRAND, hostconsts.TELEGRAF_TAG_KEY_PLATFORM,
|
|
hostconsts.TELEGRAF_TAG_KEY_HYPERVISOR} {
|
|
if val, ok := output.TagValue[tag]; ok {
|
|
output.TagValue[hostconsts.TELEGRAF_TAG_KEY_BRAND] = val
|
|
break
|
|
}
|
|
}
|
|
for _, tag := range []string{
|
|
"source", hostconsts.TELEGRAF_TAG_KEY_HOST_TYPE, hostconsts.TELEGRAF_TAG_KEY_RES_TYPE,
|
|
"is_vm", "os_type", hostconsts.TELEGRAF_TAG_KEY_PLATFORM, hostconsts.TELEGRAF_TAG_KEY_HYPERVISOR,
|
|
"domain_name", "region", "ips", "vip", "vip_eip", "eip", "eip_mode",
|
|
labels.MetricName, translator.UNION_RESULT_NAME,
|
|
} {
|
|
if _, ok := output.TagValue[tag]; ok {
|
|
delete(output.TagValue, tag)
|
|
}
|
|
}
|
|
// hide VictoriaMetrics telegraf db tag
|
|
if val, ok := output.TagValue[VICTORIA_METRICS_DB_TAG_KEY]; ok {
|
|
if len(val) == 1 && val[0] == VICTORIA_METRICS_DB_TAG_VAL_TELEGRAF {
|
|
delete(output.TagValue, VICTORIA_METRICS_DB_TAG_KEY)
|
|
}
|
|
}
|
|
|
|
repTag := make([]string, 0)
|
|
for tag, _ := range output.TagValue {
|
|
repTag = append(repTag, tag)
|
|
}
|
|
output.TagKey = repTag
|
|
}
|
|
|
|
func (m *SDataSourceManager) filterTagValue(measurement monitor.InfluxMeasurement, timeF timeFilter,
|
|
db *influxdb.SInfluxdb, tagValChan *influxdbTagValueChan, tagFilter string) error {
|
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second*15)
|
|
defer cancel()
|
|
tagValGroup2, _ := errgroup.WithContext(ctx)
|
|
tagValChan2 := influxdbTagValueChan{
|
|
rtnChan: make(chan map[string][]string, len(measurement.TagKey)),
|
|
count: len(measurement.TagKey),
|
|
}
|
|
for i, _ := range measurement.TagKey {
|
|
tmpkey := measurement.TagKey[i]
|
|
tagValGroup2.Go(func() error {
|
|
return m.getFilterMeasurementTagValue(&tagValChan2, timeF.From, timeF.To, measurement.FieldKey[0],
|
|
tmpkey, measurement, db, tagFilter)
|
|
})
|
|
}
|
|
tagValGroup2.Go(func() error {
|
|
valMaps := make(map[string][]string, 0)
|
|
for i := 0; i < tagValChan2.count; i++ {
|
|
select {
|
|
case valMap := <-tagValChan2.rtnChan:
|
|
for key, val := range valMap {
|
|
if _, ok := valMaps[key]; ok {
|
|
valMaps[key] = append(valMaps[key], val...)
|
|
continue
|
|
}
|
|
valMaps[key] = val
|
|
}
|
|
case <-ctx.Done():
|
|
return fmt.Errorf("filter getFilterMeasurementTagValue time out")
|
|
}
|
|
}
|
|
tagValChan.rtnChan <- valMaps
|
|
close(tagValChan2.rtnChan)
|
|
return nil
|
|
})
|
|
return tagValGroup2.Wait()
|
|
}
|
|
|
|
func tagValUnion(measurement *monitor.InfluxMeasurement, rtn map[string][]string) {
|
|
keys := make([]string, 0)
|
|
for _, tag := range measurement.TagKey {
|
|
if rtnTagVal, ok := rtn[tag]; ok {
|
|
keys = append(keys, tag)
|
|
if _, ok := measurement.TagValue[tag]; !ok {
|
|
measurement.TagValue[tag] = rtnTagVal
|
|
continue
|
|
}
|
|
measurement.TagValue[tag] = union(measurement.TagValue[tag], rtnTagVal)
|
|
}
|
|
}
|
|
measurement.TagKey = keys
|
|
}
|
|
|
|
func union(slice1, slice2 []string) []string {
|
|
m := make(map[string]int)
|
|
for _, v := range slice1 {
|
|
m[v]++
|
|
}
|
|
|
|
for _, v := range slice2 {
|
|
times, _ := m[v]
|
|
if times == 0 {
|
|
slice1 = append(slice1, v)
|
|
}
|
|
}
|
|
return slice1
|
|
}
|
|
|
|
type InfluxdbSubscription struct {
|
|
SubName string
|
|
DataBase string
|
|
//retention policy
|
|
Rc string
|
|
Url string
|
|
}
|
|
|
|
func (m *SDataSourceManager) AddSubscription(subscription InfluxdbSubscription) error {
|
|
|
|
query := fmt.Sprintf("CREATE SUBSCRIPTION %s ON %s.%s DESTINATIONS ALL %s",
|
|
jsonutils.NewString(subscription.SubName).String(),
|
|
jsonutils.NewString(subscription.DataBase).String(),
|
|
jsonutils.NewString(subscription.Rc).String(),
|
|
strings.ReplaceAll(jsonutils.NewString(subscription.Url).String(), "\"", "'"),
|
|
)
|
|
dataSource, err := datasource.GetDefaultSource("")
|
|
if err != nil {
|
|
return errors.Wrap(err, "s.GetDefaultSource")
|
|
}
|
|
|
|
db := influxdb.NewInfluxdbWithDebug(dataSource.Url, true)
|
|
db.SetDatabase(subscription.DataBase)
|
|
|
|
rtn, err := db.GetQuery(query)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, result := range rtn {
|
|
for _, obj := range result {
|
|
objJson := jsonutils.Marshal(&obj)
|
|
log.Errorln(objJson.String())
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (m *SDataSourceManager) DropSubscription(subscription InfluxdbSubscription) error {
|
|
query := fmt.Sprintf("DROP SUBSCRIPTION %s ON %s.%s", jsonutils.NewString(subscription.SubName).String(),
|
|
jsonutils.NewString(subscription.DataBase).String(),
|
|
jsonutils.NewString(subscription.Rc).String(),
|
|
)
|
|
dataSource, err := datasource.GetDefaultSource("")
|
|
if err != nil {
|
|
return errors.Wrap(err, "s.GetDefaultSource")
|
|
}
|
|
|
|
db := influxdb.NewInfluxdb(dataSource.Url)
|
|
db.SetDatabase(subscription.DataBase)
|
|
rtn, err := db.Query(query)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, result := range rtn {
|
|
for _, obj := range result {
|
|
objJson := jsonutils.Marshal(&obj)
|
|
log.Errorln(objJson.String())
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
/*func getAttributesOnMeasurement(database, tp string, output *monitor.InfluxMeasurement, db *influxdb.SInfluxdb) error {
|
|
query := fmt.Sprintf("SHOW %s KEYS ON %s FROM %s", tp, database, output.Measurement)
|
|
dbRtn, err := db.Query(query)
|
|
if err != nil {
|
|
return errors.Wrapf(err, "SHOW MEASUREMENTS: %s", query)
|
|
}
|
|
if len(dbRtn) == 0 || len(dbRtn[0]) == 0 {
|
|
return nil
|
|
}
|
|
res := dbRtn[0][0]
|
|
tmpDict := jsonutils.NewDict()
|
|
tmpArr := jsonutils.NewArray()
|
|
for i := range res.Values {
|
|
v, _ := res.Values[i][0].(*jsonutils.JSONString).GetString()
|
|
if filterTagKey(v) {
|
|
continue
|
|
}
|
|
tmpArr.Add(res.Values[i][0])
|
|
}
|
|
tmpDict.Add(tmpArr, res.Columns[0])
|
|
err = tmpDict.Unmarshal(output)
|
|
if err != nil {
|
|
return errors.Wrap(err, "measurement unmarshal error")
|
|
}
|
|
return nil
|
|
}*/
|
|
|
|
func getTagValues(userCred mcclient.TokenCredential, output *monitor.InfluxMeasurement, timeF timeFilter, tagFilter *monitor.MetricQueryTag, skipCheckSeries bool) error {
|
|
mq := monitor.MetricQuery{
|
|
Database: output.Database,
|
|
Measurement: output.Measurement,
|
|
Selects: []monitor.MetricQuerySelect{
|
|
{
|
|
{
|
|
Type: "field",
|
|
Params: []string{output.FieldKey[0]},
|
|
},
|
|
{
|
|
Type: "last",
|
|
},
|
|
},
|
|
},
|
|
GroupBy: []monitor.MetricQueryPart{
|
|
{
|
|
Type: "field",
|
|
Params: []string{"*"},
|
|
},
|
|
},
|
|
}
|
|
if tagFilter != nil {
|
|
mq.Tags = []monitor.MetricQueryTag{
|
|
{
|
|
Key: tagFilter.Key,
|
|
Operator: tagFilter.Operator,
|
|
Value: tagFilter.Value,
|
|
},
|
|
}
|
|
}
|
|
|
|
aq := &monitor.AlertQuery{
|
|
Model: mq,
|
|
From: timeF.From,
|
|
To: timeF.To,
|
|
}
|
|
|
|
q := monitor.MetricQueryInput{
|
|
From: timeF.From,
|
|
To: timeF.To,
|
|
Interval: "3m",
|
|
MetricQuery: []*monitor.AlertQuery{
|
|
aq,
|
|
},
|
|
SkipCheckSeries: skipCheckSeries,
|
|
}
|
|
|
|
ret, err := doQuery(userCred, q)
|
|
if err != nil {
|
|
return errors.Wrapf(err, "getTagValues query error %s", jsonutils.Marshal(q))
|
|
}
|
|
|
|
// 2. group tag and values
|
|
tagValMap := make(map[string][]string)
|
|
tagKeys := make([]string, 0)
|
|
if len(ret.Series) == 0 {
|
|
return nil
|
|
}
|
|
|
|
for _, s := range ret.Series {
|
|
tagMap := s.Tags
|
|
for key, valStr := range tagMap {
|
|
valStr = renderTagVal(valStr)
|
|
if len(valStr) == 0 || valStr == "null" || filterTagValue(valStr) {
|
|
continue
|
|
}
|
|
if filterTagKey(key) {
|
|
continue
|
|
}
|
|
if valArr, ok := tagValMap[key]; ok {
|
|
if !utils.IsInStringArray(valStr, valArr) {
|
|
tagValMap[key] = append(valArr, valStr)
|
|
}
|
|
continue
|
|
}
|
|
tagValMap[key] = []string{valStr}
|
|
tagKeys = append(tagKeys, key)
|
|
}
|
|
}
|
|
output.TagValue = tagValMap
|
|
sort.Strings(tagKeys)
|
|
output.TagKey = tagKeys
|
|
|
|
return nil
|
|
}
|
|
|
|
func getTagValue(database string, output *monitor.InfluxMeasurement, db *influxdb.SInfluxdb) error {
|
|
if len(output.TagKey) == 0 {
|
|
return nil
|
|
}
|
|
tagKeyStr := jsonutils.NewStringArray(output.TagKey).String()
|
|
tagKeyStr = tagKeyStr[1 : len(tagKeyStr)-1]
|
|
dbRtn, err := db.Query(fmt.Sprintf("SHOW TAG VALUES ON %s FROM %s WITH KEY IN (%s)", database, output.Measurement, tagKeyStr))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
res := dbRtn[0][0]
|
|
tagValue := make(map[string][]string, 0)
|
|
keys := strings.Join(output.TagKey, ",")
|
|
for i := range res.Values {
|
|
val, _ := res.Values[i][0].(*jsonutils.JSONString).GetString()
|
|
if !strings.Contains(keys, val) {
|
|
continue
|
|
}
|
|
if _, ok := tagValue[val]; !ok {
|
|
tagValue[val] = make([]string, 0)
|
|
}
|
|
tag, _ := res.Values[i][1].(*jsonutils.JSONString).GetString()
|
|
if filterTagValue(tag) {
|
|
delete(tagValue, val)
|
|
continue
|
|
}
|
|
tagValue[val] = append(tagValue[val], tag)
|
|
}
|
|
output.TagValue = tagValue
|
|
//TagKey == TagValue.keys
|
|
tagK := make([]string, 0)
|
|
for tag, _ := range output.TagValue {
|
|
tagK = append(tagK, tag)
|
|
}
|
|
output.TagKey = tagK
|
|
return nil
|
|
}
|
|
|
|
type influxdbTagValueChan struct {
|
|
rtnChan chan map[string][]string
|
|
count int
|
|
}
|
|
|
|
func (m *SDataSourceManager) getFilterMeasurementTagValue(tagValueChan *influxdbTagValueChan, from string,
|
|
to string, field string, tagKey string,
|
|
measurement monitor.InfluxMeasurement, db *influxdb.SInfluxdb, tagFilter string) error {
|
|
var buffer bytes.Buffer
|
|
buffer.WriteString(fmt.Sprintf(`SELECT last("%s") FROM "%s" WHERE %s `, field, measurement.Measurement,
|
|
renderTimeFilter(from, to)))
|
|
if len(tagFilter) != 0 {
|
|
buffer.WriteString(fmt.Sprintf(` AND %s `, tagFilter))
|
|
}
|
|
buffer.WriteString(fmt.Sprintf(` GROUP BY %q`, tagKey))
|
|
log.Errorln(buffer.String())
|
|
rtn, err := db.Query(buffer.String())
|
|
if err != nil {
|
|
return errors.Wrap(err, "getFilterMeasurementTagValue query error")
|
|
}
|
|
tagValMap := make(map[string][]string)
|
|
if len(rtn) != 0 && len(rtn[0]) != 0 {
|
|
for rtnIndex, _ := range rtn {
|
|
for serieIndex, _ := range rtn[rtnIndex] {
|
|
tagMap, _ := rtn[rtnIndex][serieIndex].Tags.GetMap()
|
|
for key, valObj := range tagMap {
|
|
valStr, _ := valObj.GetString()
|
|
valStr = renderTagVal(valStr)
|
|
if len(valStr) == 0 || valStr == "null" || filterTagValue(valStr) {
|
|
continue
|
|
}
|
|
if !utils.IsInStringArray(key, measurement.TagKey) {
|
|
//measurement.TagKey = append(measurement.TagKey, key)
|
|
continue
|
|
}
|
|
if valArr, ok := tagValMap[key]; ok {
|
|
if !utils.IsInStringArray(valStr, valArr) {
|
|
tagValMap[key] = append(valArr, valStr)
|
|
}
|
|
continue
|
|
}
|
|
tagValMap[key] = []string{valStr}
|
|
}
|
|
}
|
|
}
|
|
measurement.TagValue = tagValMap
|
|
}
|
|
tagValueChan.rtnChan <- tagValMap
|
|
return nil
|
|
}
|
|
|
|
func renderTagVal(val string) string {
|
|
return strings.ReplaceAll(val, "+", " ")
|
|
}
|
|
func floatEquals(a, b float64) bool {
|
|
eps := 0.000000001
|
|
if math.Abs(a-b) < eps {
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
var filterKey = []string{"perf_instance", "res_type", "status", "cloudregion", "os_type", "is_vm"}
|
|
|
|
func filterTagKey(key string) bool {
|
|
whiteListIdKeys := sets.NewString("dev_id", "die_id")
|
|
if whiteListIdKeys.Has(key) {
|
|
return false
|
|
}
|
|
if strings.Contains(key, "_id") {
|
|
return true
|
|
}
|
|
if key == "perf_instance" {
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
func filterTagValue(val string) bool {
|
|
if compile.MatchString(val) {
|
|
return true
|
|
}
|
|
return false
|
|
}
|