This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new ae3de0997 fix take snapshot error when no data in the segment (#1046)
ae3de0997 is described below

commit ae3de0997032e59c3c7bafb2cd6c8c9874e90fb4
Author: mrproliu <[email protected]>
AuthorDate: Wed Apr 8 13:03:46 2026 +0800

    fix take snapshot error when no data in the segment (#1046)
    
    * fix take snapshot error when no data in the segment
    
    
    ---------
    
    Co-authored-by: Gao Hongtao <[email protected]>
---
 CHANGES.md                                |  1 +
 banyand/internal/storage/rotation_test.go | 21 +++++++++++++
 banyand/internal/storage/storage.go       |  4 ++-
 banyand/internal/storage/tsdb.go          | 31 ++++++++++++-------
 banyand/internal/storage/tsdb_test.go     | 50 +++++++++++++++++++++++++++++++
 banyand/measure/snapshot.go               | 13 +++++---
 banyand/measure/snapshot_test.go          | 40 +++++++++++++++++++++++++
 banyand/stream/snapshot.go                | 24 ++++++++-------
 banyand/stream/snapshot_test.go           | 41 +++++++++++++++++++++++++
 banyand/trace/snapshot.go                 | 26 +++++++++-------
 banyand/trace/snapshot_test.go            | 46 ++++++++++++++++++++++++++++
 11 files changed, 262 insertions(+), 35 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 728519a7e..e6c355c75 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -19,6 +19,7 @@ Release Notes.
 - MCP: Add validation for properties and harden the mcp server.
 - Fix property schema client connection not stable after data node restarted.
 - Fix flaky on-disk integration tests caused by Ginkgo v2 random container shuffling closing gRPC connections prematurely.
+- Fix snapshot error when there is no data in a segment.
 - ui: fix query editor refresh/reset behavior and BydbQL keyword highlighting.
 
 ## 0.10.0
diff --git a/banyand/internal/storage/rotation_test.go b/banyand/internal/storage/rotation_test.go
index 5a605e67c..e9e003916 100644
--- a/banyand/internal/storage/rotation_test.go
+++ b/banyand/internal/storage/rotation_test.go
@@ -208,6 +208,27 @@ var MockTSTableCreator = func(_ fs.FileSystem, _ string, _ common.Position,
        return &MockTSTable{}, nil
 }
 
+type SnapshotMockTSTable struct {
+       timeRange timestamp.TimeRange
+}
+
+func (m *SnapshotMockTSTable) Close() error { return nil }
+
+func (m *SnapshotMockTSTable) Collect(_ Metrics) {}
+
+func (m *SnapshotMockTSTable) TakeFileSnapshot(_ string) (bool, error) {
+       if m.timeRange.Start.Equal(time.Unix(0, 0)) {
+               return false, ErrNoCurrentSnapshot
+       }
+       return true, nil
+}
+
+var SnapshotMockTSTableCreator = func(_ fs.FileSystem, _ string, _ common.Position,
+       _ *logger.Logger, timeRange timestamp.TimeRange, _, _ any,
+) (*SnapshotMockTSTable, error) {
+       return &SnapshotMockTSTable{timeRange: timeRange}, nil
+}
+
 type MockMetrics struct{}
 
 func (m *MockMetrics) DeleteAll() {}
diff --git a/banyand/internal/storage/storage.go b/banyand/internal/storage/storage.go
index 3b30bef84..799f42b82 100644
--- a/banyand/internal/storage/storage.go
+++ b/banyand/internal/storage/storage.go
@@ -64,7 +64,9 @@ const (
 var (
        // ErrUnknownShard indicates that the shard is not found.
        ErrUnknownShard = errors.New("unknown shard")
-       errOpenDatabase = errors.New("fails to open the database")
+       // ErrNoCurrentSnapshot is returned when a shard has no current snapshot available.
+       ErrNoCurrentSnapshot = errors.New("no current snapshot available")
+       errOpenDatabase      = errors.New("fails to open the database")
 
        lfs = fs.NewLocalFileSystemWithLogger(logger.GetLogger("storage"))
 )
diff --git a/banyand/internal/storage/tsdb.go b/banyand/internal/storage/tsdb.go
index 27df4885c..ca6b74ba9 100644
--- a/banyand/internal/storage/tsdb.go
+++ b/banyand/internal/storage/tsdb.go
@@ -268,14 +268,14 @@ func (d *database[T, O]) UpdateOptions(resourceOpts *commonv1.ResourceOpts) {
        d.segmentController.updateOptions(resourceOpts)
 }
 
-func (d *database[T, O]) TakeFileSnapshot(dst string) (bool, error) {
+func (d *database[T, O]) TakeFileSnapshot(dst string) (success bool, err error) {
        if d.closed.Load() {
                return false, errors.New("database is closed")
        }
 
-       segments, err := d.segmentController.segments(true)
-       if err != nil {
-               return false, errors.Wrap(err, "failed to get segments")
+       segments, segErr := d.segmentController.segments(true)
+       if segErr != nil {
+               return false, errors.Wrap(segErr, "failed to get segments")
        }
        defer func() {
                for _, seg := range segments {
@@ -287,6 +287,12 @@ func (d *database[T, O]) TakeFileSnapshot(dst string) (bool, error) {
                return false, nil
        }
 
+       defer func() {
+               if err != nil {
+                       d.lfs.MustRMAll(dst)
+               }
+       }()
+
        log.Info().Int("segment_count", len(segments)).Str("db_location", d.location).
                Msgf("taking file snapshot for %s", dst)
        for _, seg := range segments {
@@ -296,14 +302,14 @@ func (d *database[T, O]) TakeFileSnapshot(dst string) (bool, error) {
 
                metadataSrc := filepath.Join(seg.location, metadataFilename)
                metadataDest := filepath.Join(segPath, metadataFilename)
-               if err := d.lfs.CreateHardLink(metadataSrc, metadataDest, nil); err != nil {
-                       return false, errors.Wrapf(err, "failed to snapshot metadata for segment %s", segDir)
+               if linkErr := d.lfs.CreateHardLink(metadataSrc, metadataDest, nil); linkErr != nil {
+                       return false, errors.Wrapf(linkErr, "failed to snapshot metadata for segment %s", segDir)
                }
 
                indexPath := filepath.Join(segPath, seriesIndexDirName)
                d.lfs.MkdirIfNotExist(indexPath, DirPerm)
-               if err := seg.index.store.TakeFileSnapshot(indexPath); err != nil {
-                       return false, errors.Wrapf(err, "failed to snapshot index for segment %s", segDir)
+               if indexErr := seg.index.store.TakeFileSnapshot(indexPath); indexErr != nil {
+                       return false, errors.Wrapf(indexErr, "failed to snapshot index for segment %s", segDir)
                }
 
                sLst := seg.sLst.Load()
@@ -314,8 +320,13 @@ func (d *database[T, O]) TakeFileSnapshot(dst string) (bool, error) {
                        shardDir := filepath.Base(shard.location)
                        shardPath := filepath.Join(segPath, shardDir)
                        d.lfs.MkdirIfNotExist(shardPath, DirPerm)
-                       if _, err := shard.table.TakeFileSnapshot(shardPath); err != nil {
-                               return false, errors.Wrapf(err, "failed to snapshot shard %s in segment %s", shardDir, segDir)
+                       if _, shardErr := shard.table.TakeFileSnapshot(shardPath); shardErr != nil {
+                               if errors.Is(shardErr, ErrNoCurrentSnapshot) {
+                                       log.Debug().Str("shard", shardDir).Str("segment", segDir).
+                                               Msg("skipping empty shard snapshot")
+                                       continue
+                               }
+                               return false, errors.Wrapf(shardErr, "failed to snapshot shard %s in segment %s", shardDir, segDir)
                        }
                }
        }
diff --git a/banyand/internal/storage/tsdb_test.go b/banyand/internal/storage/tsdb_test.go
index d3b84e889..08b285d7a 100644
--- a/banyand/internal/storage/tsdb_test.go
+++ b/banyand/internal/storage/tsdb_test.go
@@ -308,6 +308,56 @@ func TestTakeFileSnapshot(t *testing.T) {
 
                require.NoError(t, tsdb.Close())
        })
+
+       t.Run("Take snapshot skips shard with no current snapshot", func(t *testing.T) {
+               dir, defFn := test.Space(require.New(t))
+               defer defFn()
+
+               snapshotDir := filepath.Join(dir, "snapshot")
+
+               opts := TSDBOpts[*SnapshotMockTSTable, any]{
+                       Location:        dir,
+                       SegmentInterval: IntervalRule{Unit: DAY, Num: 1},
+                       TTL:             IntervalRule{Unit: DAY, Num: 7},
+                       ShardNum:        1,
+                       TSTableCreator:  SnapshotMockTSTableCreator,
+               }
+
+               ctx := context.Background()
+               mc := timestamp.NewMockClock()
+               ts, err := time.ParseInLocation("2006-01-02 15:04:05", "2024-05-01 00:00:00", time.Local)
+               require.NoError(t, err)
+               mc.Set(ts)
+               ctx = timestamp.SetClock(ctx, mc)
+
+               serviceCache := NewServiceCache()
+               tsdb, err := OpenTSDB(ctx, opts, serviceCache, group)
+               require.NoError(t, err)
+               defer tsdb.Close()
+
+               normalSeg, err := tsdb.CreateSegmentIfNotExist(ts)
+               require.NoError(t, err)
+               normalSegLocation := normalSeg.(*segment[*SnapshotMockTSTable, any]).location
+               normalSeg.DecRef()
+
+               epochSeg, err := tsdb.CreateSegmentIfNotExist(time.Unix(0, 0))
+               require.NoError(t, err)
+               epochSegLocation := epochSeg.(*segment[*SnapshotMockTSTable, any]).location
+               epochSeg.DecRef()
+
+               created, snapshotErr := tsdb.TakeFileSnapshot(snapshotDir)
+               require.NoError(t, snapshotErr,
+                       "snapshot should not fail due to empty shard in epoch segment")
+               require.True(t, created)
+
+               normalSegDir := filepath.Join(snapshotDir, filepath.Base(normalSegLocation))
+               require.DirExists(t, normalSegDir,
+                       "normal segment should be present in snapshot")
+
+               epochSegDir := filepath.Join(snapshotDir, filepath.Base(epochSegLocation))
+               require.DirExists(t, epochSegDir,
+                       "epoch segment directory should still be created")
+       })
 }
 
 func TestTSDBCollect(t *testing.T) {
diff --git a/banyand/measure/snapshot.go b/banyand/measure/snapshot.go
index 80c1df9fd..fa8323a94 100644
--- a/banyand/measure/snapshot.go
+++ b/banyand/measure/snapshot.go
@@ -153,12 +153,17 @@ func parseSnapshot(name string) (uint64, error) {
        return parseEpoch(name[:16])
 }
 
-func (tst *tsTable) TakeFileSnapshot(dst string) (bool, error) {
+func (tst *tsTable) TakeFileSnapshot(dst string) (success bool, err error) {
        snapshot := tst.currentSnapshot()
        if snapshot == nil {
-               return false, fmt.Errorf("no current snapshot available")
+               return false, storage.ErrNoCurrentSnapshot
        }
        defer snapshot.decRef()
+       defer func() {
+               if err != nil {
+                       tst.fileSystem.MustRMAll(dst)
+               }
+       }()
 
        hasDiskParts := false
        for _, pw := range snapshot.parts {
@@ -171,8 +176,8 @@ func (tst *tsTable) TakeFileSnapshot(dst string) (bool, error) {
                srcPath := part.path
                destPartPath := filepath.Join(dst, filepath.Base(srcPath))
 
-               if err := tst.fileSystem.CreateHardLink(srcPath, destPartPath, nil); err != nil {
-                       return false, fmt.Errorf("failed to create snapshot for part %d: %w", part.partMetadata.ID, err)
+               if linkErr := tst.fileSystem.CreateHardLink(srcPath, destPartPath, nil); linkErr != nil {
+                       return false, fmt.Errorf("failed to create snapshot for part %d: %w", part.partMetadata.ID, linkErr)
                }
        }
        if !hasDiskParts {
diff --git a/banyand/measure/snapshot_test.go b/banyand/measure/snapshot_test.go
index dc8c51d6b..67aa4f896 100644
--- a/banyand/measure/snapshot_test.go
+++ b/banyand/measure/snapshot_test.go
@@ -812,3 +812,43 @@ func TestInitTSTableLoadsNewestWhenMultipleValid(t *testing.T) {
        sort.Slice(snapshots, func(i, j int) bool { return snapshots[i] > snapshots[j] })
        require.Equal(t, snapshots[0], epoch2, "must load newest valid snapshot")
 }
+
+func TestTakeFileSnapshotEmptySegment(t *testing.T) {
+       fileSystem := fs.NewLocalFileSystem()
+
+       tmpPath, deferFn := test.Space(require.New(t))
+       defer deferFn()
+
+       tabDir := filepath.Join(tmpPath, "tab")
+       fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+
+       tst, err := newTSTable(
+               fileSystem,
+               tabDir,
+               common.Position{},
+               logger.GetLogger("test"),
+               timestamp.TimeRange{},
+               option{
+                       flushTimeout: 0,
+                       mergePolicy:  newDefaultMergePolicy(),
+                       protector:    protector.Nop{},
+               },
+               nil,
+       )
+       require.NoError(t, err)
+       defer tst.Close()
+
+       tst.Lock()
+       tst.snapshot = nil
+       tst.Unlock()
+
+       snapshotPath := filepath.Join(tmpPath, "snapshot")
+       fileSystem.MkdirIfNotExist(snapshotPath, 0o755)
+
+       created, err := tst.TakeFileSnapshot(snapshotPath)
+       require.ErrorIs(t, err, storage.ErrNoCurrentSnapshot)
+       assert.False(t, created)
+
+       entries := fileSystem.ReadDir(snapshotPath)
+       assert.Empty(t, entries, "no files or dirs should remain when snapshot is nil")
+}
diff --git a/banyand/stream/snapshot.go b/banyand/stream/snapshot.go
index 67f79f335..e3fd0d9ff 100644
--- a/banyand/stream/snapshot.go
+++ b/banyand/stream/snapshot.go
@@ -195,21 +195,25 @@ func parseSnapshot(name string) (uint64, error) {
        return parseEpoch(name[:16])
 }
 
-func (tst *tsTable) TakeFileSnapshot(dst string) (bool, error) {
+func (tst *tsTable) TakeFileSnapshot(dst string) (success bool, err error) {
        if tst.index == nil {
                return false, fmt.Errorf("cannot take file snapshot: index is not initialized for this tsTable")
        }
-       indexDir := filepath.Join(dst, filepath.Base(tst.index.location))
-       tst.fileSystem.MkdirPanicIfExist(indexDir, storage.DirPerm)
-       if err := tst.index.store.TakeFileSnapshot(indexDir); err != nil {
-               return false, fmt.Errorf("failed to take file snapshot for index: %w", err)
-       }
-
        snapshot := tst.currentSnapshot()
        if snapshot == nil {
-               return false, fmt.Errorf("no current snapshot available")
+               return false, storage.ErrNoCurrentSnapshot
        }
        defer snapshot.decRef()
+       defer func() {
+               if err != nil {
+                       tst.fileSystem.MustRMAll(dst)
+               }
+       }()
+       indexDir := filepath.Join(dst, filepath.Base(tst.index.location))
+       tst.fileSystem.MkdirPanicIfExist(indexDir, storage.DirPerm)
+       if indexErr := tst.index.store.TakeFileSnapshot(indexDir); indexErr != nil {
+               return false, fmt.Errorf("failed to take file snapshot for index: %w", indexErr)
+       }
 
        hasDiskParts := false
        for _, pw := range snapshot.parts {
@@ -221,8 +225,8 @@ func (tst *tsTable) TakeFileSnapshot(dst string) (bool, error) {
                srcPath := part.path
                destPartPath := filepath.Join(dst, filepath.Base(srcPath))
 
-               if err := tst.fileSystem.CreateHardLink(srcPath, destPartPath, nil); err != nil {
-                       return false, fmt.Errorf("failed to create snapshot for part %d: %w", part.partMetadata.ID, err)
+               if linkErr := tst.fileSystem.CreateHardLink(srcPath, destPartPath, nil); linkErr != nil {
+                       return false, fmt.Errorf("failed to create snapshot for part %d: %w", part.partMetadata.ID, linkErr)
                }
                hasDiskParts = true
        }
diff --git a/banyand/stream/snapshot_test.go b/banyand/stream/snapshot_test.go
index bfbad9c79..d98560b19 100644
--- a/banyand/stream/snapshot_test.go
+++ b/banyand/stream/snapshot_test.go
@@ -27,6 +27,7 @@ import (
        "github.com/stretchr/testify/require"
 
        "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
        "github.com/apache/skywalking-banyandb/banyand/protector"
        "github.com/apache/skywalking-banyandb/pkg/bytes"
        "github.com/apache/skywalking-banyandb/pkg/fs"
@@ -609,3 +610,43 @@ func TestTakeFileSnapshotNoDiskParts(t *testing.T) {
        }
        assert.True(t, hasIndex, "expected index directory in snapshot")
 }
+
+func TestTakeFileSnapshotEmptySegment(t *testing.T) {
+       fileSystem := fs.NewLocalFileSystem()
+
+       tmpPath, deferFn := test.Space(require.New(t))
+       defer deferFn()
+
+       tabDir := filepath.Join(tmpPath, "tab")
+       fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+
+       tst, err := newTSTable(
+               fileSystem,
+               tabDir,
+               common.Position{},
+               logger.GetLogger("test"),
+               timestamp.TimeRange{},
+               option{
+                       flushTimeout: 0,
+                       mergePolicy:  newDefaultMergePolicy(),
+                       protector:    protector.Nop{},
+               },
+               nil,
+       )
+       require.NoError(t, err)
+       defer tst.Close()
+
+       tst.Lock()
+       tst.snapshot = nil
+       tst.Unlock()
+
+       snapshotPath := filepath.Join(tmpPath, "snapshot")
+       fileSystem.MkdirIfNotExist(snapshotPath, 0o755)
+
+       created, err := tst.TakeFileSnapshot(snapshotPath)
+       require.ErrorIs(t, err, storage.ErrNoCurrentSnapshot)
+       assert.False(t, created)
+
+       entries := fileSystem.ReadDir(snapshotPath)
+       assert.Empty(t, entries, "no files or dirs should remain when snapshot is nil")
+}
diff --git a/banyand/trace/snapshot.go b/banyand/trace/snapshot.go
index 9b8d3a485..6c120d7d3 100644
--- a/banyand/trace/snapshot.go
+++ b/banyand/trace/snapshot.go
@@ -217,19 +217,25 @@ func parseSnapshot(name string) (uint64, error) {
        return parseEpoch(name[:16])
 }
 
-func (tst *tsTable) TakeFileSnapshot(dst string) (bool, error) {
+func (tst *tsTable) TakeFileSnapshot(dst string) (success bool, err error) {
+       snapshot := tst.currentSnapshot()
+       if snapshot == nil {
+               return false, storage.ErrNoCurrentSnapshot
+       }
+       defer snapshot.decRef()
+       defer func() {
+               if err != nil {
+                       tst.fileSystem.MustRMAll(dst)
+               }
+       }()
+
        for k, v := range tst.sidxMap {
                indexDir := filepath.Join(dst, sidxDirName, k)
                tst.fileSystem.MkdirPanicIfExist(indexDir, storage.DirPerm)
-               if err := v.TakeFileSnapshot(indexDir); err != nil {
-                       return false, fmt.Errorf("failed to take file snapshot for index, %s: %w", k, err)
+               if sidxErr := v.TakeFileSnapshot(indexDir); sidxErr != nil {
+                       return false, fmt.Errorf("failed to take file snapshot for index, %s: %w", k, sidxErr)
                }
        }
-       snapshot := tst.currentSnapshot()
-       if snapshot == nil {
-               return false, fmt.Errorf("no current snapshot available")
-       }
-       defer snapshot.decRef()
 
        hasDiskParts := false
        for _, pw := range snapshot.parts {
@@ -241,8 +247,8 @@ func (tst *tsTable) TakeFileSnapshot(dst string) (bool, error) {
                srcPath := part.path
                destPartPath := filepath.Join(dst, filepath.Base(srcPath))
 
-               if err := tst.fileSystem.CreateHardLink(srcPath, destPartPath, nil); err != nil {
-                       return false, fmt.Errorf("failed to create snapshot for part %d: %w", part.partMetadata.ID, err)
+               if linkErr := tst.fileSystem.CreateHardLink(srcPath, destPartPath, nil); linkErr != nil {
+                       return false, fmt.Errorf("failed to create snapshot for part %d: %w", part.partMetadata.ID, linkErr)
                }
                hasDiskParts = true
        }
diff --git a/banyand/trace/snapshot_test.go b/banyand/trace/snapshot_test.go
index 9f3db9fcc..f8909c9dd 100644
--- a/banyand/trace/snapshot_test.go
+++ b/banyand/trace/snapshot_test.go
@@ -27,6 +27,7 @@ import (
        "github.com/stretchr/testify/require"
 
        "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
        "github.com/apache/skywalking-banyandb/banyand/protector"
        "github.com/apache/skywalking-banyandb/pkg/bytes"
        "github.com/apache/skywalking-banyandb/pkg/convert"
@@ -738,3 +739,48 @@ func TestTakeFileSnapshotNoDiskPartsWithoutSidx(t *testing.T) {
        require.NoError(t, err)
        assert.False(t, created, "TakeFileSnapshot should return false when sidxMap is empty and no disk parts")
 }
+
+func TestTakeFileSnapshotEmptySegment(t *testing.T) {
+       fileSystem := fs.NewLocalFileSystem()
+
+       tmpPath, deferFn := test.Space(require.New(t))
+       defer deferFn()
+
+       tabDir := filepath.Join(tmpPath, "tab")
+       fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+
+       tst, err := newTSTable(
+               fileSystem,
+               tabDir,
+               common.Position{},
+               logger.GetLogger("test"),
+               timestamp.TimeRange{},
+               option{
+                       flushTimeout: 0,
+                       mergePolicy:  newDefaultMergePolicy(),
+                       protector:    protector.Nop{},
+               },
+               nil,
+       )
+       require.NoError(t, err)
+       defer tst.Close()
+
+       // Populate sidxMap so the test guards against the prior bug where the
+       // sidxMap loop created sidx directories before the nil-snapshot check.
+       _, err = tst.getOrCreateSidx("test_sidx")
+       require.NoError(t, err)
+
+       tst.Lock()
+       tst.snapshot = nil
+       tst.Unlock()
+
+       snapshotPath := filepath.Join(tmpPath, "snapshot")
+       fileSystem.MkdirIfNotExist(snapshotPath, 0o755)
+
+       created, err := tst.TakeFileSnapshot(snapshotPath)
+       require.ErrorIs(t, err, storage.ErrNoCurrentSnapshot)
+       assert.False(t, created)
+
+       entries := fileSystem.ReadDir(snapshotPath)
+       assert.Empty(t, entries, "no files or dirs should remain when snapshot is nil")
+}

Reply via email to