This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch bug/dup-topn-query in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit aabee8d2bf7b004477a0aef3e11210ee96e4b590 Author: Hongtao Gao <[email protected]> AuthorDate: Thu Mar 19 13:32:21 2026 +0000 fix(query): eliminate duplicate TopN query in distributed measure queries Enable push-down aggregation for Agg+TopN by changing pushDownAgg to include the TopN case. This removes the RewriteAggTopNResult double-query pattern where data nodes ran two queries and returned raw data instead of partial aggregated results. --- CHANGES.md | 1 + banyand/query/processor.go | 200 --------------------- pkg/query/logical/measure/measure_analyzer.go | 4 +- .../logical/measure/measure_plan_distributed.go | 7 - 4 files changed, 3 insertions(+), 209 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 8c2ec60c3..625ba8559 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -48,6 +48,7 @@ Release Notes. - Fix memory part reference leak in mustAddMemPart when tsTable loop closes. - Fix memory part leak in syncPartContext Close and prevent double-release in FinishSync. - Fix segment reference leaks in measure/stream/trace queries and ensure chunked sync sessions close part contexts correctly. +- Fix duplicate query execution in distributed measure Agg+TopN queries by enabling push-down aggregation, removing the wasteful double-query pattern. ### Document diff --git a/banyand/query/processor.go b/banyand/query/processor.go index 4857eee4e..f93582980 100644 --- a/banyand/query/processor.go +++ b/banyand/query/processor.go @@ -224,17 +224,6 @@ func executeMeasurePlan( return mIterator, plan, nil } -// extractTagValuesFromInternalDataPoints extracts tag values from InternalDataPoints for RewriteAggTopNResult. -func extractTagValuesFromInternalDataPoints(dataPoints []*measurev1.InternalDataPoint, groupByTags []string) map[string][]*modelv1.TagValue { - tagValueMap := make(map[string][]*modelv1.TagValue) - for _, idp := range dataPoints { - if idp.DataPoint != nil { - extractTagValuesFromDataPoint(idp.DataPoint, groupByTags, tagValueMap) - } - } - return tagValueMap -} - // collectInternalDataPoints collects InternalDataPoints from the iterator. func collectInternalDataPoints(mIterator executor.MIterator) []*measurev1.InternalDataPoint { result := make([]*measurev1.InternalDataPoint, 0) @@ -247,50 +236,6 @@ func collectInternalDataPoints(mIterator executor.MIterator) []*measurev1.Intern return result } -// extractTagValuesFromDataPoints extracts tag values from DataPoints for RewriteAggTopNResult. -func extractTagValuesFromDataPoints(dataPoints []*measurev1.DataPoint, groupByTags []string) map[string][]*modelv1.TagValue { - tagValueMap := make(map[string][]*modelv1.TagValue) - for _, dp := range dataPoints { - extractTagValuesFromDataPoint(dp, groupByTags, tagValueMap) - } - return tagValueMap -} - -// extractTagValuesFromDataPoint extracts tag values from a single DataPoint and appends to tagValueMap. -func extractTagValuesFromDataPoint(dp *measurev1.DataPoint, groupByTags []string, tagValueMap map[string][]*modelv1.TagValue) { - for _, tagFamily := range dp.GetTagFamilies() { - for _, tag := range tagFamily.GetTags() { - tagName := tag.GetKey() - if len(groupByTags) == 0 || slices.Contains(groupByTags, tagName) { - tagValueMap[tagName] = append(tagValueMap[tagName], tag.GetValue()) - } - } - } -} - -// getGroupByTags extracts group by tag names from query criteria. -func getGroupByTags(queryCriteria *measurev1.QueryRequest) []string { - groupByTags := make([]string, 0) - if queryCriteria.GetGroupBy() != nil { - for _, tagFamily := range queryCriteria.GetGroupBy().GetTagProjection().GetTagFamilies() { - groupByTags = append(groupByTags, tagFamily.GetTags()...) - } - } - return groupByTags -} - -// buildRewriteQueryCriteria builds the rewrite query criteria for RewriteAggTopNResult. -func buildRewriteQueryCriteria(queryCriteria *measurev1.QueryRequest, rewrittenCriteria *modelv1.Criteria) *measurev1.QueryRequest { - return &measurev1.QueryRequest{ - Groups: queryCriteria.Groups, - Name: queryCriteria.Name, - TimeRange: queryCriteria.TimeRange, - Criteria: rewrittenCriteria, - TagProjection: queryCriteria.TagProjection, - FieldProjection: queryCriteria.FieldProjection, - } -} - func (p *measureQueryProcessor) Rev(ctx context.Context, message bus.Message) (resp bus.Message) { queryCriteria, ok := message.Data().(*measurev1.QueryRequest) n := time.Now() @@ -299,34 +244,7 @@ func (p *measureQueryProcessor) Rev(ctx context.Context, message bus.Message) (r resp = bus.NewMessage(bus.MessageID(now), common.NewError("invalid event data type")) return } - if queryCriteria.RewriteAggTopNResult { - queryCriteria.Top.Number *= 2 - } resp = p.executeQuery(ctx, queryCriteria) - - if queryCriteria.RewriteAggTopNResult { - result, handleErr := handleResponse(resp) - if handleErr != nil { - return - } - if len(result) == 0 { - return - } - groupByTags := getGroupByTags(queryCriteria) - tagValueMap := extractTagValuesFromDataPoints(result, groupByTags) - rewrittenCriteria, rewriteErr := rewriteCriteria(tagValueMap) - if rewriteErr != nil { - p.log.Error().Err(rewriteErr).RawJSON("req", logger.Proto(queryCriteria)).Msg("fail to rewrite the query criteria") - return - } - rewriteQueryCriteria := buildRewriteQueryCriteria(queryCriteria, rewrittenCriteria) - resp = p.executeQuery(ctx, rewriteQueryCriteria) - dataPoints, handleErr := handleResponse(resp) - if handleErr != nil { - return - } - resp = bus.NewMessage(bus.MessageID(now), &measurev1.QueryResponse{DataPoints: dataPoints}) - } return } @@ -417,18 +335,6 @@ func (p *measureQueryProcessor) executeQuery(ctx context.Context, queryCriteria return } -func handleResponse(resp bus.Message) ([]*measurev1.DataPoint, *common.Error) { - data := resp.Data() - switch d := data.(type) { - case *common.Error: - return nil, d - case *measurev1.QueryResponse: - return d.DataPoints, nil - default: - return nil, common.NewError("unexpected response data type: %T", d) - } -} - type measureInternalQueryProcessor struct { measureService measure.Service *queryService @@ -449,10 +355,6 @@ func (p *measureInternalQueryProcessor) Rev(ctx context.Context, message bus.Mes resp = bus.NewMessage(bus.MessageID(now), common.NewError("query request is nil")) return } - // Handle RewriteAggTopNResult: double the top number for initial query - if queryCriteria.RewriteAggTopNResult { - queryCriteria.Top.Number *= 2 - } defer func() { if recoverErr := recover(); recoverErr != nil { p.log.Error().Interface("err", recoverErr).RawJSON("req", logger.Proto(queryCriteria)).Str("stack", string(debug.Stack())).Msg("panic") @@ -505,29 +407,6 @@ func (p *measureInternalQueryProcessor) Rev(ctx context.Context, message bus.Mes result := collectInternalDataPoints(mIterator) - // Handle RewriteAggTopNResult: rewrite query to get original data with Timestamp - if queryCriteria.RewriteAggTopNResult && len(result) > 0 { - groupByTags := getGroupByTags(queryCriteria) - tagValueMap := extractTagValuesFromInternalDataPoints(result, groupByTags) - rewrittenCriteria, rewriteErr := rewriteCriteria(tagValueMap) - if rewriteErr != nil { - mctx.ml.Error().Err(rewriteErr).RawJSON("req", logger.Proto(queryCriteria)).Msg("fail to rewrite the query criteria") - } else { - rewriteQueryCriteria := buildRewriteQueryCriteria(queryCriteria, rewrittenCriteria) - rewriteIterator, _, rewriteExecErr := executeMeasurePlan(ctx, rewriteQueryCriteria, mctx, false) - if rewriteExecErr != nil { - mctx.ml.Error().Err(rewriteExecErr).RawJSON("req", logger.Proto(rewriteQueryCriteria)).Msg("fail to execute the rewrite query plan") - } else { - defer func() { - if closeErr := rewriteIterator.Close(); closeErr != nil { - mctx.ml.Error().Err(closeErr).Msg("fail to close the rewrite query plan") - } - }() - result = collectInternalDataPoints(rewriteIterator) - } - } - } - qr := &measurev1.InternalQueryResponse{DataPoints: result} if e := mctx.ml.Debug(); e.Enabled() { e.RawJSON("ret", logger.Proto(qr)).Msg("got an internal measure response") @@ -542,85 +421,6 @@ func (p *measureInternalQueryProcessor) Rev(ctx context.Context, message bus.Mes return } -func rewriteCriteria(tagValueMap map[string][]*modelv1.TagValue) (*modelv1.Criteria, error) { - var tagConditions []*modelv1.Condition - for tagName, tagValues := range tagValueMap { - if len(tagValues) == 0 { - continue - } - switch tagValues[0].GetValue().(type) { - case *modelv1.TagValue_Str: - valueSet := make(map[string]bool) - for _, value := range tagValues { - if strVal, ok := value.GetValue().(*modelv1.TagValue_Str); ok { - valueSet[strVal.Str.GetValue()] = true - } - } - values := make([]string, 0, len(valueSet)) - for value := range valueSet { - values = append(values, value) - } - condition := &modelv1.Condition{ - Name: tagName, - Op: modelv1.Condition_BINARY_OP_IN, - Value: &modelv1.TagValue{ - Value: &modelv1.TagValue_StrArray{ - StrArray: &modelv1.StrArray{ - Value: values, - }, - }, - }, - } - tagConditions = append(tagConditions, condition) - case *modelv1.TagValue_Int: - valueSet := make(map[int64]bool) - for _, value := range tagValues { - if intVal, ok := value.GetValue().(*modelv1.TagValue_Int); ok { - valueSet[intVal.Int.GetValue()] = true - } - } - values := make([]int64, 0, len(valueSet)) - for value := range valueSet { - values = append(values, value) - } - condition := &modelv1.Condition{ - Name: tagName, - Op: modelv1.Condition_BINARY_OP_IN, - Value: &modelv1.TagValue{ - Value: &modelv1.TagValue_IntArray{ - IntArray: &modelv1.IntArray{ - Value: values, - }, - }, - }, - } - tagConditions = append(tagConditions, condition) - default: - return nil, fmt.Errorf("unsupported tag value type: %T", tagValues[0].GetValue()) - } - } - return buildCriteriaTree(tagConditions), nil -} - -func buildCriteriaTree(conditions []*modelv1.Condition) *modelv1.Criteria { - if len(conditions) == 0 { - return nil - } - return &modelv1.Criteria{ - Exp: &modelv1.Criteria_Le{ - Le: &modelv1.LogicalExpression{ - Op: modelv1.LogicalExpression_LOGICAL_OP_AND, - Left: &modelv1.Criteria{ - Exp: &modelv1.Criteria_Condition{ - Condition: conditions[0], - }, - }, - Right: buildCriteriaTree(conditions[1:]), - }, - }, - } -} - type traceQueryProcessor struct { traceService trace.Service *queryService diff --git a/pkg/query/logical/measure/measure_analyzer.go b/pkg/query/logical/measure/measure_analyzer.go index d096b14ac..06c1f4a8d 100644 --- a/pkg/query/logical/measure/measure_analyzer.go +++ b/pkg/query/logical/measure/measure_analyzer.go @@ -160,7 +160,7 @@ func DistributedAnalyze(criteria *measurev1.QueryRequest, ss []logical.Schema) ( } } - pushDownAgg := criteria.GetAgg() != nil && criteria.GetTop() == nil + pushDownAgg := criteria.GetAgg() != nil plan := newUnresolvedDistributed(criteria, pushDownAgg) // parse limit and offset @@ -181,7 +181,7 @@ func DistributedAnalyze(criteria *measurev1.QueryRequest, ss []logical.Schema) ( criteria.GetAgg().GetFunction(), criteria.GetGroupBy() != nil, false, // emitPartial: liaison does not emit partial - pushDownAgg, // reduceMode: only reduce partials when push-down is active (no TopN) + pushDownAgg, // reduceMode: reduce partials from data nodes when push-down is active ) pushedLimit = math.MaxInt } diff --git a/pkg/query/logical/measure/measure_plan_distributed.go b/pkg/query/logical/measure/measure_plan_distributed.go index d229d99e6..01d46ee1a 100644 --- a/pkg/query/logical/measure/measure_plan_distributed.go +++ b/pkg/query/logical/measure/measure_plan_distributed.go @@ -154,13 +154,6 @@ func (ud *unresolvedDistributed) Analyze(s logical.Schema) (logical.Plan, error) temp.GroupBy = ud.originalQuery.GroupBy temp.Agg = ud.originalQuery.Agg } - // push down groupBy, agg and top to data node and rewrite agg result to raw data - if ud.originalQuery.Agg != nil && ud.originalQuery.Top != nil { - temp.RewriteAggTopNResult = true - temp.Agg = ud.originalQuery.Agg - temp.Top = ud.originalQuery.Top - temp.GroupBy = ud.originalQuery.GroupBy - } // Prepare groupBy tags refs if needed for deduplication var groupByTagsRefs [][]*logical.TagRef if ud.pushDownAgg && ud.originalQuery.GetGroupBy() != nil {
