This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch test/replication in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 700a31cc3b5a0f3bad08524b792a732f6d9e8aad Author: Hongtao Gao <[email protected]> AuthorDate: Fri Mar 20 04:13:00 2026 +0000 feat(replicated): add replicated trace schemas Co-Authored-By: Claude <[email protected]> --- pkg/test/replicated/trace/etcd.go | 115 +++++++++++++++++++++ .../trace/testdata/groups/test-trace-group.json | 19 ++++ .../trace/testdata/groups/test-trace-spec.json | 19 ++++ .../trace/testdata/groups/test-trace-spec2.json | 19 ++++ .../trace/testdata/groups/test-trace-updated.json | 19 ++++ .../trace/testdata/groups/zipkin-trace-group.json | 19 ++++ 6 files changed, 210 insertions(+) diff --git a/pkg/test/replicated/trace/etcd.go b/pkg/test/replicated/trace/etcd.go new file mode 100644 index 000000000..2ab2222e2 --- /dev/null +++ b/pkg/test/replicated/trace/etcd.go @@ -0,0 +1,115 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package trace implements helpers to load replicated schemas for testing. 
package trace

import (
	"context"
	"embed"
	"path"

	"github.com/pkg/errors"
	"google.golang.org/protobuf/encoding/protojson"
	"google.golang.org/protobuf/proto"

	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
	"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
)

// Directories inside the embedded testdata tree, one per schema kind. Every file
// in a directory is decoded as JSON by loadSchema.
//
// NOTE(review): the commit shown here only adds files under testdata/groups.
// Confirm the other directories exist in the tree; loadSchema returns the
// ReadDir error when a directory cannot be read.
const (
	groupDir            = "testdata/groups"
	groupStagesDir      = "testdata/groups_stages"
	traceDir            = "testdata/traces"
	indexRuleDir        = "testdata/index_rules"
	indexRuleBindingDir = "testdata/index_rule_bindings"
)

// store is the embedded copy of everything matched by testdata/*.
//
//go:embed testdata/*
var store embed.FS

// PreloadSchema loads schemas from files in the booting process.
// It delegates to loadAllSchemas with groupDir, so groups are read from
// testdata/groups.
func PreloadSchema(ctx context.Context, e schema.Registry) error {
	return loadAllSchemas(ctx, e, groupDir)
}

// PreloadSchemaWithStages loads group schemas with stages from files in the booting process.
// It delegates to loadAllSchemas with groupStagesDir, so groups are read from
// testdata/groups_stages; the remaining schema kinds come from the same
// directories PreloadSchema uses.
func PreloadSchemaWithStages(ctx context.Context, e schema.Registry) error {
	return loadAllSchemas(ctx, e, groupStagesDir)
}

// loadAllSchemas loads all trace-related schemas from the testdata directory.
+func loadAllSchemas(ctx context.Context, e schema.Registry, group string) error { + return preloadSchemaWithFuncs(ctx, e, + func(ctx context.Context, e schema.Registry) error { + return loadSchema(group, &commonv1.Group{}, func(group *commonv1.Group) error { + return e.CreateGroup(ctx, group) + }) + }, + func(ctx context.Context, e schema.Registry) error { + return loadSchema(traceDir, &databasev1.Trace{}, func(trace *databasev1.Trace) error { + _, innerErr := e.CreateTrace(ctx, trace) + return innerErr + }) + }, + func(ctx context.Context, e schema.Registry) error { + return loadSchema(indexRuleDir, &databasev1.IndexRule{}, func(indexRule *databasev1.IndexRule) error { + return e.CreateIndexRule(ctx, indexRule) + }) + }, + func(ctx context.Context, e schema.Registry) error { + return loadSchema(indexRuleBindingDir, &databasev1.IndexRuleBinding{}, func(indexRuleBinding *databasev1.IndexRuleBinding) error { + return e.CreateIndexRuleBinding(ctx, indexRuleBinding) + }) + }, + ) +} + +// preloadSchemaWithFuncs extracts the common logic for loading schemas. 
+func preloadSchemaWithFuncs(ctx context.Context, e schema.Registry, loaders ...func(context.Context, schema.Registry) error) error { + for _, loader := range loaders { + if err := loader(ctx, e); err != nil { + return errors.WithStack(err) + } + } + return nil +} + +func loadSchema[T proto.Message](dir string, resource T, loadFn func(resource T) error) error { + entries, err := store.ReadDir(dir) + if err != nil { + return err + } + for _, entry := range entries { + data, err := store.ReadFile(path.Join(dir, entry.Name())) + if err != nil { + return err + } + resource.ProtoReflect().Descriptor().RequiredNumbers() + if err := protojson.Unmarshal(data, resource); err != nil { + return err + } + if err := loadFn(resource); err != nil { + if errors.Is(err, schema.ErrGRPCAlreadyExists) { + return nil + } + return err + } + } + return nil +} diff --git a/pkg/test/replicated/trace/testdata/groups/test-trace-group.json b/pkg/test/replicated/trace/testdata/groups/test-trace-group.json new file mode 100644 index 000000000..9cefad810 --- /dev/null +++ b/pkg/test/replicated/trace/testdata/groups/test-trace-group.json @@ -0,0 +1,19 @@ +{ + "metadata": { + "name": "test-trace-group" + }, + "catalog": "CATALOG_TRACE", + "resource_opts": { + "shard_num": 2, + "replicas": 2, + "segment_interval": { + "unit": "UNIT_DAY", + "num": 1 + }, + "ttl": { + "unit": "UNIT_DAY", + "num": 3 + } + }, + "updated_at": "2021-04-15T01:30:15.01Z" +} \ No newline at end of file diff --git a/pkg/test/replicated/trace/testdata/groups/test-trace-spec.json b/pkg/test/replicated/trace/testdata/groups/test-trace-spec.json new file mode 100644 index 000000000..3f816c157 --- /dev/null +++ b/pkg/test/replicated/trace/testdata/groups/test-trace-spec.json @@ -0,0 +1,19 @@ +{ + "metadata": { + "name": "test-trace-spec" + }, + "catalog": "CATALOG_TRACE", + "resource_opts": { + "shard_num": 2, + "replicas": 2, + "segment_interval": { + "unit": "UNIT_DAY", + "num": 1 + }, + "ttl": { + "unit": "UNIT_DAY", + "num": 
3 + } + }, + "updated_at": "2021-04-15T01:30:15.01Z" +} diff --git a/pkg/test/replicated/trace/testdata/groups/test-trace-spec2.json b/pkg/test/replicated/trace/testdata/groups/test-trace-spec2.json new file mode 100644 index 000000000..c2714e607 --- /dev/null +++ b/pkg/test/replicated/trace/testdata/groups/test-trace-spec2.json @@ -0,0 +1,19 @@ +{ + "metadata": { + "name": "test-trace-spec2" + }, + "catalog": "CATALOG_TRACE", + "resource_opts": { + "shard_num": 2, + "replicas": 2, + "segment_interval": { + "unit": "UNIT_DAY", + "num": 1 + }, + "ttl": { + "unit": "UNIT_DAY", + "num": 3 + } + }, + "updated_at": "2021-04-15T01:30:15.01Z" +} diff --git a/pkg/test/replicated/trace/testdata/groups/test-trace-updated.json b/pkg/test/replicated/trace/testdata/groups/test-trace-updated.json new file mode 100644 index 000000000..382cb2908 --- /dev/null +++ b/pkg/test/replicated/trace/testdata/groups/test-trace-updated.json @@ -0,0 +1,19 @@ +{ + "metadata": { + "name": "test-trace-updated" + }, + "catalog": "CATALOG_TRACE", + "resource_opts": { + "shard_num": 2, + "replicas": 2, + "segment_interval": { + "unit": "UNIT_DAY", + "num": 1 + }, + "ttl": { + "unit": "UNIT_DAY", + "num": 3 + } + }, + "updated_at": "2021-04-15T01:30:15.01Z" +} \ No newline at end of file diff --git a/pkg/test/replicated/trace/testdata/groups/zipkin-trace-group.json b/pkg/test/replicated/trace/testdata/groups/zipkin-trace-group.json new file mode 100644 index 000000000..132d726e5 --- /dev/null +++ b/pkg/test/replicated/trace/testdata/groups/zipkin-trace-group.json @@ -0,0 +1,19 @@ +{ + "metadata": { + "name": "zipkinTrace" + }, + "catalog": "CATALOG_TRACE", + "resource_opts": { + "shard_num": 2, + "replicas": 2, + "segment_interval": { + "unit": "UNIT_DAY", + "num": 1 + }, + "ttl": { + "unit": "UNIT_DAY", + "num": 7 + } + }, + "updated_at": "2021-04-15T01:30:15.01Z" +} \ No newline at end of file
