mirror of
https://github.com/router-for-me/CLIProxyAPI.git
synced 2026-06-10 08:13:22 +08:00
- Introduced `publishErrorEvent` in `Manager` to publish error events to Redis.
- Implemented an error event structure to capture authentication errors with detailed metadata.
- Added test cases for error event publishing, subscription, and Redis protocol handling.
- Enhanced error and usage queue handling with `SubscribeErrors` and `EnqueueError`.

Closes: #3701
136 lines
3.6 KiB
Go
136 lines
3.6 KiB
Go
package redisqueue
|
|
|
|
import (
|
|
"testing"
|
|
"time"
|
|
)
|
|
|
|
func TestEnqueueBroadcastsToUsageSubscribersAndSkipsQueue(t *testing.T) {
|
|
withEnabledQueue(t, func() {
|
|
first, unsubscribeFirst := SubscribeUsage()
|
|
defer unsubscribeFirst()
|
|
second, unsubscribeSecond := SubscribeUsage()
|
|
defer unsubscribeSecond()
|
|
|
|
requireUsageSubscriberPayload(t, first, usageSupportRefreshPayload)
|
|
requireUsageSubscriberPayload(t, second, usageSupportRefreshPayload)
|
|
|
|
Enqueue([]byte("usage-record"))
|
|
|
|
requireUsageSubscriberPayload(t, first, "usage-record")
|
|
requireUsageSubscriberPayload(t, second, "usage-record")
|
|
|
|
if items := PopOldest(1); len(items) != 0 {
|
|
t.Fatalf("PopOldest() items = %q, want empty after subscriber broadcast", items)
|
|
}
|
|
|
|
unsubscribeFirst()
|
|
unsubscribeSecond()
|
|
|
|
Enqueue([]byte("queued-record"))
|
|
items := PopOldest(1)
|
|
if len(items) != 1 || string(items[0]) != "queued-record" {
|
|
t.Fatalf("PopOldest() items = %q, want queued record after unsubscribe", items)
|
|
}
|
|
})
|
|
}
|
|
|
|
func TestSetEnabledFalseClosesUsageSubscribers(t *testing.T) {
|
|
withEnabledQueue(t, func() {
|
|
subscriber, unsubscribe := SubscribeUsage()
|
|
defer unsubscribe()
|
|
errorSubscriber, unsubscribeErrors := SubscribeErrors()
|
|
defer unsubscribeErrors()
|
|
|
|
requireUsageSubscriberPayload(t, subscriber, usageSupportRefreshPayload)
|
|
|
|
SetEnabled(false)
|
|
|
|
select {
|
|
case _, ok := <-subscriber:
|
|
if ok {
|
|
t.Fatalf("subscriber channel remained open after SetEnabled(false)")
|
|
}
|
|
case <-time.After(time.Second):
|
|
t.Fatalf("timeout waiting for subscriber close")
|
|
}
|
|
|
|
select {
|
|
case _, ok := <-errorSubscriber:
|
|
if ok {
|
|
t.Fatalf("error subscriber channel remained open after SetEnabled(false)")
|
|
}
|
|
case <-time.After(time.Second):
|
|
t.Fatalf("timeout waiting for error subscriber close")
|
|
}
|
|
})
|
|
}
|
|
|
|
func TestEnqueueErrorBroadcastsToErrorSubscribersAndDiscardsWithoutSubscribers(t *testing.T) {
|
|
withEnabledQueue(t, func() {
|
|
subscriber, unsubscribe := SubscribeErrors()
|
|
defer unsubscribe()
|
|
|
|
EnqueueError([]byte("error-record"))
|
|
requireUsageSubscriberPayload(t, subscriber, "error-record")
|
|
|
|
unsubscribe()
|
|
|
|
EnqueueError([]byte("discarded-error"))
|
|
requireErrorQueueEmpty(t)
|
|
})
|
|
}
|
|
|
|
func TestNotifyUsageRefreshBroadcastsOnlyToUsageSubscribers(t *testing.T) {
|
|
withEnabledQueue(t, func() {
|
|
subscriber, unsubscribe := SubscribeUsage()
|
|
defer unsubscribe()
|
|
errorSubscriber, unsubscribeErrors := SubscribeErrors()
|
|
defer unsubscribeErrors()
|
|
|
|
requireUsageSubscriberPayload(t, subscriber, usageSupportRefreshPayload)
|
|
|
|
NotifyUsageRefresh()
|
|
requireUsageSubscriberPayload(t, subscriber, usageRefreshPayload)
|
|
|
|
select {
|
|
case got := <-errorSubscriber:
|
|
t.Fatalf("error subscriber received usage refresh payload %q", string(got))
|
|
default:
|
|
}
|
|
|
|
unsubscribe()
|
|
NotifyUsageRefresh()
|
|
if items := PopOldest(1); len(items) != 0 {
|
|
t.Fatalf("PopOldest() items = %q, want empty after refresh notification without subscribers", items)
|
|
}
|
|
})
|
|
}
|
|
|
|
// requireUsageSubscriberPayload receives one message from subscriber and
// fails the test unless it equals want. It also fails if the channel is
// closed before a message arrives or if nothing arrives within one second.
func requireUsageSubscriberPayload(t *testing.T, subscriber <-chan []byte, want string) {
	t.Helper()

	deadline := time.After(time.Second)
	select {
	case <-deadline:
		t.Fatalf("timeout waiting for subscriber payload %q", want)
	case payload, open := <-subscriber:
		// t.Fatalf stops the calling goroutine, so at most one branch reports.
		switch {
		case !open:
			t.Fatalf("subscriber closed before receiving %q", want)
		case string(payload) != want:
			t.Fatalf("subscriber payload = %q, want %q", string(payload), want)
		}
	}
}
|
|
|
|
func requireErrorQueueEmpty(t *testing.T) {
|
|
t.Helper()
|
|
|
|
errorGlobal.mu.Lock()
|
|
defer errorGlobal.mu.Unlock()
|
|
|
|
if len(errorGlobal.items)-errorGlobal.head != 0 {
|
|
t.Fatalf("error queue retained %d item(s), want none", len(errorGlobal.items)-errorGlobal.head)
|
|
}
|
|
}
|