diff --git a/app/core/config/config_manager.py b/app/core/config/config_manager.py index cf62360..c0e271b 100644 --- a/app/core/config/config_manager.py +++ b/app/core/config/config_manager.py @@ -1,12 +1,14 @@ import json import os import shutil -from typing import Any +from typing import TypeVar import aiofiles from ...utils.logger import logger +T = TypeVar("T") + class ConfigManager: def __init__(self, run_path): @@ -163,7 +165,7 @@ class ConfigManager: error_message="An error occurred while saving cookies config", ) - def get_config_value(self, key: str, default: Any = None): + def get_config_value(self, key: str, default: T = None) -> T: user_config = self.load_user_config() default_config = self.load_default_config() return user_config.get(key, default_config.get(key, default)) diff --git a/app/core/recording/stream_manager.py b/app/core/recording/stream_manager.py index 1338665..0348a79 100644 --- a/app/core/recording/stream_manager.py +++ b/app/core/recording/stream_manager.py @@ -4,7 +4,7 @@ import shutil import subprocess import time from datetime import datetime -from typing import Any +from typing import TypeVar from ...messages import desktop_notify, message_pusher from ...models.media.video_quality_model import VideoQuality @@ -17,6 +17,8 @@ from ..platforms import platform_handlers from ..platforms.platform_handlers import StreamData from ..runtime.process_manager import BackgroundService +T = TypeVar("T") + class LiveStreamRecorder: DEFAULT_SEGMENT_TIME = "1800" @@ -54,7 +56,7 @@ class LiveStreamRecorder: for key in ("recording_manager", "stream_manager"): self._.update(language.get(key, {})) - def _get_info(self, key: str, default: Any = None): + def _get_info(self, key: str, default: T = None) -> T: return self.recording_info.get(key, default) or default def is_use_proxy(self): @@ -739,5 +741,4 @@ class LiveStreamRecorder: msg_title = msg_title or self._["status_notify"] self.app.page.run_task(msg_manager.push_messages, msg_title, push_content) - - + diff --git a/app/core/update/update_checker.py b/app/core/update/update_checker.py index 41c03de..1fe27bd 100644 --- a/app/core/update/update_checker.py +++ b/app/core/update/update_checker.py @@ -1,7 +1,7 @@ import asyncio import json import os -from typing import Any +from typing import Any, Literal, TypedDict import flet as ft import httpx @@ -9,6 +9,41 @@ import httpx from ...utils.logger import logger +class UpdateSource(TypedDict): + name: str + enabled: bool + priority: int + type: Literal["github", "custom"] + repo: str + url: str + timeout: int + + +class SuccessfulUpdateInfo(TypedDict): + has_update: Literal[True] + latest_version: str + current_version: str + release_notes: str + download_url: str + download_urls: dict[str, str] + source: str + + +class FailedUpdateInfo(TypedDict): + has_update: Literal[False] + error: str + source: str + + +UpdateInfo = SuccessfulUpdateInfo | FailedUpdateInfo + + +class UpdateConfig(TypedDict): + update_sources: list[UpdateSource] + check_interval: int + auto_check: bool + + class UpdateChecker: def __init__(self, app): self.app = app @@ -26,7 +61,7 @@ class UpdateChecker: return "0.0.0" @staticmethod - def _load_update_config() -> dict[str, Any]: + def _load_update_config() -> UpdateConfig: auto_check = os.getenv("AUTO_CHECK_UPDATE", "false").lower() == "true" update_source = os.getenv("UPDATE_SOURCE", "both").lower() github_repo = os.getenv("GITHUB_REPO", "ihmily/StreamCap") @@ -42,6 +77,7 @@ class UpdateChecker: "priority": 1 if update_source == "github" else 0, "type": "github", "repo": github_repo, + "url": "https://api.github.com/repos/" + github_repo + "/releases/latest", "timeout": 10 }) @@ -51,6 +87,7 @@ class UpdateChecker: "enabled": True, "priority": 1 if update_source == "custom" else 2, "type": "custom", + "repo": custom_api, "url": custom_api, "timeout": 5 }) @@ -61,7 +98,7 @@ class UpdateChecker: "auto_check": auto_check } - async def check_for_updates(self) -> dict[str, Any]: + async def check_for_updates(self) -> UpdateInfo: """Check for updates, prioritizing sources with higher priority""" sources = sorted( [s for s in self.update_config["update_sources"] if s["enabled"]], @@ -94,7 +131,7 @@ class UpdateChecker: return results[-1] if results else {"has_update": False, "error": "All update sources check failed"} - async def _check_github_update(self, source: dict[str, Any]) -> dict[str, Any]: + async def _check_github_update(self, source: UpdateSource) -> UpdateInfo: """Check for updates from GitHub""" try: timeout = httpx.Timeout(source["timeout"]) @@ -132,7 +169,7 @@ class UpdateChecker: logger.error(f"Failed to check update from GitHub: {e}") return {"has_update": False, "error": str(e), "source": source["name"]} - async def _check_custom_update(self, source: dict[str, Any]) -> dict[str, Any]: + async def _check_custom_update(self, source: UpdateSource) -> UpdateInfo: """Check for updates from custom source Expected API Response Format: