mirror of
https://github.com/ihmily/StreamCap.git
synced 2026-05-07 05:57:45 +08:00
fix: optimize message push status log
This commit is contained in:
@@ -1,4 +1,3 @@
|
||||
from asyncio import create_task
|
||||
from typing import Optional
|
||||
|
||||
from ..models.recording.recording_model import Recording
|
||||
@@ -82,86 +81,79 @@ class MessagePusher:
|
||||
return False
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def log_push_result(service_name: str, result: dict) -> None:
|
||||
if result.get("success"):
|
||||
logger.info(f"Push {service_name} message successfully: {result['success']}")
|
||||
if result.get("error") or (not result.get("success") and not result.get("error")):
|
||||
logger.error(f"Push {service_name} message failed: {result['error']}")
|
||||
|
||||
async def push_messages(self, msg_title: str, push_content: str) -> None:
|
||||
"""Push messages to all enabled notification services"""
|
||||
if self.settings.user_config.get("dingtalk_enabled"):
|
||||
create_task(
|
||||
self.notifier.send_to_dingtalk(
|
||||
url=self.settings.user_config.get("dingtalk_webhook_url"),
|
||||
content=push_content,
|
||||
number=self.settings.user_config.get("dingtalk_at_objects"),
|
||||
is_atall=self.settings.user_config.get("dingtalk_at_all"),
|
||||
)
|
||||
result = await self.notifier.send_to_dingtalk(
|
||||
url=self.settings.user_config.get("dingtalk_webhook_url"),
|
||||
content=push_content,
|
||||
number=self.settings.user_config.get("dingtalk_at_objects"),
|
||||
is_atall=self.settings.user_config.get("dingtalk_at_all"),
|
||||
)
|
||||
logger.info("Push DingTalk message successfully")
|
||||
self.log_push_result("DingTalk", result)
|
||||
|
||||
if self.settings.user_config.get("wechat_enabled"):
|
||||
create_task(
|
||||
self.notifier.send_to_wechat(
|
||||
url=self.settings.user_config.get("wechat_webhook_url"), title=msg_title, content=push_content
|
||||
)
|
||||
result = await self.notifier.send_to_wechat(
|
||||
url=self.settings.user_config.get("wechat_webhook_url"), title=msg_title, content=push_content
|
||||
)
|
||||
logger.info("Push Wechat message successfully")
|
||||
self.log_push_result("Wechat", result)
|
||||
|
||||
if self.settings.user_config.get("bark_enabled"):
|
||||
create_task(
|
||||
self.notifier.send_to_bark(
|
||||
api=self.settings.user_config.get("bark_webhook_url"),
|
||||
title=msg_title,
|
||||
content=push_content,
|
||||
level=self.settings.user_config.get("bark_interrupt_level"),
|
||||
sound=self.settings.user_config.get("bark_sound"),
|
||||
)
|
||||
result = await self.notifier.send_to_bark(
|
||||
api=self.settings.user_config.get("bark_webhook_url"),
|
||||
title=msg_title,
|
||||
content=push_content,
|
||||
level=self.settings.user_config.get("bark_interrupt_level"),
|
||||
sound=self.settings.user_config.get("bark_sound"),
|
||||
)
|
||||
logger.info("Push Bark message successfully")
|
||||
self.log_push_result("Bark", result)
|
||||
|
||||
if self.settings.user_config.get("ntfy_enabled"):
|
||||
create_task(
|
||||
self.notifier.send_to_ntfy(
|
||||
api=self.settings.user_config.get("ntfy_server_url"),
|
||||
title=msg_title,
|
||||
content=push_content,
|
||||
tags=self.settings.user_config.get("ntfy_tags"),
|
||||
action_url=self.settings.user_config.get("ntfy_action_url"),
|
||||
email=self.settings.user_config.get("ntfy_email"),
|
||||
)
|
||||
result = await self.notifier.send_to_ntfy(
|
||||
api=self.settings.user_config.get("ntfy_server_url"),
|
||||
title=msg_title,
|
||||
content=push_content,
|
||||
tags=self.settings.user_config.get("ntfy_tags"),
|
||||
action_url=self.settings.user_config.get("ntfy_action_url"),
|
||||
email=self.settings.user_config.get("ntfy_email"),
|
||||
)
|
||||
logger.info("Push Ntfy message successfully")
|
||||
self.log_push_result("Ntfy", result)
|
||||
|
||||
if self.settings.user_config.get("telegram_enabled"):
|
||||
create_task(
|
||||
self.notifier.send_to_telegram(
|
||||
chat_id=self.settings.user_config.get("telegram_chat_id"),
|
||||
token=self.settings.user_config.get("telegram_api_token"),
|
||||
content=push_content,
|
||||
proxy=self._get_proxy(),
|
||||
)
|
||||
result = await self.notifier.send_to_telegram(
|
||||
chat_id=self.settings.user_config.get("telegram_chat_id"),
|
||||
token=self.settings.user_config.get("telegram_api_token"),
|
||||
content=push_content,
|
||||
proxy=self._get_proxy(),
|
||||
)
|
||||
logger.info("Push Telegram message successfully")
|
||||
self.log_push_result("Telegram", result)
|
||||
|
||||
if self.settings.user_config.get("email_enabled"):
|
||||
create_task(
|
||||
self.notifier.send_to_email(
|
||||
email_host=self.settings.user_config.get("smtp_server"),
|
||||
login_email=self.settings.user_config.get("email_username"),
|
||||
password=self.settings.user_config.get("email_password"),
|
||||
sender_email=self.settings.user_config.get("sender_email"),
|
||||
sender_name=self.settings.user_config.get("sender_name"),
|
||||
to_email=self.settings.user_config.get("recipient_email"),
|
||||
title=msg_title,
|
||||
content=push_content,
|
||||
)
|
||||
result = await self.notifier.send_to_email(
|
||||
email_host=self.settings.user_config.get("smtp_server"),
|
||||
login_email=self.settings.user_config.get("email_username"),
|
||||
password=self.settings.user_config.get("email_password"),
|
||||
sender_email=self.settings.user_config.get("sender_email"),
|
||||
sender_name=self.settings.user_config.get("sender_name"),
|
||||
to_email=self.settings.user_config.get("recipient_email"),
|
||||
title=msg_title,
|
||||
content=push_content,
|
||||
)
|
||||
logger.info("Push Email message successfully")
|
||||
self.log_push_result("Email", result)
|
||||
|
||||
if self.settings.user_config.get("serverchan_enabled"):
|
||||
create_task(
|
||||
self.notifier.send_to_serverchan(
|
||||
sendkey=self.settings.user_config.get("serverchan_sendkey"),
|
||||
title=msg_title,
|
||||
content=push_content,
|
||||
channel=self.settings.user_config.get("serverchan_channel", 9),
|
||||
tags=self.settings.user_config.get("serverchan_tags", "直播通知"),
|
||||
)
|
||||
result = await self.notifier.send_to_serverchan(
|
||||
sendkey=self.settings.user_config.get("serverchan_sendkey"),
|
||||
title=msg_title,
|
||||
content=push_content,
|
||||
channel=self.settings.user_config.get("serverchan_channel", 9),
|
||||
tags=self.settings.user_config.get("serverchan_tags", "Live Status Update"),
|
||||
)
|
||||
logger.info("Push ServerChan message successfully")
|
||||
self.log_push_result("ServerChan", result)
|
||||
|
||||
@@ -71,7 +71,7 @@ class NotificationService:
|
||||
open_ssl: bool = True,
|
||||
) -> dict[str, Any]:
|
||||
receivers = to_email.replace(",", ",").split(",") if to_email.strip() else []
|
||||
|
||||
results = {"success": [], "error": []}
|
||||
try:
|
||||
message = MIMEMultipart()
|
||||
send_name = base64.b64encode(sender_name.encode("utf-8")).decode()
|
||||
@@ -91,10 +91,12 @@ class NotificationService:
|
||||
smtp_obj = smtplib.SMTP(email_host, int(smtp_port))
|
||||
smtp_obj.login(login_email, password)
|
||||
smtp_obj.sendmail(sender_email, receivers, message.as_string())
|
||||
return {"success": receivers, "error": []}
|
||||
results["success"] = receivers
|
||||
|
||||
except smtplib.SMTPException as e:
|
||||
logger.info(f"Email push failed, push email: {to_email}, Error message: {e}")
|
||||
return {"success": [], "error": receivers}
|
||||
results["error"] = receivers
|
||||
return results
|
||||
|
||||
async def send_to_telegram(
|
||||
self, chat_id: int, token: str, content: str, proxy: Optional[str] = None) -> dict[str, Any]:
|
||||
|
||||
Reference in New Issue
Block a user