This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch bug-hq-sync
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 8924ec83dcf685233f6b9ebd8f068f24ca91d069
Author: Hongtao Gao <[email protected]>
AuthorDate: Sun Apr 12 01:55:04 2026 +0000

    fix(handoff): prevent size limit bypass and sidx timestamp corruption in 
handoff replay
    
    - Replace canEnqueue+updateTotalSize with atomic tryReserveSize to close
      TOCTOU race that allowed concurrent enqueues to exceed the size limit.
    - Read manifest.json for sidx parts in readPartFromHandoff, populating
      MinTimestamp/MaxTimestamp with the same fallback logic as streaming sync
      (pointer field → SegmentID legacy → warn). Previously sidx parts were
      replayed with MinTimestamp=0, causing rejection or seg-19700101 creation.
    - Surface metadata read/parse errors instead of silently swallowing them.
    - Add queryTraceByService to integration test to verify sidx replay via
      indexed tag lookup, which the existing trace_id-only query couldn't catch.
    - Clean up orphaned parts when no snapshot references them in tsTable init.
---
 CHANGES.md                                     |   2 +
 banyand/measure/tstable.go                     |   8 +
 banyand/stream/tstable.go                      |   8 +
 banyand/trace/handoff_controller.go            | 111 +++++++---
 banyand/trace/handoff_replay_test.go           | 293 +++++++++++++++++++++++++
 banyand/trace/handoff_storage_test.go          |  56 +++++
 banyand/trace/tstable.go                       |   8 +
 test/integration/handoff/handoff_suite_test.go |  44 ++++
 8 files changed, 502 insertions(+), 28 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 939f46a83..dd25bc75f 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -30,6 +30,8 @@ Release Notes.
 - Disable the rotation task on warm and cold nodes to prevent incorrect 
segment boundaries during lifecycle migration.
 - Prevent epoch-dated segment directories (seg-19700101) from being created by 
zero timestamps in distributed sync paths.
 - Fix SIDX streaming sync sending SegmentID as MinTimestamp instead of the 
actual timestamp, causing sync failures on the receiving node.
+- Fix handoff controller TOCTOU race allowing disk size limit bypass, and 
populate sidx MinTimestamp/MaxTimestamp during replay to prevent corrupt 
segment creation on recovered nodes.
+- Delete orphaned parts when no snapshot references them during tsTable 
initialization.
 
 ### Chores
 
diff --git a/banyand/measure/tstable.go b/banyand/measure/tstable.go
index ef191a4e0..c9054d541 100644
--- a/banyand/measure/tstable.go
+++ b/banyand/measure/tstable.go
@@ -112,6 +112,10 @@ func initTSTable(fileSystem fs.FileSystem, rootPath 
string, p common.Position,
                for _, id := range loadedSnapshots {
                        fileSystem.MustRMAll(filepath.Join(rootPath, 
snapshotName(id)))
                }
+               for _, id := range loadedParts {
+                       l.Info().Str("path", partPath(rootPath, 
id)).Msg("delete orphaned part without snapshot")
+                       fileSystem.MustRMAll(partPath(rootPath, id))
+               }
                return &tst, uint64(time.Now().UnixNano())
        }
        sort.Slice(loadedSnapshots, func(i, j int) bool {
@@ -131,6 +135,10 @@ func initTSTable(fileSystem fs.FileSystem, rootPath 
string, p common.Position,
                }
                return &tst, epoch
        }
+       for _, id := range loadedParts {
+               l.Info().Str("path", partPath(rootPath, id)).Msg("delete 
orphaned part after all snapshots failed to load")
+               fileSystem.MustRMAll(partPath(rootPath, id))
+       }
        return &tst, uint64(time.Now().UnixNano())
 }
 
diff --git a/banyand/stream/tstable.go b/banyand/stream/tstable.go
index dc4ea70ba..60ef8dcee 100644
--- a/banyand/stream/tstable.go
+++ b/banyand/stream/tstable.go
@@ -290,6 +290,10 @@ func initTSTable(fileSystem fs.FileSystem, rootPath 
string, p common.Position,
                for _, id := range loadedSnapshots {
                        fileSystem.MustRMAll(filepath.Join(rootPath, 
snapshotName(id)))
                }
+               for _, id := range loadedParts {
+                       l.Info().Str("path", partPath(rootPath, 
id)).Msg("delete orphaned part without snapshot")
+                       fileSystem.MustRMAll(partPath(rootPath, id))
+               }
                return &tst, uint64(time.Now().UnixNano()), nil
        }
        sort.Slice(loadedSnapshots, func(i, j int) bool {
@@ -309,6 +313,10 @@ func initTSTable(fileSystem fs.FileSystem, rootPath 
string, p common.Position,
                }
                return &tst, epoch, nil
        }
+       for _, id := range loadedParts {
+               l.Info().Str("path", partPath(rootPath, id)).Msg("delete 
orphaned part after all snapshots failed to load")
+               fileSystem.MustRMAll(partPath(rootPath, id))
+       }
        return &tst, uint64(time.Now().UnixNano()), nil
 }
 
diff --git a/banyand/trace/handoff_controller.go 
b/banyand/trace/handoff_controller.go
index bc45154f3..161b28225 100644
--- a/banyand/trace/handoff_controller.go
+++ b/banyand/trace/handoff_controller.go
@@ -223,8 +223,8 @@ func (hc *handoffController) enqueueForNode(nodeAddr 
string, partID uint64, part
        // Read part size from metadata
        partSize := hc.readPartSizeFromMetadata(sourcePath, partType)
 
-       // Check if enqueue would exceed limit
-       if !hc.canEnqueue(partSize) {
+       // Atomically check and reserve size to prevent TOCTOU race between 
check and update.
+       if !hc.tryReserveSize(partSize) {
                currentSize := hc.getTotalSize()
                return fmt.Errorf("handoff queue full: current=%d MB, limit=%d 
MB, part=%d MB",
                        currentSize/1024/1024, hc.maxTotalSizeBytes/1024/1024, 
partSize/1024/1024)
@@ -243,16 +243,15 @@ func (hc *handoffController) enqueueForNode(nodeAddr 
string, partID uint64, part
 
        nodeQueue, err := hc.getOrCreateNodeQueue(nodeAddr)
        if err != nil {
+               hc.updateTotalSize(-int64(partSize))
                return fmt.Errorf("failed to get node queue for %s: %w", 
nodeAddr, err)
        }
 
-       if err := nodeQueue.enqueue(partID, partType, sourcePath, meta); err != 
nil {
-               return err
+       if enqueueErr := nodeQueue.enqueue(partID, partType, sourcePath, meta); 
enqueueErr != nil {
+               hc.updateTotalSize(-int64(partSize))
+               return enqueueErr
        }
 
-       // Update total size after successful enqueue
-       hc.updateTotalSize(int64(partSize))
-
        return nil
 }
 
@@ -714,15 +713,22 @@ func (hc *handoffController) stats() (partCount int64, 
totalSize int64) {
        return count, int64(hc.getTotalSize())
 }
 
-// canEnqueue checks if adding a part of the given size would exceed the total 
size limit.
-func (hc *handoffController) canEnqueue(partSize uint64) bool {
+// tryReserveSize atomically checks and reserves the given size if within the 
limit.
+// Returns true and increments currentTotalSize if the size can be reserved.
+// Returns false if adding partSize would exceed maxTotalSizeBytes.
+// On success, callers that fail a later step must call updateTotalSize(-int64(partSize)) to release the reservation.
+func (hc *handoffController) tryReserveSize(partSize uint64) bool {
        if hc.maxTotalSizeBytes == 0 {
                return true // No limit configured
        }
 
-       hc.sizeMu.RLock()
-       defer hc.sizeMu.RUnlock()
-       return hc.currentTotalSize+partSize <= hc.maxTotalSizeBytes
+       hc.sizeMu.Lock()
+       defer hc.sizeMu.Unlock()
+       if hc.currentTotalSize+partSize <= hc.maxTotalSizeBytes {
+               hc.currentTotalSize += partSize
+               return true
+       }
+       return false
 }
 
 // readPartSizeFromMetadata reads the CompressedSizeBytes from the part's 
metadata file.
@@ -1210,28 +1216,77 @@ func (hc *handoffController) 
readPartFromHandoff(nodeAddr string, partID uint64,
                PartType: partType,
        }
 
-       // For core parts, read additional metadata from metadata.json if 
present
-       if partType == PartTypeCore {
-               metadataPath := filepath.Join(partPath, metadataFilename)
-               if metadataBytes, err := hc.fileSystem.Read(metadataPath); err 
== nil {
-                       var pm partMetadata
-                       if err := json.Unmarshal(metadataBytes, &pm); err == 
nil {
-                               streamingPart.CompressedSizeBytes = 
pm.CompressedSizeBytes
-                               streamingPart.UncompressedSizeBytes = 
pm.UncompressedSpanSizeBytes
-                               streamingPart.TotalCount = pm.TotalCount
-                               streamingPart.BlocksCount = pm.BlocksCount
-                               streamingPart.MinTimestamp = pm.MinTimestamp
-                               streamingPart.MaxTimestamp = pm.MaxTimestamp
-                       }
-               }
-       }
-
        release := func() {
                for _, buf := range buffers {
                        bigValuePool.Release(buf)
                }
        }
 
+       // For core parts, read additional metadata from metadata.json.
+       if partType == PartTypeCore {
+               metadataPath := filepath.Join(partPath, metadataFilename)
+               metadataBytes, readErr := hc.fileSystem.Read(metadataPath)
+               if readErr != nil {
+                       release()
+                       return nil, func() {}, fmt.Errorf("failed to read %s: 
%w", metadataFilename, readErr)
+               }
+               var pm partMetadata
+               if parseErr := json.Unmarshal(metadataBytes, &pm); parseErr != 
nil {
+                       release()
+                       return nil, func() {}, fmt.Errorf("failed to parse %s: 
%w", metadataFilename, parseErr)
+               }
+               streamingPart.CompressedSizeBytes = pm.CompressedSizeBytes
+               streamingPart.UncompressedSizeBytes = 
pm.UncompressedSpanSizeBytes
+               streamingPart.TotalCount = pm.TotalCount
+               streamingPart.BlocksCount = pm.BlocksCount
+               streamingPart.MinTimestamp = pm.MinTimestamp
+               streamingPart.MaxTimestamp = pm.MaxTimestamp
+       } else {
+               // For sidx parts, read manifest.json and populate timestamps 
using the same
+               // fallback logic as streaming sync: MinTimestamp pointer → 
SegmentID legacy → warn.
+               manifestPath := filepath.Join(partPath, "manifest.json")
+               manifestBytes, readErr := hc.fileSystem.Read(manifestPath)
+               if readErr != nil {
+                       release()
+                       return nil, func() {}, fmt.Errorf("failed to read 
manifest.json: %w", readErr)
+               }
+               var pm struct {
+                       MinTimestamp          *int64 
`json:"minTimestamp,omitempty"`
+                       MaxTimestamp          *int64 
`json:"maxTimestamp,omitempty"`
+                       CompressedSizeBytes   uint64 
`json:"compressedSizeBytes"`
+                       UncompressedSizeBytes uint64 
`json:"uncompressedSizeBytes"`
+                       TotalCount            uint64 `json:"totalCount"`
+                       BlocksCount           uint64 `json:"blocksCount"`
+                       MinKey                int64  `json:"minKey"`
+                       MaxKey                int64  `json:"maxKey"`
+                       SegmentID             int64  `json:"segmentID"`
+               }
+               if parseErr := json.Unmarshal(manifestBytes, &pm); parseErr != 
nil {
+                       release()
+                       return nil, func() {}, fmt.Errorf("failed to parse 
manifest.json: %w", parseErr)
+               }
+               streamingPart.CompressedSizeBytes = pm.CompressedSizeBytes
+               streamingPart.UncompressedSizeBytes = pm.UncompressedSizeBytes
+               streamingPart.TotalCount = pm.TotalCount
+               streamingPart.BlocksCount = pm.BlocksCount
+               streamingPart.MinKey = pm.MinKey
+               streamingPart.MaxKey = pm.MaxKey
+               switch {
+               case pm.MinTimestamp != nil:
+                       streamingPart.MinTimestamp = *pm.MinTimestamp
+               case pm.SegmentID > 0:
+                       streamingPart.MinTimestamp = pm.SegmentID
+               default:
+                       hc.l.Warn().Uint64("partID", partID).Str("partType", 
partType).
+                               Msg("sidx handoff replay: part has no valid 
timestamp (MinTimestamp=nil, SegmentID=0)")
+               }
+               if pm.MaxTimestamp != nil {
+                       streamingPart.MaxTimestamp = *pm.MaxTimestamp
+               } else {
+                       streamingPart.MaxTimestamp = streamingPart.MinTimestamp
+               }
+       }
+
        return streamingPart, release, nil
 }
 
diff --git a/banyand/trace/handoff_replay_test.go 
b/banyand/trace/handoff_replay_test.go
index ec0610cdb..2ab134f53 100644
--- a/banyand/trace/handoff_replay_test.go
+++ b/banyand/trace/handoff_replay_test.go
@@ -404,3 +404,296 @@ func TestHandoffController_SendPartToNode(t *testing.T) {
        require.NoError(t, err)
        assert.Equal(t, 1, mockClient.getSendCount())
 }
+
+// createTestSidxPart creates a minimal sidx part directory with the given 
manifest content.
+func createTestSidxPart(t *testing.T, fileSystem fs.FileSystem, root string, 
partID uint64, manifestContent []byte) string {
+       t.Helper()
+       partPath := filepath.Join(root, partName(partID))
+       fileSystem.MkdirIfNotExist(partPath, storage.DirPerm)
+
+       sidxFiles := map[string][]byte{
+               "primary.bin": []byte("test primary data"),
+               "data.bin":    []byte("test data"),
+               "keys.bin":    []byte("test keys"),
+               "meta.bin":    []byte("test meta"),
+       }
+       if manifestContent != nil {
+               sidxFiles["manifest.json"] = manifestContent
+       }
+
+       for filename, content := range sidxFiles {
+               filePath := filepath.Join(partPath, filename)
+               lf, err := fileSystem.CreateLockFile(filePath, storage.FilePerm)
+               require.NoError(t, err)
+               _, err = lf.Write(content)
+               require.NoError(t, err)
+       }
+
+       return partPath
+}
+
+func TestHandoffController_ReadPartFromHandoff_CoreMetadata(t *testing.T) {
+       tempDir, defFn := test.Space(require.New(t))
+       defer defFn()
+
+       fileSystem := fs.NewLocalFileSystem()
+       l := logger.GetLogger("test")
+
+       sourceRoot := filepath.Join(tempDir, "source")
+       fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm)
+       partID := uint64(0x60)
+       partPath := filepath.Join(sourceRoot, partName(partID))
+       fileSystem.MkdirIfNotExist(partPath, storage.DirPerm)
+
+       coreFiles := map[string][]byte{
+               "primary.bin":    []byte("primary data"),
+               "spans.bin":      []byte("spans data"),
+               "meta.bin":       []byte("meta data"),
+               "metadata.json":  
[]byte(`{"compressedSizeBytes":1024,"uncompressedSpanSizeBytes":2048,"totalCount":50,"blocksCount":5,"minTimestamp":1700000000,"maxTimestamp":1700001000}`),
+               "tag.type":       []byte("tag type"),
+               "traceID.filter": []byte("filter"),
+       }
+       for filename, content := range coreFiles {
+               filePath := filepath.Join(partPath, filename)
+               lf, err := fileSystem.CreateLockFile(filePath, storage.FilePerm)
+               require.NoError(t, err)
+               _, err = lf.Write(content)
+               require.NoError(t, err)
+       }
+
+       nodeAddr := testNodeAddrPrimary
+       controller, err := newHandoffController(fileSystem, tempDir, nil, 
[]string{nodeAddr}, 0, l, nil)
+       require.NoError(t, err)
+       defer controller.close()
+
+       require.NoError(t, controller.enqueueForNode(nodeAddr, partID, 
PartTypeCore, partPath, "group1", 1))
+
+       streamingPart, release, err := controller.readPartFromHandoff(nodeAddr, 
partID, PartTypeCore)
+       require.NoError(t, err)
+       defer release()
+
+       assert.Equal(t, uint64(1024), streamingPart.CompressedSizeBytes)
+       assert.Equal(t, uint64(2048), streamingPart.UncompressedSizeBytes)
+       assert.Equal(t, uint64(50), streamingPart.TotalCount)
+       assert.Equal(t, uint64(5), streamingPart.BlocksCount)
+       assert.Equal(t, int64(1700000000), streamingPart.MinTimestamp)
+       assert.Equal(t, int64(1700001000), streamingPart.MaxTimestamp)
+}
+
+func TestHandoffController_ReadPartFromHandoff_CoreMissingMetadata(t 
*testing.T) {
+       tempDir, defFn := test.Space(require.New(t))
+       defer defFn()
+
+       fileSystem := fs.NewLocalFileSystem()
+       l := logger.GetLogger("test")
+
+       sourceRoot := filepath.Join(tempDir, "source")
+       fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm)
+       partID := uint64(0x61)
+       partPath := filepath.Join(sourceRoot, partName(partID))
+       fileSystem.MkdirIfNotExist(partPath, storage.DirPerm)
+
+       coreFiles := map[string][]byte{
+               "primary.bin": []byte("primary data"),
+               "spans.bin":   []byte("spans data"),
+               "meta.bin":    []byte("meta data"),
+       }
+       for filename, content := range coreFiles {
+               filePath := filepath.Join(partPath, filename)
+               lf, err := fileSystem.CreateLockFile(filePath, storage.FilePerm)
+               require.NoError(t, err)
+               _, err = lf.Write(content)
+               require.NoError(t, err)
+       }
+
+       nodeAddr := testNodeAddrPrimary
+       controller, err := newHandoffController(fileSystem, tempDir, nil, 
[]string{nodeAddr}, 0, l, nil)
+       require.NoError(t, err)
+       defer controller.close()
+
+       require.NoError(t, controller.enqueueForNode(nodeAddr, partID, 
PartTypeCore, partPath, "group1", 1))
+
+       _, _, readErr := controller.readPartFromHandoff(nodeAddr, partID, 
PartTypeCore)
+       require.Error(t, readErr)
+       assert.Contains(t, readErr.Error(), "failed to read metadata.json")
+}
+
+func TestHandoffController_ReadPartFromHandoff_CoreInvalidMetadata(t 
*testing.T) {
+       tempDir, defFn := test.Space(require.New(t))
+       defer defFn()
+
+       fileSystem := fs.NewLocalFileSystem()
+       l := logger.GetLogger("test")
+
+       sourceRoot := filepath.Join(tempDir, "source")
+       fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm)
+       partID := uint64(0x62)
+       partPath := filepath.Join(sourceRoot, partName(partID))
+       fileSystem.MkdirIfNotExist(partPath, storage.DirPerm)
+
+       coreFiles := map[string][]byte{
+               "primary.bin":   []byte("primary data"),
+               "spans.bin":     []byte("spans data"),
+               "meta.bin":      []byte("meta data"),
+               "metadata.json": []byte(`{invalid json`),
+       }
+       for filename, content := range coreFiles {
+               filePath := filepath.Join(partPath, filename)
+               lf, err := fileSystem.CreateLockFile(filePath, storage.FilePerm)
+               require.NoError(t, err)
+               _, err = lf.Write(content)
+               require.NoError(t, err)
+       }
+
+       nodeAddr := testNodeAddrPrimary
+       controller, err := newHandoffController(fileSystem, tempDir, nil, 
[]string{nodeAddr}, 0, l, nil)
+       require.NoError(t, err)
+       defer controller.close()
+
+       require.NoError(t, controller.enqueueForNode(nodeAddr, partID, 
PartTypeCore, partPath, "group1", 1))
+
+       _, _, readErr := controller.readPartFromHandoff(nodeAddr, partID, 
PartTypeCore)
+       require.Error(t, readErr)
+       assert.Contains(t, readErr.Error(), "failed to parse metadata.json")
+}
+
+func TestHandoffController_ReadPartFromHandoff_SidxWithTimestamps(t 
*testing.T) {
+       tempDir, defFn := test.Space(require.New(t))
+       defer defFn()
+
+       fileSystem := fs.NewLocalFileSystem()
+       l := logger.GetLogger("test")
+
+       sourceRoot := filepath.Join(tempDir, "source")
+       fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm)
+       partID := uint64(0x70)
+
+       manifest := []byte(`{
+               "minTimestamp": 1700000000,
+               "maxTimestamp": 1700001000,
+               "compressedSizeBytes": 512,
+               "uncompressedSizeBytes": 1024,
+               "totalCount": 20,
+               "blocksCount": 2,
+               "minKey": 10,
+               "maxKey": 200,
+               "segmentID": 1700099999
+       }`)
+       sourcePath := createTestSidxPart(t, fileSystem, sourceRoot, partID, 
manifest)
+
+       nodeAddr := testNodeAddrPrimary
+       controller, err := newHandoffController(fileSystem, tempDir, nil, 
[]string{nodeAddr}, 0, l, nil)
+       require.NoError(t, err)
+       defer controller.close()
+
+       require.NoError(t, controller.enqueueForNode(nodeAddr, partID, 
"sidx_trace_id", sourcePath, "group1", 1))
+
+       streamingPart, release, err := controller.readPartFromHandoff(nodeAddr, 
partID, "sidx_trace_id")
+       require.NoError(t, err)
+       defer release()
+
+       assert.Equal(t, partID, streamingPart.ID)
+       assert.Equal(t, "group1", streamingPart.Group)
+       assert.Equal(t, "sidx_trace_id", streamingPart.PartType)
+       assert.Equal(t, uint64(512), streamingPart.CompressedSizeBytes)
+       assert.Equal(t, uint64(1024), streamingPart.UncompressedSizeBytes)
+       assert.Equal(t, uint64(20), streamingPart.TotalCount)
+       assert.Equal(t, uint64(2), streamingPart.BlocksCount)
+       assert.Equal(t, int64(10), streamingPart.MinKey)
+       assert.Equal(t, int64(200), streamingPart.MaxKey)
+       // MinTimestamp comes from the pointer field, not SegmentID
+       assert.Equal(t, int64(1700000000), streamingPart.MinTimestamp)
+       assert.Equal(t, int64(1700001000), streamingPart.MaxTimestamp)
+}
+
+func TestHandoffController_ReadPartFromHandoff_SidxWithSegmentIDFallback(t 
*testing.T) {
+       tempDir, defFn := test.Space(require.New(t))
+       defer defFn()
+
+       fileSystem := fs.NewLocalFileSystem()
+       l := logger.GetLogger("test")
+
+       sourceRoot := filepath.Join(tempDir, "source")
+       fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm)
+       partID := uint64(0x71)
+
+       // Legacy manifest: no minTimestamp/maxTimestamp, only segmentID
+       manifest := []byte(`{
+               "compressedSizeBytes": 256,
+               "uncompressedSizeBytes": 512,
+               "totalCount": 10,
+               "blocksCount": 1,
+               "minKey": 5,
+               "maxKey": 100,
+               "segmentID": 1700050000
+       }`)
+       sourcePath := createTestSidxPart(t, fileSystem, sourceRoot, partID, 
manifest)
+
+       nodeAddr := testNodeAddrPrimary
+       controller, err := newHandoffController(fileSystem, tempDir, nil, 
[]string{nodeAddr}, 0, l, nil)
+       require.NoError(t, err)
+       defer controller.close()
+
+       require.NoError(t, controller.enqueueForNode(nodeAddr, partID, 
"sidx_trace_id", sourcePath, "group1", 1))
+
+       streamingPart, release, err := controller.readPartFromHandoff(nodeAddr, 
partID, "sidx_trace_id")
+       require.NoError(t, err)
+       defer release()
+
+       // MinTimestamp falls back to SegmentID; MaxTimestamp falls back to 
MinTimestamp
+       assert.Equal(t, int64(1700050000), streamingPart.MinTimestamp)
+       assert.Equal(t, int64(1700050000), streamingPart.MaxTimestamp)
+       assert.Equal(t, uint64(256), streamingPart.CompressedSizeBytes)
+}
+
+func TestHandoffController_ReadPartFromHandoff_SidxMissingManifest(t 
*testing.T) {
+       tempDir, defFn := test.Space(require.New(t))
+       defer defFn()
+
+       fileSystem := fs.NewLocalFileSystem()
+       l := logger.GetLogger("test")
+
+       sourceRoot := filepath.Join(tempDir, "source")
+       fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm)
+       partID := uint64(0x72)
+
+       // Create sidx part without manifest.json
+       sourcePath := createTestSidxPart(t, fileSystem, sourceRoot, partID, nil)
+
+       nodeAddr := testNodeAddrPrimary
+       controller, err := newHandoffController(fileSystem, tempDir, nil, 
[]string{nodeAddr}, 0, l, nil)
+       require.NoError(t, err)
+       defer controller.close()
+
+       require.NoError(t, controller.enqueueForNode(nodeAddr, partID, 
"sidx_trace_id", sourcePath, "group1", 1))
+
+       _, _, readErr := controller.readPartFromHandoff(nodeAddr, partID, 
"sidx_trace_id")
+       require.Error(t, readErr)
+       assert.Contains(t, readErr.Error(), "failed to read manifest.json")
+}
+
+func TestHandoffController_ReadPartFromHandoff_SidxInvalidManifest(t 
*testing.T) {
+       tempDir, defFn := test.Space(require.New(t))
+       defer defFn()
+
+       fileSystem := fs.NewLocalFileSystem()
+       l := logger.GetLogger("test")
+
+       sourceRoot := filepath.Join(tempDir, "source")
+       fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm)
+       partID := uint64(0x73)
+
+       manifest := []byte(`{broken json`)
+       sourcePath := createTestSidxPart(t, fileSystem, sourceRoot, partID, 
manifest)
+
+       nodeAddr := testNodeAddrPrimary
+       controller, err := newHandoffController(fileSystem, tempDir, nil, 
[]string{nodeAddr}, 0, l, nil)
+       require.NoError(t, err)
+       defer controller.close()
+
+       require.NoError(t, controller.enqueueForNode(nodeAddr, partID, 
"sidx_trace_id", sourcePath, "group1", 1))
+
+       _, _, readErr := controller.readPartFromHandoff(nodeAddr, partID, 
"sidx_trace_id")
+       require.Error(t, readErr)
+       assert.Contains(t, readErr.Error(), "failed to parse manifest.json")
+}
diff --git a/banyand/trace/handoff_storage_test.go 
b/banyand/trace/handoff_storage_test.go
index 93084afff..4ca00ca8f 100644
--- a/banyand/trace/handoff_storage_test.go
+++ b/banyand/trace/handoff_storage_test.go
@@ -22,6 +22,7 @@ import (
        "errors"
        "os"
        "path/filepath"
+       "sync"
        "testing"
        "time"
 
@@ -690,3 +691,58 @@ func TestHandoffController_SizeRecovery(t *testing.T) {
        tester.NoError(err)
        tester.Len(node2Pending, 1)
 }
+
+// TestHandoffController_ConcurrentSizeEnforcement verifies that concurrent 
enqueues
+// cannot exceed the size limit due to the atomic tryReserveSize check.
+func TestHandoffController_ConcurrentSizeEnforcement(t *testing.T) {
+       tester := require.New(t)
+       tempDir, deferFunc := test.Space(tester)
+       defer deferFunc()
+
+       // Create a source part with 3MB metadata
+       partID := uint64(500)
+       partPath := filepath.Join(tempDir, "source", partName(partID))
+       tester.NoError(os.MkdirAll(partPath, 0o755))
+
+       metadata := map[string]interface{}{
+               "compressedSizeBytes": 3 * megabyte,
+       }
+       metadataBytes, err := json.Marshal(metadata)
+       tester.NoError(err)
+       tester.NoError(os.WriteFile(filepath.Join(partPath, "metadata.json"), 
metadataBytes, 0o600))
+       tester.NoError(os.WriteFile(filepath.Join(partPath, "data.bin"), 
[]byte("test data"), 0o600))
+
+       // 10MB limit allows at most 3 parts of 3MB each
+       lfs := fs.NewLocalFileSystem()
+       l := logger.GetLogger("test")
+       hc, err := newHandoffController(lfs, tempDir, nil, 
[]string{"node1:17912"}, 10*megabyte, l, nil)
+       tester.NoError(err)
+       defer hc.close()
+
+       const numGoroutines = 20
+       var wg sync.WaitGroup
+       var successCount int64
+       var mu sync.Mutex
+
+       for i := 0; i < numGoroutines; i++ {
+               wg.Add(1)
+               go func(idx int) {
+                       defer wg.Done()
+                       enqueueErr := hc.enqueueForNode("node1:17912", 
partID+uint64(idx), PartTypeCore, partPath, "group1", 1)
+                       if enqueueErr == nil {
+                               mu.Lock()
+                               successCount++
+                               mu.Unlock()
+                       }
+               }(i)
+       }
+       wg.Wait()
+
+       // At most 3 enqueues should succeed (3 * 3MB = 9MB <= 10MB)
+       tester.LessOrEqual(successCount, int64(3))
+
+       // Total size must not exceed 10MB limit
+       totalSize := hc.getTotalSize()
+       tester.LessOrEqual(totalSize, uint64(10*megabyte))
+       tester.Equal(uint64(successCount)*3*megabyte, totalSize)
+}
diff --git a/banyand/trace/tstable.go b/banyand/trace/tstable.go
index dcb68f9b9..741f1221f 100644
--- a/banyand/trace/tstable.go
+++ b/banyand/trace/tstable.go
@@ -279,6 +279,10 @@ func initTSTable(fileSystem fs.FileSystem, rootPath 
string, p common.Position,
                for _, id := range loadedSnapshots {
                        fileSystem.MustRMAll(filepath.Join(rootPath, 
snapshotName(id)))
                }
+               for _, id := range loadedParts {
+                       l.Info().Str("path", partPath(rootPath, 
id)).Msg("delete orphaned part without snapshot")
+                       fileSystem.MustRMAll(partPath(rootPath, id))
+               }
                return &tst, uint64(time.Now().UnixNano())
        }
        sort.Slice(loadedSnapshots, func(i, j int) bool {
@@ -298,6 +302,10 @@ func initTSTable(fileSystem fs.FileSystem, rootPath 
string, p common.Position,
                }
                return &tst, epoch
        }
+       for _, id := range loadedParts {
+               l.Info().Str("path", partPath(rootPath, id)).Msg("delete 
orphaned part after all snapshots failed to load")
+               fileSystem.MustRMAll(partPath(rootPath, id))
+       }
        return &tst, uint64(time.Now().UnixNano())
 }
 
diff --git a/test/integration/handoff/handoff_suite_test.go 
b/test/integration/handoff/handoff_suite_test.go
index 22ca237ec..e1af1ca70 100644
--- a/test/integration/handoff/handoff_suite_test.go
+++ b/test/integration/handoff/handoff_suite_test.go
@@ -370,6 +370,11 @@ var _ = Describe("trace handoff", func() {
                        return queryTrace(connection, traceID, writeTime)
                }, flags.EventuallyTimeout).Should(Succeed())
 
+               By("verifying sidx data was replayed correctly by querying via 
indexed tag")
+               Eventually(func() error {
+                       return queryTraceByService(connection, 
"handoff_service", writeTime)
+               }, flags.EventuallyTimeout).Should(Succeed())
+
                var otherIndex int
                for idx := range dnHandles {
                        if idx != targetIndex {
@@ -535,3 +540,42 @@ func waitForPendingParts(nodeAddr string, timeout 
time.Duration) bool {
                time.Sleep(100 * time.Millisecond)
        }
 }
+
+// queryTraceByService queries traces by service_id, which exercises the sidx 
secondary index.
+// If sidx parts were replayed with MinTimestamp=0, the sidx data is rejected 
or corrupted,
+// and this query fails to find traces even though core data exists.
+func queryTraceByService(conn *grpc.ClientConn, serviceID string, ts 
time.Time) error {
+       client := tracev1.NewTraceServiceClient(conn)
+       ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+       defer cancel()
+
+       req := &tracev1.QueryRequest{
+               Groups: []string{"test-trace-group"},
+               Name:   "sw",
+               TimeRange: &modelv1.TimeRange{
+                       Begin: timestamppb.New(ts.Add(-5 * time.Minute)),
+                       End:   timestamppb.New(ts.Add(5 * time.Minute)),
+               },
+               Criteria: &modelv1.Criteria{
+                       Exp: &modelv1.Criteria_Condition{
+                               Condition: &modelv1.Condition{
+                                       Name: "service_id",
+                                       Op:   modelv1.Condition_BINARY_OP_EQ,
+                                       Value: &modelv1.TagValue{Value: 
&modelv1.TagValue_Str{
+                                               Str: &modelv1.Str{Value: 
serviceID},
+                                       }},
+                               },
+                       },
+               },
+               TagProjection: []string{"trace_id", "service_id"},
+       }
+
+       resp, queryErr := client.Query(ctx, req)
+       if queryErr != nil {
+               return queryErr
+       }
+       if len(resp.GetTraces()) == 0 {
+               return fmt.Errorf("no traces found for service_id=%s", 
serviceID)
+       }
+       return nil
+}

Reply via email to