Files
CLIProxyAPI/internal/redisqueue/queue_test.go
Luis Pater fd30944830 feat(auth): add error event publishing and Redis queue integration
- Introduced `publishErrorEvent` in `Manager` to publish error events to Redis.
- Implemented error event structure to capture authentication errors with detailed metadata.
- Added test cases for error event publishing, subscription, and Redis protocol handling.
- Enhanced error and usage queue handling with `SubscribeErrors` and `EnqueueError`.

Closes: #3701
2026-06-04 00:53:43 +08:00

136 lines
3.6 KiB
Go

package redisqueue
import (
"testing"
"time"
)
func TestEnqueueBroadcastsToUsageSubscribersAndSkipsQueue(t *testing.T) {
withEnabledQueue(t, func() {
first, unsubscribeFirst := SubscribeUsage()
defer unsubscribeFirst()
second, unsubscribeSecond := SubscribeUsage()
defer unsubscribeSecond()
requireUsageSubscriberPayload(t, first, usageSupportRefreshPayload)
requireUsageSubscriberPayload(t, second, usageSupportRefreshPayload)
Enqueue([]byte("usage-record"))
requireUsageSubscriberPayload(t, first, "usage-record")
requireUsageSubscriberPayload(t, second, "usage-record")
if items := PopOldest(1); len(items) != 0 {
t.Fatalf("PopOldest() items = %q, want empty after subscriber broadcast", items)
}
unsubscribeFirst()
unsubscribeSecond()
Enqueue([]byte("queued-record"))
items := PopOldest(1)
if len(items) != 1 || string(items[0]) != "queued-record" {
t.Fatalf("PopOldest() items = %q, want queued record after unsubscribe", items)
}
})
}
func TestSetEnabledFalseClosesUsageSubscribers(t *testing.T) {
withEnabledQueue(t, func() {
subscriber, unsubscribe := SubscribeUsage()
defer unsubscribe()
errorSubscriber, unsubscribeErrors := SubscribeErrors()
defer unsubscribeErrors()
requireUsageSubscriberPayload(t, subscriber, usageSupportRefreshPayload)
SetEnabled(false)
select {
case _, ok := <-subscriber:
if ok {
t.Fatalf("subscriber channel remained open after SetEnabled(false)")
}
case <-time.After(time.Second):
t.Fatalf("timeout waiting for subscriber close")
}
select {
case _, ok := <-errorSubscriber:
if ok {
t.Fatalf("error subscriber channel remained open after SetEnabled(false)")
}
case <-time.After(time.Second):
t.Fatalf("timeout waiting for error subscriber close")
}
})
}
func TestEnqueueErrorBroadcastsToErrorSubscribersAndDiscardsWithoutSubscribers(t *testing.T) {
withEnabledQueue(t, func() {
subscriber, unsubscribe := SubscribeErrors()
defer unsubscribe()
EnqueueError([]byte("error-record"))
requireUsageSubscriberPayload(t, subscriber, "error-record")
unsubscribe()
EnqueueError([]byte("discarded-error"))
requireErrorQueueEmpty(t)
})
}
func TestNotifyUsageRefreshBroadcastsOnlyToUsageSubscribers(t *testing.T) {
withEnabledQueue(t, func() {
subscriber, unsubscribe := SubscribeUsage()
defer unsubscribe()
errorSubscriber, unsubscribeErrors := SubscribeErrors()
defer unsubscribeErrors()
requireUsageSubscriberPayload(t, subscriber, usageSupportRefreshPayload)
NotifyUsageRefresh()
requireUsageSubscriberPayload(t, subscriber, usageRefreshPayload)
select {
case got := <-errorSubscriber:
t.Fatalf("error subscriber received usage refresh payload %q", string(got))
default:
}
unsubscribe()
NotifyUsageRefresh()
if items := PopOldest(1); len(items) != 0 {
t.Fatalf("PopOldest() items = %q, want empty after refresh notification without subscribers", items)
}
})
}
func requireUsageSubscriberPayload(t *testing.T, subscriber <-chan []byte, want string) {
t.Helper()
select {
case got, ok := <-subscriber:
if !ok {
t.Fatalf("subscriber closed before receiving %q", want)
}
if string(got) != want {
t.Fatalf("subscriber payload = %q, want %q", string(got), want)
}
case <-time.After(time.Second):
t.Fatalf("timeout waiting for subscriber payload %q", want)
}
}
func requireErrorQueueEmpty(t *testing.T) {
t.Helper()
errorGlobal.mu.Lock()
defer errorGlobal.mu.Unlock()
if len(errorGlobal.items)-errorGlobal.head != 0 {
t.Fatalf("error queue retained %d item(s), want none", len(errorGlobal.items)-errorGlobal.head)
}
}