diff --git a/app/core/recording/record_manager.py b/app/core/recording/record_manager.py index f8960a7..97b6671 100644 --- a/app/core/recording/record_manager.py +++ b/app/core/recording/record_manager.py @@ -30,6 +30,7 @@ class RecordingManager: self.initialize_dynamic_state() max_concurrent = int(self.settings.user_config.get("platform_max_concurrent_requests", 3)) self.platform_semaphores = defaultdict(lambda: asyncio.Semaphore(max_concurrent)) + self.active_recorders = {} @property def recordings(self): @@ -103,16 +104,21 @@ class RecordingManager: Start monitoring a single recording if it is not already being monitored. """ if not recording.monitor_status: + recording.is_checking = True + recording.is_live = False await self._update_recording( recording=recording, monitor_status=True, display_title=recording.title, - status_info=RecordingStatus.MONITORING, + status_info=RecordingStatus.STATUS_CHECKING, selected=False, ) - self.app.page.run_task(self.check_if_live, recording) + self.app.page.run_task(self.app.record_card_manager.update_card, recording) self.app.page.pubsub.send_others_on_topic("update", recording) + + self.app.page.run_task(self.check_if_live, recording) + if auto_save: self.app.page.run_task(self.persist_recordings) @@ -203,147 +209,161 @@ class RecordingManager: async def check_if_live(self, recording: Recording): """Check if the live stream is available, fetch stream data and update is_live status.""" - if recording.is_recording: + recording.manually_stopped = False + if recording.is_recording or recording.stopping_in_progress: + logger.debug(f"Skip check_if_live because recording is busy: {recording.url}") + return + + if recording.rec_id in self.active_recorders: + logger.debug(f"Skip check_if_live because recorder is active: {recording.url}") return if not recording.monitor_status: recording.display_title = f"[{self._['monitor_stopped']}] {recording.title}" recording.status_info = RecordingStatus.STOPPED_MONITORING + recording.is_checking = False + self.app.page.run_task(self.app.record_card_manager.update_card, recording) + return - elif not recording.is_checking: + if not recording.is_checking: recording.status_info = RecordingStatus.STATUS_CHECKING recording.detection_time = datetime.now().time() - if recording.scheduled_recording and recording.scheduled_start_time and recording.monitor_hours: - scheduled_time_range = await self.get_scheduled_time_range( - recording.scheduled_start_time, recording.monitor_hours) - recording.scheduled_time_range = scheduled_time_range - in_scheduled = utils.is_current_time_within_range(scheduled_time_range) - if not in_scheduled: - recording.status_info = RecordingStatus.NOT_IN_SCHEDULED_CHECK - recording.is_live = False - logger.info(f"Skip Detection: {recording.url} not in scheduled check range {scheduled_time_range}") - self.app.page.run_task(self.app.record_card_manager.update_card, recording) - return - recording.is_checking = True - recording.status_info = RecordingStatus.MONITORING - platform, platform_key = get_platform_info(recording.url) + self.app.page.run_task(self.app.record_card_manager.update_card, recording) - if platform and platform_key and (recording.platform is None or recording.platform_key is None): - recording.platform = platform - recording.platform_key = platform_key + if recording.scheduled_recording and recording.scheduled_start_time and recording.monitor_hours: + scheduled_time_range = await self.get_scheduled_time_range( + recording.scheduled_start_time, recording.monitor_hours) + recording.scheduled_time_range = scheduled_time_range + in_scheduled = utils.is_current_time_within_range(scheduled_time_range) + if not in_scheduled: + recording.status_info = RecordingStatus.NOT_IN_SCHEDULED_CHECK + recording.is_live = False + recording.is_checking = False + logger.info(f"Skip Detection: {recording.url} not in scheduled check range {scheduled_time_range}") + self.app.page.run_task(self.app.record_card_manager.update_card, recording) + return + + recording.status_info = RecordingStatus.STATUS_CHECKING + platform, platform_key = get_platform_info(recording.url) + + if platform and platform_key and (recording.platform is None or recording.platform_key is None): + recording.platform = platform + recording.platform_key = platform_key + self.app.page.run_task(self.persist_recordings) + + if self.settings.user_config["language"] != "zh_CN": + platform = platform_key + + output_dir = self.settings.get_video_save_path() + await self.check_free_space(output_dir) + if not self.app.recording_enabled: + recording.is_checking = False + recording.status_info = RecordingStatus.NOT_RECORDING_SPACE + return + recording_info = { + "platform": platform, + "platform_key": platform_key, + "live_url": recording.url, + "output_dir": output_dir, + "segment_record": recording.segment_record, + "segment_time": recording.segment_time, + "save_format": recording.record_format, + "quality": recording.quality, + } + + semaphore = self.platform_semaphores[platform_key] + recorder = LiveStreamRecorder(self.app, recording, recording_info) + async with semaphore: + stream_info = await recorder.fetch_stream() + logger.info(f"Stream Data: {stream_info}") + if not stream_info or not stream_info.anchor_name: + logger.error(f"Fetch stream data failed: {recording.url}") + recording.is_checking = False + recording.status_info = RecordingStatus.LIVE_STATUS_CHECK_ERROR + if recording.monitor_status: + self.app.page.run_task(self.app.record_card_manager.update_card, recording) + self.app.page.pubsub.send_others_on_topic("update", recording) + return + if self.settings.user_config.get("remove_emojis"): + stream_info.anchor_name = utils.clean_name(stream_info.anchor_name, self._["live_room"]) + + if stream_info.is_live: + recording.live_title = stream_info.title + if recording.streamer_name.strip() == self._["live_room"]: + recording.streamer_name = stream_info.anchor_name + recording.title = f"{recording.streamer_name} - {self._[recording.quality]}" + recording.display_title = f"[{self._['is_live']}] {recording.title}" + + if not recording.is_live: + recording.is_live = stream_info.is_live + recording.notified_live_start = False + recording.notified_live_end = False + + if desktop_notify.should_push_notification(self.app): + desktop_notify.send_notification( + title=self._["notify"], + message=recording.streamer_name + ' | ' + self._["live_recording_started_message"], + app_icon=self.app.tray_manager.icon_path + ) + + msg_manager = message_pusher.MessagePusher(self.settings) + user_config = self.settings.user_config + if (msg_manager.should_push_message(self.settings, recording, message_type='start') + and not recording.notified_live_start): + push_content = self._["push_content"] + begin_push_message_text = user_config.get("custom_stream_start_content") + if begin_push_message_text: + push_content = begin_push_message_text + + push_at = datetime.today().strftime("%Y-%m-%d %H:%M:%S") + push_content = push_content.replace("[room_name]", recording.streamer_name).replace( + "[time]", push_at + ) + msg_title = user_config.get("custom_notification_title").strip() + msg_title = msg_title or self._["status_notify"] + + self.app.page.run_task(msg_manager.push_messages, msg_title, push_content) + recording.notified_live_start = True + + if not recording.only_notify_no_record: + recording.status_info = RecordingStatus.PREPARING_RECORDING + recording.loop_time_seconds = self.loop_time_seconds + self.start_update(recording) + self.app.page.run_task(recorder.start_recording, stream_info) + else: + if recording.notified_live_start: + notify_loop_time = user_config.get("notify_loop_time") + recording.loop_time_seconds = int(notify_loop_time or 600) + else: + recording.loop_time_seconds = self.loop_time_seconds + + recording.cumulative_duration = timedelta() + recording.last_duration = timedelta() + recording.status_info = RecordingStatus.LIVE_BROADCASTING + + else: + recording.is_recording = False + if recording.is_live: + recording.is_live = False + self.app.page.run_task(recorder.end_message_push) + + recording.status_info = RecordingStatus.MONITORING + title = f"{stream_info.anchor_name or recording.streamer_name} - {self._[recording.quality]}" + if recording.streamer_name == self._["live_room"] or \ + f"[{self._['is_live']}]" in recording.display_title: + recording.update( + { + "streamer_name": stream_info.anchor_name, + "title": title, + "display_title": title, + } + ) self.app.page.run_task(self.persist_recordings) - if self.settings.user_config["language"] != "zh_CN": - platform = platform_key - - output_dir = self.settings.get_video_save_path() - await self.check_free_space(output_dir) - if not self.app.recording_enabled: - recording.is_checking = False - recording.status_info = RecordingStatus.NOT_RECORDING_SPACE - return - recording_info = { - "platform": platform, - "platform_key": platform_key, - "live_url": recording.url, - "output_dir": output_dir, - "segment_record": recording.segment_record, - "segment_time": recording.segment_time, - "save_format": recording.record_format, - "quality": recording.quality, - } - - semaphore = self.platform_semaphores[platform_key] - recorder = LiveStreamRecorder(self.app, recording, recording_info) - async with semaphore: - stream_info = await recorder.fetch_stream() - logger.info(f"Stream Data: {stream_info}") - if not stream_info or not stream_info.anchor_name: - logger.error(f"Fetch stream data failed: {recording.url}") - recording.is_checking = False - recording.status_info = RecordingStatus.LIVE_STATUS_CHECK_ERROR - if recording.monitor_status: - self.app.page.run_task(self.app.record_card_manager.update_card, recording) - return - if self.settings.user_config.get("remove_emojis"): - stream_info.anchor_name = utils.clean_name(stream_info.anchor_name, self._["live_room"]) - - if stream_info.is_live: - recording.live_title = stream_info.title - if recording.streamer_name.strip() == self._["live_room"]: - recording.streamer_name = stream_info.anchor_name - recording.title = f"{recording.streamer_name} - {self._[recording.quality]}" - recording.display_title = f"[{self._['is_live']}] {recording.title}" - - if not recording.is_live: - recording.is_live = stream_info.is_live - recording.notified_live_start = False - recording.notified_live_end = False - - if desktop_notify.should_push_notification(self.app): - desktop_notify.send_notification( - title=self._["notify"], - message=recording.streamer_name + ' | ' + self._["live_recording_started_message"], - app_icon=self.app.tray_manager.icon_path - ) - - msg_manager = message_pusher.MessagePusher(self.settings) - user_config = self.settings.user_config - if (msg_manager.should_push_message(self.settings, recording, message_type='start') - and not recording.notified_live_start): - push_content = self._["push_content"] - begin_push_message_text = user_config.get("custom_stream_start_content") - if begin_push_message_text: - push_content = begin_push_message_text - - push_at = datetime.today().strftime("%Y-%m-%d %H:%M:%S") - push_content = push_content.replace("[room_name]", recording.streamer_name).replace( - "[time]", push_at - ) - msg_title = user_config.get("custom_notification_title").strip() - msg_title = msg_title or self._["status_notify"] - - self.app.page.run_task(msg_manager.push_messages, msg_title, push_content) - recording.notified_live_start = True - - if not recording.only_notify_no_record: - recording.status_info = RecordingStatus.PREPARING_RECORDING - recording.loop_time_seconds = self.loop_time_seconds - self.start_update(recording) - self.app.page.run_task(recorder.start_recording, stream_info) - else: - if recording.notified_live_start: - notify_loop_time = user_config.get("notify_loop_time") - recording.loop_time_seconds = int(notify_loop_time or 600) - else: - recording.loop_time_seconds = self.loop_time_seconds - - recording.cumulative_duration = timedelta() - recording.last_duration = timedelta() - recording.status_info = RecordingStatus.LIVE_BROADCASTING - - else: - if recording.is_live: - recording.is_live = False - self.app.page.run_task(recorder.end_message_push) - - recording.status_info = RecordingStatus.MONITORING - title = f"{stream_info.anchor_name or recording.streamer_name} - {self._[recording.quality]}" - if recording.streamer_name == self._["live_room"] or \ - f"[{self._['is_live']}]" in recording.display_title: - recording.update( - { - "streamer_name": stream_info.anchor_name, - "title": title, - "display_title": title, - } - ) - self.app.page.run_task(self.persist_recordings) - - self.app.page.run_task(self.app.record_card_manager.update_card, recording) - self.app.page.pubsub.send_others_on_topic("update", recording) - recording.is_checking = False + recording.is_checking = False + self.app.page.run_task(self.app.record_card_manager.update_card, recording) + self.app.page.pubsub.send_others_on_topic("update", recording) @staticmethod def start_update(recording: Recording): @@ -360,11 +380,26 @@ class RecordingManager: ) logger.info(f"Started recording for {recording.title}") - @staticmethod - def stop_recording(recording: Recording, manually_stopped: bool = True): + def stop_recording(self, recording: Recording, manually_stopped: bool = True): """Stop the recording process.""" recording.is_live = False if recording.is_recording: + + recording.stopping_in_progress = True + + logger.info(f"Trying to stop recorder for {recording.rec_id}, title: {recording.title}") + logger.debug(f"Active recorders: {list(self.active_recorders.keys())}") + + if recording.rec_id in self.active_recorders: + recorder = self.active_recorders[recording.rec_id] + logger.debug(f"Found recorder instance - id: {id(recorder)}") + recorder.request_stop() + logger.info(f"Requested stop for recorder: {recording.rec_id}") + else: + logger.warning(f"No active recorder found for {recording.rec_id}, cannot request stop") + recording.force_stop = True + logger.info(f"Set force_stop=True for recording: {recording.rec_id}") + if recording.start_time is not None: elapsed = datetime.now() - recording.start_time # Add the elapsed time to the cumulative duration. @@ -374,8 +409,11 @@ class RecordingManager: recording.start_time = None recording.is_recording = False recording.manually_stopped = manually_stopped + recording.status_info = RecordingStatus.NOT_RECORDING logger.info(f"Stopped recording for {recording.title}") + self.app.page.run_task(self._reset_stopping_flag, recording) + def get_duration(self, recording: Recording): """Get the duration of the current recording session in a formatted string.""" if recording.is_recording and recording.start_time is not None: @@ -424,3 +462,9 @@ class RecordingManager: end_time = utils.add_hours_to_time(scheduled_start_time, monitor_hours) scheduled_time_range = f"{scheduled_start_time}~{end_time}" return scheduled_time_range + + @staticmethod + async def _reset_stopping_flag(recording: Recording): + await asyncio.sleep(2) + recording.stopping_in_progress = False + logger.debug(f"Reset stopping_in_progress flag for recording: {recording.rec_id}") diff --git a/app/core/recording/stream_manager.py b/app/core/recording/stream_manager.py index b4b8e9b..9628b63 100644 --- a/app/core/recording/stream_manager.py +++ b/app/core/recording/stream_manager.py @@ -31,6 +31,7 @@ class LiveStreamRecorder: self.recording = recording self.recording_info = recording_info self.subprocess_start_info = app.subprocess_start_up_info + self.should_stop = False self.user_config = self.settings.user_config self.account_config = self.settings.accounts_config @@ -226,6 +227,21 @@ class LiveStreamRecorder: record_url = self._get_record_url(stream_info) self.set_preview_url(stream_info) + try: + if self.recording.rec_id in self.app.record_manager.active_recorders: + old_recorder = self.app.record_manager.active_recorders[self.recording.rec_id] + logger.warning( + f"Found existing recorder instance for {self.recording.rec_id}, id: {id(old_recorder)}, stopping it" + ) + old_recorder.request_stop() + + await asyncio.sleep(1) + + self.app.record_manager.active_recorders[self.recording.rec_id] = self + logger.info(f"Saved recorder instance for {self.recording.rec_id}, id: {id(self)}") + except Exception as e: + logger.error(f"Failed to save recorder instance: {e}") + if use_direct_download: logger.info(f"Use Direct Downloader to Download FLV Stream: {record_url}") headers = {} @@ -284,6 +300,9 @@ class LiveStreamRecorder: The child process executes ffmpeg for recording """ + logger.info(f"Starting ffmpeg recording - recorder id: {id(self)}, rec_id: {self.recording.rec_id}") + self.should_stop = False + try: save_file_path = ffmpeg_command[-1] @@ -302,30 +321,33 @@ class LiveStreamRecorder: logger.log("STREAM", f"Recording Stream URL: {record_url}") while True: - if not self.recording.is_recording or not self.app.recording_enabled: + if self.should_stop or self.recording.force_stop or not self.app.recording_enabled: logger.info(f"Preparing to End Recording: {live_url}") - - if os.name == "nt": - if process.stdin: - process.stdin.write(b"q") - await process.stdin.drain() - await asyncio.sleep(5) - else: - import signal - process.send_signal(signal.SIGINT) - # process.terminate() - await asyncio.sleep(5) - - if process.stdin: - process.stdin.close() - + try: + if os.name == "nt": + if process.stdin: + process.stdin.write(b"q") + await process.stdin.drain() + await asyncio.sleep(5) + else: + import signal + process.send_signal(signal.SIGINT) + # process.terminate() + await asyncio.sleep(5) + + if process.stdin: + process.stdin.close() + await asyncio.wait_for(process.wait(), timeout=15.0) except asyncio.TimeoutError: logger.warning(f"FFmpeg process did not exit gracefully, forcing termination: {live_url}") process.kill() await process.wait() + self.recording.force_stop = False + break + if process.returncode is not None: logger.info(f"Exit loop recording (normal 0 | abnormal 1): code={process.returncode}, {live_url}") break @@ -358,13 +380,19 @@ class LiveStreamRecorder: display_title = self.recording.display_title self.recording.live_title = None - if not self.recording.is_recording: + if self.recording.manually_stopped: logger.success(f"Live recording has stopped: {record_name}") else: - logger.success(f"Live recording completed: {record_name}") self.app.page.run_task(self.end_message_push) - self.recording.is_recording = False + + try: + if self.recording.rec_id in self.app.record_manager.active_recorders: + del self.app.record_manager.active_recorders[self.recording.rec_id] + logger.info(f"Removed recorder from active_recorders: {self.recording.rec_id}") + except Exception as e: + logger.error(f"Failed to remove recorder instance: {e}") + try: self.recording.update({"display_title": display_title}) self.app.page.run_task(self.app.record_card_manager.update_card, self.recording) @@ -622,6 +650,10 @@ class LiveStreamRecorder: """ Use the direct downloader to download the live stream """ + + logger.info(f"Starting direct download - recorder id: {id(self)}, rec_id: {self.recording.rec_id}") + self.should_stop = False + try: await self.direct_downloader.start_download() @@ -631,9 +663,10 @@ class LiveStreamRecorder: logger.log("STREAM", f"Direct Download Stream URL: {record_url}") while True: - if not self.recording.is_recording or not self.app.recording_enabled: + if self.should_stop or self.recording.force_stop or not self.app.recording_enabled: logger.info(f"Prepare to end direct download: {live_url}") await self.direct_downloader.stop_download() + self.recording.force_stop = False break await asyncio.sleep(1) @@ -649,12 +682,19 @@ class LiveStreamRecorder: display_title = self.recording.display_title self.recording.live_title = None - if not self.recording.is_recording: + if self.recording.manually_stopped: logger.success(f"Direct Downloading Stopped: {record_name}") else: logger.success(f"Direct Downloading Completed: {record_name}") self.app.page.run_task(self.end_message_push) - self.recording.is_recording = False + + try: + if self.recording.rec_id in self.app.record_manager.active_recorders: + del self.app.record_manager.active_recorders[self.recording.rec_id] + logger.info(f"Removed recorder from active_recorders: {self.recording.rec_id}") + except Exception as e: + logger.error(f"Failed to remove recorder instance: {e}") + if self.app.recording_enabled and not self.is_flv_preferred_platform: self.app.page.run_task(self.app.record_manager.check_if_live, self.recording) @@ -741,3 +781,12 @@ class LiveStreamRecorder: msg_title = msg_title or self._["status_notify"] self.app.page.run_task(msg_manager.push_messages, msg_title, push_content) + + def request_stop(self): + logger.info(f"Stop requested for recorder: {self.recording.url}, rec_id: {self.recording.rec_id}") + logger.info(f"Recorder instance details - id: {id(self)}, recording: {self.recording.title}") + + old_value = self.should_stop + self.should_stop = True + + logger.info(f"Set should_stop from {old_value} to {self.should_stop} for recorder: {self.recording.rec_id}") diff --git a/app/models/recording/recording_model.py b/app/models/recording/recording_model.py index 5df5193..d01889a 100644 --- a/app/models/recording/recording_model.py +++ b/app/models/recording/recording_model.py @@ -62,6 +62,9 @@ class Recording: self.is_recording = False self.start_time = None self.manually_stopped = False + self.force_stop = False + self.stopping_in_progress = False + self.stop_requested = False self.platform = None self.platform_key = None self.notified_live_start = False diff --git a/app/models/recording/recording_status_model.py b/app/models/recording/recording_status_model.py index ab2973d..2ef6931 100644 --- a/app/models/recording/recording_status_model.py +++ b/app/models/recording/recording_status_model.py @@ -7,6 +7,7 @@ class CardStateType(Enum): LIVE = "live" OFFLINE = "offline" STOPPED = "stopped" + CHECKING = "checking" UNKNOWN = "unknown" diff --git a/app/ui/components/business/recording_card.py b/app/ui/components/business/recording_card.py index 6f370cf..80d792a 100644 --- a/app/ui/components/business/recording_card.py +++ b/app/ui/components/business/recording_card.py @@ -266,7 +266,7 @@ class RecordingCardManager: recording.update( { "monitor_status": not recording.monitor_status, - "status_info": RecordingStatus.MONITORING, + "status_info": RecordingStatus.STATUS_CHECKING, "display_title": f"{recording.title}", } ) @@ -462,8 +462,8 @@ class RecordingCardManager: try: delete_alert_dialog.open = False delete_alert_dialog.update() - except (ft.core.page.PageDisconnectedException, AssertionError) as e: - logger.debug(f"Close delete dialog failed: {e}") + except (ft.core.page.PageDisconnectedException, AssertionError) as err: + logger.debug(f"Close delete dialog failed: {err}") delete_alert_dialog = ft.AlertDialog( title=ft.Text(self._["confirm"]), @@ -525,4 +525,4 @@ class RecordingCardManager: await self.update_card(recording) async def subscribe_remove_cards(self, _, recordings: list[Recording]): - await self.remove_recording_card(recordings) \ No newline at end of file + await self.remove_recording_card(recordings) diff --git a/app/ui/components/state/recording_card_state.py b/app/ui/components/state/recording_card_state.py index 82e84cf..3bdce2a 100644 --- a/app/ui/components/state/recording_card_state.py +++ b/app/ui/components/state/recording_card_state.py @@ -14,9 +14,11 @@ class RecordingCardState: return CardStateType.RECORDING elif recording.status_info in RecordingCardState.ERROR_STATUSES: return CardStateType.ERROR + elif recording.is_checking: + return CardStateType.CHECKING elif recording.is_live and recording.monitor_status and not recording.is_recording: return CardStateType.LIVE - elif (not recording.is_live and recording.monitor_status and + elif (not recording.is_live and recording.monitor_status and recording.status_info != RecordingStatus.NOT_IN_SCHEDULED_CHECK): return CardStateType.OFFLINE elif (not recording.monitor_status or @@ -33,6 +35,7 @@ class RecordingCardState: CardStateType.LIVE: ft.Colors.BLUE, CardStateType.OFFLINE: ft.Colors.AMBER, CardStateType.STOPPED: ft.Colors.GREY, + CardStateType.CHECKING: ft.Colors.PURPLE, } return color_map.get(state, ft.Colors.TRANSPARENT) @@ -66,6 +69,11 @@ class RecordingCardState: "bgcolor": ft.Colors.GREY, "text_color": ft.Colors.WHITE, }, + CardStateType.CHECKING: { + "text": language_dict.get("checking"), + "bgcolor": ft.Colors.PURPLE, + "text_color": ft.Colors.WHITE, + }, } return configs.get(state, {}) @@ -79,7 +87,7 @@ class RecordingCardState: @staticmethod def get_title_weight(recording: Recording) -> ft.FontWeight: - return ft.FontWeight.BOLD if recording.is_recording or recording.is_live else None + return ft.FontWeight.BOLD if recording.is_recording or recording.is_live or recording.is_checking else None @staticmethod def get_recording_icon(recording: Recording) -> ft.Icons: diff --git a/locales/en.json b/locales/en.json index 34b30a5..9f23f9e 100644 --- a/locales/en.json +++ b/locales/en.json @@ -186,7 +186,8 @@ "stopped": "Stopped", "filter": "Filter", "offline": "Offline", - "no_monitor": "Not Monitored" + "no_monitor": "Not Monitored", + "checking": "Checking" }, "settings_page": { "recording_settings": "Recording Settings", diff --git a/locales/zh_CN.json b/locales/zh_CN.json index e2b5ad7..1f52582 100644 --- a/locales/zh_CN.json +++ b/locales/zh_CN.json @@ -188,7 +188,8 @@ "stopped": "已停止", "filter": "筛选", "offline": "未开播", - "no_monitor": "未监控" + "no_monitor": "未监控", + "checking": "检测中" }, "settings_page": { "recording_settings": "录制设置",