feat: add new card status

This commit is contained in:
ihmily
2025-07-18 11:23:21 +08:00
parent 72a58c1620
commit 2649b6fc1e
11 changed files with 194 additions and 88 deletions

View File

@@ -220,10 +220,13 @@ class RecordingManager:
in_scheduled = utils.is_current_time_within_range(scheduled_time_range)
if not in_scheduled:
recording.status_info = RecordingStatus.NOT_IN_SCHEDULED_CHECK
recording.is_live = False
logger.info(f"Skip Detection: {recording.url} not in scheduled check range {scheduled_time_range}")
self.app.page.run_task(self.app.record_card_manager.update_card, recording)
return
recording.is_checking = True
recording.status_info = RecordingStatus.MONITORING
platform, platform_key = get_platform_info(recording.url)
if platform and platform_key and (recording.platform is None or recording.platform_key is None):
@@ -268,7 +271,6 @@ class RecordingManager:
recording.is_live = stream_info.is_live
if recording.is_live and not recording.is_recording:
recording.status_info = RecordingStatus.PREPARING_RECORDING
recording.live_title = stream_info.title
if recording.streamer_name.strip() == self._["live_room"]:
recording.streamer_name = stream_info.anchor_name
@@ -296,6 +298,7 @@ class RecordingManager:
recording.notified_live_start = True
if not recording.only_notify_no_record:
recording.status_info = RecordingStatus.PREPARING_RECORDING
recording.loop_time_seconds = self.loop_time_seconds
self.start_update(recording)
self.app.page.run_task(recorder.start_recording, stream_info)
@@ -305,11 +308,12 @@ class RecordingManager:
recording.loop_time_seconds = int(notify_loop_time or 3600)
else:
recording.loop_time_seconds = self.loop_time_seconds
recording.status_info = RecordingStatus.NOT_RECORDING
recording.cumulative_duration = timedelta()
recording.last_duration = timedelta()
recording.status_info = RecordingStatus.LIVE_BROADCASTING
recording.is_checking = False
self.app.page.run_task(self.app.record_card_manager.update_card, recording)
self.app.page.pubsub.send_others_on_topic("update", recording)
else:
if not recording.is_live and recording.notified_live_start:
recording.notified_live_start = False
@@ -327,10 +331,11 @@ class RecordingManager:
"display_title": title,
}
)
self.app.page.run_task(self.app.record_card_manager.update_card, recording)
self.app.page.pubsub.send_others_on_topic("update", recording)
self.app.page.run_task(self.persist_recordings)
self.app.page.run_task(self.app.record_card_manager.update_card, recording)
self.app.page.pubsub.send_others_on_topic("update", recording)
@staticmethod
def start_update(recording: Recording):
"""Start the recording process."""

View File

@@ -1,3 +1,15 @@
from enum import Enum
class CardStateType(Enum):
RECORDING = "recording"
ERROR = "error"
LIVE = "live"
OFFLINE = "offline"
STOPPED = "stopped"
UNKNOWN = "unknown"
class RecordingStatus:
STOPPED_MONITORING = "STOPPED_MONITORING"
MONITORING = "MONITORING"
@@ -9,6 +21,7 @@ class RecordingStatus:
RECORDING_ERROR = "RECORDING_ERROR"
NOT_RECORDING_SPACE = "NOT_RECORDING_SPACE"
LIVE_STATUS_CHECK_ERROR = "LIVE_STATUS_CHECK_ERROR"
LIVE_BROADCASTING = "LIVE_BROADCASTING"
@classmethod
def get_status(cls):

View File

@@ -9,6 +9,7 @@ from ...utils import utils
from ...utils.logger import logger
from ..views.storage_view import StoragePage
from .card_dialog import CardDialog
from .recording_card_state import RecordingCardState
from .recording_dialog import RecordingDialog
from .video_player import VideoPlayer
@@ -81,11 +82,7 @@ class RecordingCardManager:
on_click=lambda e, rec=recording: self.app.page.run_task(self.recording_delete_button_click, e, rec),
)
status_prefix = ""
if not recording.monitor_status:
status_prefix = f"[{self._['monitor_stopped']}] "
display_title = f"{status_prefix}{recording.title}"
display_title = RecordingCardState.get_display_title(recording, self._)
display_title_label = ft.Text(
display_title,
size=14,
@@ -94,7 +91,7 @@ class RecordingCardManager:
no_wrap=True,
overflow=ft.TextOverflow.ELLIPSIS,
expand=True,
weight=ft.FontWeight.BOLD if recording.is_recording or recording.is_live else None,
weight=RecordingCardState.get_title_weight(recording),
)
open_folder_button = ft.IconButton(
@@ -172,61 +169,27 @@ class RecordingCardManager:
@staticmethod
def get_card_border_color(recording: Recording):
"""Get the border color of the card."""
if recording.is_recording:
return ft.Colors.GREEN
elif recording.status_info in [
RecordingStatus.RECORDING_ERROR,
RecordingStatus.LIVE_STATUS_CHECK_ERROR
]:
return ft.Colors.RED
elif not recording.is_live and recording.monitor_status:
return ft.Colors.AMBER
elif not recording.monitor_status:
return ft.Colors.GREY
return ft.Colors.TRANSPARENT
return RecordingCardState.get_border_color(recording)
def create_status_label(self, recording: Recording):
if recording.is_recording:
return ft.Container(
content=ft.Text(self._["recording"], color=ft.Colors.WHITE, size=12, weight=ft.FontWeight.BOLD),
bgcolor=ft.Colors.GREEN,
border_radius=5,
padding=5,
width=60,
height=26,
alignment=ft.alignment.center,
)
elif recording.status_info in [RecordingStatus.RECORDING_ERROR, RecordingStatus.LIVE_STATUS_CHECK_ERROR]:
return ft.Container(
content=ft.Text(self._["recording_error"], color=ft.Colors.WHITE, size=12, weight=ft.FontWeight.BOLD),
bgcolor=ft.Colors.RED,
border_radius=5,
padding=5,
width=60,
height=26,
alignment=ft.alignment.center,
)
elif not recording.is_live and recording.monitor_status:
return ft.Container(
content=ft.Text(self._["offline"], color=ft.Colors.BLACK, size=12, weight=ft.FontWeight.BOLD),
bgcolor=ft.Colors.AMBER,
border_radius=5,
padding=5,
width=60,
height=26,
alignment=ft.alignment.center,
)
elif not recording.monitor_status:
return ft.Container(
content=ft.Text(self._["no_monitor"], color=ft.Colors.WHITE, size=12, weight=ft.FontWeight.BOLD),
bgcolor=ft.Colors.GREY,
border_radius=5,
padding=5,
width=60,
height=26,
alignment=ft.alignment.center,
)
return None
config = RecordingCardState.get_status_label_config(recording, self._)
if not config:
return None
return ft.Container(
content=ft.Text(
config["text"],
color=config["text_color"],
size=12,
weight=ft.FontWeight.BOLD
),
bgcolor=config["bgcolor"],
border_radius=5,
padding=5,
width=60,
height=26,
alignment=ft.alignment.center,
)
async def update_card(self, recording):
"""Update only the recordings cards in the scrollable content area."""
@@ -234,15 +197,10 @@ class RecordingCardManager:
try:
recording_card = self.cards_obj[recording.rec_id]
status_prefix = ""
if not recording.monitor_status:
status_prefix = f"[{self._['monitor_stopped']}] "
display_title = f"{status_prefix}{recording.title}"
display_title = RecordingCardState.get_display_title(recording, self._)
if recording_card.get("display_title_label"):
recording_card["display_title_label"].value = display_title
title_label_weight = ft.FontWeight.BOLD if recording.is_recording or recording.is_live else None
recording_card["display_title_label"].weight = title_label_weight
recording_card["display_title_label"].weight = RecordingCardState.get_title_weight(recording)
new_status_label = self.create_status_label(recording)
@@ -422,7 +380,7 @@ class RecordingCardManager:
@staticmethod
def get_icon_for_recording_state(recording: Recording):
"""Return the appropriate icon based on the recording's state."""
return ft.Icons.PLAY_CIRCLE if not recording.is_recording else ft.Icons.STOP_CIRCLE
return RecordingCardState.get_recording_icon(recording)
def get_tip_for_recording_state(self, recording: Recording):
return self._["stop_record"] if recording.is_recording else self._["start_record"]
@@ -430,7 +388,7 @@ class RecordingCardManager:
@staticmethod
def get_icon_for_monitor_state(recording: Recording):
"""Return the appropriate icon based on the monitor's state."""
return ft.Icons.VISIBILITY if recording.monitor_status else ft.Icons.VISIBILITY_OFF
return RecordingCardState.get_monitor_icon(recording)
def get_tip_for_monitor_state(self, recording: Recording):
return self._["stop_monitor"] if recording.monitor_status else self._["start_monitor"]

View File

@@ -0,0 +1,90 @@
import flet as ft
from ...models.recording_model import Recording
from ...models.recording_status_model import CardStateType, RecordingStatus
class RecordingCardState:
ERROR_STATUSES = [RecordingStatus.RECORDING_ERROR, RecordingStatus.LIVE_STATUS_CHECK_ERROR]
@staticmethod
def get_card_state(recording: Recording) -> CardStateType:
if recording.is_recording:
return CardStateType.RECORDING
elif recording.status_info in RecordingCardState.ERROR_STATUSES:
return CardStateType.ERROR
elif recording.is_live and recording.monitor_status and not recording.is_recording:
return CardStateType.LIVE
elif (not recording.is_live and recording.monitor_status and
recording.status_info != RecordingStatus.NOT_IN_SCHEDULED_CHECK):
return CardStateType.OFFLINE
elif (not recording.monitor_status or
recording.status_info == RecordingStatus.NOT_IN_SCHEDULED_CHECK):
return CardStateType.STOPPED
return CardStateType.UNKNOWN
@staticmethod
def get_border_color(recording: Recording) -> ft.Colors:
state = RecordingCardState.get_card_state(recording)
color_map = {
CardStateType.RECORDING: ft.Colors.GREEN,
CardStateType.ERROR: ft.Colors.RED,
CardStateType.LIVE: ft.Colors.BLUE,
CardStateType.OFFLINE: ft.Colors.AMBER,
CardStateType.STOPPED: ft.Colors.GREY,
}
return color_map.get(state, ft.Colors.TRANSPARENT)
@staticmethod
def get_status_label_config(recording: Recording, language_dict: dict) -> dict:
state = RecordingCardState.get_card_state(recording)
configs = {
CardStateType.RECORDING: {
"text": language_dict.get("recording"),
"bgcolor": ft.Colors.GREEN,
"text_color": ft.Colors.WHITE,
},
CardStateType.ERROR: {
"text": language_dict.get("recording_error"),
"bgcolor": ft.Colors.RED,
"text_color": ft.Colors.WHITE,
},
CardStateType.LIVE: {
"text": language_dict.get("live_broadcasting"),
"bgcolor": ft.Colors.BLUE,
"text_color": ft.Colors.WHITE,
},
CardStateType.OFFLINE: {
"text": language_dict.get("offline"),
"bgcolor": ft.Colors.AMBER,
"text_color": ft.Colors.BLACK,
},
CardStateType.STOPPED: {
"text": language_dict.get("no_monitor"),
"bgcolor": ft.Colors.GREY,
"text_color": ft.Colors.WHITE,
},
}
return configs.get(state, {})
@staticmethod
def get_display_title(recording: Recording, language_dict: dict) -> str:
status_prefix = ""
if not recording.monitor_status:
status_prefix = f"[{language_dict.get('monitor_stopped')}] "
return f"{status_prefix}{recording.title}"
@staticmethod
def get_title_weight(recording: Recording) -> ft.FontWeight:
return ft.FontWeight.BOLD if recording.is_recording or recording.is_live else None
@staticmethod
def get_recording_icon(recording: Recording) -> ft.Icons:
return ft.Icons.STOP_CIRCLE if recording.is_recording else ft.Icons.PLAY_CIRCLE
@staticmethod
def get_monitor_icon(recording: Recording) -> ft.Icons:
return ft.Icons.VISIBILITY if recording.monitor_status else ft.Icons.VISIBILITY_OFF

View File

@@ -35,7 +35,7 @@ class RecordingDialog:
video_segment_time = user_config.get('video_segment_time', 1800)
segment_record = initial_values.get("segment_record", segmented_recording_enabled)
segment_time = initial_values.get("segment_time", video_segment_time)
only_notify_no_record = user_config.get("only_notify_no_record", False)
only_notify_no_record = initial_values.get("only_notify_no_record", False)
async def on_url_change(_):
"""Enable or disable the submit button based on whether the URL field is filled."""

View File

@@ -11,6 +11,8 @@ class SearchDialog(ft.AlertDialog):
filter_name = self._["filter_all"]
if self.recordings_page.current_filter == "recording":
filter_name = self._["filter_recording"]
elif self.recordings_page.current_filter == "living":
filter_name = self._["filter_living"]
elif self.recordings_page.current_filter == "error":
filter_name = self._["filter_error"]
elif self.recordings_page.current_filter == "offline":

View File

@@ -1,29 +1,45 @@
from ...models.recording_status_model import RecordingStatus
from ..components.recording_card_state import RecordingCardState
class RecordingFilters:
ERROR_STATUSES = [RecordingStatus.RECORDING_ERROR, RecordingStatus.LIVE_STATUS_CHECK_ERROR]
@staticmethod
def _is_error_status(recording) -> bool:
return recording.status_info in RecordingCardState.ERROR_STATUSES
@staticmethod
def _is_live_status(recording) -> bool:
return (recording.is_live
and recording.monitor_status
and not recording.is_recording
and recording.status_info not in RecordingCardState.ERROR_STATUSES
and recording.status_info != RecordingStatus.NOT_IN_SCHEDULED_CHECK)
@staticmethod
def _is_offline_status(recording) -> bool:
return (not recording.is_live
and recording.monitor_status
and recording.status_info not in RecordingCardState.ERROR_STATUSES
and recording.status_info != RecordingStatus.NOT_IN_SCHEDULED_CHECK)
@staticmethod
def _is_stopped_status(recording) -> bool:
return (not recording.monitor_status
or recording.status_info == RecordingStatus.NOT_IN_SCHEDULED_CHECK)
STATUS_FILTER_MAP = {
"all": lambda rec: True,
"recording": lambda rec: rec.is_recording,
"error": lambda rec: rec.status_info in RecordingFilters.ERROR_STATUSES,
"offline": lambda rec: (
not rec.is_live
and rec.monitor_status
and rec.status_info not in RecordingFilters.ERROR_STATUSES
),
"stopped": lambda rec: not rec.monitor_status,
"living": lambda rec: RecordingFilters._is_live_status(rec),
"error": lambda rec: RecordingFilters._is_error_status(rec),
"offline": lambda rec: RecordingFilters._is_offline_status(rec),
"stopped": lambda rec: RecordingFilters._is_stopped_status(rec)
}
@classmethod
def get_status_filter_result(cls, recording, filter_type) -> bool:
filter_func = cls.STATUS_FILTER_MAP.get(
filter_type,
lambda _: False
)
filter_func = cls.STATUS_FILTER_MAP.get(filter_type, lambda _: False)
return filter_func(recording)
@classmethod

View File

@@ -384,6 +384,9 @@ class HomePage(PageBase):
if rec.status == "recording":
status_icon = ft.Icons.CIRCLE
status_color = ft.Colors.GREEN
elif rec.status == "living":
status_icon = ft.Icons.LIVE_TV
status_color = ft.Colors.BLUE
elif rec.status == "error":
status_icon = ft.Icons.ERROR_OUTLINE
status_color = ft.Colors.RED

View File

@@ -214,6 +214,15 @@ class RecordingsPage(PageBase):
shape=ft.RoundedRectangleBorder(radius=5),
),
),
ft.ElevatedButton(
self._["filter_living"],
on_click=self.filter_living_on_click,
bgcolor=ft.Colors.BLUE if self.current_filter == "living" else None,
color=ft.Colors.WHITE if self.current_filter == "living" else None,
style=ft.ButtonStyle(
shape=ft.RoundedRectangleBorder(radius=5),
),
),
ft.ElevatedButton(
self._["filter_offline"],
on_click=self.filter_offline_on_click,
@@ -328,6 +337,10 @@ class RecordingsPage(PageBase):
self.current_filter = "recording"
await self.apply_filter()
async def filter_living_on_click(self, _):
self.current_filter = "living"
await self.apply_filter()
async def filter_error_on_click(self, _):
self.current_filter = "error"
await self.apply_filter()

View File

@@ -70,6 +70,7 @@
"status_filter": "Status Filter",
"filter_all": "All",
"filter_recording": "Recording",
"filter_living": "Living",
"filter_error": "Error",
"filter_offline": "Offline",
"filter_stopped": "Not Monitored",
@@ -130,6 +131,7 @@
"RECORDING_ERROR": "Recording the live stream has failed",
"NOT_RECORDING_SPACE": "Insufficient disk space to record",
"LIVE_STATUS_CHECK_ERROR": "Live status error, check address accessibility",
"LIVE_BROADCASTING": "Live Broadcasting",
"not_disk_space_tip": "Insufficient disk storage space, stop recording ⚠️"
},
"stream_manager": {
@@ -177,6 +179,7 @@
"all": "All",
"recording": "Recording",
"recording_error": "Recording Error",
"live_broadcasting": "Live Broadcasting",
"not_live": "Not Live",
"stopped": "Stopped",
"filter": "Filter",

View File

@@ -71,6 +71,7 @@
"status_filter": "状态筛选",
"filter_all": "全部",
"filter_recording": "录制中",
"filter_living": "直播中",
"filter_error": "录制错误",
"filter_offline": "未开播",
"filter_stopped": "未监控",
@@ -132,6 +133,7 @@
"RECORDING_ERROR": "直播录制失败, 等待重试",
"NOT_RECORDING_SPACE": "磁盘空间不足, 无法录制",
"LIVE_STATUS_CHECK_ERROR": "直播状态检测错误, 请检查地址是否可正常访问",
"LIVE_BROADCASTING": "正在直播中",
"not_disk_space_tip": "磁盘存储空间不足, 停止录制 ⚠️"
},
"stream_manager": {
@@ -179,6 +181,7 @@
"all": "全部",
"recording": "录制中",
"recording_error": "录制错误",
"live_broadcasting": "正在直播中",
"not_live": "未开播",
"stopped": "已停止",
"filter": "筛选",