package redisqueue import ( "testing" "time" ) func TestEnqueueBroadcastsToUsageSubscribersAndSkipsQueue(t *testing.T) { withEnabledQueue(t, func() { first, unsubscribeFirst := SubscribeUsage() defer unsubscribeFirst() second, unsubscribeSecond := SubscribeUsage() defer unsubscribeSecond() requireUsageSubscriberPayload(t, first, usageSupportRefreshPayload) requireUsageSubscriberPayload(t, second, usageSupportRefreshPayload) Enqueue([]byte("usage-record")) requireUsageSubscriberPayload(t, first, "usage-record") requireUsageSubscriberPayload(t, second, "usage-record") if items := PopOldest(1); len(items) != 0 { t.Fatalf("PopOldest() items = %q, want empty after subscriber broadcast", items) } unsubscribeFirst() unsubscribeSecond() Enqueue([]byte("queued-record")) items := PopOldest(1) if len(items) != 1 || string(items[0]) != "queued-record" { t.Fatalf("PopOldest() items = %q, want queued record after unsubscribe", items) } }) } func TestSetEnabledFalseClosesUsageSubscribers(t *testing.T) { withEnabledQueue(t, func() { subscriber, unsubscribe := SubscribeUsage() defer unsubscribe() errorSubscriber, unsubscribeErrors := SubscribeErrors() defer unsubscribeErrors() requireUsageSubscriberPayload(t, subscriber, usageSupportRefreshPayload) SetEnabled(false) select { case _, ok := <-subscriber: if ok { t.Fatalf("subscriber channel remained open after SetEnabled(false)") } case <-time.After(time.Second): t.Fatalf("timeout waiting for subscriber close") } select { case _, ok := <-errorSubscriber: if ok { t.Fatalf("error subscriber channel remained open after SetEnabled(false)") } case <-time.After(time.Second): t.Fatalf("timeout waiting for error subscriber close") } }) } func TestEnqueueErrorBroadcastsToErrorSubscribersAndDiscardsWithoutSubscribers(t *testing.T) { withEnabledQueue(t, func() { subscriber, unsubscribe := SubscribeErrors() defer unsubscribe() EnqueueError([]byte("error-record")) requireUsageSubscriberPayload(t, subscriber, "error-record") unsubscribe() EnqueueError([]byte("discarded-error")) requireErrorQueueEmpty(t) }) } func TestNotifyUsageRefreshBroadcastsOnlyToUsageSubscribers(t *testing.T) { withEnabledQueue(t, func() { subscriber, unsubscribe := SubscribeUsage() defer unsubscribe() errorSubscriber, unsubscribeErrors := SubscribeErrors() defer unsubscribeErrors() requireUsageSubscriberPayload(t, subscriber, usageSupportRefreshPayload) NotifyUsageRefresh() requireUsageSubscriberPayload(t, subscriber, usageRefreshPayload) select { case got := <-errorSubscriber: t.Fatalf("error subscriber received usage refresh payload %q", string(got)) default: } unsubscribe() NotifyUsageRefresh() if items := PopOldest(1); len(items) != 0 { t.Fatalf("PopOldest() items = %q, want empty after refresh notification without subscribers", items) } }) } func requireUsageSubscriberPayload(t *testing.T, subscriber <-chan []byte, want string) { t.Helper() select { case got, ok := <-subscriber: if !ok { t.Fatalf("subscriber closed before receiving %q", want) } if string(got) != want { t.Fatalf("subscriber payload = %q, want %q", string(got), want) } case <-time.After(time.Second): t.Fatalf("timeout waiting for subscriber payload %q", want) } } func requireErrorQueueEmpty(t *testing.T) { t.Helper() errorGlobal.mu.Lock() defer errorGlobal.mu.Unlock() if len(errorGlobal.items)-errorGlobal.head != 0 { t.Fatalf("error queue retained %d item(s), want none", len(errorGlobal.items)-errorGlobal.head) } }