This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 8105dfe1b fix(handoff): prevent size limit bypass and sidx timestamp
corruption in handoff replay (#1064)
8105dfe1b is described below
commit 8105dfe1bd9787d8acdb5e9d9b780d85eb4db9a7
Author: Gao Hongtao <[email protected]>
AuthorDate: Sun Apr 12 12:51:04 2026 +0800
fix(handoff): prevent size limit bypass and sidx timestamp corruption in
handoff replay (#1064)
---
CHANGES.md | 2 +
banyand/measure/tstable.go | 8 +
banyand/stream/tstable.go | 8 +
banyand/trace/handoff_controller.go | 113 ++++++---
banyand/trace/handoff_replay_test.go | 332 +++++++++++++++++++++++++
banyand/trace/handoff_storage_test.go | 56 +++++
banyand/trace/tstable.go | 8 +
test/integration/handoff/handoff_suite_test.go | 44 ++++
8 files changed, 540 insertions(+), 31 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 939f46a83..dd25bc75f 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -30,6 +30,8 @@ Release Notes.
- Disable the rotation task on warm and cold nodes to prevent incorrect
segment boundaries during lifecycle migration.
- Prevent epoch-dated segment directories (seg-19700101) from being created by
zero timestamps in distributed sync paths.
- Fix SIDX streaming sync sending SegmentID as MinTimestamp instead of the
actual timestamp, causing sync failures on the receiving node.
+- Fix handoff controller TOCTOU race allowing disk size limit bypass, and
populate sidx MinTimestamp/MaxTimestamp during replay to prevent corrupt
segment creation on recovered nodes.
+- Delete orphaned parts when no snapshot references them during tsTable
initialization.
### Chores
diff --git a/banyand/measure/tstable.go b/banyand/measure/tstable.go
index ef191a4e0..c9054d541 100644
--- a/banyand/measure/tstable.go
+++ b/banyand/measure/tstable.go
@@ -112,6 +112,10 @@ func initTSTable(fileSystem fs.FileSystem, rootPath
string, p common.Position,
for _, id := range loadedSnapshots {
fileSystem.MustRMAll(filepath.Join(rootPath,
snapshotName(id)))
}
+ for _, id := range loadedParts {
+ l.Info().Str("path", partPath(rootPath,
id)).Msg("delete orphaned part without snapshot")
+ fileSystem.MustRMAll(partPath(rootPath, id))
+ }
return &tst, uint64(time.Now().UnixNano())
}
sort.Slice(loadedSnapshots, func(i, j int) bool {
@@ -131,6 +135,10 @@ func initTSTable(fileSystem fs.FileSystem, rootPath
string, p common.Position,
}
return &tst, epoch
}
+ for _, id := range loadedParts {
+ l.Info().Str("path", partPath(rootPath, id)).Msg("delete
orphaned part after all snapshots failed to load")
+ fileSystem.MustRMAll(partPath(rootPath, id))
+ }
return &tst, uint64(time.Now().UnixNano())
}
diff --git a/banyand/stream/tstable.go b/banyand/stream/tstable.go
index dc4ea70ba..60ef8dcee 100644
--- a/banyand/stream/tstable.go
+++ b/banyand/stream/tstable.go
@@ -290,6 +290,10 @@ func initTSTable(fileSystem fs.FileSystem, rootPath
string, p common.Position,
for _, id := range loadedSnapshots {
fileSystem.MustRMAll(filepath.Join(rootPath,
snapshotName(id)))
}
+ for _, id := range loadedParts {
+ l.Info().Str("path", partPath(rootPath,
id)).Msg("delete orphaned part without snapshot")
+ fileSystem.MustRMAll(partPath(rootPath, id))
+ }
return &tst, uint64(time.Now().UnixNano()), nil
}
sort.Slice(loadedSnapshots, func(i, j int) bool {
@@ -309,6 +313,10 @@ func initTSTable(fileSystem fs.FileSystem, rootPath
string, p common.Position,
}
return &tst, epoch, nil
}
+ for _, id := range loadedParts {
+ l.Info().Str("path", partPath(rootPath, id)).Msg("delete
orphaned part after all snapshots failed to load")
+ fileSystem.MustRMAll(partPath(rootPath, id))
+ }
return &tst, uint64(time.Now().UnixNano()), nil
}
diff --git a/banyand/trace/handoff_controller.go
b/banyand/trace/handoff_controller.go
index bc45154f3..4e69f7559 100644
--- a/banyand/trace/handoff_controller.go
+++ b/banyand/trace/handoff_controller.go
@@ -223,8 +223,8 @@ func (hc *handoffController) enqueueForNode(nodeAddr
string, partID uint64, part
// Read part size from metadata
partSize := hc.readPartSizeFromMetadata(sourcePath, partType)
- // Check if enqueue would exceed limit
- if !hc.canEnqueue(partSize) {
+ // Atomically check and reserve size to prevent TOCTOU race between
check and update.
+ if !hc.tryReserveSize(partSize) {
currentSize := hc.getTotalSize()
return fmt.Errorf("handoff queue full: current=%d MB, limit=%d
MB, part=%d MB",
currentSize/1024/1024, hc.maxTotalSizeBytes/1024/1024,
partSize/1024/1024)
@@ -243,16 +243,15 @@ func (hc *handoffController) enqueueForNode(nodeAddr
string, partID uint64, part
nodeQueue, err := hc.getOrCreateNodeQueue(nodeAddr)
if err != nil {
+ hc.updateTotalSize(-int64(partSize))
return fmt.Errorf("failed to get node queue for %s: %w",
nodeAddr, err)
}
- if err := nodeQueue.enqueue(partID, partType, sourcePath, meta); err !=
nil {
- return err
+ if enqueueErr := nodeQueue.enqueue(partID, partType, sourcePath, meta);
enqueueErr != nil {
+ hc.updateTotalSize(-int64(partSize))
+ return enqueueErr
}
- // Update total size after successful enqueue
- hc.updateTotalSize(int64(partSize))
-
return nil
}
@@ -714,15 +713,18 @@ func (hc *handoffController) stats() (partCount int64,
totalSize int64) {
return count, int64(hc.getTotalSize())
}
-// canEnqueue checks if adding a part of the given size would exceed the total
size limit.
-func (hc *handoffController) canEnqueue(partSize uint64) bool {
- if hc.maxTotalSizeBytes == 0 {
- return true // No limit configured
+// tryReserveSize atomically checks and reserves the given size if within the
limit.
+// Returns true and increments currentTotalSize if the size can be reserved.
+// Returns false if a limit is configured (maxTotalSizeBytes > 0) and adding partSize would exceed it.
+// After a successful reservation, callers must call updateTotalSize(-int64(partSize)) to release it if the enqueue later fails.
+func (hc *handoffController) tryReserveSize(partSize uint64) bool {
+ hc.sizeMu.Lock()
+ defer hc.sizeMu.Unlock()
+ if hc.maxTotalSizeBytes > 0 && hc.currentTotalSize+partSize >
hc.maxTotalSizeBytes {
+ return false
}
-
- hc.sizeMu.RLock()
- defer hc.sizeMu.RUnlock()
- return hc.currentTotalSize+partSize <= hc.maxTotalSizeBytes
+ hc.currentTotalSize += partSize
+ return true
}
// readPartSizeFromMetadata reads the CompressedSizeBytes from the part's
metadata file.
@@ -1210,28 +1212,77 @@ func (hc *handoffController)
readPartFromHandoff(nodeAddr string, partID uint64,
PartType: partType,
}
- // For core parts, read additional metadata from metadata.json if
present
- if partType == PartTypeCore {
- metadataPath := filepath.Join(partPath, metadataFilename)
- if metadataBytes, err := hc.fileSystem.Read(metadataPath); err
== nil {
- var pm partMetadata
- if err := json.Unmarshal(metadataBytes, &pm); err ==
nil {
- streamingPart.CompressedSizeBytes =
pm.CompressedSizeBytes
- streamingPart.UncompressedSizeBytes =
pm.UncompressedSpanSizeBytes
- streamingPart.TotalCount = pm.TotalCount
- streamingPart.BlocksCount = pm.BlocksCount
- streamingPart.MinTimestamp = pm.MinTimestamp
- streamingPart.MaxTimestamp = pm.MaxTimestamp
- }
- }
- }
-
release := func() {
for _, buf := range buffers {
bigValuePool.Release(buf)
}
}
+ // For core parts, read additional metadata from metadata.json.
+ if partType == PartTypeCore {
+ metadataPath := filepath.Join(partPath, metadataFilename)
+ metadataBytes, readErr := hc.fileSystem.Read(metadataPath)
+ if readErr != nil {
+ release()
+ return nil, func() {}, fmt.Errorf("failed to read %s:
%w", metadataFilename, readErr)
+ }
+ var pm partMetadata
+ if parseErr := json.Unmarshal(metadataBytes, &pm); parseErr !=
nil {
+ release()
+ return nil, func() {}, fmt.Errorf("failed to parse %s:
%w", metadataFilename, parseErr)
+ }
+ streamingPart.CompressedSizeBytes = pm.CompressedSizeBytes
+ streamingPart.UncompressedSizeBytes =
pm.UncompressedSpanSizeBytes
+ streamingPart.TotalCount = pm.TotalCount
+ streamingPart.BlocksCount = pm.BlocksCount
+ streamingPart.MinTimestamp = pm.MinTimestamp
+ streamingPart.MaxTimestamp = pm.MaxTimestamp
+ } else {
+ // For sidx parts, read manifest.json and populate timestamps
using the
+ // streaming-sync fallback order: MinTimestamp pointer →
SegmentID legacy → error (no valid timestamp).
+ manifestPath := filepath.Join(partPath, "manifest.json")
+ manifestBytes, readErr := hc.fileSystem.Read(manifestPath)
+ if readErr != nil {
+ release()
+ return nil, func() {}, fmt.Errorf("failed to read
manifest.json: %w", readErr)
+ }
+ var pm struct {
+ MinTimestamp *int64
`json:"minTimestamp,omitempty"`
+ MaxTimestamp *int64
`json:"maxTimestamp,omitempty"`
+ CompressedSizeBytes uint64
`json:"compressedSizeBytes"`
+ UncompressedSizeBytes uint64
`json:"uncompressedSizeBytes"`
+ TotalCount uint64 `json:"totalCount"`
+ BlocksCount uint64 `json:"blocksCount"`
+ MinKey int64 `json:"minKey"`
+ MaxKey int64 `json:"maxKey"`
+ SegmentID int64 `json:"segmentID"`
+ }
+ if parseErr := json.Unmarshal(manifestBytes, &pm); parseErr !=
nil {
+ release()
+ return nil, func() {}, fmt.Errorf("failed to parse
manifest.json: %w", parseErr)
+ }
+ streamingPart.CompressedSizeBytes = pm.CompressedSizeBytes
+ streamingPart.UncompressedSizeBytes = pm.UncompressedSizeBytes
+ streamingPart.TotalCount = pm.TotalCount
+ streamingPart.BlocksCount = pm.BlocksCount
+ streamingPart.MinKey = pm.MinKey
+ streamingPart.MaxKey = pm.MaxKey
+ switch {
+ case pm.MinTimestamp != nil:
+ streamingPart.MinTimestamp = *pm.MinTimestamp
+ case pm.SegmentID > 0:
+ streamingPart.MinTimestamp = pm.SegmentID
+ default:
+ release()
+ return nil, func() {}, fmt.Errorf("sidx part %d has no
valid timestamp (MinTimestamp=nil, SegmentID=0)", partID)
+ }
+ if pm.MaxTimestamp != nil {
+ streamingPart.MaxTimestamp = *pm.MaxTimestamp
+ } else {
+ streamingPart.MaxTimestamp = streamingPart.MinTimestamp
+ }
+ }
+
return streamingPart, release, nil
}
diff --git a/banyand/trace/handoff_replay_test.go
b/banyand/trace/handoff_replay_test.go
index ec0610cdb..87335f2a1 100644
--- a/banyand/trace/handoff_replay_test.go
+++ b/banyand/trace/handoff_replay_test.go
@@ -404,3 +404,335 @@ func TestHandoffController_SendPartToNode(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, 1, mockClient.getSendCount())
}
+
+// createTestSidxPart creates a minimal sidx part directory with the given
manifest content.
+func createTestSidxPart(t *testing.T, fileSystem fs.FileSystem, root string,
partID uint64, manifestContent []byte) string {
+ t.Helper()
+ partPath := filepath.Join(root, partName(partID))
+ fileSystem.MkdirIfNotExist(partPath, storage.DirPerm)
+
+ sidxFiles := map[string][]byte{
+ "primary.bin": []byte("test primary data"),
+ "data.bin": []byte("test data"),
+ "keys.bin": []byte("test keys"),
+ "meta.bin": []byte("test meta"),
+ }
+ if manifestContent != nil {
+ sidxFiles["manifest.json"] = manifestContent
+ }
+
+ for filename, content := range sidxFiles {
+ filePath := filepath.Join(partPath, filename)
+ lf, err := fileSystem.CreateLockFile(filePath, storage.FilePerm)
+ require.NoError(t, err)
+ _, err = lf.Write(content)
+ require.NoError(t, err)
+ require.NoError(t, lf.Close())
+ }
+
+ return partPath
+}
+
+func TestHandoffController_ReadPartFromHandoff_CoreMetadata(t *testing.T) {
+ tempDir, defFn := test.Space(require.New(t))
+ defer defFn()
+
+ fileSystem := fs.NewLocalFileSystem()
+ l := logger.GetLogger("test")
+
+ sourceRoot := filepath.Join(tempDir, "source")
+ fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm)
+ partID := uint64(0x60)
+ partPath := filepath.Join(sourceRoot, partName(partID))
+ fileSystem.MkdirIfNotExist(partPath, storage.DirPerm)
+
+ coreFiles := map[string][]byte{
+ "primary.bin": []byte("primary data"),
+ "spans.bin": []byte("spans data"),
+ "meta.bin": []byte("meta data"),
+ "metadata.json":
[]byte(`{"compressedSizeBytes":1024,"uncompressedSpanSizeBytes":2048,` +
+
`"totalCount":50,"blocksCount":5,"minTimestamp":1700000000,"maxTimestamp":1700001000}`),
+ "tag.type": []byte("tag type"),
+ "traceID.filter": []byte("filter"),
+ }
+ for filename, content := range coreFiles {
+ filePath := filepath.Join(partPath, filename)
+ lf, err := fileSystem.CreateLockFile(filePath, storage.FilePerm)
+ require.NoError(t, err)
+ _, err = lf.Write(content)
+ require.NoError(t, err)
+ require.NoError(t, lf.Close())
+ }
+
+ nodeAddr := testNodeAddrPrimary
+ controller, err := newHandoffController(fileSystem, tempDir, nil,
[]string{nodeAddr}, 0, l, nil)
+ require.NoError(t, err)
+ defer controller.close()
+
+ require.NoError(t, controller.enqueueForNode(nodeAddr, partID,
PartTypeCore, partPath, "group1", 1))
+
+ streamingPart, release, err := controller.readPartFromHandoff(nodeAddr,
partID, PartTypeCore)
+ require.NoError(t, err)
+ defer release()
+
+ assert.Equal(t, uint64(1024), streamingPart.CompressedSizeBytes)
+ assert.Equal(t, uint64(2048), streamingPart.UncompressedSizeBytes)
+ assert.Equal(t, uint64(50), streamingPart.TotalCount)
+ assert.Equal(t, uint64(5), streamingPart.BlocksCount)
+ assert.Equal(t, int64(1700000000), streamingPart.MinTimestamp)
+ assert.Equal(t, int64(1700001000), streamingPart.MaxTimestamp)
+}
+
+func TestHandoffController_ReadPartFromHandoff_CoreMissingMetadata(t
*testing.T) {
+ tempDir, defFn := test.Space(require.New(t))
+ defer defFn()
+
+ fileSystem := fs.NewLocalFileSystem()
+ l := logger.GetLogger("test")
+
+ sourceRoot := filepath.Join(tempDir, "source")
+ fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm)
+ partID := uint64(0x61)
+ partPath := filepath.Join(sourceRoot, partName(partID))
+ fileSystem.MkdirIfNotExist(partPath, storage.DirPerm)
+
+ coreFiles := map[string][]byte{
+ "primary.bin": []byte("primary data"),
+ "spans.bin": []byte("spans data"),
+ "meta.bin": []byte("meta data"),
+ }
+ for filename, content := range coreFiles {
+ filePath := filepath.Join(partPath, filename)
+ lf, err := fileSystem.CreateLockFile(filePath, storage.FilePerm)
+ require.NoError(t, err)
+ _, err = lf.Write(content)
+ require.NoError(t, err)
+ require.NoError(t, lf.Close())
+ }
+
+ nodeAddr := testNodeAddrPrimary
+ controller, err := newHandoffController(fileSystem, tempDir, nil,
[]string{nodeAddr}, 0, l, nil)
+ require.NoError(t, err)
+ defer controller.close()
+
+ require.NoError(t, controller.enqueueForNode(nodeAddr, partID,
PartTypeCore, partPath, "group1", 1))
+
+ _, _, readErr := controller.readPartFromHandoff(nodeAddr, partID,
PartTypeCore)
+ require.Error(t, readErr)
+ assert.Contains(t, readErr.Error(), "failed to read metadata.json")
+}
+
+func TestHandoffController_ReadPartFromHandoff_CoreInvalidMetadata(t
*testing.T) {
+ tempDir, defFn := test.Space(require.New(t))
+ defer defFn()
+
+ fileSystem := fs.NewLocalFileSystem()
+ l := logger.GetLogger("test")
+
+ sourceRoot := filepath.Join(tempDir, "source")
+ fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm)
+ partID := uint64(0x62)
+ partPath := filepath.Join(sourceRoot, partName(partID))
+ fileSystem.MkdirIfNotExist(partPath, storage.DirPerm)
+
+ coreFiles := map[string][]byte{
+ "primary.bin": []byte("primary data"),
+ "spans.bin": []byte("spans data"),
+ "meta.bin": []byte("meta data"),
+ "metadata.json": []byte(`{invalid json`),
+ }
+ for filename, content := range coreFiles {
+ filePath := filepath.Join(partPath, filename)
+ lf, err := fileSystem.CreateLockFile(filePath, storage.FilePerm)
+ require.NoError(t, err)
+ _, err = lf.Write(content)
+ require.NoError(t, err)
+ require.NoError(t, lf.Close())
+ }
+
+ nodeAddr := testNodeAddrPrimary
+ controller, err := newHandoffController(fileSystem, tempDir, nil,
[]string{nodeAddr}, 0, l, nil)
+ require.NoError(t, err)
+ defer controller.close()
+
+ require.NoError(t, controller.enqueueForNode(nodeAddr, partID,
PartTypeCore, partPath, "group1", 1))
+
+ _, _, readErr := controller.readPartFromHandoff(nodeAddr, partID,
PartTypeCore)
+ require.Error(t, readErr)
+ assert.Contains(t, readErr.Error(), "failed to parse metadata.json")
+}
+
+func TestHandoffController_ReadPartFromHandoff_SidxWithTimestamps(t
*testing.T) {
+ tempDir, defFn := test.Space(require.New(t))
+ defer defFn()
+
+ fileSystem := fs.NewLocalFileSystem()
+ l := logger.GetLogger("test")
+
+ sourceRoot := filepath.Join(tempDir, "source")
+ fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm)
+ partID := uint64(0x70)
+
+ manifest := []byte(`{
+ "minTimestamp": 1700000000,
+ "maxTimestamp": 1700001000,
+ "compressedSizeBytes": 512,
+ "uncompressedSizeBytes": 1024,
+ "totalCount": 20,
+ "blocksCount": 2,
+ "minKey": 10,
+ "maxKey": 200,
+ "segmentID": 1700099999
+ }`)
+ sourcePath := createTestSidxPart(t, fileSystem, sourceRoot, partID,
manifest)
+
+ nodeAddr := testNodeAddrPrimary
+ controller, err := newHandoffController(fileSystem, tempDir, nil,
[]string{nodeAddr}, 0, l, nil)
+ require.NoError(t, err)
+ defer controller.close()
+
+ require.NoError(t, controller.enqueueForNode(nodeAddr, partID,
"sidx_trace_id", sourcePath, "group1", 1))
+
+ streamingPart, release, err := controller.readPartFromHandoff(nodeAddr,
partID, "sidx_trace_id")
+ require.NoError(t, err)
+ defer release()
+
+ assert.Equal(t, partID, streamingPart.ID)
+ assert.Equal(t, "group1", streamingPart.Group)
+ assert.Equal(t, "sidx_trace_id", streamingPart.PartType)
+ assert.Equal(t, uint64(512), streamingPart.CompressedSizeBytes)
+ assert.Equal(t, uint64(1024), streamingPart.UncompressedSizeBytes)
+ assert.Equal(t, uint64(20), streamingPart.TotalCount)
+ assert.Equal(t, uint64(2), streamingPart.BlocksCount)
+ assert.Equal(t, int64(10), streamingPart.MinKey)
+ assert.Equal(t, int64(200), streamingPart.MaxKey)
+ // MinTimestamp comes from the pointer field, not SegmentID
+ assert.Equal(t, int64(1700000000), streamingPart.MinTimestamp)
+ assert.Equal(t, int64(1700001000), streamingPart.MaxTimestamp)
+}
+
+func TestHandoffController_ReadPartFromHandoff_SidxWithSegmentIDFallback(t
*testing.T) {
+ tempDir, defFn := test.Space(require.New(t))
+ defer defFn()
+
+ fileSystem := fs.NewLocalFileSystem()
+ l := logger.GetLogger("test")
+
+ sourceRoot := filepath.Join(tempDir, "source")
+ fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm)
+ partID := uint64(0x71)
+
+ // Legacy manifest: no minTimestamp/maxTimestamp, only segmentID
+ manifest := []byte(`{
+ "compressedSizeBytes": 256,
+ "uncompressedSizeBytes": 512,
+ "totalCount": 10,
+ "blocksCount": 1,
+ "minKey": 5,
+ "maxKey": 100,
+ "segmentID": 1700050000
+ }`)
+ sourcePath := createTestSidxPart(t, fileSystem, sourceRoot, partID,
manifest)
+
+ nodeAddr := testNodeAddrPrimary
+ controller, err := newHandoffController(fileSystem, tempDir, nil,
[]string{nodeAddr}, 0, l, nil)
+ require.NoError(t, err)
+ defer controller.close()
+
+ require.NoError(t, controller.enqueueForNode(nodeAddr, partID,
"sidx_trace_id", sourcePath, "group1", 1))
+
+ streamingPart, release, err := controller.readPartFromHandoff(nodeAddr,
partID, "sidx_trace_id")
+ require.NoError(t, err)
+ defer release()
+
+ // MinTimestamp falls back to SegmentID; MaxTimestamp falls back to
MinTimestamp
+ assert.Equal(t, int64(1700050000), streamingPart.MinTimestamp)
+ assert.Equal(t, int64(1700050000), streamingPart.MaxTimestamp)
+ assert.Equal(t, uint64(256), streamingPart.CompressedSizeBytes)
+}
+
+func TestHandoffController_ReadPartFromHandoff_SidxMissingManifest(t
*testing.T) {
+ tempDir, defFn := test.Space(require.New(t))
+ defer defFn()
+
+ fileSystem := fs.NewLocalFileSystem()
+ l := logger.GetLogger("test")
+
+ sourceRoot := filepath.Join(tempDir, "source")
+ fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm)
+ partID := uint64(0x72)
+
+ // Create sidx part without manifest.json
+ sourcePath := createTestSidxPart(t, fileSystem, sourceRoot, partID, nil)
+
+ nodeAddr := testNodeAddrPrimary
+ controller, err := newHandoffController(fileSystem, tempDir, nil,
[]string{nodeAddr}, 0, l, nil)
+ require.NoError(t, err)
+ defer controller.close()
+
+ require.NoError(t, controller.enqueueForNode(nodeAddr, partID,
"sidx_trace_id", sourcePath, "group1", 1))
+
+ _, _, readErr := controller.readPartFromHandoff(nodeAddr, partID,
"sidx_trace_id")
+ require.Error(t, readErr)
+ assert.Contains(t, readErr.Error(), "failed to read manifest.json")
+}
+
+func TestHandoffController_ReadPartFromHandoff_SidxInvalidManifest(t
*testing.T) {
+ tempDir, defFn := test.Space(require.New(t))
+ defer defFn()
+
+ fileSystem := fs.NewLocalFileSystem()
+ l := logger.GetLogger("test")
+
+ sourceRoot := filepath.Join(tempDir, "source")
+ fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm)
+ partID := uint64(0x73)
+
+ manifest := []byte(`{broken json`)
+ sourcePath := createTestSidxPart(t, fileSystem, sourceRoot, partID,
manifest)
+
+ nodeAddr := testNodeAddrPrimary
+ controller, err := newHandoffController(fileSystem, tempDir, nil,
[]string{nodeAddr}, 0, l, nil)
+ require.NoError(t, err)
+ defer controller.close()
+
+ require.NoError(t, controller.enqueueForNode(nodeAddr, partID,
"sidx_trace_id", sourcePath, "group1", 1))
+
+ _, _, readErr := controller.readPartFromHandoff(nodeAddr, partID,
"sidx_trace_id")
+ require.Error(t, readErr)
+ assert.Contains(t, readErr.Error(), "failed to parse manifest.json")
+}
+
+func TestHandoffController_ReadPartFromHandoff_SidxNoValidTimestamp(t
*testing.T) {
+ tempDir, defFn := test.Space(require.New(t))
+ defer defFn()
+
+ fileSystem := fs.NewLocalFileSystem()
+ l := logger.GetLogger("test")
+
+ sourceRoot := filepath.Join(tempDir, "source")
+ fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm)
+ partID := uint64(0x74)
+
+ // Manifest with no minTimestamp and segmentID=0 (invalid)
+ manifest := []byte(`{
+ "compressedSizeBytes": 128,
+ "totalCount": 5,
+ "blocksCount": 1,
+ "minKey": 1,
+ "maxKey": 10,
+ "segmentID": 0
+ }`)
+ sourcePath := createTestSidxPart(t, fileSystem, sourceRoot, partID,
manifest)
+
+ nodeAddr := testNodeAddrPrimary
+ controller, err := newHandoffController(fileSystem, tempDir, nil,
[]string{nodeAddr}, 0, l, nil)
+ require.NoError(t, err)
+ defer controller.close()
+
+ require.NoError(t, controller.enqueueForNode(nodeAddr, partID,
"sidx_trace_id", sourcePath, "group1", 1))
+
+ _, _, readErr := controller.readPartFromHandoff(nodeAddr, partID,
"sidx_trace_id")
+ require.Error(t, readErr)
+ assert.Contains(t, readErr.Error(), "has no valid timestamp")
+}
diff --git a/banyand/trace/handoff_storage_test.go
b/banyand/trace/handoff_storage_test.go
index 93084afff..4ca00ca8f 100644
--- a/banyand/trace/handoff_storage_test.go
+++ b/banyand/trace/handoff_storage_test.go
@@ -22,6 +22,7 @@ import (
"errors"
"os"
"path/filepath"
+ "sync"
"testing"
"time"
@@ -690,3 +691,58 @@ func TestHandoffController_SizeRecovery(t *testing.T) {
tester.NoError(err)
tester.Len(node2Pending, 1)
}
+
+// TestHandoffController_ConcurrentSizeEnforcement verifies that concurrent
enqueues
+// cannot exceed the size limit due to the atomic tryReserveSize check.
+func TestHandoffController_ConcurrentSizeEnforcement(t *testing.T) {
+ tester := require.New(t)
+ tempDir, deferFunc := test.Space(tester)
+ defer deferFunc()
+
+ // Create a source part whose metadata.json reports a 3MB compressed size
+ partID := uint64(500)
+ partPath := filepath.Join(tempDir, "source", partName(partID))
+ tester.NoError(os.MkdirAll(partPath, 0o755))
+
+ metadata := map[string]interface{}{
+ "compressedSizeBytes": 3 * megabyte,
+ }
+ metadataBytes, err := json.Marshal(metadata)
+ tester.NoError(err)
+ tester.NoError(os.WriteFile(filepath.Join(partPath, "metadata.json"),
metadataBytes, 0o600))
+ tester.NoError(os.WriteFile(filepath.Join(partPath, "data.bin"),
[]byte("test data"), 0o600))
+
+ // 10MB limit allows at most 3 parts of 3MB each
+ lfs := fs.NewLocalFileSystem()
+ l := logger.GetLogger("test")
+ hc, err := newHandoffController(lfs, tempDir, nil,
[]string{"node1:17912"}, 10*megabyte, l, nil)
+ tester.NoError(err)
+ defer hc.close()
+
+ const numGoroutines = 20
+ var wg sync.WaitGroup
+ var successCount int64
+ var mu sync.Mutex
+
+ for i := 0; i < numGoroutines; i++ {
+ wg.Add(1)
+ go func(idx int) {
+ defer wg.Done()
+ enqueueErr := hc.enqueueForNode("node1:17912",
partID+uint64(idx), PartTypeCore, partPath, "group1", 1)
+ if enqueueErr == nil {
+ mu.Lock()
+ successCount++
+ mu.Unlock()
+ }
+ }(i)
+ }
+ wg.Wait()
+
+ // At most 3 enqueues should succeed (3 * 3MB = 9MB <= 10MB)
+ tester.LessOrEqual(successCount, int64(3))
+
+ // Total size must not exceed 10MB limit
+ totalSize := hc.getTotalSize()
+ tester.LessOrEqual(totalSize, uint64(10*megabyte))
+ tester.Equal(uint64(successCount)*3*megabyte, totalSize)
+}
diff --git a/banyand/trace/tstable.go b/banyand/trace/tstable.go
index dcb68f9b9..741f1221f 100644
--- a/banyand/trace/tstable.go
+++ b/banyand/trace/tstable.go
@@ -279,6 +279,10 @@ func initTSTable(fileSystem fs.FileSystem, rootPath
string, p common.Position,
for _, id := range loadedSnapshots {
fileSystem.MustRMAll(filepath.Join(rootPath,
snapshotName(id)))
}
+ for _, id := range loadedParts {
+ l.Info().Str("path", partPath(rootPath,
id)).Msg("delete orphaned part without snapshot")
+ fileSystem.MustRMAll(partPath(rootPath, id))
+ }
return &tst, uint64(time.Now().UnixNano())
}
sort.Slice(loadedSnapshots, func(i, j int) bool {
@@ -298,6 +302,10 @@ func initTSTable(fileSystem fs.FileSystem, rootPath
string, p common.Position,
}
return &tst, epoch
}
+ for _, id := range loadedParts {
+ l.Info().Str("path", partPath(rootPath, id)).Msg("delete
orphaned part after all snapshots failed to load")
+ fileSystem.MustRMAll(partPath(rootPath, id))
+ }
return &tst, uint64(time.Now().UnixNano())
}
diff --git a/test/integration/handoff/handoff_suite_test.go
b/test/integration/handoff/handoff_suite_test.go
index 22ca237ec..e1af1ca70 100644
--- a/test/integration/handoff/handoff_suite_test.go
+++ b/test/integration/handoff/handoff_suite_test.go
@@ -370,6 +370,11 @@ var _ = Describe("trace handoff", func() {
return queryTrace(connection, traceID, writeTime)
}, flags.EventuallyTimeout).Should(Succeed())
+ By("verifying sidx data was replayed correctly by querying via
indexed tag")
+ Eventually(func() error {
+ return queryTraceByService(connection,
"handoff_service", writeTime)
+ }, flags.EventuallyTimeout).Should(Succeed())
+
var otherIndex int
for idx := range dnHandles {
if idx != targetIndex {
@@ -535,3 +540,42 @@ func waitForPendingParts(nodeAddr string, timeout
time.Duration) bool {
time.Sleep(100 * time.Millisecond)
}
}
+
+// queryTraceByService queries traces by service_id, which exercises the sidx
secondary index.
+// If sidx parts were replayed with MinTimestamp=0, the sidx data is rejected
or corrupted,
+// and this query fails to find traces even though core data exists.
+func queryTraceByService(conn *grpc.ClientConn, serviceID string, ts
time.Time) error {
+ client := tracev1.NewTraceServiceClient(conn)
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+
+ req := &tracev1.QueryRequest{
+ Groups: []string{"test-trace-group"},
+ Name: "sw",
+ TimeRange: &modelv1.TimeRange{
+ Begin: timestamppb.New(ts.Add(-5 * time.Minute)),
+ End: timestamppb.New(ts.Add(5 * time.Minute)),
+ },
+ Criteria: &modelv1.Criteria{
+ Exp: &modelv1.Criteria_Condition{
+ Condition: &modelv1.Condition{
+ Name: "service_id",
+ Op: modelv1.Condition_BINARY_OP_EQ,
+ Value: &modelv1.TagValue{Value:
&modelv1.TagValue_Str{
+ Str: &modelv1.Str{Value:
serviceID},
+ }},
+ },
+ },
+ },
+ TagProjection: []string{"trace_id", "service_id"},
+ }
+
+ resp, queryErr := client.Query(ctx, req)
+ if queryErr != nil {
+ return queryErr
+ }
+ if len(resp.GetTraces()) == 0 {
+ return fmt.Errorf("no traces found for service_id=%s",
serviceID)
+ }
+ return nil
+}