mirror of
https://github.com/router-for-me/CLIProxyAPI.git
synced 2026-05-08 14:48:29 +08:00
- Introduced `redis-usage-queue-retention-seconds` config parameter with a default of 60 seconds and a max of 3600 seconds. - Updated logic in `redisqueue` to honor configurable retention periods for enqueued usage data. - Modified config validation and initialization to support and enforce retention limits. - Enhanced change tracking in `config_diff` to detect updates to this parameter.
156 lines
2.5 KiB
Go
156 lines
2.5 KiB
Go
package redisqueue
|
|
|
|
import (
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
const (
|
|
defaultRetentionSeconds int64 = 60
|
|
maxRetentionSeconds int64 = 3600
|
|
)
|
|
|
|
type queueItem struct {
|
|
enqueuedAt time.Time
|
|
payload []byte
|
|
}
|
|
|
|
type queue struct {
|
|
mu sync.Mutex
|
|
items []queueItem
|
|
head int
|
|
}
|
|
|
|
var (
|
|
enabled atomic.Bool
|
|
retentionSeconds atomic.Int64
|
|
global queue
|
|
)
|
|
|
|
func init() {
|
|
retentionSeconds.Store(defaultRetentionSeconds)
|
|
}
|
|
|
|
func SetEnabled(value bool) {
|
|
enabled.Store(value)
|
|
if !value {
|
|
global.clear()
|
|
}
|
|
}
|
|
|
|
func Enabled() bool {
|
|
return enabled.Load()
|
|
}
|
|
|
|
func SetRetentionSeconds(value int) {
|
|
normalized := int64(value)
|
|
if normalized <= 0 {
|
|
normalized = defaultRetentionSeconds
|
|
} else if normalized > maxRetentionSeconds {
|
|
normalized = maxRetentionSeconds
|
|
}
|
|
retentionSeconds.Store(normalized)
|
|
}
|
|
|
|
func Enqueue(payload []byte) {
|
|
if !Enabled() {
|
|
return
|
|
}
|
|
if len(payload) == 0 {
|
|
return
|
|
}
|
|
global.enqueue(payload)
|
|
}
|
|
|
|
func PopOldest(count int) [][]byte {
|
|
if !Enabled() {
|
|
return nil
|
|
}
|
|
if count <= 0 {
|
|
return nil
|
|
}
|
|
return global.popOldest(count)
|
|
}
|
|
|
|
func (q *queue) clear() {
|
|
q.mu.Lock()
|
|
defer q.mu.Unlock()
|
|
q.items = nil
|
|
q.head = 0
|
|
}
|
|
|
|
func (q *queue) enqueue(payload []byte) {
|
|
now := time.Now()
|
|
|
|
q.mu.Lock()
|
|
defer q.mu.Unlock()
|
|
|
|
q.pruneLocked(now)
|
|
q.items = append(q.items, queueItem{
|
|
enqueuedAt: now,
|
|
payload: append([]byte(nil), payload...),
|
|
})
|
|
q.maybeCompactLocked()
|
|
}
|
|
|
|
func (q *queue) popOldest(count int) [][]byte {
|
|
now := time.Now()
|
|
|
|
q.mu.Lock()
|
|
defer q.mu.Unlock()
|
|
|
|
q.pruneLocked(now)
|
|
available := len(q.items) - q.head
|
|
if available <= 0 {
|
|
q.items = nil
|
|
q.head = 0
|
|
return nil
|
|
}
|
|
if count > available {
|
|
count = available
|
|
}
|
|
|
|
out := make([][]byte, 0, count)
|
|
for i := 0; i < count; i++ {
|
|
item := q.items[q.head+i]
|
|
out = append(out, item.payload)
|
|
}
|
|
q.head += count
|
|
q.maybeCompactLocked()
|
|
return out
|
|
}
|
|
|
|
func (q *queue) pruneLocked(now time.Time) {
|
|
if q.head >= len(q.items) {
|
|
q.items = nil
|
|
q.head = 0
|
|
return
|
|
}
|
|
|
|
windowSeconds := retentionSeconds.Load()
|
|
if windowSeconds <= 0 {
|
|
windowSeconds = defaultRetentionSeconds
|
|
}
|
|
cutoff := now.Add(-time.Duration(windowSeconds) * time.Second)
|
|
for q.head < len(q.items) && q.items[q.head].enqueuedAt.Before(cutoff) {
|
|
q.head++
|
|
}
|
|
}
|
|
|
|
func (q *queue) maybeCompactLocked() {
|
|
if q.head == 0 {
|
|
return
|
|
}
|
|
if q.head >= len(q.items) {
|
|
q.items = nil
|
|
q.head = 0
|
|
return
|
|
}
|
|
if q.head < 1024 && q.head*2 < len(q.items) {
|
|
return
|
|
}
|
|
q.items = append([]queueItem(nil), q.items[q.head:]...)
|
|
q.head = 0
|
|
}
|