This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 049123b81 fix(query): eliminate duplicate TopN query in distributed measure queries (#1018)
049123b81 is described below
commit 049123b8107a76d7e8ca7157320805ee1d348e00
Author: Gao Hongtao <[email protected]>
AuthorDate: Fri Mar 20 09:26:49 2026 +0800
fix(query): eliminate duplicate TopN query in distributed measure queries (#1018)
* fix(query): eliminate duplicate TopN query in distributed measure queries
Enable push-down aggregation for Agg+TopN by changing pushDownAgg to include
the TopN case. This removes the RewriteAggTopNResult double-query pattern
where data nodes ran two queries and returned raw data instead of partial
aggregated results.
* chore: fix ineffectual ctx assignment in measureInternalQueryProcessor
---
CHANGES.md | 1 +
banyand/query/processor.go | 202 +--------------------
pkg/query/logical/measure/measure_analyzer.go | 4 +-
.../logical/measure/measure_plan_distributed.go | 7 -
4 files changed, 4 insertions(+), 210 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 8c2ec60c3..625ba8559 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -48,6 +48,7 @@ Release Notes.
- Fix memory part reference leak in mustAddMemPart when tsTable loop closes.
- Fix memory part leak in syncPartContext Close and prevent double-release in
FinishSync.
- Fix segment reference leaks in measure/stream/trace queries and ensure
chunked sync sessions close part contexts correctly.
+- Fix duplicate query execution in distributed measure Agg+TopN queries by
enabling push-down aggregation, removing the wasteful double-query pattern.
### Document
diff --git a/banyand/query/processor.go b/banyand/query/processor.go
index 4857eee4e..0ebd31a3e 100644
--- a/banyand/query/processor.go
+++ b/banyand/query/processor.go
@@ -224,17 +224,6 @@ func executeMeasurePlan(
return mIterator, plan, nil
}
-// extractTagValuesFromInternalDataPoints extracts tag values from
InternalDataPoints for RewriteAggTopNResult.
-func extractTagValuesFromInternalDataPoints(dataPoints
[]*measurev1.InternalDataPoint, groupByTags []string)
map[string][]*modelv1.TagValue {
- tagValueMap := make(map[string][]*modelv1.TagValue)
- for _, idp := range dataPoints {
- if idp.DataPoint != nil {
- extractTagValuesFromDataPoint(idp.DataPoint,
groupByTags, tagValueMap)
- }
- }
- return tagValueMap
-}
-
// collectInternalDataPoints collects InternalDataPoints from the iterator.
func collectInternalDataPoints(mIterator executor.MIterator)
[]*measurev1.InternalDataPoint {
result := make([]*measurev1.InternalDataPoint, 0)
@@ -247,50 +236,6 @@ func collectInternalDataPoints(mIterator
executor.MIterator) []*measurev1.Intern
return result
}
-// extractTagValuesFromDataPoints extracts tag values from DataPoints for
RewriteAggTopNResult.
-func extractTagValuesFromDataPoints(dataPoints []*measurev1.DataPoint,
groupByTags []string) map[string][]*modelv1.TagValue {
- tagValueMap := make(map[string][]*modelv1.TagValue)
- for _, dp := range dataPoints {
- extractTagValuesFromDataPoint(dp, groupByTags, tagValueMap)
- }
- return tagValueMap
-}
-
-// extractTagValuesFromDataPoint extracts tag values from a single DataPoint
and appends to tagValueMap.
-func extractTagValuesFromDataPoint(dp *measurev1.DataPoint, groupByTags
[]string, tagValueMap map[string][]*modelv1.TagValue) {
- for _, tagFamily := range dp.GetTagFamilies() {
- for _, tag := range tagFamily.GetTags() {
- tagName := tag.GetKey()
- if len(groupByTags) == 0 ||
slices.Contains(groupByTags, tagName) {
- tagValueMap[tagName] =
append(tagValueMap[tagName], tag.GetValue())
- }
- }
- }
-}
-
-// getGroupByTags extracts group by tag names from query criteria.
-func getGroupByTags(queryCriteria *measurev1.QueryRequest) []string {
- groupByTags := make([]string, 0)
- if queryCriteria.GetGroupBy() != nil {
- for _, tagFamily := range
queryCriteria.GetGroupBy().GetTagProjection().GetTagFamilies() {
- groupByTags = append(groupByTags,
tagFamily.GetTags()...)
- }
- }
- return groupByTags
-}
-
-// buildRewriteQueryCriteria builds the rewrite query criteria for
RewriteAggTopNResult.
-func buildRewriteQueryCriteria(queryCriteria *measurev1.QueryRequest,
rewrittenCriteria *modelv1.Criteria) *measurev1.QueryRequest {
- return &measurev1.QueryRequest{
- Groups: queryCriteria.Groups,
- Name: queryCriteria.Name,
- TimeRange: queryCriteria.TimeRange,
- Criteria: rewrittenCriteria,
- TagProjection: queryCriteria.TagProjection,
- FieldProjection: queryCriteria.FieldProjection,
- }
-}
-
func (p *measureQueryProcessor) Rev(ctx context.Context, message bus.Message)
(resp bus.Message) {
queryCriteria, ok := message.Data().(*measurev1.QueryRequest)
n := time.Now()
@@ -299,34 +244,7 @@ func (p *measureQueryProcessor) Rev(ctx context.Context,
message bus.Message) (r
resp = bus.NewMessage(bus.MessageID(now),
common.NewError("invalid event data type"))
return
}
- if queryCriteria.RewriteAggTopNResult {
- queryCriteria.Top.Number *= 2
- }
resp = p.executeQuery(ctx, queryCriteria)
-
- if queryCriteria.RewriteAggTopNResult {
- result, handleErr := handleResponse(resp)
- if handleErr != nil {
- return
- }
- if len(result) == 0 {
- return
- }
- groupByTags := getGroupByTags(queryCriteria)
- tagValueMap := extractTagValuesFromDataPoints(result,
groupByTags)
- rewrittenCriteria, rewriteErr := rewriteCriteria(tagValueMap)
- if rewriteErr != nil {
- p.log.Error().Err(rewriteErr).RawJSON("req",
logger.Proto(queryCriteria)).Msg("fail to rewrite the query criteria")
- return
- }
- rewriteQueryCriteria :=
buildRewriteQueryCriteria(queryCriteria, rewrittenCriteria)
- resp = p.executeQuery(ctx, rewriteQueryCriteria)
- dataPoints, handleErr := handleResponse(resp)
- if handleErr != nil {
- return
- }
- resp = bus.NewMessage(bus.MessageID(now),
&measurev1.QueryResponse{DataPoints: dataPoints})
- }
return
}
@@ -417,18 +335,6 @@ func (p *measureQueryProcessor) executeQuery(ctx
context.Context, queryCriteria
return
}
-func handleResponse(resp bus.Message) ([]*measurev1.DataPoint, *common.Error) {
- data := resp.Data()
- switch d := data.(type) {
- case *common.Error:
- return nil, d
- case *measurev1.QueryResponse:
- return d.DataPoints, nil
- default:
- return nil, common.NewError("unexpected response data type:
%T", d)
- }
-}
-
type measureInternalQueryProcessor struct {
measureService measure.Service
*queryService
@@ -449,10 +355,6 @@ func (p *measureInternalQueryProcessor) Rev(ctx
context.Context, message bus.Mes
resp = bus.NewMessage(bus.MessageID(now),
common.NewError("query request is nil"))
return
}
- // Handle RewriteAggTopNResult: double the top number for initial query
- if queryCriteria.RewriteAggTopNResult {
- queryCriteria.Top.Number *= 2
- }
defer func() {
if recoverErr := recover(); recoverErr != nil {
p.log.Error().Interface("err",
recoverErr).RawJSON("req", logger.Proto(queryCriteria)).Str("stack",
string(debug.Stack())).Msg("panic")
@@ -484,7 +386,7 @@ func (p *measureInternalQueryProcessor) Rev(ctx
context.Context, message bus.Mes
var span *query.Span
if queryCriteria.Trace {
tracer, ctx = query.NewTracer(ctx, n.Format(time.RFC3339Nano))
- span, ctx = tracer.StartSpan(ctx, "data-%s",
p.queryService.nodeID)
+ span, _ = tracer.StartSpan(ctx, "data-%s",
p.queryService.nodeID)
span.Tag("plan", plan.String())
defer func() {
respData := resp.Data()
@@ -505,29 +407,6 @@ func (p *measureInternalQueryProcessor) Rev(ctx
context.Context, message bus.Mes
result := collectInternalDataPoints(mIterator)
- // Handle RewriteAggTopNResult: rewrite query to get original data with
Timestamp
- if queryCriteria.RewriteAggTopNResult && len(result) > 0 {
- groupByTags := getGroupByTags(queryCriteria)
- tagValueMap := extractTagValuesFromInternalDataPoints(result,
groupByTags)
- rewrittenCriteria, rewriteErr := rewriteCriteria(tagValueMap)
- if rewriteErr != nil {
- mctx.ml.Error().Err(rewriteErr).RawJSON("req",
logger.Proto(queryCriteria)).Msg("fail to rewrite the query criteria")
- } else {
- rewriteQueryCriteria :=
buildRewriteQueryCriteria(queryCriteria, rewrittenCriteria)
- rewriteIterator, _, rewriteExecErr :=
executeMeasurePlan(ctx, rewriteQueryCriteria, mctx, false)
- if rewriteExecErr != nil {
-
mctx.ml.Error().Err(rewriteExecErr).RawJSON("req",
logger.Proto(rewriteQueryCriteria)).Msg("fail to execute the rewrite query
plan")
- } else {
- defer func() {
- if closeErr := rewriteIterator.Close();
closeErr != nil {
-
mctx.ml.Error().Err(closeErr).Msg("fail to close the rewrite query plan")
- }
- }()
- result =
collectInternalDataPoints(rewriteIterator)
- }
- }
- }
-
qr := &measurev1.InternalQueryResponse{DataPoints: result}
if e := mctx.ml.Debug(); e.Enabled() {
e.RawJSON("ret", logger.Proto(qr)).Msg("got an internal measure
response")
@@ -542,85 +421,6 @@ func (p *measureInternalQueryProcessor) Rev(ctx
context.Context, message bus.Mes
return
}
-func rewriteCriteria(tagValueMap map[string][]*modelv1.TagValue)
(*modelv1.Criteria, error) {
- var tagConditions []*modelv1.Condition
- for tagName, tagValues := range tagValueMap {
- if len(tagValues) == 0 {
- continue
- }
- switch tagValues[0].GetValue().(type) {
- case *modelv1.TagValue_Str:
- valueSet := make(map[string]bool)
- for _, value := range tagValues {
- if strVal, ok :=
value.GetValue().(*modelv1.TagValue_Str); ok {
- valueSet[strVal.Str.GetValue()] = true
- }
- }
- values := make([]string, 0, len(valueSet))
- for value := range valueSet {
- values = append(values, value)
- }
- condition := &modelv1.Condition{
- Name: tagName,
- Op: modelv1.Condition_BINARY_OP_IN,
- Value: &modelv1.TagValue{
- Value: &modelv1.TagValue_StrArray{
- StrArray: &modelv1.StrArray{
- Value: values,
- },
- },
- },
- }
- tagConditions = append(tagConditions, condition)
- case *modelv1.TagValue_Int:
- valueSet := make(map[int64]bool)
- for _, value := range tagValues {
- if intVal, ok :=
value.GetValue().(*modelv1.TagValue_Int); ok {
- valueSet[intVal.Int.GetValue()] = true
- }
- }
- values := make([]int64, 0, len(valueSet))
- for value := range valueSet {
- values = append(values, value)
- }
- condition := &modelv1.Condition{
- Name: tagName,
- Op: modelv1.Condition_BINARY_OP_IN,
- Value: &modelv1.TagValue{
- Value: &modelv1.TagValue_IntArray{
- IntArray: &modelv1.IntArray{
- Value: values,
- },
- },
- },
- }
- tagConditions = append(tagConditions, condition)
- default:
- return nil, fmt.Errorf("unsupported tag value type:
%T", tagValues[0].GetValue())
- }
- }
- return buildCriteriaTree(tagConditions), nil
-}
-
-func buildCriteriaTree(conditions []*modelv1.Condition) *modelv1.Criteria {
- if len(conditions) == 0 {
- return nil
- }
- return &modelv1.Criteria{
- Exp: &modelv1.Criteria_Le{
- Le: &modelv1.LogicalExpression{
- Op: modelv1.LogicalExpression_LOGICAL_OP_AND,
- Left: &modelv1.Criteria{
- Exp: &modelv1.Criteria_Condition{
- Condition: conditions[0],
- },
- },
- Right: buildCriteriaTree(conditions[1:]),
- },
- },
- }
-}
-
type traceQueryProcessor struct {
traceService trace.Service
*queryService
diff --git a/pkg/query/logical/measure/measure_analyzer.go
b/pkg/query/logical/measure/measure_analyzer.go
index d096b14ac..06c1f4a8d 100644
--- a/pkg/query/logical/measure/measure_analyzer.go
+++ b/pkg/query/logical/measure/measure_analyzer.go
@@ -160,7 +160,7 @@ func DistributedAnalyze(criteria *measurev1.QueryRequest,
ss []logical.Schema) (
}
}
- pushDownAgg := criteria.GetAgg() != nil && criteria.GetTop() == nil
+ pushDownAgg := criteria.GetAgg() != nil
plan := newUnresolvedDistributed(criteria, pushDownAgg)
// parse limit and offset
@@ -181,7 +181,7 @@ func DistributedAnalyze(criteria *measurev1.QueryRequest,
ss []logical.Schema) (
criteria.GetAgg().GetFunction(),
criteria.GetGroupBy() != nil,
false, // emitPartial: liaison does not emit
partial
- pushDownAgg, // reduceMode: only reduce partials when
push-down is active (no TopN)
+ pushDownAgg, // reduceMode: reduce partials from data
nodes when push-down is active
)
pushedLimit = math.MaxInt
}
diff --git a/pkg/query/logical/measure/measure_plan_distributed.go
b/pkg/query/logical/measure/measure_plan_distributed.go
index d229d99e6..01d46ee1a 100644
--- a/pkg/query/logical/measure/measure_plan_distributed.go
+++ b/pkg/query/logical/measure/measure_plan_distributed.go
@@ -154,13 +154,6 @@ func (ud *unresolvedDistributed) Analyze(s logical.Schema)
(logical.Plan, error)
temp.GroupBy = ud.originalQuery.GroupBy
temp.Agg = ud.originalQuery.Agg
}
- // push down groupBy, agg and top to data node and rewrite agg result
to raw data
- if ud.originalQuery.Agg != nil && ud.originalQuery.Top != nil {
- temp.RewriteAggTopNResult = true
- temp.Agg = ud.originalQuery.Agg
- temp.Top = ud.originalQuery.Top
- temp.GroupBy = ud.originalQuery.GroupBy
- }
// Prepare groupBy tags refs if needed for deduplication
var groupByTagsRefs [][]*logical.TagRef
if ud.pushDownAgg && ud.originalQuery.GetGroupBy() != nil {