From f28519bbbf45cefbf7fcaf9a73a0e3bfd696fc3d Mon Sep 17 00:00:00 2001 From: Zexi Li Date: Mon, 7 Apr 2025 18:40:56 +0800 Subject: [PATCH] fix(monitor): check measurement field (#22376) --- pkg/monitor/tsdb/driver/victoriametrics/vm.go | 31 ++++++++++++++----- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/pkg/monitor/tsdb/driver/victoriametrics/vm.go b/pkg/monitor/tsdb/driver/victoriametrics/vm.go index ad76e13ee9..7493546239 100644 --- a/pkg/monitor/tsdb/driver/victoriametrics/vm.go +++ b/pkg/monitor/tsdb/driver/victoriametrics/vm.go @@ -55,19 +55,28 @@ func NewVMAdapter(datasource *tsdb.DataSource) (tsdb.TsdbQueryEndpoint, error) { // Query implements tsdb.TsdbQueryEndpoint. func (vm *vmAdapter) Query(ctx context.Context, ds *tsdb.DataSource, query *tsdb.TsdbQuery) (*tsdb.Response, error) { + // TODO: use interval inside query + tsdbResp, _, err := vm.query(ctx, ds, query) + if err != nil { + return nil, err + } + return tsdbResp, nil +} + +func (vm *vmAdapter) query(ctx context.Context, ds *tsdb.DataSource, query *tsdb.TsdbQuery) (*tsdb.Response, *Response, error) { rawQuery, influxQs, err := vm.influxdbExecutor.GetRawQuery(ds, query) if err != nil { - return nil, errors.Wrapf(err, "get influxdb raw query: %#v", influxQs) + return nil, nil, errors.Wrapf(err, "get influxdb raw query: %#v", influxQs) } // TODO: use interval inside query return queryByRaw(ctx, ds, rawQuery, query, influxQs[0].Interval) } -func queryByRaw(ctx context.Context, ds *tsdb.DataSource, rawQuery string, query *tsdb.TsdbQuery, interval time.Duration) (*tsdb.Response, error) { +func queryByRaw(ctx context.Context, ds *tsdb.DataSource, rawQuery string, query *tsdb.TsdbQuery, interval time.Duration) (*tsdb.Response, *Response, error) { promQL, tr, err := convertInfluxQL(rawQuery) if err != nil { - return nil, errors.Wrap(err, "convert influxQL to promQL") + return nil, nil, errors.Wrap(err, "convert influxQL to promQL") } start := time.Now() @@ -77,17 +86,17 @@ func queryByRaw(ctx context.Context, ds *tsdb.DataSource, rawQuery string, query resp, err := queryRange(ctx, ds, tr, promQL, interval) if err != nil { - return nil, errors.Wrapf(err, "query VM range by: %s", promQL) + return nil, nil, errors.Wrapf(err, "query VM range by: %s", promQL) } - //log.Infof("get vm resp: %s", jsonutils.Marshal(resp).PrettyString()) + //log.Infof("get vm %s resp: %s", promQL, jsonutils.Marshal(resp).PrettyString()) tsdbRet, err := convertVMResponse(rawQuery, query, resp) if err != nil { - return nil, errors.Wrapf(err, "convert to tsdb.Response") + return nil, resp, errors.Wrapf(err, "convert to tsdb.Response") } - return tsdbRet, nil + return tsdbRet, resp, nil } func queryRange(ctx context.Context, ds *tsdb.DataSource, tr *influxql.TimeRange, promQL string, interval time.Duration) (*Response, error) { @@ -273,7 +282,7 @@ func (vm *vmAdapter) checkMeasurementField(ctx context.Context, ds *tsdb.DataSou q.Where().AddTag(tagFilter) } tq := q.ToTsdbQuery() - resp, err := vm.Query(ctx, ds, tq) + resp, rawResp, err := vm.query(ctx, ds, tq) if err != nil { return false, errors.Wrap(err, "VictoriaMetrics.Query") } @@ -283,6 +292,12 @@ func (vm *vmAdapter) checkMeasurementField(ctx context.Context, ds *tsdb.DataSou return true, nil } } + if rawResp.Stats.SeriesFetched != "" { + fetched, _ := strconv.Atoi(rawResp.Stats.SeriesFetched) + if fetched >= 1 { + return true, nil + } + } return false, nil }