This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new c27121f43 Add `has-meta-role` flag to switch whether the schema property
should be activated or not (#1007)
c27121f43 is described below
commit c27121f43c76b3b31470e2846f68958884ef91d6
Author: mrproliu <[email protected]>
AuthorDate: Fri Mar 13 15:39:46 2026 +0800
Add `has-meta-role` flag to switch whether the schema property should be
activated or not (#1007)
* Add `has-meta-role` flag to switch whether the schema property should be
activated or not
---------
Co-authored-by: Gao Hongtao <[email protected]>
---
banyand/metadata/service/server.go | 24 +++++---
pkg/test/setup/setup.go | 31 +++++++---
.../distributed/lifecycle/property/suite_test.go | 66 ++++++++++++++--------
3 files changed, 83 insertions(+), 38 deletions(-)
diff --git a/banyand/metadata/service/server.go
b/banyand/metadata/service/server.go
index 4dcbf1095..ee604ae92 100644
--- a/banyand/metadata/service/server.go
+++ b/banyand/metadata/service/server.go
@@ -82,6 +82,7 @@ type server struct {
listenPeerURL []string
quotaBackendBytes run.Bytes
embedded bool
+ hasMetaRole bool
}
func (s *server) Name() string {
@@ -90,7 +91,7 @@ func (s *server) Name() string {
func (s *server) Role() databasev1.Role {
needEtcd := s.schemaRegistryMode == schemaTypeEtcd ||
s.nodeDiscoveryMode == metadata.NodeDiscoveryModeEtcd
- if s.schemaRegistryMode == schemaTypeProperty || (s.embedded &&
needEtcd) {
+ if (s.embedded && needEtcd) || (s.schemaRegistryMode ==
schemaTypeProperty && s.hasMetaRole) {
return databasev1.Role_ROLE_META
}
return databasev1.Role_ROLE_UNSPECIFIED
@@ -102,6 +103,8 @@ func (s *server) FlagSet() *run.FlagSet {
"Schema registry mode: 'etcd' for etcd-based storage,
'property' for property-based storage")
fs.StringVar(&s.nodeDiscoveryMode, "node-discovery-mode",
metadata.NodeDiscoveryModeNone,
"Node discovery mode: 'none' for standalone, 'etcd' for
etcd-based, 'dns' for DNS-based, 'file' for file-based")
+ fs.BoolVar(&s.hasMetaRole, "has-meta-role", true,
+ "Whether this data node runs the schema server. Only effective
in property schema registry mode.")
if s.embedded {
fs.StringVar(&s.rootDir, "metadata-root-path", "/tmp", "the
root path of metadata")
fs.StringVar(&s.autoCompactionMode,
"etcd-auto-compaction-mode", "periodic", "auto compaction mode: 'periodic' or
'revision'")
@@ -199,13 +202,18 @@ func (s *server) PreRun(ctx context.Context) error {
s.propServer = nil
s.repairSvc = nil
case schemaTypeProperty:
- ctx = s.enrichContextWithSchemaAddress(ctx)
- if propPreRunErr := s.propServer.PreRun(ctx); propPreRunErr !=
nil {
- return propPreRunErr
- }
- if s.repairSvc != nil {
- if repairPreRunErr := s.repairSvc.PreRun(ctx);
repairPreRunErr != nil {
- return repairPreRunErr
+ if !s.hasMetaRole {
+ s.propServer = nil
+ s.repairSvc = nil
+ } else {
+ ctx = s.enrichContextWithSchemaAddress(ctx)
+ if propPreRunErr := s.propServer.PreRun(ctx);
propPreRunErr != nil {
+ return propPreRunErr
+ }
+ if s.repairSvc != nil {
+ if repairPreRunErr := s.repairSvc.PreRun(ctx);
repairPreRunErr != nil {
+ return repairPreRunErr
+ }
}
}
if s.snapshotPipeline != nil {
diff --git a/pkg/test/setup/setup.go b/pkg/test/setup/setup.go
index e3c22a223..321ad5f4c 100644
--- a/pkg/test/setup/setup.go
+++ b/pkg/test/setup/setup.go
@@ -611,13 +611,26 @@ func CMD(flags ...string) func() {
}
}
+func hasFlagValue(flags []string, key, value string) bool {
+ for idx, f := range flags {
+ if f == key+"="+value {
+ return true
+ }
+ if f == key && idx+1 < len(flags) && flags[idx+1] == value {
+ return true
+ }
+ }
+ return false
+}
+
func startDataNode(config *ClusterConfig, dataDir string, flags ...string)
(string, string, func()) {
if config == nil {
config = defaultClusterConfig
}
isPropertyMode := config.SchemaRegistry.Mode == ModeProperty
+ runSchemaServer := isPropertyMode && !hasFlagValue(flags,
"--has-meta-role", "false")
portCount := 2
- if isPropertyMode {
+ if runSchemaServer {
portCount = 3
}
ports, err := test.AllocateFreePorts(portCount)
@@ -650,13 +663,15 @@ func startDataNode(config *ClusterConfig, dataDir string,
flags ...string) (stri
}
if isPropertyMode {
- schemaPort := ports[2]
- schemaAddr := fmt.Sprintf("%s:%d", nodeHost, schemaPort)
- flags = append(flags,
- "--schema-server-grpc-host="+nodeHost,
- fmt.Sprintf("--schema-server-grpc-port=%d", schemaPort),
- )
- config.addSchemaServerAddr(schemaAddr)
+ if runSchemaServer {
+ schemaPort := ports[2]
+ schemaAddr := fmt.Sprintf("%s:%d", nodeHost, schemaPort)
+ flags = append(flags,
+ "--schema-server-grpc-host="+nodeHost,
+ fmt.Sprintf("--schema-server-grpc-port=%d",
schemaPort),
+ )
+ config.addSchemaServerAddr(schemaAddr)
+ }
} else {
flags = append(flags, "--etcd-endpoints", config.EtcdEndpoint)
}
diff --git a/test/integration/distributed/lifecycle/property/suite_test.go
b/test/integration/distributed/lifecycle/property/suite_test.go
index d977dec8b..d43287624 100644
--- a/test/integration/distributed/lifecycle/property/suite_test.go
+++ b/test/integration/distributed/lifecycle/property/suite_test.go
@@ -18,6 +18,7 @@
package property_test
import (
+ "context"
"fmt"
"sort"
"testing"
@@ -25,8 +26,12 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
+ grpclib "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+ "github.com/apache/skywalking-banyandb/pkg/grpchelper"
"github.com/apache/skywalking-banyandb/pkg/test"
"github.com/apache/skywalking-banyandb/pkg/test/flags"
test_measure "github.com/apache/skywalking-banyandb/pkg/test/measure"
@@ -50,6 +55,7 @@ func init() {
"--measure-flush-timeout", "0s",
"--stream-flush-timeout", "0s", "--trace-flush-timeout", "0s")
By("Starting warm data node")
_, destDir, closeDataNode1 :=
setup.DataNodeWithAddrAndDir(config, "--node-labels", "type=warm",
+ "--has-meta-role=false",
"--measure-flush-timeout", "0s",
"--stream-flush-timeout", "0s", "--trace-flush-timeout", "0s")
setup.PreloadSchemaViaProperty(config,
test_stream.LoadSchemaWithStages, test_measure.LoadSchemaWithStages,
test_trace.PreloadSchemaWithStages,
test_property.PreloadSchema)
@@ -61,19 +67,11 @@ func init() {
groups := nodeGroups[schemaAddr]
By(fmt.Sprintf("Schema server %s groups(%d): %v",
schemaAddr, len(groups), groups))
}
- if len(schemaAddrs) >= 2 {
- baseAddr := schemaAddrs[0]
- for _, schemaAddr := range schemaAddrs[1:] {
- onlyInBase := diffGroups(nodeGroups[baseAddr],
nodeGroups[schemaAddr])
- onlyInCurrent :=
diffGroups(nodeGroups[schemaAddr], nodeGroups[baseAddr])
- if len(onlyInBase) > 0 || len(onlyInCurrent) >
0 {
- By(fmt.Sprintf("Group snapshot
mismatch: onlyIn(%s)=%v onlyIn(%s)=%v", baseAddr, onlyInBase, schemaAddr,
onlyInCurrent))
- }
- }
- }
config.AddLoadedKinds(schema.KindStream, schema.KindMeasure,
schema.KindTrace)
By("Starting liaison node")
liaisonAddr, closerLiaisonNode := setup.LiaisonNode(config,
"--data-node-selector", "type=hot")
+ By("Verifying cluster state: hot node has ROLE_META, warm node
does not")
+ verifyClusterNodeRoles(liaisonAddr)
By("Initializing test cases with 10 days before")
ns := timestamp.NowMilli().UnixNano()
now := time.Unix(0, ns-ns%int64(time.Minute))
@@ -101,19 +99,43 @@ func init() {
}
}
-func diffGroups(left, right []string) []string {
- rightSet := make(map[string]struct{}, len(right))
- for _, groupName := range right {
- rightSet[groupName] = struct{}{}
- }
- result := make([]string, 0)
- for _, groupName := range left {
- if _, exists := rightSet[groupName]; exists {
- continue
+func verifyClusterNodeRoles(liaisonAddr string) {
+ conn, connErr := grpchelper.Conn(liaisonAddr, 10*time.Second,
+ grpclib.WithTransportCredentials(insecure.NewCredentials()))
+ Expect(connErr).NotTo(HaveOccurred())
+ defer func() { _ = conn.Close() }()
+ clusterClient := databasev1.NewClusterStateServiceClient(conn)
+ Eventually(func(g Gomega) {
+ ctx, cancel := context.WithTimeout(context.Background(),
10*time.Second)
+ defer cancel()
+ state, stateErr := clusterClient.GetClusterState(ctx,
&databasev1.GetClusterStateRequest{})
+ g.Expect(stateErr).NotTo(HaveOccurred())
+ tire2 := state.GetRouteTables()["tire2"]
+ g.Expect(tire2).NotTo(BeNil(), "tire2 route table not found")
+ foundHot := false
+ foundWarm := false
+ for _, node := range tire2.GetRegistered() {
+ labels := node.GetLabels()
+ hasMetaRole := false
+ for _, role := range node.GetRoles() {
+ if role == databasev1.Role_ROLE_META {
+ hasMetaRole = true
+ break
+ }
+ }
+ if labels["type"] == "hot" {
+ foundHot = true
+ g.Expect(hasMetaRole).To(BeTrue(),
+ fmt.Sprintf("hot node %s should have
ROLE_META", node.GetMetadata().GetName()))
+ } else if labels["type"] == "warm" {
+ foundWarm = true
+ g.Expect(hasMetaRole).To(BeFalse(),
+ fmt.Sprintf("warm node %s should NOT
have ROLE_META", node.GetMetadata().GetName()))
+ }
}
- result = append(result, groupName)
- }
- return result
+ g.Expect(foundHot).To(BeTrue(), "no hot node found in cluster
state")
+ g.Expect(foundWarm).To(BeTrue(), "no warm node found in cluster
state")
+
}).WithTimeout(flags.EventuallyTimeout).WithPolling(time.Second).Should(Succeed())
}
func TestPropertyLifecycle(t *testing.T) {