This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 594717bff fix(sidx): stabilize tag filter matching order (#1034)
594717bff is described below
commit 594717bff57fb229365fec8f835c0adf34059597
Author: Gao Hongtao <[email protected]>
AuthorDate: Mon Mar 30 21:53:12 2026 +0800
fix(sidx): stabilize tag filter matching order (#1034)
---
CHANGES.md | 4 +
.../internal/sidx/collect_tags_for_filter_test.go | 122 +++++++++++++++++++++
banyand/internal/sidx/sidx.go | 60 ++++++++--
test/cases/topn/topn.go | 3 +
..._duration_range_and_ids_order_timestamp_desc.ql | 28 +++++
...duration_range_and_ids_order_timestamp_desc.yml | 81 ++++++++++++++
...duration_range_and_ids_order_timestamp_desc.yml | 37 +++++++
test/cases/trace/trace.go | 2 +
8 files changed, 328 insertions(+), 9 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 0a6999fc8..ab476c04a 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -11,6 +11,10 @@ Release Notes.
- Add log query e2e test.
- Sync lifecycle e2e test from SkyWalking stages test.
+### Bug Fixes
+
+- Fix flaky trace query filtering caused by non-deterministic sidx tag
ordering and add consistency checks for integration query cases.
+
## 0.10.0
### Features
diff --git a/banyand/internal/sidx/collect_tags_for_filter_test.go
b/banyand/internal/sidx/collect_tags_for_filter_test.go
new file mode 100644
index 000000000..896ee6df4
--- /dev/null
+++ b/banyand/internal/sidx/collect_tags_for_filter_test.go
@@ -0,0 +1,122 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package sidx
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+
+ modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+ pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+ "github.com/apache/skywalking-banyandb/pkg/query/model"
+)
+
+// testStrDecoder is a test stand-in for the tag decoder that collectTagsForFilter
+// receives (same func(pbv1.ValueType, []byte, [][]byte) *modelv1.TagValue shape).
+// It wraps the raw bytes of a string-typed value in a Str TagValue, returns nil for
+// every other value type, and ignores the array-value argument.
+func testStrDecoder(valueType pbv1.ValueType, value []byte, _ [][]byte)
*modelv1.TagValue {
+ if valueType != pbv1.ValueTypeStr {
+ return nil
+ }
+ return &modelv1.TagValue{
+ Value: &modelv1.TagValue_Str{
+ Str: &modelv1.Str{Value: string(value)},
+ },
+ }
+}
+
+// tagKeysFromSlice returns the Key of each tag in slice order, so tests can assert
+// on the exact order in which collectTagsForFilter emitted tags.
+func tagKeysFromSlice(tags []*modelv1.Tag) []string {
+ keys := make([]string, len(tags))
+ for idx, tag := range tags {
+ keys[idx] = tag.Key
+ }
+ return keys
+}
+
+// newStrTagData builds a string-typed tagData holding exactly one row, so the tag
+// has a value only for element index 0 (collectTagsForFilter skips indexes past the
+// end of values).
+func newStrTagData(value string) *tagData {
+ return &tagData{
+ valueType: pbv1.ValueTypeStr,
+ values: []tagRow{
+ {value: []byte(value)},
+ },
+ }
+}
+
+// TestCollectTagsForFilter_DeterministicOrder verifies tag ordering for
TagFilter.Match:
+// logical matchers align tag values with schema by slice index, so iteration
order must be stable.
+// Map iteration order is undefined; without this contract, filtering can
flake (0 vs 1 rows).
+func TestCollectTagsForFilter_DeterministicOrder(t *testing.T) {
+ t.Run("no projection sorts tag names lexicographically", func(t
*testing.T) {
+ blk := &block{
+ userKeys: []int64{1},
+ data: [][]byte{{}},
+ tags: make(map[string]*tagData),
+ }
+ // Deliberately insert keys in non-lexicographic order.
+ blk.tags["zebra"] = newStrTagData("z")
+ blk.tags["middle"] = newStrTagData("m")
+ blk.tags["alpha"] = newStrTagData("a")
+
+ // Both helpers under test only read b.block.tags, so a bare builder suffices.
+ b := &blockCursorBuilder{block: blk}
+ var buf []*modelv1.Tag
+ // nil projections: every block tag is expected, sorted by name.
+ buf = b.collectTagsForFilter(buf, testStrDecoder, 0,
b.orderedTagNamesForFilter(nil))
+ require.Len(t, buf, 3)
+ assert.Equal(t, []string{"alpha", "middle", "zebra"},
tagKeysFromSlice(buf))
+ })
+
+ t.Run("tag projection defines order", func(t *testing.T) {
+ blk := &block{
+ userKeys: []int64{1},
+ data: [][]byte{{}},
+ tags: make(map[string]*tagData),
+ }
+ blk.tags["zebra"] = newStrTagData("z")
+ blk.tags["middle"] = newStrTagData("m")
+ blk.tags["alpha"] = newStrTagData("a")
+
+ b := &blockCursorBuilder{block: blk}
+ // Projection order differs from both insertion and lexicographic order,
+ // so a passing assertion proves the projection drives the output order.
+ projections := []model.TagProjection{
+ {Family: "span", Names: []string{"zebra", "alpha",
"middle"}},
+ }
+ var buf []*modelv1.Tag
+ buf = b.collectTagsForFilter(buf, testStrDecoder, 0,
b.orderedTagNamesForFilter(projections))
+ require.Len(t, buf, 3)
+ assert.Equal(t, []string{"zebra", "alpha", "middle"},
tagKeysFromSlice(buf))
+ })
+
+ t.Run("duplicate tag names across projection families are
deduplicated", func(t *testing.T) {
+ blk := &block{
+ userKeys: []int64{1},
+ data: [][]byte{{}},
+ tags: make(map[string]*tagData),
+ }
+ blk.tags["zebra"] = newStrTagData("z")
+ blk.tags["middle"] = newStrTagData("m")
+ blk.tags["alpha"] = newStrTagData("a")
+
+ b := &blockCursorBuilder{block: blk}
+ // "alpha" and "zebra" repeat in the second family; only the first occurrence
+ // of a name is kept, so the order is span's names, then service's new "middle".
+ projections := []model.TagProjection{
+ {Family: "span", Names: []string{"zebra", "alpha"}},
+ {Family: "service", Names: []string{"alpha", "middle",
"zebra"}},
+ }
+ var buf []*modelv1.Tag
+ buf = b.collectTagsForFilter(buf, testStrDecoder, 0,
b.orderedTagNamesForFilter(projections))
+ require.Len(t, buf, 3)
+ assert.Equal(t, []string{"zebra", "alpha", "middle"},
tagKeysFromSlice(buf))
+ })
+}
diff --git a/banyand/internal/sidx/sidx.go b/banyand/internal/sidx/sidx.go
index f699305f5..910d85b4a 100644
--- a/banyand/internal/sidx/sidx.go
+++ b/banyand/internal/sidx/sidx.go
@@ -24,6 +24,7 @@ import (
"fmt"
"os"
"path/filepath"
+ "sort"
"strings"
"sync"
"sync/atomic"
@@ -36,6 +37,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/pool"
+ "github.com/apache/skywalking-banyandb/pkg/query/model"
)
const (
@@ -358,6 +360,7 @@ func (b *blockCursorBuilder) processWithFilter(req
QueryRequest, log *logger.Log
tags := make([]*modelv1.Tag, 0, len(b.block.tags))
decoder := req.TagFilter.GetDecoder()
+ orderedTagNames := b.orderedTagNamesForFilter(req.TagProjection)
for i := 0; i < len(b.block.userKeys); i++ {
dataBytes := b.block.data[i]
@@ -366,7 +369,7 @@ func (b *blockCursorBuilder) processWithFilter(req
QueryRequest, log *logger.Log
continue
}
- tags = b.collectTagsForFilter(tags, decoder, i)
+ tags = b.collectTagsForFilter(tags, decoder, i, orderedTagNames)
matched, err := req.TagFilter.Match(tags)
if err != nil {
@@ -395,26 +398,65 @@ func (b *blockCursorBuilder) processWithoutFilter() {
}
}
-func (b *blockCursorBuilder) collectTagsForFilter(buf []*modelv1.Tag, decoder
func(pbv1.ValueType, []byte, [][]byte) *modelv1.TagValue, index int)
[]*modelv1.Tag {
+// collectTagsForFilter refills buf with the decoded tag values of the element at
+// index, emitting tags in the order given by tagNames so the result is stable
+// across calls. Names the block does not store, tags with no row at index, and
+// values the decoder maps to nil are skipped. buf is truncated and its backing
+// storage reused when capacity allows; the filled slice is returned.
+func (b *blockCursorBuilder) collectTagsForFilter(
+ buf []*modelv1.Tag,
+ decoder func(pbv1.ValueType, []byte, [][]byte) *modelv1.TagValue,
+ index int,
+ tagNames []string,
+) []*modelv1.Tag {
buf = buf[:0]
-
- for tagName, tagData := range b.block.tags {
+ // Walk the caller-supplied names instead of ranging over the map: Go map
+ // iteration order is randomized, and the filter matcher depends on order.
+ for _, tagName := range tagNames {
+ tagData, ok := b.block.tags[tagName]
+ if !ok {
+ continue
+ }
if index >= len(tagData.values) {
continue
}
row := &tagData.values[index]
tagValue := decoder(tagData.valueType, row.value, row.valueArr)
- if tagValue != nil {
- buf = append(buf, &modelv1.Tag{
- Key: tagName,
- Value: tagValue,
- })
+ if tagValue == nil {
+ continue
}
+ buf = append(buf, &modelv1.Tag{
+ Key: tagName,
+ Value: tagValue,
+ })
}
return buf
}
+// orderedTagNamesForFilter returns this block's tag names in a deterministic order
+// for collectTagsForFilter. When projections are given, names follow projection
+// order (family by family, name by name); a name is kept only on its first
+// occurrence and only if the block stores that tag. Without projections, every tag
+// name in the block is returned, sorted lexicographically.
+//
+// NOTE(review): in the projection branch, block tags that are not projected are
+// omitted, whereas the previous map iteration handed every block tag to the filter;
+// confirm callers always project the tags the filter references.
+// NOTE(review): if the matcher indexes values by schema position, confirm the
+// lexicographic order used without projections lines up with those indices.
+func (b *blockCursorBuilder) orderedTagNamesForFilter(projections
[]model.TagProjection) []string {
+ // IMPORTANT: The logical tag filter matcher relies on stable tag
ordering,
+ // because it uses schema tag refs to locate tag values by index
(TagFamily.Tags[idx]).
+ // Iterating a Go map is randomized, which makes results flaky.
+ if len(projections) > 0 {
+ seenTagNames := make(map[string]struct{}, len(b.block.tags))
+ tagNames := make([]string, 0, len(b.block.tags))
+ for _, proj := range projections {
+ for _, tagName := range proj.Names {
+ if _, exists := seenTagNames[tagName]; exists {
+ continue
+ }
+ if _, exists := b.block.tags[tagName]; !exists {
+ continue
+ }
+ tagNames = append(tagNames, tagName)
+ seenTagNames[tagName] = struct{}{}
+ }
+ }
+ return tagNames
+ }
+
+ // No projection: take every tag in the block and sort so the order is stable.
+ tagNames := make([]string, 0, len(b.block.tags))
+ for tagName := range b.block.tags {
+ tagNames = append(tagNames, tagName)
+ }
+ sort.Strings(tagNames)
+ return tagNames
+}
+
func (b *blockCursorBuilder) appendElement(index int, hash uint64, dataBytes
[]byte) {
key := b.block.userKeys[index]
if !b.keyInRange(key) {
diff --git a/test/cases/topn/topn.go b/test/cases/topn/topn.go
index 51b983eb3..2664b2391 100644
--- a/test/cases/topn/topn.go
+++ b/test/cases/topn/topn.go
@@ -36,6 +36,9 @@ var (
gm.Eventually(func(innerGm gm.Gomega) {
topNTestData.VerifyFn(innerGm, SharedContext, args)
}, flags.EventuallyTimeout).WithTimeout(10 *
time.Second).WithPolling(2 * time.Second).Should(gm.Succeed())
+ gm.Consistently(func(innerGm gm.Gomega) {
+ topNTestData.VerifyFn(innerGm, SharedContext, args)
+ }, flags.ConsistentlyTimeout).Should(gm.Succeed())
}
)
diff --git
a/test/cases/trace/data/input/state_duration_range_and_ids_order_timestamp_desc.ql
b/test/cases/trace/data/input/state_duration_range_and_ids_order_timestamp_desc.ql
new file mode 100644
index 000000000..d1a0559e8
--- /dev/null
+++
b/test/cases/trace/data/input/state_duration_range_and_ids_order_timestamp_desc.ql
@@ -0,0 +1,28 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+SELECT () FROM TRACE sw IN test-trace-group
+TIME > '-15m'
+WHERE state = 0
+ AND endpoint_id = '/home_endpoint'
+ AND service_instance_id = 'webapp_instance_1'
+ AND service_id = 'webapp_service'
+ AND duration >= 150
+ AND duration <= 1200
+ORDER BY timestamp DESC
+LIMIT 20
+
diff --git
a/test/cases/trace/data/input/state_duration_range_and_ids_order_timestamp_desc.yml
b/test/cases/trace/data/input/state_duration_range_and_ids_order_timestamp_desc.yml
new file mode 100644
index 000000000..2b8a28ef7
--- /dev/null
+++
b/test/cases/trace/data/input/state_duration_range_and_ids_order_timestamp_desc.yml
@@ -0,0 +1,81 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: "sw"
+groups: ["test-trace-group"]
+limit: 20
+order_by:
+ index_rule_name: "timestamp"
+ sort: "SORT_DESC"
+criteria:
+ le:
+ op: "LOGICAL_OP_AND"
+ left:
+ le:
+ op: "LOGICAL_OP_AND"
+ left:
+ le:
+ op: "LOGICAL_OP_AND"
+ left:
+ le:
+ op: "LOGICAL_OP_AND"
+ left:
+ le:
+ op: "LOGICAL_OP_AND"
+ left:
+ condition:
+ name: "state"
+ op: "BINARY_OP_EQ"
+ value:
+ int:
+ value: 0
+ right:
+ condition:
+ name: "endpoint_id"
+ op: "BINARY_OP_EQ"
+ value:
+ str:
+ value: "/home_endpoint"
+ right:
+ condition:
+ name: "service_instance_id"
+ op: "BINARY_OP_EQ"
+ value:
+ str:
+ value: "webapp_instance_1"
+ right:
+ condition:
+ name: "service_id"
+ op: "BINARY_OP_EQ"
+ value:
+ str:
+ value: "webapp_service"
+ right:
+ condition:
+ name: "duration"
+ op: "BINARY_OP_GE"
+ value:
+ int:
+ value: 150
+ right:
+ condition:
+ name: "duration"
+ op: "BINARY_OP_LE"
+ value:
+ int:
+ value: 1200
+
diff --git
a/test/cases/trace/data/want/state_duration_range_and_ids_order_timestamp_desc.yml
b/test/cases/trace/data/want/state_duration_range_and_ids_order_timestamp_desc.yml
new file mode 100644
index 000000000..a511266ae
--- /dev/null
+++
b/test/cases/trace/data/want/state_duration_range_and_ids_order_timestamp_desc.yml
@@ -0,0 +1,37 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+traces:
+ - spans:
+ - span: trace_005_span_1
+ spanId: span_005_1
+ - span: trace_005_span_2
+ spanId: span_005_2
+ - span: trace_005_span_3
+ spanId: span_005_3
+ - span: trace_005_span_4
+ spanId: span_005_4
+ traceId: trace_005
+ - spans:
+ - span: trace_003_span_1
+ spanId: span_003_1
+ - span: trace_003_span_2
+ spanId: span_003_2
+ - span: trace_003_span_3
+ spanId: span_003_3
+ traceId: trace_003
+
diff --git a/test/cases/trace/trace.go b/test/cases/trace/trace.go
index a22111f77..88b9744d7 100644
--- a/test/cases/trace/trace.go
+++ b/test/cases/trace/trace.go
@@ -54,6 +54,8 @@ var _ = g.DescribeTable("Scanning Traces", func(args
helpers.Args) {
g.Entry("filter by service id", helpers.Args{Input:
"eq_service_order_timestamp_desc", Duration: 1 * time.Hour}),
g.Entry("filter by service instance id", helpers.Args{Input:
"eq_service_instance_order_time_asc", Duration: 1 * time.Hour}),
g.Entry("filter by service instance id and endpoint id",
helpers.Args{Input: "eq_service_instance_and_endpoint_order_timestamp_asc",
Duration: 1 * time.Hour}),
+ g.Entry("filter by state and duration range and ids order by timestamp
desc",
+ helpers.Args{Input:
"state_duration_range_and_ids_order_timestamp_desc", Duration: 1 * time.Hour}),
g.Entry("filter by endpoint", helpers.Args{Input:
"eq_endpoint_order_duration_asc", Duration: 1 * time.Hour}),
g.Entry("order by timestamp limit 2", helpers.Args{Input:
"order_timestamp_desc_limit", Duration: 1 * time.Hour}),
g.Entry("filter by trace id and service unknown", helpers.Args{Input:
"eq_trace_id_and_service_unknown", Duration: 1 * time.Hour, WantEmpty: true}),