This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch test/replication
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 0aae7e985b142491e88878a6f2d9a321c605a9d0
Author: Hongtao Gao <[email protected]>
AuthorDate: Fri Mar 20 03:22:11 2026 +0000

    docs: update replication integration tests design for property-based schema

    - Switch from embedded etcd to property-based schema repo
    - Use DNS-based node discovery via discovery.yaml file
    - Add Step 1: modify current replication suite first before adding new tests
    - Remove etcd endpoint and server from test infrastructure

    Co-Authored-By: Claude Opus 4.6 <[email protected]>
---
 ...6-03-20-replication-integration-tests-design.md | 227 ++++++++++++---------
 1 file changed, 131 insertions(+), 96 deletions(-)

diff --git a/docs/superpowers/specs/2026-03-20-replication-integration-tests-design.md b/docs/superpowers/specs/2026-03-20-replication-integration-tests-design.md
index bdbaf50af..ed940b6ff 100644
--- a/docs/superpowers/specs/2026-03-20-replication-integration-tests-design.md
+++ b/docs/superpowers/specs/2026-03-20-replication-integration-tests-design.md
@@ -9,15 +9,19 @@ Add comprehensive replication integration tests for BanyanDB that verify:

 ## Background

-GitHub issue [#13229](https://github.com/apache/skywalking/issues/13229) requests integration tests for BanyanDB replication resilience. The existing replication test only covers `service_traffic` measure with `index_mode: true` in a replicated group.
+GitHub issue [#13229](https://github.com/apache/skywalking/issues/13229) requests integration tests for BanyanDB replication resilience. The existing replication test only covers `service_traffic` measure with `index_mode: true` in a replicated group and uses embedded etcd for both node discovery and schema storage.

-## Scope
+## Key Architectural Changes

-This design adds replication tests for:
-- **Stream** - replicated stream schemas
-- **Measure (normal mode)** - measures without `index_mode: true`
-- **Trace** - replicated trace schemas
-- **TopN** - replicated TopN schemas (runs on top of measures)
+### Current State (Must Change)
+- Uses embedded etcd for node discovery and schema storage
+- `test/integration/replication/replication_suite_test.go` starts embedded etcd server
+- Schema loading via `test_measure.PreloadSchema`, `test_stream.PreloadSchema`, etc.
+
+### Target State
+- **Property-based schema repo** (`ModeProperty`) - schemas stored in data nodes' property schema servers
+- **DNS-based node discovery** (`ModeFile`) - nodes discover each other via `discovery.yaml` file
+- **No embedded etcd** - no `embeddedetcd.NewServer()` call

 ## Design Principles

@@ -25,6 +29,49 @@ This design adds replication tests for:
 2. **Same schema names** - Measures/streams/traces keep their original names
 3. **Separate schema location** - Replicated schemas in a new directory `pkg/test/replicated/`
 4. **Isolation** - Replication test suite loads only from `pkg/test/replicated/` schemas
+5. **No etcd** - Use property-based schema storage and file-based node discovery
+
+## Infrastructure Changes
+
+### Modified: `test/integration/replication/replication_suite_test.go`
+
+#### Before (etcd-based)
+```go
+// Start embedded etcd server
+server, err := embeddedetcd.NewServer(...)
+schemaRegistry, err := schema.NewEtcdSchemaRegistry(...)
+test_stream.PreloadSchema(ctx, schemaRegistry)
+test_measure.PreloadSchema(ctx, schemaRegistry)
+test_trace.PreloadSchema(ctx, schemaRegistry)
+clusterConfig = setup.EtcdClusterConfig(ep)
+```
+
+#### After (property + file-based)
+```go
+// Create discovery file writer for DNS-based node discovery
+tmpDir, tmpDirCleanup, tmpErr := test.NewSpace()
+dfWriter := setup.NewDiscoveryFileWriter(tmpDir)
+clusterConfig := setup.PropertyClusterConfig(dfWriter)
+
+// Start data nodes and liaison node
+// Data nodes register themselves via dfWriter.AddNode()
+// Liaison node discovers data nodes from the discovery.yaml file
+
+// Load schemas via property-based registry
+setup.PreloadSchemaViaProperty(clusterConfig,
+    test_replicated_measure.PreloadSchema,
+    test_replicated_stream.PreloadSchema,
+    test_replicated_trace.PreloadSchema,
+)
+```
+
+### Key Setup Functions
+
+| Function | Purpose |
+|----------|---------|
+| `setup.NewDiscoveryFileWriter(tmpDir)` | Creates a `discovery.yaml` file for node discovery |
+| `setup.PropertyClusterConfig(dfWriter)` | Configures cluster with `ModeFile` (node discovery) and `ModeProperty` (schema registry) |
+| `setup.PreloadSchemaViaProperty(config, loaders...)` | Loads schemas via property-based schema registry |

 ## Schema Location

@@ -33,9 +80,9 @@ pkg/test/replicated/
 ├── measure/
 │   └── testdata/
 │       ├── groups/
-│       │   ├── sw_metric_replicated.json   # replicas: 2
-│       │   ├── index_mode_replicated.json  # replicas: 2
-│       │   └── exception_replicated.json   # replicas: 2
+│       │   ├── sw_metric.json              # replicas: 2
+│       │   ├── index_mode.json             # replicas: 2
+│       │   └── exception.json              # replicas: 2
 │       └── measures/
 │           ├── service_traffic.json        # index_mode: true
 │           ├── service_cpm_minute.json     # normal mode
@@ -44,6 +91,7 @@ pkg/test/replicated/
 ├── stream/
 │   └── testdata/
 │       ├── group.json                      # replicas: 2
+│       ├── group_with_stages.json          # replicas: 2
 │       └── streams/
 │           ├── sw.json
 │           ├── duplicated.json
@@ -52,7 +100,10 @@ pkg/test/replicated/
 │   └── testdata/
 │       └── groups/
 │           ├── test-trace-group.json       # replicas: 2
-│           └── zipkin-trace-group.json     # replicas: 2
+│           ├── test-trace-updated.json     # replicas: 2
+│           ├── zipkin-trace-group.json     # replicas: 2
+│           ├── test-trace-spec.json        # replicas: 2
+│           └── test-trace-spec2.json       # replicas: 2
 └── topn/
     └── testdata/
         └── topnaggregations/
@@ -63,72 +114,73 @@ pkg/test/replicated/

 ### Measure Groups

-| Original Group | Replicated Group | Replicas |
-|----------------|------------------|----------|
-| sw_metric | sw_metric | 2 |
-| index_mode | index_mode | 2 |
-| exception | exception | 2 |
+| Original Group | Replicas (Original) | Replicas (New) |
+|----------------|---------------------|----------------|
+| sw_metric | 1 | 2 |
+| index_mode | - | 2 |
+| exception | 0 | 2 |

 ### Stream Groups

-| Original Group | Replicated Group | Replicas |
-|----------------|------------------|----------|
-| default | default | 2 |
-| default-spec | default-spec | 2 |
-| default-spec2 | default-spec2 | 2 |
+| Original Group | Replicas (Original) | Replicas (New) |
+|----------------|---------------------|----------------|
+| default | 0 | 2 |
+| default-spec | 0 | 2 |
+| default-spec2 | 0 | 2 |

 ### Trace Groups

-| Original Group | Replicated Group | Replicas |
-|----------------|------------------|----------|
-| test-trace-group | test-trace-group | 2 |
-| zipkinTrace | zipkinTrace | 2 |
-| test-trace-updated | test-trace-updated | 2 |
-| test-trace-spec | test-trace-spec | 2 |
-| test-trace-spec2 | test-trace-spec2 | 2 |
+| Original Group | Replicas (Original) | Replicas (New) |
+|----------------|---------------------|----------------|
+| test-trace-group | 0 | 2 |
+| zipkinTrace | 0 | 2 |
+| test-trace-updated | 0 | 2 |
+| test-trace-spec | 0 | 2 |
+| test-trace-spec2 | 0 | 2 |

-## Test Files to Create
+## Implementation Steps

-### Schema Files (pkg/test/replicated/)
+### Step 1: Modify Current Replication Suite (Must Do First)

-#### Measure Schemas
-- `pkg/test/replicated/measure/testdata/groups/sw_metric.json` (copy from `pkg/test/measure/testdata/groups/sw_metric.json` with `replicas: 2`)
-- `pkg/test/replicated/measure/testdata/groups/index_mode.json` (copy with `replicas: 2`)
-- `pkg/test/replicated/measure/testdata/groups/exception.json` (copy with `replicas: 2`)
-- `pkg/test/replicated/measure/testdata/measures/*.json` (all measure schemas)
+**File:** `test/integration/replication/replication_suite_test.go`

-#### Stream Schemas
-- `pkg/test/replicated/stream/testdata/group.json` (copy with `replicas: 2`)
-- `pkg/test/replicated/stream/testdata/group_with_stages.json` (copy with `replicas: 2`)
-- `pkg/test/replicated/stream/testdata/streams/*.json` (all stream schemas)
+1. Remove embedded etcd server startup
+2. Remove `schema.NewEtcdSchemaRegistry()` call
+3. Add `setup.NewDiscoveryFileWriter()` for DNS-based node discovery
+4. Use `setup.PropertyClusterConfig()` instead of `setup.EtcdClusterConfig()`
+5. Use `setup.PreloadSchemaViaProperty()` instead of direct schema preloading
+6. Remove `embeddedetcd` import

-#### Trace Schemas
-- `pkg/test/replicated/trace/testdata/groups/test-trace-group.json` (copy with `replicas: 2`)
-- `pkg/test/replicated/trace/testdata/groups/test-trace-updated.json` (copy with `replicas: 2`)
-- `pkg/test/replicated/trace/testdata/groups/zipkin-trace-group.json` (copy with `replicas: 2`)
-- `pkg/test/replicated/trace/testdata/groups/test-trace-spec.json` (copy with `replicas: 2`)
-- `pkg/test/replicated/trace/testdata/groups/test-trace-spec2.json` (copy with `replicas: 2`)
+**Node Discovery Flow:**
+- Each data node calls `config.NodeDiscovery.FileWriter.AddNode(nodeName, grpcAddr)` to register itself in `discovery.yaml`
+- The liaison node reads `discovery.yaml` to discover data nodes

-#### TopN Schemas
-- `pkg/test/replicated/topn/testdata/topnaggregations/*.json` (all TopN schemas)
+### Step 2: Create Replicated Schema Directory

-### Go Files (pkg/test/replicated/)
+**Location:** `pkg/test/replicated/`

-#### etcd.go Preload Functions
-- `pkg/test/replicated/measure/etcd.go` - `PreloadSchema(ctx, registry)` for replicated measures
-- `pkg/test/replicated/stream/etcd.go` - `PreloadSchema(ctx, registry)` for replicated streams
-- `pkg/test/replicated/trace/etcd.go` - `PreloadSchema(ctx, registry)` for replicated traces
+Structure mirrors `pkg/test/measure/`, `pkg/test/stream/`, `pkg/test/trace/`

-### Integration Test Files (test/integration/replication/)
+### Step 3: Create Replicated Schemas

-#### New Test Files
-- `test/integration/replication/measure_normal_replication_test.go` - Normal mode measure replication tests
-- `test/integration/replication/stream_replication_test.go` - Stream replication tests
-- `test/integration/replication/trace_replication_test.go` - Trace replication tests
+For each measure/stream/trace schema:
+1. Copy from original location (`pkg/test/measure/testdata/`, etc.)
+2. Modify group to have `replicas: 2`
+3. Keep measure/stream/trace names unchanged

-#### Modified Files
-- `test/integration/replication/replication_suite_test.go` - Add preloading of replicated schemas
-- `test/cases/init.go` - Add data initialization for replicated schemas
+### Step 4: Create Replicated Preload Functions
+
+**Files:**
+- `pkg/test/replicated/measure/etcd.go` - PreloadSchema for replicated measures
+- `pkg/test/replicated/stream/etcd.go` - PreloadSchema for replicated streams
+- `pkg/test/replicated/trace/etcd.go` - PreloadSchema for replicated traces
+
+### Step 5: Add New Replication Tests
+
+**Files:**
+- `test/integration/replication/measure_normal_replication_test.go` - Normal mode measure tests
+- `test/integration/replication/stream_replication_test.go` - Stream tests
+- `test/integration/replication/trace_replication_test.go` - Trace tests

 ## Extended Test Pattern

@@ -137,9 +189,15 @@ For each data type (measure, stream, trace), the test verifies:
 ### 1. Deduplication Verification
 ```go
 // Query the same data multiple times
+var firstResult *measurev1.QueryResponse
 for i := 0; i < 3; i++ {
     resp := query()
-    // Verify results are consistent (deduplication works across replicas)
+    if i == 0 {
+        firstResult = resp
+    } else {
+        // Verify consistent results (deduplication works across replicas)
+        Expect(cmp.Equal(resp, firstResult, ...)).To(BeTrue())
+    }
 }
 ```

@@ -148,10 +206,10 @@ for i := 0; i < 3; i++ {
 // Stop one data node
 closersToStop[0]()

-// Wait for cluster stabilization
+// Wait for cluster stabilization via file-based discovery
 gm.Eventually(func() int {
-    nodes, _ := helpers.ListKeys(etcdEndpoint, nodePath)
-    return len(nodes)
+    // Check number of active nodes in discovery file
+    return len(dfWriter.ListNodes())
 }, flags.EventuallyTimeout).Should(gm.Equal(3))

 // Verify queries still work
@@ -161,7 +219,7 @@ verifyQueryResults(conn, now)
 ### 3. Recovery Verification
 ```go
 // Restart the stopped node
-restartedNode := startDataNode(config)
+restartedNode := setup.DataNode(clusterConfig, "--node-labels", "role=data")

 // Wait for handoff queue to drain
 gm.Eventually(func() int {
@@ -172,36 +230,13 @@ gm.Eventually(func() int {
 verifyQueryResults(conn, now)
 ```

-## Implementation Order
-
-1. **Measure (normal mode)** - Create `pkg/test/replicated/measure/` and add measure replication tests
-2. **Stream** - Create `pkg/test/replicated/stream/` and add stream replication tests
-3. **Trace** - Create `pkg/test/replicated/trace/` and add trace replication tests
-4. **TopN** - Create `pkg/test/replicated/topn/` and add TopN replication tests
-
-## Data Initialization
-
-The `test/cases/init.go` file will be extended to initialize data for replicated schemas:
-
-```go
-// replicated measure
-casesmeasuredata.Write(conn, "service_traffic", "index_mode", "service_traffic_data.json", now, interval)
-casesmeasuredata.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", now, interval)
-// ... (all measures written to their replicated groups)
-
-// replicated stream
-casesstreamdata.Write(conn, "sw", now, interval)
-// ... (all streams written to their replicated groups)
-
-// replicated trace
-casestrace.WriteToGroup(conn, "sw", "test-trace-group", "sw", now, interval)
-// ... (all traces written to their replicated groups)
-```
-
 ## Acceptance Criteria

-1. All existing test cases in `test/cases/measure/data/`, `test/cases/stream/data/`, `test/cases/trace/data/` can run against replicated schemas without modification
-2. Replication tests verify deduplication (consistent results from replicas)
-3. Replication tests verify accessibility (queries work after node failure)
-4. Replication tests verify recovery (handoff queue drains after node restart)
-5. Test infrastructure is isolated: replication tests use only `pkg/test/replicated/` schemas
+1. **No etcd** - Replication tests do not start embedded etcd server
+2. **Property-based schema** - Schemas loaded via `ModeProperty` registry
+3. **File-based discovery** - Nodes discover each other via `discovery.yaml`
+4. **No test case copying** - Existing test cases reused
+5. **Same schema names** - Replicated schemas use identical names to originals
+6. **Deduplication verified** - Consistent results from replicas
+7. **Accessibility verified** - Queries work after node failure
+8. **Recovery verified** - Handoff queue drains after node restart
