This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new c27121f43 Add `has-meta-role` flag to switch schema property should 
activate or not (#1007)
c27121f43 is described below

commit c27121f43c76b3b31470e2846f68958884ef91d6
Author: mrproliu <[email protected]>
AuthorDate: Fri Mar 13 15:39:46 2026 +0800

    Add `has-meta-role` flag to switch schema property should activate or not 
(#1007)
    
    * Add `has-schema-role` flag to switch schema property should activated or 
not
    
    ---------
    
    Co-authored-by: Gao Hongtao <[email protected]>
---
 banyand/metadata/service/server.go                 | 24 +++++---
 pkg/test/setup/setup.go                            | 31 +++++++---
 .../distributed/lifecycle/property/suite_test.go   | 66 ++++++++++++++--------
 3 files changed, 83 insertions(+), 38 deletions(-)

diff --git a/banyand/metadata/service/server.go 
b/banyand/metadata/service/server.go
index 4dcbf1095..ee604ae92 100644
--- a/banyand/metadata/service/server.go
+++ b/banyand/metadata/service/server.go
@@ -82,6 +82,7 @@ type server struct {
        listenPeerURL           []string
        quotaBackendBytes       run.Bytes
        embedded                bool
+       hasMetaRole             bool
 }
 
 func (s *server) Name() string {
@@ -90,7 +91,7 @@ func (s *server) Name() string {
 
 func (s *server) Role() databasev1.Role {
        needEtcd := s.schemaRegistryMode == schemaTypeEtcd || 
s.nodeDiscoveryMode == metadata.NodeDiscoveryModeEtcd
-       if s.schemaRegistryMode == schemaTypeProperty || (s.embedded && 
needEtcd) {
+       if (s.embedded && needEtcd) || (s.schemaRegistryMode == 
schemaTypeProperty && s.hasMetaRole) {
                return databasev1.Role_ROLE_META
        }
        return databasev1.Role_ROLE_UNSPECIFIED
@@ -102,6 +103,8 @@ func (s *server) FlagSet() *run.FlagSet {
                "Schema registry mode: 'etcd' for etcd-based storage, 
'property' for property-based storage")
        fs.StringVar(&s.nodeDiscoveryMode, "node-discovery-mode", 
metadata.NodeDiscoveryModeNone,
                "Node discovery mode: 'none' for standalone, 'etcd' for 
etcd-based, 'dns' for DNS-based, 'file' for file-based")
+       fs.BoolVar(&s.hasMetaRole, "has-meta-role", true,
+               "Whether this data node runs the schema server. Only effective 
in property schema registry mode.")
        if s.embedded {
                fs.StringVar(&s.rootDir, "metadata-root-path", "/tmp", "the 
root path of metadata")
                fs.StringVar(&s.autoCompactionMode, 
"etcd-auto-compaction-mode", "periodic", "auto compaction mode: 'periodic' or 
'revision'")
@@ -199,13 +202,18 @@ func (s *server) PreRun(ctx context.Context) error {
                s.propServer = nil
                s.repairSvc = nil
        case schemaTypeProperty:
-               ctx = s.enrichContextWithSchemaAddress(ctx)
-               if propPreRunErr := s.propServer.PreRun(ctx); propPreRunErr != 
nil {
-                       return propPreRunErr
-               }
-               if s.repairSvc != nil {
-                       if repairPreRunErr := s.repairSvc.PreRun(ctx); 
repairPreRunErr != nil {
-                               return repairPreRunErr
+               if !s.hasMetaRole {
+                       s.propServer = nil
+                       s.repairSvc = nil
+               } else {
+                       ctx = s.enrichContextWithSchemaAddress(ctx)
+                       if propPreRunErr := s.propServer.PreRun(ctx); 
propPreRunErr != nil {
+                               return propPreRunErr
+                       }
+                       if s.repairSvc != nil {
+                               if repairPreRunErr := s.repairSvc.PreRun(ctx); 
repairPreRunErr != nil {
+                                       return repairPreRunErr
+                               }
                        }
                }
                if s.snapshotPipeline != nil {
diff --git a/pkg/test/setup/setup.go b/pkg/test/setup/setup.go
index e3c22a223..321ad5f4c 100644
--- a/pkg/test/setup/setup.go
+++ b/pkg/test/setup/setup.go
@@ -611,13 +611,26 @@ func CMD(flags ...string) func() {
        }
 }
 
+func hasFlagValue(flags []string, key, value string) bool {
+       for idx, f := range flags {
+               if f == key+"="+value {
+                       return true
+               }
+               if f == key && idx+1 < len(flags) && flags[idx+1] == value {
+                       return true
+               }
+       }
+       return false
+}
+
 func startDataNode(config *ClusterConfig, dataDir string, flags ...string) 
(string, string, func()) {
        if config == nil {
                config = defaultClusterConfig
        }
        isPropertyMode := config.SchemaRegistry.Mode == ModeProperty
+       runSchemaServer := isPropertyMode && !hasFlagValue(flags, 
"--has-meta-role", "false")
        portCount := 2
-       if isPropertyMode {
+       if runSchemaServer {
                portCount = 3
        }
        ports, err := test.AllocateFreePorts(portCount)
@@ -650,13 +663,15 @@ func startDataNode(config *ClusterConfig, dataDir string, 
flags ...string) (stri
        }
 
        if isPropertyMode {
-               schemaPort := ports[2]
-               schemaAddr := fmt.Sprintf("%s:%d", nodeHost, schemaPort)
-               flags = append(flags,
-                       "--schema-server-grpc-host="+nodeHost,
-                       fmt.Sprintf("--schema-server-grpc-port=%d", schemaPort),
-               )
-               config.addSchemaServerAddr(schemaAddr)
+               if runSchemaServer {
+                       schemaPort := ports[2]
+                       schemaAddr := fmt.Sprintf("%s:%d", nodeHost, schemaPort)
+                       flags = append(flags,
+                               "--schema-server-grpc-host="+nodeHost,
+                               fmt.Sprintf("--schema-server-grpc-port=%d", 
schemaPort),
+                       )
+                       config.addSchemaServerAddr(schemaAddr)
+               }
        } else {
                flags = append(flags, "--etcd-endpoints", config.EtcdEndpoint)
        }
diff --git a/test/integration/distributed/lifecycle/property/suite_test.go 
b/test/integration/distributed/lifecycle/property/suite_test.go
index d977dec8b..d43287624 100644
--- a/test/integration/distributed/lifecycle/property/suite_test.go
+++ b/test/integration/distributed/lifecycle/property/suite_test.go
@@ -18,6 +18,7 @@
 package property_test
 
 import (
+       "context"
        "fmt"
        "sort"
        "testing"
@@ -25,8 +26,12 @@ import (
 
        . "github.com/onsi/ginkgo/v2"
        . "github.com/onsi/gomega"
+       grpclib "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
 
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
        "github.com/apache/skywalking-banyandb/pkg/test"
        "github.com/apache/skywalking-banyandb/pkg/test/flags"
        test_measure "github.com/apache/skywalking-banyandb/pkg/test/measure"
@@ -50,6 +55,7 @@ func init() {
                        "--measure-flush-timeout", "0s", 
"--stream-flush-timeout", "0s", "--trace-flush-timeout", "0s")
                By("Starting warm data node")
                _, destDir, closeDataNode1 := 
setup.DataNodeWithAddrAndDir(config, "--node-labels", "type=warm",
+                       "--has-meta-role=false",
                        "--measure-flush-timeout", "0s", 
"--stream-flush-timeout", "0s", "--trace-flush-timeout", "0s")
                setup.PreloadSchemaViaProperty(config, 
test_stream.LoadSchemaWithStages, test_measure.LoadSchemaWithStages,
                        test_trace.PreloadSchemaWithStages, 
test_property.PreloadSchema)
@@ -61,19 +67,11 @@ func init() {
                        groups := nodeGroups[schemaAddr]
                        By(fmt.Sprintf("Schema server %s groups(%d): %v", 
schemaAddr, len(groups), groups))
                }
-               if len(schemaAddrs) >= 2 {
-                       baseAddr := schemaAddrs[0]
-                       for _, schemaAddr := range schemaAddrs[1:] {
-                               onlyInBase := diffGroups(nodeGroups[baseAddr], 
nodeGroups[schemaAddr])
-                               onlyInCurrent := 
diffGroups(nodeGroups[schemaAddr], nodeGroups[baseAddr])
-                               if len(onlyInBase) > 0 || len(onlyInCurrent) > 
0 {
-                                       By(fmt.Sprintf("Group snapshot 
mismatch: onlyIn(%s)=%v onlyIn(%s)=%v", baseAddr, onlyInBase, schemaAddr, 
onlyInCurrent))
-                               }
-                       }
-               }
                config.AddLoadedKinds(schema.KindStream, schema.KindMeasure, 
schema.KindTrace)
                By("Starting liaison node")
                liaisonAddr, closerLiaisonNode := setup.LiaisonNode(config, 
"--data-node-selector", "type=hot")
+               By("Verifying cluster state: hot node has ROLE_META, warm node 
does not")
+               verifyClusterNodeRoles(liaisonAddr)
                By("Initializing test cases with 10 days before")
                ns := timestamp.NowMilli().UnixNano()
                now := time.Unix(0, ns-ns%int64(time.Minute))
@@ -101,19 +99,43 @@ func init() {
        }
 }
 
-func diffGroups(left, right []string) []string {
-       rightSet := make(map[string]struct{}, len(right))
-       for _, groupName := range right {
-               rightSet[groupName] = struct{}{}
-       }
-       result := make([]string, 0)
-       for _, groupName := range left {
-               if _, exists := rightSet[groupName]; exists {
-                       continue
+func verifyClusterNodeRoles(liaisonAddr string) {
+       conn, connErr := grpchelper.Conn(liaisonAddr, 10*time.Second,
+               grpclib.WithTransportCredentials(insecure.NewCredentials()))
+       Expect(connErr).NotTo(HaveOccurred())
+       defer func() { _ = conn.Close() }()
+       clusterClient := databasev1.NewClusterStateServiceClient(conn)
+       Eventually(func(g Gomega) {
+               ctx, cancel := context.WithTimeout(context.Background(), 
10*time.Second)
+               defer cancel()
+               state, stateErr := clusterClient.GetClusterState(ctx, 
&databasev1.GetClusterStateRequest{})
+               g.Expect(stateErr).NotTo(HaveOccurred())
+               tire2 := state.GetRouteTables()["tire2"]
+               g.Expect(tire2).NotTo(BeNil(), "tire2 route table not found")
+               foundHot := false
+               foundWarm := false
+               for _, node := range tire2.GetRegistered() {
+                       labels := node.GetLabels()
+                       hasMetaRole := false
+                       for _, role := range node.GetRoles() {
+                               if role == databasev1.Role_ROLE_META {
+                                       hasMetaRole = true
+                                       break
+                               }
+                       }
+                       if labels["type"] == "hot" {
+                               foundHot = true
+                               g.Expect(hasMetaRole).To(BeTrue(),
+                                       fmt.Sprintf("hot node %s should have 
ROLE_META", node.GetMetadata().GetName()))
+                       } else if labels["type"] == "warm" {
+                               foundWarm = true
+                               g.Expect(hasMetaRole).To(BeFalse(),
+                                       fmt.Sprintf("warm node %s should NOT 
have ROLE_META", node.GetMetadata().GetName()))
+                       }
                }
-               result = append(result, groupName)
-       }
-       return result
+               g.Expect(foundHot).To(BeTrue(), "no hot node found in cluster 
state")
+               g.Expect(foundWarm).To(BeTrue(), "no warm node found in cluster 
state")
+       
}).WithTimeout(flags.EventuallyTimeout).WithPolling(time.Second).Should(Succeed())
 }
 
 func TestPropertyLifecycle(t *testing.T) {

Reply via email to