Copilot commented on code in PR #1064:
URL: https://github.com/apache/skywalking-banyandb/pull/1064#discussion_r3068885936


##########
banyand/trace/handoff_controller.go:
##########
@@ -714,15 +713,22 @@ func (hc *handoffController) stats() (partCount int64, 
totalSize int64) {
        return count, int64(hc.getTotalSize())
 }
 
-// canEnqueue checks if adding a part of the given size would exceed the total 
size limit.
-func (hc *handoffController) canEnqueue(partSize uint64) bool {
+// tryReserveSize atomically checks and reserves the given size if within the 
limit.
+// Returns true and increments currentTotalSize if the size can be reserved.
+// Returns false if adding partSize would exceed maxTotalSizeBytes.
+// Callers must call updateTotalSize(-int64(partSize)) to release on failure.
+func (hc *handoffController) tryReserveSize(partSize uint64) bool {
        if hc.maxTotalSizeBytes == 0 {
                return true // No limit configured
        }

Review Comment:
   tryReserveSize returns true when maxTotalSizeBytes==0 without incrementing 
currentTotalSize. enqueueForNode will still call updateTotalSize(-partSize) on 
failure paths, and successful enqueues won’t update the tracked total size at 
all, leading to incorrect stats and spurious underflow/reset-to-0 warnings. 
Consider reserving/tracking size even when unlimited, or skipping 
rollback/update when no limit is configured.



##########
banyand/trace/handoff_controller.go:
##########
@@ -1210,28 +1216,77 @@ func (hc *handoffController) 
readPartFromHandoff(nodeAddr string, partID uint64,
                PartType: partType,
        }
 
-       // For core parts, read additional metadata from metadata.json if 
present
-       if partType == PartTypeCore {
-               metadataPath := filepath.Join(partPath, metadataFilename)
-               if metadataBytes, err := hc.fileSystem.Read(metadataPath); err 
== nil {
-                       var pm partMetadata
-                       if err := json.Unmarshal(metadataBytes, &pm); err == 
nil {
-                               streamingPart.CompressedSizeBytes = 
pm.CompressedSizeBytes
-                               streamingPart.UncompressedSizeBytes = 
pm.UncompressedSpanSizeBytes
-                               streamingPart.TotalCount = pm.TotalCount
-                               streamingPart.BlocksCount = pm.BlocksCount
-                               streamingPart.MinTimestamp = pm.MinTimestamp
-                               streamingPart.MaxTimestamp = pm.MaxTimestamp
-                       }
-               }
-       }
-
        release := func() {
                for _, buf := range buffers {
                        bigValuePool.Release(buf)
                }
        }
 
+       // For core parts, read additional metadata from metadata.json.
+       if partType == PartTypeCore {
+               metadataPath := filepath.Join(partPath, metadataFilename)
+               metadataBytes, readErr := hc.fileSystem.Read(metadataPath)
+               if readErr != nil {
+                       release()
+                       return nil, func() {}, fmt.Errorf("failed to read %s: 
%w", metadataFilename, readErr)
+               }
+               var pm partMetadata
+               if parseErr := json.Unmarshal(metadataBytes, &pm); parseErr != 
nil {
+                       release()
+                       return nil, func() {}, fmt.Errorf("failed to parse %s: 
%w", metadataFilename, parseErr)
+               }
+               streamingPart.CompressedSizeBytes = pm.CompressedSizeBytes
+               streamingPart.UncompressedSizeBytes = 
pm.UncompressedSpanSizeBytes
+               streamingPart.TotalCount = pm.TotalCount
+               streamingPart.BlocksCount = pm.BlocksCount
+               streamingPart.MinTimestamp = pm.MinTimestamp
+               streamingPart.MaxTimestamp = pm.MaxTimestamp
+       } else {
+               // For sidx parts, read manifest.json and populate timestamps 
using the same
+               // fallback logic as streaming sync: MinTimestamp pointer → 
SegmentID legacy → warn.
+               manifestPath := filepath.Join(partPath, "manifest.json")
+               manifestBytes, readErr := hc.fileSystem.Read(manifestPath)
+               if readErr != nil {
+                       release()
+                       return nil, func() {}, fmt.Errorf("failed to read 
manifest.json: %w", readErr)
+               }
+               var pm struct {
+                       MinTimestamp          *int64 
`json:"minTimestamp,omitempty"`
+                       MaxTimestamp          *int64 
`json:"maxTimestamp,omitempty"`
+                       CompressedSizeBytes   uint64 
`json:"compressedSizeBytes"`
+                       UncompressedSizeBytes uint64 
`json:"uncompressedSizeBytes"`
+                       TotalCount            uint64 `json:"totalCount"`
+                       BlocksCount           uint64 `json:"blocksCount"`
+                       MinKey                int64  `json:"minKey"`
+                       MaxKey                int64  `json:"maxKey"`
+                       SegmentID             int64  `json:"segmentID"`
+               }
+               if parseErr := json.Unmarshal(manifestBytes, &pm); parseErr != 
nil {
+                       release()
+                       return nil, func() {}, fmt.Errorf("failed to parse 
manifest.json: %w", parseErr)
+               }
+               streamingPart.CompressedSizeBytes = pm.CompressedSizeBytes
+               streamingPart.UncompressedSizeBytes = pm.UncompressedSizeBytes
+               streamingPart.TotalCount = pm.TotalCount
+               streamingPart.BlocksCount = pm.BlocksCount
+               streamingPart.MinKey = pm.MinKey
+               streamingPart.MaxKey = pm.MaxKey
+               switch {
+               case pm.MinTimestamp != nil:
+                       streamingPart.MinTimestamp = *pm.MinTimestamp
+               case pm.SegmentID > 0:
+                       streamingPart.MinTimestamp = pm.SegmentID
+               default:
+                       hc.l.Warn().Uint64("partID", partID).Str("partType", 
partType).
+                               Msg("sidx handoff replay: part has no valid 
timestamp (MinTimestamp=nil, SegmentID=0)")
+               }
+               if pm.MaxTimestamp != nil {
+                       streamingPart.MaxTimestamp = *pm.MaxTimestamp
+               } else {
+                       streamingPart.MaxTimestamp = streamingPart.MinTimestamp
+               }

Review Comment:
   When neither minTimestamp nor segmentID is present/valid in manifest.json, 
this code only logs a warning and then returns a StreamingPartData whose 
MinTimestamp/MaxTimestamp remain 0. The receiving chunked sync handler rejects 
parts with ctx.MinTimestamp <= 0, so this leads to repeated replay retries and 
stuck parts rather than a clean failure. Returning an error here (or otherwise 
marking the part as unreplayable) would prevent repeated failed sends and avoid 
zero-timestamp corruption paths.



##########
banyand/trace/handoff_replay_test.go:
##########
@@ -404,3 +404,297 @@ func TestHandoffController_SendPartToNode(t *testing.T) {
        require.NoError(t, err)
        assert.Equal(t, 1, mockClient.getSendCount())
 }
+
+// createTestSidxPart creates a minimal sidx part directory with the given 
manifest content.
+func createTestSidxPart(t *testing.T, fileSystem fs.FileSystem, root string, 
partID uint64, manifestContent []byte) string {
+       t.Helper()
+       partPath := filepath.Join(root, partName(partID))
+       fileSystem.MkdirIfNotExist(partPath, storage.DirPerm)
+
+       sidxFiles := map[string][]byte{
+               "primary.bin": []byte("test primary data"),
+               "data.bin":    []byte("test data"),
+               "keys.bin":    []byte("test keys"),
+               "meta.bin":    []byte("test meta"),
+       }
+       if manifestContent != nil {
+               sidxFiles["manifest.json"] = manifestContent
+       }
+
+       for filename, content := range sidxFiles {
+               filePath := filepath.Join(partPath, filename)
+               lf, err := fileSystem.CreateLockFile(filePath, storage.FilePerm)
+               require.NoError(t, err)
+               _, err = lf.Write(content)
+               require.NoError(t, err)
+       }
+
+       return partPath
+}
+
+func TestHandoffController_ReadPartFromHandoff_CoreMetadata(t *testing.T) {
+       tempDir, defFn := test.Space(require.New(t))
+       defer defFn()
+
+       fileSystem := fs.NewLocalFileSystem()
+       l := logger.GetLogger("test")
+
+       sourceRoot := filepath.Join(tempDir, "source")
+       fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm)
+       partID := uint64(0x60)
+       partPath := filepath.Join(sourceRoot, partName(partID))
+       fileSystem.MkdirIfNotExist(partPath, storage.DirPerm)
+
+       coreFiles := map[string][]byte{
+               "primary.bin": []byte("primary data"),
+               "spans.bin":   []byte("spans data"),
+               "meta.bin":    []byte("meta data"),
+               "metadata.json": 
[]byte(`{"compressedSizeBytes":1024,"uncompressedSpanSizeBytes":2048,` +
+                       
`"totalCount":50,"blocksCount":5,"minTimestamp":1700000000,"maxTimestamp":1700001000}`),
+               "tag.type":       []byte("tag type"),
+               "traceID.filter": []byte("filter"),
+       }
+       for filename, content := range coreFiles {
+               filePath := filepath.Join(partPath, filename)
+               lf, err := fileSystem.CreateLockFile(filePath, storage.FilePerm)
+               require.NoError(t, err)
+               _, err = lf.Write(content)
+               require.NoError(t, err)

Review Comment:
   Lock files created with CreateLockFile in these test helpers aren’t closed 
after Write. Since CreateLockFile takes an exclusive lock, this can leak 
FDs/locks and cause nondeterministic behavior on some platforms. Close the file 
after writing (ideally with t.Cleanup or explicit Close checks).



##########
banyand/trace/handoff_replay_test.go:
##########
@@ -404,3 +404,297 @@ func TestHandoffController_SendPartToNode(t *testing.T) {
        require.NoError(t, err)
        assert.Equal(t, 1, mockClient.getSendCount())
 }
+
+// createTestSidxPart creates a minimal sidx part directory with the given 
manifest content.
+func createTestSidxPart(t *testing.T, fileSystem fs.FileSystem, root string, 
partID uint64, manifestContent []byte) string {
+       t.Helper()
+       partPath := filepath.Join(root, partName(partID))
+       fileSystem.MkdirIfNotExist(partPath, storage.DirPerm)
+
+       sidxFiles := map[string][]byte{
+               "primary.bin": []byte("test primary data"),
+               "data.bin":    []byte("test data"),
+               "keys.bin":    []byte("test keys"),
+               "meta.bin":    []byte("test meta"),
+       }
+       if manifestContent != nil {
+               sidxFiles["manifest.json"] = manifestContent
+       }
+
+       for filename, content := range sidxFiles {
+               filePath := filepath.Join(partPath, filename)
+               lf, err := fileSystem.CreateLockFile(filePath, storage.FilePerm)
+               require.NoError(t, err)
+               _, err = lf.Write(content)
+               require.NoError(t, err)
+       }

Review Comment:
   Files created via fileSystem.CreateLockFile are never closed after writing. 
CreateLockFile acquires an OS-level exclusive lock; not closing can leak file 
descriptors and keep locks held, making tests flaky or interfering with 
subsequent file operations. Close the lock file (and handle close errors) after 
each write.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to