Files
edict/scripts/refresh_live_data.py
cft0808 efd36ab729 feat: 添加 Copilot 模型配置 + 旨意看板归档功能
模型配置:
- 新增 Copilot 系列模型 (Claude Sonnet 4, Claude Opus 4.5, GPT-4o, Gemini 2.5 Pro, o3-mini)
- 新增 github-copilot/claude-opus-4.6 模型

旨意看板归档:
- 看板顶部新增筛选栏: 进行中 / 已归档 / 全部
- Done/Cancelled 状态自动归入归档视图
- 支持单条归档/取消归档操作
- 支持一键归档所有已完成任务
- 归档卡片虚线边框半透明展示
- Tab 徽章仅统计活跃旨意数

其他:
- agents SOUL.md 更新
- 脚本健壮性改进 (file_lock, refresh, sync)
2026-02-26 21:09:05 +08:00

115 lines
4.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
import json, pathlib, datetime, logging
from file_lock import atomic_json_write, atomic_json_read
log = logging.getLogger('refresh')
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(name)s] %(message)s', datefmt='%H:%M:%S')
BASE = pathlib.Path(__file__).parent.parent
DATA = BASE / 'data'
def read_json(path, default):
try:
return json.loads(path.read_text())
except Exception:
return default
def output_meta(path):
p = pathlib.Path(path)
if not p.exists():
return {"exists": False, "lastModified": None}
ts = datetime.datetime.fromtimestamp(p.stat().st_mtime).strftime('%Y-%m-%d %H:%M:%S')
return {"exists": True, "lastModified": ts}
def main():
# 使用 officials_stats.json与 sync_officials_stats.py 统一)
officials_data = read_json(DATA / 'officials_stats.json', {})
officials = officials_data.get('officials', []) if isinstance(officials_data, dict) else officials_data
# 任务源优先tasks_source.json可对接外部系统同步写入
tasks = atomic_json_read(DATA / 'tasks_source.json', [])
if not tasks:
tasks = read_json(DATA / 'tasks.json', [])
sync_status = read_json(DATA / 'sync_status.json', {})
org_map = {}
for o in officials:
label = o.get('label', o.get('name', ''))
if label:
org_map[label] = label
now_ts = datetime.datetime.now(datetime.timezone.utc)
for t in tasks:
t['org'] = t.get('org') or org_map.get(t.get('official', ''), '')
t['outputMeta'] = output_meta(t.get('output', ''))
# 心跳时效检测:对 Doing/Assigned 状态的任务标注活跃度
if t.get('state') in ('Doing', 'Assigned', 'Review'):
updated_raw = t.get('updatedAt') or t.get('sourceMeta', {}).get('updatedAt')
age_sec = None
if updated_raw:
try:
if isinstance(updated_raw, (int, float)):
updated_dt = datetime.datetime.fromtimestamp(updated_raw / 1000, tz=datetime.timezone.utc)
else:
updated_dt = datetime.datetime.fromisoformat(str(updated_raw).replace('Z', '+00:00'))
age_sec = (now_ts - updated_dt).total_seconds()
except Exception:
pass
if age_sec is None:
t['heartbeat'] = {'status': 'unknown', 'label': '⚪ 未知', 'ageSec': None}
elif age_sec < 180:
t['heartbeat'] = {'status': 'active', 'label': f'🟢 活跃 {int(age_sec//60)}分钟前', 'ageSec': int(age_sec)}
elif age_sec < 600:
t['heartbeat'] = {'status': 'warn', 'label': f'🟡 可能停滞 {int(age_sec//60)}分钟前', 'ageSec': int(age_sec)}
else:
t['heartbeat'] = {'status': 'stalled', 'label': f'🔴 已停滞 {int(age_sec//60)}分钟', 'ageSec': int(age_sec)}
else:
t['heartbeat'] = None
today_done = sum(1 for t in tasks if t.get('state') == 'Done')
in_progress = sum(1 for t in tasks if t.get('state') in ['Doing', 'Review', 'Next', 'Blocked'])
blocked = sum(1 for t in tasks if t.get('state') == 'Blocked')
history = []
for t in tasks:
if t.get('state') == 'Done':
lm = t.get('outputMeta', {}).get('lastModified')
history.append({
'at': lm or '未知',
'official': t.get('official'),
'task': t.get('title'),
'out': t.get('output'),
'qa': '通过' if t.get('outputMeta', {}).get('exists') else '待补成果'
})
payload = {
'generatedAt': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'taskSource': 'tasks_source.json' if (DATA / 'tasks_source.json').exists() else 'tasks.json',
'officials': officials,
'tasks': tasks,
'history': history,
'metrics': {
'officialCount': len(officials),
'todayDone': today_done,
'inProgress': in_progress,
'blocked': blocked
},
'syncStatus': sync_status,
'health': {
'syncOk': bool(sync_status.get('ok', False)),
'syncLatencyMs': sync_status.get('durationMs'),
'missingFieldCount': len(sync_status.get('missingFields', {})),
}
}
atomic_json_write(DATA / 'live_status.json', payload)
log.info(f'updated live_status.json ({len(tasks)} tasks)')
if __name__ == '__main__':
main()