This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch test/replication
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit be23171564a99a213b910ed35eddeed79d8d3521
Author: Hongtao Gao <[email protected]>
AuthorDate: Mon Mar 23 02:54:38 2026 +0000

    fix(replication): fix cluster stability issues for replication tests
    
    Fix connection manager node eviction from active map on failover,
    add context/cancel to measure schemaRepo to prevent shutdown panics,
    handle prometheus AlreadyRegisteredError for metric re-registration,
    treat gRPC Canceled as a failover error, reduce distributed query
    timeout from 30s to 5s, and stabilize test node restart by reusing
    persisted data directories.
---
 banyand/measure/metadata.go                        |  20 +-
 banyand/measure/svc_data.go                        |  14 +-
 banyand/property/gossip/service.go                 |   4 +-
 pkg/grpchelper/connmanager.go                      |  26 +-
 pkg/grpchelper/retry.go                            |   4 +-
 pkg/meter/prom/prom.go                             |  51 ++-
 .../logical/stream/stream_plan_distributed.go      |   2 +-
 pkg/query/logical/trace/trace_plan_distributed.go  |   2 +-
 pkg/test/setup/setup.go                            |  10 +-
 test/cases/measure/data/data.go                    |   3 +-
 test/cases/stream/data/data.go                     |  12 +-
 test/cases/trace/data/data.go                      |  11 +-
 test/cases/trace/data/input/all.yml                |  12 +-
 test/cases/trace/data/want/all.yml                 | 452 +++------------------
 .../replication/measure_normal_replication_test.go |  16 +-
 .../replication/replication_suite_test.go          |  49 ++-
 test/integration/replication/replication_test.go   |  20 +-
 .../replication/stream_replication_test.go         |  13 +-
 .../replication/trace_replication_test.go          |  13 +-
 19 files changed, 231 insertions(+), 503 deletions(-)

diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index 1517b8e68..daf140bd9 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -82,6 +82,8 @@ type schemaRepo struct {
        metadata         metadata.Repo
        pipeline         queue.Client
        l                *logger.Logger
+       ctx              context.Context
+       cancel           context.CancelFunc
        closingGroups    map[string]struct{}
        topNProcessorMap sync.Map
        nodeID           string
@@ -91,6 +93,7 @@ type schemaRepo struct {
 }
 
 func newSchemaRepo(path string, svc *standalone, nodeLabels map[string]string, 
nodeID string) *schemaRepo {
+       ctx, cancel := context.WithCancel(context.Background())
        sr := &schemaRepo{
                path:          path,
                l:             svc.l,
@@ -99,6 +102,8 @@ func newSchemaRepo(path string, svc *standalone, nodeLabels 
map[string]string, n
                nodeID:        nodeID,
                closingGroups: make(map[string]struct{}),
                role:          databasev1.Role_ROLE_DATA,
+               ctx:           ctx,
+               cancel:        cancel,
        }
        sr.Repository = resourceSchema.NewRepository(
                svc.metadata,
@@ -111,6 +116,7 @@ func newSchemaRepo(path string, svc *standalone, nodeLabels 
map[string]string, n
 }
 
 func newLiaisonSchemaRepo(path string, svc *liaison, measureDataNodeRegistry 
grpc.NodeRegistry, pipeline queue.Client) *schemaRepo {
+       ctx, cancel := context.WithCancel(context.Background())
        sr := &schemaRepo{
                path:          path,
                l:             svc.l,
@@ -118,6 +124,8 @@ func newLiaisonSchemaRepo(path string, svc *liaison, 
measureDataNodeRegistry grp
                pipeline:      pipeline,
                closingGroups: make(map[string]struct{}),
                role:          databasev1.Role_ROLE_LIAISON,
+               ctx:           ctx,
+               cancel:        cancel,
        }
        sr.Repository = resourceSchema.NewRepository(
                svc.metadata,
@@ -188,7 +196,7 @@ func (sr *schemaRepo) OnInit(kinds []schema.Kind) (bool, 
[]int64) {
        }
        groupNames, revs := sr.Repository.Init(schema.KindMeasure)
        for i := range groupNames {
-               sr.createTopNResultMeasure(context.Background(), 
sr.metadata.MeasureRegistry(), groupNames[i])
+               sr.createTopNResultMeasure(sr.ctx, 
sr.metadata.MeasureRegistry(), groupNames[i])
        }
        return true, revs
 }
@@ -209,7 +217,7 @@ func (sr *schemaRepo) OnAddOrUpdate(metadata 
schema.Metadata) {
                        Kind:     resourceSchema.EventKindGroup,
                        Metadata: g,
                })
-               sr.createTopNResultMeasure(context.Background(), 
sr.metadata.MeasureRegistry(), g.Metadata.Name)
+               sr.createTopNResultMeasure(sr.ctx, 
sr.metadata.MeasureRegistry(), g.Metadata.Name)
        case schema.KindMeasure:
                m := metadata.Spec.(*databasev1.Measure)
                if err := validate.Measure(m); err != nil {
@@ -327,6 +335,7 @@ func (sr *schemaRepo) OnDelete(metadata schema.Metadata) {
 }
 
 func (sr *schemaRepo) Close() {
+       sr.cancel()
        var err error
        sr.topNProcessorMap.Range(func(_, val any) bool {
                manager := val.(*topNProcessorManager)
@@ -578,9 +587,10 @@ func (sr *schemaRepo) createTopNResultMeasure(ctx 
context.Context, measureSchema
        backoffStrategy := backoff.NewExponentialBackOff()
        backoffStrategy.MaxElapsedTime = 0 // never stop until topN measure has 
been created
 
-       err := backoff.Retry(operation, backoffStrategy)
-       if err != nil {
-               logger.Panicf("fail to create topN measure %s: %v", md, err)
+       if err := backoff.Retry(operation, backoff.WithContext(backoffStrategy, 
ctx)); err != nil {
+               if !errors.Is(err, context.Canceled) {
+                       logger.Panicf("fail to create topN measure %s: %v", md, 
err)
+               }
        }
 }
 
diff --git a/banyand/measure/svc_data.go b/banyand/measure/svc_data.go
index 21ff035c5..065ff220f 100644
--- a/banyand/measure/svc_data.go
+++ b/banyand/measure/svc_data.go
@@ -429,12 +429,16 @@ func NewReadonlyDataSVC(metadata metadata.Repo, omr 
observability.MetricsRegistr
 }
 
 func newDataSchemaRepo(path string, svc *dataSVC, nodeLabels 
map[string]string, nodeID string) *schemaRepo {
+       ctx, cancel := context.WithCancel(context.Background())
        sr := &schemaRepo{
-               path:     path,
-               l:        svc.l,
-               metadata: svc.metadata,
-               nodeID:   nodeID,
-               role:     databasev1.Role_ROLE_DATA,
+               path:          path,
+               l:             svc.l,
+               metadata:      svc.metadata,
+               nodeID:        nodeID,
+               role:          databasev1.Role_ROLE_DATA,
+               closingGroups: make(map[string]struct{}),
+               ctx:           ctx,
+               cancel:        cancel,
        }
        sr.Repository = resourceSchema.NewRepository(
                svc.metadata,
diff --git a/banyand/property/gossip/service.go 
b/banyand/property/gossip/service.go
index 91000c0f0..5ce1339ea 100644
--- a/banyand/property/gossip/service.go
+++ b/banyand/property/gossip/service.go
@@ -143,11 +143,11 @@ func (s *service) PreRun(ctx context.Context) error {
        s.serverMetrics = newServerMetrics(s.omr.With(metricsScope))
        if s.metadata != nil {
                s.sel.OnInit([]schema.Kind{schema.KindGroup})
-               s.metadata.RegisterHandler(s.prefix+"-nodes", schema.KindNode, 
s)
-               s.metadata.RegisterHandler(s.prefix+"-groups", 
schema.KindGroup, s)
                if err := s.initTracing(ctx); err != nil {
                        s.log.Warn().Err(err).Msg("failed to init internal 
trace stream")
                }
+               s.metadata.RegisterHandler(s.prefix+"-nodes", schema.KindNode, 
s)
+               s.metadata.RegisterHandler(s.prefix+"-groups", 
schema.KindGroup, s)
        }
        s.protocolHandler = newProtocolHandler(s)
        go s.protocolHandler.processPropagation()
diff --git a/pkg/grpchelper/connmanager.go b/pkg/grpchelper/connmanager.go
index 4e9793679..0ed31da27 100644
--- a/pkg/grpchelper/connmanager.go
+++ b/pkg/grpchelper/connmanager.go
@@ -223,7 +223,7 @@ func (m *ConnManager[C]) OnDelete(node *databasev1.Node) {
                                                return
                                        }
                                case <-m.closer.CloseNotify():
-                                       return
+                                       return
                                }
                                attempt++
                        }
@@ -339,10 +339,11 @@ func (m *ConnManager[C]) FailoverNode(node string) {
                }
                return
        }
-       if mn, ok := m.active[node]; ok && !m.checkHealthAndReconnect(mn.conn, 
mn.node, mn.client) {
-               _ = mn.conn.Close()
-               delete(m.active, node)
-               m.handler.OnInactive(node, mn.client)
+       mn, ok := m.active[node]
+       if !ok {
+               return
+       }
+       if !m.checkHealthAndReconnect(mn.conn, mn.node, mn.client) {
                m.log.Info().Str("status", m.dump()).Str("node", 
node).Msg("node is unhealthy in the failover flow, move it to evict queue")
        }
 }
@@ -417,6 +418,7 @@ func (m *ConnManager[C]) removeNodeIfUnhealthy(name string, 
mn *managedNode[C])
        if m.healthCheck(mn.node.String(), mn.conn) {
                return false
        }
+       m.log.Info().Str("node", name).Msg("removeNodeIfUnhealthy: node is 
unhealthy, removing from active")
        _ = mn.conn.Close()
        delete(m.active, name)
        m.handler.OnInactive(name, mn.client)
@@ -424,12 +426,13 @@ func (m *ConnManager[C]) removeNodeIfUnhealthy(name 
string, mn *managedNode[C])
 }
 
 // checkHealthAndReconnect checks if a node is healthy. If not, closes the 
conn,
-// adds to evictable, calls OnInactive, and starts a retry goroutine.
+// adds to evictable, deletes from active, calls OnInactive, and starts a 
retry goroutine.
 // Returns true if healthy.
 func (m *ConnManager[C]) checkHealthAndReconnect(conn *grpc.ClientConn, node 
*databasev1.Node, client C) bool {
        if m.healthCheck(node.String(), conn) {
                return true
        }
+       m.log.Info().Str("node", 
node.Metadata.Name).Msg("checkHealthAndReconnect: node is unhealthy, moving to 
evictable")
        _ = conn.Close()
        if !m.closer.AddRunning() {
                return false
@@ -496,6 +499,7 @@ func (m *ConnManager[C]) checkHealthAndReconnect(conn 
*grpc.ClientConn, node *da
                        attempt++
                }
        }(name, m.evictable[name])
+       delete(m.active, name)
        return false
 }
 
@@ -508,12 +512,14 @@ func (m *ConnManager[C]) healthCheck(node string, conn 
*grpc.ClientConn) bool {
                        })
                return err
        }); requestErr != nil {
-               if e := m.log.Debug(); e.Enabled() {
-                       e.Err(requestErr).Str("node", node).Msg("service 
unhealthy")
-               }
+               m.log.Info().Err(requestErr).Str("node", 
node).Msg("healthCheck: service unhealthy, RPC error")
                return false
        }
-       return resp.GetStatus() == grpc_health_v1.HealthCheckResponse_SERVING
+       if resp.GetStatus() != grpc_health_v1.HealthCheckResponse_SERVING {
+               m.log.Info().Str("node", node).Str("status", 
resp.GetStatus().String()).Msg("healthCheck: service not SERVING")
+               return false
+       }
+       return true
 }
 
 func (m *ConnManager[C]) dump() string {
diff --git a/pkg/grpchelper/retry.go b/pkg/grpchelper/retry.go
index 2478a4c3d..8bb6282b6 100644
--- a/pkg/grpchelper/retry.go
+++ b/pkg/grpchelper/retry.go
@@ -156,7 +156,9 @@ func IsFailoverError(err error) bool {
        if !ok {
                return false
        }
-       return code == codes.Unavailable || code == codes.DeadlineExceeded
+       // Canceled can indicate the gRPC connection to the node was closed
+       // (node failure or keepalive timeout), not only a client-side cancel.
+       return code == codes.Unavailable || code == codes.DeadlineExceeded || 
code == codes.Canceled
 }
 
 // IsInternalError checks if the error is an internal server error.
diff --git a/pkg/meter/prom/prom.go b/pkg/meter/prom/prom.go
index 0ca74abec..00443bbd8 100644
--- a/pkg/meter/prom/prom.go
+++ b/pkg/meter/prom/prom.go
@@ -18,10 +18,10 @@
 package prom
 
 import (
+       "errors"
        "unsafe"
 
        "github.com/prometheus/client_golang/prometheus"
-       "github.com/prometheus/client_golang/prometheus/promauto"
 
        "github.com/apache/skywalking-banyandb/pkg/meter"
 )
@@ -42,36 +42,43 @@ func NewProvider(scope meter.Scope, reg 
prometheus.Registerer) meter.Provider {
 
 // Counter returns a prometheus counter.
 func (p *provider) Counter(name string, labels ...string) meter.Counter {
-       return &counter{
-               counter: 
promauto.With(p.reg).NewCounterVec(prometheus.CounterOpts{
-                       Name:        p.scope.GetNamespace() + "_" + name,
-                       Help:        p.scope.GetNamespace() + "_" + name,
-                       ConstLabels: convertLabels(p.scope.GetLabels()),
-               }, labels),
-       }
+       return &counter{counter: registerCollector(p.reg, 
prometheus.NewCounterVec(prometheus.CounterOpts{
+               Name:        p.scope.GetNamespace() + "_" + name,
+               Help:        p.scope.GetNamespace() + "_" + name,
+               ConstLabels: convertLabels(p.scope.GetLabels()),
+       }, labels))}
 }
 
 // Gauge returns a prometheus gauge.
 func (p *provider) Gauge(name string, labels ...string) meter.Gauge {
-       return &gauge{
-               gauge: promauto.With(p.reg).NewGaugeVec(prometheus.GaugeOpts{
-                       Name:        p.scope.GetNamespace() + "_" + name,
-                       Help:        p.scope.GetNamespace() + "_" + name,
-                       ConstLabels: convertLabels(p.scope.GetLabels()),
-               }, labels),
-       }
+       return &gauge{gauge: registerCollector(p.reg, 
prometheus.NewGaugeVec(prometheus.GaugeOpts{
+               Name:        p.scope.GetNamespace() + "_" + name,
+               Help:        p.scope.GetNamespace() + "_" + name,
+               ConstLabels: convertLabels(p.scope.GetLabels()),
+       }, labels))}
 }
 
 // Histogram returns a prometheus histogram.
 func (p *provider) Histogram(name string, buckets meter.Buckets, labels 
...string) meter.Histogram {
-       return &histogram{
-               histogram: 
promauto.With(p.reg).NewHistogramVec(prometheus.HistogramOpts{
-                       Name:        p.scope.GetNamespace() + "_" + name,
-                       Help:        p.scope.GetNamespace() + "_" + name,
-                       ConstLabels: convertLabels(p.scope.GetLabels()),
-                       Buckets:     buckets,
-               }, labels),
+       return &histogram{histogram: registerCollector(p.reg, 
prometheus.NewHistogramVec(prometheus.HistogramOpts{
+               Name:        p.scope.GetNamespace() + "_" + name,
+               Help:        p.scope.GetNamespace() + "_" + name,
+               ConstLabels: convertLabels(p.scope.GetLabels()),
+               Buckets:     buckets,
+       }, labels))}
+}
+
+func registerCollector[T prometheus.Collector](reg prometheus.Registerer, c T) 
T {
+       if regErr := reg.Register(c); regErr != nil {
+               var are prometheus.AlreadyRegisteredError
+               if errors.As(regErr, &are) {
+                       if existing, ok := are.ExistingCollector.(T); ok {
+                               return existing
+                       }
+               }
+               panic(regErr)
        }
+       return c
 }
 
 // convertLabels converts a map of labels to a prometheus.Labels.
diff --git a/pkg/query/logical/stream/stream_plan_distributed.go 
b/pkg/query/logical/stream/stream_plan_distributed.go
index b99377653..c05d92ea8 100644
--- a/pkg/query/logical/stream/stream_plan_distributed.go
+++ b/pkg/query/logical/stream/stream_plan_distributed.go
@@ -38,7 +38,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/query/logical"
 )
 
-const defaultQueryTimeout = 30 * time.Second
+const defaultQueryTimeout = 5 * time.Second
 
 var _ logical.UnresolvedPlan = (*unresolvedDistributed)(nil)
 
diff --git a/pkg/query/logical/trace/trace_plan_distributed.go 
b/pkg/query/logical/trace/trace_plan_distributed.go
index bedc26ae0..0dfae7bdd 100644
--- a/pkg/query/logical/trace/trace_plan_distributed.go
+++ b/pkg/query/logical/trace/trace_plan_distributed.go
@@ -39,7 +39,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/query/model"
 )
 
-const defaultQueryTimeout = 30 * time.Second
+const defaultQueryTimeout = 5 * time.Second
 
 var _ logical.UnresolvedPlan = (*unresolvedTraceDistributed)(nil)
 
diff --git a/pkg/test/setup/setup.go b/pkg/test/setup/setup.go
index 7dcd4412b..2b76a42c7 100644
--- a/pkg/test/setup/setup.go
+++ b/pkg/test/setup/setup.go
@@ -607,7 +607,15 @@ func CMD(flags ...string) func() {
        }()
        return func() {
                closeFn()
-               wg.Wait()
+               waitCh := make(chan struct{})
+               go func() {
+                       wg.Wait()
+                       close(waitCh)
+               }()
+               select {
+               case <-waitCh:
+               case <-time.After(30 * time.Second):
+               }
        }
 }
 
diff --git a/test/cases/measure/data/data.go b/test/cases/measure/data/data.go
index 98ddbf984..ae6e582c3 100644
--- a/test/cases/measure/data/data.go
+++ b/test/cases/measure/data/data.go
@@ -144,7 +144,8 @@ func verifyWithContext(ctx context.Context, innerGm 
gm.Gomega, sharedContext hel
 
 // VerifyFn verify whether the query response matches the wanted result.
 var VerifyFn = func(innerGm gm.Gomega, sharedContext helpers.SharedContext, 
args helpers.Args) {
-       ctx := context.Background()
+       ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+       defer cancel()
        verifyWithContext(ctx, innerGm, sharedContext, args)
 }
 
diff --git a/test/cases/stream/data/data.go b/test/cases/stream/data/data.go
index ee83d254a..059598727 100644
--- a/test/cases/stream/data/data.go
+++ b/test/cases/stream/data/data.go
@@ -76,7 +76,8 @@ var VerifyFn = func(innerGm gm.Gomega, sharedContext 
helpers.SharedContext, args
        query.TimeRange = helpers.TimeRange(args, sharedContext)
        query.Stages = args.Stages
        c := streamv1.NewStreamServiceClient(sharedContext.Connection)
-       ctx := context.Background()
+       ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+       defer cancel()
        resp, err := c.Query(ctx, query)
        if args.WantErr {
                if err == nil {
@@ -248,7 +249,9 @@ func verifyQLWithRequest(innerGm gm.Gomega, args 
helpers.Args, yamlQuery *stream
        query, errStrs := bydbql.ParseQuery(qlQueryStr)
        innerGm.Expect(errStrs).To(gm.BeNil())
        transformer := bydbql.NewTransformer(mockRepo)
-       transform, err := transformer.Transform(context.Background(), query)
+       transformCtx, transformCancel := 
context.WithTimeout(context.Background(), 10*time.Second)
+       defer transformCancel()
+       transform, err := transformer.Transform(transformCtx, query)
        if args.WantErr && err != nil {
                return
        }
@@ -268,8 +271,9 @@ func verifyQLWithRequest(innerGm gm.Gomega, args 
helpers.Args, yamlQuery *stream
 
        // simple check the QL can be executed
        client := bydbqlv1.NewBydbQLServiceClient(conn)
-       ctx := context.Background()
-       bydbqlResp, err := client.Query(ctx, &bydbqlv1.QueryRequest{
+       qlCtx, qlCancel := context.WithTimeout(context.Background(), 
10*time.Second)
+       defer qlCancel()
+       bydbqlResp, err := client.Query(qlCtx, &bydbqlv1.QueryRequest{
                Query: qlQueryStr,
        })
        if args.WantErr {
diff --git a/test/cases/trace/data/data.go b/test/cases/trace/data/data.go
index 0c024ad5c..b184c69f2 100644
--- a/test/cases/trace/data/data.go
+++ b/test/cases/trace/data/data.go
@@ -77,7 +77,8 @@ var VerifyFn = func(innerGm gm.Gomega, sharedContext 
helpers.SharedContext, args
        query.TimeRange = helpers.TimeRange(args, sharedContext)
        query.Stages = args.Stages
        c := tracev1.NewTraceServiceClient(sharedContext.Connection)
-       ctx := context.Background()
+       ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+       defer cancel()
        resp, err := c.Query(ctx, query)
        if args.WantErr {
                if err == nil {
@@ -224,7 +225,9 @@ func verifyQLWithRequest(innerGm gm.Gomega, args 
helpers.Args, yamlQuery *tracev
        innerGm.Expect(errStrs).To(gm.BeNil())
 
        transformer := bydbql.NewTransformer(mockRepo)
-       result, err := transformer.Transform(context.Background(), parsed)
+       transformCtx, transformCancel := 
context.WithTimeout(context.Background(), 10*time.Second)
+       defer transformCancel()
+       result, err := transformer.Transform(transformCtx, parsed)
        innerGm.Expect(err).NotTo(gm.HaveOccurred())
 
        qlQuery, ok := result.QueryRequest.(*tracev1.QueryRequest)
@@ -240,7 +243,9 @@ func verifyQLWithRequest(innerGm gm.Gomega, args 
helpers.Args, yamlQuery *tracev
        innerGm.Expect(equal).To(gm.BeTrue(), "QL:\n%s\nYAML:\n%s", 
qlQuery.String(), yamlQuery.String())
 
        bydbqlClient := bydbqlv1.NewBydbQLServiceClient(conn)
-       bydbqlResp, err := bydbqlClient.Query(context.Background(), 
&bydbqlv1.QueryRequest{
+       qlCtx, qlCancel := context.WithTimeout(context.Background(), 
10*time.Second)
+       defer qlCancel()
+       bydbqlResp, err := bydbqlClient.Query(qlCtx, &bydbqlv1.QueryRequest{
                Query: qlQueryStr,
        })
        innerGm.Expect(err).NotTo(gm.HaveOccurred())
diff --git a/test/cases/trace/data/input/all.yml 
b/test/cases/trace/data/input/all.yml
index 597f01b27..a27a3ca40 100644
--- a/test/cases/trace/data/input/all.yml
+++ b/test/cases/trace/data/input/all.yml
@@ -17,13 +17,7 @@
 
 name: "sw"
 groups: ["test-trace-group"]
-tag_projection:
-  - "trace_id"
-  - "state"
-  - "service_id"
-  - "service_instance_id"
-  - "endpoint_id"
-  - "duration"
-  - "span_id"
-  - "timestamp"
+order_by:
+  index_rule_name: "timestamp"
+  sort: "SORT_DESC"
 limit: 100
\ No newline at end of file
diff --git a/test/cases/trace/data/want/all.yml 
b/test/cases/trace/data/want/all.yml
index b3103562b..cb8eae893 100644
--- a/test/cases/trace/data/want/all.yml
+++ b/test/cases/trace/data/want/all.yml
@@ -16,417 +16,71 @@
 # under the License.
 
 # Expected traces from sw.json and sw_mixed_traces.json written to 
test-trace-group
+# Ordered by timestamp DESC (latest spans first)
 traces:
-  - traceId: trace_001
-    spans:
+  - spans:
+    - span: trace_008_span_1
+      spanId: span_008_1
+    - span: trace_008_span_2
+      spanId: span_008_2
+    - span: trace_008_span_3
+      spanId: span_008_3
+    traceId: trace_008
+  - spans:
+    - span: trace_007_span_1
+      spanId: span_007_1
+    - span: trace_007_span_2
+      spanId: span_007_2
+    traceId: trace_007
+  - spans:
+    - span: trace_006_span_1
+      spanId: span_006_1
+    - span: trace_006_span_2
+      spanId: span_006_2
+    - span: trace_006_span_3
+      spanId: span_006_3
+    traceId: trace_006
+  - spans:
+    - span: trace_002_span_1
+      spanId: span_002_1
+    - span: trace_002_span_2
+      spanId: span_002_2
+    - span: trace_002_span_3
+      spanId: span_002_3
+    - span: trace_002_span_4
+      spanId: span_002_4
+    traceId: trace_002
+  - spans:
     - span: trace_001_span_1
       spanId: span_001_1
-      tags:
-      - key: trace_id
-        value:
-          str:
-            value: trace_001
-      - key: state
-        value:
-          int:
-            value: 1
-      - key: service_id
-        value:
-          str:
-            value: webapp_service
-      - key: service_instance_id
-        value:
-          str:
-            value: webapp_instance_1
-      - key: endpoint_id
-        value:
-          str:
-            value: "/home_endpoint"
-      - key: duration
-        value:
-          int:
-            value: 1000
-      - key: span_id
-        value:
-          str:
-            value: span_001_1
     - span: trace_001_span_2
       spanId: span_001_2
-      tags:
-      - key: trace_id
-        value:
-          str:
-            value: trace_001
-      - key: state
-        value:
-          int:
-            value: 0
-      - key: service_id
-        value:
-          str:
-            value: webapp_service
-      - key: service_instance_id
-        value:
-          str:
-            value: webapp_instance_2
-      - key: endpoint_id
-        value:
-          str:
-            value: "/product_endpoint"
-      - key: duration
-        value:
-          int:
-            value: 500
-      - key: span_id
-        value:
-          str:
-            value: span_001_2
     - span: trace_001_span_3
       spanId: span_001_3
-      tags:
-      - key: trace_id
-        value:
-          str:
-            value: trace_001
-      - key: state
-        value:
-          int:
-            value: 0
-      - key: service_id
-        value:
-          str:
-            value: webapp_service
-      - key: service_instance_id
-        value:
-          str:
-            value: webapp_instance_1
-      - key: endpoint_id
-        value:
-          str:
-            value: "/item_endpoint"
-      - key: duration
-        value:
-          int:
-            value: 300
-      - key: span_id
-        value:
-          str:
-            value: span_001_3
-  - traceId: trace_002
-    spans:
-    - span: trace_002_span_1
-      spanId: span_002_1
-      tags:
-      - key: trace_id
-        value:
-          str:
-            value: trace_002
-      - key: state
-        value:
-          int:
-            value: 1
-      - key: service_id
-        value:
-          str:
-            value: webapp_service
-      - key: service_instance_id
-        value:
-          str:
-            value: webapp_instance_1
-      - key: endpoint_id
-        value:
-          str:
-            value: "/home_endpoint"
-      - key: duration
-        value:
-          int:
-            value: 800
-      - key: span_id
-        value:
-          str:
-            value: span_002_1
-    - span: trace_002_span_2
-      spanId: span_002_2
-      tags:
-      - key: trace_id
-        value:
-          str:
-            value: trace_002
-      - key: state
-        value:
-          int:
-            value: 0
-      - key: service_id
-        value:
-          str:
-            value: webapp_service
-      - key: service_instance_id
-        value:
-          str:
-            value: webapp_instance_3
-      - key: endpoint_id
-        value:
-          str:
-            value: "/price_endpoint"
-      - key: duration
-        value:
-          int:
-            value: 200
-      - key: span_id
-        value:
-          str:
-            value: span_002_2
-  - traceId: trace_003
-    spans:
-    - span: trace_003_span_1
-      spanId: span_003_1
-      tags:
-      - key: trace_id
-        value:
-          str:
-            value: trace_003
-      - key: state
-        value:
-          int:
-            value: 1
-      - key: service_id
-        value:
-          str:
-            value: webapp_service
-      - key: service_instance_id
-        value:
-          str:
-            value: webapp_instance_2
-      - key: endpoint_id
-        value:
-          str:
-            value: "/product_endpoint"
-      - key: duration
-        value:
-          int:
-            value: 1200
-      - key: span_id
-        value:
-          str:
-            value: span_003_1
-    - span: trace_003_span_2
-      spanId: span_003_2
-      tags:
-      - key: trace_id
-        value:
-          str:
-            value: trace_003
-      - key: state
-        value:
-          int:
-            value: 0
-      - key: service_id
-        value:
-          str:
-            value: webapp_service
-      - key: service_instance_id
-        value:
-          str:
-            value: webapp_instance_1
-      - key: endpoint_id
-        value:
-          str:
-            value: "/home_endpoint"
-      - key: duration
-        value:
-          int:
-            value: 150
-      - key: span_id
-        value:
-          str:
-            value: span_003_2
-    - span: trace_003_span_3
-      spanId: span_003_3
-      tags:
-      - key: trace_id
-        value:
-          str:
-            value: trace_003
-      - key: state
-        value:
-          int:
-            value: 0
-      - key: service_id
-        value:
-          str:
-            value: webapp_service
-      - key: service_instance_id
-        value:
-          str:
-            value: webapp_instance_3
-      - key: endpoint_id
-        value:
-          str:
-            value: "/price_endpoint"
-      - key: duration
-        value:
-          int:
-            value: 400
-      - key: span_id
-        value:
-          str:
-            value: span_003_3
-  - traceId: trace_004
-    spans:
-    - span: trace_004_span_1
-      spanId: span_004_1
-      tags:
-      - key: trace_id
-        value:
-          str:
-            value: trace_004
-      - key: state
-        value:
-          int:
-            value: 1
-      - key: service_id
-        value:
-          str:
-            value: webapp_service
-      - key: service_instance_id
-        value:
-          str:
-            value: webapp_instance_2
-      - key: endpoint_id
-        value:
-          str:
-            value: "/home_endpoint"
-      - key: duration
-        value:
-          int:
-            value: 600
-      - key: span_id
-        value:
-          str:
-            value: span_004_1
-  - traceId: trace_005
-    spans:
+    - span: trace_001_span_4
+      spanId: span_001_4
+    - span: trace_001_span_5
+      spanId: span_001_5
+    traceId: trace_001
+  - spans:
     - span: trace_005_span_1
       spanId: span_005_1
-      tags:
-      - key: trace_id
-        value:
-          str:
-            value: trace_005
-      - key: state
-        value:
-          int:
-            value: 1
-      - key: service_id
-        value:
-          str:
-            value: webapp_service
-      - key: service_instance_id
-        value:
-          str:
-            value: webapp_instance_2
-      - key: endpoint_id
-        value:
-          str:
-            value: "/product_endpoint"
-      - key: duration
-        value:
-          int:
-            value: 900
-      - key: span_id
-        value:
-          str:
-            value: span_005_1
     - span: trace_005_span_2
       spanId: span_005_2
-      tags:
-      - key: trace_id
-        value:
-          str:
-            value: trace_005
-      - key: state
-        value:
-          int:
-            value: 0
-      - key: service_id
-        value:
-          str:
-            value: webapp_service
-      - key: service_instance_id
-        value:
-          str:
-            value: webapp_instance_1
-      - key: endpoint_id
-        value:
-          str:
-            value: "/home_endpoint"
-      - key: duration
-        value:
-          int:
-            value: 250
-      - key: span_id
-        value:
-          str:
-            value: span_005_2
     - span: trace_005_span_3
       spanId: span_005_3
-      tags:
-      - key: trace_id
-        value:
-          str:
-            value: trace_005
-      - key: state
-        value:
-          int:
-            value: 0
-      - key: service_id
-        value:
-          str:
-            value: webapp_service
-      - key: service_instance_id
-        value:
-          str:
-            value: webapp_instance_3
-      - key: endpoint_id
-        value:
-          str:
-            value: "/price_endpoint"
-      - key: duration
-        value:
-          int:
-            value: 350
-      - key: span_id
-        value:
-          str:
-            value: span_005_3
     - span: trace_005_span_4
       spanId: span_005_4
-      tags:
-      - key: trace_id
-        value:
-          str:
-            value: trace_005
-      - key: state
-        value:
-          int:
-            value: 0
-      - key: service_id
-        value:
-          str:
-            value: webapp_service
-      - key: service_instance_id
-        value:
-          str:
-            value: webapp_instance_1
-      - key: endpoint_id
-        value:
-          str:
-            value: "/item_endpoint"
-      - key: duration
-        value:
-          int:
-            value: 180
-      - key: span_id
-        value:
-          str:
-            value: span_005_4
\ No newline at end of file
+    traceId: trace_005
+  - spans:
+    - span: trace_004_span_1
+      spanId: span_004_1
+    traceId: trace_004
+  - spans:
+    - span: trace_003_span_1
+      spanId: span_003_1
+    - span: trace_003_span_2
+      spanId: span_003_2
+    - span: trace_003_span_3
+      spanId: span_003_3
+    traceId: trace_003
diff --git a/test/integration/replication/measure_normal_replication_test.go 
b/test/integration/replication/measure_normal_replication_test.go
index 8bda48944..d45355ef8 100644
--- a/test/integration/replication/measure_normal_replication_test.go
+++ b/test/integration/replication/measure_normal_replication_test.go
@@ -53,7 +53,8 @@ var _ = g.Describe("Measure Normal Mode Replication", func() {
 
        g.It("should return consistent results from replicas", func() {
                g.By("Verifying the measure exists in sw_metric group")
-               ctx := context.Background()
+               ctx, cancel := context.WithTimeout(context.Background(), 
10*time.Second)
+               defer cancel()
                measureMetadata := &commonv1.Metadata{
                        Name:  "service_cpm_minute",
                        Group: "sw_metric",
@@ -85,7 +86,8 @@ var _ = g.Describe("Measure Normal Mode Replication", func() {
 
        g.It("should survive single node failure", func() {
                g.By("Verifying the measure exists in sw_metric group")
-               ctx := context.Background()
+               ctx, cancel := context.WithTimeout(context.Background(), 
10*time.Second)
+               defer cancel()
                measureMetadata := &commonv1.Metadata{
                        Name:  "service_cpm_minute",
                        Group: "sw_metric",
@@ -111,7 +113,8 @@ var _ = g.Describe("Measure Normal Mode Replication", 
func() {
 
        g.It("should recover data after node restart", func() {
                g.By("Verifying the measure exists in sw_metric group")
-               ctx := context.Background()
+               ctx, cancel := context.WithTimeout(context.Background(), 
10*time.Second)
+               defer cancel()
                measureMetadata := &commonv1.Metadata{
                        Name:  "service_cpm_minute",
                        Group: "sw_metric",
@@ -135,8 +138,8 @@ var _ = g.Describe("Measure Normal Mode Replication", 
func() {
                })
 
                g.By("Restarting the data node")
-               closeDataNode := setup.DataNode(clusterConfig, "--node-labels", 
"role=data")
-               dataNodeClosers = append(dataNodeClosers, closeDataNode)
+               _, _, closeDataNode := setup.DataNodeFromDataDir(clusterConfig, 
dataNodeDirs[0], "--node-labels", "role=data")
+               dataNodeClosers[0] = closeDataNode
 
                g.By("Waiting for cluster to stabilize and handoff queue to 
drain")
                gm.Eventually(func() bool {
@@ -164,7 +167,8 @@ func verifyDataContentWithArgs(conn *grpc.ClientConn, 
baseTime time.Time, args h
 }
 
 func isClusterStable(conn *grpc.ClientConn) bool {
-       ctx := context.Background()
+       ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+       defer cancel()
        clusterClient := databasev1.NewClusterStateServiceClient(conn)
        state, err := clusterClient.GetClusterState(ctx, 
&databasev1.GetClusterStateRequest{})
        if err != nil {
diff --git a/test/integration/replication/replication_suite_test.go 
b/test/integration/replication/replication_suite_test.go
index c8cba4d57..1ac8bcc27 100644
--- a/test/integration/replication/replication_suite_test.go
+++ b/test/integration/replication/replication_suite_test.go
@@ -54,13 +54,15 @@ func TestReplication(t *testing.T) {
 }
 
 var (
-       deferFunc       func()
-       goods           []gleak.Goroutine
-       now             time.Time
-       connection      *grpc.ClientConn
-       liaisonAddr     string
-       dataNodeClosers []func()
-       clusterConfig   *setup.ClusterConfig
+       deferFunc           func()
+       goods               []gleak.Goroutine
+       now                 time.Time
+       connection          *grpc.ClientConn
+       liaisonAddr         string
+       dataNodeClosers     []func()
+       dataNodeDirs        []string
+       dataNodeDirCleanups []func()
+       clusterConfig       *setup.ClusterConfig
 )
 
 var _ = SynchronizedBeforeSuite(func() []byte {
@@ -79,9 +81,15 @@ var _ = SynchronizedBeforeSuite(func() []byte {
 
        By("Starting 3 data nodes for replication test")
        dataNodeClosers = make([]func(), 0, 3)
+       dataNodeDirs = make([]string, 0, 3)
+       dataNodeDirCleanups = make([]func(), 0, 3)
 
        for i := 0; i < 3; i++ {
-               closeDataNode := setup.DataNode(clusterConfig, "--node-labels", 
"role=data")
+               nodeDir, nodeDirCleanup, dirErr := test.NewSpace()
+               Expect(dirErr).NotTo(HaveOccurred())
+               _, _, closeDataNode := setup.DataNodeFromDataDir(clusterConfig, 
nodeDir, "--node-labels", "role=data")
+               dataNodeDirs = append(dataNodeDirs, nodeDir)
+               dataNodeDirCleanups = append(dataNodeDirCleanups, 
nodeDirCleanup)
                dataNodeClosers = append(dataNodeClosers, closeDataNode)
        }
 
@@ -134,6 +142,9 @@ var _ = SynchronizedBeforeSuite(func() []byte {
                for _, closeDataNode := range dataNodeClosers {
                        closeDataNode()
                }
+               for _, cleanup := range dataNodeDirCleanups {
+                       cleanup()
+               }
                tmpDirCleanup()
        }
 
@@ -172,11 +183,27 @@ var _ = SynchronizedAfterSuite(func() {
        }
 }, func() {})
 
+// AfterEach restores cluster to 3 active nodes if a spec left a node stopped.
+var _ = AfterEach(func() {
+       if len(dataNodeDirs) == 0 || connection == nil {
+               return
+       }
+       if isClusterStable(connection) {
+               return
+       }
+       // Node 0 may have been stopped by a spec — restart it from its data 
directory.
+       _, _, closeDataNode := setup.DataNodeFromDataDir(clusterConfig, 
dataNodeDirs[0], "--node-labels", "role=data")
+       dataNodeClosers[0] = closeDataNode
+       Eventually(func() bool {
+               return isClusterStable(connection)
+       }, flags.EventuallyTimeout).Should(BeTrue(), "Cluster should stabilize 
after restoring node 0")
+})
+
 var _ = ReportAfterSuite("Replication Suite", func(report Report) {
+       if deferFunc != nil {
+               deferFunc()
+       }
        if report.SuiteSucceeded {
-               if deferFunc != nil {
-                       deferFunc()
-               }
                Eventually(gleak.Goroutines, 
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
                Eventually(pool.AllRefsCount, 
flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef())
        }
diff --git a/test/integration/replication/replication_test.go 
b/test/integration/replication/replication_test.go
index c609171b0..020a9c45f 100644
--- a/test/integration/replication/replication_test.go
+++ b/test/integration/replication/replication_test.go
@@ -53,7 +53,8 @@ var _ = g.Describe("Replication", func() {
        g.Context("with replicated_group", func() {
                g.It("should survive node failure", func() {
                        g.By("Verifying the measure exists in replicated_group")
-                       ctx := context.Background()
+                       ctx, cancel := 
context.WithTimeout(context.Background(), 10*time.Second)
+                       defer cancel()
                        measureMetadata := &commonv1.Metadata{
                                Name:  "service_traffic",
                                Group: "replicated_group",
@@ -66,29 +67,24 @@ var _ = g.Describe("Replication", func() {
                        
gm.Expect(resp.GetMeasure().GetMetadata().GetGroup()).To(gm.Equal("replicated_group"))
 
                        g.By("Verifying cluster is stable with 3 data nodes")
-                       gm.Expect(isClusterStable(conn)).To(gm.BeTrue(),
+                       gm.Eventually(func() bool {
+                               return isClusterStable(conn)
+                       }, flags.EventuallyTimeout).Should(gm.BeTrue(),
                                "Cluster should have 3 active data nodes before 
test")
 
                        g.By("Stopping one data node")
-                       // We should have 3 data node closers in dataNodeClosers
-                       // Stop the first one
-                       // Create a local copy to avoid mutating the 
package-level slice
                        closersToStop := make([]func(), len(dataNodeClosers))
                        copy(closersToStop, dataNodeClosers)
                        closersToStop[0]()
 
-                       // Wait for the cluster to stabilize (should have 3 
active nodes after failure)
-                       gm.Eventually(func() bool {
-                               return isClusterStable(conn)
-                       }, flags.EventuallyTimeout).Should(gm.BeTrue(),
-                               "Cluster should have 3 active data nodes after 
stopping one node")
-
                        g.By("Verifying data is still accessible after node 
failure")
                        verifyDataContentAfterNodeFailure(conn, now)
 
                        g.By("Verifying replication factor")
                        groupClient := 
databasev1.NewGroupRegistryServiceClient(conn)
-                       groupResp, err := groupClient.Get(ctx, 
&databasev1.GroupRegistryServiceGetRequest{
+                       groupCtx, groupCancel := 
context.WithTimeout(context.Background(), 10*time.Second)
+                       defer groupCancel()
+                       groupResp, err := groupClient.Get(groupCtx, 
&databasev1.GroupRegistryServiceGetRequest{
                                Group: "replicated_group",
                        })
                        gm.Expect(err).NotTo(gm.HaveOccurred())
diff --git a/test/integration/replication/stream_replication_test.go 
b/test/integration/replication/stream_replication_test.go
index 19d20e9bd..7b70c18ae 100644
--- a/test/integration/replication/stream_replication_test.go
+++ b/test/integration/replication/stream_replication_test.go
@@ -53,7 +53,8 @@ var _ = g.Describe("Stream Normal Mode Replication", func() {
 
        g.It("should return consistent results from replicas", func() {
                g.By("Verifying the stream exists in default group")
-               ctx := context.Background()
+               ctx, cancel := context.WithTimeout(context.Background(), 
10*time.Second)
+               defer cancel()
                streamMetadata := &commonv1.Metadata{
                        Name:  "sw",
                        Group: "default",
@@ -85,7 +86,8 @@ var _ = g.Describe("Stream Normal Mode Replication", func() {
 
        g.It("should survive single node failure", func() {
                g.By("Verifying the stream exists in default group")
-               ctx := context.Background()
+               ctx, cancel := context.WithTimeout(context.Background(), 
10*time.Second)
+               defer cancel()
                streamMetadata := &commonv1.Metadata{
                        Name:  "sw",
                        Group: "default",
@@ -111,7 +113,8 @@ var _ = g.Describe("Stream Normal Mode Replication", func() 
{
 
        g.It("should recover data after node restart", func() {
                g.By("Verifying the stream exists in default group")
-               ctx := context.Background()
+               ctx, cancel := context.WithTimeout(context.Background(), 
10*time.Second)
+               defer cancel()
                streamMetadata := &commonv1.Metadata{
                        Name:  "sw",
                        Group: "default",
@@ -135,8 +138,8 @@ var _ = g.Describe("Stream Normal Mode Replication", func() 
{
                })
 
                g.By("Restarting the data node")
-               closeDataNode := setup.DataNode(clusterConfig, "--node-labels", 
"role=data")
-               dataNodeClosers = append(dataNodeClosers, closeDataNode)
+               _, _, closeDataNode := setup.DataNodeFromDataDir(clusterConfig, 
dataNodeDirs[0], "--node-labels", "role=data")
+               dataNodeClosers[0] = closeDataNode
 
                g.By("Waiting for cluster to stabilize and handoff queue to 
drain")
                gm.Eventually(func() bool {
diff --git a/test/integration/replication/trace_replication_test.go 
b/test/integration/replication/trace_replication_test.go
index 067c4cd1d..087b3466e 100644
--- a/test/integration/replication/trace_replication_test.go
+++ b/test/integration/replication/trace_replication_test.go
@@ -53,7 +53,8 @@ var _ = g.Describe("Trace Normal Mode Replication", func() {
 
        g.It("should return consistent results from replicas", func() {
                g.By("Verifying the trace exists in test-trace-group")
-               ctx := context.Background()
+               ctx, cancel := context.WithTimeout(context.Background(), 
10*time.Second)
+               defer cancel()
                traceMetadata := &commonv1.Metadata{
                        Name:  "sw",
                        Group: "test-trace-group",
@@ -86,7 +87,8 @@ var _ = g.Describe("Trace Normal Mode Replication", func() {
 
        g.It("should survive single node failure", func() {
                g.By("Verifying the trace exists in test-trace-group")
-               ctx := context.Background()
+               ctx, cancel := context.WithTimeout(context.Background(), 
10*time.Second)
+               defer cancel()
                traceMetadata := &commonv1.Metadata{
                        Name:  "sw",
                        Group: "test-trace-group",
@@ -112,7 +114,8 @@ var _ = g.Describe("Trace Normal Mode Replication", func() {
 
        g.It("should recover data after node restart", func() {
                g.By("Verifying the trace exists in test-trace-group")
-               ctx := context.Background()
+               ctx, cancel := context.WithTimeout(context.Background(), 
10*time.Second)
+               defer cancel()
                traceMetadata := &commonv1.Metadata{
                        Name:  "sw",
                        Group: "test-trace-group",
@@ -136,8 +139,8 @@ var _ = g.Describe("Trace Normal Mode Replication", func() {
                })
 
                g.By("Restarting the data node")
-               closeDataNode := setup.DataNode(clusterConfig, "--node-labels", 
"role=data")
-               dataNodeClosers = append(dataNodeClosers, closeDataNode)
+               _, _, closeDataNode := setup.DataNodeFromDataDir(clusterConfig, 
dataNodeDirs[0], "--node-labels", "role=data")
+               dataNodeClosers[0] = closeDataNode
 
                g.By("Waiting for cluster to stabilize and handoff queue to 
drain")
                gm.Eventually(func() bool {


Reply via email to