This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch bug/snapshot-crash
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 548d147014ef9165d5a03a43997b2d8c4f01c80a
Author: Gao Hongtao <[email protected]>
AuthorDate: Thu Mar 12 12:49:00 2026 +0000

    Fix snapshot handling to prevent crashes and improve error handling in tsTable.
---
 banyand/measure/snapshot_test.go        | 240 ++++++++++++++++++++++++++++++++
 banyand/measure/tstable.go              |  67 ++++++---
 banyand/stream/snapshot_tstable_test.go | 237 +++++++++++++++++++++++++++++++
 banyand/stream/tstable.go               |  73 +++++++---
 banyand/trace/snapshot_tstable_test.go  | 224 +++++++++++++++++++++++++++++
 banyand/trace/tstable.go                |  73 +++++++---
 pkg/fs/file_system.go                   |   2 +
 pkg/fs/local_file_system.go             |  23 +++
 8 files changed, 882 insertions(+), 57 deletions(-)

diff --git a/banyand/measure/snapshot_test.go b/banyand/measure/snapshot_test.go
index c5c717628..1ad40c2e9 100644
--- a/banyand/measure/snapshot_test.go
+++ b/banyand/measure/snapshot_test.go
@@ -18,8 +18,10 @@
 package measure
 
 import (
+       "encoding/json"
        "path/filepath"
        "runtime"
+       "sort"
        "testing"
        "time"
 
@@ -27,6 +29,7 @@ import (
        "github.com/stretchr/testify/require"
 
        "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
        "github.com/apache/skywalking-banyandb/banyand/protector"
        "github.com/apache/skywalking-banyandb/pkg/bytes"
        "github.com/apache/skywalking-banyandb/pkg/fs"
@@ -520,3 +523,240 @@ func TestSnapshotFunctionality(t *testing.T) {
                }
        }
 }
+
+func testSnapshotOption() option {
+       return option{
+               flushTimeout: 0,
+               mergePolicy:  newDefaultMergePolicyForTesting(),
+               protector:    protector.Nop{},
+       }
+}
+
+func TestMustWriteSnapshotUsesTempThenRename(t *testing.T) {
+       fileSystem := fs.NewLocalFileSystem()
+       tmpPath, deferFn := test.Space(require.New(t))
+       defer deferFn()
+       tabDir := filepath.Join(tmpPath, "tab")
+       fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+       tst := &tsTable{fileSystem: fileSystem, root: tabDir}
+       const epoch = uint64(0x10)
+       partNames := []string{partName(0x1)}
+       tst.mustWriteSnapshot(epoch, partNames)
+       snpPath := filepath.Join(tabDir, snapshotName(epoch))
+       tmpPathFile := snpPath + ".tmp"
+       require.False(t, fileSystem.IsExist(tmpPathFile), "temp file must be removed after rename")
+       require.True(t, fileSystem.IsExist(snpPath), "final snapshot file must exist")
+       data, err := fileSystem.Read(snpPath)
+       require.NoError(t, err)
+       var decoded []string
+       require.NoError(t, json.Unmarshal(data, &decoded))
+       require.Equal(t, partNames, decoded)
+}
+
+func TestMustWriteSnapshotWithMultipleParts(t *testing.T) {
+       fileSystem := fs.NewLocalFileSystem()
+       tmpPath, deferFn := test.Space(require.New(t))
+       defer deferFn()
+       tabDir := filepath.Join(tmpPath, "tab")
+       fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+       tst := &tsTable{fileSystem: fileSystem, root: tabDir}
+       partNames := []string{partName(0x1), partName(0x2), partName(0xff)}
+       tst.mustWriteSnapshot(1, partNames)
+       parts, readErr := tst.readSnapshot(1)
+       require.NoError(t, readErr)
+       require.Equal(t, []uint64{0x1, 0x2, 0xff}, parts)
+}
+
+func TestMustWriteSnapshotEmptyPartList(t *testing.T) {
+       fileSystem := fs.NewLocalFileSystem()
+       tmpPath, deferFn := test.Space(require.New(t))
+       defer deferFn()
+       tabDir := filepath.Join(tmpPath, "tab")
+       fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+       tst := &tsTable{fileSystem: fileSystem, root: tabDir}
+       tst.mustWriteSnapshot(1, []string{})
+       parts, readErr := tst.readSnapshot(1)
+       require.NoError(t, readErr)
+       require.Empty(t, parts)
+}
+
+func TestTolerantLoaderFallbackToOlderSnapshot(t *testing.T) {
+       fileSystem := fs.NewLocalFileSystem()
+       tmpPath, deferFn := test.Space(require.New(t))
+       defer deferFn()
+       tabDir := filepath.Join(tmpPath, "tab")
+       fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+       tst, err := newTSTable(fileSystem, tabDir, common.Position{}, logger.GetLogger("test"),
+               timestamp.TimeRange{}, option{flushTimeout: 0, mergePolicy: newDefaultMergePolicyForTesting(), protector: protector.Nop{}}, nil)
+       require.NoError(t, err)
+       tst.mustAddDataPoints(dpsTS1)
+       time.Sleep(100 * time.Millisecond)
+       require.Eventually(t, func() bool {
+               dd := fileSystem.ReadDir(tabDir)
+               for _, d := range dd {
+                       if d.IsDir() && d.Name() != storage.FailedPartsDirName {
+                               return true
+                       }
+               }
+               return false
+       }, flags.EventuallyTimeout, time.Millisecond, "wait for part")
+       tst.Close()
+       snapshots := make([]uint64, 0)
+       for _, e := range fileSystem.ReadDir(tabDir) {
+               if filepath.Ext(e.Name()) == snapshotSuffix {
+                       parsed, parseErr := parseSnapshot(e.Name())
+                       if parseErr == nil {
+                               snapshots = append(snapshots, parsed)
+                       }
+               }
+       }
+       require.GreaterOrEqual(t, len(snapshots), 1)
+       sort.Slice(snapshots, func(i, j int) bool { return snapshots[i] > snapshots[j] })
+       validEpoch := snapshots[0]
+       corruptEpoch := validEpoch + 1
+       corruptPath := filepath.Join(tabDir, snapshotName(corruptEpoch))
+       _, writeErr := fileSystem.Write([]byte{}, corruptPath, 0o600)
+       require.NoError(t, writeErr)
+       tst2, epoch2 := initTSTable(fileSystem, tabDir, common.Position{}, logger.GetLogger("test"), testSnapshotOption(), nil)
+       require.NotNil(t, tst2)
+       require.Equal(t, validEpoch, epoch2, "should load older valid snapshot when newest is corrupt")
+       require.NotNil(t, tst2.snapshot)
+       require.Equal(t, validEpoch, tst2.snapshot.epoch)
+}
+
+func TestReadSnapshotReturnsErrorOnEmptyFile(t *testing.T) {
+       fileSystem := fs.NewLocalFileSystem()
+       tmpPath, deferFn := test.Space(require.New(t))
+       defer deferFn()
+       tabDir := filepath.Join(tmpPath, "tab")
+       fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+       snpPath := filepath.Join(tabDir, snapshotName(1))
+       _, writeErr := fileSystem.Write([]byte{}, snpPath, 0o600)
+       require.NoError(t, writeErr)
+       tst := &tsTable{fileSystem: fileSystem, root: tabDir}
+       _, readErr := tst.readSnapshot(1)
+       require.Error(t, readErr)
+       require.Contains(t, readErr.Error(), "cannot parse")
+}
+
+func TestReadSnapshotReturnsErrorOnInvalidJSON(t *testing.T) {
+       fileSystem := fs.NewLocalFileSystem()
+       tmpPath, deferFn := test.Space(require.New(t))
+       defer deferFn()
+       tabDir := filepath.Join(tmpPath, "tab")
+       fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+       snpPath := filepath.Join(tabDir, snapshotName(1))
+       _, writeErr := fileSystem.Write([]byte("{invalid}"), snpPath, 0o600)
+       require.NoError(t, writeErr)
+       tst := &tsTable{fileSystem: fileSystem, root: tabDir}
+       _, readErr := tst.readSnapshot(1)
+       require.Error(t, readErr)
+       require.Contains(t, readErr.Error(), "cannot parse")
+}
+
+func TestReadSnapshotReturnsErrorOnInvalidPartName(t *testing.T) {
+       fileSystem := fs.NewLocalFileSystem()
+       tmpPath, deferFn := test.Space(require.New(t))
+       defer deferFn()
+       tabDir := filepath.Join(tmpPath, "tab")
+       fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+       partNamesData, marshalErr := json.Marshal([]string{"not-hex"})
+       require.NoError(t, marshalErr)
+       snpPath := filepath.Join(tabDir, snapshotName(1))
+       _, writeErr := fileSystem.Write(partNamesData, snpPath, 0o600)
+       require.NoError(t, writeErr)
+       tst := &tsTable{fileSystem: fileSystem, root: tabDir}
+       _, readErr := tst.readSnapshot(1)
+       require.Error(t, readErr)
+}
+
+func TestReadSnapshotSucceedsOnValidFile(t *testing.T) {
+       fileSystem := fs.NewLocalFileSystem()
+       tmpPath, deferFn := test.Space(require.New(t))
+       defer deferFn()
+       tabDir := filepath.Join(tmpPath, "tab")
+       fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+       tst := &tsTable{fileSystem: fileSystem, root: tabDir}
+       tst.mustWriteSnapshot(1, []string{partName(0xa), partName(0xb)})
+       parts, readErr := tst.readSnapshot(1)
+       require.NoError(t, readErr)
+       require.Equal(t, []uint64{0xa, 0xb}, parts)
+}
+
+func TestMustReadSnapshotPanicsOnError(t *testing.T) {
+       fileSystem := fs.NewLocalFileSystem()
+       tmpPath, deferFn := test.Space(require.New(t))
+       defer deferFn()
+       tabDir := filepath.Join(tmpPath, "tab")
+       fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+       snpPath := filepath.Join(tabDir, snapshotName(1))
+       _, writeErr := fileSystem.Write([]byte{}, snpPath, 0o600)
+       require.NoError(t, writeErr)
+       tst := &tsTable{fileSystem: fileSystem, root: tabDir}
+       require.Panics(t, func() { _ = tst.mustReadSnapshot(1) })
+}
+
+func TestInitTSTableEmptyDirectory(t *testing.T) {
+       fileSystem := fs.NewLocalFileSystem()
+       tmpPath, deferFn := test.Space(require.New(t))
+       defer deferFn()
+       tabDir := filepath.Join(tmpPath, "tab")
+       fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+       tst, epoch := initTSTable(fileSystem, tabDir, common.Position{}, logger.GetLogger("test"), testSnapshotOption(), nil)
+       require.NotNil(t, tst)
+       require.Greater(t, epoch, uint64(0))
+       require.Nil(t, tst.snapshot)
+}
+
+func TestInitTSTableEmptyTableWhenAllSnapshotsCorrupt(t *testing.T) {
+       fileSystem := fs.NewLocalFileSystem()
+       tmpPath, deferFn := test.Space(require.New(t))
+       defer deferFn()
+       tabDir := filepath.Join(tmpPath, "tab")
+       fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+       snpPath := filepath.Join(tabDir, snapshotName(1))
+       _, writeErr := fileSystem.Write([]byte{}, snpPath, 0o600)
+       require.NoError(t, writeErr)
+       tst, epoch := initTSTable(fileSystem, tabDir, common.Position{}, logger.GetLogger("test"), testSnapshotOption(), nil)
+       require.NotNil(t, tst)
+       require.Greater(t, epoch, uint64(0))
+       require.Nil(t, tst.snapshot)
+       require.False(t, fileSystem.IsExist(snpPath), "corrupt snapshot file must be deleted")
+}
+
+func TestInitTSTableLoadsNewestWhenMultipleValid(t *testing.T) {
+       fileSystem := fs.NewLocalFileSystem()
+       tmpPath, deferFn := test.Space(require.New(t))
+       defer deferFn()
+       tabDir := filepath.Join(tmpPath, "tab")
+       fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+       tst, err := newTSTable(fileSystem, tabDir, common.Position{}, logger.GetLogger("test"),
+               timestamp.TimeRange{}, testSnapshotOption(), nil)
+       require.NoError(t, err)
+       tst.mustAddDataPoints(dpsTS1)
+       time.Sleep(100 * time.Millisecond)
+       require.Eventually(t, func() bool {
+               dd := fileSystem.ReadDir(tabDir)
+               for _, d := range dd {
+                       if d.IsDir() && d.Name() != storage.FailedPartsDirName {
+                               return true
+                       }
+               }
+               return false
+       }, flags.EventuallyTimeout, time.Millisecond, "wait for part")
+       tst.Close()
+       snapshots := make([]uint64, 0)
+       for _, e := range fileSystem.ReadDir(tabDir) {
+               if filepath.Ext(e.Name()) == snapshotSuffix {
+                       parsed, parseErr := parseSnapshot(e.Name())
+                       if parseErr == nil {
+                               snapshots = append(snapshots, parsed)
+                       }
+               }
+       }
+       require.GreaterOrEqual(t, len(snapshots), 1)
+       tst2, epoch2 := initTSTable(fileSystem, tabDir, common.Position{}, logger.GetLogger("test"), testSnapshotOption(), nil)
+       require.NotNil(t, tst2)
+       sort.Slice(snapshots, func(i, j int) bool { return snapshots[i] > snapshots[j] })
+       require.Equal(t, snapshots[0], epoch2, "must load newest valid snapshot")
+}
diff --git a/banyand/measure/tstable.go b/banyand/measure/tstable.go
index 5358b1634..693d86ce5 100644
--- a/banyand/measure/tstable.go
+++ b/banyand/measure/tstable.go
@@ -117,9 +117,20 @@ func initTSTable(fileSystem fs.FileSystem, rootPath string, p common.Position,
        sort.Slice(loadedSnapshots, func(i, j int) bool {
                return loadedSnapshots[i] > loadedSnapshots[j]
        })
-       epoch := loadedSnapshots[0]
-       tst.loadSnapshot(epoch, loadedParts)
-       return &tst, epoch
+       var failedSnapshots []uint64
+       for _, epoch := range loadedSnapshots {
+               loadErr := tst.loadSnapshot(epoch, loadedParts)
+               if loadErr == nil {
+                       return &tst, epoch
+               }
+               tst.l.Warn().Err(loadErr).Uint64("epoch", epoch).Msg("cannot load snapshot, trying next older")
+               failedSnapshots = append(failedSnapshots, epoch)
+       }
+       for _, id := range failedSnapshots {
+               tst.l.Info().Str("path", filepath.Join(rootPath, snapshotName(id))).Msg("delete unreadable snapshot file")
+               fileSystem.MustRMAll(filepath.Join(rootPath, snapshotName(id)))
+       }
+       return &tst, uint64(time.Now().UnixNano())
 }
 
 // newTSTable creates a new tsTable and starts the background loop.
@@ -165,8 +176,11 @@ type tsTable struct {
        shardID common.ShardID
 }
 
-func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts []uint64) {
-       parts := tst.mustReadSnapshot(epoch)
+func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts []uint64) error {
+       parts, err := tst.readSnapshot(epoch)
+       if err != nil {
+               return err
+       }
        snp := snapshot{
                epoch: epoch,
        }
@@ -200,13 +214,14 @@ func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts []uint64) {
        tst.gc.registerSnapshot(&snp)
        tst.gc.clean()
        if len(snp.parts) < 1 {
-               return
+               return nil
        }
        snp.incRef()
        tst.snapshot = &snp
        if needToPersist {
                tst.persistSnapshot(&snp)
        }
+       return nil
 }
 
 func (tst *tsTable) startLoop(cur uint64) {
@@ -235,38 +250,56 @@ func (tst *tsTable) mustWriteSnapshot(snapshot uint64, partNames []string) {
                logger.Panicf("cannot marshal partNames to JSON: %s", err)
        }
        snapshotPath := filepath.Join(tst.root, snapshotName(snapshot))
-       lf, err := tst.fileSystem.CreateLockFile(snapshotPath, storage.FilePerm)
+       snapshotTempPath := snapshotPath + ".tmp"
+       lf, err := tst.fileSystem.CreateLockFile(snapshotTempPath, storage.FilePerm)
        if err != nil {
-               logger.Panicf("cannot create lock file %s: %s", snapshotPath, err)
+               logger.Panicf("cannot create lock file %s: %s", snapshotTempPath, err)
        }
        n, err := lf.Write(data)
        if err != nil {
-               logger.Panicf("cannot write snapshot %s: %s", snapshotPath, err)
+               _ = lf.Close()
+               logger.Panicf("cannot write snapshot %s: %s", snapshotTempPath, err)
        }
        if n != len(data) {
-               logger.Panicf("unexpected number of bytes written to %s; got %d; want %d", snapshotPath, n, len(data))
+               _ = lf.Close()
+               logger.Panicf("unexpected number of bytes written to %s; got %d; want %d", snapshotTempPath, n, len(data))
        }
+       if closeErr := lf.Close(); closeErr != nil {
+               logger.Panicf("cannot close snapshot temp file %s: %s", snapshotTempPath, closeErr)
+       }
+       if renameErr := tst.fileSystem.Rename(snapshotTempPath, snapshotPath); renameErr != nil {
+               logger.Panicf("cannot rename snapshot %s to %s: %s", snapshotTempPath, snapshotPath, renameErr)
+       }
+       tst.fileSystem.SyncPath(tst.root)
 }
 
-func (tst *tsTable) mustReadSnapshot(snapshot uint64) []uint64 {
+func (tst *tsTable) readSnapshot(snapshot uint64) ([]uint64, error) {
        snapshotPath := filepath.Join(tst.root, snapshotName(snapshot))
        data, err := tst.fileSystem.Read(snapshotPath)
        if err != nil {
-               logger.Panicf("cannot read %s: %s", snapshotPath, err)
+               return nil, fmt.Errorf("cannot read %s: %w", snapshotPath, err)
        }
        var partNames []string
        if err := json.Unmarshal(data, &partNames); err != nil {
-               logger.Panicf("cannot parse %s: %s", snapshotPath, err)
+               return nil, fmt.Errorf("cannot parse %s: %w", snapshotPath, err)
        }
        var result []uint64
        for i := range partNames {
-               e, err := parseEpoch(partNames[i])
-               if err != nil {
-                       logger.Panicf("cannot parse %s: %s", partNames[i], err)
+               e, parseErr := parseEpoch(partNames[i])
+               if parseErr != nil {
+                       return nil, fmt.Errorf("cannot parse %s: %w", partNames[i], parseErr)
                }
                result = append(result, e)
        }
-       return result
+       return result, nil
+}
+
+func (tst *tsTable) mustReadSnapshot(snapshot uint64) []uint64 {
+       parts, err := tst.readSnapshot(snapshot)
+       if err != nil {
+               logger.Panicf("%s", err)
+       }
+       return parts
 }
 
 func (tst *tsTable) Close() error {
diff --git a/banyand/stream/snapshot_tstable_test.go b/banyand/stream/snapshot_tstable_test.go
new file mode 100644
index 000000000..0401764f6
--- /dev/null
+++ b/banyand/stream/snapshot_tstable_test.go
@@ -0,0 +1,237 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package stream
+
+import (
+       "encoding/json"
+       "path/filepath"
+       "sort"
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/require"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+       "github.com/apache/skywalking-banyandb/banyand/protector"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/index/inverted"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/test"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+func streamSnapshotOption() option {
+       return option{
+               flushTimeout: 0,
+               mergePolicy:  newDefaultMergePolicyForTesting(),
+               protector:    protector.Nop{},
+       }
+}
+
+func TestStreamMustWriteSnapshotUsesTempThenRename(t *testing.T) {
+       fileSystem := fs.NewLocalFileSystem()
+       tmpPath, deferFn := test.Space(require.New(t))
+       defer deferFn()
+       tabDir := filepath.Join(tmpPath, "tab")
+       fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+       tst := &tsTable{fileSystem: fileSystem, root: tabDir}
+       const epoch = uint64(0x10)
+       partNames := []string{partName(0x1)}
+       tst.mustWriteSnapshot(epoch, partNames)
+       snpPath := filepath.Join(tabDir, snapshotName(epoch))
+       tmpPathFile := snpPath + ".tmp"
+       require.False(t, fileSystem.IsExist(tmpPathFile), "temp file must be removed after rename")
+       require.True(t, fileSystem.IsExist(snpPath), "final snapshot file must exist")
+       data, readErr := fileSystem.Read(snpPath)
+       require.NoError(t, readErr)
+       var decoded []string
+       require.NoError(t, json.Unmarshal(data, &decoded))
+       require.Equal(t, partNames, decoded)
+}
+
+func TestStreamMustWriteSnapshotWithMultipleParts(t *testing.T) {
+       fileSystem := fs.NewLocalFileSystem()
+       tmpPath, deferFn := test.Space(require.New(t))
+       defer deferFn()
+       tabDir := filepath.Join(tmpPath, "tab")
+       fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+       tst := &tsTable{fileSystem: fileSystem, root: tabDir}
+       partNames := []string{partName(0x1), partName(0x2), partName(0xff)}
+       tst.mustWriteSnapshot(1, partNames)
+       parts, readErr := tst.readSnapshot(1)
+       require.NoError(t, readErr)
+       require.Equal(t, []uint64{0x1, 0x2, 0xff}, parts)
+}
+
+func TestStreamReadSnapshotReturnsErrorOnEmptyFile(t *testing.T) {
+       fileSystem := fs.NewLocalFileSystem()
+       tmpPath, deferFn := test.Space(require.New(t))
+       defer deferFn()
+       tabDir := filepath.Join(tmpPath, "tab")
+       fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+       snpPath := filepath.Join(tabDir, snapshotName(1))
+       _, writeErr := fileSystem.Write([]byte{}, snpPath, 0o600)
+       require.NoError(t, writeErr)
+       tst := &tsTable{fileSystem: fileSystem, root: tabDir}
+       _, readErr := tst.readSnapshot(1)
+       require.Error(t, readErr)
+       require.Contains(t, readErr.Error(), "cannot parse")
+}
+
+func TestStreamReadSnapshotReturnsErrorOnInvalidJSON(t *testing.T) {
+       fileSystem := fs.NewLocalFileSystem()
+       tmpPath, deferFn := test.Space(require.New(t))
+       defer deferFn()
+       tabDir := filepath.Join(tmpPath, "tab")
+       fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+       snpPath := filepath.Join(tabDir, snapshotName(1))
+       _, writeErr := fileSystem.Write([]byte("{invalid}"), snpPath, 0o600)
+       require.NoError(t, writeErr)
+       tst := &tsTable{fileSystem: fileSystem, root: tabDir}
+       _, readErr := tst.readSnapshot(1)
+       require.Error(t, readErr)
+       require.Contains(t, readErr.Error(), "cannot parse")
+}
+
+func TestStreamReadSnapshotReturnsErrorOnInvalidPartName(t *testing.T) {
+       fileSystem := fs.NewLocalFileSystem()
+       tmpPath, deferFn := test.Space(require.New(t))
+       defer deferFn()
+       tabDir := filepath.Join(tmpPath, "tab")
+       fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+       partNamesData, marshalErr := json.Marshal([]string{"not-hex"})
+       require.NoError(t, marshalErr)
+       snpPath := filepath.Join(tabDir, snapshotName(1))
+       _, writeErr := fileSystem.Write(partNamesData, snpPath, 0o600)
+       require.NoError(t, writeErr)
+       tst := &tsTable{fileSystem: fileSystem, root: tabDir}
+       _, readErr := tst.readSnapshot(1)
+       require.Error(t, readErr)
+}
+
+func TestStreamReadSnapshotSucceedsOnValidFile(t *testing.T) {
+       fileSystem := fs.NewLocalFileSystem()
+       tmpPath, deferFn := test.Space(require.New(t))
+       defer deferFn()
+       tabDir := filepath.Join(tmpPath, "tab")
+       fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+       tst := &tsTable{fileSystem: fileSystem, root: tabDir}
+       tst.mustWriteSnapshot(1, []string{partName(0xa), partName(0xb)})
+       parts, readErr := tst.readSnapshot(1)
+       require.NoError(t, readErr)
+       require.Equal(t, []uint64{0xa, 0xb}, parts)
+}
+
+func TestStreamMustReadSnapshotPanicsOnError(t *testing.T) {
+       fileSystem := fs.NewLocalFileSystem()
+       tmpPath, deferFn := test.Space(require.New(t))
+       defer deferFn()
+       tabDir := filepath.Join(tmpPath, "tab")
+       fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+       snpPath := filepath.Join(tabDir, snapshotName(1))
+       _, writeErr := fileSystem.Write([]byte{}, snpPath, 0o600)
+       require.NoError(t, writeErr)
+       tst := &tsTable{fileSystem: fileSystem, root: tabDir}
+       require.Panics(t, func() { _ = tst.mustReadSnapshot(1) })
+}
+
+func TestStreamInitTSTableEmptyDirectory(t *testing.T) {
+       fileSystem := fs.NewLocalFileSystem()
+       tmpPath, deferFn := test.Space(require.New(t))
+       defer deferFn()
+       tabDir := filepath.Join(tmpPath, "tab")
+       fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+       tst, epoch, initErr := initTSTable(fileSystem, tabDir, common.Position{}, logger.GetLogger("test"),
+               streamSnapshotOption(), nil, false)
+       require.NoError(t, initErr)
+       require.NotNil(t, tst)
+       require.Greater(t, epoch, uint64(0))
+       require.Nil(t, tst.snapshot)
+}
+
+func TestStreamInitTSTableEmptyTableWhenAllSnapshotsCorrupt(t *testing.T) {
+       fileSystem := fs.NewLocalFileSystem()
+       tmpPath, deferFn := test.Space(require.New(t))
+       defer deferFn()
+       tabDir := filepath.Join(tmpPath, "tab")
+       fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+       snpPath := filepath.Join(tabDir, snapshotName(1))
+       _, writeErr := fileSystem.Write([]byte{}, snpPath, 0o600)
+       require.NoError(t, writeErr)
+       tst, epoch, initErr := initTSTable(fileSystem, tabDir, common.Position{}, logger.GetLogger("test"),
+               streamSnapshotOption(), nil, false)
+       require.NoError(t, initErr)
+       require.NotNil(t, tst)
+       require.Greater(t, epoch, uint64(0))
+       require.Nil(t, tst.snapshot)
+       require.False(t, fileSystem.IsExist(snpPath), "corrupt snapshot file must be deleted")
+}
+
+func TestStreamTolerantLoaderFallbackToOlderSnapshot(t *testing.T) {
+       fileSystem := fs.NewLocalFileSystem()
+       tmpPath, deferFn := test.Space(require.New(t))
+       defer deferFn()
+       tabDir := filepath.Join(tmpPath, "tab")
+       fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+       tst, err := newTSTable(fileSystem, tabDir, common.Position{}, logger.GetLogger("test"),
+               timestamp.TimeRange{}, streamSnapshotOption(), nil)
+       require.NoError(t, err)
+       tst.mustAddElements(esTS1)
+       time.Sleep(100 * time.Millisecond)
+       require.Eventually(t, func() bool {
+               dd := fileSystem.ReadDir(tabDir)
+               for _, d := range dd {
+                       if d.IsDir() &&
+                               d.Name() != elementIndexFilename &&
+                               d.Name() != inverted.ExternalSegmentTempDirName &&
+                               d.Name() != storage.FailedPartsDirName {
+                               return true
+                       }
+               }
+               return false
+       }, flags.EventuallyTimeout, time.Millisecond, "wait for part")
+       tst.Close()
+       snapshots := make([]uint64, 0)
+       for _, e := range fileSystem.ReadDir(tabDir) {
+               if filepath.Ext(e.Name()) == snapshotSuffix {
+                       parsed, parseErr := parseSnapshot(e.Name())
+                       if parseErr == nil {
+                               snapshots = append(snapshots, parsed)
+                       }
+               }
+       }
+       require.GreaterOrEqual(t, len(snapshots), 1)
+       sort.Slice(snapshots, func(i, j int) bool { return snapshots[i] > snapshots[j] })
+       validEpoch := snapshots[0]
+       corruptEpoch := validEpoch + 1
+       corruptPath := filepath.Join(tabDir, snapshotName(corruptEpoch))
+       _, writeErr := fileSystem.Write([]byte{}, corruptPath, 0o600)
+       require.NoError(t, writeErr)
+       tst2, epoch2, initErr := initTSTable(fileSystem, tabDir, common.Position{}, logger.GetLogger("test"),
+               streamSnapshotOption(), nil, true)
+       require.NoError(t, initErr)
+       require.NotNil(t, tst2)
+       require.Equal(t, validEpoch, epoch2, "should load older valid snapshot when newest is corrupt")
+       require.NotNil(t, tst2.snapshot)
+       require.Equal(t, validEpoch, tst2.snapshot.epoch)
+       if tst2.index != nil {
+               _ = tst2.index.Close()
+       }
+}
diff --git a/banyand/stream/tstable.go b/banyand/stream/tstable.go
index c980a2e9c..dfda961d5 100644
--- a/banyand/stream/tstable.go
+++ b/banyand/stream/tstable.go
@@ -69,8 +69,11 @@ type tsTable struct {
        shardID common.ShardID
 }
 
-func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts []uint64) {
-       parts := tst.mustReadSnapshot(epoch)
+func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts []uint64) error {
+       parts, err := tst.readSnapshot(epoch)
+       if err != nil {
+               return err
+       }
        snp := snapshot{
                epoch: epoch,
        }
@@ -87,9 +90,9 @@ func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts []uint64) {
                        tst.gc.removePart(id)
                        continue
                }
-               err := validatePartMetadata(tst.fileSystem, partPath(tst.root, id))
-               if err != nil {
-                       tst.l.Info().Err(err).Uint64("id", id).Msg("cannot validate part metadata. skip and delete it")
+               validateErr := validatePartMetadata(tst.fileSystem, partPath(tst.root, id))
+               if validateErr != nil {
+                       tst.l.Info().Err(validateErr).Uint64("id", id).Msg("cannot validate part metadata. skip and delete it")
                        tst.gc.removePart(id)
                        needToPersist = true
                        continue
@@ -104,13 +107,14 @@ func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts []uint64) {
        tst.gc.registerSnapshot(&snp)
        tst.gc.clean()
        if len(snp.parts) < 1 {
-               return
+               return nil
        }
        snp.incRef()
        tst.snapshot = &snp
        if needToPersist {
                tst.persistSnapshot(&snp)
        }
+       return nil
 }
 
 func (tst *tsTable) startLoop(cur uint64) {
@@ -152,38 +156,56 @@ func (tst *tsTable) mustWriteSnapshot(snapshot uint64, 
partNames []string) {
                logger.Panicf("cannot marshal partNames to JSON: %s", err)
        }
        snapshotPath := filepath.Join(tst.root, snapshotName(snapshot))
-       lf, err := tst.fileSystem.CreateLockFile(snapshotPath, storage.FilePerm)
+       snapshotTempPath := snapshotPath + ".tmp"
+       lf, err := tst.fileSystem.CreateLockFile(snapshotTempPath, 
storage.FilePerm)
        if err != nil {
-               logger.Panicf("cannot create lock file %s: %s", snapshotPath, 
err)
+               logger.Panicf("cannot create lock file %s: %s", 
snapshotTempPath, err)
        }
        n, err := lf.Write(data)
        if err != nil {
-               logger.Panicf("cannot write snapshot %s: %s", snapshotPath, err)
+               _ = lf.Close()
+               logger.Panicf("cannot write snapshot %s: %s", snapshotTempPath, 
err)
        }
        if n != len(data) {
-               logger.Panicf("unexpected number of bytes written to %s; got 
%d; want %d", snapshotPath, n, len(data))
+               _ = lf.Close()
+               logger.Panicf("unexpected number of bytes written to %s; got 
%d; want %d", snapshotTempPath, n, len(data))
+       }
+       if closeErr := lf.Close(); closeErr != nil {
+               logger.Panicf("cannot close snapshot temp file %s: %s", 
snapshotTempPath, closeErr)
        }
+       if renameErr := tst.fileSystem.Rename(snapshotTempPath, snapshotPath); 
renameErr != nil {
+               logger.Panicf("cannot rename snapshot %s to %s: %s", 
snapshotTempPath, snapshotPath, renameErr)
+       }
+       tst.fileSystem.SyncPath(tst.root)
 }
 
-func (tst *tsTable) mustReadSnapshot(snapshot uint64) []uint64 {
+func (tst *tsTable) readSnapshot(snapshot uint64) ([]uint64, error) {
        snapshotPath := filepath.Join(tst.root, snapshotName(snapshot))
        data, err := tst.fileSystem.Read(snapshotPath)
        if err != nil {
-               logger.Panicf("cannot read %s: %s", snapshotPath, err)
+               return nil, fmt.Errorf("cannot read %s: %w", snapshotPath, err)
        }
        var partNames []string
        if err := json.Unmarshal(data, &partNames); err != nil {
-               logger.Panicf("cannot parse %s: %s", snapshotPath, err)
+               return nil, fmt.Errorf("cannot parse %s: %w", snapshotPath, err)
        }
        var result []uint64
        for i := range partNames {
-               e, err := parseEpoch(partNames[i])
-               if err != nil {
-                       logger.Panicf("cannot parse %s: %s", partNames[i], err)
+               e, parseErr := parseEpoch(partNames[i])
+               if parseErr != nil {
+                       return nil, fmt.Errorf("cannot parse %s: %w", 
partNames[i], parseErr)
                }
                result = append(result, e)
        }
-       return result
+       return result, nil
+}
+
+func (tst *tsTable) mustReadSnapshot(snapshot uint64) []uint64 {
+       parts, err := tst.readSnapshot(snapshot)
+       if err != nil {
+               logger.Panicf("%s", err)
+       }
+       return parts
 }
 
 // initTSTable initializes a tsTable and loads parts/snapshots, but does not 
start any background loops.
@@ -273,9 +295,20 @@ func initTSTable(fileSystem fs.FileSystem, rootPath 
string, p common.Position,
        sort.Slice(loadedSnapshots, func(i, j int) bool {
                return loadedSnapshots[i] > loadedSnapshots[j]
        })
-       epoch := loadedSnapshots[0]
-       tst.loadSnapshot(epoch, loadedParts)
-       return &tst, epoch, nil
+       var failedSnapshots []uint64
+       for _, epoch := range loadedSnapshots {
+               loadErr := tst.loadSnapshot(epoch, loadedParts)
+               if loadErr == nil {
+                       return &tst, epoch, nil
+               }
+               tst.l.Warn().Err(loadErr).Uint64("epoch", epoch).Msg("cannot 
load snapshot, trying next older")
+               failedSnapshots = append(failedSnapshots, epoch)
+       }
+       for _, id := range failedSnapshots {
+               tst.l.Info().Str("path", filepath.Join(rootPath, 
snapshotName(id))).Msg("delete unreadable snapshot file")
+               fileSystem.MustRMAll(filepath.Join(rootPath, snapshotName(id)))
+       }
+       return &tst, uint64(time.Now().UnixNano()), nil
 }
 
 func newTSTable(fileSystem fs.FileSystem, rootPath string, p common.Position,
diff --git a/banyand/trace/snapshot_tstable_test.go 
b/banyand/trace/snapshot_tstable_test.go
new file mode 100644
index 000000000..a0caccc82
--- /dev/null
+++ b/banyand/trace/snapshot_tstable_test.go
@@ -0,0 +1,224 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package trace
+
+import (
+       "encoding/json"
+       "path/filepath"
+       "sort"
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/require"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+       "github.com/apache/skywalking-banyandb/banyand/protector"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/test"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+func traceSnapshotOption() option {
+       return option{
+               flushTimeout: 0,
+               mergePolicy:  newDefaultMergePolicyForTesting(),
+               protector:    protector.Nop{},
+       }
+}
+
+func TestTraceMustWriteSnapshotUsesTempThenRename(t *testing.T) {
+       fileSystem := fs.NewLocalFileSystem()
+       tmpPath, deferFn := test.Space(require.New(t))
+       defer deferFn()
+       tabDir := filepath.Join(tmpPath, "tab")
+       fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+       tst := &tsTable{fileSystem: fileSystem, root: tabDir}
+       const epoch = uint64(0x10)
+       partNames := []string{partName(0x1)}
+       tst.mustWriteSnapshot(epoch, partNames)
+       snpPath := filepath.Join(tabDir, snapshotName(epoch))
+       tmpPathFile := snpPath + ".tmp"
+       require.False(t, fileSystem.IsExist(tmpPathFile), "temp file must be 
removed after rename")
+       require.True(t, fileSystem.IsExist(snpPath), "final snapshot file must 
exist")
+       data, readErr := fileSystem.Read(snpPath)
+       require.NoError(t, readErr)
+       var decoded []string
+       require.NoError(t, json.Unmarshal(data, &decoded))
+       require.Equal(t, partNames, decoded)
+}
+
+func TestTraceMustWriteSnapshotWithMultipleParts(t *testing.T) {
+       fileSystem := fs.NewLocalFileSystem()
+       tmpPath, deferFn := test.Space(require.New(t))
+       defer deferFn()
+       tabDir := filepath.Join(tmpPath, "tab")
+       fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+       tst := &tsTable{fileSystem: fileSystem, root: tabDir}
+       partNames := []string{partName(0x1), partName(0x2), partName(0xff)}
+       tst.mustWriteSnapshot(1, partNames)
+       parts, readErr := tst.readSnapshot(1)
+       require.NoError(t, readErr)
+       require.Equal(t, []uint64{0x1, 0x2, 0xff}, parts)
+}
+
+func TestTraceReadSnapshotReturnsErrorOnEmptyFile(t *testing.T) {
+       fileSystem := fs.NewLocalFileSystem()
+       tmpPath, deferFn := test.Space(require.New(t))
+       defer deferFn()
+       tabDir := filepath.Join(tmpPath, "tab")
+       fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+       snpPath := filepath.Join(tabDir, snapshotName(1))
+       _, writeErr := fileSystem.Write([]byte{}, snpPath, 0o600)
+       require.NoError(t, writeErr)
+       tst := &tsTable{fileSystem: fileSystem, root: tabDir}
+       _, readErr := tst.readSnapshot(1)
+       require.Error(t, readErr)
+       require.Contains(t, readErr.Error(), "cannot parse")
+}
+
+func TestTraceReadSnapshotReturnsErrorOnInvalidJSON(t *testing.T) {
+       fileSystem := fs.NewLocalFileSystem()
+       tmpPath, deferFn := test.Space(require.New(t))
+       defer deferFn()
+       tabDir := filepath.Join(tmpPath, "tab")
+       fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+       snpPath := filepath.Join(tabDir, snapshotName(1))
+       _, writeErr := fileSystem.Write([]byte("{invalid}"), snpPath, 0o600)
+       require.NoError(t, writeErr)
+       tst := &tsTable{fileSystem: fileSystem, root: tabDir}
+       _, readErr := tst.readSnapshot(1)
+       require.Error(t, readErr)
+       require.Contains(t, readErr.Error(), "cannot parse")
+}
+
+func TestTraceReadSnapshotReturnsErrorOnInvalidPartName(t *testing.T) {
+       fileSystem := fs.NewLocalFileSystem()
+       tmpPath, deferFn := test.Space(require.New(t))
+       defer deferFn()
+       tabDir := filepath.Join(tmpPath, "tab")
+       fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+       partNamesData, marshalErr := json.Marshal([]string{"not-hex"})
+       require.NoError(t, marshalErr)
+       snpPath := filepath.Join(tabDir, snapshotName(1))
+       _, writeErr := fileSystem.Write(partNamesData, snpPath, 0o600)
+       require.NoError(t, writeErr)
+       tst := &tsTable{fileSystem: fileSystem, root: tabDir}
+       _, readErr := tst.readSnapshot(1)
+       require.Error(t, readErr)
+}
+
+func TestTraceReadSnapshotSucceedsOnValidFile(t *testing.T) {
+       fileSystem := fs.NewLocalFileSystem()
+       tmpPath, deferFn := test.Space(require.New(t))
+       defer deferFn()
+       tabDir := filepath.Join(tmpPath, "tab")
+       fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+       tst := &tsTable{fileSystem: fileSystem, root: tabDir}
+       tst.mustWriteSnapshot(1, []string{partName(0xa), partName(0xb)})
+       parts, readErr := tst.readSnapshot(1)
+       require.NoError(t, readErr)
+       require.Equal(t, []uint64{0xa, 0xb}, parts)
+}
+
+func TestTraceMustReadSnapshotPanicsOnError(t *testing.T) {
+       fileSystem := fs.NewLocalFileSystem()
+       tmpPath, deferFn := test.Space(require.New(t))
+       defer deferFn()
+       tabDir := filepath.Join(tmpPath, "tab")
+       fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+       snpPath := filepath.Join(tabDir, snapshotName(1))
+       _, writeErr := fileSystem.Write([]byte{}, snpPath, 0o600)
+       require.NoError(t, writeErr)
+       tst := &tsTable{fileSystem: fileSystem, root: tabDir}
+       require.Panics(t, func() { _ = tst.mustReadSnapshot(1) })
+}
+
+func TestTraceInitTSTableEmptyDirectory(t *testing.T) {
+       fileSystem := fs.NewLocalFileSystem()
+       tmpPath, deferFn := test.Space(require.New(t))
+       defer deferFn()
+       tabDir := filepath.Join(tmpPath, "tab")
+       fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+       tst, epoch := initTSTable(fileSystem, tabDir, common.Position{}, 
logger.GetLogger("test"), traceSnapshotOption(), nil)
+       require.NotNil(t, tst)
+       require.Greater(t, epoch, uint64(0))
+       require.Nil(t, tst.snapshot)
+}
+
+func TestTraceInitTSTableEmptyTableWhenAllSnapshotsCorrupt(t *testing.T) {
+       fileSystem := fs.NewLocalFileSystem()
+       tmpPath, deferFn := test.Space(require.New(t))
+       defer deferFn()
+       tabDir := filepath.Join(tmpPath, "tab")
+       fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+       snpPath := filepath.Join(tabDir, snapshotName(1))
+       _, writeErr := fileSystem.Write([]byte{}, snpPath, 0o600)
+       require.NoError(t, writeErr)
+       tst, epoch := initTSTable(fileSystem, tabDir, common.Position{}, 
logger.GetLogger("test"), traceSnapshotOption(), nil)
+       require.NotNil(t, tst)
+       require.Greater(t, epoch, uint64(0))
+       require.Nil(t, tst.snapshot)
+       require.False(t, fileSystem.IsExist(snpPath), "corrupt snapshot file 
must be deleted")
+}
+
+func TestTraceTolerantLoaderFallbackToOlderSnapshot(t *testing.T) {
+       fileSystem := fs.NewLocalFileSystem()
+       tmpPath, deferFn := test.Space(require.New(t))
+       defer deferFn()
+       tabDir := filepath.Join(tmpPath, "tab")
+       fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+       tst, err := newTSTable(fileSystem, tabDir, common.Position{}, 
logger.GetLogger("test"),
+               timestamp.TimeRange{}, traceSnapshotOption(), nil)
+       require.NoError(t, err)
+       tst.mustAddTraces(tsTS1, nil)
+       time.Sleep(100 * time.Millisecond)
+       require.Eventually(t, func() bool {
+               dd := fileSystem.ReadDir(tabDir)
+               for _, d := range dd {
+                       if d.IsDir() && d.Name() != sidxDirName && d.Name() != 
storage.FailedPartsDirName {
+                               return true
+                       }
+               }
+               return false
+       }, flags.EventuallyTimeout, time.Millisecond, "wait for part")
+       tst.Close()
+       snapshots := make([]uint64, 0)
+       for _, e := range fileSystem.ReadDir(tabDir) {
+               if filepath.Ext(e.Name()) == snapshotSuffix {
+                       parsed, parseErr := parseSnapshot(e.Name())
+                       if parseErr == nil {
+                               snapshots = append(snapshots, parsed)
+                       }
+               }
+       }
+       require.GreaterOrEqual(t, len(snapshots), 1)
+       sort.Slice(snapshots, func(i, j int) bool { return snapshots[i] > 
snapshots[j] })
+       validEpoch := snapshots[0]
+       corruptEpoch := validEpoch + 1
+       corruptPath := filepath.Join(tabDir, snapshotName(corruptEpoch))
+       _, writeErr := fileSystem.Write([]byte{}, corruptPath, 0o600)
+       require.NoError(t, writeErr)
+       tst2, epoch2 := initTSTable(fileSystem, tabDir, common.Position{}, 
logger.GetLogger("test"), traceSnapshotOption(), nil)
+       require.NotNil(t, tst2)
+       require.Equal(t, validEpoch, epoch2, "should load older valid snapshot 
when newest is corrupt")
+       require.NotNil(t, tst2.snapshot)
+       require.Equal(t, validEpoch, tst2.snapshot.epoch)
+}
diff --git a/banyand/trace/tstable.go b/banyand/trace/tstable.go
index cc47bae77..0fce82c54 100644
--- a/banyand/trace/tstable.go
+++ b/banyand/trace/tstable.go
@@ -69,8 +69,11 @@ type tsTable struct {
        shardID common.ShardID
 }
 
-func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts []uint64) {
-       parts := tst.mustReadSnapshot(epoch)
+func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts []uint64) error {
+       parts, err := tst.readSnapshot(epoch)
+       if err != nil {
+               return err
+       }
        snp := snapshot{
                epoch: epoch,
        }
@@ -87,9 +90,9 @@ func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts 
[]uint64) {
                        tst.gc.removePart(id)
                        continue
                }
-               err := validatePartMetadata(tst.fileSystem, partPath(tst.root, 
id))
-               if err != nil {
-                       tst.l.Info().Err(err).Uint64("id", id).Msg("cannot 
validate part metadata. skip and delete it")
+               validateErr := validatePartMetadata(tst.fileSystem, 
partPath(tst.root, id))
+               if validateErr != nil {
+                       tst.l.Info().Err(validateErr).Uint64("id", 
id).Msg("cannot validate part metadata. skip and delete it")
                        tst.gc.removePart(id)
                        needToPersist = true
                        continue
@@ -104,7 +107,7 @@ func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts 
[]uint64) {
        tst.gc.registerSnapshot(&snp)
        tst.gc.clean()
        if len(snp.parts) < 1 {
-               return
+               return nil
        }
        snp.incRef()
        tst.snapshot = &snp
@@ -112,6 +115,7 @@ func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts 
[]uint64) {
        if needToPersist {
                tst.persistSnapshot(&snp)
        }
+       return nil
 }
 
 func (tst *tsTable) startLoop(cur uint64) {
@@ -153,38 +157,56 @@ func (tst *tsTable) mustWriteSnapshot(snapshot uint64, 
partNames []string) {
                logger.Panicf("cannot marshal partNames to JSON: %s", err)
        }
        snapshotPath := filepath.Join(tst.root, snapshotName(snapshot))
-       lf, err := tst.fileSystem.CreateLockFile(snapshotPath, storage.FilePerm)
+       snapshotTempPath := snapshotPath + ".tmp"
+       lf, err := tst.fileSystem.CreateLockFile(snapshotTempPath, 
storage.FilePerm)
        if err != nil {
-               logger.Panicf("cannot create lock file %s: %s", snapshotPath, 
err)
+               logger.Panicf("cannot create lock file %s: %s", 
snapshotTempPath, err)
        }
        n, err := lf.Write(data)
        if err != nil {
-               logger.Panicf("cannot write snapshot %s: %s", snapshotPath, err)
+               _ = lf.Close()
+               logger.Panicf("cannot write snapshot %s: %s", snapshotTempPath, 
err)
        }
        if n != len(data) {
-               logger.Panicf("unexpected number of bytes written to %s; got 
%d; want %d", snapshotPath, n, len(data))
+               _ = lf.Close()
+               logger.Panicf("unexpected number of bytes written to %s; got 
%d; want %d", snapshotTempPath, n, len(data))
        }
+       if closeErr := lf.Close(); closeErr != nil {
+               logger.Panicf("cannot close snapshot temp file %s: %s", 
snapshotTempPath, closeErr)
+       }
+       if renameErr := tst.fileSystem.Rename(snapshotTempPath, snapshotPath); 
renameErr != nil {
+               logger.Panicf("cannot rename snapshot %s to %s: %s", 
snapshotTempPath, snapshotPath, renameErr)
+       }
+       tst.fileSystem.SyncPath(tst.root)
 }
 
-func (tst *tsTable) mustReadSnapshot(snapshot uint64) []uint64 {
+func (tst *tsTable) readSnapshot(snapshot uint64) ([]uint64, error) {
        snapshotPath := filepath.Join(tst.root, snapshotName(snapshot))
        data, err := tst.fileSystem.Read(snapshotPath)
        if err != nil {
-               logger.Panicf("cannot read %s: %s", snapshotPath, err)
+               return nil, fmt.Errorf("cannot read %s: %w", snapshotPath, err)
        }
        var partNames []string
        if err := json.Unmarshal(data, &partNames); err != nil {
-               logger.Panicf("cannot parse %s: %s", snapshotPath, err)
+               return nil, fmt.Errorf("cannot parse %s: %w", snapshotPath, err)
        }
        var result []uint64
        for i := range partNames {
-               e, err := parseEpoch(partNames[i])
-               if err != nil {
-                       logger.Panicf("cannot parse %s: %s", partNames[i], err)
+               e, parseErr := parseEpoch(partNames[i])
+               if parseErr != nil {
+                       return nil, fmt.Errorf("cannot parse %s: %w", 
partNames[i], parseErr)
                }
                result = append(result, e)
        }
-       return result
+       return result, nil
+}
+
+func (tst *tsTable) mustReadSnapshot(snapshot uint64) []uint64 {
+       parts, err := tst.readSnapshot(snapshot)
+       if err != nil {
+               logger.Panicf("%s", err)
+       }
+       return parts
 }
 
 // initTSTable initializes a tsTable and loads parts/snapshots, but does not 
start any background loops.
@@ -262,9 +284,20 @@ func initTSTable(fileSystem fs.FileSystem, rootPath 
string, p common.Position,
        sort.Slice(loadedSnapshots, func(i, j int) bool {
                return loadedSnapshots[i] > loadedSnapshots[j]
        })
-       epoch := loadedSnapshots[0]
-       tst.loadSnapshot(epoch, loadedParts)
-       return &tst, epoch
+       var failedSnapshots []uint64
+       for _, epoch := range loadedSnapshots {
+               loadErr := tst.loadSnapshot(epoch, loadedParts)
+               if loadErr == nil {
+                       return &tst, epoch
+               }
+               tst.l.Warn().Err(loadErr).Uint64("epoch", epoch).Msg("cannot 
load snapshot, trying next older")
+               failedSnapshots = append(failedSnapshots, epoch)
+       }
+       for _, id := range failedSnapshots {
+               tst.l.Info().Str("path", filepath.Join(rootPath, 
snapshotName(id))).Msg("delete unreadable snapshot file")
+               fileSystem.MustRMAll(filepath.Join(rootPath, snapshotName(id)))
+       }
+       return &tst, uint64(time.Now().UnixNano())
 }
 
 func newTSTable(fileSystem fs.FileSystem, rootPath string, p common.Position,
diff --git a/pkg/fs/file_system.go b/pkg/fs/file_system.go
index 9bd585e89..2254fb29b 100644
--- a/pkg/fs/file_system.go
+++ b/pkg/fs/file_system.go
@@ -114,6 +114,8 @@ type FileSystem interface {
        Read(name string) ([]byte, error)
        // Delete the file.
        DeleteFile(name string) error
+       // Rename renames oldPath to newPath atomically.
+       Rename(oldPath, newPath string) error
        // Delete the directory.
        MustRMAll(path string)
        // SyncPath the directory of file.
diff --git a/pkg/fs/local_file_system.go b/pkg/fs/local_file_system.go
index f025653f5..aa654050c 100644
--- a/pkg/fs/local_file_system.go
+++ b/pkg/fs/local_file_system.go
@@ -256,6 +256,29 @@ func (fs *localFileSystem) Read(name string) ([]byte, 
error) {
        }
 }
 
+// Rename renames oldPath to newPath atomically.
+func (fs *localFileSystem) Rename(oldPath, newPath string) error {
+       if err := os.Rename(oldPath, newPath); err != nil {
+               if os.IsNotExist(err) {
+                       return &FileSystemError{
+                               Code:    IsNotExistError,
+                               Message: fmt.Sprintf("Rename failed, file does 
not exist, old: %s, new: %s, error: %s", oldPath, newPath, err),
+                       }
+               }
+               if os.IsPermission(err) {
+                       return &FileSystemError{
+                               Code:    permissionError,
+                               Message: fmt.Sprintf("Rename failed, permission 
denied, old: %s, new: %s, error: %s", oldPath, newPath, err),
+                       }
+               }
+               return &FileSystemError{
+                       Code:    otherError,
+                       Message: fmt.Sprintf("Rename failed, old: %s, new: %s, 
error: %s", oldPath, newPath, err),
+               }
+       }
+       return nil
+}
+
 // DeleteFile is used to delete the file.
 func (fs *localFileSystem) DeleteFile(name string) error {
        err := os.Remove(name)


Reply via email to