This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch test/replication in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit be23171564a99a213b910ed35eddeed79d8d3521 Author: Hongtao Gao <[email protected]> AuthorDate: Mon Mar 23 02:54:38 2026 +0000 fix(replication): fix cluster stability issues for replication tests Fix connection manager node eviction from active map on failover, add context/cancel to measure schemaRepo to prevent shutdown panics, handle prometheus AlreadyRegisteredError for metric re-registration, treat gRPC Canceled as a failover error, reduce distributed query timeout from 30s to 5s, and stabilize test node restart by reusing persisted data directories. --- banyand/measure/metadata.go | 20 +- banyand/measure/svc_data.go | 14 +- banyand/property/gossip/service.go | 4 +- pkg/grpchelper/connmanager.go | 26 +- pkg/grpchelper/retry.go | 4 +- pkg/meter/prom/prom.go | 51 ++- .../logical/stream/stream_plan_distributed.go | 2 +- pkg/query/logical/trace/trace_plan_distributed.go | 2 +- pkg/test/setup/setup.go | 10 +- test/cases/measure/data/data.go | 3 +- test/cases/stream/data/data.go | 12 +- test/cases/trace/data/data.go | 11 +- test/cases/trace/data/input/all.yml | 12 +- test/cases/trace/data/want/all.yml | 452 +++------------------ .../replication/measure_normal_replication_test.go | 16 +- .../replication/replication_suite_test.go | 49 ++- test/integration/replication/replication_test.go | 20 +- .../replication/stream_replication_test.go | 13 +- .../replication/trace_replication_test.go | 13 +- 19 files changed, 231 insertions(+), 503 deletions(-) diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go index 1517b8e68..daf140bd9 100644 --- a/banyand/measure/metadata.go +++ b/banyand/measure/metadata.go @@ -82,6 +82,8 @@ type schemaRepo struct { metadata metadata.Repo pipeline queue.Client l *logger.Logger + ctx context.Context + cancel context.CancelFunc closingGroups map[string]struct{} topNProcessorMap sync.Map nodeID string @@ -91,6 +93,7 @@ type schemaRepo struct { } func newSchemaRepo(path string, svc *standalone, nodeLabels map[string]string, nodeID string) *schemaRepo { + ctx, cancel := context.WithCancel(context.Background()) sr := &schemaRepo{ path: path, l: svc.l, @@ -99,6 +102,8 @@ func newSchemaRepo(path string, svc *standalone, nodeLabels map[string]string, n nodeID: nodeID, closingGroups: make(map[string]struct{}), role: databasev1.Role_ROLE_DATA, + ctx: ctx, + cancel: cancel, } sr.Repository = resourceSchema.NewRepository( svc.metadata, @@ -111,6 +116,7 @@ func newSchemaRepo(path string, svc *standalone, nodeLabels map[string]string, n } func newLiaisonSchemaRepo(path string, svc *liaison, measureDataNodeRegistry grpc.NodeRegistry, pipeline queue.Client) *schemaRepo { + ctx, cancel := context.WithCancel(context.Background()) sr := &schemaRepo{ path: path, l: svc.l, @@ -118,6 +124,8 @@ func newLiaisonSchemaRepo(path string, svc *liaison, measureDataNodeRegistry grp pipeline: pipeline, closingGroups: make(map[string]struct{}), role: databasev1.Role_ROLE_LIAISON, + ctx: ctx, + cancel: cancel, } sr.Repository = resourceSchema.NewRepository( svc.metadata, @@ -188,7 +196,7 @@ func (sr *schemaRepo) OnInit(kinds []schema.Kind) (bool, []int64) { } groupNames, revs := sr.Repository.Init(schema.KindMeasure) for i := range groupNames { - sr.createTopNResultMeasure(context.Background(), sr.metadata.MeasureRegistry(), groupNames[i]) + sr.createTopNResultMeasure(sr.ctx, sr.metadata.MeasureRegistry(), groupNames[i]) } return true, revs } @@ -209,7 +217,7 @@ func (sr *schemaRepo) OnAddOrUpdate(metadata schema.Metadata) { Kind: resourceSchema.EventKindGroup, Metadata: g, }) - sr.createTopNResultMeasure(context.Background(), sr.metadata.MeasureRegistry(), g.Metadata.Name) + sr.createTopNResultMeasure(sr.ctx, sr.metadata.MeasureRegistry(), g.Metadata.Name) case schema.KindMeasure: m := metadata.Spec.(*databasev1.Measure) if err := validate.Measure(m); err != nil { @@ -327,6 +335,7 @@ func (sr *schemaRepo) OnDelete(metadata schema.Metadata) { } func (sr *schemaRepo) Close() { + sr.cancel() var err error sr.topNProcessorMap.Range(func(_, val any) bool { manager := val.(*topNProcessorManager) @@ -578,9 +587,10 @@ func (sr *schemaRepo) createTopNResultMeasure(ctx context.Context, measureSchema backoffStrategy := backoff.NewExponentialBackOff() backoffStrategy.MaxElapsedTime = 0 // never stop until topN measure has been created - err := backoff.Retry(operation, backoffStrategy) - if err != nil { - logger.Panicf("fail to create topN measure %s: %v", md, err) + if err := backoff.Retry(operation, backoff.WithContext(backoffStrategy, ctx)); err != nil { + if !errors.Is(err, context.Canceled) { + logger.Panicf("fail to create topN measure %s: %v", md, err) + } } } diff --git a/banyand/measure/svc_data.go b/banyand/measure/svc_data.go index 21ff035c5..065ff220f 100644 --- a/banyand/measure/svc_data.go +++ b/banyand/measure/svc_data.go @@ -429,12 +429,16 @@ func NewReadonlyDataSVC(metadata metadata.Repo, omr observability.MetricsRegistr } func newDataSchemaRepo(path string, svc *dataSVC, nodeLabels map[string]string, nodeID string) *schemaRepo { + ctx, cancel := context.WithCancel(context.Background()) sr := &schemaRepo{ - path: path, - l: svc.l, - metadata: svc.metadata, - nodeID: nodeID, - role: databasev1.Role_ROLE_DATA, + path: path, + l: svc.l, + metadata: svc.metadata, + nodeID: nodeID, + role: databasev1.Role_ROLE_DATA, + closingGroups: make(map[string]struct{}), + ctx: ctx, + cancel: cancel, } sr.Repository = resourceSchema.NewRepository( svc.metadata, diff --git a/banyand/property/gossip/service.go b/banyand/property/gossip/service.go index 91000c0f0..5ce1339ea 100644 --- a/banyand/property/gossip/service.go +++ b/banyand/property/gossip/service.go @@ -143,11 +143,11 @@ func (s *service) PreRun(ctx context.Context) error { s.serverMetrics = newServerMetrics(s.omr.With(metricsScope)) if s.metadata != nil { s.sel.OnInit([]schema.Kind{schema.KindGroup}) - s.metadata.RegisterHandler(s.prefix+"-nodes", schema.KindNode, s) - s.metadata.RegisterHandler(s.prefix+"-groups", schema.KindGroup, s) if err := s.initTracing(ctx); err != nil { s.log.Warn().Err(err).Msg("failed to init internal trace stream") } + s.metadata.RegisterHandler(s.prefix+"-nodes", schema.KindNode, s) + s.metadata.RegisterHandler(s.prefix+"-groups", schema.KindGroup, s) } s.protocolHandler = newProtocolHandler(s) go s.protocolHandler.processPropagation() diff --git a/pkg/grpchelper/connmanager.go b/pkg/grpchelper/connmanager.go index 4e9793679..0ed31da27 100644 --- a/pkg/grpchelper/connmanager.go +++ b/pkg/grpchelper/connmanager.go @@ -223,7 +223,7 @@ func (m *ConnManager[C]) OnDelete(node *databasev1.Node) { return } case <-m.closer.CloseNotify(): - return + return } attempt++ } @@ -339,10 +339,11 @@ func (m *ConnManager[C]) FailoverNode(node string) { } return } - if mn, ok := m.active[node]; ok && !m.checkHealthAndReconnect(mn.conn, mn.node, mn.client) { - _ = mn.conn.Close() - delete(m.active, node) - m.handler.OnInactive(node, mn.client) + mn, ok := m.active[node] + if !ok { + return + } + if !m.checkHealthAndReconnect(mn.conn, mn.node, mn.client) { m.log.Info().Str("status", m.dump()).Str("node", node).Msg("node is unhealthy in the failover flow, move it to evict queue") } } @@ -417,6 +418,7 @@ func (m *ConnManager[C]) removeNodeIfUnhealthy(name string, mn *managedNode[C]) if m.healthCheck(mn.node.String(), mn.conn) { return false } + m.log.Info().Str("node", name).Msg("removeNodeIfUnhealthy: node is unhealthy, removing from active") _ = mn.conn.Close() delete(m.active, name) m.handler.OnInactive(name, mn.client) @@ -424,12 +426,13 @@ func (m *ConnManager[C]) removeNodeIfUnhealthy(name string, mn *managedNode[C]) } // checkHealthAndReconnect checks if a node is healthy. If not, closes the conn, -// adds to evictable, calls OnInactive, and starts a retry goroutine. +// adds to evictable, deletes from active, calls OnInactive, and starts a retry goroutine. // Returns true if healthy. func (m *ConnManager[C]) checkHealthAndReconnect(conn *grpc.ClientConn, node *databasev1.Node, client C) bool { if m.healthCheck(node.String(), conn) { return true } + m.log.Info().Str("node", node.Metadata.Name).Msg("checkHealthAndReconnect: node is unhealthy, moving to evictable") _ = conn.Close() if !m.closer.AddRunning() { return false @@ -496,6 +499,7 @@ func (m *ConnManager[C]) checkHealthAndReconnect(conn *grpc.ClientConn, node *da attempt++ } }(name, m.evictable[name]) + delete(m.active, name) return false } @@ -508,12 +512,14 @@ func (m *ConnManager[C]) healthCheck(node string, conn *grpc.ClientConn) bool { }) return err }); requestErr != nil { - if e := m.log.Debug(); e.Enabled() { - e.Err(requestErr).Str("node", node).Msg("service unhealthy") - } + m.log.Info().Err(requestErr).Str("node", node).Msg("healthCheck: service unhealthy, RPC error") return false } - return resp.GetStatus() == grpc_health_v1.HealthCheckResponse_SERVING + if resp.GetStatus() != grpc_health_v1.HealthCheckResponse_SERVING { + m.log.Info().Str("node", node).Str("status", resp.GetStatus().String()).Msg("healthCheck: service not SERVING") + return false + } + return true } func (m *ConnManager[C]) dump() string { diff --git a/pkg/grpchelper/retry.go b/pkg/grpchelper/retry.go index 2478a4c3d..8bb6282b6 100644 --- a/pkg/grpchelper/retry.go +++ b/pkg/grpchelper/retry.go @@ -156,7 +156,9 @@ func IsFailoverError(err error) bool { if !ok { return false } - return code == codes.Unavailable || code == codes.DeadlineExceeded + // codes.Canceled with "client connection is closing" indicates the gRPC connection + // to the node was closed (node failure or keepalive timeout), not a client-side cancellation. + return code == codes.Unavailable || code == codes.DeadlineExceeded || code == codes.Canceled } // IsInternalError checks if the error is an internal server error. diff --git a/pkg/meter/prom/prom.go b/pkg/meter/prom/prom.go index 0ca74abec..00443bbd8 100644 --- a/pkg/meter/prom/prom.go +++ b/pkg/meter/prom/prom.go @@ -18,10 +18,10 @@ package prom import ( + "errors" "unsafe" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" "github.com/apache/skywalking-banyandb/pkg/meter" ) @@ -42,36 +42,43 @@ func NewProvider(scope meter.Scope, reg prometheus.Registerer) meter.Provider { // Counter returns a prometheus counter. func (p *provider) Counter(name string, labels ...string) meter.Counter { - return &counter{ - counter: promauto.With(p.reg).NewCounterVec(prometheus.CounterOpts{ - Name: p.scope.GetNamespace() + "_" + name, - Help: p.scope.GetNamespace() + "_" + name, - ConstLabels: convertLabels(p.scope.GetLabels()), - }, labels), - } + return &counter{counter: registerCollector(p.reg, prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: p.scope.GetNamespace() + "_" + name, + Help: p.scope.GetNamespace() + "_" + name, + ConstLabels: convertLabels(p.scope.GetLabels()), + }, labels))} } // Gauge returns a prometheus gauge. func (p *provider) Gauge(name string, labels ...string) meter.Gauge { - return &gauge{ - gauge: promauto.With(p.reg).NewGaugeVec(prometheus.GaugeOpts{ - Name: p.scope.GetNamespace() + "_" + name, - Help: p.scope.GetNamespace() + "_" + name, - ConstLabels: convertLabels(p.scope.GetLabels()), - }, labels), - } + return &gauge{gauge: registerCollector(p.reg, prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: p.scope.GetNamespace() + "_" + name, + Help: p.scope.GetNamespace() + "_" + name, + ConstLabels: convertLabels(p.scope.GetLabels()), + }, labels))} } // Histogram returns a prometheus histogram. func (p *provider) Histogram(name string, buckets meter.Buckets, labels ...string) meter.Histogram { - return &histogram{ - histogram: promauto.With(p.reg).NewHistogramVec(prometheus.HistogramOpts{ - Name: p.scope.GetNamespace() + "_" + name, - Help: p.scope.GetNamespace() + "_" + name, - ConstLabels: convertLabels(p.scope.GetLabels()), - Buckets: buckets, - }, labels), + return &histogram{histogram: registerCollector(p.reg, prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: p.scope.GetNamespace() + "_" + name, + Help: p.scope.GetNamespace() + "_" + name, + ConstLabels: convertLabels(p.scope.GetLabels()), + Buckets: buckets, + }, labels))} +} + +func registerCollector[T prometheus.Collector](reg prometheus.Registerer, c T) T { + if regErr := reg.Register(c); regErr != nil { + var are prometheus.AlreadyRegisteredError + if errors.As(regErr, &are) { + if existing, ok := are.ExistingCollector.(T); ok { + return existing + } + } + panic(regErr) } + return c } // convertLabels converts a map of labels to a prometheus.Labels. diff --git a/pkg/query/logical/stream/stream_plan_distributed.go b/pkg/query/logical/stream/stream_plan_distributed.go index b99377653..c05d92ea8 100644 --- a/pkg/query/logical/stream/stream_plan_distributed.go +++ b/pkg/query/logical/stream/stream_plan_distributed.go @@ -38,7 +38,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/query/logical" ) -const defaultQueryTimeout = 30 * time.Second +const defaultQueryTimeout = 5 * time.Second var _ logical.UnresolvedPlan = (*unresolvedDistributed)(nil) diff --git a/pkg/query/logical/trace/trace_plan_distributed.go b/pkg/query/logical/trace/trace_plan_distributed.go index bedc26ae0..0dfae7bdd 100644 --- a/pkg/query/logical/trace/trace_plan_distributed.go +++ b/pkg/query/logical/trace/trace_plan_distributed.go @@ -39,7 +39,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/query/model" ) -const defaultQueryTimeout = 30 * time.Second +const defaultQueryTimeout = 5 * time.Second var _ logical.UnresolvedPlan = (*unresolvedTraceDistributed)(nil) diff --git a/pkg/test/setup/setup.go b/pkg/test/setup/setup.go index 7dcd4412b..2b76a42c7 100644 --- a/pkg/test/setup/setup.go +++ b/pkg/test/setup/setup.go @@ -607,7 +607,15 @@ func CMD(flags ...string) func() { }() return func() { closeFn() - wg.Wait() + waitCh := make(chan struct{}) + go func() { + wg.Wait() + close(waitCh) + }() + select { + case <-waitCh: + case <-time.After(30 * time.Second): + } } } diff --git a/test/cases/measure/data/data.go b/test/cases/measure/data/data.go index 98ddbf984..ae6e582c3 100644 --- a/test/cases/measure/data/data.go +++ b/test/cases/measure/data/data.go @@ -144,7 +144,8 @@ func verifyWithContext(ctx context.Context, innerGm gm.Gomega, sharedContext hel // VerifyFn verify whether the query response matches the wanted result. var VerifyFn = func(innerGm gm.Gomega, sharedContext helpers.SharedContext, args helpers.Args) { - ctx := context.Background() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() verifyWithContext(ctx, innerGm, sharedContext, args) } diff --git a/test/cases/stream/data/data.go b/test/cases/stream/data/data.go index ee83d254a..059598727 100644 --- a/test/cases/stream/data/data.go +++ b/test/cases/stream/data/data.go @@ -76,7 +76,8 @@ var VerifyFn = func(innerGm gm.Gomega, sharedContext helpers.SharedContext, args query.TimeRange = helpers.TimeRange(args, sharedContext) query.Stages = args.Stages c := streamv1.NewStreamServiceClient(sharedContext.Connection) - ctx := context.Background() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() resp, err := c.Query(ctx, query) if args.WantErr { if err == nil { @@ -248,7 +249,9 @@ func verifyQLWithRequest(innerGm gm.Gomega, args helpers.Args, yamlQuery *stream query, errStrs := bydbql.ParseQuery(qlQueryStr) innerGm.Expect(errStrs).To(gm.BeNil()) transformer := bydbql.NewTransformer(mockRepo) - transform, err := transformer.Transform(context.Background(), query) + transformCtx, transformCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer transformCancel() + transform, err := transformer.Transform(transformCtx, query) if args.WantErr && err != nil { return } @@ -268,8 +271,9 @@ func verifyQLWithRequest(innerGm gm.Gomega, args helpers.Args, yamlQuery *stream // simple check the QL can be executed client := bydbqlv1.NewBydbQLServiceClient(conn) - ctx := context.Background() - bydbqlResp, err := client.Query(ctx, &bydbqlv1.QueryRequest{ + qlCtx, qlCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer qlCancel() + bydbqlResp, err := client.Query(qlCtx, &bydbqlv1.QueryRequest{ Query: qlQueryStr, }) if args.WantErr { diff --git a/test/cases/trace/data/data.go b/test/cases/trace/data/data.go index 0c024ad5c..b184c69f2 100644 --- a/test/cases/trace/data/data.go +++ b/test/cases/trace/data/data.go @@ -77,7 +77,8 @@ var VerifyFn = func(innerGm gm.Gomega, sharedContext helpers.SharedContext, args query.TimeRange = helpers.TimeRange(args, sharedContext) query.Stages = args.Stages c := tracev1.NewTraceServiceClient(sharedContext.Connection) - ctx := context.Background() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() resp, err := c.Query(ctx, query) if args.WantErr { if err == nil { @@ -224,7 +225,9 @@ func verifyQLWithRequest(innerGm gm.Gomega, args helpers.Args, yamlQuery *tracev innerGm.Expect(errStrs).To(gm.BeNil()) transformer := bydbql.NewTransformer(mockRepo) - result, err := transformer.Transform(context.Background(), parsed) + transformCtx, transformCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer transformCancel() + result, err := transformer.Transform(transformCtx, parsed) innerGm.Expect(err).NotTo(gm.HaveOccurred()) qlQuery, ok := result.QueryRequest.(*tracev1.QueryRequest) @@ -240,7 +243,9 @@ func verifyQLWithRequest(innerGm gm.Gomega, args helpers.Args, yamlQuery *tracev innerGm.Expect(equal).To(gm.BeTrue(), "QL:\n%s\nYAML:\n%s", qlQuery.String(), yamlQuery.String()) bydbqlClient := bydbqlv1.NewBydbQLServiceClient(conn) - bydbqlResp, err := bydbqlClient.Query(context.Background(), &bydbqlv1.QueryRequest{ + qlCtx, qlCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer qlCancel() + bydbqlResp, err := bydbqlClient.Query(qlCtx, &bydbqlv1.QueryRequest{ Query: qlQueryStr, }) innerGm.Expect(err).NotTo(gm.HaveOccurred()) diff --git a/test/cases/trace/data/input/all.yml b/test/cases/trace/data/input/all.yml index 597f01b27..a27a3ca40 100644 --- a/test/cases/trace/data/input/all.yml +++ b/test/cases/trace/data/input/all.yml @@ -17,13 +17,7 @@ name: "sw" groups: ["test-trace-group"] -tag_projection: - - "trace_id" - - "state" - - "service_id" - - "service_instance_id" - - "endpoint_id" - - "duration" - - "span_id" - - "timestamp" +order_by: + index_rule_name: "timestamp" + sort: "SORT_DESC" limit: 100 \ No newline at end of file diff --git a/test/cases/trace/data/want/all.yml b/test/cases/trace/data/want/all.yml index b3103562b..cb8eae893 100644 --- a/test/cases/trace/data/want/all.yml +++ b/test/cases/trace/data/want/all.yml @@ -16,417 +16,71 @@ # under the License. # Expected traces from sw.json and sw_mixed_traces.json written to test-trace-group +# Ordered by timestamp DESC (latest spans first) traces: - - traceId: trace_001 - spans: + - spans: + - span: trace_008_span_1 + spanId: span_008_1 + - span: trace_008_span_2 + spanId: span_008_2 + - span: trace_008_span_3 + spanId: span_008_3 + traceId: trace_008 + - spans: + - span: trace_007_span_1 + spanId: span_007_1 + - span: trace_007_span_2 + spanId: span_007_2 + traceId: trace_007 + - spans: + - span: trace_006_span_1 + spanId: span_006_1 + - span: trace_006_span_2 + spanId: span_006_2 + - span: trace_006_span_3 + spanId: span_006_3 + traceId: trace_006 + - spans: + - span: trace_002_span_1 + spanId: span_002_1 + - span: trace_002_span_2 + spanId: span_002_2 + - span: trace_002_span_3 + spanId: span_002_3 + - span: trace_002_span_4 + spanId: span_002_4 + traceId: trace_002 + - spans: - span: trace_001_span_1 spanId: span_001_1 - tags: - - key: trace_id - value: - str: - value: trace_001 - - key: state - value: - int: - value: 1 - - key: service_id - value: - str: - value: webapp_service - - key: service_instance_id - value: - str: - value: webapp_instance_1 - - key: endpoint_id - value: - str: - value: "/home_endpoint" - - key: duration - value: - int: - value: 1000 - - key: span_id - value: - str: - value: span_001_1 - span: trace_001_span_2 spanId: span_001_2 - tags: - - key: trace_id - value: - str: - value: trace_001 - - key: state - value: - int: - value: 0 - - key: service_id - value: - str: - value: webapp_service - - key: service_instance_id - value: - str: - value: webapp_instance_2 - - key: endpoint_id - value: - str: - value: "/product_endpoint" - - key: duration - value: - int: - value: 500 - - key: span_id - value: - str: - value: span_001_2 - span: trace_001_span_3 spanId: span_001_3 - tags: - - key: trace_id - value: - str: - value: trace_001 - - key: state - value: - int: - value: 0 - - key: service_id - value: - str: - value: webapp_service - - key: service_instance_id - value: - str: - value: webapp_instance_1 - - key: endpoint_id - value: - str: - value: "/item_endpoint" - - key: duration - value: - int: - value: 300 - - key: span_id - value: - str: - value: span_001_3 - - traceId: trace_002 - spans: - - span: trace_002_span_1 - spanId: span_002_1 - tags: - - key: trace_id - value: - str: - value: trace_002 - - key: state - value: - int: - value: 1 - - key: service_id - value: - str: - value: webapp_service - - key: service_instance_id - value: - str: - value: webapp_instance_1 - - key: endpoint_id - value: - str: - value: "/home_endpoint" - - key: duration - value: - int: - value: 800 - - key: span_id - value: - str: - value: span_002_1 - - span: trace_002_span_2 - spanId: span_002_2 - tags: - - key: trace_id - value: - str: - value: trace_002 - - key: state - value: - int: - value: 0 - - key: service_id - value: - str: - value: webapp_service - - key: service_instance_id - value: - str: - value: webapp_instance_3 - - key: endpoint_id - value: - str: - value: "/price_endpoint" - - key: duration - value: - int: - value: 200 - - key: span_id - value: - str: - value: span_002_2 - - traceId: trace_003 - spans: - - span: trace_003_span_1 - spanId: span_003_1 - tags: - - key: trace_id - value: - str: - value: trace_003 - - key: state - value: - int: - value: 1 - - key: service_id - value: - str: - value: webapp_service - - key: service_instance_id - value: - str: - value: webapp_instance_2 - - key: endpoint_id - value: - str: - value: "/product_endpoint" - - key: duration - value: - int: - value: 1200 - - key: span_id - value: - str: - value: span_003_1 - - span: trace_003_span_2 - spanId: span_003_2 - tags: - - key: trace_id - value: - str: - value: trace_003 - - key: state - value: - int: - value: 0 - - key: service_id - value: - str: - value: webapp_service - - key: service_instance_id - value: - str: - value: webapp_instance_1 - - key: endpoint_id - value: - str: - value: "/home_endpoint" - - key: duration - value: - int: - value: 150 - - key: span_id - value: - str: - value: span_003_2 - - span: trace_003_span_3 - spanId: span_003_3 - tags: - - key: trace_id - value: - str: - value: trace_003 - - key: state - value: - int: - value: 0 - - key: service_id - value: - str: - value: webapp_service - - key: service_instance_id - value: - str: - value: webapp_instance_3 - - key: endpoint_id - value: - str: - value: "/price_endpoint" - - key: duration - value: - int: - value: 400 - - key: span_id - value: - str: - value: span_003_3 - - traceId: trace_004 - spans: - - span: trace_004_span_1 - spanId: span_004_1 - tags: - - key: trace_id - value: - str: - value: trace_004 - - key: state - value: - int: - value: 1 - - key: service_id - value: - str: - value: webapp_service - - key: service_instance_id - value: - str: - value: webapp_instance_2 - - key: endpoint_id - value: - str: - value: "/home_endpoint" - - key: duration - value: - int: - value: 600 - - key: span_id - value: - str: - value: span_004_1 - - traceId: trace_005 - spans: + - span: trace_001_span_4 + spanId: span_001_4 + - span: trace_001_span_5 + spanId: span_001_5 + traceId: trace_001 + - spans: - span: trace_005_span_1 spanId: span_005_1 - tags: - - key: trace_id - value: - str: - value: trace_005 - - key: state - value: - int: - value: 1 - - key: service_id - value: - str: - value: webapp_service - - key: service_instance_id - value: - str: - value: webapp_instance_2 - - key: endpoint_id - value: - str: - value: "/product_endpoint" - - key: duration - value: - int: - value: 900 - - key: span_id - value: - str: - value: span_005_1 - span: trace_005_span_2 spanId: span_005_2 - tags: - - key: trace_id - value: - str: - value: trace_005 - - key: state - value: - int: - value: 0 - - key: service_id - value: - str: - value: webapp_service - - key: service_instance_id - value: - str: - value: webapp_instance_1 - - key: endpoint_id - value: - str: - value: "/home_endpoint" - - key: duration - value: - int: - value: 250 - - key: span_id - value: - str: - value: span_005_2 - span: trace_005_span_3 spanId: span_005_3 - tags: - - key: trace_id - value: - str: - value: trace_005 - - key: state - value: - int: - value: 0 - - key: service_id - value: - str: - value: webapp_service - - key: service_instance_id - value: - str: - value: webapp_instance_3 - - key: endpoint_id - value: - str: - value: "/price_endpoint" - - key: duration - value: - int: - value: 350 - - key: span_id - value: - str: - value: span_005_3 - span: trace_005_span_4 spanId: span_005_4 - tags: - - key: trace_id - value: - str: - value: trace_005 - - key: state - value: - int: - value: 0 - - key: service_id - value: - str: - value: webapp_service - - key: service_instance_id - value: - str: - value: webapp_instance_1 - - key: endpoint_id - value: - str: - value: "/item_endpoint" - - key: duration - value: - int: - value: 180 - - key: span_id - value: - str: - value: span_005_4 \ No newline at end of file + traceId: trace_005 + - spans: + - span: trace_004_span_1 + spanId: span_004_1 + traceId: trace_004 + - spans: + - span: trace_003_span_1 + spanId: span_003_1 + - span: trace_003_span_2 + spanId: span_003_2 + - span: trace_003_span_3 + spanId: span_003_3 + traceId: trace_003 diff --git a/test/integration/replication/measure_normal_replication_test.go b/test/integration/replication/measure_normal_replication_test.go index 8bda48944..d45355ef8 100644 --- a/test/integration/replication/measure_normal_replication_test.go +++ b/test/integration/replication/measure_normal_replication_test.go @@ -53,7 +53,8 @@ var _ = g.Describe("Measure Normal Mode Replication", func() { g.It("should return consistent results from replicas", func() { g.By("Verifying the measure exists in sw_metric group") - ctx := context.Background() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() measureMetadata := &commonv1.Metadata{ Name: "service_cpm_minute", Group: "sw_metric", @@ -85,7 +86,8 @@ var _ = g.Describe("Measure Normal Mode Replication", func() { g.It("should survive single node failure", func() { g.By("Verifying the measure exists in sw_metric group") - ctx := context.Background() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() measureMetadata := &commonv1.Metadata{ Name: "service_cpm_minute", Group: "sw_metric", @@ -111,7 +113,8 @@ var _ = g.Describe("Measure Normal Mode Replication", func() { g.It("should recover data after node restart", func() { g.By("Verifying the measure exists in sw_metric group") - ctx := context.Background() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() measureMetadata := &commonv1.Metadata{ Name: "service_cpm_minute", Group: "sw_metric", @@ -135,8 +138,8 @@ var _ = g.Describe("Measure Normal Mode Replication", func() { }) g.By("Restarting the data node") - closeDataNode := setup.DataNode(clusterConfig, "--node-labels", "role=data") - dataNodeClosers = append(dataNodeClosers, closeDataNode) + _, _, closeDataNode := setup.DataNodeFromDataDir(clusterConfig, dataNodeDirs[0], "--node-labels", "role=data") + dataNodeClosers[0] = closeDataNode g.By("Waiting for cluster to stabilize and handoff queue to drain") gm.Eventually(func() bool { @@ -164,7 +167,8 @@ func verifyDataContentWithArgs(conn *grpc.ClientConn, baseTime time.Time, args h } func isClusterStable(conn *grpc.ClientConn) bool { - ctx := context.Background() + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() clusterClient := databasev1.NewClusterStateServiceClient(conn) state, err := clusterClient.GetClusterState(ctx, &databasev1.GetClusterStateRequest{}) if err != nil { diff --git a/test/integration/replication/replication_suite_test.go b/test/integration/replication/replication_suite_test.go index c8cba4d57..1ac8bcc27 100644 --- a/test/integration/replication/replication_suite_test.go +++ b/test/integration/replication/replication_suite_test.go @@ -54,13 +54,15 @@ func TestReplication(t *testing.T) { } var ( - deferFunc func() - goods []gleak.Goroutine - now time.Time - connection *grpc.ClientConn - liaisonAddr string - dataNodeClosers []func() - clusterConfig *setup.ClusterConfig + deferFunc func() + goods []gleak.Goroutine + now time.Time + connection *grpc.ClientConn + liaisonAddr string + dataNodeClosers []func() + dataNodeDirs []string + dataNodeDirCleanups []func() + clusterConfig *setup.ClusterConfig ) var _ = SynchronizedBeforeSuite(func() []byte { @@ -79,9 +81,15 @@ var _ = SynchronizedBeforeSuite(func() []byte { By("Starting 3 data nodes for replication test") dataNodeClosers = make([]func(), 0, 3) + dataNodeDirs = make([]string, 0, 3) + dataNodeDirCleanups = make([]func(), 0, 3) for i := 0; i < 3; i++ { - closeDataNode := setup.DataNode(clusterConfig, "--node-labels", "role=data") + nodeDir, nodeDirCleanup, dirErr := test.NewSpace() + Expect(dirErr).NotTo(HaveOccurred()) + _, _, closeDataNode := setup.DataNodeFromDataDir(clusterConfig, nodeDir, "--node-labels", "role=data") + dataNodeDirs = append(dataNodeDirs, nodeDir) + dataNodeDirCleanups = append(dataNodeDirCleanups, nodeDirCleanup) dataNodeClosers = append(dataNodeClosers, closeDataNode) } @@ -134,6 +142,9 @@ var _ = SynchronizedBeforeSuite(func() []byte { for _, closeDataNode := range dataNodeClosers { closeDataNode() } + for _, cleanup := range dataNodeDirCleanups { + cleanup() + } tmpDirCleanup() } @@ -172,11 +183,27 @@ var _ = SynchronizedAfterSuite(func() { } }, func() {}) +// AfterEach restores cluster to 3 active nodes if a spec left a node stopped. +var _ = AfterEach(func() { + if len(dataNodeDirs) == 0 || connection == nil { + return + } + if isClusterStable(connection) { + return + } + // Node 0 may have been stopped by a spec — restart it from its data directory. + _, _, closeDataNode := setup.DataNodeFromDataDir(clusterConfig, dataNodeDirs[0], "--node-labels", "role=data") + dataNodeClosers[0] = closeDataNode + Eventually(func() bool { + return isClusterStable(connection) + }, flags.EventuallyTimeout).Should(BeTrue(), "Cluster should stabilize after restoring node 0") +}) + var _ = ReportAfterSuite("Replication Suite", func(report Report) { + if deferFunc != nil { + deferFunc() + } if report.SuiteSucceeded { - if deferFunc != nil { - deferFunc() - } Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods)) Eventually(pool.AllRefsCount, flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef()) } diff --git a/test/integration/replication/replication_test.go b/test/integration/replication/replication_test.go index c609171b0..020a9c45f 100644 --- a/test/integration/replication/replication_test.go +++ b/test/integration/replication/replication_test.go @@ -53,7 +53,8 @@ var _ = g.Describe("Replication", func() { g.Context("with replicated_group", func() { g.It("should survive node failure", func() { g.By("Verifying the measure exists in replicated_group") - ctx := context.Background() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() measureMetadata := &commonv1.Metadata{ Name: "service_traffic", Group: "replicated_group", @@ -66,29 +67,24 @@ var _ = g.Describe("Replication", func() { gm.Expect(resp.GetMeasure().GetMetadata().GetGroup()).To(gm.Equal("replicated_group")) g.By("Verifying cluster is stable with 3 data nodes") - gm.Expect(isClusterStable(conn)).To(gm.BeTrue(), + gm.Eventually(func() bool { + return isClusterStable(conn) + }, flags.EventuallyTimeout).Should(gm.BeTrue(), "Cluster should have 3 active data nodes before test") g.By("Stopping one data node") - // We should have 3 data node closers in dataNodeClosers - // Stop the first one - // Create a local copy to avoid mutating the package-level slice closersToStop := make([]func(), len(dataNodeClosers)) copy(closersToStop, dataNodeClosers) closersToStop[0]() - // Wait for the cluster to stabilize (should have 3 active nodes after failure) - gm.Eventually(func() bool { - return isClusterStable(conn) - }, flags.EventuallyTimeout).Should(gm.BeTrue(), - "Cluster should have 3 active data nodes after stopping one node") - g.By("Verifying data is still accessible after node failure") verifyDataContentAfterNodeFailure(conn, now) g.By("Verifying replication factor") groupClient := databasev1.NewGroupRegistryServiceClient(conn) - groupResp, err := groupClient.Get(ctx, &databasev1.GroupRegistryServiceGetRequest{ + groupCtx, groupCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer groupCancel() + groupResp, err := groupClient.Get(groupCtx, &databasev1.GroupRegistryServiceGetRequest{ Group: "replicated_group", }) gm.Expect(err).NotTo(gm.HaveOccurred()) diff --git a/test/integration/replication/stream_replication_test.go b/test/integration/replication/stream_replication_test.go index 19d20e9bd..7b70c18ae 100644 --- a/test/integration/replication/stream_replication_test.go +++ b/test/integration/replication/stream_replication_test.go @@ -53,7 +53,8 @@ var _ = g.Describe("Stream Normal Mode Replication", func() { g.It("should return consistent results from replicas", func() { g.By("Verifying the stream exists in default group") - ctx := context.Background() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() streamMetadata := &commonv1.Metadata{ Name: "sw", Group: "default", @@ -85,7 +86,8 @@ var _ = g.Describe("Stream Normal Mode Replication", func() { g.It("should survive single node failure", func() { g.By("Verifying the stream exists in default group") - ctx := context.Background() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() streamMetadata := &commonv1.Metadata{ Name: "sw", Group: "default", @@ -111,7 +113,8 @@ var _ = g.Describe("Stream Normal Mode Replication", func() { g.It("should recover data after node restart", func() { g.By("Verifying the stream exists in default group") - ctx := context.Background() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() streamMetadata := &commonv1.Metadata{ Name: "sw", Group: "default", @@ -135,8 +138,8 @@ var _ = g.Describe("Stream Normal Mode Replication", func() { }) g.By("Restarting the data node") - closeDataNode := setup.DataNode(clusterConfig, "--node-labels", "role=data") - dataNodeClosers = append(dataNodeClosers, closeDataNode) + _, _, closeDataNode := setup.DataNodeFromDataDir(clusterConfig, dataNodeDirs[0], "--node-labels", "role=data") + dataNodeClosers[0] = closeDataNode g.By("Waiting for cluster to stabilize and handoff queue to drain") gm.Eventually(func() bool { diff --git a/test/integration/replication/trace_replication_test.go b/test/integration/replication/trace_replication_test.go index 067c4cd1d..087b3466e 100644 --- a/test/integration/replication/trace_replication_test.go +++ b/test/integration/replication/trace_replication_test.go @@ -53,7 +53,8 @@ var _ = g.Describe("Trace Normal Mode Replication", func() { g.It("should return consistent results from replicas", func() { g.By("Verifying the trace exists in test-trace-group") - ctx := context.Background() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() traceMetadata := &commonv1.Metadata{ Name: "sw", Group: "test-trace-group", @@ -86,7 +87,8 @@ var _ = g.Describe("Trace Normal Mode Replication", func() { g.It("should survive single node failure", func() { g.By("Verifying the trace exists in test-trace-group") - ctx := context.Background() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() traceMetadata := &commonv1.Metadata{ Name: "sw", Group: "test-trace-group", @@ -112,7 +114,8 @@ var _ = g.Describe("Trace Normal Mode Replication", func() { g.It("should recover data after node restart", func() { g.By("Verifying the trace exists in test-trace-group") - ctx := context.Background() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() traceMetadata := &commonv1.Metadata{ Name: "sw", Group: "test-trace-group", @@ -136,8 +139,8 @@ var _ = g.Describe("Trace Normal Mode Replication", func() { }) g.By("Restarting the data node") - closeDataNode := setup.DataNode(clusterConfig, "--node-labels", "role=data") - dataNodeClosers = append(dataNodeClosers, closeDataNode) + _, _, closeDataNode := setup.DataNodeFromDataDir(clusterConfig, dataNodeDirs[0], "--node-labels", "role=data") + dataNodeClosers[0] = closeDataNode g.By("Waiting for cluster to stabilize and handoff queue to drain") gm.Eventually(func() bool {
