From 8d5310f9e9cbd8691f535fed2bd7dd50363abbfb Mon Sep 17 00:00:00 2001 From: wanyaoqi <18528551+wanyaoqi@users.noreply.github.com> Date: Wed, 25 Mar 2026 10:34:34 +0800 Subject: [PATCH] Automated cherry pick of #24544: fix(host): metadata handler proxy telegraf data fix tenant_id (#24546) * fix(host): metadata handler proxy telegraf data * fix(host): append unknown dmesg to last entry --- pkg/hostman/hostmetrics/host_dmesg.go | 22 +- pkg/hostman/metadata/metadatahandler.go | 3 + pkg/hostman/metadata/telegraf_influx.go | 258 +++++++++++++++++++ pkg/hostman/metadata/telegraf_influx_test.go | 52 ++++ 4 files changed, 328 insertions(+), 7 deletions(-) create mode 100644 pkg/hostman/metadata/telegraf_influx.go create mode 100644 pkg/hostman/metadata/telegraf_influx_test.go diff --git a/pkg/hostman/hostmetrics/host_dmesg.go b/pkg/hostman/hostmetrics/host_dmesg.go index 62ca388b72..a6b5747807 100644 --- a/pkg/hostman/hostmetrics/host_dmesg.go +++ b/pkg/hostman/hostmetrics/host_dmesg.go @@ -29,9 +29,10 @@ const ( type SHostDmesgCollector struct { host IHostInfo - mu sync.Mutex - buffer []compute.SKmsgEntry - bootTime time.Time + mu sync.Mutex + buffer []compute.SKmsgEntry + pendingEntry *compute.SKmsgEntry + bootTime time.Time } func NewHostDmesgCollector(hostInfo IHostInfo) *SHostDmesgCollector { @@ -104,7 +105,9 @@ func (c *SHostDmesgCollector) Start() { entry, err := c.parseKmsgLine(line, bootTime) if err != nil { - log.Debugf("failed parse kmsg line %s: %s", line, err) + if c.pendingEntry != nil { + c.pendingEntry.Message += "\n" + line + } continue } if entry.Seq <= lastSeq { @@ -116,9 +119,14 @@ func (c *SHostDmesgCollector) Start() { } c.mu.Lock() - c.buffer = append(c.buffer, *entry) - if len(c.buffer) >= batchSize { - c.flushBuffer() + if c.pendingEntry == nil { + c.pendingEntry = entry + } else { + c.buffer = append(c.buffer, *c.pendingEntry) + if len(c.buffer) >= batchSize { + c.flushBuffer() + } + c.pendingEntry = entry } c.mu.Unlock() } diff --git a/pkg/hostman/metadata/metadatahandler.go b/pkg/hostman/metadata/metadatahandler.go index d49ee80816..667807e1b9 100644 --- a/pkg/hostman/metadata/metadatahandler.go +++ b/pkg/hostman/metadata/metadatahandler.go @@ -343,6 +343,9 @@ func (s *Service) monitorReverseEndpoint() *proxy.SEndpointFactory { } func (s *Service) requestManipulator(ctx context.Context, r *http.Request) (*http.Request, error) { + if err := s.rewriteTelegrafInfluxBodyIfNeeded(ctx, r); err != nil { + log.Errorf("failed rewrite telegraf body %s", err) + } path := r.URL.Path[len(s.monitorPrefix()):] log.Debugf("Path: %s => %s", r.URL.Path, path) r.URL = &url.URL{ diff --git a/pkg/hostman/metadata/telegraf_influx.go b/pkg/hostman/metadata/telegraf_influx.go new file mode 100644 index 0000000000..7088baec95 --- /dev/null +++ b/pkg/hostman/metadata/telegraf_influx.go @@ -0,0 +1,258 @@ +// Copyright 2019 Yunion +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific + +package metadata + +import ( + "bytes" + "compress/gzip" + "context" + "io" + "net/http" + "strings" + + "yunion.io/x/log" + "yunion.io/x/pkg/errors" + + "yunion.io/x/onecloud/pkg/hostman/guestman/desc" +) + +const telegrafInfluxMaxBodyBytes = 16 << 20 + +func (s *Service) rewriteTelegrafInfluxBodyIfNeeded(ctx context.Context, r *http.Request) error { + prefix := s.monitorPrefix() + if !strings.HasPrefix(r.URL.Path, prefix) { + return nil + } + sub := r.URL.Path[len(prefix):] + if r.Method != http.MethodPost && r.Method != http.MethodPut { + return nil + } + if sub != "/write" && !strings.HasPrefix(sub, "/write?") { + return nil + } + + body, err := io.ReadAll(io.LimitReader(r.Body, telegrafInfluxMaxBodyBytes+1)) + if err != nil { + return errors.Wrap(err, "read telegraf influx body") + } + if len(body) > telegrafInfluxMaxBodyBytes { + return errors.Errorf("telegraf influx body exceeds %d bytes", telegrafInfluxMaxBodyBytes) + } + _ = r.Body.Close() + + if strings.Contains(strings.ToLower(r.Header.Get("Content-Encoding")), "gzip") { + gr, err := gzip.NewReader(bytes.NewReader(body)) + if err != nil { + return errors.Wrap(err, "gzip reader") + } + body, err = io.ReadAll(io.LimitReader(gr, telegrafInfluxMaxBodyBytes+1)) + _ = gr.Close() + if err != nil { + return errors.Wrap(err, "read gzipped telegraf body") + } + if len(body) > telegrafInfluxMaxBodyBytes { + return errors.Errorf("telegraf influx body exceeds %d bytes after gzip", telegrafInfluxMaxBodyBytes) + } + r.Header.Del("Content-Encoding") + } + + newBody, changed, err := rewriteInfluxLineProtocolTenant(body, func(vmId string) (string, bool) { + gd := s.lookupGuestDescForTelegraf(r, vmId) + if gd == nil || gd.TenantId == "" { + return "", false + } + return gd.TenantId, true + }) + if err != nil { + return err + } + if changed { + log.Debugf("metadata monitor: corrected tenant_id in telegraf influx payload from %s", r.RemoteAddr) + } + r.Body = io.NopCloser(bytes.NewReader(newBody)) + r.ContentLength = int64(len(newBody)) + r.Header.Del("Content-Length") + return nil +} + +func (s *Service) lookupGuestDescForTelegraf(r *http.Request, vmId string) *desc.SGuestDesc { + if vmId == "" { + return nil + } + gd := s.getGuestDesc(r) + if gd != nil && gd.Uuid == vmId { + return gd + } + return nil +} + +func rewriteInfluxLineProtocolTenant(body []byte, resolveTenant func(vmId string) (tenantId string, ok bool)) ([]byte, bool, error) { + raw := strings.Split(string(body), "\n") + changed := false + for i, line := range raw { + line = strings.TrimRight(line, "\r") + if line == "" || strings.HasPrefix(line, "#") { + continue + } + newLine, lineChanged, err := rewriteInfluxLineTenant(line, resolveTenant) + if err != nil { + return body, false, err + } + if lineChanged { + changed = true + raw[i] = newLine + } + } + if !changed { + return body, false, nil + } + return []byte(strings.Join(raw, "\n")), true, nil +} + +func rewriteInfluxLineTenant(line string, resolveTenant func(vmId string) (tenantId string, ok bool)) (string, bool, error) { + measTags, fields, ok := splitMeasurementTagsAndFields(line) + if !ok { + return line, false, nil + } + parts := splitOnUnescapedComma(measTags) + if len(parts) < 1 { + return line, false, nil + } + measurement := parts[0] + tagSegs := parts[1:] + vmId := "" + haveTenant := false + curTenant := "" + for _, seg := range tagSegs { + k, v := splitInfluxTagKeyValue(seg) + if k == "" { + continue + } + switch k { + case "vm_id": + vmId = v + case "tenant_id": + haveTenant = true + curTenant = v + } + } + if vmId == "" { + return line, false, nil + } + expectTenant, ok := resolveTenant(vmId) + if !ok { + return line, false, nil + } + if haveTenant && curTenant == expectTenant { + return line, false, nil + } + newSegs := make([]string, 0, len(tagSegs)+1) + for _, seg := range tagSegs { + k, _ := splitInfluxTagKeyValue(seg) + if k == "tenant_id" { + newSegs = append(newSegs, "tenant_id="+expectTenant) + } else { + newSegs = append(newSegs, seg) + } + } + if !haveTenant { + newSegs = append(newSegs, "tenant_id="+expectTenant) + } + var b strings.Builder + b.WriteString(measurement) + for _, seg := range newSegs { + b.WriteByte(',') + b.WriteString(seg) + } + b.WriteByte(' ') + b.WriteString(fields) + return b.String(), true, nil +} + +func splitMeasurementTagsAndFields(line string) (measTags string, fields string, ok bool) { + for i := 0; i < len(line); i++ { + if line[i] == ' ' && !influxByteEscaped(line, i) { + return line[:i], line[i+1:], true + } + } + return "", "", false +} + +func influxByteEscaped(line string, i int) bool { + if i == 0 { + return false + } + n := 0 + for j := i - 1; j >= 0 && line[j] == '\\'; j-- { + n++ + } + return n%2 == 1 +} + +func splitOnUnescapedComma(s string) []string { + var out []string + var b strings.Builder + escaped := false + for i := 0; i < len(s); i++ { + c := s[i] + if escaped { + b.WriteByte(c) + escaped = false + continue + } + if c == '\\' { + escaped = true + b.WriteByte('\\') + continue + } + if c == ',' { + out = append(out, b.String()) + b.Reset() + continue + } + b.WriteByte(c) + } + out = append(out, b.String()) + return out +} + +func splitInfluxTagKeyValue(seg string) (key, val string) { + for i := 0; i < len(seg); i++ { + if seg[i] == '=' && !influxByteEscaped(seg, i) { + return influxUnescapeTagKey(seg[:i]), seg[i+1:] + } + } + return "", "" +} + +func influxUnescapeTagKey(s string) string { + return influxUnescapeTag(s) +} + +func influxUnescapeTag(s string) string { + var b strings.Builder + b.Grow(len(s)) + for i := 0; i < len(s); i++ { + if s[i] == '\\' && i+1 < len(s) { + switch s[i+1] { + case '\\', ' ', ',', '=': + b.WriteByte(s[i+1]) + i++ + continue + } + } + b.WriteByte(s[i]) + } + return b.String() +} diff --git a/pkg/hostman/metadata/telegraf_influx_test.go b/pkg/hostman/metadata/telegraf_influx_test.go new file mode 100644 index 0000000000..5aa3c00fbb --- /dev/null +++ b/pkg/hostman/metadata/telegraf_influx_test.go @@ -0,0 +1,52 @@ +// Copyright 2019 Yunion +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. + +package metadata + +import ( + "testing" +) + +func TestRewriteInfluxLineTenant(t *testing.T) { + resolve := func(vmId string) (string, bool) { + if vmId == "vm-1" { + return "tenant-correct", true + } + return "", false + } + line := `agent_cpu,vm_id=vm-1,tenant_id=wrong u=1i` + out, ch, err := rewriteInfluxLineTenant(line, resolve) + if err != nil { + t.Fatal(err) + } + if !ch { + t.Fatal("expected change") + } + want := `agent_cpu,vm_id=vm-1,tenant_id=tenant-correct u=1i` + if out != want { + t.Fatalf("got %q want %q", out, want) + } +} + +func TestRewriteInfluxLineTenantAddMissing(t *testing.T) { + resolve := func(vmId string) (string, bool) { + if vmId == "vm-1" { + return "t1", true + } + return "", false + } + line := `agent_mem,vm_id=vm-1 used=1i` + out, ch, err := rewriteInfluxLineTenant(line, resolve) + if err != nil { + t.Fatal(err) + } + if !ch { + t.Fatal("expected change") + } + want := `agent_mem,vm_id=vm-1,tenant_id=t1 used=1i` + if out != want { + t.Fatalf("got %q want %q", out, want) + } +}