feat: 支持续活跃小数间隔与取消状态,验证异步并保持严格间隔

This commit is contained in:
meoacgx
2026-03-10 23:40:12 +08:00
parent 0faba7a1fe
commit e7a67a8555
7 changed files with 304 additions and 77 deletions

View File

@@ -146,6 +146,21 @@ public class BatchTaskManagementService
await TrimHistoryTasksIfNeededAsync();
}
public async Task CancelTaskAsync(int taskId)
{
var task = await _batchTaskRepository.GetFreshByIdAsync(taskId);
if (task == null)
return;
if (task.Status is "completed" or "failed" or "canceled")
return;
task.Status = "canceled";
task.CompletedAt = DateTime.UtcNow;
await _batchTaskRepository.UpdateFreshAsync(task);
await TrimHistoryTasksIfNeededAsync();
}
public async Task DeleteTaskAsync(int id)
{
var task = await _batchTaskRepository.GetFreshByIdAsync(id);

View File

@@ -7,7 +7,7 @@ public class BatchTask
{
public int Id { get; set; }
public string TaskType { get; set; } = null!; // invite/set_admin/create_channel等
public string Status { get; set; } = "pending"; // pending/running/paused/completed/failed
public string Status { get; set; } = "pending"; // pending/running/paused/completed/failed/canceled
public int Total { get; set; }
public int Completed { get; set; }
public int Failed { get; set; }

View File

@@ -55,7 +55,7 @@ public class BatchTaskRepository : Repository<BatchTask>, IBatchTaskRepository
return 0;
var staleTasks = await _dbSet
.Where(t => t.Status == "completed" || t.Status == "failed")
.Where(t => t.Status == "completed" || t.Status == "failed" || t.Status == "canceled")
.OrderByDescending(t => t.CreatedAt)
.ThenByDescending(t => t.Id)
.Skip(keepCount)

View File

@@ -60,12 +60,14 @@
<MudGrid>
<MudItem xs="12" md="4">
<MudNumericField T="int" @bind-Value="delayMinMs" @bind-Value:after="OnInputsChanged"
Label="最小间隔(秒)" Variant="Variant.Outlined" Min="0" Max="600" />
<MudNumericField T="decimal" @bind-Value="delayMinSeconds" @bind-Value:after="OnInputsChanged"
Label="最小间隔(秒)" Variant="Variant.Outlined" Min="0" Max="600" Step="0.01m"
HelperText="支持小数秒,例如 0.01、0.1" />
</MudItem>
<MudItem xs="12" md="4">
<MudNumericField T="int" @bind-Value="delayMaxMs" @bind-Value:after="OnInputsChanged"
Label="最大间隔(秒)" Variant="Variant.Outlined" Min="0" Max="600" />
<MudNumericField T="decimal" @bind-Value="delayMaxSeconds" @bind-Value:after="OnInputsChanged"
Label="最大间隔(秒)" Variant="Variant.Outlined" Min="0" Max="600" Step="0.01m"
HelperText="支持小数秒,例如 0.01、0.1" />
</MudItem>
<MudItem xs="12" md="4">
<MudNumericField T="int" @bind-Value="maxMessages" @bind-Value:after="OnInputsChanged"
@@ -157,8 +159,8 @@
private IEnumerable<int> selectedCategoryIds = Array.Empty<int>();
private string targetsText = string.Empty;
private string dictionaryText = string.Empty;
private int delayMinMs = 15;
private int delayMaxMs = 45;
private decimal delayMinSeconds = 15m;
private decimal delayMaxSeconds = 45m;
private int maxMessages;
private bool enableAiVerification;
private int verificationTimeoutSeconds = 15;
@@ -222,8 +224,8 @@
selectedCategoryIds = NormalizeCategoryIds(cfg.CategoryIds, cfg.CategoryId);
targetsText = string.Join(Environment.NewLine, cfg.Targets ?? new List<string>());
dictionaryText = string.Join(Environment.NewLine, cfg.Dictionary ?? new List<string>());
delayMinMs = ConvertMillisecondsToSeconds(cfg.DelayMinMs);
delayMaxMs = ConvertMillisecondsToSeconds(cfg.DelayMaxMs);
delayMinSeconds = ConvertMillisecondsToSeconds(cfg.DelayMinMs);
delayMaxSeconds = ConvertMillisecondsToSeconds(cfg.DelayMaxMs);
maxMessages = cfg.MaxMessages;
accountMode = NormalizeMode(cfg.AccountMode);
targetMode = NormalizeMode(cfg.TargetMode);
@@ -343,13 +345,13 @@
}
}
if (delayMinMs < 0 || delayMaxMs < 0)
if (delayMinSeconds < 0 || delayMaxSeconds < 0)
{
await DraftChanged.InvokeAsync(new ModuleTaskDraft(0, null, false, "间隔不能为负数"));
return;
}
if (delayMaxMs < delayMinMs)
if (delayMaxSeconds < delayMinSeconds)
{
await DraftChanged.InvokeAsync(new ModuleTaskDraft(0, null, false, "最大间隔不能小于最小间隔"));
return;
@@ -392,8 +394,8 @@
CategoryNames = categoryNames,
Targets = targets,
Dictionary = dictionary,
DelayMinMs = delayMinMs * 1000,
DelayMaxMs = delayMaxMs * 1000,
DelayMinMs = ConvertSecondsToMilliseconds(delayMinSeconds),
DelayMaxMs = ConvertSecondsToMilliseconds(delayMaxSeconds),
MaxMessages = maxMessages,
AccountMode = accountMode,
TargetMode = targetMode,
@@ -421,12 +423,20 @@
return ids;
}
private static int ConvertMillisecondsToSeconds(int value)
private static decimal ConvertMillisecondsToSeconds(int value)
{
if (value <= 0)
return 0m;
return Math.Round(value / 1000m, 3, MidpointRounding.AwayFromZero);
}
private static int ConvertSecondsToMilliseconds(decimal value)
{
if (value <= 0)
return 0;
return Math.Max(1, (int)Math.Ceiling(value / 1000d));
return (int)Math.Round(value * 1000m, MidpointRounding.AwayFromZero);
}
private static bool IsValidMode(string? value)

View File

@@ -45,6 +45,7 @@
<MudSelectItem Value='@("all")'>全部</MudSelectItem>
<MudSelectItem Value='@("completed")'>已完成</MudSelectItem>
<MudSelectItem Value='@("failed")'>失败</MudSelectItem>
<MudSelectItem Value='@("canceled")'>已取消</MudSelectItem>
</MudSelect>
</MudStack>
</MudCardContent>
@@ -450,6 +451,7 @@
"paused" => "已暂停",
"completed" => "已完成",
"failed" => "失败",
"canceled" => "已取消",
_ => status
};
@@ -460,6 +462,7 @@
"paused" => Color.Warning,
"pending" => Color.Default,
"failed" => Color.Error,
"canceled" => Color.Secondary,
_ => Color.Default
};
@@ -469,6 +472,7 @@
"running" => Color.Primary,
"paused" => Color.Warning,
"failed" => Color.Error,
"canceled" => Color.Secondary,
_ => Color.Default
};
@@ -484,7 +488,7 @@
status is "pending" or "running" or "paused";
private static bool IsHistoryStatus(string status) =>
status is "completed" or "failed";
status is "completed" or "failed" or "canceled";
private static string GetScheduledStatusName(string status) =>
string.Equals((status ?? string.Empty).Trim(), ScheduledTaskStatuses.Paused, StringComparison.OrdinalIgnoreCase)
@@ -594,10 +598,10 @@
private bool CanCancelTask(BatchTask task)
{
var status = GetDisplayStatus(task);
if (status is not ("pending" or "running"))
if (status is not ("pending" or "running" or "paused"))
return false;
return !CanPauseTask(task);
return true;
}
private static List<string> ExtractBotAdminFailureLines(BatchTask task)
@@ -872,7 +876,7 @@
{
try
{
await TaskManagement.CompleteTaskAsync(id, false);
await TaskManagement.CancelTaskAsync(id);
await LoadTasks();
Snackbar.Add("任务已取消", Severity.Warning);
}

View File

@@ -155,7 +155,7 @@ public sealed class BatchTaskBackgroundService : BackgroundService
failed = after.Failed;
}
// 如果任务被用户取消(当前实现Cancel 会把状态写成 failed则不覆盖它
// 如果任务被用户取消(状态变为 canceled则不覆盖它
var latest = await taskManagement.GetTaskAsync(pending.Id);
if (latest != null && latest.Status != "running")
return;

View File

@@ -1,3 +1,5 @@
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Text.Json;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
@@ -30,6 +32,7 @@ public sealed class UserChatActiveTaskHandler : IModuleTaskHandler
ValidateAndNormalizeConfig(config);
config.Canceled = false;
config.Error = null;
var configGate = new SemaphoreSlim(1, 1);
if (config.EnableAiVerification)
{
@@ -55,7 +58,7 @@ public sealed class UserChatActiveTaskHandler : IModuleTaskHandler
if (!await host.IsStillRunningAsync(cancellationToken))
{
config.Canceled = true;
await taskManagement.UpdateTaskConfigAsync(host.TaskId, SerializeIndented(config));
await PersistConfigAsync(taskManagement, host.TaskId, config, configGate, cancellationToken);
return;
}
@@ -66,7 +69,7 @@ public sealed class UserChatActiveTaskHandler : IModuleTaskHandler
if (!await host.IsStillRunningAsync(cancellationToken))
{
config.Canceled = true;
await taskManagement.UpdateTaskConfigAsync(host.TaskId, SerializeIndented(config));
await PersistConfigAsync(taskManagement, host.TaskId, config, configGate, cancellationToken);
return;
}
@@ -87,14 +90,15 @@ public sealed class UserChatActiveTaskHandler : IModuleTaskHandler
if (accountSlots.Count == 0)
{
config.Error = "没有可用的账号-目标组合(请确认账号已加入目标群组/频道)";
await taskManagement.UpdateTaskConfigAsync(host.TaskId, SerializeIndented(config));
await PersistConfigAsync(taskManagement, host.TaskId, config, configGate, cancellationToken);
throw new InvalidOperationException(config.Error);
}
await taskManagement.UpdateTaskConfigAsync(host.TaskId, SerializeIndented(config));
await PersistConfigAsync(taskManagement, host.TaskId, config, configGate, cancellationToken);
var completed = 0;
var failed = 0;
var progress = new TaskProgressCounter();
var verificationTasks = new ConcurrentDictionary<Guid, Task>();
using var verificationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
var accountQueueIndex = 0;
var messageQueueIndex = 0;
var targetQueueIndexByAccountId = new Dictionary<int, int>();
@@ -102,6 +106,18 @@ public sealed class UserChatActiveTaskHandler : IModuleTaskHandler
try
{
async Task<bool> DelayUntilNextSendAsync(Stopwatch timer, int intervalMs)
{
if (intervalMs <= 0)
return true;
var remaining = intervalMs - (int)timer.ElapsedMilliseconds;
if (remaining <= 0)
return true;
return await DelayWithPauseCheckAsync(host, remaining, cancellationToken);
}
while (!cancellationToken.IsCancellationRequested)
{
cancellationToken.ThrowIfCancellationRequested();
@@ -109,12 +125,16 @@ public sealed class UserChatActiveTaskHandler : IModuleTaskHandler
if (!await host.IsStillRunningAsync(cancellationToken))
{
config.Canceled = true;
verificationTokenSource.Cancel();
break;
}
if (config.MaxMessages > 0 && completed >= config.MaxMessages)
if (config.MaxMessages > 0 && progress.Completed >= config.MaxMessages)
break;
var intervalMs = NextDelayMilliseconds(config.DelayMinMs, config.DelayMaxMs);
var loopTimer = Stopwatch.StartNew();
var accountIdx = SelectIndex(config.AccountMode, accountSlots.Count, ref accountQueueIndex);
var accountSlot = accountSlots[accountIdx];
@@ -132,6 +152,7 @@ public sealed class UserChatActiveTaskHandler : IModuleTaskHandler
if (!await host.IsStillRunningAsync(cancellationToken))
{
config.Canceled = true;
verificationTokenSource.Cancel();
break;
}
@@ -142,25 +163,32 @@ public sealed class UserChatActiveTaskHandler : IModuleTaskHandler
}
catch (Exception ex)
{
completed++;
failed++;
var completed = Interlocked.Increment(ref progress.Completed);
Interlocked.Increment(ref progress.Failed);
var hadTemplateFailure = true;
AddFailure(config, accountSlot.Account, targetSlot.RawTarget, $"词典模板解析失败:{ex.Message}");
await taskManagement.UpdateTaskConfigAsync(host.TaskId, SerializeIndented(config));
await AddFailureAndPersistAsync(
taskManagement,
host.TaskId,
config,
accountSlot.Account,
targetSlot.RawTarget,
$"词典模板解析失败:{ex.Message}",
configGate,
cancellationToken);
if (ShouldPersistProgress(completed, hadTemplateFailure, lastProgressPersistAt))
{
await host.UpdateProgressAsync(completed, failed, cancellationToken);
await host.UpdateProgressAsync(completed, progress.Failed, cancellationToken);
lastProgressPersistAt = DateTime.UtcNow;
}
if (config.MaxMessages > 0 && completed >= config.MaxMessages)
break;
var templateFailDelayMs = NextDelayMilliseconds(config.DelayMinMs, config.DelayMaxMs);
if (templateFailDelayMs > 0 && !await DelayWithPauseCheckAsync(host, templateFailDelayMs, cancellationToken))
if (!await DelayUntilNextSendAsync(loopTimer, intervalMs))
{
config.Canceled = true;
verificationTokenSource.Cancel();
break;
}
@@ -169,25 +197,32 @@ public sealed class UserChatActiveTaskHandler : IModuleTaskHandler
if (text.Length == 0)
{
completed++;
failed++;
var completed = Interlocked.Increment(ref progress.Completed);
Interlocked.Increment(ref progress.Failed);
var hadEmptyMessageFailure = true;
AddFailure(config, accountSlot.Account, targetSlot.RawTarget, "词典模板解析结果为空,无法发送");
await taskManagement.UpdateTaskConfigAsync(host.TaskId, SerializeIndented(config));
await AddFailureAndPersistAsync(
taskManagement,
host.TaskId,
config,
accountSlot.Account,
targetSlot.RawTarget,
"词典模板解析结果为空,无法发送",
configGate,
cancellationToken);
if (ShouldPersistProgress(completed, hadEmptyMessageFailure, lastProgressPersistAt))
{
await host.UpdateProgressAsync(completed, failed, cancellationToken);
await host.UpdateProgressAsync(completed, progress.Failed, cancellationToken);
lastProgressPersistAt = DateTime.UtcNow;
}
if (config.MaxMessages > 0 && completed >= config.MaxMessages)
break;
var emptyDelayMs = NextDelayMilliseconds(config.DelayMinMs, config.DelayMaxMs);
if (emptyDelayMs > 0 && !await DelayWithPauseCheckAsync(host, emptyDelayMs, cancellationToken))
if (!await DelayUntilNextSendAsync(loopTimer, intervalMs))
{
config.Canceled = true;
verificationTokenSource.Cancel();
break;
}
@@ -200,14 +235,22 @@ public sealed class UserChatActiveTaskHandler : IModuleTaskHandler
text,
cancellationToken: cancellationToken);
completed++;
var sendCompleted = Interlocked.Increment(ref progress.Completed);
var hadFailureThisRound = false;
if (!send.Success)
{
failed++;
Interlocked.Increment(ref progress.Failed);
hadFailureThisRound = true;
AddFailure(config, accountSlot.Account, targetSlot.RawTarget, NormalizeReason(send.Error));
await AddFailureAndPersistAsync(
taskManagement,
host.TaskId,
config,
accountSlot.Account,
targetSlot.RawTarget,
NormalizeReason(send.Error),
configGate,
cancellationToken);
if (LooksLikePeerInvalid(send.Error))
{
@@ -219,80 +262,101 @@ public sealed class UserChatActiveTaskHandler : IModuleTaskHandler
if (refresh.Success && refresh.Target != null)
targetSlot.Resolved = refresh.Target;
}
await taskManagement.UpdateTaskConfigAsync(host.TaskId, SerializeIndented(config));
}
else if (config.EnableAiVerification)
{
if (!send.MessageId.HasValue || send.MessageId.Value <= 0)
{
failed++;
Interlocked.Increment(ref progress.Failed);
hadFailureThisRound = true;
AddFailure(config, accountSlot.Account, targetSlot.RawTarget, "消息已发送,但未获取到消息 ID无法执行 AI 验证");
await taskManagement.UpdateTaskConfigAsync(host.TaskId, SerializeIndented(config));
await AddFailureAndPersistAsync(
taskManagement,
host.TaskId,
config,
accountSlot.Account,
targetSlot.RawTarget,
"消息已发送,但未获取到消息 ID无法执行 AI 验证",
configGate,
cancellationToken);
}
else
{
var verification = await aiVerification.TryHandleAsync(
var verificationTaskId = Guid.NewGuid();
var verificationTask = RunVerificationAsync(
aiVerification,
accountSlot.Account,
targetSlot.Resolved,
targetSlot.RawTarget,
send.MessageId.Value,
config,
cancellationToken);
taskManagement,
host,
progress,
configGate,
logger,
verificationTokenSource.Token);
if (!verification.Success)
{
failed++;
hadFailureThisRound = true;
AddFailure(config, accountSlot.Account, targetSlot.RawTarget, NormalizeReason(verification.Error));
await taskManagement.UpdateTaskConfigAsync(host.TaskId, SerializeIndented(config));
}
else
{
logger.LogInformation(
"UserChatActive AI verification completed: taskId={TaskId}, accountId={AccountId}, target={Target}, action={Action}",
host.TaskId,
accountSlot.Account.Id,
targetSlot.RawTarget,
verification.ActionSummary ?? "(none)");
}
verificationTasks[verificationTaskId] = verificationTask;
_ = verificationTask.ContinueWith(
_ => verificationTasks.TryRemove(verificationTaskId, out _),
CancellationToken.None,
TaskContinuationOptions.ExecuteSynchronously,
TaskScheduler.Default);
}
}
if (ShouldPersistProgress(completed, hadFailureThisRound, lastProgressPersistAt))
if (ShouldPersistProgress(sendCompleted, hadFailureThisRound, lastProgressPersistAt))
{
await host.UpdateProgressAsync(completed, failed, cancellationToken);
await host.UpdateProgressAsync(sendCompleted, progress.Failed, cancellationToken);
lastProgressPersistAt = DateTime.UtcNow;
}
if (config.MaxMessages > 0 && completed >= config.MaxMessages)
if (config.MaxMessages > 0 && sendCompleted >= config.MaxMessages)
break;
var delayMs = NextDelayMilliseconds(config.DelayMinMs, config.DelayMaxMs);
if (delayMs > 0 && !await DelayWithPauseCheckAsync(host, delayMs, cancellationToken))
if (!await DelayUntilNextSendAsync(loopTimer, intervalMs))
{
config.Canceled = true;
verificationTokenSource.Cancel();
break;
}
}
var pendingVerifications = verificationTasks.Values.ToArray();
if (pendingVerifications.Length > 0)
await Task.WhenAll(pendingVerifications);
}
catch (Exception ex)
{
verificationTokenSource.Cancel();
var pendingVerifications = verificationTasks.Values.ToArray();
if (pendingVerifications.Length > 0)
{
try
{
await Task.WhenAll(pendingVerifications);
}
catch
{
// 忽略验证任务的二次异常,避免覆盖主异常。
}
}
logger.LogWarning(ex, "UserChatActive task failed (taskId={TaskId})", host.TaskId);
config.Error = ex.Message;
await taskManagement.UpdateTaskConfigAsync(host.TaskId, SerializeIndented(config));
await PersistConfigAsync(taskManagement, host.TaskId, config, configGate, cancellationToken);
throw;
}
await host.UpdateProgressAsync(completed, failed, cancellationToken);
await host.UpdateProgressAsync(progress.Completed, progress.Failed, cancellationToken);
if (config.Canceled)
{
await taskManagement.UpdateTaskConfigAsync(host.TaskId, SerializeIndented(config));
await PersistConfigAsync(taskManagement, host.TaskId, config, configGate, cancellationToken);
return;
}
config.Error = null;
await taskManagement.UpdateTaskConfigAsync(host.TaskId, SerializeIndented(config));
await PersistConfigAsync(taskManagement, host.TaskId, config, configGate, cancellationToken);
}
private static UserChatActiveTaskConfig DeserializeConfig(string? rawConfig)
@@ -499,6 +563,140 @@ public sealed class UserChatActiveTaskHandler : IModuleTaskHandler
return JsonSerializer.Serialize(config, new JsonSerializerOptions { WriteIndented = true });
}
private static async Task PersistConfigAsync(
BatchTaskManagementService taskManagement,
int taskId,
UserChatActiveTaskConfig config,
SemaphoreSlim gate,
CancellationToken cancellationToken)
{
await gate.WaitAsync(cancellationToken);
try
{
await taskManagement.UpdateTaskConfigAsync(taskId, SerializeIndented(config));
}
finally
{
gate.Release();
}
}
private static async Task AddFailureAndPersistAsync(
BatchTaskManagementService taskManagement,
int taskId,
UserChatActiveTaskConfig config,
Account account,
string rawTarget,
string reason,
SemaphoreSlim gate,
CancellationToken cancellationToken)
{
await gate.WaitAsync(cancellationToken);
try
{
AddFailure(config, account, rawTarget, reason);
await taskManagement.UpdateTaskConfigAsync(taskId, SerializeIndented(config));
}
finally
{
gate.Release();
}
}
private static async Task RunVerificationAsync(
UserChatActiveAiVerificationService aiVerification,
Account account,
AccountTelegramToolsService.ResolvedChatTarget target,
string rawTarget,
int sentMessageId,
UserChatActiveTaskConfig config,
BatchTaskManagementService taskManagement,
IModuleTaskExecutionHost host,
TaskProgressCounter progress,
SemaphoreSlim configGate,
ILogger logger,
CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
return;
var timeoutSeconds = Math.Clamp(config.VerificationTimeoutSeconds, 3, 300);
using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
timeoutCts.CancelAfter(TimeSpan.FromSeconds(timeoutSeconds + 10));
try
{
var verification = await aiVerification.TryHandleAsync(
account,
target,
sentMessageId,
config,
timeoutCts.Token);
if (!verification.Success)
{
var failed = Interlocked.Increment(ref progress.Failed);
await AddFailureAndPersistAsync(
taskManagement,
host.TaskId,
config,
account,
rawTarget,
NormalizeReason(verification.Error),
configGate,
CancellationToken.None);
await host.UpdateProgressAsync(progress.Completed, failed, CancellationToken.None);
}
else
{
logger.LogInformation(
"UserChatActive AI verification completed: taskId={TaskId}, accountId={AccountId}, target={Target}, action={Action}",
host.TaskId,
account.Id,
rawTarget,
verification.ActionSummary ?? "(none)");
}
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
// 任务被取消时忽略验证
}
catch (OperationCanceledException)
{
var failed = Interlocked.Increment(ref progress.Failed);
await AddFailureAndPersistAsync(
taskManagement,
host.TaskId,
config,
account,
rawTarget,
"验证处理超时",
configGate,
CancellationToken.None);
await host.UpdateProgressAsync(progress.Completed, failed, CancellationToken.None);
}
catch (Exception ex)
{
var failed = Interlocked.Increment(ref progress.Failed);
await AddFailureAndPersistAsync(
taskManagement,
host.TaskId,
config,
account,
rawTarget,
$"验证处理异常:{ex.Message}",
configGate,
CancellationToken.None);
await host.UpdateProgressAsync(progress.Completed, failed, CancellationToken.None);
}
}
private sealed class TaskProgressCounter
{
public int Completed;
public int Failed;
}
private sealed class AccountSlot
{
public AccountSlot(Account account)