This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch test/replication in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 220c6c1dbffe91be4bb6115e3ea34c0da7ce11bd Author: Hongtao Gao <[email protected]> AuthorDate: Fri Mar 20 09:50:45 2026 +0000 fix(replication): correct group replicas to match original testdata Ensure replicated schemas exactly match what test cases expect: - exception and index_mode groups: remove incorrect replicas field - sw_metric, sw_spec, sw_spec2, updated: set correct replicas values - Add missing group files and index rule bindings for trace replication - Add missing trace testdata files (traces, index_rules, index_rule_bindings) --- pkg/test/replicated/measure/etcd.go | 19 ++- .../measure/testdata/groups/exception.json | 3 +- .../measure/testdata/groups/index_mode.json | 3 +- .../groups/{exception.json => replicated.json} | 4 +- .../groups/{exception.json => sw_spec.json} | 6 +- .../groups/{exception.json => sw_spec2.json} | 6 +- .../groups/{exception.json => updated.json} | 4 +- .../service_traffic_replicated.json | 17 +++ pkg/test/replicated/stream/etcd.go | 161 ++++++++++++++------- pkg/test/replicated/trace/etcd.go | 17 ++- .../trace/testdata/index_rule_bindings/sw.json | 17 +++ .../testdata/index_rule_bindings/sw_spec.json | 14 ++ .../testdata/index_rule_bindings/sw_spec2.json | 14 ++ .../testdata/index_rule_bindings/sw_updated.json | 17 +++ .../trace/testdata/index_rule_bindings/zipkin.json | 16 ++ .../trace/testdata/index_rules/duration.json | 14 ++ .../trace/testdata/index_rules/duration_spec.json | 14 ++ .../trace/testdata/index_rules/duration_spec2.json | 9 ++ .../testdata/index_rules/duration_updated.json | 14 ++ .../trace/testdata/index_rules/timestamp.json | 14 ++ .../trace/testdata/index_rules/timestamp_spec.json | 14 ++ .../testdata/index_rules/timestamp_spec2.json | 9 ++ .../testdata/index_rules/timestamp_updated.json | 14 ++ .../testdata/index_rules/zipkin-timestamp.json | 14 ++ pkg/test/replicated/trace/testdata/traces/sw.json | 44 ++++++ .../replicated/trace/testdata/traces/sw_spec.json | 45 ++++++ .../replicated/trace/testdata/traces/sw_spec2.json | 44 ++++++ .../trace/testdata/traces/sw_updated.json | 48 ++++++ .../replicated/trace/testdata/traces/zipkin.json | 76 ++++++++++ .../replication/measure_normal_replication_test.go | 2 +- .../replication/replication_suite_test.go | 21 +-- test/integration/replication/replication_test.go | 26 +--- .../replication/stream_replication_test.go | 1 + .../replication/trace_replication_test.go | 1 + 34 files changed, 642 insertions(+), 100 deletions(-) diff --git a/pkg/test/replicated/measure/etcd.go b/pkg/test/replicated/measure/etcd.go index 7304d91ca..c50aca76b 100644 --- a/pkg/test/replicated/measure/etcd.go +++ b/pkg/test/replicated/measure/etcd.go @@ -22,6 +22,7 @@ import ( "context" "embed" "path" + "reflect" "github.com/pkg/errors" "google.golang.org/protobuf/encoding/protojson" @@ -88,16 +89,24 @@ func loadSchema[T proto.Message](dir string, resource T, loadFn func(resource T) if err != nil { return err } - resource.ProtoReflect().Descriptor().RequiredNumbers() - if err := protojson.Unmarshal(data, resource); err != nil { + // Create a new instance for each file to avoid race conditions + // when the callback holds a reference to the resource + newResource := newProtoMessage(resource) + if err := protojson.Unmarshal(data, newResource); err != nil { return err } - if err := loadFn(resource); err != nil { + if err := loadFn(newResource); err != nil { if errors.Is(err, schema.ErrGRPCAlreadyExists) { - return nil + continue } return err } } return nil -} \ No newline at end of file +} + +// newProtoMessage creates a new instance of the same type as the template. +func newProtoMessage[T proto.Message](template T) T { + v := reflect.New(reflect.TypeOf(template).Elem()).Interface().(T) + return v +} diff --git a/pkg/test/replicated/measure/testdata/groups/exception.json b/pkg/test/replicated/measure/testdata/groups/exception.json index d57fbef64..fadbd5a5b 100644 --- a/pkg/test/replicated/measure/testdata/groups/exception.json +++ b/pkg/test/replicated/measure/testdata/groups/exception.json @@ -12,8 +12,7 @@ "ttl": { "unit": "UNIT_DAY", "num": 7 - }, - "replicas": 2 + } }, "updated_at": "2021-04-15T01:30:15.01Z" } \ No newline at end of file diff --git a/pkg/test/replicated/measure/testdata/groups/index_mode.json b/pkg/test/replicated/measure/testdata/groups/index_mode.json index 009222d30..fb0d9b4b2 100644 --- a/pkg/test/replicated/measure/testdata/groups/index_mode.json +++ b/pkg/test/replicated/measure/testdata/groups/index_mode.json @@ -12,8 +12,7 @@ "ttl": { "unit": "UNIT_DAY", "num": 7 - }, - "replicas": 2 + } }, "updated_at": "2021-04-15T01:30:15.01Z" } \ No newline at end of file diff --git a/pkg/test/replicated/measure/testdata/groups/exception.json b/pkg/test/replicated/measure/testdata/groups/replicated.json similarity index 89% copy from pkg/test/replicated/measure/testdata/groups/exception.json copy to pkg/test/replicated/measure/testdata/groups/replicated.json index d57fbef64..98c30b4d8 100644 --- a/pkg/test/replicated/measure/testdata/groups/exception.json +++ b/pkg/test/replicated/measure/testdata/groups/replicated.json @@ -1,6 +1,6 @@ { "metadata": { - "name": "exception" + "name": "replicated_group" }, "catalog": "CATALOG_MEASURE", "resource_opts": { @@ -16,4 +16,4 @@ "replicas": 2 }, "updated_at": "2021-04-15T01:30:15.01Z" -} \ No newline at end of file +} diff --git a/pkg/test/replicated/measure/testdata/groups/exception.json b/pkg/test/replicated/measure/testdata/groups/sw_spec.json similarity index 86% copy from pkg/test/replicated/measure/testdata/groups/exception.json copy to pkg/test/replicated/measure/testdata/groups/sw_spec.json index d57fbef64..47d9d404e 100644 --- a/pkg/test/replicated/measure/testdata/groups/exception.json +++ b/pkg/test/replicated/measure/testdata/groups/sw_spec.json @@ -1,6 +1,6 @@ { "metadata": { - "name": "exception" + "name": "sw_spec" }, "catalog": "CATALOG_MEASURE", "resource_opts": { @@ -13,7 +13,7 @@ "unit": "UNIT_DAY", "num": 7 }, - "replicas": 2 + "replicas": 1 }, "updated_at": "2021-04-15T01:30:15.01Z" -} \ No newline at end of file +} diff --git a/pkg/test/replicated/measure/testdata/groups/exception.json b/pkg/test/replicated/measure/testdata/groups/sw_spec2.json similarity index 86% copy from pkg/test/replicated/measure/testdata/groups/exception.json copy to pkg/test/replicated/measure/testdata/groups/sw_spec2.json index d57fbef64..90473439a 100644 --- a/pkg/test/replicated/measure/testdata/groups/exception.json +++ b/pkg/test/replicated/measure/testdata/groups/sw_spec2.json @@ -1,6 +1,6 @@ { "metadata": { - "name": "exception" + "name": "sw_spec2" }, "catalog": "CATALOG_MEASURE", "resource_opts": { @@ -13,7 +13,7 @@ "unit": "UNIT_DAY", "num": 7 }, - "replicas": 2 + "replicas": 1 }, "updated_at": "2021-04-15T01:30:15.01Z" -} \ No newline at end of file +} diff --git a/pkg/test/replicated/measure/testdata/groups/exception.json b/pkg/test/replicated/measure/testdata/groups/updated.json similarity index 86% copy from pkg/test/replicated/measure/testdata/groups/exception.json copy to pkg/test/replicated/measure/testdata/groups/updated.json index d57fbef64..e2cd972d8 100644 --- a/pkg/test/replicated/measure/testdata/groups/exception.json +++ b/pkg/test/replicated/measure/testdata/groups/updated.json @@ -1,6 +1,6 @@ { "metadata": { - "name": "exception" + "name": "sw_updated" }, "catalog": "CATALOG_MEASURE", "resource_opts": { @@ -13,7 +13,7 @@ "unit": "UNIT_DAY", "num": 7 }, - "replicas": 2 + "replicas": 1 }, "updated_at": "2021-04-15T01:30:15.01Z" } \ No newline at end of file diff --git a/pkg/test/replicated/measure/testdata/index_rule_bindings/service_traffic_replicated.json b/pkg/test/replicated/measure/testdata/index_rule_bindings/service_traffic_replicated.json new file mode 100644 index 000000000..c75ff152c --- /dev/null +++ b/pkg/test/replicated/measure/testdata/index_rule_bindings/service_traffic_replicated.json @@ -0,0 +1,17 @@ +{ + "metadata": { + "name": "service_traffic_rule_binding", + "group": "replicated_group" + }, + "rules": [ + "service_id", + "layer" + ], + "subject":{ + "catalog": "CATALOG_MEASURE", + "name": "service_traffic" + }, + "begin_at": "2021-04-15T01:30:15.01Z", + "expire_at": "2121-04-15T01:30:15.01Z", + "updated_at": "2021-04-15T01:30:15.01Z" +} diff --git a/pkg/test/replicated/stream/etcd.go b/pkg/test/replicated/stream/etcd.go index 5eabc0ee5..70e6edb18 100644 --- a/pkg/test/replicated/stream/etcd.go +++ b/pkg/test/replicated/stream/etcd.go @@ -21,86 +21,149 @@ package replicatedstream import ( "context" "embed" + "encoding/json" + "errors" "path" - "github.com/pkg/errors" "google.golang.org/protobuf/encoding/protojson" - "google.golang.org/protobuf/proto" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" "github.com/apache/skywalking-banyandb/banyand/metadata/schema" + "github.com/apache/skywalking-banyandb/pkg/logger" ) const ( - groupDir = "testdata" streamDir = "testdata/streams" indexRuleDir = "testdata/index_rules" indexRuleBindingDir = "testdata/index_rule_bindings" ) -//go:embed testdata/* -var store embed.FS - -// PreloadSchema loads schemas from files in the booting process. -func PreloadSchema(ctx context.Context, e schema.Registry) error { - return preloadSchemaWithFuncs(ctx, e, - func(ctx context.Context, e schema.Registry) error { - return loadSchema(groupDir, &commonv1.Group{}, func(group *commonv1.Group) error { - return e.CreateGroup(ctx, group) - }) - }, - func(ctx context.Context, e schema.Registry) error { - return loadSchema(streamDir, &databasev1.Stream{}, func(stream *databasev1.Stream) error { - _, innerErr := e.CreateStream(ctx, stream) - return innerErr - }) - }, - func(ctx context.Context, e schema.Registry) error { - return loadSchema(indexRuleDir, &databasev1.IndexRule{}, func(indexRule *databasev1.IndexRule) error { - return e.CreateIndexRule(ctx, indexRule) - }) - }, - func(ctx context.Context, e schema.Registry) error { - return loadSchema(indexRuleBindingDir, &databasev1.IndexRuleBinding{}, func(indexRuleBinding *databasev1.IndexRuleBinding) error { - return e.CreateIndexRuleBinding(ctx, indexRuleBinding) - }) - }, - ) -} +var ( + //go:embed testdata/index_rules/*.json + indexRuleStore embed.FS + //go:embed testdata/index_rule_bindings/*.json + indexRuleBindingStore embed.FS + //go:embed testdata/streams/*.json + streamStore embed.FS + //go:embed testdata/group.json + groupJSON string + //go:embed testdata/group_with_stages.json + groupWithStagesJSON string +) -func preloadSchemaWithFuncs(ctx context.Context, e schema.Registry, loaders ...func(context.Context, schema.Registry) error) error { - for _, loader := range loaders { - if err := loader(ctx, e); err != nil { - return errors.WithStack(err) +// loadSchemas loads streams, index rules, and index rule bindings. +func loadSchemas(ctx context.Context, e schema.Registry) error { + streams, err := streamStore.ReadDir(streamDir) + if err != nil { + return err + } + var data []byte + for _, entry := range streams { + data, err = streamStore.ReadFile(path.Join(streamDir, entry.Name())) + if err != nil { + return err + } + var stream databasev1.Stream + err = protojson.Unmarshal(data, &stream) + if err != nil { + return err + } + if _, innerErr := e.CreateStream(ctx, &stream); innerErr != nil { + return innerErr } } - return nil -} -func loadSchema[T proto.Message](dir string, resource T, loadFn func(resource T) error) error { - entries, err := store.ReadDir(dir) + entries, err := indexRuleStore.ReadDir(indexRuleDir) if err != nil { return err } for _, entry := range entries { - if entry.IsDir() { - continue + data, err = indexRuleStore.ReadFile(path.Join(indexRuleDir, entry.Name())) + if err != nil { + return err } - data, err := store.ReadFile(path.Join(dir, entry.Name())) + var idxRule databasev1.IndexRule + err = protojson.Unmarshal(data, &idxRule) if err != nil { return err } - resource.ProtoReflect().Descriptor().RequiredNumbers() - if err := protojson.Unmarshal(data, resource); err != nil { + if innerErr := e.CreateIndexRule(ctx, &idxRule); innerErr != nil { + return innerErr + } + } + indexRulesBindings, err := indexRuleBindingStore.ReadDir(indexRuleBindingDir) + if err != nil { + return err + } + for _, entry := range indexRulesBindings { + data, err = indexRuleBindingStore.ReadFile(path.Join(indexRuleBindingDir, entry.Name())) + if err != nil { return err } - if err := loadFn(resource); err != nil { - if errors.Is(err, schema.ErrGRPCAlreadyExists) { - return nil - } + var idxRuleBinding databasev1.IndexRuleBinding + err = protojson.Unmarshal(data, &idxRuleBinding) + if err != nil { return err } + if innerErr := e.CreateIndexRuleBinding(ctx, &idxRuleBinding); innerErr != nil { + return innerErr + } } + return nil } + +// LoadSchemaWithStages loads schemas from files, including group stages. +func LoadSchemaWithStages(ctx context.Context, e schema.Registry) error { + if e == nil { + return nil + } + var rawGroups []json.RawMessage + if err := json.Unmarshal([]byte(groupWithStagesJSON), &rawGroups); err != nil { + return err + } + for _, raw := range rawGroups { + g := &commonv1.Group{} + if err := protojson.Unmarshal(raw, g); err != nil { + return err + } + _, err := e.GetGroup(ctx, g.Metadata.Name) + if !errors.Is(err, schema.ErrGRPCResourceNotFound) { + logger.Infof("group %s already exists", g.Metadata.Name) + return nil + } + if innerErr := e.CreateGroup(ctx, g); innerErr != nil { + return innerErr + } + } + return loadSchemas(ctx, e) +} + +// PreloadSchema loads schemas from files in the booting process. +// This version loads group without stages. +func PreloadSchema(ctx context.Context, e schema.Registry) error { + if e == nil { + return nil + } + var rawGroups []json.RawMessage + if err := json.Unmarshal([]byte(groupJSON), &rawGroups); err != nil { + return err + } + for _, raw := range rawGroups { + g := &commonv1.Group{} + if err := protojson.Unmarshal(raw, g); err != nil { + return err + } + _, err := e.GetGroup(ctx, g.Metadata.Name) + if !errors.Is(err, schema.ErrGRPCResourceNotFound) { + logger.Infof("group %s already exists", g.Metadata.Name) + return nil + } + if innerErr := e.CreateGroup(ctx, g); innerErr != nil { + return innerErr + } + } + + return loadSchemas(ctx, e) +} diff --git a/pkg/test/replicated/trace/etcd.go b/pkg/test/replicated/trace/etcd.go index 2ab2222e2..adc4c2463 100644 --- a/pkg/test/replicated/trace/etcd.go +++ b/pkg/test/replicated/trace/etcd.go @@ -22,6 +22,7 @@ import ( "context" "embed" "path" + "reflect" "github.com/pkg/errors" "google.golang.org/protobuf/encoding/protojson" @@ -100,16 +101,24 @@ func loadSchema[T proto.Message](dir string, resource T, loadFn func(resource T) if err != nil { return err } - resource.ProtoReflect().Descriptor().RequiredNumbers() - if err := protojson.Unmarshal(data, resource); err != nil { + // Create a new instance for each file to avoid race conditions + // when the callback holds a reference to the resource + newResource := newProtoMessage(resource) + if err := protojson.Unmarshal(data, newResource); err != nil { return err } - if err := loadFn(resource); err != nil { + if err := loadFn(newResource); err != nil { if errors.Is(err, schema.ErrGRPCAlreadyExists) { - return nil + continue } return err } } return nil } + +// newProtoMessage creates a new instance of the same type as the template. +func newProtoMessage[T proto.Message](template T) T { + v := reflect.New(reflect.TypeOf(template).Elem()).Interface().(T) + return v +} diff --git a/pkg/test/replicated/trace/testdata/index_rule_bindings/sw.json b/pkg/test/replicated/trace/testdata/index_rule_bindings/sw.json new file mode 100644 index 000000000..7955985b2 --- /dev/null +++ b/pkg/test/replicated/trace/testdata/index_rule_bindings/sw.json @@ -0,0 +1,17 @@ +{ + "metadata": { + "name": "sw-index-rule-binding", + "group": "test-trace-group" + }, + "rules": [ + "duration", + "timestamp" + ], + "subject": { + "catalog": "CATALOG_TRACE", + "name": "sw" + }, + "begin_at": "2021-04-15T01:30:15.01Z", + "expire_at": "2121-04-15T01:30:15.01Z", + "updated_at": "2021-04-15T01:30:15.01Z" +} \ No newline at end of file diff --git a/pkg/test/replicated/trace/testdata/index_rule_bindings/sw_spec.json b/pkg/test/replicated/trace/testdata/index_rule_bindings/sw_spec.json new file mode 100644 index 000000000..810c1612e --- /dev/null +++ b/pkg/test/replicated/trace/testdata/index_rule_bindings/sw_spec.json @@ -0,0 +1,14 @@ +{ + "metadata": { + "name": "sw-spec-index-rule-binding", + "group": "test-trace-spec" + }, + "rules": ["duration", "timestamp"], + "subject": { + "catalog": "CATALOG_TRACE", + "name": "sw" + }, + "begin_at": "2021-04-15T01:30:15.01Z", + "expire_at": "2121-04-15T01:30:15.01Z", + "updated_at": "2021-04-15T01:30:15.01Z" +} \ No newline at end of file diff --git a/pkg/test/replicated/trace/testdata/index_rule_bindings/sw_spec2.json b/pkg/test/replicated/trace/testdata/index_rule_bindings/sw_spec2.json new file mode 100644 index 000000000..4daee2a9f --- /dev/null +++ b/pkg/test/replicated/trace/testdata/index_rule_bindings/sw_spec2.json @@ -0,0 +1,14 @@ +{ + "metadata": { + "name": "sw-spec2-index-rule-binding", + "group": "test-trace-spec2" + }, + "rules": ["duration", "timestamp"], + "subject": { + "catalog": "CATALOG_TRACE", + "name": "sw" + }, + "begin_at": "2021-04-15T01:30:15.01Z", + "expire_at": "2121-04-15T01:30:15.01Z", + "updated_at": "2021-04-15T01:30:15.01Z" +} diff --git a/pkg/test/replicated/trace/testdata/index_rule_bindings/sw_updated.json b/pkg/test/replicated/trace/testdata/index_rule_bindings/sw_updated.json new file mode 100644 index 000000000..c611a4903 --- /dev/null +++ b/pkg/test/replicated/trace/testdata/index_rule_bindings/sw_updated.json @@ -0,0 +1,17 @@ +{ + "metadata": { + "name": "sw-updated-index-rule-binding", + "group": "test-trace-updated" + }, + "rules": [ + "duration", + "timestamp" + ], + "subject": { + "catalog": "CATALOG_TRACE", + "name": "sw" + }, + "begin_at": "2021-04-15T01:30:15.01Z", + "expire_at": "2121-04-15T01:30:15.01Z", + "updated_at": "2021-04-15T01:30:15.01Z" +} \ No newline at end of file diff --git a/pkg/test/replicated/trace/testdata/index_rule_bindings/zipkin.json b/pkg/test/replicated/trace/testdata/index_rule_bindings/zipkin.json new file mode 100644 index 000000000..cf03fabbc --- /dev/null +++ b/pkg/test/replicated/trace/testdata/index_rule_bindings/zipkin.json @@ -0,0 +1,16 @@ +{ + "metadata": { + "name": "zipkin-index-rule-binding", + "group": "zipkinTrace" + }, + "rules": [ + "zipkin-timestamp" + ], + "subject": { + "catalog": "CATALOG_TRACE", + "name": "zipkin" + }, + "begin_at": "2021-04-15T01:30:15.01Z", + "expire_at": "2121-04-15T01:30:15.01Z", + "updated_at": "2021-04-15T01:30:15.01Z" +} \ No newline at end of file diff --git a/pkg/test/replicated/trace/testdata/index_rules/duration.json b/pkg/test/replicated/trace/testdata/index_rules/duration.json new file mode 100644 index 000000000..c9cb393ab --- /dev/null +++ b/pkg/test/replicated/trace/testdata/index_rules/duration.json @@ -0,0 +1,14 @@ +{ + "metadata": { + "name": "duration", + "group": "test-trace-group" + }, + "tags": [ + "service_id", + "service_instance_id", + "state", + "duration" + ], + "type": "TYPE_TREE", + "updated_at": "2021-04-15T01:30:15.01Z" +} \ No newline at end of file diff --git a/pkg/test/replicated/trace/testdata/index_rules/duration_spec.json b/pkg/test/replicated/trace/testdata/index_rules/duration_spec.json new file mode 100644 index 000000000..a89781ee6 --- /dev/null +++ b/pkg/test/replicated/trace/testdata/index_rules/duration_spec.json @@ -0,0 +1,14 @@ +{ + "metadata": { + "name": "duration", + "group": "test-trace-spec" + }, + "tags": [ + "service_id", + "service_instance_id", + "state", + "duration" + ], + "type": "TYPE_TREE", + "updated_at": "2021-04-15T01:30:15.01Z" +} \ No newline at end of file diff --git a/pkg/test/replicated/trace/testdata/index_rules/duration_spec2.json b/pkg/test/replicated/trace/testdata/index_rules/duration_spec2.json new file mode 100644 index 000000000..3d3240f35 --- /dev/null +++ b/pkg/test/replicated/trace/testdata/index_rules/duration_spec2.json @@ -0,0 +1,9 @@ +{ + "metadata": { + "name": "duration", + "group": "test-trace-spec2" + }, + "tags": ["service_id", "service_instance_id", "state", "duration"], + "type": "TYPE_TREE", + "updated_at": "2021-04-15T01:30:15.01Z" +} diff --git a/pkg/test/replicated/trace/testdata/index_rules/duration_updated.json b/pkg/test/replicated/trace/testdata/index_rules/duration_updated.json new file mode 100644 index 000000000..d31796ad6 --- /dev/null +++ b/pkg/test/replicated/trace/testdata/index_rules/duration_updated.json @@ -0,0 +1,14 @@ +{ + "metadata": { + "name": "duration", + "group": "test-trace-updated" + }, + "tags": [ + "service_id", + "service_instance_id", + "state", + "duration" + ], + "type": "TYPE_TREE", + "updated_at": "2021-04-15T01:30:15.01Z" +} \ No newline at end of file diff --git a/pkg/test/replicated/trace/testdata/index_rules/timestamp.json b/pkg/test/replicated/trace/testdata/index_rules/timestamp.json new file mode 100644 index 000000000..be83e872e --- /dev/null +++ b/pkg/test/replicated/trace/testdata/index_rules/timestamp.json @@ -0,0 +1,14 @@ +{ + "metadata": { + "name": "timestamp", + "group": "test-trace-group" + }, + "tags": [ + "service_id", + "service_instance_id", + "state", + "timestamp" + ], + "type": "TYPE_TREE", + "updated_at": "2021-04-15T01:30:15.01Z" +} \ No newline at end of file diff --git a/pkg/test/replicated/trace/testdata/index_rules/timestamp_spec.json b/pkg/test/replicated/trace/testdata/index_rules/timestamp_spec.json new file mode 100644 index 000000000..ba1a0e27d --- /dev/null +++ b/pkg/test/replicated/trace/testdata/index_rules/timestamp_spec.json @@ -0,0 +1,14 @@ +{ + "metadata": { + "name": "timestamp", + "group": "test-trace-spec" + }, + "tags": [ + "service_id", + "service_instance_id", + "state", + "timestamp" + ], + "type": "TYPE_TREE", + "updated_at": "2021-04-15T01:30:15.01Z" +} \ No newline at end of file diff --git a/pkg/test/replicated/trace/testdata/index_rules/timestamp_spec2.json b/pkg/test/replicated/trace/testdata/index_rules/timestamp_spec2.json new file mode 100644 index 000000000..45c896307 --- /dev/null +++ b/pkg/test/replicated/trace/testdata/index_rules/timestamp_spec2.json @@ -0,0 +1,9 @@ +{ + "metadata": { + "name": "timestamp", + "group": "test-trace-spec2" + }, + "tags": ["service_id", "service_instance_id", "state", "timestamp"], + "type": "TYPE_TREE", + "updated_at": "2021-04-15T01:30:15.01Z" +} diff --git a/pkg/test/replicated/trace/testdata/index_rules/timestamp_updated.json b/pkg/test/replicated/trace/testdata/index_rules/timestamp_updated.json new file mode 100644 index 000000000..2b947fa33 --- /dev/null +++ b/pkg/test/replicated/trace/testdata/index_rules/timestamp_updated.json @@ -0,0 +1,14 @@ +{ + "metadata": { + "name": "timestamp", + "group": "test-trace-updated" + }, + "tags": [ + "service_id", + "service_instance_id", + "state", + "timestamp" + ], + "type": "TYPE_TREE", + "updated_at": "2021-04-15T01:30:15.01Z" +} \ No newline at end of file diff --git a/pkg/test/replicated/trace/testdata/index_rules/zipkin-timestamp.json b/pkg/test/replicated/trace/testdata/index_rules/zipkin-timestamp.json new file mode 100644 index 000000000..98b483770 --- /dev/null +++ b/pkg/test/replicated/trace/testdata/index_rules/zipkin-timestamp.json @@ -0,0 +1,14 @@ +{ + "metadata": { + "name": "zipkin-timestamp", + "group": "zipkinTrace" + }, + "tags": [ + "local_endpoint_service_name", + "operation_name", + "kind", + "timestamp" + ], + "type": "TYPE_TREE", + "updated_at": "2021-04-15T01:30:15.01Z" +} \ No newline at end of file diff --git a/pkg/test/replicated/trace/testdata/traces/sw.json b/pkg/test/replicated/trace/testdata/traces/sw.json new file mode 100644 index 000000000..4f501b9a5 --- /dev/null +++ b/pkg/test/replicated/trace/testdata/traces/sw.json @@ -0,0 +1,44 @@ +{ + "metadata": { + "name": "sw", + "group": "test-trace-group" + }, + "tags": [ + { + "name": "trace_id", + "type": "TAG_TYPE_STRING" + }, + { + "name": "state", + "type": "TAG_TYPE_INT" + }, + { + "name": "service_id", + "type": "TAG_TYPE_STRING" + }, + { + "name": "service_instance_id", + "type": "TAG_TYPE_STRING" + }, + { + "name": "endpoint_id", + "type": "TAG_TYPE_STRING" + }, + { + "name": "duration", + "type": "TAG_TYPE_INT" + }, + { + "name": "span_id", + "type": "TAG_TYPE_STRING" + }, + { + "name": "timestamp", + "type": "TAG_TYPE_TIMESTAMP" + } + ], + "trace_id_tag_name": "trace_id", + "span_id_tag_name": "span_id", + "timestamp_tag_name": "timestamp", + "updated_at": "2021-04-15T01:30:15.01Z" +} \ No newline at end of file diff --git a/pkg/test/replicated/trace/testdata/traces/sw_spec.json b/pkg/test/replicated/trace/testdata/traces/sw_spec.json new file mode 100644 index 000000000..9b0841b72 --- /dev/null +++ b/pkg/test/replicated/trace/testdata/traces/sw_spec.json @@ -0,0 +1,45 @@ +{ + "metadata": { + "name": "sw", + "group": "test-trace-spec" + }, + "tags": [ + { + "name": "trace_id", + "type": "TAG_TYPE_STRING" + }, + { + "name": "state", + "type": "TAG_TYPE_INT" + }, + { + "name": "service_id", + "type": "TAG_TYPE_STRING" + }, + { + "name": "service_instance_id", + "type": "TAG_TYPE_STRING" + }, + { + "name": "endpoint_id", + "type": "TAG_TYPE_STRING" + }, + { + "name": "duration", + "type": "TAG_TYPE_INT" + }, + { + "name": "span_id", + "type": "TAG_TYPE_STRING" + }, + { + "name": "timestamp", + "type": "TAG_TYPE_TIMESTAMP" + } + ], + "trace_id_tag_name": "trace_id", + "span_id_tag_name": "span_id", + "timestamp_tag_name": "timestamp", + "updated_at": "2021-04-15T01:30:15.01Z" +} + diff --git a/pkg/test/replicated/trace/testdata/traces/sw_spec2.json b/pkg/test/replicated/trace/testdata/traces/sw_spec2.json new file mode 100644 index 000000000..c568dce6b --- /dev/null +++ b/pkg/test/replicated/trace/testdata/traces/sw_spec2.json @@ -0,0 +1,44 @@ +{ + "metadata": { + "name": "sw", + "group": "test-trace-spec2" + }, + "tags": [ + { + "name": "trace_id", + "type": "TAG_TYPE_STRING" + }, + { + "name": "state", + "type": "TAG_TYPE_INT" + }, + { + "name": "service_id", + "type": "TAG_TYPE_STRING" + }, + { + "name": "service_instance_id", + "type": "TAG_TYPE_STRING" + }, + { + "name": "endpoint_id", + "type": "TAG_TYPE_STRING" + }, + { + "name": "duration", + "type": "TAG_TYPE_INT" + }, + { + "name": "span_id", + "type": "TAG_TYPE_STRING" + }, + { + "name": "timestamp", + "type": "TAG_TYPE_TIMESTAMP" + } + ], + "trace_id_tag_name": "trace_id", + "span_id_tag_name": "span_id", + "timestamp_tag_name": "timestamp", + "updated_at": "2021-04-15T01:30:15.01Z" +} diff --git a/pkg/test/replicated/trace/testdata/traces/sw_updated.json b/pkg/test/replicated/trace/testdata/traces/sw_updated.json new file mode 100644 index 000000000..9d55e73b0 --- /dev/null +++ b/pkg/test/replicated/trace/testdata/traces/sw_updated.json @@ -0,0 +1,48 @@ +{ + "metadata": { + "name": "sw", + "group": "test-trace-updated" + }, + "tags": [ + { + "name": "trace_id", + "type": "TAG_TYPE_STRING" + }, + { + "name": "state", + "type": "TAG_TYPE_STRING" + }, + { + "name": "service_id", + "type": "TAG_TYPE_STRING" + }, + { + "name": "service_instance_id", + "type": "TAG_TYPE_STRING" + }, + { + "name": "endpoint_id", + "type": "TAG_TYPE_STRING" + }, + { + "name": "duration", + "type": "TAG_TYPE_INT" + }, + { + "name": "span_id", + "type": "TAG_TYPE_STRING" + }, + { + "name": "error_message", + "type": "TAG_TYPE_STRING" + }, + { + "name": "timestamp", + "type": "TAG_TYPE_TIMESTAMP" + } + ], + "trace_id_tag_name": "trace_id", + "span_id_tag_name": "span_id", + "timestamp_tag_name": "timestamp", + "updated_at": "2021-04-15T01:30:15.01Z" +} diff --git a/pkg/test/replicated/trace/testdata/traces/zipkin.json b/pkg/test/replicated/trace/testdata/traces/zipkin.json new file mode 100644 index 000000000..bc832e0bb --- /dev/null +++ b/pkg/test/replicated/trace/testdata/traces/zipkin.json @@ -0,0 +1,76 @@ +{ + "metadata": { + "name": "zipkin", + "group": "zipkinTrace" + }, + "tags": [ + { + "name": "trace_id", + "type": "TAG_TYPE_STRING" + }, + { + "name": "span_id", + "type": "TAG_TYPE_STRING" + }, + { + "name": "parent_id", + "type": "TAG_TYPE_STRING" + }, + { + "name": "operation_name", + "type": "TAG_TYPE_STRING" + }, + { + "name": "kind", + "type": "TAG_TYPE_STRING" + }, + { + "name": "duration", + "type": "TAG_TYPE_INT" + }, + { + "name": "local_endpoint_service_name", + "type": "TAG_TYPE_STRING" + }, + { + "name": "local_endpoint_ipv4", + "type": "TAG_TYPE_STRING" + }, + { + "name": "local_endpoint_port", + "type": "TAG_TYPE_INT" + }, + { + "name": "remote_endpoint_service_name", + "type": "TAG_TYPE_STRING" + }, + { + "name": "remote_endpoint_ipv4", + "type": "TAG_TYPE_STRING" + }, + { + "name": "remote_endpoint_port", + "type": "TAG_TYPE_INT" + }, + { + "name": "shared", + "type": "TAG_TYPE_INT" + }, + { + "name": "debug", + "type": "TAG_TYPE_INT" + }, + { + "name": "query", + "type": "TAG_TYPE_STRING_ARRAY" + }, + { + "name": "timestamp", + "type": "TAG_TYPE_TIMESTAMP" + } + ], + "trace_id_tag_name": "trace_id", + "span_id_tag_name": "span_id", + "timestamp_tag_name": "timestamp", + "updated_at": "2021-04-15T01:30:15.01Z" +} \ No newline at end of file diff --git a/test/integration/replication/measure_normal_replication_test.go b/test/integration/replication/measure_normal_replication_test.go index d0afa83b2..8bda48944 100644 --- a/test/integration/replication/measure_normal_replication_test.go +++ b/test/integration/replication/measure_normal_replication_test.go @@ -179,4 +179,4 @@ func isClusterStable(conn *grpc.ClientConn) bool { return false } return len(tire2Table.GetActive()) == 3 -} \ No newline at end of file +} diff --git a/test/integration/replication/replication_suite_test.go b/test/integration/replication/replication_suite_test.go index eb0570716..c8cba4d57 100644 --- a/test/integration/replication/replication_suite_test.go +++ b/test/integration/replication/replication_suite_test.go @@ -30,6 +30,7 @@ import ( "google.golang.org/grpc/credentials/insecure" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + schemapkg "github.com/apache/skywalking-banyandb/banyand/metadata/schema" "github.com/apache/skywalking-banyandb/pkg/grpchelper" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/pool" @@ -37,10 +38,11 @@ import ( "github.com/apache/skywalking-banyandb/pkg/test/flags" "github.com/apache/skywalking-banyandb/pkg/test/gmatcher" "github.com/apache/skywalking-banyandb/pkg/test/helpers" - "github.com/apache/skywalking-banyandb/pkg/test/setup" + test_property "github.com/apache/skywalking-banyandb/pkg/test/property" test_replicated_measure "github.com/apache/skywalking-banyandb/pkg/test/replicated/measure" test_replicated_stream "github.com/apache/skywalking-banyandb/pkg/test/replicated/stream" test_replicated_trace "github.com/apache/skywalking-banyandb/pkg/test/replicated/trace" + "github.com/apache/skywalking-banyandb/pkg/test/setup" "github.com/apache/skywalking-banyandb/pkg/timestamp" test_cases "github.com/apache/skywalking-banyandb/test/cases" casesmeasure "github.com/apache/skywalking-banyandb/test/cases/measure" @@ -59,7 +61,6 @@ var ( liaisonAddr string dataNodeClosers []func() clusterConfig *setup.ClusterConfig - tmpDirCleanup func() ) var _ = SynchronizedBeforeSuite(func() []byte { @@ -76,13 +77,6 @@ var _ = SynchronizedBeforeSuite(func() []byte { dfWriter := setup.NewDiscoveryFileWriter(tmpDir) clusterConfig = setup.PropertyClusterConfig(dfWriter) - // Load schemas via property-based registry - setup.PreloadSchemaViaProperty(clusterConfig, - test_replicated_measure.PreloadSchema, - test_replicated_stream.PreloadSchema, - test_replicated_trace.PreloadSchema, - ) - By("Starting 3 data nodes for replication test") dataNodeClosers = make([]func(), 0, 3) @@ -91,6 +85,15 @@ var _ = SynchronizedBeforeSuite(func() []byte { dataNodeClosers = append(dataNodeClosers, closeDataNode) } + By("Loading schema via property") + setup.PreloadSchemaViaProperty(clusterConfig, + test_replicated_measure.PreloadSchema, + test_replicated_stream.PreloadSchema, + test_replicated_trace.PreloadSchema, + test_property.PreloadSchema, + ) + clusterConfig.AddLoadedKinds(schemapkg.KindStream, schemapkg.KindMeasure, schemapkg.KindTrace, schemapkg.KindProperty) + By("Starting liaison node") liaisonAddr2, closerLiaisonNode := setup.LiaisonNode(clusterConfig, "--data-node-selector", "role=data") liaisonAddr = liaisonAddr2 diff --git a/test/integration/replication/replication_test.go b/test/integration/replication/replication_test.go index e2c0f2f70..c609171b0 100644 --- a/test/integration/replication/replication_test.go +++ b/test/integration/replication/replication_test.go @@ -28,7 +28,6 @@ import ( commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" - "github.com/apache/skywalking-banyandb/banyand/metadata" "github.com/apache/skywalking-banyandb/pkg/grpchelper" "github.com/apache/skywalking-banyandb/pkg/test/flags" "github.com/apache/skywalking-banyandb/pkg/test/helpers" @@ -66,14 +65,9 @@ var _ = g.Describe("Replication", func() { gm.Expect(resp.GetMeasure()).NotTo(gm.BeNil()) gm.Expect(resp.GetMeasure().GetMetadata().GetGroup()).To(gm.Equal("replicated_group")) - g.By("Getting list of all nodes from etcd (includes data nodes + liaison)") - nodePath := "/" + metadata.DefaultNamespace + "/nodes" - allNodes, err2 := helpers.ListKeys(etcdEndpoint, nodePath) - gm.Expect(err2).NotTo(gm.HaveOccurred()) - - // We have: 3 data nodes + 1 liaison node = 4 nodes total - gm.Expect(len(allNodes)).To(gm.Equal(4), - "Should have 4 nodes total (3 data nodes + 1 liaison node), found %d", len(allNodes)) + g.By("Verifying cluster is stable with 3 data nodes") + gm.Expect(isClusterStable(conn)).To(gm.BeTrue(), + "Cluster should have 3 active data nodes before test") g.By("Stopping one data node") // We should have 3 data node closers in dataNodeClosers @@ -83,15 +77,11 @@ var _ = g.Describe("Replication", func() { copy(closersToStop, dataNodeClosers) closersToStop[0]() - // Wait for the cluster to stabilize - gm.Eventually(func() int { - nodes, err3 := helpers.ListKeys(etcdEndpoint, nodePath) - if err3 != nil { - return 0 - } - return len(nodes) - }, flags.EventuallyTimeout).Should(gm.Equal(3), - "Should have 3 nodes total after stopping one data node (2 data nodes + 1 liaison)") + // Wait for the cluster to stabilize (should have 3 active nodes after failure) + gm.Eventually(func() bool { + return isClusterStable(conn) + }, flags.EventuallyTimeout).Should(gm.BeTrue(), + "Cluster should have 3 active data nodes after stopping one node") g.By("Verifying data is still accessible after node failure") verifyDataContentAfterNodeFailure(conn, now) diff --git a/test/integration/replication/stream_replication_test.go b/test/integration/replication/stream_replication_test.go index 5b1485986..19d20e9bd 100644 --- a/test/integration/replication/stream_replication_test.go +++ b/test/integration/replication/stream_replication_test.go @@ -31,6 +31,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/grpchelper" "github.com/apache/skywalking-banyandb/pkg/test/flags" "github.com/apache/skywalking-banyandb/pkg/test/helpers" + "github.com/apache/skywalking-banyandb/pkg/test/setup" casesstreamdata "github.com/apache/skywalking-banyandb/test/cases/stream/data" ) diff --git a/test/integration/replication/trace_replication_test.go b/test/integration/replication/trace_replication_test.go index 4734d0196..067c4cd1d 100644 --- a/test/integration/replication/trace_replication_test.go +++ b/test/integration/replication/trace_replication_test.go @@ -72,6 +72,7 @@ var _ = g.Describe("Trace Normal Mode Replication", func() { }) gm.Expect(groupErr).NotTo(gm.HaveOccurred()) gm.Expect(groupResp.GetGroup()).NotTo(gm.BeNil()) + gm.Expect(groupResp.GetGroup().GetResourceOpts()).NotTo(gm.BeNil()) gm.Expect(groupResp.GetGroup().GetResourceOpts().GetReplicas()).To(gm.Equal(uint32(2)), "test-trace-group should have replicas=2")
