This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch test/replication in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 6a3e19f2168c6cbda5139b0d043749a58d1c1459 Author: Hongtao Gao <[email protected]> AuthorDate: Fri Mar 20 04:24:42 2026 +0000 feat(replication): add stream normal mode replication test Add integration tests for stream replication: - Verify consistent results from replicas (deduplication) - Verify data accessibility when a node fails - Verify data recovery and handoff queue drain after node restart Co-Authored-By: Claude Opus 4.6 <[email protected]> --- .../replication/stream_replication_test.go | 163 +++++++++++++++++++++ 1 file changed, 163 insertions(+) diff --git a/test/integration/replication/stream_replication_test.go b/test/integration/replication/stream_replication_test.go new file mode 100644 index 000000000..5b1485986 --- /dev/null +++ b/test/integration/replication/stream_replication_test.go @@ -0,0 +1,163 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package replication_test + +import ( + "context" + "time" + + g "github.com/onsi/ginkgo/v2" + gm "github.com/onsi/gomega" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + "github.com/apache/skywalking-banyandb/pkg/grpchelper" + "github.com/apache/skywalking-banyandb/pkg/test/flags" + "github.com/apache/skywalking-banyandb/pkg/test/helpers" + casesstreamdata "github.com/apache/skywalking-banyandb/test/cases/stream/data" +) + +var _ = g.Describe("Stream Normal Mode Replication", func() { + var conn *grpc.ClientConn + + g.BeforeEach(func() { + var err error + conn, err = grpchelper.Conn(liaisonAddr, 10*time.Second, + grpc.WithTransportCredentials(insecure.NewCredentials())) + gm.Expect(err).NotTo(gm.HaveOccurred()) + }) + + g.AfterEach(func() { + if conn != nil { + gm.Expect(conn.Close()).To(gm.Succeed()) + } + }) + + g.It("should return consistent results from replicas", func() { + g.By("Verifying the stream exists in default group") + ctx := context.Background() + streamMetadata := &commonv1.Metadata{ + Name: "sw", + Group: "default", + } + + schemaClient := databasev1.NewStreamRegistryServiceClient(conn) + resp, err := schemaClient.Get(ctx, &databasev1.StreamRegistryServiceGetRequest{Metadata: streamMetadata}) + gm.Expect(err).NotTo(gm.HaveOccurred()) + gm.Expect(resp.GetStream()).NotTo(gm.BeNil()) + gm.Expect(resp.GetStream().GetMetadata().GetGroup()).To(gm.Equal("default")) + + g.By("Verifying replication factor for default group") + groupClient := databasev1.NewGroupRegistryServiceClient(conn) + groupResp, groupErr := groupClient.Get(ctx, &databasev1.GroupRegistryServiceGetRequest{ + Group: "default", + }) + gm.Expect(groupErr).NotTo(gm.HaveOccurred()) + gm.Expect(groupResp.GetGroup()).NotTo(gm.BeNil()) + gm.Expect(groupResp.GetGroup().GetResourceOpts().GetReplicas()).To(gm.Equal(uint32(2)), + "default group should have replicas=2") + + g.By("Querying data multiple times to verify consistency (deduplication)") + verifyStreamDataContentWithArgs(conn, now, helpers.Args{ + Input: "all", + Duration: 25 * time.Minute, + Offset: -20 * time.Minute, + }) + }) + + g.It("should survive single node failure", func() { + g.By("Verifying the stream exists in default group") + ctx := context.Background() + streamMetadata := &commonv1.Metadata{ + Name: "sw", + Group: "default", + } + + schemaClient := databasev1.NewStreamRegistryServiceClient(conn) + resp, err := schemaClient.Get(ctx, &databasev1.StreamRegistryServiceGetRequest{Metadata: streamMetadata}) + gm.Expect(err).NotTo(gm.HaveOccurred()) + gm.Expect(resp.GetStream()).NotTo(gm.BeNil()) + + g.By("Stopping one data node") + closersToStop := make([]func(), len(dataNodeClosers)) + copy(closersToStop, dataNodeClosers) + closersToStop[0]() + + g.By("Verifying data is still accessible after node failure") + verifyStreamDataContentWithArgs(conn, now, helpers.Args{ + Input: "all", + Duration: 25 * time.Minute, + Offset: -20 * time.Minute, + }) + }) + + g.It("should recover data after node restart", func() { + g.By("Verifying the stream exists in default group") + ctx := context.Background() + streamMetadata := &commonv1.Metadata{ + Name: "sw", + Group: "default", + } + + schemaClient := databasev1.NewStreamRegistryServiceClient(conn) + resp, err := schemaClient.Get(ctx, &databasev1.StreamRegistryServiceGetRequest{Metadata: streamMetadata}) + gm.Expect(err).NotTo(gm.HaveOccurred()) + gm.Expect(resp.GetStream()).NotTo(gm.BeNil()) + + g.By("Stopping one data node") + closersToStop := make([]func(), len(dataNodeClosers)) + copy(closersToStop, dataNodeClosers) + closersToStop[0]() + + g.By("Verifying data is still accessible during node downtime") + verifyStreamDataContentWithArgs(conn, now, helpers.Args{ + Input: "all", + Duration: 25 * time.Minute, + Offset: -20 * time.Minute, + }) + + g.By("Restarting the data node") + closeDataNode := setup.DataNode(clusterConfig, "--node-labels", "role=data") + dataNodeClosers = append(dataNodeClosers, closeDataNode) + + g.By("Waiting for cluster to stabilize and handoff queue to drain") + gm.Eventually(func() bool { + return isClusterStable(conn) + }, flags.EventuallyTimeout).Should(gm.BeTrue(), "Cluster should stabilize after node restart") + + g.By("Verifying data is still accessible after node restart") + verifyStreamDataContentWithArgs(conn, now, helpers.Args{ + Input: "all", + Duration: 25 * time.Minute, + Offset: -20 * time.Minute, + }) + }) +}) + +func verifyStreamDataContentWithArgs(conn *grpc.ClientConn, baseTime time.Time, args helpers.Args) { + sharedContext := helpers.SharedContext{ + Connection: conn, + BaseTime: baseTime, + } + gm.Eventually(func(innerGm gm.Gomega) { + casesstreamdata.VerifyFn(innerGm, sharedContext, args) + }, flags.EventuallyTimeout).Should(gm.Succeed(), + "Should be able to query and verify stream data content") +}
