fix(notify): alert send

This commit is contained in:
马鸿飞
2023-07-25 16:51:20 +08:00
parent 1fe6285aef
commit 0d5d19d2ef
22 changed files with 191 additions and 135 deletions

View File

@@ -78,23 +78,27 @@ type RobotUpdateOptions struct {
type SrobotUpdateOptions struct {
Address string
Lang string
Header string
Body string
Header *string
Body *string
MsgKey string
}
func (ru *RobotUpdateOptions) Params() (jsonutils.JSONObject, error) {
dict := jsonutils.NewDict()
jsonutils.Update(&dict, ru)
header, err := jsonutils.Parse([]byte(ru.Header))
if err != nil {
return nil, errors.Wrap(err, "parse header")
if ru.Header != nil {
header, err := jsonutils.Parse([]byte(*ru.Header))
if err != nil {
return nil, errors.Wrap(err, "parse header")
}
dict.Set("header", header)
}
dict.Set("header", header)
body, err := jsonutils.Parse([]byte(ru.Body))
if err != nil {
return nil, errors.Wrap(err, "parse body")
if ru.Body != nil {
body, err := jsonutils.Parse([]byte(*ru.Body))
if err != nil {
return nil, errors.Wrap(err, "parse body")
}
dict.Set("body", body)
}
dict.Set("body", body)
return dict, nil
}

View File

@@ -89,7 +89,7 @@ func (cm *SConfigManager) ValidateCreateData(ctx context.Context, userCred mccli
}
driver := GetDriver(input.Type)
// validate
message, err := driver.ValidateConfig(api.NotifyConfig{
message, err := driver.ValidateConfig(ctx, api.NotifyConfig{
SNotifyConfigContent: *input.Content,
Attribution: input.Attribution,
DomainId: input.ProjectDomainId,
@@ -136,7 +136,7 @@ func (c *SConfig) ValidateUpdateData(ctx context.Context, userCred mcclient.Toke
// check if changed
if input.Content != nil {
driver := GetDriver(c.Type)
message, err := driver.ValidateConfig(api.NotifyConfig{
message, err := driver.ValidateConfig(ctx, api.NotifyConfig{
DomainId: c.DomainId,
Attribution: c.Attribution,
SNotifyConfigContent: *input.Content,
@@ -332,7 +332,7 @@ func (cm *SConfigManager) PerformValidate(ctx context.Context, userCred mcclient
}
// validate
driver := GetDriver(input.Type)
message, err := driver.ValidateConfig(api.NotifyConfig{
message, err := driver.ValidateConfig(ctx, api.NotifyConfig{
SNotifyConfigContent: *input.Content,
DomainId: userCred.GetDomainId(),
})
@@ -355,7 +355,7 @@ func (confManager *SConfigManager) InitializeData() error {
for _, config := range res {
driver := GetDriver(config.Type)
driver.RegisterConfig(config)
err := driver.GetAccessToken(config.DomainId)
err := driver.GetAccessToken(context.Background(), config.DomainId)
if err != nil {
session := auth.GetAdminSession(context.Background(), options.Options.Region)
logclient.AddSimpleActionLog(&config, logclient.ACT_INIT_NOTIFY_CONFIGMAP, err, session.GetToken(), false)

View File

@@ -178,7 +178,7 @@ func (eq *SEmailQueue) doSend(ctx context.Context) {
}
eq.setStatus(ctx, api.EmailSending, nil)
driver := GetDriver(api.EMAIL)
err = driver.Send(api.SendParams{
err = driver.Send(ctx, api.SendParams{
EmailMsg: msg,
})
if err != nil {

View File

@@ -132,10 +132,6 @@ func (nm *SNotificationManager) ValidateCreateData(ctx context.Context, userCred
}
}
input.Receivers = idSet.UnsortedList()
if len(input.Receivers)+len(input.Contacts) == 0 {
return input, httperrors.NewInputParameterError("no valid receiver or contact")
}
if len(input.Receivers)+len(input.Contacts)+len(input.Robots) == 0 {
return input, httperrors.NewInputParameterError("no valid receiver or contact")
}
@@ -724,7 +720,7 @@ func (n *SNotification) GetTemplate(ctx context.Context, topicId, lang string, n
webhookMsg.Set("action", jsonutils.NewString(aStr))
webhookMsg.Set("result", jsonutils.NewString(resultStr))
webhookMsg.Set("resource_details", msg)
if no.ContactType == api.WEBHOOK {
if no.ContactType == api.WEBHOOK || no.ContactType == api.WEBHOOK_ROBOT {
return api.SendParams{
Title: no.Event.StringWithDeli("_"),
Message: webhookMsg.String(),

View File

@@ -15,20 +15,22 @@
package models
import (
"context"
api "yunion.io/x/onecloud/pkg/apis/notify"
)
type ISenderDriver interface {
GetSenderType() string
Send(args api.SendParams) error
ValidateConfig(api.NotifyConfig) (string, error)
ContactByMobile(mobile, domainId string) (string, error)
Send(ctx context.Context, args api.SendParams) error
ValidateConfig(ctx context.Context, args api.NotifyConfig) (string, error)
ContactByMobile(ctx context.Context, mobile, domainId string) (string, error)
IsRobot() bool
IsPersonal() bool
IsSystemConfigContactType() bool
IsValid() bool
IsPullType() bool
GetAccessToken(domainId string) error
GetAccessToken(ctx context.Context, domainId string) error
RegisterConfig(config SConfig)
}
@@ -69,6 +71,6 @@ func GetValidPersonalSenderTypes() []string {
}
func GetDriver(sendType string) ISenderDriver {
driver, _ := driverTable[sendType]
driver := driverTable[sendType]
return driver
}

View File

@@ -17,6 +17,7 @@ package models
import (
"context"
"fmt"
"strings"
"golang.org/x/text/language"
@@ -91,7 +92,7 @@ func (rm *SRobotManager) ValidateCreateData(ctx context.Context, userCred mcclie
input.SetEnabled()
input.Status = api.ROBOT_STATUS_READY
driver := GetDriver(fmt.Sprintf("%s-robot", input.Type))
err = driver.Send(api.SendParams{
err = driver.Send(ctx, api.SendParams{
Receivers: api.SNotifyReceiver{
Contact: input.Address,
DomainId: input.ProjectDomainId,
@@ -103,7 +104,10 @@ func (rm *SRobotManager) ValidateCreateData(ctx context.Context, userCred mcclie
Message: "This is a verification message, please ignore.",
})
if err != nil {
return input, err
if errors.ErrConnectRefused == errors.Cause(err) {
return input, errors.Wrapf(errors.ErrNotImplemented, "url not allow :%s", err.Error())
}
return input, errors.Wrap(err, "robot validate")
}
return input, nil
}
@@ -150,15 +154,41 @@ func (r *SRobot) ValidateUpdateData(ctx context.Context, userCred mcclient.Token
}
if len(input.Address) > 0 {
// check Address
dirver := GetDriver(r.Type)
err := dirver.Send(api.SendParams{})
dirver := GetDriver(fmt.Sprintf("%s-robot", r.Type))
err := dirver.Send(ctx, api.SendParams{
Header: input.Header,
Body: input.Body,
MsgKey: input.MsgKey,
Title: "Validate",
Message: "This is a verification message, please ignore.",
Receivers: api.SNotifyReceiver{
Contact: input.Address,
},
})
if err != nil {
if errors.ErrConnectRefused == errors.Cause(err) {
return input, errors.Wrapf(errors.ErrNotImplemented, "url not allow :%s", err.Error())
}
return input, errors.Wrap(err, "unable to validate address")
}
}
return input, nil
}
func (r *SRobot) PostUpdate(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, data jsonutils.JSONObject) {
db.Update(r, func() error {
header, ok := data.Get("header")
if ok == nil && header.IsZero() {
r.Header = nil
}
body, ok := data.Get("body")
if ok == nil && body.IsZero() {
r.Body = nil
}
return nil
})
}
func (r *SRobot) CustomizeCreate(ctx context.Context, userCred mcclient.TokenCredential, ownerId mcclient.IIdentityProvider, query jsonutils.JSONObject, data jsonutils.JSONObject) error {
err := r.SSharableVirtualResourceBase.CustomizeCreate(ctx, userCred, ownerId, query, data)
if err != nil {
@@ -188,7 +218,7 @@ func (r *SRobot) IsEnabled() bool {
}
func (r *SRobot) IsEnabledContactType(ctype string) (bool, error) {
return ctype == api.ROBOT || ctype == api.WEBHOOK, nil
return strings.Contains(ctype, api.ROBOT) || ctype == api.WEBHOOK, nil
}
func (r *SRobot) IsVerifiedContactType(ctype string) (bool, error) {

View File

@@ -225,8 +225,8 @@ func (tm *STemplateManager) FillWithTemplate(ctx context.Context, lang string, n
// if strings.Contains(no.Topic, "-cn") || strings.Contains(no.Topic, "-en") {
// no.Topic = no.Topic[:len(no.Topic)-3]
// }
var q *sqlchemy.SQuery
q = tm.Query().Equals("topic", strings.ToUpper(no.Topic)).Equals("lang", lang).In("contact_type", []string{CONTACTTYPE_ALL, no.ContactType})
q := tm.Query().Equals("topic", strings.ToUpper(no.Topic)).Equals("lang", lang).In("contact_type", []string{CONTACTTYPE_ALL, no.ContactType})
err = db.FetchModelObjects(tm, q, &templates)
if errors.Cause(err) == sql.ErrNoRows || len(templates) == 0 {
// no such template, return as is

View File

@@ -15,6 +15,7 @@
package sender
import (
"context"
"fmt"
"net/url"
"strings"
@@ -36,7 +37,7 @@ func (dingSender *SDingTalkSender) GetSenderType() string {
return api.DINGTALK
}
func (dingSender *SDingTalkSender) Send(args api.SendParams) error {
func (dingSender *SDingTalkSender) Send(ctx context.Context, args api.SendParams) error {
body := map[string]interface{}{
"agent_id": models.ConfigMap[fmt.Sprintf("%s-%s", api.DINGTALK, args.DomainId)].Content.AgentId,
"msg": map[string]interface{}{
@@ -50,21 +51,21 @@ func (dingSender *SDingTalkSender) Send(args api.SendParams) error {
}
params := url.Values{}
params.Set("access_token", models.ConfigMap[fmt.Sprintf("%s-%s", api.DINGTALK, args.DomainId)].Content.AccessToken)
req, err := sendRequest(ApiDingtalkSendMessage, httputils.POST, nil, params, jsonutils.Marshal(body))
req, err := sendRequest(ctx, ApiDingtalkSendMessage, httputils.POST, nil, params, jsonutils.Marshal(body))
if err != nil {
subCode, _ := req.GetString("sub_code")
switch subCode {
// token失效或不合法
case "40014":
// 尝试重新获取token
err = dingSender.GetAccessToken(fmt.Sprintf("%s-%s", api.DINGTALK, args.DomainId))
err = dingSender.GetAccessToken(ctx, fmt.Sprintf("%s-%s", api.DINGTALK, args.DomainId))
if err != nil {
return errors.Wrap(err, "reset token")
}
// 重新发送通知
params = url.Values{}
params.Set("access_token", models.ConfigMap[fmt.Sprintf("%s-%s", api.DINGTALK, args.DomainId)].Content.AccessToken)
req, err = sendRequest(ApiDingtalkSendMessage, httputils.POST, nil, params, jsonutils.Marshal(body))
req, err = sendRequest(ctx, ApiDingtalkSendMessage, httputils.POST, nil, params, jsonutils.Marshal(body))
if err != nil {
return errors.Wrap(err, "dingtalk resend message")
}
@@ -80,13 +81,13 @@ func (dingSender *SDingTalkSender) Send(args api.SendParams) error {
"agent_id": models.ConfigMap[fmt.Sprintf("%s-%s", api.DINGTALK, args.DomainId)].Content.AgentId,
"task_id": task_id,
}
_, err = sendRequest(ApiDingtalkSendMessage, httputils.POST, nil, params, jsonutils.Marshal(body))
_, err = sendRequest(ctx, ApiDingtalkSendMessage, httputils.POST, nil, params, jsonutils.Marshal(body))
return err
}
func (dingSender *SDingTalkSender) ValidateConfig(config api.NotifyConfig) (string, error) {
func (dingSender *SDingTalkSender) ValidateConfig(ctx context.Context, config api.NotifyConfig) (string, error) {
// 校验accesstoken
_, err := dingSender.getAccessToken(config.AppKey, config.AppSecret)
_, err := dingSender.getAccessToken(ctx, config.AppKey, config.AppSecret)
if err != nil {
if strings.Contains(err.Error(), "40089") {
return "invalid AppKey or AppSecret", err
@@ -96,8 +97,8 @@ func (dingSender *SDingTalkSender) ValidateConfig(config api.NotifyConfig) (stri
return "", nil
}
func (dingSender *SDingTalkSender) ContactByMobile(mobile, domainId string) (string, error) {
err := dingSender.GetAccessToken(domainId)
func (dingSender *SDingTalkSender) ContactByMobile(ctx context.Context, mobile, domainId string) (string, error) {
err := dingSender.GetAccessToken(ctx, domainId)
if err != nil {
return "", err
}
@@ -106,7 +107,7 @@ func (dingSender *SDingTalkSender) ContactByMobile(mobile, domainId string) (str
})
params := url.Values{}
params.Set("access_token", models.ConfigMap[fmt.Sprintf("%s-%s", api.DINGTALK, domainId)].Content.AccessToken)
res, err := sendRequest(ApiDingtalkGetUserByMobile, httputils.POST, nil, params, body)
res, err := sendRequest(ctx, ApiDingtalkGetUserByMobile, httputils.POST, nil, params, body)
if err != nil {
return "", errors.Wrap(err, "get user by mobile")
}
@@ -141,10 +142,10 @@ func (dingSender *SDingTalkSender) RegisterConfig(config models.SConfig) {
models.ConfigMap[fmt.Sprintf("%s-%s", config.Type, config.DomainId)] = config
}
func (dingSender *SDingTalkSender) GetAccessToken(domainId string) error {
func (dingSender *SDingTalkSender) GetAccessToken(ctx context.Context, domainId string) error {
key := fmt.Sprintf("%s-%s", api.DINGTALK, domainId)
appKey, appSecret := models.ConfigMap[key].Content.AppKey, models.ConfigMap[key].Content.AppSecret
token, err := dingSender.getAccessToken(appKey, appSecret)
token, err := dingSender.getAccessToken(ctx, appKey, appSecret)
if err != nil {
return errors.Wrap(err, "dingtalk getAccessToken")
}
@@ -152,11 +153,11 @@ func (dingSender *SDingTalkSender) GetAccessToken(domainId string) error {
return nil
}
func (dingSender *SDingTalkSender) getAccessToken(appKey, appSecret string) (string, error) {
func (dingSender *SDingTalkSender) getAccessToken(ctx context.Context, appKey, appSecret string) (string, error) {
params := url.Values{}
params.Set("appkey", appKey)
params.Set("appsecret", appSecret)
res, err := sendRequest(ApiDingtalkGetToken, httputils.GET, nil, params, nil)
res, err := sendRequest(ctx, ApiDingtalkGetToken, httputils.GET, nil, params, nil)
if err != nil {
return "", errors.Wrap(err, "get dingtalk token")
}

View File

@@ -15,6 +15,7 @@
package sender
import (
"context"
"fmt"
"strings"
@@ -39,8 +40,7 @@ type SDingTalkRobotSender struct {
func (dingRobotSender *SDingTalkRobotSender) GetSenderType() string {
return api.DINGTALK_ROBOT
}
func (dingRobotSender *SDingTalkRobotSender) Send(args api.SendParams) error {
func (dingRobotSender *SDingTalkRobotSender) Send(ctx context.Context, args api.SendParams) error {
var token string
var atStr strings.Builder
title, msg := args.Title, args.Message
@@ -71,11 +71,11 @@ func (dingRobotSender *SDingTalkRobotSender) Send(args api.SendParams) error {
return errors.Wrap(err, "this is res err")
}
func (dingRobotSender *SDingTalkRobotSender) ValidateConfig(config api.NotifyConfig) (string, error) {
func (dingRobotSender *SDingTalkRobotSender) ValidateConfig(ctx context.Context, config api.NotifyConfig) (string, error) {
return "", cloudprovider.ErrNotImplemented
}
func (dingRobotSender *SDingTalkRobotSender) ContactByMobile(mobile, domainId string) (string, error) {
func (dingRobotSender *SDingTalkRobotSender) ContactByMobile(ctx context.Context, mobile, domainId string) (string, error) {
return "", cloudprovider.ErrNotImplemented
}
@@ -99,7 +99,7 @@ func (dingRobotSender *SDingTalkRobotSender) IsSystemConfigContactType() bool {
return true
}
func (dingRobotSender *SDingTalkRobotSender) GetAccessToken(key string) error {
func (dingRobotSender *SDingTalkRobotSender) GetAccessToken(ctx context.Context, key string) error {
return nil
}

View File

@@ -15,6 +15,7 @@
package sender
import (
"context"
"crypto/tls"
"encoding/base64"
"fmt"
@@ -54,7 +55,7 @@ func (emailSender *SEmailSender) GetSenderType() string {
return api.EMAIL
}
func (emailSender *SEmailSender) Send(args api.SendParams) error {
func (emailSender *SEmailSender) Send(ctx context.Context, args api.SendParams) error {
// 初始化emaliClient
hostNmae, hostPort, userName, password := models.ConfigMap[api.EMAIL].Content.Hostname, models.ConfigMap[api.EMAIL].Content.Hostport, models.ConfigMap[api.EMAIL].Content.Username, models.ConfigMap[api.EMAIL].Content.Password
dialer := gomail.NewDialer(hostNmae, hostPort, userName, password)
@@ -161,7 +162,7 @@ func (emailSender *SEmailSender) Send(args api.SendParams) error {
return nil
}
func (emailSender *SEmailSender) ValidateConfig(config api.NotifyConfig) (string, error) {
func (emailSender *SEmailSender) ValidateConfig(ctx context.Context, config api.NotifyConfig) (string, error) {
errChan := make(chan error, 1)
go func() {
dialer := gomail.NewDialer(config.Hostname, config.Hostport, config.Username, config.Password)
@@ -192,7 +193,7 @@ func (emailSender *SEmailSender) ValidateConfig(config api.NotifyConfig) (string
}
}
func (emailSender *SEmailSender) ContactByMobile(mobile, domainId string) (string, error) {
func (emailSender *SEmailSender) ContactByMobile(ctx context.Context, mobile, domainId string) (string, error) {
return "", nil
}
@@ -216,16 +217,16 @@ func (emailSender *SEmailSender) IsSystemConfigContactType() bool {
return true
}
func (emailSender *SEmailSender) GetAccessToken(key string) error {
func (emailSender *SEmailSender) GetAccessToken(ctx context.Context, key string) error {
return nil
}
func (emailSender *SEmailSender) sendMessageWithToken(uri string, method httputils.THttpMethod, header http.Header, params url.Values, body jsonutils.JSONObject) (jsonutils.JSONObject, error) {
func (emailSender *SEmailSender) sendMessageWithToken(ctx context.Context, uri string, method httputils.THttpMethod, header http.Header, params url.Values, body jsonutils.JSONObject) (jsonutils.JSONObject, error) {
if params == nil {
params = url.Values{}
}
params.Set("access_token", models.ConfigMap[api.WORKWX].Content.AccessToken)
return sendRequest(uri, httputils.POST, nil, params, jsonutils.Marshal(body))
return sendRequest(ctx, uri, httputils.POST, nil, params, jsonutils.Marshal(body))
}
func (emailSender *SEmailSender) RegisterConfig(config models.SConfig) {

View File

@@ -14,6 +14,7 @@
package sender
import (
"context"
"fmt"
"net/http"
"net/url"
@@ -38,7 +39,7 @@ func (self *SFeishuSender) GetSenderType() string {
const ApiSendMessageForFeishuByOpenId = feishu.ApiRobotSendMessage + "?receive_id_type=open_id"
// 发送飞书消息
func (feishuSender *SFeishuSender) Send(args api.SendParams) error {
func (feishuSender *SFeishuSender) Send(ctx context.Context, args api.SendParams) error {
// 发送通知消息体
body := map[string]interface{}{
"open_id": args.Receivers.Contact,
@@ -49,7 +50,7 @@ func (feishuSender *SFeishuSender) Send(args api.SendParams) error {
} // 添加bearer token请求头
header := http.Header{}
header.Add("Authorization", fmt.Sprintf("Bearer %s", models.ConfigMap[fmt.Sprintf("%s-%s", api.FEISHU, args.DomainId)].Content.AccessToken))
rep, err := sendRequest(ApiSendMessageForFeishuByOpenId, httputils.POST, header, nil, jsonutils.Marshal(body))
rep, err := sendRequest(ctx, ApiSendMessageForFeishuByOpenId, httputils.POST, header, nil, jsonutils.Marshal(body))
if err == nil {
return nil
}
@@ -63,12 +64,12 @@ func (feishuSender *SFeishuSender) Send(args api.SendParams) error {
}
switch code {
case "99991663": //token过期
err = feishuSender.GetAccessToken(fmt.Sprintf("%s-%s", api.FEISHU, args.DomainId))
err = feishuSender.GetAccessToken(ctx, fmt.Sprintf("%s-%s", api.FEISHU, args.DomainId))
if err != nil {
return errors.Wrap(err, "tenant token invalid && getToken err")
}
header.Set("Authorization", fmt.Sprintf("Bearer %s", models.ConfigMap[fmt.Sprintf("%s-%s", api.FEISHU, args.DomainId)].Content.AccessToken))
_, err = sendRequest(ApiSendMessageForFeishuByOpenId, httputils.POST, header, nil, jsonutils.Marshal(body))
_, err = sendRequest(ctx, ApiSendMessageForFeishuByOpenId, httputils.POST, header, nil, jsonutils.Marshal(body))
if err == nil {
return nil
}
@@ -80,7 +81,7 @@ func (feishuSender *SFeishuSender) Send(args api.SendParams) error {
}
// 校验appId与appSecret
func (feishuSender *SFeishuSender) ValidateConfig(config api.NotifyConfig) (string, error) {
func (feishuSender *SFeishuSender) ValidateConfig(ctx context.Context, config api.NotifyConfig) (string, error) {
rep, err := feishuSender.getAccessToken(config.AppId, config.AppSecret)
if err == nil {
return "", nil
@@ -96,12 +97,12 @@ func (feishuSender *SFeishuSender) ValidateConfig(config api.NotifyConfig) (stri
}
// 根据用户手机号获取用户的open_id
func (feishuSender *SFeishuSender) ContactByMobile(mobile, domainId string) (string, error) {
func (feishuSender *SFeishuSender) ContactByMobile(ctx context.Context, mobile, domainId string) (string, error) {
body := jsonutils.NewDict()
body.Set("mobiles", jsonutils.NewArray(jsonutils.NewString(mobile)))
header := http.Header{}
// 考虑到获取用户id需求较少可通过直接更新token来避免token失效
err := feishuSender.GetAccessToken(domainId)
err := feishuSender.GetAccessToken(ctx, domainId)
if err != nil {
return "", errors.Wrap(err, "GetAccessToken")
}
@@ -110,7 +111,7 @@ func (feishuSender *SFeishuSender) ContactByMobile(mobile, domainId string) (str
params.Set("mobiles", mobile)
header.Set("Authorization", fmt.Sprintf("Bearer %s", models.ConfigMap[fmt.Sprintf("%s-%s", api.FEISHU, domainId)].Content.AccessToken))
resp, err := sendRequest(ApiFetchUserID, httputils.GET, header, params, body)
resp, err := sendRequest(ctx, ApiFetchUserID, httputils.GET, header, params, body)
if err != nil {
return "", err
}
@@ -155,7 +156,7 @@ func (feishuSender *SFeishuSender) RegisterConfig(config models.SConfig) {
}
// 获取token
func (feishuSender *SFeishuSender) GetAccessToken(domainId string) error {
func (feishuSender *SFeishuSender) GetAccessToken(ctx context.Context, domainId string) error {
key := fmt.Sprintf("%s-%s", api.FEISHU, domainId)
appId, appSecret := models.ConfigMap[key].Content.AppId, models.ConfigMap[key].Content.AppSecret
resp, err := feishuSender.getAccessToken(appId, appSecret)

View File

@@ -14,6 +14,7 @@
package sender
import (
"context"
"fmt"
"strings"
@@ -36,7 +37,7 @@ func (feishuRobotSender *SFeishuRobotSender) GetSenderType() string {
return api.FEISHU_ROBOT
}
func (feishuRobotSender *SFeishuRobotSender) Send(args api.SendParams) error {
func (feishuRobotSender *SFeishuRobotSender) Send(ctx context.Context, args api.SendParams) error {
var token string
var errs []error
title, msg := args.Title, args.Message
@@ -73,11 +74,11 @@ func (feishuRobotSender *SFeishuRobotSender) Send(args api.SendParams) error {
return errors.NewAggregate(errs)
}
func (feishuRobotSender *SFeishuRobotSender) ValidateConfig(config api.NotifyConfig) (string, error) {
func (feishuRobotSender *SFeishuRobotSender) ValidateConfig(ctx context.Context, config api.NotifyConfig) (string, error) {
return "", cloudprovider.ErrNotImplemented
}
func (feishuRobotSender *SFeishuRobotSender) ContactByMobile(mobile, domainId string) (string, error) {
func (feishuRobotSender *SFeishuRobotSender) ContactByMobile(ctx context.Context, mobile, domainId string) (string, error) {
return "", cloudprovider.ErrNotImplemented
}
@@ -101,7 +102,7 @@ func (feishuRobotSender *SFeishuRobotSender) IsSystemConfigContactType() bool {
return true
}
func (feishuRobotSender *SFeishuRobotSender) GetAccessToken(key string) error {
func (feishuRobotSender *SFeishuRobotSender) GetAccessToken(ctx context.Context, key string) error {
return nil
}

View File

@@ -15,6 +15,8 @@
package sender
import (
"context"
"yunion.io/x/cloudmux/pkg/cloudprovider"
"yunion.io/x/pkg/errors"
@@ -31,7 +33,7 @@ func (smsSender *SMobileSender) GetSenderType() string {
return api.MOBILE
}
func (smsSender *SMobileSender) Send(args api.SendParams) error {
func (smsSender *SMobileSender) Send(ctx context.Context, args api.SendParams) error {
smsSendParams := api.SSMSSendParams{
TemplateParas: args.Message,
To: args.Receivers.Contact,
@@ -46,7 +48,7 @@ func (smsSender *SMobileSender) Send(args api.SendParams) error {
})
}
func (smsSender *SMobileSender) ValidateConfig(config api.NotifyConfig) (string, error) {
func (smsSender *SMobileSender) ValidateConfig(ctx context.Context, config api.NotifyConfig) (string, error) {
driver := models.GetSMSDriver(config.SmsDriver)
if driver == nil {
return "", errors.Wrap(errors.ErrNotFound, "driver disabled")
@@ -83,7 +85,7 @@ func (smsSender *SMobileSender) DeleteConfig(config api.NotifyConfig) error {
return cloudprovider.ErrNotImplemented
}
func (smsSender *SMobileSender) ContactByMobile(mobile, domainId string) (string, error) {
func (smsSender *SMobileSender) ContactByMobile(ctx context.Context, mobile, domainId string) (string, error) {
return "", nil
}
@@ -107,7 +109,7 @@ func (smsSender *SMobileSender) IsSystemConfigContactType() bool {
return true
}
func (smsSender *SMobileSender) GetAccessToken(key string) error {
func (smsSender *SMobileSender) GetAccessToken(ctx context.Context, key string) error {
return nil
}

View File

@@ -33,11 +33,10 @@ var (
cli = &http.Client{
Transport: httputils.GetTransport(true),
}
ctx = context.Background()
)
// 通知请求
func sendRequest(url string, method httputils.THttpMethod, header http.Header, params url.Values, body jsonutils.JSONObject) (jsonutils.JSONObject, error) {
func sendRequest(ctx context.Context, url string, method httputils.THttpMethod, header http.Header, params url.Values, body jsonutils.JSONObject) (jsonutils.JSONObject, error) {
if header == nil {
header = http.Header{}
}

View File

@@ -14,6 +14,8 @@
package sender
import (
"context"
"yunion.io/x/cloudmux/pkg/cloudprovider"
api "yunion.io/x/onecloud/pkg/apis/notify"
@@ -28,11 +30,11 @@ func (self *SWebconsoleSender) GetSenderType() string {
return api.WEBCONSOLE
}
func (self *SWebconsoleSender) Send(args api.SendParams) error {
func (self *SWebconsoleSender) Send(ctx context.Context, args api.SendParams) error {
return nil
}
func (websender *SWebconsoleSender) ValidateConfig(config api.NotifyConfig) (string, error) {
func (websender *SWebconsoleSender) ValidateConfig(ctx context.Context, config api.NotifyConfig) (string, error) {
return "", cloudprovider.ErrNotImplemented
}
@@ -48,7 +50,7 @@ func (websender *SWebconsoleSender) DeleteConfig(config api.NotifyConfig) error
return cloudprovider.ErrNotImplemented
}
func (websender *SWebconsoleSender) ContactByMobile(mobile, domainId string) (string, error) {
func (websender *SWebconsoleSender) ContactByMobile(ctx context.Context, mobile, domainId string) (string, error) {
return "", cloudprovider.ErrNotImplemented
}
@@ -72,7 +74,7 @@ func (websender *SWebconsoleSender) IsSystemConfigContactType() bool {
return true
}
func (websender *SWebconsoleSender) GetAccessToken(key string) error {
func (websender *SWebconsoleSender) GetAccessToken(ctx context.Context, key string) error {
return nil
}

View File

@@ -14,6 +14,7 @@
package sender
import (
"context"
"net/http"
"strings"
@@ -34,13 +35,10 @@ func (self *SWebhookSender) GetSenderType() string {
return api.WEBHOOK_ROBOT
}
func (self *SWebhookSender) Send(args api.SendParams) error {
body, err := jsonutils.ParseString(args.Message)
if err != nil {
return errors.Wrapf(err, "unable to parse %q", args.Message)
}
if _, ok := body.(*jsonutils.JSONString); err != nil || ok {
dict := jsonutils.NewDict()
func (self *SWebhookSender) Send(ctx context.Context, args api.SendParams) error {
dict := jsonutils.NewDict()
header := http.Header{}
if len(args.Event) == 0 {
if len(args.MsgKey) == 0 {
dict.Set("Msg", jsonutils.NewString(args.Message))
} else {
@@ -49,10 +47,23 @@ func (self *SWebhookSender) Send(args api.SendParams) error {
if args.Body != nil {
jsonutils.Update(dict, args.Body)
}
body = dict
} else {
body, err := jsonutils.ParseString(args.Message)
if err != nil {
return errors.Wrapf(err, "unable to parse %q", args.Message)
}
jsonutils.Update(dict, body)
if len(args.MsgKey) == 0 {
dict.Set("Msg", jsonutils.NewString(args.Message))
} else {
dict.Set(args.MsgKey, jsonutils.NewString(args.Message))
}
if args.Body != nil {
jsonutils.Update(dict, args.Body)
}
event := strings.ToUpper(args.Event)
header.Set(EVENT_HEADER, event)
}
event := strings.ToUpper(args.Event)
header := http.Header{}
if args.Header != nil {
resmap, _ := args.Header.GetMap()
for k, v := range resmap {
@@ -64,12 +75,11 @@ func (self *SWebhookSender) Send(args api.SendParams) error {
}
}
header.Set(EVENT_HEADER, event)
_, _, err = httputils.JSONRequest(cli, ctx, httputils.POST, args.Receivers.Contact, header, body, false)
return err
_, _, err := httputils.JSONRequest(cli, ctx, httputils.POST, args.Receivers.Contact, header, dict, false)
return errors.Wrap(err, "webhook send")
}
func (websender *SWebhookSender) ValidateConfig(config api.NotifyConfig) (string, error) {
func (websender *SWebhookSender) ValidateConfig(ctx context.Context, config api.NotifyConfig) (string, error) {
return "", cloudprovider.ErrNotImplemented
}
@@ -85,7 +95,7 @@ func (websender *SWebhookSender) DeleteConfig(config api.NotifyConfig) error {
return cloudprovider.ErrNotImplemented
}
func (websender *SWebhookSender) ContactByMobile(mobile, domainId string) (string, error) {
func (websender *SWebhookSender) ContactByMobile(ctx context.Context, mobile, domainId string) (string, error) {
return "", cloudprovider.ErrNotImplemented
}
@@ -109,7 +119,7 @@ func (websender *SWebhookSender) IsSystemConfigContactType() bool {
return true
}
func (websender *SWebhookSender) GetAccessToken(key string) error {
func (websender *SWebhookSender) GetAccessToken(ctx context.Context, key string) error {
return nil
}

View File

@@ -37,11 +37,11 @@ func (websocket *SWebsocketSender) GetSenderType() string {
return api.WEBSOCKET
}
func (websocket *SWebsocketSender) Send(params api.SendParams) error {
return websocket.send(params)
func (websocket *SWebsocketSender) Send(ctx context.Context, params api.SendParams) error {
return websocket.send(ctx, params)
}
func (websocket *SWebsocketSender) send(args api.SendParams) error {
func (websocket *SWebsocketSender) send(ctx context.Context, args api.SendParams) error {
params := jsonutils.NewDict()
params.Add(jsonutils.NewString("notify"), "obj_type")
params.Add(jsonutils.NewString(""), "obj_id")
@@ -101,15 +101,15 @@ func (websocket *SWebsocketSender) IsSystemConfigContactType() bool {
return true
}
func (websocket *SWebsocketSender) ContactByMobile(mobile, domainId string) (string, error) {
func (websocket *SWebsocketSender) ContactByMobile(ctx context.Context, mobile, domainId string) (string, error) {
return "", nil
}
func (websocket *SWebsocketSender) GetAccessToken(key string) error {
func (websocket *SWebsocketSender) GetAccessToken(ctx context.Context, key string) error {
return nil
}
func (websocket *SWebsocketSender) ValidateConfig(config api.NotifyConfig) (string, error) {
func (websocket *SWebsocketSender) ValidateConfig(ctx context.Context, config api.NotifyConfig) (string, error) {
return "", cloudprovider.ErrNotImplemented
}

View File

@@ -15,6 +15,7 @@
package sender
import (
"context"
"fmt"
"net/http"
"net/url"
@@ -36,7 +37,7 @@ func (workwxSender *SWorkwxSender) GetSenderType() string {
return api.WORKWX
}
func (workwxSender *SWorkwxSender) Send(args api.SendParams) error {
func (workwxSender *SWorkwxSender) Send(ctx context.Context, args api.SendParams) error {
body := map[string]interface{}{
"agentid": models.ConfigMap[fmt.Sprintf("%s-%s", api.WORKWX, args.DomainId)].Content.AgentId,
"msgtype": "markdown",
@@ -45,7 +46,7 @@ func (workwxSender *SWorkwxSender) Send(args api.SendParams) error {
},
"touser": args.Receivers.Contact,
}
respObj, err := workwxSender.sendMessageWithToken(ApiWorkwxSendMessage, fmt.Sprintf("%s-%s", api.WORKWX, args.DomainId), httputils.POST, nil, nil, jsonutils.Marshal(body))
respObj, err := workwxSender.sendMessageWithToken(ctx, ApiWorkwxSendMessage, fmt.Sprintf("%s-%s", api.WORKWX, args.DomainId), httputils.POST, nil, nil, jsonutils.Marshal(body))
if err != nil {
return errors.Wrap(err, "workwx send message")
}
@@ -55,11 +56,11 @@ func (workwxSender *SWorkwxSender) Send(args api.SendParams) error {
if resp.ErrCode > 0 {
// 对于token过期情况进行重新获取token并重新发消息
if len(resp.UnlicensedUser) > 0 || resp.ErrCode == 42001 {
err = workwxSender.GetAccessToken(args.DomainId)
err = workwxSender.GetAccessToken(ctx, args.DomainId)
if err != nil {
return errors.Wrap(err, "retenant token invalid && getToken err")
}
secRespObj, err := workwxSender.sendMessageWithToken(ApiWorkwxSendMessage, fmt.Sprintf("%s-%s", api.WORKWX, args.DomainId), httputils.POST, nil, nil, jsonutils.Marshal(body))
secRespObj, err := workwxSender.sendMessageWithToken(ctx, ApiWorkwxSendMessage, fmt.Sprintf("%s-%s", api.WORKWX, args.DomainId), httputils.POST, nil, nil, jsonutils.Marshal(body))
secRespObj.Unmarshal(&resp)
if err == nil && resp.ErrCode == 0 {
return nil
@@ -72,9 +73,9 @@ func (workwxSender *SWorkwxSender) Send(args api.SendParams) error {
return nil
}
func (workwxSender *SWorkwxSender) ValidateConfig(config api.NotifyConfig) (string, error) {
func (workwxSender *SWorkwxSender) ValidateConfig(ctx context.Context, config api.NotifyConfig) (string, error) {
// 校验accesstoken
_, err := workwxSender.getAccessToken(config.CorpId, config.Secret)
_, err := workwxSender.getAccessToken(ctx, config.CorpId, config.Secret)
if err != nil {
switch {
case strings.Contains(err.Error(), "40013"):
@@ -87,15 +88,15 @@ func (workwxSender *SWorkwxSender) ValidateConfig(config api.NotifyConfig) (stri
return "", nil
}
func (workwxSender *SWorkwxSender) ContactByMobile(mobile, domainId string) (string, error) {
err := workwxSender.GetAccessToken(domainId)
func (workwxSender *SWorkwxSender) ContactByMobile(ctx context.Context, mobile, domainId string) (string, error) {
err := workwxSender.GetAccessToken(ctx, domainId)
if err != nil {
return "", err
}
body := jsonutils.Marshal(map[string]interface{}{
"mobile": mobile,
})
res, err := workwxSender.sendMessageWithToken(ApiWorkwxGetUserByMobile, fmt.Sprintf("%s-%s", api.WORKWX, domainId), httputils.POST, nil, nil, jsonutils.Marshal(body))
res, err := workwxSender.sendMessageWithToken(ctx, ApiWorkwxGetUserByMobile, fmt.Sprintf("%s-%s", api.WORKWX, domainId), httputils.POST, nil, nil, jsonutils.Marshal(body))
if err != nil {
return "", errors.Wrap(err, "get user by mobile")
}
@@ -130,10 +131,10 @@ func (workwxSender *SWorkwxSender) RegisterConfig(config models.SConfig) {
models.ConfigMap[fmt.Sprintf("%s-%s", config.Type, config.DomainId)] = config
}
func (workwxSender *SWorkwxSender) GetAccessToken(domainId string) error {
func (workwxSender *SWorkwxSender) GetAccessToken(ctx context.Context, domainId string) error {
key := fmt.Sprintf("%s-%s", api.WORKWX, domainId)
corpId, secret := models.ConfigMap[key].Content.CorpId, models.ConfigMap[key].Content.Secret
token, err := workwxSender.getAccessToken(corpId, secret)
token, err := workwxSender.getAccessToken(ctx, corpId, secret)
if err != nil {
return errors.Wrap(err, "workwx getAccessToken")
}
@@ -141,23 +142,23 @@ func (workwxSender *SWorkwxSender) GetAccessToken(domainId string) error {
return nil
}
func (workwxSender *SWorkwxSender) getAccessToken(corpId, secret string) (string, error) {
func (workwxSender *SWorkwxSender) getAccessToken(ctx context.Context, corpId, secret string) (string, error) {
params := url.Values{}
params.Set("corpid", corpId)
params.Set("corpsecret", secret)
res, err := sendRequest(ApiWorkwxGetToken, httputils.GET, nil, params, nil)
res, err := sendRequest(ctx, ApiWorkwxGetToken, httputils.GET, nil, params, nil)
if err != nil {
return "", errors.Wrap(err, "get workwx token")
}
return res.GetString("access_token")
}
func (workwxSender *SWorkwxSender) sendMessageWithToken(uri, key string, method httputils.THttpMethod, header http.Header, params url.Values, body jsonutils.JSONObject) (jsonutils.JSONObject, error) {
func (workwxSender *SWorkwxSender) sendMessageWithToken(ctx context.Context, uri, key string, method httputils.THttpMethod, header http.Header, params url.Values, body jsonutils.JSONObject) (jsonutils.JSONObject, error) {
if params == nil {
params = url.Values{}
}
params.Set("access_token", models.ConfigMap[key].Content.AccessToken)
return sendRequest(uri, httputils.POST, nil, params, jsonutils.Marshal(body))
return sendRequest(ctx, uri, httputils.POST, nil, params, jsonutils.Marshal(body))
}
func init() {

View File

@@ -14,6 +14,7 @@
package sender
import (
"context"
"fmt"
"yunion.io/x/cloudmux/pkg/cloudprovider"
@@ -33,7 +34,7 @@ func (workwxRobotSender *SWorkwxRobotSender) GetSenderType() string {
return api.WORKWX_ROBOT
}
func (workwxRobotSender *SWorkwxRobotSender) Send(args api.SendParams) error {
func (workwxRobotSender *SWorkwxRobotSender) Send(ctx context.Context, args api.SendParams) error {
errs := []error{}
content := fmt.Sprintf("# %s\n\n%s", args.Title, args.Message)
mid := map[string]interface{}{
@@ -42,7 +43,7 @@ func (workwxRobotSender *SWorkwxRobotSender) Send(args api.SendParams) error {
"content": content,
},
}
req, err := sendRequest(args.Receivers.Contact, httputils.POST, nil, nil, jsonutils.Marshal(mid))
req, err := sendRequest(ctx, args.Receivers.Contact, httputils.POST, nil, nil, jsonutils.Marshal(mid))
if err != nil {
return errors.Wrap(err, "sendRequest")
}
@@ -56,11 +57,11 @@ func (workwxRobotSender *SWorkwxRobotSender) Send(args api.SendParams) error {
return errors.NewAggregate(errs)
}
func (workwxRobotSender *SWorkwxRobotSender) ValidateConfig(config api.NotifyConfig) (string, error) {
func (workwxRobotSender *SWorkwxRobotSender) ValidateConfig(ctx context.Context, config api.NotifyConfig) (string, error) {
return "", cloudprovider.ErrNotImplemented
}
func (workwxRobotSender *SWorkwxRobotSender) ContactByMobile(mobile, domainId string) (string, error) {
func (workwxRobotSender *SWorkwxRobotSender) ContactByMobile(ctx context.Context, mobile, domainId string) (string, error) {
return "", cloudprovider.ErrNotImplemented
}
@@ -84,7 +85,7 @@ func (workwxRobotSender *SWorkwxRobotSender) IsSystemConfigContactType() bool {
return true
}
func (workwxRobotSender *SWorkwxRobotSender) GetAccessToken(key string) error {
func (workwxRobotSender *SWorkwxRobotSender) GetAccessToken(ctx context.Context, key string) error {
return nil
}

View File

@@ -16,6 +16,7 @@ package tasks
import (
"context"
"database/sql"
"fmt"
"strings"
"time"
@@ -75,7 +76,7 @@ func (self *NotificationSendTask) OnInit(ctx context.Context, obj db.IStandalone
}
event, err := models.EventManager.GetEvent(notification.EventId)
if err != nil {
if !strings.Contains(err.Error(), "no rows in result set") {
if err != sql.ErrNoRows {
self.taskFailed(ctx, notification, errors.Wrapf(err, "GetEvent").Error(), true)
return
}
@@ -105,11 +106,6 @@ func (self *NotificationSendTask) OnInit(ctx context.Context, obj db.IStandalone
continue
}
// check contact enabled
enabled, err := receiver.IsEnabledContactType(notification.ContactType)
if err != nil {
logclient.AddSimpleActionLog(notification, logclient.ACT_SEND_NOTIFICATION, errors.Wrapf(err, "GetEnabledContactTypes"), self.GetUserCred(), false)
continue
}
if notification.ContactType == apis.WEBHOOK {
notification.ContactType = apis.WEBHOOK_ROBOT
}
@@ -117,6 +113,11 @@ func (self *NotificationSendTask) OnInit(ctx context.Context, obj db.IStandalone
robot := receiver.(*models.SRobot)
notification.ContactType = fmt.Sprintf("%s-robot", robot.Type)
}
enabled, err := receiver.IsEnabledContactType(notification.ContactType)
if err != nil {
logclient.AddSimpleActionLog(notification, logclient.ACT_SEND_NOTIFICATION, errors.Wrapf(err, "GetEnabledContactTypes"), self.GetUserCred(), false)
continue
}
driver := models.GetDriver(notification.ContactType)
if driver == nil || !enabled {
sendFail(&rns[i], fmt.Sprintf("disabled contactType %q", notification.ContactType))
@@ -202,7 +203,11 @@ func (self *NotificationSendTask) OnInit(ctx context.Context, obj db.IStandalone
continue
}
// send
p, err := notification.GetTemplate(ctx, event.TopicId, lang, nn)
topicId := ""
if event != nil {
topicId = event.TopicId
}
p, err := notification.GetTemplate(ctx, topicId, lang, nn)
if err != nil {
logclient.AddSimpleActionLog(notification, logclient.ACT_SEND_NOTIFICATION, errors.Wrapf(err, "FillWithTemplate(%s)", lang), self.GetUserCred(), false)
continue
@@ -210,7 +215,7 @@ func (self *NotificationSendTask) OnInit(ctx context.Context, obj db.IStandalone
if event != nil {
p.Event = event.Event
}
if notification.ContactType != apis.MOBILE {
if notification.ContactType != apis.MOBILE && notification.ContactType != apis.WEBHOOK_ROBOT {
switch lang {
case apis.TEMPLATE_LANG_CN:
p.Message += "\n来自 " + options.Options.ApiServer
@@ -274,7 +279,7 @@ func (notificationSendTask *NotificationSendTask) batchSend(ctx context.Context,
params.Header = robot.Header
params.Body = robot.Body
params.MsgKey = robot.MsgKey
err = driver.Send(params)
err = driver.Send(ctx, params)
if err != nil {
fails = append(fails, FailedReceiverSpec{ReceiverSpec: receivers[i], Reason: err.Error()})
}
@@ -294,7 +299,7 @@ func (notificationSendTask *NotificationSendTask) batchSend(ctx context.Context,
mobile := strings.Join(mobileArr, "")
params.Receivers.Contact = mobile
}
err = driver.Send(params)
err = driver.Send(ctx, params)
if err != nil {
fails = append(fails, FailedReceiverSpec{ReceiverSpec: receivers[i], Reason: err.Error()})
}
@@ -302,7 +307,7 @@ func (notificationSendTask *NotificationSendTask) batchSend(ctx context.Context,
receiver := receivers[i].receiver.(*models.SContact)
params.Receivers.Contact, _ = receiver.GetContact(notification.ContactType)
driver := models.GetDriver(notification.ContactType)
err = driver.Send(params)
err = driver.Send(ctx, params)
if err != nil {
fails = append(fails, FailedReceiverSpec{ReceiverSpec: receivers[i], Reason: err.Error()})
}

View File

@@ -105,7 +105,7 @@ func (self *SubcontactPullTask) OnInit(ctx context.Context, obj db.IStandaloneMo
if utils.IsInStringArray(contactType, PullContactType) {
content := ""
driver := models.GetDriver(contactType)
content, err = driver.ContactByMobile(mobile, self.UserCred.GetDomainId())
content, err = driver.ContactByMobile(ctx, mobile, self.UserCred.GetDomainId())
if err != nil {
var reason string
if errors.Cause(err) == apis.ErrNoSuchMobile {

View File

@@ -116,7 +116,7 @@ func (self *VerificationSendTask) OnInit(ctx context.Context, obj db.IStandalone
}
param.EmailMsg = emailMsg
driver := models.GetDriver(contactType)
err = driver.Send(param)
err = driver.Send(ctx, param)
// err = models.NotifyService.Send(ctx, contactType, param)
if err != nil {
self.taskFailed(ctx, receiver, err.Error())