feat: add auto-update notification system

This commit is contained in:
ihmily
2025-05-21 18:38:57 +08:00
parent bb721edcbc
commit 2013e6042e
6 changed files with 324 additions and 4 deletions

View File

@@ -19,3 +19,18 @@ CUSTOM_VIDEO_ROOT_DIR=
# Set external URL for the video API (example: http://www.example.com)
VIDEO_API_EXTERNAL_URL=
# Enable automatic update check (true/false)
AUTO_CHECK_UPDATE=false
# Update source priority (github, custom, both)
UPDATE_SOURCE=both
# GitHub repository for updates
GITHUB_REPO=ihmily/StreamCap
# Custom update API URL (leave empty to disable)
CUSTOM_UPDATE_API=
# Update check interval in seconds (default: 86400 = 1 day)
UPDATE_CHECK_INTERVAL=86400

View File

@@ -1,4 +1,5 @@
import os
import time
import flet as ft
@@ -6,6 +7,7 @@ from . import InstallationManager, execute_dir
from .core.config_manager import ConfigManager
from .core.language_manager import LanguageManager
from .core.record_manager import RecordingManager
from .core.update_checker import UpdateChecker
from .process_manager import AsyncProcessManager
from .ui.components.recording_card import RecordingCardManager
from .ui.components.show_snackbar import ShowSnackBar
@@ -15,6 +17,7 @@ from .ui.views.home_view import HomePage
from .ui.views.settings_view import SettingsPage
from .ui.views.storage_view import StoragePage
from .utils import utils
from .utils.logger import logger
class App:
@@ -62,8 +65,10 @@ class App:
self._loading_page = False
self.recording_enabled = True
self.install_manager = InstallationManager(self)
self.update_checker = UpdateChecker(self)
self.page.run_task(self.install_manager.check_env)
self.page.run_task(self.record_manager.check_free_space)
self.page.run_task(self._check_for_updates)
def initialize_pages(self):
return {
@@ -97,3 +102,23 @@ class App:
def add_ffmpeg_process(self, process):
self.process_manager.add_process(process)
async def _check_for_updates(self):
"""Check for updates when the application starts"""
try:
if not self.update_checker.update_config["auto_check"]:
return
last_check_time = self.settings.user_config.get("last_update_check", 0)
current_time = time.time()
check_interval = self.update_checker.update_config["check_interval"]
if current_time - last_check_time >= check_interval:
update_info = await self.update_checker.check_for_updates()
self.settings.user_config["last_update_check"] = current_time
await self.config_manager.save_user_config(self.settings.user_config)
if update_info.get("has_update", False):
await self.update_checker.show_update_dialog(update_info)
except Exception as e:
logger.error(f"Update check failed: {e}")

230
app/core/update_checker.py Normal file
View File

@@ -0,0 +1,230 @@
import asyncio
import json
import os
from typing import Any
import flet as ft
import httpx
from ..utils.logger import logger
class UpdateChecker:
def __init__(self, app):
self.app = app
self.current_version = self._get_current_version()
self.update_config = self._load_update_config()
def _get_current_version(self) -> str:
try:
config_path = os.path.join(self.app.run_path, "config", "version.json")
with open(config_path, encoding="utf-8") as f:
version_data = json.load(f)
return version_data["version_updates"][0]["version"]
except Exception as e:
logger.error(f"Failed to get current version: {e}")
return "0.0.0"
@staticmethod
def _load_update_config() -> dict[str, Any]:
auto_check = os.getenv("AUTO_CHECK_UPDATE", "true").lower() == "true"
update_source = os.getenv("UPDATE_SOURCE", "both").lower()
github_repo = os.getenv("GITHUB_REPO", "ihmily/StreamCap")
custom_api = os.getenv("CUSTOM_UPDATE_API", "")
check_interval = int(os.getenv("UPDATE_CHECK_INTERVAL", "86400"))
update_sources = []
if update_source in ["github", "both"]:
update_sources.append({
"name": "GitHub",
"enabled": True,
"priority": 1 if update_source == "github" else 0,
"type": "github",
"repo": github_repo,
"timeout": 10
})
if update_source in ["custom", "both"] and custom_api:
update_sources.append({
"name": "Custom",
"enabled": True,
"priority": 1 if update_source == "custom" else 2,
"type": "custom",
"url": custom_api,
"timeout": 5
})
return {
"update_sources": update_sources,
"check_interval": check_interval,
"auto_check": auto_check
}
async def check_for_updates(self) -> dict[str, Any]:
"""Check for updates, prioritizing sources with higher priority"""
sources = sorted(
[s for s in self.update_config["update_sources"] if s["enabled"]],
key=lambda x: x["priority"],
reverse=True
)
if not sources:
logger.warning("No available update sources configured")
return {"has_update": False, "error": "No available update sources configured"}
tasks = []
for source in sources:
if source["type"] == "github":
tasks.append(self._check_github_update(source))
elif source["type"] == "custom":
tasks.append(self._check_custom_update(source))
# Wait for any task to complete successfully or all to fail
results = []
for task in asyncio.as_completed(tasks):
try:
result = await task
if result["has_update"] or "error" not in result:
return result
results.append(result)
except Exception as e:
logger.error(f"Update check failed: {e}")
results.append({"has_update": False, "error": str(e)})
return results[-1] if results else {"has_update": False, "error": "All update sources check failed"}
async def _check_github_update(self, source: dict[str, Any]) -> dict[str, Any]:
"""Check for updates from GitHub"""
try:
timeout = httpx.Timeout(source["timeout"])
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.get(
"https://api.github.com/repos/" + source['repo'] + "/releases/latest"
)
if response.status_code == 200:
latest_release = response.json()
latest_version = latest_release["tag_name"].lstrip("v")
if self._compare_versions(latest_version, self.current_version) > 0:
download_urls = {}
for asset in latest_release.get("assets", []):
name = asset["name"].lower()
if "win" in name or "windows" in name:
download_urls["windows"] = asset["browser_download_url"]
elif "mac" in name or "macos" in name:
download_urls["macos"] = asset["browser_download_url"]
elif "linux" in name:
download_urls["linux"] = asset["browser_download_url"]
return {
"has_update": True,
"latest_version": latest_version,
"current_version": self.current_version,
"release_notes": latest_release["body"],
"download_url": latest_release["html_url"],
"download_urls": download_urls,
"source": source["name"]
}
return {"has_update": False, "source": source["name"]}
except Exception as e:
logger.error(f"Failed to check update from GitHub: {e}")
return {"has_update": False, "error": str(e), "source": source["name"]}
async def _check_custom_update(self, source: dict[str, Any]) -> dict[str, Any]:
"""Check for updates from custom source
Expected API Response Format:
{
"has_update": bool, # Whether there is a new version available
"latest_version": str, # Latest version number (e.g. "1.0.0")
"current_version": str, # Current version number
"release_notes": str, # Release notes or update description
"download_url": str, # Main download page URL
"download_urls": { # Optional: Platform-specific download URLs
"windows": str, # Windows download URL
"macos": str, # macOS download URL
"linux": str # Linux download URL
}
}
"""
try:
timeout = httpx.Timeout(source["timeout"])
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.get(
source["url"],
params={"current_version": self.current_version}
)
if response.status_code == 200:
update_info = response.json()
if update_info.get("has_update", False):
return {
**update_info,
"source": source["name"]
}
return {"has_update": False, "source": source["name"]}
return {"has_update": False, "error": f"API returned status code: {response.status_code}",
"source": source["name"]}
except Exception as e:
logger.error(f"Failed to check update from custom source: {e}")
return {"has_update": False, "error": str(e), "source": source["name"]}
@staticmethod
def _compare_versions(version1: str, version2: str) -> int:
"""Compare version numbers, returns 1 if version1 > version2, 0 if equal, -1 if less"""
v1_parts = [int(x) for x in version1.split(".")]
v2_parts = [int(x) for x in version2.split(".")]
for i in range(max(len(v1_parts), len(v2_parts))):
v1 = v1_parts[i] if i < len(v1_parts) else 0
v2 = v2_parts[i] if i < len(v2_parts) else 0
if v1 > v2:
return 1
elif v1 < v2:
return -1
return 0
async def show_update_dialog(self, update_info: dict[str, Any]) -> None:
_ = self.app.language_manager.language.get("update", {})
dialog = ft.AlertDialog(
title=ft.Text(_["new_version"].format(version=update_info.get("latest_version"))),
content=ft.Column([
ft.Text(_["current_version"].format(version=update_info.get("current_version"))),
ft.Text(_["latest_version"].format(version=update_info.get("latest_version"))),
ft.Text(_["update_source"].format(source=update_info.get("source", _["unknown"]))),
], spacing=10, width=400, height=300),
actions=[
ft.TextButton(_["later"], on_click=lambda _: self.close_dialog()),
ft.TextButton(_["download"], on_click=lambda _: self.open_download_page(update_info))
],
)
self.app.dialog_area.content = dialog
self.app.dialog_area.content.open = True
self.app.dialog_area.update()
def close_dialog(self) -> None:
if self.app.dialog_area.content:
self.app.dialog_area.content.open = False
self.app.dialog_area.update()
def open_download_page(self, update_info: dict[str, Any]) -> None:
import platform
import webbrowser
url = update_info.get("download_url", "https://github.com/ihmily/StreamCap/releases/latest")
download_urls = update_info.get("download_urls", {})
if download_urls:
system = platform.system().lower()
if system == "windows" and "windows" in download_urls:
url = download_urls["windows"]
elif system == "darwin" and "macos" in download_urls:
url = download_urls["macos"]
elif system == "linux" and "linux" in download_urls:
url = download_urls["linux"]
webbrowser.open(url)
self.close_dialog()

View File

@@ -163,14 +163,19 @@ class AboutPage(PageBase):
controls=[
ft.TextButton(
self._["view_update"],
icon=ft.Icons.CODE,
icon=ft.icons.CODE,
on_click=self.open_update_page,
),
ft.TextButton(
self._["view_docs"],
icon=ft.Icons.DESCRIPTION,
icon=ft.icons.DESCRIPTION,
on_click=self.open_dos_page,
),
ft.TextButton(
self.app.language_manager.language.get("update", {}).get("check_update"),
icon=ft.icons.UPDATE,
on_click=self._check_for_updates,
),
],
alignment=ft.MainAxisAlignment.START,
),
@@ -238,7 +243,7 @@ class AboutPage(PageBase):
@staticmethod
async def open_dos_page(_):
url = "https://github.com/ihmily/StreamCap"
url = "https://github.com/ihmily/StreamCap/wiki"
webbrowser.open(url)
async def on_keyboard(self, e: ft.KeyboardEvent):
@@ -246,3 +251,18 @@ class AboutPage(PageBase):
self.app.dialog_area.content = HelpDialog(self.app)
self.app.dialog_area.content.open = True
self.app.dialog_area.update()
async def _check_for_updates(self, _):
_ = self.app.language_manager.language.get("update", {})
await self.app.snack_bar.show_snack_bar(_.get("checking_update"))
update_info = await self.app.update_checker.check_for_updates()
if update_info.get("has_update", False):
await self.app.update_checker.show_update_dialog(update_info)
else:
if "error" in update_info:
await self.app.snack_bar.show_snack_bar(
f"{_.get('update_check_failed')}: {update_info.get('error', '')}")
else:
await self.app.snack_bar.show_snack_bar(_.get("no_update_available"))

View File

@@ -355,5 +355,20 @@
"previewing": "Previewing",
"view_stream_source_now": "Accessing live stream source",
"unsupported_play_on_web": "⚠️ Only MP4 files can be previewed on the web."
},
"update": {
"new_version": "New Version {version} Available",
"current_version": "Current Version: {version}",
"latest_version": "Latest Version: {version}",
"update_source": "Update Source: {source}",
"release_notes": "Release Notes:",
"no_details": "No details available",
"later": "Later",
"download": "Download Update",
"check_update": "Check for Updates",
"checking_update": "Checking for updates...",
"update_check_failed": "Update check failed",
"no_update_available": "You are using the latest version",
"unknown": "Unknown"
}
}

View File

@@ -357,5 +357,20 @@
"previewing":"正在预览",
"view_stream_source_now": "正在访问直播源",
"unsupported_play_on_web": "⚠️ Web端只支持预览MP4文件"
},
"update": {
"new_version": "发现新版本 {version}",
"current_version": "当前版本: {version}",
"latest_version": "最新版本: {version}",
"update_source": "更新源: {source}",
"release_notes": "更新内容:",
"no_details": "无详细说明",
"later": "稍后再说",
"download": "下载更新",
"check_update": "检查更新",
"checking_update": "正在检查更新...",
"update_check_failed": "检查更新失败",
"no_update_available": "当前已是最新版本",
"unknown": "未知"
}
}