Merge pull request #8631 from zhaoxiangchun/bugfix/zxc-commonalert

modify monitor querycondition:
This commit is contained in:
Zexi Li
2020-11-04 11:07:50 +08:00
committed by GitHub
13 changed files with 170 additions and 71 deletions

View File

@@ -41,9 +41,10 @@ type CommonAlertCreateInput struct {
AlertType string `json:"alert_type"`
//scope Resource
Scope string `json:"scope"`
DomainId string `json:"domain_id"`
ProjectId string `json:"project_id"`
Scope string `json:"scope"`
DomainId string `json:"domain_id"`
ProjectId string `json:"project_id"`
GetPointStr bool `json:"get_point_str"`
}
type CommonMetricInputQuery struct {
@@ -88,6 +89,7 @@ type CommonAlertUpdateInput struct {
Recipients []string `json:"recipients"`
// systemalert policy may need update through operator
ForceUpdate bool `json:"force_update"`
GetPointStr bool `json:"get_point_str"`
}
type CommonAlertDetails struct {
@@ -117,4 +119,5 @@ type CommonAlertMetricDetails struct {
Filters []MetricQueryTag `json:"filters"`
FieldDescription MetricFieldDetail
FieldOpt string `json:"field_opt"`
GetPointStr bool `json:"get_point_str"`
}

View File

@@ -33,6 +33,7 @@ import (
"yunion.io/x/pkg/util/netutils"
"yunion.io/x/onecloud/pkg/hostman/guestman"
"yunion.io/x/onecloud/pkg/hostman/hostinfo/hostconsts"
"yunion.io/x/onecloud/pkg/hostman/options"
"yunion.io/x/onecloud/pkg/util/httputils"
)
@@ -244,7 +245,8 @@ func (s *SGuestMonitorCollector) toTelegrafReportData(data *jsonutils.JSONDict)
for metrics, stat := range rs {
tags := map[string]string{
"vm_id": guestId, "vm_name": vmName, "vm_ip": vmIp,
"is_vm": "true", "brand": "OneCloud", "res_type": "guest",
"is_vm": "true", hostconsts.TELEGRAF_TAG_KEY_BRAND: hostconsts.TELEGRAF_TAG_ONECLOUD_BRAND,
hostconsts.TELEGRAF_TAG_KEY_RES_TYPE: "guest",
}
if len(scalingGroupId) > 0 {
tags["vm_scaling_group_id"] = scalingGroupId

View File

@@ -21,23 +21,23 @@ func (s *mathReducer) GetType() string {
return s.Type
}
func (s *mathReducer) Reduce(series *tsdb.TimeSeries) *float64 {
func (s *mathReducer) Reduce(series *tsdb.TimeSeries) (*float64, []string) {
if len(series.Points) == 0 {
return nil
return nil, nil
}
value := float64(0)
allNull := true
valArr := make([]string, 0)
switch s.Type {
case "avg":
validPointsCount := 0
for _, point := range series.Points {
if point.IsValid() {
if point.IsValids() {
values := point.Values()
tem, err := s.mathValue(values)
if err != nil {
return nil
return nil, valArr
}
value += tem
validPointsCount++
@@ -49,11 +49,11 @@ func (s *mathReducer) Reduce(series *tsdb.TimeSeries) *float64 {
}
case "sum":
for _, point := range series.Points {
if point.IsValid() {
if point.IsValids() {
values := point.Values()
tem, err := s.mathValue(values)
if err != nil {
return nil
return nil, valArr
}
value += tem
allNull = false
@@ -62,12 +62,12 @@ func (s *mathReducer) Reduce(series *tsdb.TimeSeries) *float64 {
case "min":
value = math.MaxFloat64
for _, point := range series.Points {
if point.IsValid() {
if point.IsValids() {
allNull = false
values := point.Values()
tem, err := s.mathValue(values)
if err != nil {
return nil
return nil, valArr
}
if value > tem {
value = tem
@@ -77,12 +77,12 @@ func (s *mathReducer) Reduce(series *tsdb.TimeSeries) *float64 {
case "max":
value = -math.MaxFloat64
for _, point := range series.Points {
if point.IsValid() {
if point.IsValids() {
allNull = false
values := point.Values()
tem, err := s.mathValue(values)
if err != nil {
return nil
return nil, valArr
}
if value < tem {
value = tem
@@ -95,11 +95,11 @@ func (s *mathReducer) Reduce(series *tsdb.TimeSeries) *float64 {
case "last":
points := series.Points
for i := len(points) - 1; i >= 0; i-- {
if points[i].IsValid() {
if points[i].IsValids() {
values := points[i].Values()
tem, err := s.mathValue(values)
if err != nil {
return nil
return nil, valArr
}
value = tem
allNull = false
@@ -109,12 +109,12 @@ func (s *mathReducer) Reduce(series *tsdb.TimeSeries) *float64 {
case "median":
var values []float64
for _, point := range series.Points {
if point.IsValid() {
if point.IsValids() {
allNull = false
values := point.Values()
tem, err := s.mathValue(values)
if err != nil {
return nil
return nil, valArr
}
values = append(values, tem)
}
@@ -134,7 +134,7 @@ func (s *mathReducer) Reduce(series *tsdb.TimeSeries) *float64 {
allNull, value = calculateDiff(series, allNull, value, percentDiff)
case "count_non_null":
for _, v := range series.Points {
if v.IsValid() {
if v.IsValids() {
value++
}
}
@@ -145,10 +145,10 @@ func (s *mathReducer) Reduce(series *tsdb.TimeSeries) *float64 {
}
if allNull {
return nil
return nil, valArr
}
return &value
return &value, valArr
}
func (reducer *mathReducer) mathValue(values []float64) (float64, error) {

View File

@@ -126,7 +126,7 @@ func (c *QueryCondition) Eval(context *alerting.EvalContext) (*alerting.Conditio
var alertOkmatches []*monitor.EvalMatch
for _, series := range seriesList {
reducedValue := c.Reducer.Reduce(series)
reducedValue, valStrArr := c.Reducer.Reduce(series)
evalMatch := c.Evaluator.Eval(reducedValue)
if reducedValue == nil {
@@ -148,14 +148,14 @@ func (c *QueryCondition) Eval(context *alerting.EvalContext) (*alerting.Conditio
meta = &metas[0]
}
if evalMatch {
match, err := c.NewEvalMatch(context, *series, meta, reducedValue)
match, err := c.NewEvalMatch(context, *series, meta, reducedValue, valStrArr)
if err != nil {
return nil, errors.Wrap(err, "NewEvalMatch error")
}
matches = append(matches, match)
}
if reducedValue != nil && !evalMatch {
match, err := c.NewEvalMatch(context, *series, meta, reducedValue)
match, err := c.NewEvalMatch(context, *series, meta, reducedValue, valStrArr)
if err != nil {
return nil, errors.Wrap(err, "NewEvalMatch error")
}
@@ -193,7 +193,7 @@ func (c *QueryCondition) Eval(context *alerting.EvalContext) (*alerting.Conditio
}
func (c *QueryCondition) NewEvalMatch(context *alerting.EvalContext, series tsdb.TimeSeries,
meta *tsdb.QueryResultMeta, value *float64) (*monitor.EvalMatch, error) {
meta *tsdb.QueryResultMeta, value *float64, valStrArr []string) (*monitor.EvalMatch, error) {
evalMatch := new(monitor.EvalMatch)
alert, err := models.CommonAlertManager.GetAlert(context.Rule.Id)
if err != nil {
@@ -218,12 +218,28 @@ func (c *QueryCondition) NewEvalMatch(context *alerting.EvalContext, series tsdb
//evalMatch.Condition = c.GenerateFormatCond(meta, queryKeyInfo).String()
evalMatch.Tags = c.filterTags(series.Tags, *alertDetails)
evalMatch.Value = value
evalMatch.ValueStr = c.RationalizeValueFromUnit(*value, alertDetails.FieldDescription.Unit, alertDetails.FieldOpt)
evalMatch.ValueStr = c.RationalizeValueFromUnit(*value, alertDetails.FieldDescription.Unit,
alertDetails.FieldOpt)
if alertDetails.GetPointStr {
evalMatch.ValueStr = c.jointPointStr(series, evalMatch.ValueStr, valStrArr)
}
evalMatch.MeasurementDesc = alertDetails.MeasurementDisplayName
evalMatch.FieldDesc = alertDetails.FieldDescription.DisplayName
return evalMatch, nil
}
func (c *QueryCondition) jointPointStr(series tsdb.TimeSeries, value string, valStrArr []string) string {
str := ""
for i := 0; i < len(valStrArr); i++ {
if i == 0 {
str = fmt.Sprintf("%s=%s", series.Columns[i], value)
continue
}
str = fmt.Sprintf("%s,%s=%s", str, series.Columns[i], valStrArr[i])
}
return str
}
var fileSize = []string{"bps", "Bps", "byte"}
func (c *QueryCondition) RationalizeValueFromUnit(value float64, unit string, opt string) string {

View File

@@ -27,7 +27,7 @@ import (
)
type Reducer interface {
Reduce(series *tsdb.TimeSeries) *float64
Reduce(series *tsdb.TimeSeries) (*float64, []string)
GetType() string
}
@@ -42,14 +42,14 @@ func (s *queryReducer) GetType() string {
return s.Type
}
func (s *queryReducer) Reduce(series *tsdb.TimeSeries) *float64 {
func (s *queryReducer) Reduce(series *tsdb.TimeSeries) (*float64, []string) {
if len(series.Points) == 0 {
return nil
return nil, nil
}
value := float64(0)
allNull := true
valArr := make([]string, 0)
switch s.Type {
case "avg":
validPointsCount := 0
@@ -98,6 +98,7 @@ func (s *queryReducer) Reduce(series *tsdb.TimeSeries) *float64 {
for i := len(points) - 1; i >= 0; i-- {
if points[i].IsValid() {
value = points[i].Value()
valArr = points[i].PointValueStr()
allNull = false
break
}
@@ -136,10 +137,10 @@ func (s *queryReducer) Reduce(series *tsdb.TimeSeries) *float64 {
}
if allNull {
return nil
return nil, nil
}
return &value
return &value, valArr
}
func newSimpleReducer(t string) *queryReducer {

View File

@@ -78,7 +78,7 @@ func TestSimpleReducer(t *testing.T) {
series.Points = append(series.Points, tsdb.NewTimePointByVal(2, 5))
series.Points = append(series.Points, tsdb.NewTimePointByVal(3, 6))
result := reducer.Reduce(series)
result, _ := reducer.Reduce(series)
So(result, ShouldNotBeNil)
So(*result, ShouldEqual, 2)
})
@@ -99,9 +99,9 @@ func TestSimpleReducer(t *testing.T) {
series.Points = append(series.Points, tsdb.NewTimePoint(nil, 2))
series.Points = append(series.Points, tsdb.NewTimePointByVal(3, 3))
series.Points = append(series.Points, tsdb.NewTimePointByVal(4, 4))
So(reducer.Reduce(series), ShouldNotBeNil)
So(*reducer.Reduce(series), ShouldEqual, 2)
reduce, _ := reducer.Reduce(series)
So(reduce, ShouldNotBeNil)
So(*reduce, ShouldEqual, 2)
})
Convey("with null values", func() {
@@ -112,8 +112,8 @@ func TestSimpleReducer(t *testing.T) {
series.Points = append(series.Points, tsdb.NewTimePoint(nil, 1))
series.Points = append(series.Points, tsdb.NewTimePoint(nil, 2))
So(reducer.Reduce(series), ShouldBeNil)
reduce, _ := reducer.Reduce(series)
So(reduce, ShouldBeNil)
})
})
@@ -127,8 +127,8 @@ func TestSimpleReducer(t *testing.T) {
series.Points = append(series.Points, tsdb.NewTimePoint(nil, 2))
series.Points = append(series.Points, tsdb.NewTimePoint(nil, 3))
series.Points = append(series.Points, tsdb.NewTimePointByVal(3, 4))
So(*reduer.Reduce(series), ShouldEqual, 3)
reduce, _ := reduer.Reduce(series)
So(*reduce, ShouldEqual, 3)
})
Convey("diff one point", func() {
@@ -154,8 +154,8 @@ func TestSimpleReducer(t *testing.T) {
series.Points = append(series.Points, tsdb.NewTimePoint(nil, 1))
series.Points = append(series.Points, tsdb.NewTimePoint(nil, 2))
So(reducer.Reduce(series), ShouldBeNil)
reduce, _ := reducer.Reduce(series)
So(reduce, ShouldBeNil)
})
Convey("percent_diff one point", func() {
@@ -181,8 +181,8 @@ func TestSimpleReducer(t *testing.T) {
series.Points = append(series.Points, tsdb.NewTimePoint(nil, 1))
series.Points = append(series.Points, tsdb.NewTimePoint(nil, 2))
So(reducer.Reduce(series), ShouldBeNil)
reduce, _ := reducer.Reduce(series)
So(reduce, ShouldBeNil)
})
})
}
@@ -197,6 +197,6 @@ func testReducer(reducerType string, datapoints ...float64) float64 {
val := datapoints[idx]
serires.Points = append(serires.Points, tsdb.NewTimePoint(&val, 1234134))
}
return *reducer.Reduce(serires)
reduce, _ := reducer.Reduce(serires)
return *reduce
}

View File

@@ -32,9 +32,9 @@ func NewSuggestRuleReducer(t string, duration time.Duration) Reducer {
}
}
func (s *suggestRuleReducer) Reduce(series *tsdb.TimeSeries) *float64 {
func (s *suggestRuleReducer) Reduce(series *tsdb.TimeSeries) (*float64, []string) {
if int(s.duration.Seconds()) > len(series.Points) {
return nil
return nil, nil
}
return s.queryReducer.Reduce(series)
}

View File

@@ -36,14 +36,21 @@ func (drvF *nodeDriverF) GetType() monitor.AlertResourceType {
func (drvF *nodeDriverF) IsEvalMatched(input monitor.EvalMatch) bool {
tags := input.Tags
_, hasResType := tags[hostconsts.TELEGRAF_TAG_KEY_RES_TYPE]
resType, hasResType := tags[hostconsts.TELEGRAF_TAG_KEY_RES_TYPE]
if !hasResType {
return false
}
_, hasHostType := tags[hostconsts.TELEGRAF_TAG_KEY_HOST_TYPE]
if resType != hostconsts.TELEGRAF_TAG_ONECLOUD_RES_TYPE {
return false
}
hostType, hasHostType := tags[hostconsts.TELEGRAF_TAG_KEY_HOST_TYPE]
if !hasHostType {
return false
}
if hostType != hostconsts.TELEGRAF_TAG_ONECLOUD_HOST_TYPE_HOST ||
hostType != hostconsts.TELEGRAF_TAG_ONECLOUD_HOST_TYPE_CONTROLLER {
return false
}
_, hasHost := tags[NODE_TAG_HOST_KEY]
if !hasHost {
return false

View File

@@ -17,6 +17,7 @@ import (
"yunion.io/x/onecloud/pkg/apis"
"yunion.io/x/onecloud/pkg/apis/monitor"
"yunion.io/x/onecloud/pkg/cloudcommon/db"
"yunion.io/x/onecloud/pkg/hostman/hostinfo/hostconsts"
"yunion.io/x/onecloud/pkg/httperrors"
"yunion.io/x/onecloud/pkg/mcclient"
merrors "yunion.io/x/onecloud/pkg/monitor/errors"
@@ -28,6 +29,7 @@ import (
const (
CommonAlertMetadataAlertType = "alert_type"
CommonAlertMetadataFieldOpt = "field_opt"
CommonAlertMetadataPointStr = "point_str"
)
var (
@@ -187,6 +189,14 @@ func (alert *SCommonAlert) getFieldOpt() string {
return alert.GetMetadata(CommonAlertMetadataFieldOpt, nil)
}
func (alert *SCommonAlert) setPointStr(ctx context.Context, userCred mcclient.TokenCredential, fieldOpt string) error {
return alert.SetMetadata(ctx, CommonAlertMetadataPointStr, fieldOpt, userCred)
}
func (alert *SCommonAlert) getPointStr() string {
return alert.GetMetadata(CommonAlertMetadataPointStr, nil)
}
func (alert *SCommonAlert) CustomizeCreate(
ctx context.Context, userCred mcclient.TokenCredential,
ownerId mcclient.IIdentityProvider,
@@ -279,6 +289,9 @@ func (alert *SCommonAlert) PostCreate(ctx context.Context,
if fieldOpt != "" {
alert.setFieldOpt(ctx, userCred, fieldOpt)
}
if input.GetPointStr {
alert.setPointStr(ctx, userCred, strconv.FormatBool(input.GetPointStr))
}
_, err := alert.PerformSetScope(ctx, userCred, query, data)
if err != nil {
log.Errorln(errors.Wrap(err, "Alert PerformSetScope"))
@@ -487,6 +500,11 @@ func (alert *SCommonAlert) GetCommonAlertMetricDetailsFromAlertCondition(index i
if fieldOpt != "" {
metricDetails.FieldOpt = strings.Split(fieldOpt, "+")[index]
}
poinStr := alert.getPointStr()
if len(poinStr) != 0 {
bl, _ := strconv.ParseBool(poinStr)
metricDetails.GetPointStr = bl
}
getCommonAlertMetricDetailsFromCondition(cond, metricDetails)
return metricDetails
}
@@ -530,17 +548,8 @@ func getCommonAlertMetricDetailsFromCondition(cond *monitor.AlertCondition,
break
}
}
newQueryTags := make([]monitor.MetricQueryTag, 0)
for i, tagFilter := range q.Model.Tags {
if tagFilter.Key == "tenant_id" {
continue
}
if tagFilter.Key == "domain_id" {
continue
}
newQueryTags = append(newQueryTags, q.Model.Tags[i])
}
cond.Query.Model.Tags = newQueryTags
cond.Query.Model.Tags = filterDefaultTags(q.Model.Tags)
metricDetails.Measurement = measurement
metricDetails.Field = field
metricDetails.DB = db
@@ -551,6 +560,23 @@ func getCommonAlertMetricDetailsFromCondition(cond *monitor.AlertCondition,
getMetricDescriptionDetails(metricDetails)
}
func filterDefaultTags(queryTag []monitor.MetricQueryTag) []monitor.MetricQueryTag {
newQueryTags := make([]monitor.MetricQueryTag, 0)
for i, tagFilter := range queryTag {
if tagFilter.Key == "tenant_id" {
continue
}
if tagFilter.Key == "domain_id" {
continue
}
if tagFilter.Key == hostconsts.TELEGRAF_TAG_KEY_RES_TYPE {
continue
}
newQueryTags = append(newQueryTags, queryTag[i])
}
return newQueryTags
}
func getMetricDescriptionDetails(metricDetails *monitor.CommonAlertMetricDetails) {
influxdbMeasurements := DataSourceManager.getMetricDescriptions([]monitor.InfluxMeasurement{
{Measurement: metricDetails.Measurement},
@@ -751,6 +777,9 @@ func (alert *SCommonAlert) PostUpdate(
log.Errorln(errors.Wrap(err, "Alert PerformSetScope"))
}
}
if updateInput.GetPointStr {
alert.setPointStr(ctx, userCred, strconv.FormatBool(updateInput.GetPointStr))
}
CommonAlertManager.SetSubscriptionAlert(alert)
}

View File

@@ -300,8 +300,8 @@ func setDefaultValue(query *monitor.AlertQuery, inputQuery *monitor.MetricInputQ
})
}
metricMeasurement, _ := MetricMeasurementManager.GetCache().Get(query.Model.Measurement)
if query.Model.Database == "" {
metricMeasurement, _ := MetricMeasurementManager.GetCache().Get(query.Model.Measurement)
database := ""
if metricMeasurement == nil {
log.Warningf("Not found measurement %s from metrics measurement cache", query.Model.Measurement)
@@ -362,6 +362,14 @@ func setDefaultValue(query *monitor.AlertQuery, inputQuery *monitor.MetricInputQ
})
}
}
if metricMeasurement.ResType == hostconsts.TELEGRAF_TAG_ONECLOUD_RES_TYPE {
query.Model.Tags = append(query.Model.Tags, monitor.MetricQueryTag{
Key: hostconsts.TELEGRAF_TAG_KEY_RES_TYPE,
Operator: "=",
Value: hostconsts.TELEGRAF_TAG_ONECLOUD_RES_TYPE,
Condition: "and",
})
}
}
func setDataSourceId(query *monitor.AlertQuery) {

View File

@@ -204,7 +204,7 @@ func (self *SSubscriptionManager) Eval(details monitor.CommonAlertMetricDetails,
if err != nil {
return false, nil, err
}
reduceValue := reducer.Reduce(serie)
reduceValue, _ := reducer.Reduce(serie)
evalCond := monitor.Condition{
Type: getQueryEvalType(details.Comparator),

View File

@@ -74,16 +74,19 @@ func (rp *ResponseParser) transformRowsV2(rows []Row, queryResult *tsdb.QueryRes
var result tsdb.TimeSeriesSlice
for _, row := range rows {
col := ""
columns := make([]string, 0)
for _, column := range row.Columns {
if column == "time" {
continue
}
columns = append(columns, column)
if col == "" {
col = column
continue
}
col = fmt.Sprintf("%s-%s", col, column)
}
columns = append(columns, "time")
var points tsdb.TimeSeriesPoints
for _, valuePair := range row.Values {
point, err := rp.parseTimepointV2(valuePair)
@@ -92,9 +95,10 @@ func (rp *ResponseParser) transformRowsV2(rows []Row, queryResult *tsdb.QueryRes
}
}
result = append(result, &tsdb.TimeSeries{
Name: rp.formatSerieName(row, col, query),
Points: points,
Tags: row.Tags,
Name: rp.formatSerieName(row, col, query),
Columns: columns,
Points: points,
Tags: row.Tags,
})
}
@@ -233,6 +237,5 @@ func (rp *ResponseParser) parseValueV2(value interface{}) interface{} {
ret := float64(ivalue)
return &ret
}
return nil
return number.String()
}

View File

@@ -14,7 +14,11 @@
package tsdb
import api "yunion.io/x/onecloud/pkg/apis/monitor"
import (
"strconv"
api "yunion.io/x/onecloud/pkg/apis/monitor"
)
type TsdbQuery struct {
TimeRange *TimeRange
@@ -51,6 +55,7 @@ type QueryResult struct {
type TimeSeries struct {
RawName string `json:"raw_name"`
Columns []string `json:"columns"`
Name string `json:"name"`
Points TimeSeriesPoints `json:"points"`
Tags map[string]string `json:"tags,omitempty"`
@@ -85,6 +90,14 @@ func NewTimePointByVal(value float64, timestamp float64) TimePoint {
}
func (p TimePoint) IsValid() bool {
if val, ok := p[0].(*float64); ok && val != nil {
return true
}
return false
//return p[0].(*float64) != nil
}
func (p TimePoint) IsValids() bool {
for i := 0; i < len(p)-1; i++ {
if p[i] == nil {
return false
@@ -94,7 +107,6 @@ func (p TimePoint) IsValid() bool {
}
}
return true
//return p[0].(*float64) != nil
}
func (p TimePoint) Value() float64 {
@@ -111,7 +123,25 @@ func (p TimePoint) Values() []float64 {
values = append(values, *(p[i].(*float64)))
}
return values
}
func (p TimePoint) PointValueStr() []string {
arrStr := make([]string, 0)
for i := 0; i < len(p)-1; i++ {
if p[i] == nil {
arrStr = append(arrStr, "")
}
if fval, ok := p[i].(*float64); ok {
arrStr = append(arrStr, strconv.FormatFloat((*fval), 'f', -1, 64))
continue
}
if ival, ok := p[i].(*int64); ok {
arrStr = append(arrStr, strconv.FormatInt((*ival), 64))
continue
}
arrStr = append(arrStr, p[i].(string))
}
return arrStr
}
func NewTimeSeriesPointsFromArgs(values ...float64) TimeSeriesPoints {