Copilot commented on code in PR #1064:
URL: https://github.com/apache/skywalking-banyandb/pull/1064#discussion_r3068885936
##########
banyand/trace/handoff_controller.go:
##########
@@ -714,15 +713,22 @@ func (hc *handoffController) stats() (partCount int64,
totalSize int64) {
return count, int64(hc.getTotalSize())
}
-// canEnqueue checks if adding a part of the given size would exceed the total
size limit.
-func (hc *handoffController) canEnqueue(partSize uint64) bool {
+// tryReserveSize atomically checks and reserves the given size if within the
limit.
+// Returns true and increments currentTotalSize if the size can be reserved.
+// Returns false if adding partSize would exceed maxTotalSizeBytes.
+// Callers must call updateTotalSize(-int64(partSize)) to release on failure.
+func (hc *handoffController) tryReserveSize(partSize uint64) bool {
if hc.maxTotalSizeBytes == 0 {
return true // No limit configured
}
Review Comment:
tryReserveSize returns true when maxTotalSizeBytes==0 without incrementing
currentTotalSize. enqueueForNode will still call updateTotalSize(-partSize) on
failure paths, and successful enqueues won’t update the tracked total size at
all, leading to incorrect stats and spurious underflow/reset-to-0 warnings.
Consider reserving/tracking size even when unlimited, or skipping
rollback/update when no limit is configured.
##########
banyand/trace/handoff_controller.go:
##########
@@ -1210,28 +1216,77 @@ func (hc *handoffController)
readPartFromHandoff(nodeAddr string, partID uint64,
PartType: partType,
}
- // For core parts, read additional metadata from metadata.json if
present
- if partType == PartTypeCore {
- metadataPath := filepath.Join(partPath, metadataFilename)
- if metadataBytes, err := hc.fileSystem.Read(metadataPath); err
== nil {
- var pm partMetadata
- if err := json.Unmarshal(metadataBytes, &pm); err ==
nil {
- streamingPart.CompressedSizeBytes =
pm.CompressedSizeBytes
- streamingPart.UncompressedSizeBytes =
pm.UncompressedSpanSizeBytes
- streamingPart.TotalCount = pm.TotalCount
- streamingPart.BlocksCount = pm.BlocksCount
- streamingPart.MinTimestamp = pm.MinTimestamp
- streamingPart.MaxTimestamp = pm.MaxTimestamp
- }
- }
- }
-
release := func() {
for _, buf := range buffers {
bigValuePool.Release(buf)
}
}
+ // For core parts, read additional metadata from metadata.json.
+ if partType == PartTypeCore {
+ metadataPath := filepath.Join(partPath, metadataFilename)
+ metadataBytes, readErr := hc.fileSystem.Read(metadataPath)
+ if readErr != nil {
+ release()
+ return nil, func() {}, fmt.Errorf("failed to read %s:
%w", metadataFilename, readErr)
+ }
+ var pm partMetadata
+ if parseErr := json.Unmarshal(metadataBytes, &pm); parseErr !=
nil {
+ release()
+ return nil, func() {}, fmt.Errorf("failed to parse %s:
%w", metadataFilename, parseErr)
+ }
+ streamingPart.CompressedSizeBytes = pm.CompressedSizeBytes
+ streamingPart.UncompressedSizeBytes =
pm.UncompressedSpanSizeBytes
+ streamingPart.TotalCount = pm.TotalCount
+ streamingPart.BlocksCount = pm.BlocksCount
+ streamingPart.MinTimestamp = pm.MinTimestamp
+ streamingPart.MaxTimestamp = pm.MaxTimestamp
+ } else {
+ // For sidx parts, read manifest.json and populate timestamps
using the same
+ // fallback logic as streaming sync: MinTimestamp pointer →
SegmentID legacy → warn.
+ manifestPath := filepath.Join(partPath, "manifest.json")
+ manifestBytes, readErr := hc.fileSystem.Read(manifestPath)
+ if readErr != nil {
+ release()
+ return nil, func() {}, fmt.Errorf("failed to read
manifest.json: %w", readErr)
+ }
+ var pm struct {
+ MinTimestamp *int64
`json:"minTimestamp,omitempty"`
+ MaxTimestamp *int64
`json:"maxTimestamp,omitempty"`
+ CompressedSizeBytes uint64
`json:"compressedSizeBytes"`
+ UncompressedSizeBytes uint64
`json:"uncompressedSizeBytes"`
+ TotalCount uint64 `json:"totalCount"`
+ BlocksCount uint64 `json:"blocksCount"`
+ MinKey int64 `json:"minKey"`
+ MaxKey int64 `json:"maxKey"`
+ SegmentID int64 `json:"segmentID"`
+ }
+ if parseErr := json.Unmarshal(manifestBytes, &pm); parseErr !=
nil {
+ release()
+ return nil, func() {}, fmt.Errorf("failed to parse
manifest.json: %w", parseErr)
+ }
+ streamingPart.CompressedSizeBytes = pm.CompressedSizeBytes
+ streamingPart.UncompressedSizeBytes = pm.UncompressedSizeBytes
+ streamingPart.TotalCount = pm.TotalCount
+ streamingPart.BlocksCount = pm.BlocksCount
+ streamingPart.MinKey = pm.MinKey
+ streamingPart.MaxKey = pm.MaxKey
+ switch {
+ case pm.MinTimestamp != nil:
+ streamingPart.MinTimestamp = *pm.MinTimestamp
+ case pm.SegmentID > 0:
+ streamingPart.MinTimestamp = pm.SegmentID
+ default:
+ hc.l.Warn().Uint64("partID", partID).Str("partType",
partType).
+ Msg("sidx handoff replay: part has no valid
timestamp (MinTimestamp=nil, SegmentID=0)")
+ }
+ if pm.MaxTimestamp != nil {
+ streamingPart.MaxTimestamp = *pm.MaxTimestamp
+ } else {
+ streamingPart.MaxTimestamp = streamingPart.MinTimestamp
+ }
Review Comment:
When neither minTimestamp nor segmentID is present/valid in manifest.json,
this code only logs a warning but returns a StreamingPartData with
MinTimestamp/MaxTimestamp left as 0. The receiving chunked sync handler rejects
ctx.MinTimestamp <= 0, so this will cause replay retries/stuck parts rather
than a clean failure. Returning an error here (or otherwise marking the part as
unreplayable) would prevent repeated failed sends and avoid zero-timestamp
corruption paths.
##########
banyand/trace/handoff_replay_test.go:
##########
@@ -404,3 +404,297 @@ func TestHandoffController_SendPartToNode(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, 1, mockClient.getSendCount())
}
+
+// createTestSidxPart creates a minimal sidx part directory with the given
manifest content.
+func createTestSidxPart(t *testing.T, fileSystem fs.FileSystem, root string,
partID uint64, manifestContent []byte) string {
+ t.Helper()
+ partPath := filepath.Join(root, partName(partID))
+ fileSystem.MkdirIfNotExist(partPath, storage.DirPerm)
+
+ sidxFiles := map[string][]byte{
+ "primary.bin": []byte("test primary data"),
+ "data.bin": []byte("test data"),
+ "keys.bin": []byte("test keys"),
+ "meta.bin": []byte("test meta"),
+ }
+ if manifestContent != nil {
+ sidxFiles["manifest.json"] = manifestContent
+ }
+
+ for filename, content := range sidxFiles {
+ filePath := filepath.Join(partPath, filename)
+ lf, err := fileSystem.CreateLockFile(filePath, storage.FilePerm)
+ require.NoError(t, err)
+ _, err = lf.Write(content)
+ require.NoError(t, err)
+ }
+
+ return partPath
+}
+
+func TestHandoffController_ReadPartFromHandoff_CoreMetadata(t *testing.T) {
+ tempDir, defFn := test.Space(require.New(t))
+ defer defFn()
+
+ fileSystem := fs.NewLocalFileSystem()
+ l := logger.GetLogger("test")
+
+ sourceRoot := filepath.Join(tempDir, "source")
+ fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm)
+ partID := uint64(0x60)
+ partPath := filepath.Join(sourceRoot, partName(partID))
+ fileSystem.MkdirIfNotExist(partPath, storage.DirPerm)
+
+ coreFiles := map[string][]byte{
+ "primary.bin": []byte("primary data"),
+ "spans.bin": []byte("spans data"),
+ "meta.bin": []byte("meta data"),
+ "metadata.json":
[]byte(`{"compressedSizeBytes":1024,"uncompressedSpanSizeBytes":2048,` +
+
`"totalCount":50,"blocksCount":5,"minTimestamp":1700000000,"maxTimestamp":1700001000}`),
+ "tag.type": []byte("tag type"),
+ "traceID.filter": []byte("filter"),
+ }
+ for filename, content := range coreFiles {
+ filePath := filepath.Join(partPath, filename)
+ lf, err := fileSystem.CreateLockFile(filePath, storage.FilePerm)
+ require.NoError(t, err)
+ _, err = lf.Write(content)
+ require.NoError(t, err)
Review Comment:
Lock files created with CreateLockFile in these test helpers aren’t closed
after Write. Since CreateLockFile takes an exclusive lock, this can leak
FDs/locks and cause nondeterministic behavior on some platforms. Close the file
after writing (ideally with t.Cleanup or explicit Close checks).
##########
banyand/trace/handoff_replay_test.go:
##########
@@ -404,3 +404,297 @@ func TestHandoffController_SendPartToNode(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, 1, mockClient.getSendCount())
}
+
+// createTestSidxPart creates a minimal sidx part directory with the given
manifest content.
+func createTestSidxPart(t *testing.T, fileSystem fs.FileSystem, root string,
partID uint64, manifestContent []byte) string {
+ t.Helper()
+ partPath := filepath.Join(root, partName(partID))
+ fileSystem.MkdirIfNotExist(partPath, storage.DirPerm)
+
+ sidxFiles := map[string][]byte{
+ "primary.bin": []byte("test primary data"),
+ "data.bin": []byte("test data"),
+ "keys.bin": []byte("test keys"),
+ "meta.bin": []byte("test meta"),
+ }
+ if manifestContent != nil {
+ sidxFiles["manifest.json"] = manifestContent
+ }
+
+ for filename, content := range sidxFiles {
+ filePath := filepath.Join(partPath, filename)
+ lf, err := fileSystem.CreateLockFile(filePath, storage.FilePerm)
+ require.NoError(t, err)
+ _, err = lf.Write(content)
+ require.NoError(t, err)
+ }
Review Comment:
Files created via fileSystem.CreateLockFile are never closed after writing.
CreateLockFile acquires an OS-level exclusive lock; not closing can leak file
descriptors and keep locks held, making tests flaky or interfering with
subsequent file operations. Close the lock file (and handle close errors) after
each write.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]