From c4ee063b958a6a2bed2afae8698256f2c1bdf977 Mon Sep 17 00:00:00 2001 From: hkfires <10558748+hkfires@users.noreply.github.com> Date: Fri, 29 May 2026 08:12:52 +0800 Subject: [PATCH] feat(logging): add HomeAppLogForwarder for application log forwarding --- internal/home/client.go | 12 ++ internal/logging/home_app_log_forwarder.go | 167 ++++++++++++++++++ .../logging/home_app_log_forwarder_test.go | 159 +++++++++++++++++ sdk/cliproxy/service.go | 15 +- 4 files changed, 351 insertions(+), 2 deletions(-) create mode 100644 internal/logging/home_app_log_forwarder.go create mode 100644 internal/logging/home_app_log_forwarder_test.go diff --git a/internal/home/client.go b/internal/home/client.go index 0357529e6..fd7f98a25 100644 --- a/internal/home/client.go +++ b/internal/home/client.go @@ -28,6 +28,7 @@ const ( redisKeyModels = "models" redisKeyUsage = "usage" redisKeyRequestLog = "request-log" + redisKeyAppLog = "app-log" homeReconnectInterval = time.Second homeReconnectFailoverThreshold = 3 @@ -650,6 +651,17 @@ func (c *Client) RPushRequestLog(ctx context.Context, payload []byte) error { return cmd.RPush(ctx, redisKeyRequestLog, payload).Err() } +func (c *Client) RPushAppLog(ctx context.Context, payload []byte) error { + cmd, errClient := c.commandClient() + if errClient != nil { + return errClient + } + if len(payload) == 0 { + return nil + } + return cmd.RPush(ctx, redisKeyAppLog, payload).Err() +} + func (c *Client) handleSubscriptionPayload(channel string, payload string, onConfig func([]byte) error) error { payload = strings.TrimSpace(payload) if payload == "" { diff --git a/internal/logging/home_app_log_forwarder.go b/internal/logging/home_app_log_forwarder.go new file mode 100644 index 000000000..e74e47a1c --- /dev/null +++ b/internal/logging/home_app_log_forwarder.go @@ -0,0 +1,167 @@ +package logging + +import ( + "context" + "encoding/json" + "errors" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/router-for-me/CLIProxyAPI/v7/internal/home" + log "github.com/sirupsen/logrus" +) + +const defaultHomeAppLogQueueSize = 1024 + +type homeAppLogClient interface { + HeartbeatOK() bool + RPushAppLog(ctx context.Context, payload []byte) error +} + +type homeAppLogPayload struct { + Line string `json:"line"` + Level string `json:"level,omitempty"` + Timestamp string `json:"timestamp,omitempty"` +} + +var currentHomeAppLogClient = func() homeAppLogClient { + return home.Current() +} + +// HomeAppLogForwarder forwards application logs to Home after the control connection is healthy. +type HomeAppLogForwarder struct { + formatter log.Formatter + queue chan homeAppLogPayload + stop chan struct{} + stopOnce sync.Once + wg sync.WaitGroup + enabled atomic.Bool +} + +// StartHomeAppLogForwarder installs a logrus hook that forwards future application logs to Home. +func StartHomeAppLogForwarder(queueSize int) *HomeAppLogForwarder { + if queueSize <= 0 { + queueSize = defaultHomeAppLogQueueSize + } + forwarder := &HomeAppLogForwarder{ + formatter: &LogFormatter{}, + queue: make(chan homeAppLogPayload, queueSize), + stop: make(chan struct{}), + } + forwarder.enabled.Store(true) + forwarder.wg.Add(1) + go forwarder.run() + log.AddHook(forwarder) + return forwarder +} + +// Stop disables forwarding and waits for the background sender to exit. +func (f *HomeAppLogForwarder) Stop() { + if f == nil { + return + } + f.stopOnce.Do(func() { + f.enabled.Store(false) + close(f.stop) + f.wg.Wait() + }) +} + +// Levels implements logrus.Hook. +func (f *HomeAppLogForwarder) Levels() []log.Level { + return log.AllLevels +} + +// Fire implements logrus.Hook. +func (f *HomeAppLogForwarder) Fire(entry *log.Entry) error { + if f == nil || entry == nil || !f.enabled.Load() { + return nil + } + client := currentHomeAppLogClient() + if client == nil || !client.HeartbeatOK() { + return nil + } + line, errFormat := f.formatEntry(entry) + if errFormat != nil || strings.TrimSpace(line) == "" { + return nil + } + + payload := homeAppLogPayload{ + Line: line, + Level: entry.Level.String(), + Timestamp: entry.Time.Format(time.RFC3339Nano), + } + select { + case f.queue <- payload: + default: + } + return nil +} + +func (f *HomeAppLogForwarder) formatEntry(entry *log.Entry) (string, error) { + formatter := f.formatter + if formatter == nil { + formatter = &LogFormatter{} + } + raw, errFormat := formatter.Format(entry) + if errFormat != nil { + return "", errFormat + } + return string(raw), nil +} + +func (f *HomeAppLogForwarder) run() { + defer f.wg.Done() + for { + select { + case <-f.stop: + return + case payload := <-f.queue: + f.forward(payload) + } + } +} + +func (f *HomeAppLogForwarder) forward(payload homeAppLogPayload) { + if !f.enabled.Load() { + return + } + client := currentHomeAppLogClient() + if client == nil || !client.HeartbeatOK() { + return + } + raw, errMarshal := json.Marshal(&payload) + if errMarshal != nil { + return + } + if errPush := client.RPushAppLog(context.Background(), raw); errPush != nil && isHomeAppLogUnsupported(errPush) { + f.enabled.Store(false) + } +} + +func isHomeAppLogUnsupported(err error) bool { + if err == nil { + return false + } + msg := strings.ToLower(strings.TrimSpace(err.Error())) + if msg == "" { + return false + } + for { + switch { + case strings.Contains(msg, "unsupported key"): + return true + case strings.Contains(msg, "unknown command"): + return true + case strings.Contains(msg, "unsupported command"): + return true + } + err = errors.Unwrap(err) + if err == nil { + return false + } + msg = strings.ToLower(strings.TrimSpace(err.Error())) + } +} diff --git a/internal/logging/home_app_log_forwarder_test.go b/internal/logging/home_app_log_forwarder_test.go new file mode 100644 index 000000000..59476d1c0 --- /dev/null +++ b/internal/logging/home_app_log_forwarder_test.go @@ -0,0 +1,159 @@ +package logging + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "strings" + "sync" + "testing" + "time" + + log "github.com/sirupsen/logrus" +) + +type stubHomeAppLogClient struct { + mu sync.Mutex + heartbeatOK bool + err error + pushed [][]byte +} + +func (c *stubHomeAppLogClient) HeartbeatOK() bool { return c.heartbeatOK } + +func (c *stubHomeAppLogClient) RPushAppLog(_ context.Context, payload []byte) error { + c.mu.Lock() + defer c.mu.Unlock() + if c.err != nil { + return c.err + } + c.pushed = append(c.pushed, bytes.Clone(payload)) + return nil +} + +func (c *stubHomeAppLogClient) pushedCount() int { + c.mu.Lock() + defer c.mu.Unlock() + return len(c.pushed) +} + +func (c *stubHomeAppLogClient) pushedAt(index int) []byte { + c.mu.Lock() + defer c.mu.Unlock() + if index < 0 || index >= len(c.pushed) { + return nil + } + return bytes.Clone(c.pushed[index]) +} + +func TestHomeAppLogForwarder_ForwardsFormattedLogWhenHomeHealthy(t *testing.T) { + original := currentHomeAppLogClient + defer func() { + currentHomeAppLogClient = original + }() + + stub := &stubHomeAppLogClient{heartbeatOK: true} + currentHomeAppLogClient = func() homeAppLogClient { + return stub + } + + forwarder := &HomeAppLogForwarder{ + formatter: &LogFormatter{}, + queue: make(chan homeAppLogPayload, 4), + stop: make(chan struct{}), + } + forwarder.enabled.Store(true) + forwarder.wg.Add(1) + go forwarder.run() + defer forwarder.Stop() + + entry := log.NewEntry(log.StandardLogger()) + entry.Time = time.Date(2026, 5, 29, 8, 0, 0, 0, time.Local) + entry.Level = log.DebugLevel + entry.Message = "debug details" + + if errFire := forwarder.Fire(entry); errFire != nil { + t.Fatalf("Fire error: %v", errFire) + } + + deadline := time.Now().Add(time.Second) + for stub.pushedCount() == 0 && time.Now().Before(deadline) { + time.Sleep(10 * time.Millisecond) + } + if stub.pushedCount() != 1 { + t.Fatalf("pushed records = %d, want 1", stub.pushedCount()) + } + + var got homeAppLogPayload + if errUnmarshal := json.Unmarshal(stub.pushedAt(0), &got); errUnmarshal != nil { + t.Fatalf("unmarshal payload: %v", errUnmarshal) + } + if got.Level != "debug" { + t.Fatalf("level = %q, want debug", got.Level) + } + if !strings.Contains(got.Line, "debug details") { + t.Fatalf("line %q missing log message", got.Line) + } + if strings.TrimSpace(got.Timestamp) == "" { + t.Fatal("timestamp empty, want non-empty") + } +} + +func TestHomeAppLogForwarder_SkipsWhenHomeHeartbeatIsDown(t *testing.T) { + original := currentHomeAppLogClient + defer func() { + currentHomeAppLogClient = original + }() + + stub := &stubHomeAppLogClient{heartbeatOK: false} + currentHomeAppLogClient = func() homeAppLogClient { + return stub + } + + forwarder := &HomeAppLogForwarder{ + formatter: &LogFormatter{}, + queue: make(chan homeAppLogPayload, 4), + stop: make(chan struct{}), + } + forwarder.enabled.Store(true) + + entry := log.NewEntry(log.StandardLogger()) + entry.Time = time.Now() + entry.Level = log.InfoLevel + entry.Message = "should stay local" + + if errFire := forwarder.Fire(entry); errFire != nil { + t.Fatalf("Fire error: %v", errFire) + } + if stub.pushedCount() != 0 { + t.Fatalf("pushed records = %d, want 0", stub.pushedCount()) + } +} + +func TestHomeAppLogForwarder_DisablesForwardingWhenHomeDoesNotSupportAppLog(t *testing.T) { + original := currentHomeAppLogClient + defer func() { + currentHomeAppLogClient = original + }() + + stub := &stubHomeAppLogClient{ + heartbeatOK: true, + err: errors.New("ERR unsupported key"), + } + currentHomeAppLogClient = func() homeAppLogClient { + return stub + } + + forwarder := &HomeAppLogForwarder{ + formatter: &LogFormatter{}, + queue: make(chan homeAppLogPayload, 4), + stop: make(chan struct{}), + } + forwarder.enabled.Store(true) + + forwarder.forward(homeAppLogPayload{Line: "legacy home cannot receive app logs"}) + if forwarder.enabled.Load() { + t.Fatal("forwarder still enabled, want disabled after unsupported app-log response") + } +} diff --git a/sdk/cliproxy/service.go b/sdk/cliproxy/service.go index cd16ebcef..10c3d0dd9 100644 --- a/sdk/cliproxy/service.go +++ b/sdk/cliproxy/service.go @@ -14,6 +14,7 @@ import ( "github.com/router-for-me/CLIProxyAPI/v7/internal/api" "github.com/router-for-me/CLIProxyAPI/v7/internal/home" + "github.com/router-for-me/CLIProxyAPI/v7/internal/logging" "github.com/router-for-me/CLIProxyAPI/v7/internal/redisqueue" "github.com/router-for-me/CLIProxyAPI/v7/internal/registry" "github.com/router-for-me/CLIProxyAPI/v7/internal/runtime/executor" @@ -96,8 +97,9 @@ type Service struct { // wsGateway manages websocket Gemini providers. wsGateway *wsrelay.Manager - homeClient *home.Client - homeCancel context.CancelFunc + homeClient *home.Client + homeCancel context.CancelFunc + homeLogForwarder *logging.HomeAppLogForwarder } // RegisterUsagePlugin registers a usage plugin on the global usage manager. @@ -717,6 +719,10 @@ func (s *Service) startHomeSubscriber(ctx context.Context) { s.homeClient.Close() s.homeClient = nil } + if s.homeLogForwarder != nil { + s.homeLogForwarder.Stop() + s.homeLogForwarder = nil + } homeCtx := ctx if homeCtx == nil { @@ -739,6 +745,7 @@ func (s *Service) startHomeSubscriber(ctx context.Context) { return nil }) s.startHomeUsageForwarder(homeCtx, client) + s.homeLogForwarder = logging.StartHomeAppLogForwarder(0) } // Run starts the service and blocks until the context is cancelled or the server stops. @@ -971,6 +978,10 @@ func (s *Service) Shutdown(ctx context.Context) error { s.homeClient.Close() s.homeClient = nil } + if s.homeLogForwarder != nil { + s.homeLogForwarder.Stop() + s.homeLogForwarder = nil + } home.ClearCurrent() // legacy refresh loop removed; only stopping core auth manager below