This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch bug/dup-topn-query
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit aabee8d2bf7b004477a0aef3e11210ee96e4b590
Author: Hongtao Gao <[email protected]>
AuthorDate: Thu Mar 19 13:32:21 2026 +0000

    fix(query): eliminate duplicate TopN query in distributed measure queries
    
    Enable push-down aggregation for Agg+TopN by changing pushDownAgg to include
    the TopN case. This removes the RewriteAggTopNResult double-query pattern,
    in which data nodes ran two queries and returned raw data instead of
    partially aggregated results.
---
 CHANGES.md                                         |   1 +
 banyand/query/processor.go                         | 200 ---------------------
 pkg/query/logical/measure/measure_analyzer.go      |   4 +-
 .../logical/measure/measure_plan_distributed.go    |   7 -
 4 files changed, 3 insertions(+), 209 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 8c2ec60c3..625ba8559 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -48,6 +48,7 @@ Release Notes.
 - Fix memory part reference leak in mustAddMemPart when tsTable loop closes.
 - Fix memory part leak in syncPartContext Close and prevent double-release in FinishSync.
 - Fix segment reference leaks in measure/stream/trace queries and ensure chunked sync sessions close part contexts correctly.
+- Fix duplicate query execution in distributed measure Agg+TopN queries by enabling push-down aggregation, removing the wasteful double-query pattern.
 
 ### Document
 
diff --git a/banyand/query/processor.go b/banyand/query/processor.go
index 4857eee4e..f93582980 100644
--- a/banyand/query/processor.go
+++ b/banyand/query/processor.go
@@ -224,17 +224,6 @@ func executeMeasurePlan(
        return mIterator, plan, nil
 }
 
-// extractTagValuesFromInternalDataPoints extracts tag values from 
InternalDataPoints for RewriteAggTopNResult.
-func extractTagValuesFromInternalDataPoints(dataPoints 
[]*measurev1.InternalDataPoint, groupByTags []string) 
map[string][]*modelv1.TagValue {
-       tagValueMap := make(map[string][]*modelv1.TagValue)
-       for _, idp := range dataPoints {
-               if idp.DataPoint != nil {
-                       extractTagValuesFromDataPoint(idp.DataPoint, 
groupByTags, tagValueMap)
-               }
-       }
-       return tagValueMap
-}
-
 // collectInternalDataPoints collects InternalDataPoints from the iterator.
 func collectInternalDataPoints(mIterator executor.MIterator) 
[]*measurev1.InternalDataPoint {
        result := make([]*measurev1.InternalDataPoint, 0)
@@ -247,50 +236,6 @@ func collectInternalDataPoints(mIterator 
executor.MIterator) []*measurev1.Intern
        return result
 }
 
-// extractTagValuesFromDataPoints extracts tag values from DataPoints for 
RewriteAggTopNResult.
-func extractTagValuesFromDataPoints(dataPoints []*measurev1.DataPoint, 
groupByTags []string) map[string][]*modelv1.TagValue {
-       tagValueMap := make(map[string][]*modelv1.TagValue)
-       for _, dp := range dataPoints {
-               extractTagValuesFromDataPoint(dp, groupByTags, tagValueMap)
-       }
-       return tagValueMap
-}
-
-// extractTagValuesFromDataPoint extracts tag values from a single DataPoint 
and appends to tagValueMap.
-func extractTagValuesFromDataPoint(dp *measurev1.DataPoint, groupByTags 
[]string, tagValueMap map[string][]*modelv1.TagValue) {
-       for _, tagFamily := range dp.GetTagFamilies() {
-               for _, tag := range tagFamily.GetTags() {
-                       tagName := tag.GetKey()
-                       if len(groupByTags) == 0 || 
slices.Contains(groupByTags, tagName) {
-                               tagValueMap[tagName] = 
append(tagValueMap[tagName], tag.GetValue())
-                       }
-               }
-       }
-}
-
-// getGroupByTags extracts group by tag names from query criteria.
-func getGroupByTags(queryCriteria *measurev1.QueryRequest) []string {
-       groupByTags := make([]string, 0)
-       if queryCriteria.GetGroupBy() != nil {
-               for _, tagFamily := range 
queryCriteria.GetGroupBy().GetTagProjection().GetTagFamilies() {
-                       groupByTags = append(groupByTags, 
tagFamily.GetTags()...)
-               }
-       }
-       return groupByTags
-}
-
-// buildRewriteQueryCriteria builds the rewrite query criteria for 
RewriteAggTopNResult.
-func buildRewriteQueryCriteria(queryCriteria *measurev1.QueryRequest, 
rewrittenCriteria *modelv1.Criteria) *measurev1.QueryRequest {
-       return &measurev1.QueryRequest{
-               Groups:          queryCriteria.Groups,
-               Name:            queryCriteria.Name,
-               TimeRange:       queryCriteria.TimeRange,
-               Criteria:        rewrittenCriteria,
-               TagProjection:   queryCriteria.TagProjection,
-               FieldProjection: queryCriteria.FieldProjection,
-       }
-}
-
 func (p *measureQueryProcessor) Rev(ctx context.Context, message bus.Message) 
(resp bus.Message) {
        queryCriteria, ok := message.Data().(*measurev1.QueryRequest)
        n := time.Now()
@@ -299,34 +244,7 @@ func (p *measureQueryProcessor) Rev(ctx context.Context, 
message bus.Message) (r
                resp = bus.NewMessage(bus.MessageID(now), 
common.NewError("invalid event data type"))
                return
        }
-       if queryCriteria.RewriteAggTopNResult {
-               queryCriteria.Top.Number *= 2
-       }
        resp = p.executeQuery(ctx, queryCriteria)
-
-       if queryCriteria.RewriteAggTopNResult {
-               result, handleErr := handleResponse(resp)
-               if handleErr != nil {
-                       return
-               }
-               if len(result) == 0 {
-                       return
-               }
-               groupByTags := getGroupByTags(queryCriteria)
-               tagValueMap := extractTagValuesFromDataPoints(result, 
groupByTags)
-               rewrittenCriteria, rewriteErr := rewriteCriteria(tagValueMap)
-               if rewriteErr != nil {
-                       p.log.Error().Err(rewriteErr).RawJSON("req", 
logger.Proto(queryCriteria)).Msg("fail to rewrite the query criteria")
-                       return
-               }
-               rewriteQueryCriteria := 
buildRewriteQueryCriteria(queryCriteria, rewrittenCriteria)
-               resp = p.executeQuery(ctx, rewriteQueryCriteria)
-               dataPoints, handleErr := handleResponse(resp)
-               if handleErr != nil {
-                       return
-               }
-               resp = bus.NewMessage(bus.MessageID(now), 
&measurev1.QueryResponse{DataPoints: dataPoints})
-       }
        return
 }
 
@@ -417,18 +335,6 @@ func (p *measureQueryProcessor) executeQuery(ctx 
context.Context, queryCriteria
        return
 }
 
-func handleResponse(resp bus.Message) ([]*measurev1.DataPoint, *common.Error) {
-       data := resp.Data()
-       switch d := data.(type) {
-       case *common.Error:
-               return nil, d
-       case *measurev1.QueryResponse:
-               return d.DataPoints, nil
-       default:
-               return nil, common.NewError("unexpected response data type: 
%T", d)
-       }
-}
-
 type measureInternalQueryProcessor struct {
        measureService measure.Service
        *queryService
@@ -449,10 +355,6 @@ func (p *measureInternalQueryProcessor) Rev(ctx 
context.Context, message bus.Mes
                resp = bus.NewMessage(bus.MessageID(now), 
common.NewError("query request is nil"))
                return
        }
-       // Handle RewriteAggTopNResult: double the top number for initial query
-       if queryCriteria.RewriteAggTopNResult {
-               queryCriteria.Top.Number *= 2
-       }
        defer func() {
                if recoverErr := recover(); recoverErr != nil {
                        p.log.Error().Interface("err", 
recoverErr).RawJSON("req", logger.Proto(queryCriteria)).Str("stack", 
string(debug.Stack())).Msg("panic")
@@ -505,29 +407,6 @@ func (p *measureInternalQueryProcessor) Rev(ctx 
context.Context, message bus.Mes
 
        result := collectInternalDataPoints(mIterator)
 
-       // Handle RewriteAggTopNResult: rewrite query to get original data with 
Timestamp
-       if queryCriteria.RewriteAggTopNResult && len(result) > 0 {
-               groupByTags := getGroupByTags(queryCriteria)
-               tagValueMap := extractTagValuesFromInternalDataPoints(result, 
groupByTags)
-               rewrittenCriteria, rewriteErr := rewriteCriteria(tagValueMap)
-               if rewriteErr != nil {
-                       mctx.ml.Error().Err(rewriteErr).RawJSON("req", 
logger.Proto(queryCriteria)).Msg("fail to rewrite the query criteria")
-               } else {
-                       rewriteQueryCriteria := 
buildRewriteQueryCriteria(queryCriteria, rewrittenCriteria)
-                       rewriteIterator, _, rewriteExecErr := 
executeMeasurePlan(ctx, rewriteQueryCriteria, mctx, false)
-                       if rewriteExecErr != nil {
-                               
mctx.ml.Error().Err(rewriteExecErr).RawJSON("req", 
logger.Proto(rewriteQueryCriteria)).Msg("fail to execute the rewrite query 
plan")
-                       } else {
-                               defer func() {
-                                       if closeErr := rewriteIterator.Close(); 
closeErr != nil {
-                                               
mctx.ml.Error().Err(closeErr).Msg("fail to close the rewrite query plan")
-                                       }
-                               }()
-                               result = 
collectInternalDataPoints(rewriteIterator)
-                       }
-               }
-       }
-
        qr := &measurev1.InternalQueryResponse{DataPoints: result}
        if e := mctx.ml.Debug(); e.Enabled() {
                e.RawJSON("ret", logger.Proto(qr)).Msg("got an internal measure 
response")
@@ -542,85 +421,6 @@ func (p *measureInternalQueryProcessor) Rev(ctx 
context.Context, message bus.Mes
        return
 }
 
-func rewriteCriteria(tagValueMap map[string][]*modelv1.TagValue) 
(*modelv1.Criteria, error) {
-       var tagConditions []*modelv1.Condition
-       for tagName, tagValues := range tagValueMap {
-               if len(tagValues) == 0 {
-                       continue
-               }
-               switch tagValues[0].GetValue().(type) {
-               case *modelv1.TagValue_Str:
-                       valueSet := make(map[string]bool)
-                       for _, value := range tagValues {
-                               if strVal, ok := 
value.GetValue().(*modelv1.TagValue_Str); ok {
-                                       valueSet[strVal.Str.GetValue()] = true
-                               }
-                       }
-                       values := make([]string, 0, len(valueSet))
-                       for value := range valueSet {
-                               values = append(values, value)
-                       }
-                       condition := &modelv1.Condition{
-                               Name: tagName,
-                               Op:   modelv1.Condition_BINARY_OP_IN,
-                               Value: &modelv1.TagValue{
-                                       Value: &modelv1.TagValue_StrArray{
-                                               StrArray: &modelv1.StrArray{
-                                                       Value: values,
-                                               },
-                                       },
-                               },
-                       }
-                       tagConditions = append(tagConditions, condition)
-               case *modelv1.TagValue_Int:
-                       valueSet := make(map[int64]bool)
-                       for _, value := range tagValues {
-                               if intVal, ok := 
value.GetValue().(*modelv1.TagValue_Int); ok {
-                                       valueSet[intVal.Int.GetValue()] = true
-                               }
-                       }
-                       values := make([]int64, 0, len(valueSet))
-                       for value := range valueSet {
-                               values = append(values, value)
-                       }
-                       condition := &modelv1.Condition{
-                               Name: tagName,
-                               Op:   modelv1.Condition_BINARY_OP_IN,
-                               Value: &modelv1.TagValue{
-                                       Value: &modelv1.TagValue_IntArray{
-                                               IntArray: &modelv1.IntArray{
-                                                       Value: values,
-                                               },
-                                       },
-                               },
-                       }
-                       tagConditions = append(tagConditions, condition)
-               default:
-                       return nil, fmt.Errorf("unsupported tag value type: 
%T", tagValues[0].GetValue())
-               }
-       }
-       return buildCriteriaTree(tagConditions), nil
-}
-
-func buildCriteriaTree(conditions []*modelv1.Condition) *modelv1.Criteria {
-       if len(conditions) == 0 {
-               return nil
-       }
-       return &modelv1.Criteria{
-               Exp: &modelv1.Criteria_Le{
-                       Le: &modelv1.LogicalExpression{
-                               Op: modelv1.LogicalExpression_LOGICAL_OP_AND,
-                               Left: &modelv1.Criteria{
-                                       Exp: &modelv1.Criteria_Condition{
-                                               Condition: conditions[0],
-                                       },
-                               },
-                               Right: buildCriteriaTree(conditions[1:]),
-                       },
-               },
-       }
-}
-
 type traceQueryProcessor struct {
        traceService trace.Service
        *queryService
diff --git a/pkg/query/logical/measure/measure_analyzer.go 
b/pkg/query/logical/measure/measure_analyzer.go
index d096b14ac..06c1f4a8d 100644
--- a/pkg/query/logical/measure/measure_analyzer.go
+++ b/pkg/query/logical/measure/measure_analyzer.go
@@ -160,7 +160,7 @@ func DistributedAnalyze(criteria *measurev1.QueryRequest, 
ss []logical.Schema) (
                }
        }
 
-       pushDownAgg := criteria.GetAgg() != nil && criteria.GetTop() == nil
+       pushDownAgg := criteria.GetAgg() != nil
        plan := newUnresolvedDistributed(criteria, pushDownAgg)
 
        // parse limit and offset
@@ -181,7 +181,7 @@ func DistributedAnalyze(criteria *measurev1.QueryRequest, 
ss []logical.Schema) (
                        criteria.GetAgg().GetFunction(),
                        criteria.GetGroupBy() != nil,
                        false,       // emitPartial: liaison does not emit 
partial
-                       pushDownAgg, // reduceMode: only reduce partials when 
push-down is active (no TopN)
+                       pushDownAgg, // reduceMode: reduce partials from data 
nodes when push-down is active
                )
                pushedLimit = math.MaxInt
        }
diff --git a/pkg/query/logical/measure/measure_plan_distributed.go 
b/pkg/query/logical/measure/measure_plan_distributed.go
index d229d99e6..01d46ee1a 100644
--- a/pkg/query/logical/measure/measure_plan_distributed.go
+++ b/pkg/query/logical/measure/measure_plan_distributed.go
@@ -154,13 +154,6 @@ func (ud *unresolvedDistributed) Analyze(s logical.Schema) 
(logical.Plan, error)
                temp.GroupBy = ud.originalQuery.GroupBy
                temp.Agg = ud.originalQuery.Agg
        }
-       // push down groupBy, agg and top to data node and rewrite agg result 
to raw data
-       if ud.originalQuery.Agg != nil && ud.originalQuery.Top != nil {
-               temp.RewriteAggTopNResult = true
-               temp.Agg = ud.originalQuery.Agg
-               temp.Top = ud.originalQuery.Top
-               temp.GroupBy = ud.originalQuery.GroupBy
-       }
        // Prepare groupBy tags refs if needed for deduplication
        var groupByTagsRefs [][]*logical.TagRef
        if ud.pushDownAgg && ud.originalQuery.GetGroupBy() != nil {

Reply via email to