修复Bot频道同步:手动同步支持新增+清理,缓存回放my_chat_member

This commit is contained in:
meoacgx
2025-12-23 23:21:12 +08:00
parent 4a601570d1
commit be1a6db30a
5 changed files with 100 additions and 14 deletions

3
.gitignore vendored
View File

@@ -66,4 +66,5 @@ docker-data/
# Local build artifacts
artifacts/
模块源码/
模块源码/
src/TelegramPanel.Web/data-protection-keys/

View File

@@ -17,26 +17,31 @@ public class BotTelegramService
{
private readonly BotManagementService _botManagement;
private readonly TelegramBotApiClient _api;
private readonly BotUpdateHub _updateHub;
private readonly ILogger<BotTelegramService> _logger;
public BotTelegramService(
BotManagementService botManagement,
TelegramBotApiClient api,
BotUpdateHub updateHub,
ILogger<BotTelegramService> logger)
{
_botManagement = botManagement;
_api = api;
_updateHub = updateHub;
_logger = logger;
}
public sealed record BotChannelSyncResult(int AppliedUpdates, int RemovedStale);
/// <summary>
/// 手动同步(对账Bot API 无法直接“枚举 Bot 作为管理员的频道”,
/// 新增/移除主要依赖 <see cref="BotUpdateHub"/> 轮询到的 my_chat_member updates
/// 手动同步(新增 + 清理):
/// - 新增/移除:尽力从 <see cref="BotUpdateHub"/> 拉取并应用 my_chat_member updates(回放/增量)
/// - 清理:对本地已记录频道做一次权限核验,移除 Bot 已被撤权/踢出的频道记录
///
/// 此处仅用于:对本地已记录的频道做一次权限核验,
/// 自动清理 Bot 已被移除/降权导致的“僵尸频道”记录(用于修复漏收 updates 的场景)。
/// 说明Telegram Bot API 无法直接“枚举 Bot 当前所在的所有频道”,因此仍以更新队列为新增来源。
/// </summary>
public async Task<int> SyncBotChannelsAsync(int botId, CancellationToken cancellationToken)
public async Task<BotChannelSyncResult> SyncBotChannelsAsync(int botId, CancellationToken cancellationToken)
{
if (botId <= 0)
throw new ArgumentException("botId 无效", nameof(botId));
@@ -47,12 +52,14 @@ public class BotTelegramService
if (!bot.IsActive)
throw new InvalidOperationException("该机器人已停用");
var applied = await DrainAndApplyMyChatMemberUpdatesAsync(botId, cancellationToken);
var channels = (await _botManagement.GetChannelsAsync(botId)).ToList();
if (channels.Count == 0)
{
bot.LastSyncAt = DateTime.UtcNow;
await _botManagement.UpdateBotAsync(bot);
return 0;
return new BotChannelSyncResult(AppliedUpdates: applied, RemovedStale: 0);
}
var botUserId = await GetBotUserIdAsync(bot.Token, cancellationToken);
@@ -96,7 +103,50 @@ public class BotTelegramService
else
_logger.LogInformation("Bot manual sync completed: no stale channels removed (botId={BotId})", botId);
return removed;
if (applied > 0)
_logger.LogInformation("Bot manual sync applied {Applied} my_chat_member updates (botId={BotId})", applied, botId);
return new BotChannelSyncResult(AppliedUpdates: applied, RemovedStale: removed);
}
private async Task<int> DrainAndApplyMyChatMemberUpdatesAsync(int botId, CancellationToken cancellationToken)
{
// 通过 BotUpdateHub 共享单一 getUpdates 轮询,避免 409 Conflict
await using var sub = await _updateHub.SubscribeAsync(botId, cancellationToken);
var applied = 0;
var batch = new List<JsonElement>(256);
// poller 启动时会有短暂延迟 + getUpdates 长轮询,因此这里给足时间(避免“点同步就结束了但更新还没拉到”)。
var deadline = DateTime.UtcNow.AddSeconds(8);
while (DateTime.UtcNow < deadline)
{
cancellationToken.ThrowIfCancellationRequested();
var drainedAny = false;
while (sub.Reader.TryRead(out var update))
{
drainedAny = true;
if (update.ValueKind == JsonValueKind.Object && update.TryGetProperty("my_chat_member", out _))
{
batch.Add(update);
if (batch.Count >= 200)
{
applied += await ApplyMyChatMemberUpdatesAsync(botId, batch, cancellationToken);
batch.Clear();
}
}
}
if (!drainedAny)
await Task.Delay(TimeSpan.FromMilliseconds(200), cancellationToken);
}
if (batch.Count > 0)
applied += await ApplyMyChatMemberUpdatesAsync(botId, batch, cancellationToken);
return applied;
}
private async Task<string?> GetBotMemberStatusAsync(string token, long chatId, long botUserId, CancellationToken cancellationToken)

View File

@@ -130,6 +130,10 @@ public sealed class BotUpdateHub : IAsyncDisposable
private readonly object _subscribersLock = new();
private readonly Dictionary<Guid, Channel<JsonElement>> _subscribers = new();
private const int PendingMyChatMemberMax = 2000;
private readonly object _pendingLock = new();
private readonly Queue<JsonElement> _pendingMyChatMember = new();
private long _nextOffset;
public int BotId => _persistBotId;
@@ -182,11 +186,30 @@ public sealed class BotUpdateHub : IAsyncDisposable
var id = Guid.NewGuid();
var ch = Channel.CreateBounded<JsonElement>(SubscriberChannelOptions);
List<JsonElement>? pending = null;
lock (_subscribersLock)
{
_subscribers[id] = ch;
}
lock (_pendingLock)
{
if (_pendingMyChatMember.Count > 0)
{
pending = _pendingMyChatMember.ToList();
_pendingMyChatMember.Clear();
}
}
if (pending != null)
{
foreach (var u in pending)
{
// 尽力写入:满了就丢,避免首次订阅阻塞
ch.Writer.TryWrite(u);
}
}
return new BotUpdateSubscription(_persistBotId, ch.Reader, async () =>
{
Channel<JsonElement>? removed = null;
@@ -277,6 +300,18 @@ public sealed class BotUpdateHub : IAsyncDisposable
private void Broadcast(JsonElement update)
{
// 即使当前没有订阅者,也要缓存 my_chat_member 更新:
// 手动同步通常是“点按钮才订阅”,否则 poller 会把更新吃掉导致新增频道永远同步不到。
if (update.ValueKind == JsonValueKind.Object && update.TryGetProperty("my_chat_member", out _))
{
lock (_pendingLock)
{
_pendingMyChatMember.Enqueue(update.Clone());
while (_pendingMyChatMember.Count > PendingMyChatMemberMax)
_pendingMyChatMember.Dequeue();
}
}
List<Channel<JsonElement>> targets;
lock (_subscribersLock)
{

View File

@@ -260,11 +260,11 @@
loading = true;
try
{
var count = await BotTelegram.SyncBotChannelsAsync(selectedBotId, CancellationToken.None);
if (count > 0)
Snackbar.Add($"同步完成:已清理 {count} 个失效频道", Severity.Success);
var result = await BotTelegram.SyncBotChannelsAsync(selectedBotId, CancellationToken.None);
if (result.AppliedUpdates > 0 || result.RemovedStale > 0)
Snackbar.Add($"同步完成:应用更新 {result.AppliedUpdates} 条,清理失效 {result.RemovedStale} 个频道", Severity.Success);
else
Snackbar.Add("未发现需要理的频道。提示:新增频道依赖 Bot 自动同步getUpdates/my_chat_member", Severity.Info);
Snackbar.Add("同步完成:没有需要理的更新或失效频道。提示:请先把 Bot 设为频道管理员,再点同步。", Severity.Info);
await LoadBotData();
}
catch (Exception ex)

View File

@@ -31,8 +31,8 @@ public class BotAutoSyncBackgroundService : BackgroundService
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
// 默认开启满足“Bot 拉进频道后自动出现”的直觉
var enabled = _configuration.GetValue("Telegram:BotAutoSyncEnabled", true);
// 默认关闭:避免对“后台自动同步”的误解;需要时可显式开启
var enabled = _configuration.GetValue("Telegram:BotAutoSyncEnabled", false);
if (!enabled)
{
_logger.LogInformation("Bot auto sync disabled (Telegram:BotAutoSyncEnabled=false)");