This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch bug-hq-sync in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 8924ec83dcf685233f6b9ebd8f068f24ca91d069 Author: Hongtao Gao <[email protected]> AuthorDate: Sun Apr 12 01:55:04 2026 +0000 fix(handoff): prevent size limit bypass and sidx timestamp corruption in handoff replay - Replace canEnqueue+updateTotalSize with atomic tryReserveSize to close the TOCTOU race that allowed concurrent enqueues to exceed the size limit. - Read manifest.json for sidx parts in readPartFromHandoff, populating MinTimestamp/MaxTimestamp with the same fallback logic as streaming sync (pointer field → SegmentID legacy → warn). Previously sidx parts were replayed with MinTimestamp=0, causing rejection or seg-19700101 creation. - Surface metadata read/parse errors instead of silently swallowing them. - Add queryTraceByService to the integration test to verify sidx replay via indexed tag lookup, which the existing trace_id-only query couldn't catch. - Clean up orphaned parts when no snapshot references them in tsTable init. --- CHANGES.md | 2 + banyand/measure/tstable.go | 8 + banyand/stream/tstable.go | 8 + banyand/trace/handoff_controller.go | 111 +++++++--- banyand/trace/handoff_replay_test.go | 293 +++++++++++++++++++++++++ banyand/trace/handoff_storage_test.go | 56 +++++ banyand/trace/tstable.go | 8 + test/integration/handoff/handoff_suite_test.go | 44 ++++ 8 files changed, 502 insertions(+), 28 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 939f46a83..dd25bc75f 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -30,6 +30,8 @@ Release Notes. - Disable the rotation task on warm and cold nodes to prevent incorrect segment boundaries during lifecycle migration. - Prevent epoch-dated segment directories (seg-19700101) from being created by zero timestamps in distributed sync paths. - Fix SIDX streaming sync sending SegmentID as MinTimestamp instead of the actual timestamp, causing sync failures on the receiving node. 
+- Fix handoff controller TOCTOU race allowing disk size limit bypass, and populate sidx MinTimestamp/MaxTimestamp during replay to prevent corrupt segment creation on recovered nodes. +- Delete orphaned parts when no snapshot references them during tsTable initialization. ### Chores diff --git a/banyand/measure/tstable.go b/banyand/measure/tstable.go index ef191a4e0..c9054d541 100644 --- a/banyand/measure/tstable.go +++ b/banyand/measure/tstable.go @@ -112,6 +112,10 @@ func initTSTable(fileSystem fs.FileSystem, rootPath string, p common.Position, for _, id := range loadedSnapshots { fileSystem.MustRMAll(filepath.Join(rootPath, snapshotName(id))) } + for _, id := range loadedParts { + l.Info().Str("path", partPath(rootPath, id)).Msg("delete orphaned part without snapshot") + fileSystem.MustRMAll(partPath(rootPath, id)) + } return &tst, uint64(time.Now().UnixNano()) } sort.Slice(loadedSnapshots, func(i, j int) bool { @@ -131,6 +135,10 @@ func initTSTable(fileSystem fs.FileSystem, rootPath string, p common.Position, } return &tst, epoch } + for _, id := range loadedParts { + l.Info().Str("path", partPath(rootPath, id)).Msg("delete orphaned part after all snapshots failed to load") + fileSystem.MustRMAll(partPath(rootPath, id)) + } return &tst, uint64(time.Now().UnixNano()) } diff --git a/banyand/stream/tstable.go b/banyand/stream/tstable.go index dc4ea70ba..60ef8dcee 100644 --- a/banyand/stream/tstable.go +++ b/banyand/stream/tstable.go @@ -290,6 +290,10 @@ func initTSTable(fileSystem fs.FileSystem, rootPath string, p common.Position, for _, id := range loadedSnapshots { fileSystem.MustRMAll(filepath.Join(rootPath, snapshotName(id))) } + for _, id := range loadedParts { + l.Info().Str("path", partPath(rootPath, id)).Msg("delete orphaned part without snapshot") + fileSystem.MustRMAll(partPath(rootPath, id)) + } return &tst, uint64(time.Now().UnixNano()), nil } sort.Slice(loadedSnapshots, func(i, j int) bool { @@ -309,6 +313,10 @@ func initTSTable(fileSystem 
fs.FileSystem, rootPath string, p common.Position, } return &tst, epoch, nil } + for _, id := range loadedParts { + l.Info().Str("path", partPath(rootPath, id)).Msg("delete orphaned part after all snapshots failed to load") + fileSystem.MustRMAll(partPath(rootPath, id)) + } return &tst, uint64(time.Now().UnixNano()), nil } diff --git a/banyand/trace/handoff_controller.go b/banyand/trace/handoff_controller.go index bc45154f3..161b28225 100644 --- a/banyand/trace/handoff_controller.go +++ b/banyand/trace/handoff_controller.go @@ -223,8 +223,8 @@ func (hc *handoffController) enqueueForNode(nodeAddr string, partID uint64, part // Read part size from metadata partSize := hc.readPartSizeFromMetadata(sourcePath, partType) - // Check if enqueue would exceed limit - if !hc.canEnqueue(partSize) { + // Atomically check and reserve size to prevent TOCTOU race between check and update. + if !hc.tryReserveSize(partSize) { currentSize := hc.getTotalSize() return fmt.Errorf("handoff queue full: current=%d MB, limit=%d MB, part=%d MB", currentSize/1024/1024, hc.maxTotalSizeBytes/1024/1024, partSize/1024/1024) @@ -243,16 +243,15 @@ func (hc *handoffController) enqueueForNode(nodeAddr string, partID uint64, part nodeQueue, err := hc.getOrCreateNodeQueue(nodeAddr) if err != nil { + hc.updateTotalSize(-int64(partSize)) return fmt.Errorf("failed to get node queue for %s: %w", nodeAddr, err) } - if err := nodeQueue.enqueue(partID, partType, sourcePath, meta); err != nil { - return err + if enqueueErr := nodeQueue.enqueue(partID, partType, sourcePath, meta); enqueueErr != nil { + hc.updateTotalSize(-int64(partSize)) + return enqueueErr } - // Update total size after successful enqueue - hc.updateTotalSize(int64(partSize)) - return nil } @@ -714,15 +713,22 @@ func (hc *handoffController) stats() (partCount int64, totalSize int64) { return count, int64(hc.getTotalSize()) } -// canEnqueue checks if adding a part of the given size would exceed the total size limit. 
-func (hc *handoffController) canEnqueue(partSize uint64) bool { +// tryReserveSize atomically checks and reserves the given size if within the limit. +// Returns true and increments currentTotalSize if the size can be reserved. +// Returns false if adding partSize would exceed maxTotalSizeBytes. +// On success, callers must call updateTotalSize(-int64(partSize)) to release the reservation if a subsequent enqueue step fails; no release is needed when this returns false. +func (hc *handoffController) tryReserveSize(partSize uint64) bool { if hc.maxTotalSizeBytes == 0 { return true // No limit configured } - hc.sizeMu.RLock() - defer hc.sizeMu.RUnlock() - return hc.currentTotalSize+partSize <= hc.maxTotalSizeBytes + hc.sizeMu.Lock() + defer hc.sizeMu.Unlock() + if hc.currentTotalSize+partSize <= hc.maxTotalSizeBytes { + hc.currentTotalSize += partSize + return true + } + return false } // readPartSizeFromMetadata reads the CompressedSizeBytes from the part's metadata file. @@ -1210,28 +1216,77 @@ func (hc *handoffController) readPartFromHandoff(nodeAddr string, partID uint64, PartType: partType, } - // For core parts, read additional metadata from metadata.json if present - if partType == PartTypeCore { - metadataPath := filepath.Join(partPath, metadataFilename) - if metadataBytes, err := hc.fileSystem.Read(metadataPath); err == nil { - var pm partMetadata - if err := json.Unmarshal(metadataBytes, &pm); err == nil { - streamingPart.CompressedSizeBytes = pm.CompressedSizeBytes - streamingPart.UncompressedSizeBytes = pm.UncompressedSpanSizeBytes - streamingPart.TotalCount = pm.TotalCount - streamingPart.BlocksCount = pm.BlocksCount - streamingPart.MinTimestamp = pm.MinTimestamp - streamingPart.MaxTimestamp = pm.MaxTimestamp - } - } - } - release := func() { for _, buf := range buffers { bigValuePool.Release(buf) } } + // For core parts, read additional metadata from metadata.json. 
+ if partType == PartTypeCore { + metadataPath := filepath.Join(partPath, metadataFilename) + metadataBytes, readErr := hc.fileSystem.Read(metadataPath) + if readErr != nil { + release() + return nil, func() {}, fmt.Errorf("failed to read %s: %w", metadataFilename, readErr) + } + var pm partMetadata + if parseErr := json.Unmarshal(metadataBytes, &pm); parseErr != nil { + release() + return nil, func() {}, fmt.Errorf("failed to parse %s: %w", metadataFilename, parseErr) + } + streamingPart.CompressedSizeBytes = pm.CompressedSizeBytes + streamingPart.UncompressedSizeBytes = pm.UncompressedSpanSizeBytes + streamingPart.TotalCount = pm.TotalCount + streamingPart.BlocksCount = pm.BlocksCount + streamingPart.MinTimestamp = pm.MinTimestamp + streamingPart.MaxTimestamp = pm.MaxTimestamp + } else { + // For sidx parts, read manifest.json and populate timestamps using the same + // fallback logic as streaming sync: MinTimestamp pointer → SegmentID legacy → warn. + manifestPath := filepath.Join(partPath, "manifest.json") + manifestBytes, readErr := hc.fileSystem.Read(manifestPath) + if readErr != nil { + release() + return nil, func() {}, fmt.Errorf("failed to read manifest.json: %w", readErr) + } + var pm struct { + MinTimestamp *int64 `json:"minTimestamp,omitempty"` + MaxTimestamp *int64 `json:"maxTimestamp,omitempty"` + CompressedSizeBytes uint64 `json:"compressedSizeBytes"` + UncompressedSizeBytes uint64 `json:"uncompressedSizeBytes"` + TotalCount uint64 `json:"totalCount"` + BlocksCount uint64 `json:"blocksCount"` + MinKey int64 `json:"minKey"` + MaxKey int64 `json:"maxKey"` + SegmentID int64 `json:"segmentID"` + } + if parseErr := json.Unmarshal(manifestBytes, &pm); parseErr != nil { + release() + return nil, func() {}, fmt.Errorf("failed to parse manifest.json: %w", parseErr) + } + streamingPart.CompressedSizeBytes = pm.CompressedSizeBytes + streamingPart.UncompressedSizeBytes = pm.UncompressedSizeBytes + streamingPart.TotalCount = pm.TotalCount + 
streamingPart.BlocksCount = pm.BlocksCount + streamingPart.MinKey = pm.MinKey + streamingPart.MaxKey = pm.MaxKey + switch { + case pm.MinTimestamp != nil: + streamingPart.MinTimestamp = *pm.MinTimestamp + case pm.SegmentID > 0: + streamingPart.MinTimestamp = pm.SegmentID + default: + hc.l.Warn().Uint64("partID", partID).Str("partType", partType). + Msg("sidx handoff replay: part has no valid timestamp (MinTimestamp=nil, SegmentID=0)") + } + if pm.MaxTimestamp != nil { + streamingPart.MaxTimestamp = *pm.MaxTimestamp + } else { + streamingPart.MaxTimestamp = streamingPart.MinTimestamp + } + } + return streamingPart, release, nil } diff --git a/banyand/trace/handoff_replay_test.go b/banyand/trace/handoff_replay_test.go index ec0610cdb..2ab134f53 100644 --- a/banyand/trace/handoff_replay_test.go +++ b/banyand/trace/handoff_replay_test.go @@ -404,3 +404,296 @@ func TestHandoffController_SendPartToNode(t *testing.T) { require.NoError(t, err) assert.Equal(t, 1, mockClient.getSendCount()) } + +// createTestSidxPart creates a minimal sidx part directory with the given manifest content. 
+func createTestSidxPart(t *testing.T, fileSystem fs.FileSystem, root string, partID uint64, manifestContent []byte) string { + t.Helper() + partPath := filepath.Join(root, partName(partID)) + fileSystem.MkdirIfNotExist(partPath, storage.DirPerm) + + sidxFiles := map[string][]byte{ + "primary.bin": []byte("test primary data"), + "data.bin": []byte("test data"), + "keys.bin": []byte("test keys"), + "meta.bin": []byte("test meta"), + } + if manifestContent != nil { + sidxFiles["manifest.json"] = manifestContent + } + + for filename, content := range sidxFiles { + filePath := filepath.Join(partPath, filename) + lf, err := fileSystem.CreateLockFile(filePath, storage.FilePerm) + require.NoError(t, err) + _, err = lf.Write(content) + require.NoError(t, err) + } + + return partPath +} + +func TestHandoffController_ReadPartFromHandoff_CoreMetadata(t *testing.T) { + tempDir, defFn := test.Space(require.New(t)) + defer defFn() + + fileSystem := fs.NewLocalFileSystem() + l := logger.GetLogger("test") + + sourceRoot := filepath.Join(tempDir, "source") + fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm) + partID := uint64(0x60) + partPath := filepath.Join(sourceRoot, partName(partID)) + fileSystem.MkdirIfNotExist(partPath, storage.DirPerm) + + coreFiles := map[string][]byte{ + "primary.bin": []byte("primary data"), + "spans.bin": []byte("spans data"), + "meta.bin": []byte("meta data"), + "metadata.json": []byte(`{"compressedSizeBytes":1024,"uncompressedSpanSizeBytes":2048,"totalCount":50,"blocksCount":5,"minTimestamp":1700000000,"maxTimestamp":1700001000}`), + "tag.type": []byte("tag type"), + "traceID.filter": []byte("filter"), + } + for filename, content := range coreFiles { + filePath := filepath.Join(partPath, filename) + lf, err := fileSystem.CreateLockFile(filePath, storage.FilePerm) + require.NoError(t, err) + _, err = lf.Write(content) + require.NoError(t, err) + } + + nodeAddr := testNodeAddrPrimary + controller, err := newHandoffController(fileSystem, tempDir, 
nil, []string{nodeAddr}, 0, l, nil) + require.NoError(t, err) + defer controller.close() + + require.NoError(t, controller.enqueueForNode(nodeAddr, partID, PartTypeCore, partPath, "group1", 1)) + + streamingPart, release, err := controller.readPartFromHandoff(nodeAddr, partID, PartTypeCore) + require.NoError(t, err) + defer release() + + assert.Equal(t, uint64(1024), streamingPart.CompressedSizeBytes) + assert.Equal(t, uint64(2048), streamingPart.UncompressedSizeBytes) + assert.Equal(t, uint64(50), streamingPart.TotalCount) + assert.Equal(t, uint64(5), streamingPart.BlocksCount) + assert.Equal(t, int64(1700000000), streamingPart.MinTimestamp) + assert.Equal(t, int64(1700001000), streamingPart.MaxTimestamp) +} + +func TestHandoffController_ReadPartFromHandoff_CoreMissingMetadata(t *testing.T) { + tempDir, defFn := test.Space(require.New(t)) + defer defFn() + + fileSystem := fs.NewLocalFileSystem() + l := logger.GetLogger("test") + + sourceRoot := filepath.Join(tempDir, "source") + fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm) + partID := uint64(0x61) + partPath := filepath.Join(sourceRoot, partName(partID)) + fileSystem.MkdirIfNotExist(partPath, storage.DirPerm) + + coreFiles := map[string][]byte{ + "primary.bin": []byte("primary data"), + "spans.bin": []byte("spans data"), + "meta.bin": []byte("meta data"), + } + for filename, content := range coreFiles { + filePath := filepath.Join(partPath, filename) + lf, err := fileSystem.CreateLockFile(filePath, storage.FilePerm) + require.NoError(t, err) + _, err = lf.Write(content) + require.NoError(t, err) + } + + nodeAddr := testNodeAddrPrimary + controller, err := newHandoffController(fileSystem, tempDir, nil, []string{nodeAddr}, 0, l, nil) + require.NoError(t, err) + defer controller.close() + + require.NoError(t, controller.enqueueForNode(nodeAddr, partID, PartTypeCore, partPath, "group1", 1)) + + _, _, readErr := controller.readPartFromHandoff(nodeAddr, partID, PartTypeCore) + require.Error(t, readErr) + 
assert.Contains(t, readErr.Error(), "failed to read metadata.json") +} + +func TestHandoffController_ReadPartFromHandoff_CoreInvalidMetadata(t *testing.T) { + tempDir, defFn := test.Space(require.New(t)) + defer defFn() + + fileSystem := fs.NewLocalFileSystem() + l := logger.GetLogger("test") + + sourceRoot := filepath.Join(tempDir, "source") + fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm) + partID := uint64(0x62) + partPath := filepath.Join(sourceRoot, partName(partID)) + fileSystem.MkdirIfNotExist(partPath, storage.DirPerm) + + coreFiles := map[string][]byte{ + "primary.bin": []byte("primary data"), + "spans.bin": []byte("spans data"), + "meta.bin": []byte("meta data"), + "metadata.json": []byte(`{invalid json`), + } + for filename, content := range coreFiles { + filePath := filepath.Join(partPath, filename) + lf, err := fileSystem.CreateLockFile(filePath, storage.FilePerm) + require.NoError(t, err) + _, err = lf.Write(content) + require.NoError(t, err) + } + + nodeAddr := testNodeAddrPrimary + controller, err := newHandoffController(fileSystem, tempDir, nil, []string{nodeAddr}, 0, l, nil) + require.NoError(t, err) + defer controller.close() + + require.NoError(t, controller.enqueueForNode(nodeAddr, partID, PartTypeCore, partPath, "group1", 1)) + + _, _, readErr := controller.readPartFromHandoff(nodeAddr, partID, PartTypeCore) + require.Error(t, readErr) + assert.Contains(t, readErr.Error(), "failed to parse metadata.json") +} + +func TestHandoffController_ReadPartFromHandoff_SidxWithTimestamps(t *testing.T) { + tempDir, defFn := test.Space(require.New(t)) + defer defFn() + + fileSystem := fs.NewLocalFileSystem() + l := logger.GetLogger("test") + + sourceRoot := filepath.Join(tempDir, "source") + fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm) + partID := uint64(0x70) + + manifest := []byte(`{ + "minTimestamp": 1700000000, + "maxTimestamp": 1700001000, + "compressedSizeBytes": 512, + "uncompressedSizeBytes": 1024, + "totalCount": 20, + 
"blocksCount": 2, + "minKey": 10, + "maxKey": 200, + "segmentID": 1700099999 + }`) + sourcePath := createTestSidxPart(t, fileSystem, sourceRoot, partID, manifest) + + nodeAddr := testNodeAddrPrimary + controller, err := newHandoffController(fileSystem, tempDir, nil, []string{nodeAddr}, 0, l, nil) + require.NoError(t, err) + defer controller.close() + + require.NoError(t, controller.enqueueForNode(nodeAddr, partID, "sidx_trace_id", sourcePath, "group1", 1)) + + streamingPart, release, err := controller.readPartFromHandoff(nodeAddr, partID, "sidx_trace_id") + require.NoError(t, err) + defer release() + + assert.Equal(t, partID, streamingPart.ID) + assert.Equal(t, "group1", streamingPart.Group) + assert.Equal(t, "sidx_trace_id", streamingPart.PartType) + assert.Equal(t, uint64(512), streamingPart.CompressedSizeBytes) + assert.Equal(t, uint64(1024), streamingPart.UncompressedSizeBytes) + assert.Equal(t, uint64(20), streamingPart.TotalCount) + assert.Equal(t, uint64(2), streamingPart.BlocksCount) + assert.Equal(t, int64(10), streamingPart.MinKey) + assert.Equal(t, int64(200), streamingPart.MaxKey) + // MinTimestamp comes from the pointer field, not SegmentID + assert.Equal(t, int64(1700000000), streamingPart.MinTimestamp) + assert.Equal(t, int64(1700001000), streamingPart.MaxTimestamp) +} + +func TestHandoffController_ReadPartFromHandoff_SidxWithSegmentIDFallback(t *testing.T) { + tempDir, defFn := test.Space(require.New(t)) + defer defFn() + + fileSystem := fs.NewLocalFileSystem() + l := logger.GetLogger("test") + + sourceRoot := filepath.Join(tempDir, "source") + fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm) + partID := uint64(0x71) + + // Legacy manifest: no minTimestamp/maxTimestamp, only segmentID + manifest := []byte(`{ + "compressedSizeBytes": 256, + "uncompressedSizeBytes": 512, + "totalCount": 10, + "blocksCount": 1, + "minKey": 5, + "maxKey": 100, + "segmentID": 1700050000 + }`) + sourcePath := createTestSidxPart(t, fileSystem, sourceRoot, partID, 
manifest) + + nodeAddr := testNodeAddrPrimary + controller, err := newHandoffController(fileSystem, tempDir, nil, []string{nodeAddr}, 0, l, nil) + require.NoError(t, err) + defer controller.close() + + require.NoError(t, controller.enqueueForNode(nodeAddr, partID, "sidx_trace_id", sourcePath, "group1", 1)) + + streamingPart, release, err := controller.readPartFromHandoff(nodeAddr, partID, "sidx_trace_id") + require.NoError(t, err) + defer release() + + // MinTimestamp falls back to SegmentID; MaxTimestamp falls back to MinTimestamp + assert.Equal(t, int64(1700050000), streamingPart.MinTimestamp) + assert.Equal(t, int64(1700050000), streamingPart.MaxTimestamp) + assert.Equal(t, uint64(256), streamingPart.CompressedSizeBytes) +} + +func TestHandoffController_ReadPartFromHandoff_SidxMissingManifest(t *testing.T) { + tempDir, defFn := test.Space(require.New(t)) + defer defFn() + + fileSystem := fs.NewLocalFileSystem() + l := logger.GetLogger("test") + + sourceRoot := filepath.Join(tempDir, "source") + fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm) + partID := uint64(0x72) + + // Create sidx part without manifest.json + sourcePath := createTestSidxPart(t, fileSystem, sourceRoot, partID, nil) + + nodeAddr := testNodeAddrPrimary + controller, err := newHandoffController(fileSystem, tempDir, nil, []string{nodeAddr}, 0, l, nil) + require.NoError(t, err) + defer controller.close() + + require.NoError(t, controller.enqueueForNode(nodeAddr, partID, "sidx_trace_id", sourcePath, "group1", 1)) + + _, _, readErr := controller.readPartFromHandoff(nodeAddr, partID, "sidx_trace_id") + require.Error(t, readErr) + assert.Contains(t, readErr.Error(), "failed to read manifest.json") +} + +func TestHandoffController_ReadPartFromHandoff_SidxInvalidManifest(t *testing.T) { + tempDir, defFn := test.Space(require.New(t)) + defer defFn() + + fileSystem := fs.NewLocalFileSystem() + l := logger.GetLogger("test") + + sourceRoot := filepath.Join(tempDir, "source") + 
fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm) + partID := uint64(0x73) + + manifest := []byte(`{broken json`) + sourcePath := createTestSidxPart(t, fileSystem, sourceRoot, partID, manifest) + + nodeAddr := testNodeAddrPrimary + controller, err := newHandoffController(fileSystem, tempDir, nil, []string{nodeAddr}, 0, l, nil) + require.NoError(t, err) + defer controller.close() + + require.NoError(t, controller.enqueueForNode(nodeAddr, partID, "sidx_trace_id", sourcePath, "group1", 1)) + + _, _, readErr := controller.readPartFromHandoff(nodeAddr, partID, "sidx_trace_id") + require.Error(t, readErr) + assert.Contains(t, readErr.Error(), "failed to parse manifest.json") +} diff --git a/banyand/trace/handoff_storage_test.go b/banyand/trace/handoff_storage_test.go index 93084afff..4ca00ca8f 100644 --- a/banyand/trace/handoff_storage_test.go +++ b/banyand/trace/handoff_storage_test.go @@ -22,6 +22,7 @@ import ( "errors" "os" "path/filepath" + "sync" "testing" "time" @@ -690,3 +691,58 @@ func TestHandoffController_SizeRecovery(t *testing.T) { tester.NoError(err) tester.Len(node2Pending, 1) } + +// TestHandoffController_ConcurrentSizeEnforcement verifies that concurrent enqueues +// cannot exceed the size limit due to the atomic tryReserveSize check. 
+func TestHandoffController_ConcurrentSizeEnforcement(t *testing.T) { + tester := require.New(t) + tempDir, deferFunc := test.Space(tester) + defer deferFunc() + + // Create a source part with 3MB metadata + partID := uint64(500) + partPath := filepath.Join(tempDir, "source", partName(partID)) + tester.NoError(os.MkdirAll(partPath, 0o755)) + + metadata := map[string]interface{}{ + "compressedSizeBytes": 3 * megabyte, + } + metadataBytes, err := json.Marshal(metadata) + tester.NoError(err) + tester.NoError(os.WriteFile(filepath.Join(partPath, "metadata.json"), metadataBytes, 0o600)) + tester.NoError(os.WriteFile(filepath.Join(partPath, "data.bin"), []byte("test data"), 0o600)) + + // 10MB limit allows at most 3 parts of 3MB each + lfs := fs.NewLocalFileSystem() + l := logger.GetLogger("test") + hc, err := newHandoffController(lfs, tempDir, nil, []string{"node1:17912"}, 10*megabyte, l, nil) + tester.NoError(err) + defer hc.close() + + const numGoroutines = 20 + var wg sync.WaitGroup + var successCount int64 + var mu sync.Mutex + + for i := 0; i < numGoroutines; i++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + enqueueErr := hc.enqueueForNode("node1:17912", partID+uint64(idx), PartTypeCore, partPath, "group1", 1) + if enqueueErr == nil { + mu.Lock() + successCount++ + mu.Unlock() + } + }(i) + } + wg.Wait() + + // At most 3 enqueues should succeed (3 * 3MB = 9MB <= 10MB) + tester.LessOrEqual(successCount, int64(3)) + + // Total size must not exceed 10MB limit + totalSize := hc.getTotalSize() + tester.LessOrEqual(totalSize, uint64(10*megabyte)) + tester.Equal(uint64(successCount)*3*megabyte, totalSize) +} diff --git a/banyand/trace/tstable.go b/banyand/trace/tstable.go index dcb68f9b9..741f1221f 100644 --- a/banyand/trace/tstable.go +++ b/banyand/trace/tstable.go @@ -279,6 +279,10 @@ func initTSTable(fileSystem fs.FileSystem, rootPath string, p common.Position, for _, id := range loadedSnapshots { fileSystem.MustRMAll(filepath.Join(rootPath, snapshotName(id))) 
} + for _, id := range loadedParts { + l.Info().Str("path", partPath(rootPath, id)).Msg("delete orphaned part without snapshot") + fileSystem.MustRMAll(partPath(rootPath, id)) + } return &tst, uint64(time.Now().UnixNano()) } sort.Slice(loadedSnapshots, func(i, j int) bool { @@ -298,6 +302,10 @@ func initTSTable(fileSystem fs.FileSystem, rootPath string, p common.Position, } return &tst, epoch } + for _, id := range loadedParts { + l.Info().Str("path", partPath(rootPath, id)).Msg("delete orphaned part after all snapshots failed to load") + fileSystem.MustRMAll(partPath(rootPath, id)) + } return &tst, uint64(time.Now().UnixNano()) } diff --git a/test/integration/handoff/handoff_suite_test.go b/test/integration/handoff/handoff_suite_test.go index 22ca237ec..e1af1ca70 100644 --- a/test/integration/handoff/handoff_suite_test.go +++ b/test/integration/handoff/handoff_suite_test.go @@ -370,6 +370,11 @@ var _ = Describe("trace handoff", func() { return queryTrace(connection, traceID, writeTime) }, flags.EventuallyTimeout).Should(Succeed()) + By("verifying sidx data was replayed correctly by querying via indexed tag") + Eventually(func() error { + return queryTraceByService(connection, "handoff_service", writeTime) + }, flags.EventuallyTimeout).Should(Succeed()) + var otherIndex int for idx := range dnHandles { if idx != targetIndex { @@ -535,3 +540,42 @@ func waitForPendingParts(nodeAddr string, timeout time.Duration) bool { time.Sleep(100 * time.Millisecond) } } + +// queryTraceByService queries traces by service_id, which exercises the sidx secondary index. +// If sidx parts were replayed with MinTimestamp=0, the sidx data is rejected or corrupted, +// and this query fails to find traces even though core data exists. 
+func queryTraceByService(conn *grpc.ClientConn, serviceID string, ts time.Time) error { + client := tracev1.NewTraceServiceClient(conn) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + req := &tracev1.QueryRequest{ + Groups: []string{"test-trace-group"}, + Name: "sw", + TimeRange: &modelv1.TimeRange{ + Begin: timestamppb.New(ts.Add(-5 * time.Minute)), + End: timestamppb.New(ts.Add(5 * time.Minute)), + }, + Criteria: &modelv1.Criteria{ + Exp: &modelv1.Criteria_Condition{ + Condition: &modelv1.Condition{ + Name: "service_id", + Op: modelv1.Condition_BINARY_OP_EQ, + Value: &modelv1.TagValue{Value: &modelv1.TagValue_Str{ + Str: &modelv1.Str{Value: serviceID}, + }}, + }, + }, + }, + TagProjection: []string{"trace_id", "service_id"}, + } + + resp, queryErr := client.Query(ctx, req) + if queryErr != nil { + return queryErr + } + if len(resp.GetTraces()) == 0 { + return fmt.Errorf("no traces found for service_id=%s", serviceID) + } + return nil +}
