This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch test/replication
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 220c6c1dbffe91be4bb6115e3ea34c0da7ce11bd
Author: Hongtao Gao <[email protected]>
AuthorDate: Fri Mar 20 09:50:45 2026 +0000

    fix(replication): correct group replicas to match original testdata
    
    Ensure replicated schemas exactly match what test cases expect:
    - exception and index_mode groups: remove incorrect replicas field
    - sw_metric, sw_spec, sw_spec2, updated: set correct replicas values
    - Add missing group files and index rule bindings for trace replication
    - Add missing trace testdata files (traces, index_rules, 
index_rule_bindings)
---
 pkg/test/replicated/measure/etcd.go                |  19 ++-
 .../measure/testdata/groups/exception.json         |   3 +-
 .../measure/testdata/groups/index_mode.json        |   3 +-
 .../groups/{exception.json => replicated.json}     |   4 +-
 .../groups/{exception.json => sw_spec.json}        |   6 +-
 .../groups/{exception.json => sw_spec2.json}       |   6 +-
 .../groups/{exception.json => updated.json}        |   4 +-
 .../service_traffic_replicated.json                |  17 +++
 pkg/test/replicated/stream/etcd.go                 | 161 ++++++++++++++-------
 pkg/test/replicated/trace/etcd.go                  |  17 ++-
 .../trace/testdata/index_rule_bindings/sw.json     |  17 +++
 .../testdata/index_rule_bindings/sw_spec.json      |  14 ++
 .../testdata/index_rule_bindings/sw_spec2.json     |  14 ++
 .../testdata/index_rule_bindings/sw_updated.json   |  17 +++
 .../trace/testdata/index_rule_bindings/zipkin.json |  16 ++
 .../trace/testdata/index_rules/duration.json       |  14 ++
 .../trace/testdata/index_rules/duration_spec.json  |  14 ++
 .../trace/testdata/index_rules/duration_spec2.json |   9 ++
 .../testdata/index_rules/duration_updated.json     |  14 ++
 .../trace/testdata/index_rules/timestamp.json      |  14 ++
 .../trace/testdata/index_rules/timestamp_spec.json |  14 ++
 .../testdata/index_rules/timestamp_spec2.json      |   9 ++
 .../testdata/index_rules/timestamp_updated.json    |  14 ++
 .../testdata/index_rules/zipkin-timestamp.json     |  14 ++
 pkg/test/replicated/trace/testdata/traces/sw.json  |  44 ++++++
 .../replicated/trace/testdata/traces/sw_spec.json  |  45 ++++++
 .../replicated/trace/testdata/traces/sw_spec2.json |  44 ++++++
 .../trace/testdata/traces/sw_updated.json          |  48 ++++++
 .../replicated/trace/testdata/traces/zipkin.json   |  76 ++++++++++
 .../replication/measure_normal_replication_test.go |   2 +-
 .../replication/replication_suite_test.go          |  21 +--
 test/integration/replication/replication_test.go   |  26 +---
 .../replication/stream_replication_test.go         |   1 +
 .../replication/trace_replication_test.go          |   1 +
 34 files changed, 642 insertions(+), 100 deletions(-)

diff --git a/pkg/test/replicated/measure/etcd.go 
b/pkg/test/replicated/measure/etcd.go
index 7304d91ca..c50aca76b 100644
--- a/pkg/test/replicated/measure/etcd.go
+++ b/pkg/test/replicated/measure/etcd.go
@@ -22,6 +22,7 @@ import (
        "context"
        "embed"
        "path"
+       "reflect"
 
        "github.com/pkg/errors"
        "google.golang.org/protobuf/encoding/protojson"
@@ -88,16 +89,24 @@ func loadSchema[T proto.Message](dir string, resource T, 
loadFn func(resource T)
                if err != nil {
                        return err
                }
-               resource.ProtoReflect().Descriptor().RequiredNumbers()
-               if err := protojson.Unmarshal(data, resource); err != nil {
+               // Create a new instance for each file to avoid race conditions
+               // when the callback holds a reference to the resource
+               newResource := newProtoMessage(resource)
+               if err := protojson.Unmarshal(data, newResource); err != nil {
                        return err
                }
-               if err := loadFn(resource); err != nil {
+               if err := loadFn(newResource); err != nil {
                        if errors.Is(err, schema.ErrGRPCAlreadyExists) {
-                               return nil
+                               continue
                        }
                        return err
                }
        }
        return nil
-}
\ No newline at end of file
+}
+
+// newProtoMessage creates a new instance of the same type as the template.
+func newProtoMessage[T proto.Message](template T) T {
+       v := reflect.New(reflect.TypeOf(template).Elem()).Interface().(T)
+       return v
+}
diff --git a/pkg/test/replicated/measure/testdata/groups/exception.json 
b/pkg/test/replicated/measure/testdata/groups/exception.json
index d57fbef64..fadbd5a5b 100644
--- a/pkg/test/replicated/measure/testdata/groups/exception.json
+++ b/pkg/test/replicated/measure/testdata/groups/exception.json
@@ -12,8 +12,7 @@
     "ttl": {
       "unit": "UNIT_DAY",
       "num": 7
-    },
-    "replicas": 2
+    }
   },
   "updated_at": "2021-04-15T01:30:15.01Z"
 }
\ No newline at end of file
diff --git a/pkg/test/replicated/measure/testdata/groups/index_mode.json 
b/pkg/test/replicated/measure/testdata/groups/index_mode.json
index 009222d30..fb0d9b4b2 100644
--- a/pkg/test/replicated/measure/testdata/groups/index_mode.json
+++ b/pkg/test/replicated/measure/testdata/groups/index_mode.json
@@ -12,8 +12,7 @@
     "ttl": {
       "unit": "UNIT_DAY",
       "num": 7
-    },
-    "replicas": 2
+    }
   },
   "updated_at": "2021-04-15T01:30:15.01Z"
 }
\ No newline at end of file
diff --git a/pkg/test/replicated/measure/testdata/groups/exception.json 
b/pkg/test/replicated/measure/testdata/groups/replicated.json
similarity index 89%
copy from pkg/test/replicated/measure/testdata/groups/exception.json
copy to pkg/test/replicated/measure/testdata/groups/replicated.json
index d57fbef64..98c30b4d8 100644
--- a/pkg/test/replicated/measure/testdata/groups/exception.json
+++ b/pkg/test/replicated/measure/testdata/groups/replicated.json
@@ -1,6 +1,6 @@
 {
   "metadata": {
-    "name": "exception"
+    "name": "replicated_group"
   },
   "catalog": "CATALOG_MEASURE",
   "resource_opts": {
@@ -16,4 +16,4 @@
     "replicas": 2
   },
   "updated_at": "2021-04-15T01:30:15.01Z"
-}
\ No newline at end of file
+}
diff --git a/pkg/test/replicated/measure/testdata/groups/exception.json 
b/pkg/test/replicated/measure/testdata/groups/sw_spec.json
similarity index 86%
copy from pkg/test/replicated/measure/testdata/groups/exception.json
copy to pkg/test/replicated/measure/testdata/groups/sw_spec.json
index d57fbef64..47d9d404e 100644
--- a/pkg/test/replicated/measure/testdata/groups/exception.json
+++ b/pkg/test/replicated/measure/testdata/groups/sw_spec.json
@@ -1,6 +1,6 @@
 {
   "metadata": {
-    "name": "exception"
+    "name": "sw_spec"
   },
   "catalog": "CATALOG_MEASURE",
   "resource_opts": {
@@ -13,7 +13,7 @@
       "unit": "UNIT_DAY",
       "num": 7
     },
-    "replicas": 2
+    "replicas": 1
   },
   "updated_at": "2021-04-15T01:30:15.01Z"
-}
\ No newline at end of file
+}
diff --git a/pkg/test/replicated/measure/testdata/groups/exception.json 
b/pkg/test/replicated/measure/testdata/groups/sw_spec2.json
similarity index 86%
copy from pkg/test/replicated/measure/testdata/groups/exception.json
copy to pkg/test/replicated/measure/testdata/groups/sw_spec2.json
index d57fbef64..90473439a 100644
--- a/pkg/test/replicated/measure/testdata/groups/exception.json
+++ b/pkg/test/replicated/measure/testdata/groups/sw_spec2.json
@@ -1,6 +1,6 @@
 {
   "metadata": {
-    "name": "exception"
+    "name": "sw_spec2"
   },
   "catalog": "CATALOG_MEASURE",
   "resource_opts": {
@@ -13,7 +13,7 @@
       "unit": "UNIT_DAY",
       "num": 7
     },
-    "replicas": 2
+    "replicas": 1
   },
   "updated_at": "2021-04-15T01:30:15.01Z"
-}
\ No newline at end of file
+}
diff --git a/pkg/test/replicated/measure/testdata/groups/exception.json 
b/pkg/test/replicated/measure/testdata/groups/updated.json
similarity index 86%
copy from pkg/test/replicated/measure/testdata/groups/exception.json
copy to pkg/test/replicated/measure/testdata/groups/updated.json
index d57fbef64..e2cd972d8 100644
--- a/pkg/test/replicated/measure/testdata/groups/exception.json
+++ b/pkg/test/replicated/measure/testdata/groups/updated.json
@@ -1,6 +1,6 @@
 {
   "metadata": {
-    "name": "exception"
+    "name": "sw_updated"
   },
   "catalog": "CATALOG_MEASURE",
   "resource_opts": {
@@ -13,7 +13,7 @@
       "unit": "UNIT_DAY",
       "num": 7
     },
-    "replicas": 2
+    "replicas": 1
   },
   "updated_at": "2021-04-15T01:30:15.01Z"
 }
\ No newline at end of file
diff --git 
a/pkg/test/replicated/measure/testdata/index_rule_bindings/service_traffic_replicated.json
 
b/pkg/test/replicated/measure/testdata/index_rule_bindings/service_traffic_replicated.json
new file mode 100644
index 000000000..c75ff152c
--- /dev/null
+++ 
b/pkg/test/replicated/measure/testdata/index_rule_bindings/service_traffic_replicated.json
@@ -0,0 +1,17 @@
+{
+  "metadata": {
+    "name": "service_traffic_rule_binding",
+    "group": "replicated_group"
+  },
+  "rules": [
+    "service_id",
+    "layer"
+  ],
+  "subject":{
+    "catalog": "CATALOG_MEASURE",
+    "name": "service_traffic"
+  },
+  "begin_at": "2021-04-15T01:30:15.01Z",
+  "expire_at": "2121-04-15T01:30:15.01Z",
+  "updated_at": "2021-04-15T01:30:15.01Z"
+}
diff --git a/pkg/test/replicated/stream/etcd.go 
b/pkg/test/replicated/stream/etcd.go
index 5eabc0ee5..70e6edb18 100644
--- a/pkg/test/replicated/stream/etcd.go
+++ b/pkg/test/replicated/stream/etcd.go
@@ -21,86 +21,149 @@ package replicatedstream
 import (
        "context"
        "embed"
+       "encoding/json"
+       "errors"
        "path"
 
-       "github.com/pkg/errors"
        "google.golang.org/protobuf/encoding/protojson"
-       "google.golang.org/protobuf/proto"
 
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
 const (
-       groupDir            = "testdata"
        streamDir           = "testdata/streams"
        indexRuleDir        = "testdata/index_rules"
        indexRuleBindingDir = "testdata/index_rule_bindings"
 )
 
-//go:embed testdata/*
-var store embed.FS
-
-// PreloadSchema loads schemas from files in the booting process.
-func PreloadSchema(ctx context.Context, e schema.Registry) error {
-       return preloadSchemaWithFuncs(ctx, e,
-               func(ctx context.Context, e schema.Registry) error {
-                       return loadSchema(groupDir, &commonv1.Group{}, 
func(group *commonv1.Group) error {
-                               return e.CreateGroup(ctx, group)
-                       })
-               },
-               func(ctx context.Context, e schema.Registry) error {
-                       return loadSchema(streamDir, &databasev1.Stream{}, 
func(stream *databasev1.Stream) error {
-                               _, innerErr := e.CreateStream(ctx, stream)
-                               return innerErr
-                       })
-               },
-               func(ctx context.Context, e schema.Registry) error {
-                       return loadSchema(indexRuleDir, 
&databasev1.IndexRule{}, func(indexRule *databasev1.IndexRule) error {
-                               return e.CreateIndexRule(ctx, indexRule)
-                       })
-               },
-               func(ctx context.Context, e schema.Registry) error {
-                       return loadSchema(indexRuleBindingDir, 
&databasev1.IndexRuleBinding{}, func(indexRuleBinding 
*databasev1.IndexRuleBinding) error {
-                               return e.CreateIndexRuleBinding(ctx, 
indexRuleBinding)
-                       })
-               },
-       )
-}
+var (
+       //go:embed testdata/index_rules/*.json
+       indexRuleStore embed.FS
+       //go:embed testdata/index_rule_bindings/*.json
+       indexRuleBindingStore embed.FS
+       //go:embed testdata/streams/*.json
+       streamStore embed.FS
+       //go:embed testdata/group.json
+       groupJSON string
+       //go:embed testdata/group_with_stages.json
+       groupWithStagesJSON string
+)
 
-func preloadSchemaWithFuncs(ctx context.Context, e schema.Registry, loaders 
...func(context.Context, schema.Registry) error) error {
-       for _, loader := range loaders {
-               if err := loader(ctx, e); err != nil {
-                       return errors.WithStack(err)
+// loadSchemas loads streams, index rules, and index rule bindings.
+func loadSchemas(ctx context.Context, e schema.Registry) error {
+       streams, err := streamStore.ReadDir(streamDir)
+       if err != nil {
+               return err
+       }
+       var data []byte
+       for _, entry := range streams {
+               data, err = streamStore.ReadFile(path.Join(streamDir, 
entry.Name()))
+               if err != nil {
+                       return err
+               }
+               var stream databasev1.Stream
+               err = protojson.Unmarshal(data, &stream)
+               if err != nil {
+                       return err
+               }
+               if _, innerErr := e.CreateStream(ctx, &stream); innerErr != nil 
{
+                       return innerErr
                }
        }
-       return nil
-}
 
-func loadSchema[T proto.Message](dir string, resource T, loadFn func(resource 
T) error) error {
-       entries, err := store.ReadDir(dir)
+       entries, err := indexRuleStore.ReadDir(indexRuleDir)
        if err != nil {
                return err
        }
        for _, entry := range entries {
-               if entry.IsDir() {
-                       continue
+               data, err = indexRuleStore.ReadFile(path.Join(indexRuleDir, 
entry.Name()))
+               if err != nil {
+                       return err
                }
-               data, err := store.ReadFile(path.Join(dir, entry.Name()))
+               var idxRule databasev1.IndexRule
+               err = protojson.Unmarshal(data, &idxRule)
                if err != nil {
                        return err
                }
-               resource.ProtoReflect().Descriptor().RequiredNumbers()
-               if err := protojson.Unmarshal(data, resource); err != nil {
+               if innerErr := e.CreateIndexRule(ctx, &idxRule); innerErr != 
nil {
+                       return innerErr
+               }
+       }
+       indexRulesBindings, err := 
indexRuleBindingStore.ReadDir(indexRuleBindingDir)
+       if err != nil {
+               return err
+       }
+       for _, entry := range indexRulesBindings {
+               data, err = 
indexRuleBindingStore.ReadFile(path.Join(indexRuleBindingDir, entry.Name()))
+               if err != nil {
                        return err
                }
-               if err := loadFn(resource); err != nil {
-                       if errors.Is(err, schema.ErrGRPCAlreadyExists) {
-                               return nil
-                       }
+               var idxRuleBinding databasev1.IndexRuleBinding
+               err = protojson.Unmarshal(data, &idxRuleBinding)
+               if err != nil {
                        return err
                }
+               if innerErr := e.CreateIndexRuleBinding(ctx, &idxRuleBinding); 
innerErr != nil {
+                       return innerErr
+               }
        }
+
        return nil
 }
+
+// LoadSchemaWithStages loads schemas from files, including group stages.
+func LoadSchemaWithStages(ctx context.Context, e schema.Registry) error {
+       if e == nil {
+               return nil
+       }
+       var rawGroups []json.RawMessage
+       if err := json.Unmarshal([]byte(groupWithStagesJSON), &rawGroups); err 
!= nil {
+               return err
+       }
+       for _, raw := range rawGroups {
+               g := &commonv1.Group{}
+               if err := protojson.Unmarshal(raw, g); err != nil {
+                       return err
+               }
+               _, err := e.GetGroup(ctx, g.Metadata.Name)
+               if !errors.Is(err, schema.ErrGRPCResourceNotFound) {
+                       logger.Infof("group %s already exists", g.Metadata.Name)
+                       return nil
+               }
+               if innerErr := e.CreateGroup(ctx, g); innerErr != nil {
+                       return innerErr
+               }
+       }
+       return loadSchemas(ctx, e)
+}
+
+// PreloadSchema loads schemas from files in the booting process.
+// This version loads group without stages.
+func PreloadSchema(ctx context.Context, e schema.Registry) error {
+       if e == nil {
+               return nil
+       }
+       var rawGroups []json.RawMessage
+       if err := json.Unmarshal([]byte(groupJSON), &rawGroups); err != nil {
+               return err
+       }
+       for _, raw := range rawGroups {
+               g := &commonv1.Group{}
+               if err := protojson.Unmarshal(raw, g); err != nil {
+                       return err
+               }
+               _, err := e.GetGroup(ctx, g.Metadata.Name)
+               if !errors.Is(err, schema.ErrGRPCResourceNotFound) {
+                       logger.Infof("group %s already exists", g.Metadata.Name)
+                       return nil
+               }
+               if innerErr := e.CreateGroup(ctx, g); innerErr != nil {
+                       return innerErr
+               }
+       }
+
+       return loadSchemas(ctx, e)
+}
diff --git a/pkg/test/replicated/trace/etcd.go 
b/pkg/test/replicated/trace/etcd.go
index 2ab2222e2..adc4c2463 100644
--- a/pkg/test/replicated/trace/etcd.go
+++ b/pkg/test/replicated/trace/etcd.go
@@ -22,6 +22,7 @@ import (
        "context"
        "embed"
        "path"
+       "reflect"
 
        "github.com/pkg/errors"
        "google.golang.org/protobuf/encoding/protojson"
@@ -100,16 +101,24 @@ func loadSchema[T proto.Message](dir string, resource T, 
loadFn func(resource T)
                if err != nil {
                        return err
                }
-               resource.ProtoReflect().Descriptor().RequiredNumbers()
-               if err := protojson.Unmarshal(data, resource); err != nil {
+               // Create a new instance for each file to avoid race conditions
+               // when the callback holds a reference to the resource
+               newResource := newProtoMessage(resource)
+               if err := protojson.Unmarshal(data, newResource); err != nil {
                        return err
                }
-               if err := loadFn(resource); err != nil {
+               if err := loadFn(newResource); err != nil {
                        if errors.Is(err, schema.ErrGRPCAlreadyExists) {
-                               return nil
+                               continue
                        }
                        return err
                }
        }
        return nil
 }
+
+// newProtoMessage creates a new instance of the same type as the template.
+func newProtoMessage[T proto.Message](template T) T {
+       v := reflect.New(reflect.TypeOf(template).Elem()).Interface().(T)
+       return v
+}
diff --git a/pkg/test/replicated/trace/testdata/index_rule_bindings/sw.json 
b/pkg/test/replicated/trace/testdata/index_rule_bindings/sw.json
new file mode 100644
index 000000000..7955985b2
--- /dev/null
+++ b/pkg/test/replicated/trace/testdata/index_rule_bindings/sw.json
@@ -0,0 +1,17 @@
+{
+    "metadata": {
+        "name": "sw-index-rule-binding",
+        "group": "test-trace-group"
+    },
+    "rules": [
+        "duration",
+        "timestamp"
+    ],
+    "subject": {
+        "catalog": "CATALOG_TRACE",
+        "name": "sw"
+    },
+    "begin_at": "2021-04-15T01:30:15.01Z",
+    "expire_at": "2121-04-15T01:30:15.01Z",
+    "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git 
a/pkg/test/replicated/trace/testdata/index_rule_bindings/sw_spec.json 
b/pkg/test/replicated/trace/testdata/index_rule_bindings/sw_spec.json
new file mode 100644
index 000000000..810c1612e
--- /dev/null
+++ b/pkg/test/replicated/trace/testdata/index_rule_bindings/sw_spec.json
@@ -0,0 +1,14 @@
+{
+  "metadata": {
+    "name": "sw-spec-index-rule-binding",
+    "group": "test-trace-spec"
+  },
+  "rules": ["duration", "timestamp"],
+  "subject": {
+    "catalog": "CATALOG_TRACE",
+    "name": "sw"
+  },
+  "begin_at": "2021-04-15T01:30:15.01Z",
+  "expire_at": "2121-04-15T01:30:15.01Z",
+  "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git 
a/pkg/test/replicated/trace/testdata/index_rule_bindings/sw_spec2.json 
b/pkg/test/replicated/trace/testdata/index_rule_bindings/sw_spec2.json
new file mode 100644
index 000000000..4daee2a9f
--- /dev/null
+++ b/pkg/test/replicated/trace/testdata/index_rule_bindings/sw_spec2.json
@@ -0,0 +1,14 @@
+{
+  "metadata": {
+    "name": "sw-spec2-index-rule-binding",
+    "group": "test-trace-spec2"
+  },
+  "rules": ["duration", "timestamp"],
+  "subject": {
+    "catalog": "CATALOG_TRACE",
+    "name": "sw"
+  },
+  "begin_at": "2021-04-15T01:30:15.01Z",
+  "expire_at": "2121-04-15T01:30:15.01Z",
+  "updated_at": "2021-04-15T01:30:15.01Z"
+}
diff --git 
a/pkg/test/replicated/trace/testdata/index_rule_bindings/sw_updated.json 
b/pkg/test/replicated/trace/testdata/index_rule_bindings/sw_updated.json
new file mode 100644
index 000000000..c611a4903
--- /dev/null
+++ b/pkg/test/replicated/trace/testdata/index_rule_bindings/sw_updated.json
@@ -0,0 +1,17 @@
+{
+    "metadata": {
+        "name": "sw-updated-index-rule-binding",
+        "group": "test-trace-updated"
+    },
+    "rules": [
+        "duration",
+        "timestamp"
+    ],
+    "subject": {
+        "catalog": "CATALOG_TRACE",
+        "name": "sw"
+    },
+    "begin_at": "2021-04-15T01:30:15.01Z",
+    "expire_at": "2121-04-15T01:30:15.01Z",
+    "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git a/pkg/test/replicated/trace/testdata/index_rule_bindings/zipkin.json 
b/pkg/test/replicated/trace/testdata/index_rule_bindings/zipkin.json
new file mode 100644
index 000000000..cf03fabbc
--- /dev/null
+++ b/pkg/test/replicated/trace/testdata/index_rule_bindings/zipkin.json
@@ -0,0 +1,16 @@
+{
+    "metadata": {
+        "name": "zipkin-index-rule-binding",
+        "group": "zipkinTrace"
+    },
+    "rules": [
+        "zipkin-timestamp"
+    ],
+    "subject": {
+        "catalog": "CATALOG_TRACE",
+        "name": "zipkin"
+    },
+    "begin_at": "2021-04-15T01:30:15.01Z",
+    "expire_at": "2121-04-15T01:30:15.01Z",
+    "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git a/pkg/test/replicated/trace/testdata/index_rules/duration.json 
b/pkg/test/replicated/trace/testdata/index_rules/duration.json
new file mode 100644
index 000000000..c9cb393ab
--- /dev/null
+++ b/pkg/test/replicated/trace/testdata/index_rules/duration.json
@@ -0,0 +1,14 @@
+{
+    "metadata": {
+        "name": "duration",
+        "group": "test-trace-group"
+    },
+    "tags": [
+        "service_id",
+        "service_instance_id",
+        "state",
+        "duration"
+    ],
+    "type": "TYPE_TREE",
+    "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git a/pkg/test/replicated/trace/testdata/index_rules/duration_spec.json 
b/pkg/test/replicated/trace/testdata/index_rules/duration_spec.json
new file mode 100644
index 000000000..a89781ee6
--- /dev/null
+++ b/pkg/test/replicated/trace/testdata/index_rules/duration_spec.json
@@ -0,0 +1,14 @@
+{
+    "metadata": {
+        "name": "duration",
+        "group": "test-trace-spec"
+    },
+    "tags": [
+        "service_id",
+        "service_instance_id",
+        "state",
+        "duration"
+    ],
+    "type": "TYPE_TREE",
+    "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git a/pkg/test/replicated/trace/testdata/index_rules/duration_spec2.json 
b/pkg/test/replicated/trace/testdata/index_rules/duration_spec2.json
new file mode 100644
index 000000000..3d3240f35
--- /dev/null
+++ b/pkg/test/replicated/trace/testdata/index_rules/duration_spec2.json
@@ -0,0 +1,9 @@
+{
+  "metadata": {
+    "name": "duration",
+    "group": "test-trace-spec2"
+  },
+  "tags": ["service_id", "service_instance_id", "state", "duration"],
+  "type": "TYPE_TREE",
+  "updated_at": "2021-04-15T01:30:15.01Z"
+}
diff --git 
a/pkg/test/replicated/trace/testdata/index_rules/duration_updated.json 
b/pkg/test/replicated/trace/testdata/index_rules/duration_updated.json
new file mode 100644
index 000000000..d31796ad6
--- /dev/null
+++ b/pkg/test/replicated/trace/testdata/index_rules/duration_updated.json
@@ -0,0 +1,14 @@
+{
+    "metadata": {
+        "name": "duration",
+        "group": "test-trace-updated"
+    },
+    "tags": [
+        "service_id",
+        "service_instance_id",
+        "state",
+        "duration"
+    ],
+    "type": "TYPE_TREE",
+    "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git a/pkg/test/replicated/trace/testdata/index_rules/timestamp.json 
b/pkg/test/replicated/trace/testdata/index_rules/timestamp.json
new file mode 100644
index 000000000..be83e872e
--- /dev/null
+++ b/pkg/test/replicated/trace/testdata/index_rules/timestamp.json
@@ -0,0 +1,14 @@
+{
+    "metadata": {
+        "name": "timestamp",
+        "group": "test-trace-group"
+    },
+    "tags": [
+        "service_id",
+        "service_instance_id",
+        "state",
+        "timestamp"
+    ],
+    "type": "TYPE_TREE",
+    "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git a/pkg/test/replicated/trace/testdata/index_rules/timestamp_spec.json 
b/pkg/test/replicated/trace/testdata/index_rules/timestamp_spec.json
new file mode 100644
index 000000000..ba1a0e27d
--- /dev/null
+++ b/pkg/test/replicated/trace/testdata/index_rules/timestamp_spec.json
@@ -0,0 +1,14 @@
+{
+    "metadata": {
+        "name": "timestamp",
+        "group": "test-trace-spec"
+    },
+    "tags": [
+        "service_id",
+        "service_instance_id",
+        "state",
+        "timestamp"
+    ],
+    "type": "TYPE_TREE",
+    "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git 
a/pkg/test/replicated/trace/testdata/index_rules/timestamp_spec2.json 
b/pkg/test/replicated/trace/testdata/index_rules/timestamp_spec2.json
new file mode 100644
index 000000000..45c896307
--- /dev/null
+++ b/pkg/test/replicated/trace/testdata/index_rules/timestamp_spec2.json
@@ -0,0 +1,9 @@
+{
+  "metadata": {
+    "name": "timestamp",
+    "group": "test-trace-spec2"
+  },
+  "tags": ["service_id", "service_instance_id", "state", "timestamp"],
+  "type": "TYPE_TREE",
+  "updated_at": "2021-04-15T01:30:15.01Z"
+}
diff --git 
a/pkg/test/replicated/trace/testdata/index_rules/timestamp_updated.json 
b/pkg/test/replicated/trace/testdata/index_rules/timestamp_updated.json
new file mode 100644
index 000000000..2b947fa33
--- /dev/null
+++ b/pkg/test/replicated/trace/testdata/index_rules/timestamp_updated.json
@@ -0,0 +1,14 @@
+{
+    "metadata": {
+        "name": "timestamp",
+        "group": "test-trace-updated"
+    },
+    "tags": [
+        "service_id",
+        "service_instance_id",
+        "state",
+        "timestamp"
+    ],
+    "type": "TYPE_TREE",
+    "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git 
a/pkg/test/replicated/trace/testdata/index_rules/zipkin-timestamp.json 
b/pkg/test/replicated/trace/testdata/index_rules/zipkin-timestamp.json
new file mode 100644
index 000000000..98b483770
--- /dev/null
+++ b/pkg/test/replicated/trace/testdata/index_rules/zipkin-timestamp.json
@@ -0,0 +1,14 @@
+{
+    "metadata": {
+        "name": "zipkin-timestamp",
+        "group": "zipkinTrace"
+    },
+    "tags": [
+        "local_endpoint_service_name",
+        "operation_name",
+        "kind",
+        "timestamp"
+    ],
+    "type": "TYPE_TREE",
+    "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git a/pkg/test/replicated/trace/testdata/traces/sw.json 
b/pkg/test/replicated/trace/testdata/traces/sw.json
new file mode 100644
index 000000000..4f501b9a5
--- /dev/null
+++ b/pkg/test/replicated/trace/testdata/traces/sw.json
@@ -0,0 +1,44 @@
+{
+    "metadata": {
+        "name": "sw",
+        "group": "test-trace-group"
+    },
+    "tags": [
+        {
+            "name": "trace_id",
+            "type": "TAG_TYPE_STRING"
+        },
+        {
+            "name": "state",
+            "type": "TAG_TYPE_INT"
+        },
+        {
+            "name": "service_id",
+            "type": "TAG_TYPE_STRING"
+        },
+        {
+            "name": "service_instance_id",
+            "type": "TAG_TYPE_STRING"
+        },
+        {
+            "name": "endpoint_id",
+            "type": "TAG_TYPE_STRING"
+        },
+        {
+            "name": "duration",
+            "type": "TAG_TYPE_INT"
+        },
+        {
+            "name": "span_id",
+            "type": "TAG_TYPE_STRING"
+        },
+        {
+            "name": "timestamp",
+            "type": "TAG_TYPE_TIMESTAMP"
+        }
+    ],
+    "trace_id_tag_name": "trace_id",
+    "span_id_tag_name": "span_id",
+    "timestamp_tag_name": "timestamp",
+    "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git a/pkg/test/replicated/trace/testdata/traces/sw_spec.json 
b/pkg/test/replicated/trace/testdata/traces/sw_spec.json
new file mode 100644
index 000000000..9b0841b72
--- /dev/null
+++ b/pkg/test/replicated/trace/testdata/traces/sw_spec.json
@@ -0,0 +1,45 @@
+{
+  "metadata": {
+    "name": "sw",
+    "group": "test-trace-spec"
+  },
+  "tags": [
+    {
+      "name": "trace_id",
+      "type": "TAG_TYPE_STRING"
+    },
+    {
+      "name": "state",
+      "type": "TAG_TYPE_INT"
+    },
+    {
+      "name": "service_id",
+      "type": "TAG_TYPE_STRING"
+    },
+    {
+      "name": "service_instance_id",
+      "type": "TAG_TYPE_STRING"
+    },
+    {
+      "name": "endpoint_id",
+      "type": "TAG_TYPE_STRING"
+    },
+    {
+      "name": "duration",
+      "type": "TAG_TYPE_INT"
+    },
+    {
+      "name": "span_id",
+      "type": "TAG_TYPE_STRING"
+    },
+    {
+      "name": "timestamp",
+      "type": "TAG_TYPE_TIMESTAMP"
+    }
+  ],
+  "trace_id_tag_name": "trace_id",
+  "span_id_tag_name": "span_id",
+  "timestamp_tag_name": "timestamp",
+  "updated_at": "2021-04-15T01:30:15.01Z"
+}
+
diff --git a/pkg/test/replicated/trace/testdata/traces/sw_spec2.json 
b/pkg/test/replicated/trace/testdata/traces/sw_spec2.json
new file mode 100644
index 000000000..c568dce6b
--- /dev/null
+++ b/pkg/test/replicated/trace/testdata/traces/sw_spec2.json
@@ -0,0 +1,44 @@
+{
+  "metadata": {
+    "name": "sw",
+    "group": "test-trace-spec2"
+  },
+  "tags": [
+    {
+      "name": "trace_id",
+      "type": "TAG_TYPE_STRING"
+    },
+    {
+      "name": "state",
+      "type": "TAG_TYPE_INT"
+    },
+    {
+      "name": "service_id",
+      "type": "TAG_TYPE_STRING"
+    },
+    {
+      "name": "service_instance_id",
+      "type": "TAG_TYPE_STRING"
+    },
+    {
+      "name": "endpoint_id",
+      "type": "TAG_TYPE_STRING"
+    },
+    {
+      "name": "duration",
+      "type": "TAG_TYPE_INT"
+    },
+    {
+      "name": "span_id",
+      "type": "TAG_TYPE_STRING"
+    },
+    {
+      "name": "timestamp",
+      "type": "TAG_TYPE_TIMESTAMP"
+    }
+  ],
+  "trace_id_tag_name": "trace_id",
+  "span_id_tag_name": "span_id",
+  "timestamp_tag_name": "timestamp",
+  "updated_at": "2021-04-15T01:30:15.01Z"
+}
diff --git a/pkg/test/replicated/trace/testdata/traces/sw_updated.json 
b/pkg/test/replicated/trace/testdata/traces/sw_updated.json
new file mode 100644
index 000000000..9d55e73b0
--- /dev/null
+++ b/pkg/test/replicated/trace/testdata/traces/sw_updated.json
@@ -0,0 +1,48 @@
+{
+    "metadata": {
+        "name": "sw",
+        "group": "test-trace-updated"
+    },
+    "tags": [
+        {
+            "name": "trace_id",
+            "type": "TAG_TYPE_STRING"
+        },
+        {
+            "name": "state",
+            "type": "TAG_TYPE_STRING"
+        },
+        {
+            "name": "service_id",
+            "type": "TAG_TYPE_STRING"
+        },
+        {
+            "name": "service_instance_id",
+            "type": "TAG_TYPE_STRING"
+        },
+        {
+            "name": "endpoint_id",
+            "type": "TAG_TYPE_STRING"
+        },
+        {
+            "name": "duration",
+            "type": "TAG_TYPE_INT"
+        },
+        {
+            "name": "span_id",
+            "type": "TAG_TYPE_STRING"
+        },
+        {
+            "name": "error_message",
+            "type": "TAG_TYPE_STRING"
+        },
+        {
+            "name": "timestamp",
+            "type": "TAG_TYPE_TIMESTAMP"
+        }
+    ],
+    "trace_id_tag_name": "trace_id",
+    "span_id_tag_name": "span_id",
+    "timestamp_tag_name": "timestamp",
+    "updated_at": "2021-04-15T01:30:15.01Z"
+}
diff --git a/pkg/test/replicated/trace/testdata/traces/zipkin.json 
b/pkg/test/replicated/trace/testdata/traces/zipkin.json
new file mode 100644
index 000000000..bc832e0bb
--- /dev/null
+++ b/pkg/test/replicated/trace/testdata/traces/zipkin.json
@@ -0,0 +1,76 @@
+{
+    "metadata": {
+        "name": "zipkin",
+        "group": "zipkinTrace"
+    },
+    "tags": [
+        {
+            "name": "trace_id",
+            "type": "TAG_TYPE_STRING"
+        },
+        {
+            "name": "span_id",
+            "type": "TAG_TYPE_STRING"
+        },
+        {
+            "name": "parent_id",
+            "type": "TAG_TYPE_STRING"
+        },
+        {
+            "name": "operation_name",
+            "type": "TAG_TYPE_STRING"
+        },
+        {
+            "name": "kind",
+            "type": "TAG_TYPE_STRING"
+        },
+        {
+            "name": "duration",
+            "type": "TAG_TYPE_INT"
+        },
+        {
+            "name": "local_endpoint_service_name",
+            "type": "TAG_TYPE_STRING"
+        },
+        {
+            "name": "local_endpoint_ipv4",
+            "type": "TAG_TYPE_STRING"
+        },
+        {
+            "name": "local_endpoint_port",
+            "type": "TAG_TYPE_INT"
+        },
+        {
+            "name": "remote_endpoint_service_name",
+            "type": "TAG_TYPE_STRING"
+        },
+        {
+            "name": "remote_endpoint_ipv4",
+            "type": "TAG_TYPE_STRING"
+        },
+        {
+            "name": "remote_endpoint_port",
+            "type": "TAG_TYPE_INT"
+        },
+        {
+            "name": "shared",
+            "type": "TAG_TYPE_INT"
+        },
+        {
+            "name": "debug",
+            "type": "TAG_TYPE_INT"
+        },
+        {
+            "name": "query",
+            "type": "TAG_TYPE_STRING_ARRAY"
+        },
+        {
+            "name": "timestamp",
+            "type": "TAG_TYPE_TIMESTAMP"
+        }
+    ],
+    "trace_id_tag_name": "trace_id",
+    "span_id_tag_name": "span_id",
+    "timestamp_tag_name": "timestamp",
+    "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git a/test/integration/replication/measure_normal_replication_test.go 
b/test/integration/replication/measure_normal_replication_test.go
index d0afa83b2..8bda48944 100644
--- a/test/integration/replication/measure_normal_replication_test.go
+++ b/test/integration/replication/measure_normal_replication_test.go
@@ -179,4 +179,4 @@ func isClusterStable(conn *grpc.ClientConn) bool {
                return false
        }
        return len(tire2Table.GetActive()) == 3
-}
\ No newline at end of file
+}
diff --git a/test/integration/replication/replication_suite_test.go 
b/test/integration/replication/replication_suite_test.go
index eb0570716..c8cba4d57 100644
--- a/test/integration/replication/replication_suite_test.go
+++ b/test/integration/replication/replication_suite_test.go
@@ -30,6 +30,7 @@ import (
        "google.golang.org/grpc/credentials/insecure"
 
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       schemapkg 
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
        "github.com/apache/skywalking-banyandb/pkg/grpchelper"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/pool"
@@ -37,10 +38,11 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/test/flags"
        "github.com/apache/skywalking-banyandb/pkg/test/gmatcher"
        "github.com/apache/skywalking-banyandb/pkg/test/helpers"
-       "github.com/apache/skywalking-banyandb/pkg/test/setup"
+       test_property "github.com/apache/skywalking-banyandb/pkg/test/property"
        test_replicated_measure 
"github.com/apache/skywalking-banyandb/pkg/test/replicated/measure"
        test_replicated_stream 
"github.com/apache/skywalking-banyandb/pkg/test/replicated/stream"
        test_replicated_trace 
"github.com/apache/skywalking-banyandb/pkg/test/replicated/trace"
+       "github.com/apache/skywalking-banyandb/pkg/test/setup"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
        test_cases "github.com/apache/skywalking-banyandb/test/cases"
        casesmeasure "github.com/apache/skywalking-banyandb/test/cases/measure"
@@ -59,7 +61,6 @@ var (
        liaisonAddr     string
        dataNodeClosers []func()
        clusterConfig   *setup.ClusterConfig
-       tmpDirCleanup   func()
 )
 
 var _ = SynchronizedBeforeSuite(func() []byte {
@@ -76,13 +77,6 @@ var _ = SynchronizedBeforeSuite(func() []byte {
        dfWriter := setup.NewDiscoveryFileWriter(tmpDir)
        clusterConfig = setup.PropertyClusterConfig(dfWriter)
 
-       // Load schemas via property-based registry
-       setup.PreloadSchemaViaProperty(clusterConfig,
-               test_replicated_measure.PreloadSchema,
-               test_replicated_stream.PreloadSchema,
-               test_replicated_trace.PreloadSchema,
-       )
-
        By("Starting 3 data nodes for replication test")
        dataNodeClosers = make([]func(), 0, 3)
 
@@ -91,6 +85,15 @@ var _ = SynchronizedBeforeSuite(func() []byte {
                dataNodeClosers = append(dataNodeClosers, closeDataNode)
        }
 
+       By("Loading schema via property")
+       setup.PreloadSchemaViaProperty(clusterConfig,
+               test_replicated_measure.PreloadSchema,
+               test_replicated_stream.PreloadSchema,
+               test_replicated_trace.PreloadSchema,
+               test_property.PreloadSchema,
+       )
+       clusterConfig.AddLoadedKinds(schemapkg.KindStream, 
schemapkg.KindMeasure, schemapkg.KindTrace, schemapkg.KindProperty)
+
        By("Starting liaison node")
        liaisonAddr2, closerLiaisonNode := setup.LiaisonNode(clusterConfig, 
"--data-node-selector", "role=data")
        liaisonAddr = liaisonAddr2
diff --git a/test/integration/replication/replication_test.go 
b/test/integration/replication/replication_test.go
index e2c0f2f70..c609171b0 100644
--- a/test/integration/replication/replication_test.go
+++ b/test/integration/replication/replication_test.go
@@ -28,7 +28,6 @@ import (
 
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
-       "github.com/apache/skywalking-banyandb/banyand/metadata"
        "github.com/apache/skywalking-banyandb/pkg/grpchelper"
        "github.com/apache/skywalking-banyandb/pkg/test/flags"
        "github.com/apache/skywalking-banyandb/pkg/test/helpers"
@@ -66,14 +65,9 @@ var _ = g.Describe("Replication", func() {
                        gm.Expect(resp.GetMeasure()).NotTo(gm.BeNil())
                        
gm.Expect(resp.GetMeasure().GetMetadata().GetGroup()).To(gm.Equal("replicated_group"))
 
-                       g.By("Getting list of all nodes from etcd (includes 
data nodes + liaison)")
-                       nodePath := "/" + metadata.DefaultNamespace + "/nodes"
-                       allNodes, err2 := helpers.ListKeys(etcdEndpoint, 
nodePath)
-                       gm.Expect(err2).NotTo(gm.HaveOccurred())
-
-                       // We have: 3 data nodes + 1 liaison node = 4 nodes 
total
-                       gm.Expect(len(allNodes)).To(gm.Equal(4),
-                               "Should have 4 nodes total (3 data nodes + 1 
liaison node), found %d", len(allNodes))
+                       g.By("Verifying cluster is stable with 3 data nodes")
+                       gm.Expect(isClusterStable(conn)).To(gm.BeTrue(),
+                               "Cluster should have 3 active data nodes before 
test")
 
                        g.By("Stopping one data node")
                        // We should have 3 data node closers in dataNodeClosers
@@ -83,15 +77,11 @@ var _ = g.Describe("Replication", func() {
                        copy(closersToStop, dataNodeClosers)
                        closersToStop[0]()
 
-                       // Wait for the cluster to stabilize
-                       gm.Eventually(func() int {
-                               nodes, err3 := helpers.ListKeys(etcdEndpoint, 
nodePath)
-                               if err3 != nil {
-                                       return 0
-                               }
-                               return len(nodes)
-                       }, flags.EventuallyTimeout).Should(gm.Equal(3),
-                               "Should have 3 nodes total after stopping one 
data node (2 data nodes + 1 liaison)")
+                       // Wait for the cluster to stabilize (should have 3 
active nodes after failure)
+                       gm.Eventually(func() bool {
+                               return isClusterStable(conn)
+                       }, flags.EventuallyTimeout).Should(gm.BeTrue(),
+                               "Cluster should have 3 active data nodes after 
stopping one node")
 
                        g.By("Verifying data is still accessible after node 
failure")
                        verifyDataContentAfterNodeFailure(conn, now)
diff --git a/test/integration/replication/stream_replication_test.go 
b/test/integration/replication/stream_replication_test.go
index 5b1485986..19d20e9bd 100644
--- a/test/integration/replication/stream_replication_test.go
+++ b/test/integration/replication/stream_replication_test.go
@@ -31,6 +31,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/grpchelper"
        "github.com/apache/skywalking-banyandb/pkg/test/flags"
        "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+       "github.com/apache/skywalking-banyandb/pkg/test/setup"
        casesstreamdata 
"github.com/apache/skywalking-banyandb/test/cases/stream/data"
 )
 
diff --git a/test/integration/replication/trace_replication_test.go 
b/test/integration/replication/trace_replication_test.go
index 4734d0196..067c4cd1d 100644
--- a/test/integration/replication/trace_replication_test.go
+++ b/test/integration/replication/trace_replication_test.go
@@ -72,6 +72,7 @@ var _ = g.Describe("Trace Normal Mode Replication", func() {
                })
                gm.Expect(groupErr).NotTo(gm.HaveOccurred())
                gm.Expect(groupResp.GetGroup()).NotTo(gm.BeNil())
+               
gm.Expect(groupResp.GetGroup().GetResourceOpts()).NotTo(gm.BeNil())
                
gm.Expect(groupResp.GetGroup().GetResourceOpts().GetReplicas()).To(gm.Equal(uint32(2)),
                        "test-trace-group should have replicas=2")
 


Reply via email to