Automated cherry pick of #24544: fix(host): metadata handler proxy telegraf data fix tenant_id (#24546)

* fix(host): metadata handler proxy telegraf data

* fix(host): append unknown dmesg to last entry
This commit is contained in:
wanyaoqi
2026-03-25 10:34:34 +08:00
committed by GitHub
parent 1e5bab79f8
commit 8d5310f9e9
4 changed files with 328 additions and 7 deletions

View File

@@ -29,9 +29,10 @@ const (
type SHostDmesgCollector struct {
host IHostInfo
mu sync.Mutex
buffer []compute.SKmsgEntry
bootTime time.Time
mu sync.Mutex
buffer []compute.SKmsgEntry
pendingEntry *compute.SKmsgEntry
bootTime time.Time
}
func NewHostDmesgCollector(hostInfo IHostInfo) *SHostDmesgCollector {
@@ -104,7 +105,9 @@ func (c *SHostDmesgCollector) Start() {
entry, err := c.parseKmsgLine(line, bootTime)
if err != nil {
log.Debugf("failed parse kmsg line %s: %s", line, err)
if c.pendingEntry != nil {
c.pendingEntry.Message += "\n" + line
}
continue
}
if entry.Seq <= lastSeq {
@@ -116,9 +119,14 @@ func (c *SHostDmesgCollector) Start() {
}
c.mu.Lock()
c.buffer = append(c.buffer, *entry)
if len(c.buffer) >= batchSize {
c.flushBuffer()
if c.pendingEntry == nil {
c.pendingEntry = entry
} else {
c.buffer = append(c.buffer, *c.pendingEntry)
if len(c.buffer) >= batchSize {
c.flushBuffer()
}
c.pendingEntry = entry
}
c.mu.Unlock()
}

View File

@@ -343,6 +343,9 @@ func (s *Service) monitorReverseEndpoint() *proxy.SEndpointFactory {
}
func (s *Service) requestManipulator(ctx context.Context, r *http.Request) (*http.Request, error) {
if err := s.rewriteTelegrafInfluxBodyIfNeeded(ctx, r); err != nil {
log.Errorf("failed rewrite telegraf body %s", err)
}
path := r.URL.Path[len(s.monitorPrefix()):]
log.Debugf("Path: %s => %s", r.URL.Path, path)
r.URL = &url.URL{

View File

@@ -0,0 +1,258 @@
// Copyright 2019 Yunion
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metadata
import (
"bytes"
"compress/gzip"
"context"
"io"
"net/http"
"strings"
"yunion.io/x/log"
"yunion.io/x/pkg/errors"
"yunion.io/x/onecloud/pkg/hostman/guestman/desc"
)
// telegrafInfluxMaxBodyBytes is the largest telegraf write payload (16 MiB)
// that rewriteTelegrafInfluxBodyIfNeeded will buffer; the same bound is
// applied to the raw request body and to the payload after gzip decoding.
const telegrafInfluxMaxBodyBytes = 16 << 20
// rewriteTelegrafInfluxBodyIfNeeded corrects the tenant_id tag of telegraf
// line-protocol writes that pass through the monitor proxy.
//
// Only POST/PUT requests whose path is <monitorPrefix>/write are touched; any
// other request is left alone and nil is returned. For a matching request the
// body is buffered (at most telegrafInfluxMaxBodyBytes), gunzipped when
// Content-Encoding mentions gzip, and each line is passed through
// rewriteInfluxLineProtocolTenant using the guest descriptor bound to the
// request. On success the request carries the (uncompressed) rewritten payload
// with ContentLength updated and the Content-Encoding/Content-Length headers
// removed.
//
// On failure an error is returned and the request is left forwardable: the
// body is restored to the bytes the client sent and no header is modified.
// The caller only logs the error and still proxies the request, so returning
// with a drained or closed body would silently corrupt the upstream write.
func (s *Service) rewriteTelegrafInfluxBodyIfNeeded(ctx context.Context, r *http.Request) error {
	prefix := s.monitorPrefix()
	if !strings.HasPrefix(r.URL.Path, prefix) {
		return nil
	}
	if r.Method != http.MethodPost && r.Method != http.MethodPut {
		return nil
	}
	sub := r.URL.Path[len(prefix):]
	if sub != "/write" && !strings.HasPrefix(sub, "/write?") {
		return nil
	}
	if r.Body == nil {
		// Nothing to rewrite; avoid reading through a nil reader.
		return nil
	}

	origBody := r.Body
	// Read one byte past the limit so an oversized body can be detected.
	raw, err := io.ReadAll(io.LimitReader(origBody, telegrafInfluxMaxBodyBytes+1))
	if err != nil || len(raw) > telegrafInfluxMaxBodyBytes {
		// Put the bytes already consumed back in front of the unread remainder
		// so the proxied request still sees the complete original stream.
		r.Body = struct {
			io.Reader
			io.Closer
		}{
			Reader: io.MultiReader(bytes.NewReader(raw), origBody),
			Closer: origBody,
		}
		if err != nil {
			return errors.Wrap(err, "read telegraf influx body")
		}
		return errors.Errorf("telegraf influx body exceeds %d bytes", telegrafInfluxMaxBodyBytes)
	}
	_ = origBody.Close()
	// The whole body is buffered now; keep a replayable copy on the request so
	// every later error return still forwards exactly what the client sent.
	r.Body = io.NopCloser(bytes.NewReader(raw))

	payload := raw
	gzipped := strings.Contains(strings.ToLower(r.Header.Get("Content-Encoding")), "gzip")
	if gzipped {
		gr, err := gzip.NewReader(bytes.NewReader(raw))
		if err != nil {
			return errors.Wrap(err, "gzip reader")
		}
		payload, err = io.ReadAll(io.LimitReader(gr, telegrafInfluxMaxBodyBytes+1))
		_ = gr.Close()
		if err != nil {
			return errors.Wrap(err, "read gzipped telegraf body")
		}
		if len(payload) > telegrafInfluxMaxBodyBytes {
			return errors.Errorf("telegraf influx body exceeds %d bytes after gzip", telegrafInfluxMaxBodyBytes)
		}
	}

	newBody, changed, err := rewriteInfluxLineProtocolTenant(payload, func(vmId string) (string, bool) {
		gd := s.lookupGuestDescForTelegraf(r, vmId)
		if gd == nil || gd.TenantId == "" {
			return "", false
		}
		return gd.TenantId, true
	})
	if err != nil {
		return err
	}
	if changed {
		log.Debugf("metadata monitor: corrected tenant_id in telegraf influx payload from %s", r.RemoteAddr)
	}

	// Only now, with nothing left that can fail, switch the request over to
	// the uncompressed payload and drop the headers that described the old one.
	if gzipped {
		r.Header.Del("Content-Encoding")
	}
	r.Body = io.NopCloser(bytes.NewReader(newBody))
	r.ContentLength = int64(len(newBody))
	r.Header.Del("Content-Length")
	return nil
}
// lookupGuestDescForTelegraf returns the guest descriptor that getGuestDesc
// resolves for the request, but only when its Uuid equals vmId. It returns nil
// for an empty vmId, when no descriptor is found, or when the descriptor
// belongs to a different guest.
func (s *Service) lookupGuestDescForTelegraf(r *http.Request, vmId string) *desc.SGuestDesc {
	if len(vmId) == 0 {
		return nil
	}
	guest := s.getGuestDesc(r)
	if guest == nil || guest.Uuid != vmId {
		return nil
	}
	return guest
}
// rewriteInfluxLineProtocolTenant applies rewriteInfluxLineTenant to every
// non-empty, non-comment ("#") line of body, which is split on "\n" with a
// trailing "\r" trimmed from each line before processing.
//
// It returns the resulting payload and whether any line was modified. When
// nothing changed, or when a line fails to rewrite, the original body slice is
// returned as is (together with the error in the latter case).
func rewriteInfluxLineProtocolTenant(body []byte, resolveTenant func(vmId string) (tenantId string, ok bool)) ([]byte, bool, error) {
	lines := strings.Split(string(body), "\n")
	rewritten := 0
	for idx := range lines {
		current := strings.TrimRight(lines[idx], "\r")
		if len(current) == 0 || current[0] == '#' {
			continue
		}
		updated, modified, err := rewriteInfluxLineTenant(current, resolveTenant)
		switch {
		case err != nil:
			return body, false, err
		case modified:
			lines[idx] = updated
			rewritten++
		}
	}
	if rewritten == 0 {
		return body, false, nil
	}
	return []byte(strings.Join(lines, "\n")), true, nil
}
// rewriteInfluxLineTenant makes the tenant_id tag of a single line-protocol
// line agree with the tenant that resolveTenant reports for the line's vm_id
// tag.
//
// The line is returned unchanged (with false) when it has no field section,
// carries no vm_id tag, the vm_id cannot be resolved, or the tenant_id tag
// already holds the expected value. Otherwise every existing tenant_id tag is
// replaced in place, or a tenant_id tag is appended after the other tags when
// none was present, and true is returned. The error result is always nil.
func rewriteInfluxLineTenant(line string, resolveTenant func(vmId string) (tenantId string, ok bool)) (string, bool, error) {
	head, fieldPart, found := splitMeasurementTagsAndFields(line)
	if !found {
		return line, false, nil
	}
	segments := splitOnUnescapedComma(head)
	if len(segments) == 0 {
		return line, false, nil
	}
	tags := segments[1:]

	// First pass: remember each tag's key and pick up vm_id / tenant_id.
	var guestID, tenantNow string
	tenantSeen := false
	keys := make([]string, len(tags))
	for i, tag := range tags {
		key, value := splitInfluxTagKeyValue(tag)
		keys[i] = key
		if key == "vm_id" {
			guestID = value
		} else if key == "tenant_id" {
			tenantSeen = true
			tenantNow = value
		}
	}
	if guestID == "" {
		return line, false, nil
	}

	wantTenant, resolved := resolveTenant(guestID)
	if !resolved {
		return line, false, nil
	}
	if tenantSeen && tenantNow == wantTenant {
		return line, false, nil
	}

	// Second pass: emit measurement, tags (tenant_id corrected) and fields.
	var out strings.Builder
	out.WriteString(segments[0])
	for i, tag := range tags {
		out.WriteByte(',')
		if keys[i] == "tenant_id" {
			out.WriteString("tenant_id=" + wantTenant)
		} else {
			out.WriteString(tag)
		}
	}
	if !tenantSeen {
		out.WriteString(",tenant_id=" + wantTenant)
	}
	out.WriteByte(' ')
	out.WriteString(fieldPart)
	return out.String(), true, nil
}
// splitMeasurementTagsAndFields cuts line at its first space that is not
// backslash-escaped. measTags is everything before that space and fields is
// everything after it; ok is false (with both parts empty) when the line
// contains no such space.
func splitMeasurementTagsAndFields(line string) (measTags string, fields string, ok bool) {
	for pos := 0; pos < len(line); pos++ {
		if line[pos] != ' ' || influxByteEscaped(line, pos) {
			continue
		}
		return line[:pos], line[pos+1:], true
	}
	return "", "", false
}
// influxByteEscaped reports whether the byte at index i of line is escaped,
// i.e. whether it is immediately preceded by an odd number of consecutive
// backslashes.
func influxByteEscaped(line string, i int) bool {
	escaped := false
	// Each backslash directly in front of i flips the parity.
	for k := i - 1; k >= 0 && line[k] == '\\'; k-- {
		escaped = !escaped
	}
	return escaped
}
// splitOnUnescapedComma splits s at every comma that is not preceded by an
// escaping backslash. Escape sequences are kept verbatim in the returned
// segments, and the result always holds at least one (possibly empty) segment.
func splitOnUnescapedComma(s string) []string {
	segments := make([]string, 0, strings.Count(s, ",")+1)
	start := 0
	for pos := 0; pos < len(s); pos++ {
		switch s[pos] {
		case '\\':
			// The byte after a backslash is taken literally; step over it.
			pos++
		case ',':
			segments = append(segments, s[start:pos])
			start = pos + 1
		}
	}
	return append(segments, s[start:])
}
// splitInfluxTagKeyValue splits a "key=value" tag segment at its first
// unescaped '='. The key is returned unescaped via influxUnescapeTagKey, while
// the value is returned exactly as written. Both results are empty when the
// segment has no unescaped '='.
func splitInfluxTagKeyValue(seg string) (key, val string) {
	for pos := 0; pos < len(seg); pos++ {
		if seg[pos] != '=' || influxByteEscaped(seg, pos) {
			continue
		}
		key = influxUnescapeTagKey(seg[:pos])
		val = seg[pos+1:]
		return key, val
	}
	return "", ""
}
// influxUnescapeTagKey removes line-protocol escaping from a tag key by
// delegating to influxUnescapeTag.
func influxUnescapeTagKey(s string) string {
	key := influxUnescapeTag(s)
	return key
}
// influxUnescapeTag drops the backslash from the escape sequences `\\`, `\ `,
// `\,` and `\=`. A backslash followed by any other byte, or a backslash at the
// very end of s, is copied through unchanged.
func influxUnescapeTag(s string) string {
	var out strings.Builder
	out.Grow(len(s))
	for pos := 0; pos < len(s); {
		ch := s[pos]
		pos++
		if ch == '\\' && pos < len(s) && strings.IndexByte(`\ ,=`, s[pos]) >= 0 {
			// Recognised escape: emit the escaped byte instead of the backslash.
			ch = s[pos]
			pos++
		}
		out.WriteByte(ch)
	}
	return out.String()
}

View File

@@ -0,0 +1,52 @@
// Copyright 2019 Yunion
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
package metadata
import (
"testing"
)
// TestRewriteInfluxLineTenant checks that a wrong tenant_id tag is replaced by
// the tenant resolved for the line's vm_id.
func TestRewriteInfluxLineTenant(t *testing.T) {
	const (
		input = `agent_cpu,vm_id=vm-1,tenant_id=wrong u=1i`
		want  = `agent_cpu,vm_id=vm-1,tenant_id=tenant-correct u=1i`
	)
	lookup := func(vmId string) (string, bool) {
		if vmId != "vm-1" {
			return "", false
		}
		return "tenant-correct", true
	}

	got, changed, err := rewriteInfluxLineTenant(input, lookup)
	if err != nil {
		t.Fatal(err)
	}
	if !changed {
		t.Fatal("expected change")
	}
	if got != want {
		t.Fatalf("got %q want %q", got, want)
	}
}
// TestRewriteInfluxLineTenantAddMissing checks that a tenant_id tag is
// appended after the existing tags when the line does not carry one.
func TestRewriteInfluxLineTenantAddMissing(t *testing.T) {
	const (
		input = `agent_mem,vm_id=vm-1 used=1i`
		want  = `agent_mem,vm_id=vm-1,tenant_id=t1 used=1i`
	)
	lookup := func(vmId string) (string, bool) {
		if vmId != "vm-1" {
			return "", false
		}
		return "t1", true
	}

	got, changed, err := rewriteInfluxLineTenant(input, lookup)
	if err != nil {
		t.Fatal(err)
	}
	if !changed {
		t.Fatal("expected change")
	}
	if got != want {
		t.Fatalf("got %q want %q", got, want)
	}
}