fix: optimize recording manager

This commit is contained in:
ihmily
2025-08-22 15:27:06 +08:00
parent 2015b63075
commit bab6cbe4cc
8 changed files with 270 additions and 163 deletions

View File

@@ -30,6 +30,7 @@ class RecordingManager:
self.initialize_dynamic_state()
max_concurrent = int(self.settings.user_config.get("platform_max_concurrent_requests", 3))
self.platform_semaphores = defaultdict(lambda: asyncio.Semaphore(max_concurrent))
self.active_recorders = {}
@property
def recordings(self):
@@ -103,16 +104,21 @@ class RecordingManager:
Start monitoring a single recording if it is not already being monitored.
"""
if not recording.monitor_status:
recording.is_checking = True
recording.is_live = False
await self._update_recording(
recording=recording,
monitor_status=True,
display_title=recording.title,
status_info=RecordingStatus.MONITORING,
status_info=RecordingStatus.STATUS_CHECKING,
selected=False,
)
self.app.page.run_task(self.check_if_live, recording)
self.app.page.run_task(self.app.record_card_manager.update_card, recording)
self.app.page.pubsub.send_others_on_topic("update", recording)
self.app.page.run_task(self.check_if_live, recording)
if auto_save:
self.app.page.run_task(self.persist_recordings)
@@ -203,147 +209,161 @@ class RecordingManager:
async def check_if_live(self, recording: Recording):
"""Check if the live stream is available, fetch stream data and update is_live status."""
if recording.is_recording:
recording.manually_stopped = False
if recording.is_recording or recording.stopping_in_progress:
logger.debug(f"Skip check_if_live because recording is busy: {recording.url}")
return
if recording.rec_id in self.active_recorders:
logger.debug(f"Skip check_if_live because recorder is active: {recording.url}")
return
if not recording.monitor_status:
recording.display_title = f"[{self._['monitor_stopped']}] {recording.title}"
recording.status_info = RecordingStatus.STOPPED_MONITORING
recording.is_checking = False
self.app.page.run_task(self.app.record_card_manager.update_card, recording)
return
elif not recording.is_checking:
if not recording.is_checking:
recording.status_info = RecordingStatus.STATUS_CHECKING
recording.detection_time = datetime.now().time()
if recording.scheduled_recording and recording.scheduled_start_time and recording.monitor_hours:
scheduled_time_range = await self.get_scheduled_time_range(
recording.scheduled_start_time, recording.monitor_hours)
recording.scheduled_time_range = scheduled_time_range
in_scheduled = utils.is_current_time_within_range(scheduled_time_range)
if not in_scheduled:
recording.status_info = RecordingStatus.NOT_IN_SCHEDULED_CHECK
recording.is_live = False
logger.info(f"Skip Detection: {recording.url} not in scheduled check range {scheduled_time_range}")
self.app.page.run_task(self.app.record_card_manager.update_card, recording)
return
recording.is_checking = True
recording.status_info = RecordingStatus.MONITORING
platform, platform_key = get_platform_info(recording.url)
self.app.page.run_task(self.app.record_card_manager.update_card, recording)
if platform and platform_key and (recording.platform is None or recording.platform_key is None):
recording.platform = platform
recording.platform_key = platform_key
if recording.scheduled_recording and recording.scheduled_start_time and recording.monitor_hours:
scheduled_time_range = await self.get_scheduled_time_range(
recording.scheduled_start_time, recording.monitor_hours)
recording.scheduled_time_range = scheduled_time_range
in_scheduled = utils.is_current_time_within_range(scheduled_time_range)
if not in_scheduled:
recording.status_info = RecordingStatus.NOT_IN_SCHEDULED_CHECK
recording.is_live = False
recording.is_checking = False
logger.info(f"Skip Detection: {recording.url} not in scheduled check range {scheduled_time_range}")
self.app.page.run_task(self.app.record_card_manager.update_card, recording)
return
recording.status_info = RecordingStatus.STATUS_CHECKING
platform, platform_key = get_platform_info(recording.url)
if platform and platform_key and (recording.platform is None or recording.platform_key is None):
recording.platform = platform
recording.platform_key = platform_key
self.app.page.run_task(self.persist_recordings)
if self.settings.user_config["language"] != "zh_CN":
platform = platform_key
output_dir = self.settings.get_video_save_path()
await self.check_free_space(output_dir)
if not self.app.recording_enabled:
recording.is_checking = False
recording.status_info = RecordingStatus.NOT_RECORDING_SPACE
return
recording_info = {
"platform": platform,
"platform_key": platform_key,
"live_url": recording.url,
"output_dir": output_dir,
"segment_record": recording.segment_record,
"segment_time": recording.segment_time,
"save_format": recording.record_format,
"quality": recording.quality,
}
semaphore = self.platform_semaphores[platform_key]
recorder = LiveStreamRecorder(self.app, recording, recording_info)
async with semaphore:
stream_info = await recorder.fetch_stream()
logger.info(f"Stream Data: {stream_info}")
if not stream_info or not stream_info.anchor_name:
logger.error(f"Fetch stream data failed: {recording.url}")
recording.is_checking = False
recording.status_info = RecordingStatus.LIVE_STATUS_CHECK_ERROR
if recording.monitor_status:
self.app.page.run_task(self.app.record_card_manager.update_card, recording)
self.app.page.pubsub.send_others_on_topic("update", recording)
return
if self.settings.user_config.get("remove_emojis"):
stream_info.anchor_name = utils.clean_name(stream_info.anchor_name, self._["live_room"])
if stream_info.is_live:
recording.live_title = stream_info.title
if recording.streamer_name.strip() == self._["live_room"]:
recording.streamer_name = stream_info.anchor_name
recording.title = f"{recording.streamer_name} - {self._[recording.quality]}"
recording.display_title = f"[{self._['is_live']}] {recording.title}"
if not recording.is_live:
recording.is_live = stream_info.is_live
recording.notified_live_start = False
recording.notified_live_end = False
if desktop_notify.should_push_notification(self.app):
desktop_notify.send_notification(
title=self._["notify"],
message=recording.streamer_name + ' | ' + self._["live_recording_started_message"],
app_icon=self.app.tray_manager.icon_path
)
msg_manager = message_pusher.MessagePusher(self.settings)
user_config = self.settings.user_config
if (msg_manager.should_push_message(self.settings, recording, message_type='start')
and not recording.notified_live_start):
push_content = self._["push_content"]
begin_push_message_text = user_config.get("custom_stream_start_content")
if begin_push_message_text:
push_content = begin_push_message_text
push_at = datetime.today().strftime("%Y-%m-%d %H:%M:%S")
push_content = push_content.replace("[room_name]", recording.streamer_name).replace(
"[time]", push_at
)
msg_title = user_config.get("custom_notification_title").strip()
msg_title = msg_title or self._["status_notify"]
self.app.page.run_task(msg_manager.push_messages, msg_title, push_content)
recording.notified_live_start = True
if not recording.only_notify_no_record:
recording.status_info = RecordingStatus.PREPARING_RECORDING
recording.loop_time_seconds = self.loop_time_seconds
self.start_update(recording)
self.app.page.run_task(recorder.start_recording, stream_info)
else:
if recording.notified_live_start:
notify_loop_time = user_config.get("notify_loop_time")
recording.loop_time_seconds = int(notify_loop_time or 600)
else:
recording.loop_time_seconds = self.loop_time_seconds
recording.cumulative_duration = timedelta()
recording.last_duration = timedelta()
recording.status_info = RecordingStatus.LIVE_BROADCASTING
else:
recording.is_recording = False
if recording.is_live:
recording.is_live = False
self.app.page.run_task(recorder.end_message_push)
recording.status_info = RecordingStatus.MONITORING
title = f"{stream_info.anchor_name or recording.streamer_name} - {self._[recording.quality]}"
if recording.streamer_name == self._["live_room"] or \
f"[{self._['is_live']}]" in recording.display_title:
recording.update(
{
"streamer_name": stream_info.anchor_name,
"title": title,
"display_title": title,
}
)
self.app.page.run_task(self.persist_recordings)
if self.settings.user_config["language"] != "zh_CN":
platform = platform_key
output_dir = self.settings.get_video_save_path()
await self.check_free_space(output_dir)
if not self.app.recording_enabled:
recording.is_checking = False
recording.status_info = RecordingStatus.NOT_RECORDING_SPACE
return
recording_info = {
"platform": platform,
"platform_key": platform_key,
"live_url": recording.url,
"output_dir": output_dir,
"segment_record": recording.segment_record,
"segment_time": recording.segment_time,
"save_format": recording.record_format,
"quality": recording.quality,
}
semaphore = self.platform_semaphores[platform_key]
recorder = LiveStreamRecorder(self.app, recording, recording_info)
async with semaphore:
stream_info = await recorder.fetch_stream()
logger.info(f"Stream Data: {stream_info}")
if not stream_info or not stream_info.anchor_name:
logger.error(f"Fetch stream data failed: {recording.url}")
recording.is_checking = False
recording.status_info = RecordingStatus.LIVE_STATUS_CHECK_ERROR
if recording.monitor_status:
self.app.page.run_task(self.app.record_card_manager.update_card, recording)
return
if self.settings.user_config.get("remove_emojis"):
stream_info.anchor_name = utils.clean_name(stream_info.anchor_name, self._["live_room"])
if stream_info.is_live:
recording.live_title = stream_info.title
if recording.streamer_name.strip() == self._["live_room"]:
recording.streamer_name = stream_info.anchor_name
recording.title = f"{recording.streamer_name} - {self._[recording.quality]}"
recording.display_title = f"[{self._['is_live']}] {recording.title}"
if not recording.is_live:
recording.is_live = stream_info.is_live
recording.notified_live_start = False
recording.notified_live_end = False
if desktop_notify.should_push_notification(self.app):
desktop_notify.send_notification(
title=self._["notify"],
message=recording.streamer_name + ' | ' + self._["live_recording_started_message"],
app_icon=self.app.tray_manager.icon_path
)
msg_manager = message_pusher.MessagePusher(self.settings)
user_config = self.settings.user_config
if (msg_manager.should_push_message(self.settings, recording, message_type='start')
and not recording.notified_live_start):
push_content = self._["push_content"]
begin_push_message_text = user_config.get("custom_stream_start_content")
if begin_push_message_text:
push_content = begin_push_message_text
push_at = datetime.today().strftime("%Y-%m-%d %H:%M:%S")
push_content = push_content.replace("[room_name]", recording.streamer_name).replace(
"[time]", push_at
)
msg_title = user_config.get("custom_notification_title").strip()
msg_title = msg_title or self._["status_notify"]
self.app.page.run_task(msg_manager.push_messages, msg_title, push_content)
recording.notified_live_start = True
if not recording.only_notify_no_record:
recording.status_info = RecordingStatus.PREPARING_RECORDING
recording.loop_time_seconds = self.loop_time_seconds
self.start_update(recording)
self.app.page.run_task(recorder.start_recording, stream_info)
else:
if recording.notified_live_start:
notify_loop_time = user_config.get("notify_loop_time")
recording.loop_time_seconds = int(notify_loop_time or 600)
else:
recording.loop_time_seconds = self.loop_time_seconds
recording.cumulative_duration = timedelta()
recording.last_duration = timedelta()
recording.status_info = RecordingStatus.LIVE_BROADCASTING
else:
if recording.is_live:
recording.is_live = False
self.app.page.run_task(recorder.end_message_push)
recording.status_info = RecordingStatus.MONITORING
title = f"{stream_info.anchor_name or recording.streamer_name} - {self._[recording.quality]}"
if recording.streamer_name == self._["live_room"] or \
f"[{self._['is_live']}]" in recording.display_title:
recording.update(
{
"streamer_name": stream_info.anchor_name,
"title": title,
"display_title": title,
}
)
self.app.page.run_task(self.persist_recordings)
self.app.page.run_task(self.app.record_card_manager.update_card, recording)
self.app.page.pubsub.send_others_on_topic("update", recording)
recording.is_checking = False
recording.is_checking = False
self.app.page.run_task(self.app.record_card_manager.update_card, recording)
self.app.page.pubsub.send_others_on_topic("update", recording)
@staticmethod
def start_update(recording: Recording):
@@ -360,11 +380,26 @@ class RecordingManager:
)
logger.info(f"Started recording for {recording.title}")
@staticmethod
def stop_recording(recording: Recording, manually_stopped: bool = True):
def stop_recording(self, recording: Recording, manually_stopped: bool = True):
"""Stop the recording process."""
recording.is_live = False
if recording.is_recording:
recording.stopping_in_progress = True
logger.info(f"Trying to stop recorder for {recording.rec_id}, title: {recording.title}")
logger.debug(f"Active recorders: {list(self.active_recorders.keys())}")
if recording.rec_id in self.active_recorders:
recorder = self.active_recorders[recording.rec_id]
logger.debug(f"Found recorder instance - id: {id(recorder)}")
recorder.request_stop()
logger.info(f"Requested stop for recorder: {recording.rec_id}")
else:
logger.warning(f"No active recorder found for {recording.rec_id}, cannot request stop")
recording.force_stop = True
logger.info(f"Set force_stop=True for recording: {recording.rec_id}")
if recording.start_time is not None:
elapsed = datetime.now() - recording.start_time
# Add the elapsed time to the cumulative duration.
@@ -374,8 +409,11 @@ class RecordingManager:
recording.start_time = None
recording.is_recording = False
recording.manually_stopped = manually_stopped
recording.status_info = RecordingStatus.NOT_RECORDING
logger.info(f"Stopped recording for {recording.title}")
self.app.page.run_task(self._reset_stopping_flag, recording)
def get_duration(self, recording: Recording):
"""Get the duration of the current recording session in a formatted string."""
if recording.is_recording and recording.start_time is not None:
@@ -424,3 +462,9 @@ class RecordingManager:
end_time = utils.add_hours_to_time(scheduled_start_time, monitor_hours)
scheduled_time_range = f"{scheduled_start_time}~{end_time}"
return scheduled_time_range
@staticmethod
async def _reset_stopping_flag(recording: Recording):
await asyncio.sleep(2)
recording.stopping_in_progress = False
logger.debug(f"Reset stopping_in_progress flag for recording: {recording.rec_id}")

View File

@@ -31,6 +31,7 @@ class LiveStreamRecorder:
self.recording = recording
self.recording_info = recording_info
self.subprocess_start_info = app.subprocess_start_up_info
self.should_stop = False
self.user_config = self.settings.user_config
self.account_config = self.settings.accounts_config
@@ -226,6 +227,21 @@ class LiveStreamRecorder:
record_url = self._get_record_url(stream_info)
self.set_preview_url(stream_info)
try:
if self.recording.rec_id in self.app.record_manager.active_recorders:
old_recorder = self.app.record_manager.active_recorders[self.recording.rec_id]
logger.warning(
f"Found existing recorder instance for {self.recording.rec_id}, id: {id(old_recorder)}, stopping it"
)
old_recorder.request_stop()
await asyncio.sleep(1)
self.app.record_manager.active_recorders[self.recording.rec_id] = self
logger.info(f"Saved recorder instance for {self.recording.rec_id}, id: {id(self)}")
except Exception as e:
logger.error(f"Failed to save recorder instance: {e}")
if use_direct_download:
logger.info(f"Use Direct Downloader to Download FLV Stream: {record_url}")
headers = {}
@@ -284,6 +300,9 @@ class LiveStreamRecorder:
The child process executes ffmpeg for recording
"""
logger.info(f"Starting ffmpeg recording - recorder id: {id(self)}, rec_id: {self.recording.rec_id}")
self.should_stop = False
try:
save_file_path = ffmpeg_command[-1]
@@ -302,30 +321,33 @@ class LiveStreamRecorder:
logger.log("STREAM", f"Recording Stream URL: {record_url}")
while True:
if not self.recording.is_recording or not self.app.recording_enabled:
if self.should_stop or self.recording.force_stop or not self.app.recording_enabled:
logger.info(f"Preparing to End Recording: {live_url}")
if os.name == "nt":
if process.stdin:
process.stdin.write(b"q")
await process.stdin.drain()
await asyncio.sleep(5)
else:
import signal
process.send_signal(signal.SIGINT)
# process.terminate()
await asyncio.sleep(5)
if process.stdin:
process.stdin.close()
try:
if os.name == "nt":
if process.stdin:
process.stdin.write(b"q")
await process.stdin.drain()
await asyncio.sleep(5)
else:
import signal
process.send_signal(signal.SIGINT)
# process.terminate()
await asyncio.sleep(5)
if process.stdin:
process.stdin.close()
await asyncio.wait_for(process.wait(), timeout=15.0)
except asyncio.TimeoutError:
logger.warning(f"FFmpeg process did not exit gracefully, forcing termination: {live_url}")
process.kill()
await process.wait()
self.recording.force_stop = False
break
if process.returncode is not None:
logger.info(f"Exit loop recording (normal 0 | abnormal 1): code={process.returncode}, {live_url}")
break
@@ -358,13 +380,19 @@ class LiveStreamRecorder:
display_title = self.recording.display_title
self.recording.live_title = None
if not self.recording.is_recording:
if self.recording.manually_stopped:
logger.success(f"Live recording has stopped: {record_name}")
else:
logger.success(f"Live recording completed: {record_name}")
self.app.page.run_task(self.end_message_push)
self.recording.is_recording = False
try:
if self.recording.rec_id in self.app.record_manager.active_recorders:
del self.app.record_manager.active_recorders[self.recording.rec_id]
logger.info(f"Removed recorder from active_recorders: {self.recording.rec_id}")
except Exception as e:
logger.error(f"Failed to remove recorder instance: {e}")
try:
self.recording.update({"display_title": display_title})
self.app.page.run_task(self.app.record_card_manager.update_card, self.recording)
@@ -622,6 +650,10 @@ class LiveStreamRecorder:
"""
Use the direct downloader to download the live stream
"""
logger.info(f"Starting direct download - recorder id: {id(self)}, rec_id: {self.recording.rec_id}")
self.should_stop = False
try:
await self.direct_downloader.start_download()
@@ -631,9 +663,10 @@ class LiveStreamRecorder:
logger.log("STREAM", f"Direct Download Stream URL: {record_url}")
while True:
if not self.recording.is_recording or not self.app.recording_enabled:
if self.should_stop or self.recording.force_stop or not self.app.recording_enabled:
logger.info(f"Prepare to end direct download: {live_url}")
await self.direct_downloader.stop_download()
self.recording.force_stop = False
break
await asyncio.sleep(1)
@@ -649,12 +682,19 @@ class LiveStreamRecorder:
display_title = self.recording.display_title
self.recording.live_title = None
if not self.recording.is_recording:
if self.recording.manually_stopped:
logger.success(f"Direct Downloading Stopped: {record_name}")
else:
logger.success(f"Direct Downloading Completed: {record_name}")
self.app.page.run_task(self.end_message_push)
self.recording.is_recording = False
try:
if self.recording.rec_id in self.app.record_manager.active_recorders:
del self.app.record_manager.active_recorders[self.recording.rec_id]
logger.info(f"Removed recorder from active_recorders: {self.recording.rec_id}")
except Exception as e:
logger.error(f"Failed to remove recorder instance: {e}")
if self.app.recording_enabled and not self.is_flv_preferred_platform:
self.app.page.run_task(self.app.record_manager.check_if_live, self.recording)
@@ -741,3 +781,12 @@ class LiveStreamRecorder:
msg_title = msg_title or self._["status_notify"]
self.app.page.run_task(msg_manager.push_messages, msg_title, push_content)
def request_stop(self):
logger.info(f"Stop requested for recorder: {self.recording.url}, rec_id: {self.recording.rec_id}")
logger.info(f"Recorder instance details - id: {id(self)}, recording: {self.recording.title}")
old_value = self.should_stop
self.should_stop = True
logger.info(f"Set should_stop from {old_value} to {self.should_stop} for recorder: {self.recording.rec_id}")

View File

@@ -62,6 +62,9 @@ class Recording:
self.is_recording = False
self.start_time = None
self.manually_stopped = False
self.force_stop = False
self.stopping_in_progress = False
self.stop_requested = False
self.platform = None
self.platform_key = None
self.notified_live_start = False

View File

@@ -7,6 +7,7 @@ class CardStateType(Enum):
LIVE = "live"
OFFLINE = "offline"
STOPPED = "stopped"
CHECKING = "checking"
UNKNOWN = "unknown"

View File

@@ -266,7 +266,7 @@ class RecordingCardManager:
recording.update(
{
"monitor_status": not recording.monitor_status,
"status_info": RecordingStatus.MONITORING,
"status_info": RecordingStatus.STATUS_CHECKING,
"display_title": f"{recording.title}",
}
)
@@ -462,8 +462,8 @@ class RecordingCardManager:
try:
delete_alert_dialog.open = False
delete_alert_dialog.update()
except (ft.core.page.PageDisconnectedException, AssertionError) as e:
logger.debug(f"Close delete dialog failed: {e}")
except (ft.core.page.PageDisconnectedException, AssertionError) as err:
logger.debug(f"Close delete dialog failed: {err}")
delete_alert_dialog = ft.AlertDialog(
title=ft.Text(self._["confirm"]),
@@ -525,4 +525,4 @@ class RecordingCardManager:
await self.update_card(recording)
async def subscribe_remove_cards(self, _, recordings: list[Recording]):
await self.remove_recording_card(recordings)
await self.remove_recording_card(recordings)

View File

@@ -14,9 +14,11 @@ class RecordingCardState:
return CardStateType.RECORDING
elif recording.status_info in RecordingCardState.ERROR_STATUSES:
return CardStateType.ERROR
elif recording.is_checking:
return CardStateType.CHECKING
elif recording.is_live and recording.monitor_status and not recording.is_recording:
return CardStateType.LIVE
elif (not recording.is_live and recording.monitor_status and
elif (not recording.is_live and recording.monitor_status and
recording.status_info != RecordingStatus.NOT_IN_SCHEDULED_CHECK):
return CardStateType.OFFLINE
elif (not recording.monitor_status or
@@ -33,6 +35,7 @@ class RecordingCardState:
CardStateType.LIVE: ft.Colors.BLUE,
CardStateType.OFFLINE: ft.Colors.AMBER,
CardStateType.STOPPED: ft.Colors.GREY,
CardStateType.CHECKING: ft.Colors.PURPLE,
}
return color_map.get(state, ft.Colors.TRANSPARENT)
@@ -66,6 +69,11 @@ class RecordingCardState:
"bgcolor": ft.Colors.GREY,
"text_color": ft.Colors.WHITE,
},
CardStateType.CHECKING: {
"text": language_dict.get("checking"),
"bgcolor": ft.Colors.PURPLE,
"text_color": ft.Colors.WHITE,
},
}
return configs.get(state, {})
@@ -79,7 +87,7 @@ class RecordingCardState:
@staticmethod
def get_title_weight(recording: Recording) -> ft.FontWeight:
return ft.FontWeight.BOLD if recording.is_recording or recording.is_live else None
return ft.FontWeight.BOLD if recording.is_recording or recording.is_live or recording.is_checking else None
@staticmethod
def get_recording_icon(recording: Recording) -> ft.Icons:

View File

@@ -186,7 +186,8 @@
"stopped": "Stopped",
"filter": "Filter",
"offline": "Offline",
"no_monitor": "Not Monitored"
"no_monitor": "Not Monitored",
"checking": "Checking"
},
"settings_page": {
"recording_settings": "Recording Settings",

View File

@@ -188,7 +188,8 @@
"stopped": "已停止",
"filter": "筛选",
"offline": "未开播",
"no_monitor": "未监控"
"no_monitor": "未监控",
"checking": "检测中"
},
"settings_page": {
"recording_settings": "录制设置",