mirror of
https://github.com/ihmily/StreamCap.git
synced 2026-05-22 19:59:51 +08:00
fix: type hint improve
This commit is contained in:
@@ -1,12 +1,14 @@
|
||||
import json
|
||||
import os
|
||||
import shutil
|
||||
from typing import Any
|
||||
from typing import TypeVar
|
||||
|
||||
import aiofiles
|
||||
|
||||
from ...utils.logger import logger
|
||||
|
||||
T = TypeVar("T")
|
||||
|
||||
|
||||
class ConfigManager:
|
||||
def __init__(self, run_path):
|
||||
@@ -163,7 +165,7 @@ class ConfigManager:
|
||||
error_message="An error occurred while saving cookies config",
|
||||
)
|
||||
|
||||
def get_config_value(self, key: str, default: Any = None):
|
||||
def get_config_value(self, key: str, default: T = None) -> T:
|
||||
user_config = self.load_user_config()
|
||||
default_config = self.load_default_config()
|
||||
return user_config.get(key, default_config.get(key, default))
|
||||
|
||||
@@ -4,7 +4,7 @@ import shutil
|
||||
import subprocess
|
||||
import time
|
||||
from datetime import datetime
|
||||
from typing import Any
|
||||
from typing import TypeVar
|
||||
|
||||
from ...messages import desktop_notify, message_pusher
|
||||
from ...models.media.video_quality_model import VideoQuality
|
||||
@@ -17,6 +17,8 @@ from ..platforms import platform_handlers
|
||||
from ..platforms.platform_handlers import StreamData
|
||||
from ..runtime.process_manager import BackgroundService
|
||||
|
||||
T = TypeVar("T")
|
||||
|
||||
|
||||
class LiveStreamRecorder:
|
||||
DEFAULT_SEGMENT_TIME = "1800"
|
||||
@@ -54,7 +56,7 @@ class LiveStreamRecorder:
|
||||
for key in ("recording_manager", "stream_manager"):
|
||||
self._.update(language.get(key, {}))
|
||||
|
||||
def _get_info(self, key: str, default: Any = None):
|
||||
def _get_info(self, key: str, default: T = None) -> T:
|
||||
return self.recording_info.get(key, default) or default
|
||||
|
||||
def is_use_proxy(self):
|
||||
@@ -739,5 +741,4 @@ class LiveStreamRecorder:
|
||||
msg_title = msg_title or self._["status_notify"]
|
||||
|
||||
self.app.page.run_task(msg_manager.push_messages, msg_title, push_content)
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import asyncio
|
||||
import json
|
||||
import os
|
||||
from typing import Any
|
||||
from typing import Any, Literal, TypedDict
|
||||
|
||||
import flet as ft
|
||||
import httpx
|
||||
@@ -9,6 +9,41 @@ import httpx
|
||||
from ...utils.logger import logger
|
||||
|
||||
|
||||
class UpdateSource(TypedDict):
|
||||
name: str
|
||||
enabled: bool
|
||||
priority: int
|
||||
type: Literal["github", "custom"]
|
||||
repo: str
|
||||
url: str
|
||||
timeout: int
|
||||
|
||||
|
||||
class SuccessfulUpdateInfo(TypedDict):
|
||||
has_update: Literal[True]
|
||||
latest_version: str
|
||||
current_version: str
|
||||
release_notes: str
|
||||
download_url: str
|
||||
download_urls: dict[str, str]
|
||||
source: str
|
||||
|
||||
|
||||
class FailedUpdateInfo(TypedDict):
|
||||
has_update: Literal[False]
|
||||
error: str
|
||||
source: str
|
||||
|
||||
|
||||
UpdateInfo = SuccessfulUpdateInfo | FailedUpdateInfo
|
||||
|
||||
|
||||
class UpdateConfig(TypedDict):
|
||||
update_sources: list[UpdateSource]
|
||||
check_interval: int
|
||||
auto_check: bool
|
||||
|
||||
|
||||
class UpdateChecker:
|
||||
def __init__(self, app):
|
||||
self.app = app
|
||||
@@ -26,7 +61,7 @@ class UpdateChecker:
|
||||
return "0.0.0"
|
||||
|
||||
@staticmethod
|
||||
def _load_update_config() -> dict[str, Any]:
|
||||
def _load_update_config() -> UpdateConfig:
|
||||
auto_check = os.getenv("AUTO_CHECK_UPDATE", "false").lower() == "true"
|
||||
update_source = os.getenv("UPDATE_SOURCE", "both").lower()
|
||||
github_repo = os.getenv("GITHUB_REPO", "ihmily/StreamCap")
|
||||
@@ -42,6 +77,7 @@ class UpdateChecker:
|
||||
"priority": 1 if update_source == "github" else 0,
|
||||
"type": "github",
|
||||
"repo": github_repo,
|
||||
"url": "https://api.github.com/repos/" + github_repo + "/releases/latest",
|
||||
"timeout": 10
|
||||
})
|
||||
|
||||
@@ -51,6 +87,7 @@ class UpdateChecker:
|
||||
"enabled": True,
|
||||
"priority": 1 if update_source == "custom" else 2,
|
||||
"type": "custom",
|
||||
"repo": custom_api,
|
||||
"url": custom_api,
|
||||
"timeout": 5
|
||||
})
|
||||
@@ -61,7 +98,7 @@ class UpdateChecker:
|
||||
"auto_check": auto_check
|
||||
}
|
||||
|
||||
async def check_for_updates(self) -> dict[str, Any]:
|
||||
async def check_for_updates(self) -> UpdateInfo:
|
||||
"""Check for updates, prioritizing sources with higher priority"""
|
||||
sources = sorted(
|
||||
[s for s in self.update_config["update_sources"] if s["enabled"]],
|
||||
@@ -94,7 +131,7 @@ class UpdateChecker:
|
||||
|
||||
return results[-1] if results else {"has_update": False, "error": "All update sources check failed"}
|
||||
|
||||
async def _check_github_update(self, source: dict[str, Any]) -> dict[str, Any]:
|
||||
async def _check_github_update(self, source: UpdateSource) -> UpdateInfo:
|
||||
"""Check for updates from GitHub"""
|
||||
try:
|
||||
timeout = httpx.Timeout(source["timeout"])
|
||||
@@ -132,7 +169,7 @@ class UpdateChecker:
|
||||
logger.error(f"Failed to check update from GitHub: {e}")
|
||||
return {"has_update": False, "error": str(e), "source": source["name"]}
|
||||
|
||||
async def _check_custom_update(self, source: dict[str, Any]) -> dict[str, Any]:
|
||||
async def _check_custom_update(self, source: UpdateSource) -> UpdateInfo:
|
||||
"""Check for updates from custom source
|
||||
|
||||
Expected API Response Format:
|
||||
|
||||
Reference in New Issue
Block a user