mirror of
https://github.com/yunionio/cloudpods.git
synced 2026-06-08 07:14:19 +08:00
827 lines
27 KiB
Go
827 lines
27 KiB
Go
// Copyright 2019 Yunion
|
||
//
|
||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||
// you may not use this file except in compliance with the License.
|
||
// You may obtain a copy of the License at
|
||
//
|
||
// http://www.apache.org/licenses/LICENSE-2.0
|
||
//
|
||
// Unless required by applicable law or agreed to in writing, software
|
||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
// See the License for the specific language governing permissions and
|
||
// limitations under the License.
|
||
|
||
package models
|
||
|
||
import (
|
||
"context"
|
||
"reflect"
|
||
"strings"
|
||
"sync"
|
||
"time"
|
||
|
||
"yunion.io/x/jsonutils"
|
||
"yunion.io/x/log"
|
||
"yunion.io/x/pkg/errors"
|
||
"yunion.io/x/pkg/util/rbacscope"
|
||
"yunion.io/x/pkg/utils"
|
||
"yunion.io/x/sqlchemy"
|
||
|
||
"yunion.io/x/onecloud/pkg/apihelper"
|
||
"yunion.io/x/onecloud/pkg/apis/monitor"
|
||
"yunion.io/x/onecloud/pkg/cloudcommon/db"
|
||
"yunion.io/x/onecloud/pkg/httperrors"
|
||
"yunion.io/x/onecloud/pkg/mcclient"
|
||
"yunion.io/x/onecloud/pkg/mcclient/auth"
|
||
"yunion.io/x/onecloud/pkg/util/stringutils2"
|
||
)
|
||
|
||
var (
|
||
MonitorResourceManager *SMonitorResourceManager
|
||
)
|
||
|
||
// validateTopQueryInput 验证 TopQueryInput 参数并返回解析后的值
|
||
func validateTopQueryInput(input monitor.TopQueryInput) (startTime time.Time, endTime time.Time, top int, err error) {
|
||
startTime = input.StartTime
|
||
endTime = input.EndTime
|
||
if startTime.IsZero() || endTime.IsZero() {
|
||
return time.Time{}, time.Time{}, 0, httperrors.NewInputParameterError("start_time and end_time must be specified")
|
||
}
|
||
if startTime.After(endTime) {
|
||
return time.Time{}, time.Time{}, 0, httperrors.NewInputParameterError("start_time must be before end_time")
|
||
}
|
||
top = *input.Top
|
||
if top <= 0 {
|
||
top = 5 // 默认返回 top 5
|
||
}
|
||
return startTime, endTime, top, nil
|
||
}
|
||
|
||
type IMonitorResourceCache interface {
|
||
Get(resId string) (jsonutils.JSONObject, bool)
|
||
}
|
||
|
||
type sMonitorResourceCache struct {
|
||
length int
|
||
sync.Map
|
||
}
|
||
|
||
func (c *sMonitorResourceCache) set(resId string, obj jsonutils.JSONObject) {
|
||
c.Store(resId, obj)
|
||
c.length++
|
||
}
|
||
|
||
func (c *sMonitorResourceCache) remove(resId string) {
|
||
c.Delete(resId)
|
||
c.length--
|
||
}
|
||
|
||
func (c *sMonitorResourceCache) Get(resId string) (jsonutils.JSONObject, bool) {
|
||
obj, ok := c.Load(resId)
|
||
if !ok {
|
||
return nil, false
|
||
}
|
||
return obj.(jsonutils.JSONObject), true
|
||
}
|
||
|
||
func init() {
|
||
MonitorResourceManager = &SMonitorResourceManager{
|
||
SVirtualResourceBaseManager: db.NewVirtualResourceBaseManager(
|
||
&SMonitorResource{},
|
||
"monitor_resource_tbl",
|
||
"monitorresource",
|
||
"monitorresources",
|
||
),
|
||
monitorResModelSets: NewModelSets(),
|
||
}
|
||
MonitorResourceManager.SetVirtualObject(MonitorResourceManager)
|
||
RegistryResourceSync(NewGuestResourceSync())
|
||
RegistryResourceSync(NewHostResourceSync())
|
||
RegistryResourceSync(NewRdsResourceSync())
|
||
RegistryResourceSync(NewRedisResourceSync())
|
||
RegistryResourceSync(NewOssResourceSync())
|
||
RegistryResourceSync(NewAccountResourceSync())
|
||
RegistryResourceSync(NewStorageResourceSync())
|
||
}
|
||
|
||
func (manager *SMonitorResourceManager) GetModelSets() *MonitorResModelSets {
|
||
return manager.monitorResModelSets
|
||
}
|
||
|
||
// +onecloud:swagger-gen-model-singular=monitorresource
|
||
// +onecloud:swagger-gen-model-plural=monitorresources
|
||
type SMonitorResourceManager struct {
|
||
db.SVirtualResourceBaseManager
|
||
db.SEnabledResourceBaseManager
|
||
|
||
monitorResModelSets *MonitorResModelSets
|
||
apih *apihelper.APIHelper
|
||
}
|
||
|
||
func (manager *SMonitorResourceManager) SetAPIHelper(h *apihelper.APIHelper) {
|
||
if manager.apih != nil {
|
||
panic("MonitorResourceManager's apihelper already set")
|
||
}
|
||
manager.apih = h
|
||
}
|
||
|
||
type SMonitorResource struct {
|
||
db.SVirtualResourceBase
|
||
db.SEnabledResourceBase
|
||
|
||
AlertState string `width:"36" charset:"ascii" list:"user" default:"init" update:"user" json:"alert_state"`
|
||
ResId string `width:"256" charset:"ascii" index:"true" list:"user" update:"user" json:"res_id"`
|
||
ResType string `width:"36" charset:"ascii" list:"user" update:"user" json:"res_type"`
|
||
}
|
||
|
||
func (manager *SMonitorResourceManager) GetMonitorResources(input monitor.MonitorResourceListInput) ([]SMonitorResource, error) {
|
||
monitorResources := make([]SMonitorResource, 0)
|
||
query := manager.Query()
|
||
if input.OnlyResId {
|
||
query = query.AppendField(query.Field("id"), query.Field("res_id"))
|
||
}
|
||
query = manager.FieldListFilter(query, input)
|
||
err := db.FetchModelObjects(manager, query, &monitorResources)
|
||
if err != nil {
|
||
return nil, errors.Wrap(err, "SMonitorResourceManager FetchModelObjects err")
|
||
}
|
||
return monitorResources, nil
|
||
}
|
||
|
||
func (man *SMonitorResourceManager) GetMonitorResourceByResId(id string) (*SMonitorResource, error) {
|
||
resources, err := MonitorResourceManager.GetMonitorResources(monitor.MonitorResourceListInput{ResId: []string{id}})
|
||
if err != nil {
|
||
return nil, errors.Wrapf(err, "SMonitorResourceManager GetMonitorResources by resId: %s", id)
|
||
}
|
||
if len(resources) == 0 {
|
||
return nil, errors.Errorf("SMonitorResourceManager GetMonitorResources by resId: %s not found", id)
|
||
}
|
||
return &resources[0], nil
|
||
}
|
||
|
||
type SdeleteRes struct {
|
||
resType string
|
||
notIn []string
|
||
in []string
|
||
}
|
||
|
||
func (manager *SMonitorResourceManager) DeleteMonitorResources(ctx context.Context, userCred mcclient.TokenCredential, input SdeleteRes) error {
|
||
monitorResources := make([]SMonitorResource, 0)
|
||
errs := make([]error, 0)
|
||
query := manager.Query()
|
||
if len(input.notIn) != 0 {
|
||
query.NotIn("res_id", input.notIn)
|
||
}
|
||
if len(input.in) != 0 {
|
||
query.In("res_id", input.in)
|
||
}
|
||
if len(input.resType) != 0 {
|
||
query.Equals("res_type", input.resType)
|
||
}
|
||
err := db.FetchModelObjects(manager, query, &monitorResources)
|
||
if err != nil {
|
||
return errors.Wrap(err, "SMonitorResourceManager FetchModelObjects when DeleteMonitorResources err")
|
||
}
|
||
for _, res := range monitorResources {
|
||
err := (&res).RealDelete(ctx, userCred)
|
||
if err != nil {
|
||
errs = append(errs, errors.Wrapf(err, "delete monitorResource:%s err", res.GetId()))
|
||
}
|
||
}
|
||
if len(errs) != 0 {
|
||
return errors.NewAggregate(errs)
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func (manager *SMonitorResourceManager) GetMonitorResourceById(id string) (*SMonitorResource, error) {
|
||
iModel, err := db.FetchById(manager, id)
|
||
if err != nil {
|
||
return nil, errors.Wrapf(err, "GetMonitorResourceById:%s err", id)
|
||
}
|
||
return iModel.(*SMonitorResource), nil
|
||
}
|
||
|
||
func (manager *SMonitorResourceManager) ListItemFilter(
|
||
ctx context.Context, q *sqlchemy.SQuery,
|
||
userCred mcclient.TokenCredential,
|
||
query monitor.MonitorResourceListInput,
|
||
) (*sqlchemy.SQuery, error) {
|
||
// 如果指定了时间段和 top 参数,执行特殊的 top 查询
|
||
if query.Top != nil {
|
||
return manager.getTopResourcesByAlertCount(ctx, q, userCred, query)
|
||
}
|
||
|
||
var err error
|
||
q, err = manager.SVirtualResourceBaseManager.ListItemFilter(ctx, q, userCred, query.VirtualResourceListInput)
|
||
if err != nil {
|
||
return nil, errors.Wrap(err, "SVirtualResourceBaseManager.ListItemFilter")
|
||
}
|
||
q, err = manager.SEnabledResourceBaseManager.ListItemFilter(ctx, q, userCred, query.EnabledResourceBaseListInput)
|
||
if err != nil {
|
||
return nil, errors.Wrap(err, "SEnabledResourceBaseManager.ListItemFilter")
|
||
}
|
||
|
||
q = manager.FieldListFilter(q, query)
|
||
|
||
return q, nil
|
||
}
|
||
|
||
func (manager *SMonitorResourceManager) FieldListFilter(q *sqlchemy.SQuery, query monitor.MonitorResourceListInput) *sqlchemy.SQuery {
|
||
if len(query.ResType) != 0 {
|
||
q.Equals("res_type", query.ResType)
|
||
}
|
||
if len(query.ResId) != 0 {
|
||
q.In("res_id", query.ResId)
|
||
}
|
||
if len(query.ResName) != 0 {
|
||
q = q.Filter(sqlchemy.OR(
|
||
sqlchemy.Contains(q.Field("name"), query.ResName),
|
||
sqlchemy.Equals(q.Field("res_id"), query.ResName),
|
||
))
|
||
}
|
||
if len(query.AlertStates) != 0 {
|
||
q.In("alert_state", query.AlertStates)
|
||
}
|
||
return q
|
||
}
|
||
|
||
func (man *SMonitorResourceManager) OrderByExtraFields(
|
||
ctx context.Context,
|
||
q *sqlchemy.SQuery,
|
||
userCred mcclient.TokenCredential,
|
||
input monitor.MonitorResourceListInput,
|
||
) (*sqlchemy.SQuery, error) {
|
||
var err error
|
||
q, err = man.SVirtualResourceBaseManager.OrderByExtraFields(ctx, q, userCred, input.VirtualResourceListInput)
|
||
if err != nil {
|
||
return nil, errors.Wrap(err, "SVirtualResourceBaseManager.OrderByExtraFields")
|
||
}
|
||
return q, nil
|
||
}
|
||
|
||
// getTopResourcesByAlertCount 查询指定时间段内报警数量最多的 top N 资源
|
||
func (man *SMonitorResourceManager) getTopResourcesByAlertCount(
|
||
ctx context.Context,
|
||
q *sqlchemy.SQuery,
|
||
userCred mcclient.TokenCredential,
|
||
query monitor.MonitorResourceListInput,
|
||
) (*sqlchemy.SQuery, error) {
|
||
// 验证时间段和 top 参数
|
||
startTime, endTime, top, err := validateTopQueryInput(query.TopQueryInput)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
// 查询指定时间段内的 AlertRecord
|
||
recordQuery := AlertRecordManager.Query("res_ids", "res_type")
|
||
recordQuery = recordQuery.GE("created_at", startTime).LE("created_at", endTime)
|
||
recordQuery = recordQuery.IsNotNull("res_type").IsNotEmpty("res_type")
|
||
recordQuery = recordQuery.IsNotEmpty("res_ids")
|
||
|
||
// 如果指定了 ResType,添加过滤条件
|
||
if len(query.ResType) > 0 {
|
||
recordQuery = recordQuery.Equals("res_type", query.ResType)
|
||
}
|
||
|
||
// 应用权限过滤 - 使用 FilterByOwner 方法
|
||
// 从 query 中获取 scope,如果没有则使用默认值
|
||
scope := rbacscope.ScopeSystem
|
||
if len(query.VirtualResourceListInput.Scope) > 0 {
|
||
scope = rbacscope.TRbacScope(query.VirtualResourceListInput.Scope)
|
||
}
|
||
recordQuery = AlertRecordManager.SMonitorScopedResourceManager.FilterByOwner(
|
||
ctx, recordQuery, AlertRecordManager, userCred, userCred, scope)
|
||
|
||
// 执行查询获取所有记录
|
||
type RecordRow struct {
|
||
ResIds string
|
||
ResType string
|
||
}
|
||
rows := make([]RecordRow, 0)
|
||
err = recordQuery.All(&rows)
|
||
if err != nil {
|
||
return nil, errors.Wrap(err, "query alert records")
|
||
}
|
||
|
||
// 统计每个资源的报警数量
|
||
resourceAlertCount := make(map[string]int)
|
||
for _, row := range rows {
|
||
if len(row.ResIds) == 0 {
|
||
continue
|
||
}
|
||
// 解析 res_ids(逗号分隔)
|
||
resIds := strings.Split(row.ResIds, ",")
|
||
for _, resId := range resIds {
|
||
resId = strings.TrimSpace(resId)
|
||
if len(resId) > 0 {
|
||
// 如果指定了 ResType,需要匹配 res_type
|
||
if len(query.ResType) > 0 && row.ResType != query.ResType {
|
||
continue
|
||
}
|
||
resourceAlertCount[resId]++
|
||
}
|
||
}
|
||
}
|
||
|
||
// 转换为切片并按报警数量排序
|
||
type ResourceCount struct {
|
||
ResId string
|
||
Count int
|
||
}
|
||
resourceCounts := make([]ResourceCount, 0, len(resourceAlertCount))
|
||
for resId, count := range resourceAlertCount {
|
||
resourceCounts = append(resourceCounts, ResourceCount{
|
||
ResId: resId,
|
||
Count: count,
|
||
})
|
||
}
|
||
|
||
// 按报警数量降序排序
|
||
for i := 0; i < len(resourceCounts)-1; i++ {
|
||
for j := i + 1; j < len(resourceCounts); j++ {
|
||
if resourceCounts[i].Count < resourceCounts[j].Count {
|
||
resourceCounts[i], resourceCounts[j] = resourceCounts[j], resourceCounts[i]
|
||
}
|
||
}
|
||
}
|
||
|
||
// 获取 top N 的资源 ID
|
||
topResIds := make([]string, 0, top)
|
||
for i := 0; i < top && i < len(resourceCounts); i++ {
|
||
topResIds = append(topResIds, resourceCounts[i].ResId)
|
||
}
|
||
|
||
if len(topResIds) == 0 {
|
||
// 如果没有找到任何记录,返回空查询
|
||
return q.FilterByFalse(), nil
|
||
}
|
||
|
||
// 用 top res_id 过滤 MonitorResource 查询
|
||
q, err = man.SVirtualResourceBaseManager.ListItemFilter(ctx, q, userCred, query.VirtualResourceListInput)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
q, err = man.SEnabledResourceBaseManager.ListItemFilter(ctx, q, userCred, query.EnabledResourceBaseListInput)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
q = man.FieldListFilter(q, query)
|
||
q = q.In("res_id", topResIds)
|
||
|
||
return q, nil
|
||
}
|
||
|
||
func (man *SMonitorResourceManager) HasName() bool {
|
||
return false
|
||
}
|
||
|
||
func (man *SMonitorResourceManager) ValidateCreateData(
|
||
ctx context.Context, userCred mcclient.TokenCredential,
|
||
ownerId mcclient.IIdentityProvider, query jsonutils.JSONObject,
|
||
data monitor.MonitorResourceCreateInput) (monitor.MonitorResourceCreateInput, error) {
|
||
//rule 查询到资源信息后没有将资源id,进行转换
|
||
if len(data.ResId) == 0 {
|
||
return data, httperrors.NewInputParameterError("not found res_id %q", data.ResId)
|
||
}
|
||
if len(data.ResType) == 0 {
|
||
return data, httperrors.NewInputParameterError("not found res_type %q", data.ResType)
|
||
}
|
||
return data, nil
|
||
}
|
||
|
||
func (self *SMonitorResource) CustomizeCreate(ctx context.Context, userCred mcclient.TokenCredential,
|
||
ownerId mcclient.IIdentityProvider, query jsonutils.JSONObject, data jsonutils.JSONObject) error {
|
||
return nil
|
||
}
|
||
|
||
func (man *SMonitorResourceManager) FetchCustomizeColumns(
|
||
ctx context.Context,
|
||
userCred mcclient.TokenCredential,
|
||
query jsonutils.JSONObject,
|
||
objs []interface{},
|
||
fields stringutils2.SSortedStrings,
|
||
isList bool,
|
||
) []monitor.MonitorResourceDetails {
|
||
rows := make([]monitor.MonitorResourceDetails, len(objs))
|
||
virtRows := man.SVirtualResourceBaseManager.FetchCustomizeColumns(ctx, userCred, query, objs, fields, isList)
|
||
resIds := make([]string, len(objs))
|
||
for i := range rows {
|
||
rows[i] = monitor.MonitorResourceDetails{
|
||
VirtualResourceDetails: virtRows[i],
|
||
}
|
||
mr := objs[i].(*SMonitorResource)
|
||
resIds[i] = mr.ResId
|
||
_, object := MonitorResourceManager.GetResourceObj(mr.ResId)
|
||
if object != nil {
|
||
object.Unmarshal(&rows[i])
|
||
}
|
||
}
|
||
sq := MonitorResourceAlertManager.Query().In("monitor_resource_id", resIds).SubQuery()
|
||
q := sq.Query(
|
||
sq.Field("monitor_resource_id"),
|
||
sqlchemy.COUNT("count", sq.Field("row_id")),
|
||
).GroupBy(sq.Field("monitor_resource_id"))
|
||
mrs := []struct {
|
||
MonitorResourceId string
|
||
Count int64
|
||
}{}
|
||
err := q.All(&mrs)
|
||
if err != nil {
|
||
log.Errorf("query monitor resource alert error: %v", err)
|
||
return rows
|
||
}
|
||
mrMap := make(map[string]int64)
|
||
for _, mr := range mrs {
|
||
mrMap[mr.MonitorResourceId] = mr.Count
|
||
}
|
||
for i := range rows {
|
||
rows[i].AttachAlertCount = mrMap[resIds[i]]
|
||
}
|
||
return rows
|
||
}
|
||
|
||
func (self *SMonitorResource) AttachAlert(ctx context.Context, userCred mcclient.TokenCredential, alertId string, metric string, match monitor.EvalMatch) (*SMonitorResourceAlert, error) {
|
||
iModel, _ := db.NewModelObject(MonitorResourceAlertManager)
|
||
input := monitor.MonitorResourceJointCreateInput{
|
||
MonitorResourceId: self.ResId,
|
||
AlertId: alertId,
|
||
AlertState: monitor.MONITOR_RESOURCE_ALERT_STATUS_ATTACH,
|
||
Metric: metric,
|
||
Data: match,
|
||
}
|
||
data := input.JSON(&input)
|
||
err := data.Unmarshal(iModel)
|
||
if err != nil {
|
||
return nil, errors.Wrap(err, "MonitorResourceJointCreateInput unmarshal to joint err")
|
||
}
|
||
if err := MonitorResourceAlertManager.TableSpec().Insert(ctx, iModel); err != nil {
|
||
return nil, errors.Wrap(err, "insert MonitorResourceJoint model err")
|
||
}
|
||
return iModel.(*SMonitorResourceAlert), nil
|
||
}
|
||
|
||
func (self *SMonitorResource) UpdateAlertState() error {
|
||
joints, _ := MonitorResourceAlertManager.GetJoinsByListInput(monitor.MonitorResourceJointListInput{MonitorResourceId: self.ResId})
|
||
jointState := monitor.MONITOR_RESOURCE_ALERT_STATUS_ATTACH
|
||
if len(joints) == 0 {
|
||
jointState = monitor.MONITOR_RESOURCE_ALERT_STATUS_INIT
|
||
}
|
||
for _, joint := range joints {
|
||
if joint.AlertState == monitor.MONITOR_RESOURCE_ALERT_STATUS_ALERTING && time.Now().Sub(joint.
|
||
TriggerTime) < time.Minute*30 {
|
||
jointState = monitor.MONITOR_RESOURCE_ALERT_STATUS_ALERTING
|
||
}
|
||
}
|
||
_, err := db.Update(self, func() error {
|
||
self.AlertState = jointState
|
||
return nil
|
||
})
|
||
if err != nil {
|
||
return errors.Wrapf(err, "SMonitorResource:%s UpdateAlertState err", self.Name)
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func (self *SMonitorResource) RealDelete(ctx context.Context, userCred mcclient.TokenCredential) error {
|
||
err := self.DetachJoint(ctx, userCred)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
return self.SVirtualResourceBase.Delete(ctx, userCred)
|
||
}
|
||
|
||
func (self *SMonitorResource) DetachJoint(ctx context.Context, userCred mcclient.TokenCredential) error {
|
||
err := MonitorResourceAlertManager.DetachJoint(ctx, userCred,
|
||
monitor.MonitorResourceJointListInput{MonitorResourceId: self.ResId})
|
||
if err != nil {
|
||
return errors.Wrap(err, "SMonitorResource DetachJoint err")
|
||
}
|
||
return nil
|
||
}
|
||
|
||
type AlertStatusCount struct {
|
||
CountId int64
|
||
AlertState string
|
||
}
|
||
|
||
func (manager *SMonitorResourceManager) GetPropertyAlert(ctx context.Context, userCred mcclient.TokenCredential,
|
||
data jsonutils.JSONObject) (jsonutils.JSONObject, error) {
|
||
scope, _ := data.GetString("scope")
|
||
if len(scope) == 0 {
|
||
scope = "system"
|
||
}
|
||
result := jsonutils.NewDict()
|
||
for resType, _ := range GetResourceSyncMap() {
|
||
query := manager.Query("alert_state")
|
||
owner, _ := manager.FetchOwnerId(ctx, data)
|
||
if owner == nil {
|
||
owner = userCred
|
||
}
|
||
query = manager.FilterByOwner(ctx, query, manager, userCred, owner, rbacscope.TRbacScope(scope))
|
||
query = query.AppendField(sqlchemy.COUNT("count_id", query.Field("id")))
|
||
input := monitor.MonitorResourceListInput{ResType: resType}
|
||
query = manager.FieldListFilter(query, input)
|
||
query.GroupBy(query.Field("alert_state"))
|
||
log.Errorf("query:%s", query.String())
|
||
rows, err := query.Rows()
|
||
if err != nil {
|
||
return nil, errors.Wrap(err, "getMonitorResourceAlert query err")
|
||
}
|
||
defer rows.Close()
|
||
total := int64(0)
|
||
resTypeDict := jsonutils.NewDict()
|
||
for rows.Next() {
|
||
row := new(AlertStatusCount)
|
||
err := query.Row2Struct(rows, row)
|
||
if err != nil {
|
||
return nil, errors.Wrap(err, "MonitorResource Row2Struct err")
|
||
}
|
||
resTypeDict.Add(jsonutils.NewInt(row.CountId), row.AlertState)
|
||
total += row.CountId
|
||
}
|
||
resTypeDict.Add(jsonutils.NewInt(total), "total")
|
||
result.Add(resTypeDict, resType)
|
||
}
|
||
return result, nil
|
||
}
|
||
|
||
func (manager *SMonitorResourceManager) UpdateMonitorResourceAttachJointByRecord(ctx context.Context, userCred mcclient.TokenCredential, record *SAlertRecord) error {
|
||
matches, _ := record.GetEvalData()
|
||
input := &UpdateMonitorResourceAlertInput{
|
||
AlertId: record.AlertId,
|
||
Matches: matches,
|
||
ResType: record.ResType,
|
||
AlertState: record.State,
|
||
SendState: record.SendState,
|
||
TriggerTime: record.CreatedAt,
|
||
AlertRecordId: record.GetId(),
|
||
}
|
||
if err := manager.UpdateMonitorResourceAttachJoint(ctx, userCred, input); err != nil {
|
||
return errors.Wrap(err, "UpdateMonitorResourceAttachJoint")
|
||
}
|
||
return nil
|
||
}
|
||
|
||
type UpdateMonitorResourceAlertInput struct {
|
||
AlertId string
|
||
Matches []monitor.EvalMatch
|
||
ResType string
|
||
AlertState string
|
||
SendState string
|
||
TriggerTime time.Time
|
||
AlertRecordId string
|
||
}
|
||
|
||
func (manager *SMonitorResourceManager) UpdateMonitorResourceAttachJoint(ctx context.Context, userCred mcclient.TokenCredential, input *UpdateMonitorResourceAlertInput) error {
|
||
resType := input.ResType
|
||
if resType == monitor.METRIC_RES_TYPE_AGENT {
|
||
resType = monitor.METRIC_RES_TYPE_GUEST
|
||
}
|
||
matches := input.Matches
|
||
errs := make([]error, 0)
|
||
matchResourceIds := make([]string, 0)
|
||
for _, match := range matches {
|
||
resId := monitor.GetMeasurementResourceId(match.Tags, input.ResType)
|
||
if len(resId) == 0 {
|
||
continue
|
||
}
|
||
matchResourceIds = append(matchResourceIds, resId)
|
||
monitorResources, err := manager.GetMonitorResources(monitor.MonitorResourceListInput{ResType: resType, ResId: []string{resId}})
|
||
if err != nil {
|
||
errs = append(errs, errors.Wrapf(err, "SMonitorResourceManager GetMonitorResources by resId:%s err", resId))
|
||
continue
|
||
}
|
||
for _, res := range monitorResources {
|
||
err := res.UpdateAttachJoint(ctx, userCred, input, match)
|
||
if err != nil {
|
||
errs = append(errs, errors.Wrap(err, "UpdateAttachJoint"))
|
||
}
|
||
}
|
||
}
|
||
resourceAlerts, err := MonitorResourceAlertManager.GetJoinsByListInput(monitor.MonitorResourceJointListInput{
|
||
AlertId: input.AlertId,
|
||
AlertState: input.AlertState,
|
||
})
|
||
if err != nil {
|
||
return errors.Wrapf(err, "get monitor_resource_joint by alertId: %s", input.AlertId)
|
||
}
|
||
deleteJointIds := make([]int64, 0)
|
||
for _, joint := range resourceAlerts {
|
||
metricName := joint.Metric
|
||
isMetricFound := false
|
||
for _, match := range matches {
|
||
if match.Metric == metricName {
|
||
isMetricFound = true
|
||
break
|
||
}
|
||
}
|
||
if utils.IsInStringArray(joint.MonitorResourceId, matchResourceIds) && isMetricFound {
|
||
continue
|
||
}
|
||
deleteJointIds = append(deleteJointIds, joint.RowId)
|
||
}
|
||
if len(deleteJointIds) != 0 {
|
||
err = MonitorResourceAlertManager.DetachJoint(ctx, userCred, monitor.MonitorResourceJointListInput{JointId: deleteJointIds})
|
||
if err != nil {
|
||
return errors.Wrapf(err, "DetachJoint by alertId:%s err", input.AlertId)
|
||
}
|
||
}
|
||
return errors.NewAggregate(errs)
|
||
}
|
||
|
||
func (self *SMonitorResource) UpdateAttachJoint(ctx context.Context, userCred mcclient.TokenCredential, input *UpdateMonitorResourceAlertInput, match monitor.EvalMatch) error {
|
||
joints, err := MonitorResourceAlertManager.GetJoinsByListInput(
|
||
monitor.MonitorResourceJointListInput{
|
||
MonitorResourceId: self.ResId,
|
||
AlertId: input.AlertId,
|
||
Metric: match.Metric,
|
||
})
|
||
if err != nil {
|
||
return errors.Wrapf(err, "SMonitorResource: %s(%s) get joints by monitorResourceId %q , metric %q and alertId %q", self.Name, self.Id, self.ResId, match.Metric, input.AlertId)
|
||
}
|
||
errs := make([]error, 0)
|
||
updateJoints := make([]SMonitorResourceAlert, 0)
|
||
for _, joint := range joints {
|
||
if joint.Metric == match.Metric {
|
||
tmpJoint := joint
|
||
updateJoints = append(updateJoints, tmpJoint)
|
||
}
|
||
}
|
||
// 报警时发现没有进行关联,增加attach
|
||
if len(updateJoints) == 0 {
|
||
newJoint, err := self.AttachAlert(ctx, userCred, input.AlertId, match.Metric, match)
|
||
if err != nil {
|
||
log.Errorf("attach alert error: %s", err)
|
||
}
|
||
log.Infof("Attach Alert joint: %#v, match: %s", newJoint, jsonutils.Marshal(match))
|
||
if err := newJoint.UpdateAlertRecordData(ctx, userCred, input, &match); err != nil {
|
||
errs = append(errs, errors.Wrapf(err, "new joint %s:%s %s:%s UpdateAlertRecordData err",
|
||
MonitorResourceAlertManager.GetMasterFieldName(), self.ResId,
|
||
MonitorResourceAlertManager.GetSlaveFieldName(), input.AlertId))
|
||
}
|
||
} else {
|
||
for _, joint := range updateJoints {
|
||
err := joint.UpdateAlertRecordData(ctx, userCred, input, &match)
|
||
if err != nil {
|
||
errs = append(errs, errors.Wrapf(err, "joint %s:%s %s:%s UpdateAlertRecordData err",
|
||
MonitorResourceAlertManager.GetMasterFieldName(), self.ResId,
|
||
MonitorResourceAlertManager.GetSlaveFieldName(), input.AlertId))
|
||
}
|
||
}
|
||
}
|
||
if err := self.UpdateAlertState(); err != nil {
|
||
errs = append(errs, errors.Wrapf(err, "UpdateAlertState"))
|
||
}
|
||
return errors.NewAggregate(errs)
|
||
}
|
||
|
||
func (manager *SMonitorResourceManager) GetResourceObj(id string) (bool, jsonutils.JSONObject) {
|
||
for _, set := range manager.GetModelSets().ModelSetList() {
|
||
setRv := reflect.ValueOf(set)
|
||
mRv := setRv.MapIndex(reflect.ValueOf(id))
|
||
if mRv.IsValid() {
|
||
return true, jsonutils.Marshal(mRv.Interface())
|
||
}
|
||
}
|
||
return false, nil
|
||
}
|
||
|
||
func (manager *SMonitorResourceManager) GetResourceObjByResType(typ string) (bool, []jsonutils.JSONObject) {
|
||
manager.GetModelSets()
|
||
for _, set := range manager.GetModelSets().ModelSetList() {
|
||
if _, ok := set.(IMonitorResModelSet); !ok {
|
||
continue
|
||
}
|
||
if set.(IMonitorResModelSet).GetResType() != typ {
|
||
continue
|
||
}
|
||
setRv := reflect.ValueOf(set)
|
||
objects := make([]jsonutils.JSONObject, 0)
|
||
for _, kRv := range setRv.MapKeys() {
|
||
mRv := setRv.MapIndex(kRv)
|
||
objects = append(objects, jsonutils.Marshal(mRv.Interface()))
|
||
}
|
||
return true, objects
|
||
}
|
||
return false, nil
|
||
}
|
||
|
||
func (manager *SMonitorResourceManager) SyncManually(ctx context.Context) {
|
||
manager.apih.RunManually(ctx)
|
||
}
|
||
|
||
func (manager *SMonitorResourceManager) SyncResources(ctx context.Context, mss *MonitorResModelSets) error {
|
||
userCred := auth.AdminCredential()
|
||
errs := make([]error, 0)
|
||
log.Infof("start sync monitorresource")
|
||
aliveIds := make([]string, 0)
|
||
for _, set := range mss.ModelSetList() {
|
||
setRv := reflect.ValueOf(set)
|
||
needSync, typ := manager.GetSetType(set)
|
||
log.Infof("Type: %s, length: %d", typ, len(setRv.MapKeys()))
|
||
if !needSync {
|
||
log.Infof("Type: %s don't need sync", typ)
|
||
continue
|
||
}
|
||
for _, kRv := range setRv.MapKeys() {
|
||
mRv := setRv.MapIndex(kRv)
|
||
//log.Errorf("resID:%s", kRv.String())
|
||
input := monitor.MonitorResourceListInput{
|
||
ResId: []string{kRv.String()},
|
||
}
|
||
res, err := MonitorResourceManager.GetMonitorResources(input)
|
||
if err != nil {
|
||
return errors.Wrapf(err, "GetMonitorResources by input: %s", jsonutils.Marshal(input).String())
|
||
}
|
||
if mRv.IsValid() {
|
||
aliveIds = append(aliveIds, kRv.String())
|
||
obj := jsonutils.Marshal(mRv.Interface())
|
||
if len(res) == 0 {
|
||
// no find to create
|
||
createData := newMonitorResourceCreateInput(obj, typ)
|
||
_, err = db.DoCreate(MonitorResourceManager, ctx, userCred, nil, createData,
|
||
userCred)
|
||
if err != nil {
|
||
name, _ := createData.GetString("name")
|
||
errs = append(errs, errors.Wrapf(err, "monitorResource:%s resType:%s DoCreate err", name, typ))
|
||
}
|
||
continue
|
||
}
|
||
_, err = db.Update(&res[0], func() error {
|
||
obj.(*jsonutils.JSONDict).Remove("id")
|
||
(&res[0]).ResType = typ
|
||
obj.Unmarshal(&res[0])
|
||
return nil
|
||
})
|
||
if err != nil {
|
||
errs = append(errs, errors.Wrapf(err, "monitorResource:%s Update err", res[0].Name))
|
||
continue
|
||
}
|
||
res[0].PostUpdate(ctx, userCred, jsonutils.NewDict(), newMonitorResourceCreateInput(obj, typ))
|
||
continue
|
||
}
|
||
if len(res) != 0 {
|
||
log.Infof("delete monitor resource,resId: %s,resType: %s", res[0].ResId, res[0].ResType)
|
||
err := (&res[0]).RealDelete(ctx, userCred)
|
||
if err != nil {
|
||
errs = append(errs, errors.Wrapf(err, "delete monitorResource:%s err", res[0].GetId()))
|
||
}
|
||
}
|
||
}
|
||
err := manager.DeleteMonitorResources(ctx, userCred, SdeleteRes{notIn: aliveIds, resType: typ})
|
||
if err != nil {
|
||
return err
|
||
}
|
||
}
|
||
log.Infof("SMonitorResourceManager SyncResources End")
|
||
err := CommonAlertManager.Run(ctx)
|
||
if err != nil {
|
||
log.Errorf("CommonAlertManager UpdateMonitorResourceJoint err:%v", err)
|
||
}
|
||
return errors.NewAggregate(errs)
|
||
}
|
||
|
||
func (manager *SMonitorResourceManager) GetSetType(set apihelper.IModelSet) (bool, string) {
|
||
if iset, ok := set.(IMonitorResModelSet); ok {
|
||
return iset.NeedSync(), iset.GetResType()
|
||
}
|
||
return false, "NONE"
|
||
}
|
||
|
||
func newMonitorResourceCreateInput(input jsonutils.JSONObject, typ string) jsonutils.JSONObject {
|
||
monitorResource := jsonutils.DeepCopy(input).(*jsonutils.JSONDict)
|
||
id, _ := monitorResource.GetString("id")
|
||
monitorResource.Add(jsonutils.NewString(id), "res_id")
|
||
monitorResource.Remove("id")
|
||
monitorResource.Add(jsonutils.NewString(typ), "res_type")
|
||
if monitorResource.Contains("metadata") {
|
||
metadata, _ := monitorResource.Get("metadata")
|
||
monitorResource.Add(metadata, "__meta__")
|
||
}
|
||
|
||
return monitorResource
|
||
}
|
||
|
||
type MonitorResourceDoActionF func(obj *SMonitorResource, ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, input *monitor.MonitorResourceDoActionInput) (jsonutils.JSONObject, error)
|
||
|
||
var (
|
||
monitorResourceDoActionMap = make(map[string]MonitorResourceDoActionF)
|
||
)
|
||
|
||
func RegisterMonitorResourceDoAction(action string, f MonitorResourceDoActionF) {
|
||
if _, ok := monitorResourceDoActionMap[action]; ok {
|
||
log.Fatalf("action %s already registered for monitor resource do action", action)
|
||
}
|
||
monitorResourceDoActionMap[action] = f
|
||
}
|
||
|
||
func (res *SMonitorResource) PerformDoAction(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, input *monitor.MonitorResourceDoActionInput) (jsonutils.JSONObject, error) {
|
||
f, ok := monitorResourceDoActionMap[input.Action]
|
||
if !ok {
|
||
return nil, errors.Errorf("action %q not found for monitor resource do action", input.Action)
|
||
}
|
||
return f(res, ctx, userCred, query, input)
|
||
}
|