This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 617edf8c3 Fix snapshot handling to prevent crashes (#1016)
617edf8c3 is described below
commit 617edf8c32dfe1ca933fd57502010726845a80ba
Author: Gao Hongtao <[email protected]>
AuthorDate: Thu Mar 19 20:06:32 2026 +0800
Fix snapshot handling to prevent crashes (#1016)
* Fix snapshot handling to prevent crashes and improve error handling in
tsTable.
* Refactor snapshot loading logic in initTSTable to improve error handling
and cleanup of unreadable snapshots across multiple modules.
* Add tests to ensure multiple corrupt snapshots are deleted on fallback
across measure, stream, and trace modules.
---
banyand/measure/snapshot_test.go | 292 +++++++++++++++++++++++++++++++
banyand/measure/tstable.go | 68 ++++++--
banyand/stream/snapshot_tstable_test.go | 297 ++++++++++++++++++++++++++++++++
banyand/stream/tstable.go | 74 +++++---
banyand/trace/snapshot_tstable_test.go | 276 +++++++++++++++++++++++++++++
banyand/trace/tstable.go | 74 +++++---
pkg/fs/file_system.go | 2 +
pkg/fs/local_file_system.go | 23 +++
8 files changed, 1049 insertions(+), 57 deletions(-)
diff --git a/banyand/measure/snapshot_test.go b/banyand/measure/snapshot_test.go
index c5c717628..dc8c51d6b 100644
--- a/banyand/measure/snapshot_test.go
+++ b/banyand/measure/snapshot_test.go
@@ -18,8 +18,10 @@
package measure
import (
+ "encoding/json"
"path/filepath"
"runtime"
+ "sort"
"testing"
"time"
@@ -27,6 +29,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/banyand/internal/storage"
"github.com/apache/skywalking-banyandb/banyand/protector"
"github.com/apache/skywalking-banyandb/pkg/bytes"
"github.com/apache/skywalking-banyandb/pkg/fs"
@@ -520,3 +523,292 @@ func TestSnapshotFunctionality(t *testing.T) {
}
}
}
+
+// testSnapshotOption builds the tsTable option used by the snapshot tests:
+// zero flush timeout, the testing merge policy and a no-op protector.
+func testSnapshotOption() option {
+	opt := option{
+		protector:    protector.Nop{},
+		mergePolicy:  newDefaultMergePolicyForTesting(),
+		flushTimeout: 0,
+	}
+	return opt
+}
+
+// TestMustWriteSnapshotUsesTempThenRename checks that mustWriteSnapshot leaves no
+// ".tmp" file behind and that the final snapshot file decodes back to the part
+// names that were written.
+func TestMustWriteSnapshotUsesTempThenRename(t *testing.T) {
+	const epoch = uint64(0x10)
+	fileSystem := fs.NewLocalFileSystem()
+	tmpPath, deferFn := test.Space(require.New(t))
+	defer deferFn()
+	tabDir := filepath.Join(tmpPath, "tab")
+	fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+	want := []string{partName(0x1)}
+	writer := &tsTable{fileSystem: fileSystem, root: tabDir}
+	writer.mustWriteSnapshot(epoch, want)
+	finalPath := filepath.Join(tabDir, snapshotName(epoch))
+	require.False(t, fileSystem.IsExist(finalPath+".tmp"), "temp file must be removed after rename")
+	require.True(t, fileSystem.IsExist(finalPath), "final snapshot file must exist")
+	raw, readErr := fileSystem.Read(finalPath)
+	require.NoError(t, readErr)
+	var got []string
+	require.NoError(t, json.Unmarshal(raw, &got))
+	require.Equal(t, want, got)
+}
+
+// TestMustWriteSnapshotWithMultipleParts round-trips several part names through
+// mustWriteSnapshot and readSnapshot and expects the parsed IDs in order.
+func TestMustWriteSnapshotWithMultipleParts(t *testing.T) {
+	fileSystem := fs.NewLocalFileSystem()
+	tmpPath, deferFn := test.Space(require.New(t))
+	defer deferFn()
+	tabDir := filepath.Join(tmpPath, "tab")
+	fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+	table := &tsTable{fileSystem: fileSystem, root: tabDir}
+	table.mustWriteSnapshot(1, []string{partName(0x1), partName(0x2), partName(0xff)})
+	got, err := table.readSnapshot(1)
+	require.NoError(t, err)
+	require.Equal(t, []uint64{0x1, 0x2, 0xff}, got)
+}
+
+// TestMustWriteSnapshotEmptyPartList checks that a snapshot listing no parts is
+// written successfully and reads back as an empty result.
+func TestMustWriteSnapshotEmptyPartList(t *testing.T) {
+	fileSystem := fs.NewLocalFileSystem()
+	tmpPath, deferFn := test.Space(require.New(t))
+	defer deferFn()
+	tabDir := filepath.Join(tmpPath, "tab")
+	fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+	table := &tsTable{fileSystem: fileSystem, root: tabDir}
+	table.mustWriteSnapshot(1, []string{})
+	got, err := table.readSnapshot(1)
+	require.NoError(t, err)
+	require.Empty(t, got)
+}
+
+func TestTolerantLoaderFallbackToOlderSnapshot(t *testing.T) {
+ fileSystem := fs.NewLocalFileSystem()
+ tmpPath, deferFn := test.Space(require.New(t))
+ defer deferFn()
+ tabDir := filepath.Join(tmpPath, "tab")
+ fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+ tst, err := newTSTable(fileSystem, tabDir, common.Position{},
logger.GetLogger("test"),
+ timestamp.TimeRange{}, option{flushTimeout: 0, mergePolicy:
newDefaultMergePolicyForTesting(), protector: protector.Nop{}}, nil)
+ require.NoError(t, err)
+ tst.mustAddDataPoints(dpsTS1)
+ time.Sleep(100 * time.Millisecond)
+ require.Eventually(t, func() bool {
+ dd := fileSystem.ReadDir(tabDir)
+ for _, d := range dd {
+ if d.IsDir() && d.Name() != storage.FailedPartsDirName {
+ return true
+ }
+ }
+ return false
+ }, flags.EventuallyTimeout, time.Millisecond, "wait for part")
+ tst.Close()
+ snapshots := make([]uint64, 0)
+ for _, e := range fileSystem.ReadDir(tabDir) {
+ if filepath.Ext(e.Name()) == snapshotSuffix {
+ parsed, parseErr := parseSnapshot(e.Name())
+ if parseErr == nil {
+ snapshots = append(snapshots, parsed)
+ }
+ }
+ }
+ require.GreaterOrEqual(t, len(snapshots), 1)
+ sort.Slice(snapshots, func(i, j int) bool { return snapshots[i] >
snapshots[j] })
+ validEpoch := snapshots[0]
+ corruptEpoch := validEpoch + 1
+ corruptPath := filepath.Join(tabDir, snapshotName(corruptEpoch))
+ _, writeErr := fileSystem.Write([]byte{}, corruptPath, 0o600)
+ require.NoError(t, writeErr)
+ tst2, epoch2 := initTSTable(fileSystem, tabDir, common.Position{},
logger.GetLogger("test"), testSnapshotOption(), nil)
+ require.NotNil(t, tst2)
+ require.Equal(t, validEpoch, epoch2, "should load older valid snapshot
when newest is corrupt")
+ require.NotNil(t, tst2.snapshot)
+ require.Equal(t, validEpoch, tst2.snapshot.epoch)
+ require.False(t, fileSystem.IsExist(corruptPath), "corrupt newer
snapshot must be deleted after fallback")
+}
+
+func TestMeasureInitTSTableDeletesMultipleFailedSnapshotsOnFallback(t
*testing.T) {
+ fileSystem := fs.NewLocalFileSystem()
+ tmpPath, deferFn := test.Space(require.New(t))
+ defer deferFn()
+ tabDir := filepath.Join(tmpPath, "tab")
+ fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+ tst, err := newTSTable(fileSystem, tabDir, common.Position{},
logger.GetLogger("test"),
+ timestamp.TimeRange{}, testSnapshotOption(), nil)
+ require.NoError(t, err)
+ tst.mustAddDataPoints(dpsTS1)
+ time.Sleep(100 * time.Millisecond)
+ require.Eventually(t, func() bool {
+ dd := fileSystem.ReadDir(tabDir)
+ for _, d := range dd {
+ if d.IsDir() && d.Name() != storage.FailedPartsDirName {
+ return true
+ }
+ }
+ return false
+ }, flags.EventuallyTimeout, time.Millisecond, "wait for part")
+ tst.Close()
+ snapshots := make([]uint64, 0)
+ for _, e := range fileSystem.ReadDir(tabDir) {
+ if filepath.Ext(e.Name()) == snapshotSuffix {
+ parsed, parseErr := parseSnapshot(e.Name())
+ if parseErr == nil {
+ snapshots = append(snapshots, parsed)
+ }
+ }
+ }
+ require.GreaterOrEqual(t, len(snapshots), 1)
+ sort.Slice(snapshots, func(i, j int) bool { return snapshots[i] >
snapshots[j] })
+ validEpoch := snapshots[0]
+ // Create two corrupt newer snapshots
+ corruptEpoch1 := validEpoch + 1
+ corruptEpoch2 := validEpoch + 2
+ corruptPath1 := filepath.Join(tabDir, snapshotName(corruptEpoch1))
+ corruptPath2 := filepath.Join(tabDir, snapshotName(corruptEpoch2))
+ _, writeErr := fileSystem.Write([]byte{}, corruptPath1, 0o600)
+ require.NoError(t, writeErr)
+ _, writeErr = fileSystem.Write([]byte{}, corruptPath2, 0o600)
+ require.NoError(t, writeErr)
+ tst2, epoch2 := initTSTable(fileSystem, tabDir, common.Position{},
logger.GetLogger("test"), testSnapshotOption(), nil)
+ require.NotNil(t, tst2)
+ require.Equal(t, validEpoch, epoch2, "should load older valid snapshot
when newer ones are corrupt")
+ require.NotNil(t, tst2.snapshot)
+ require.Equal(t, validEpoch, tst2.snapshot.epoch)
+ require.False(t, fileSystem.IsExist(corruptPath1), "first corrupt
snapshot must be deleted after fallback")
+ require.False(t, fileSystem.IsExist(corruptPath2), "second corrupt
snapshot must be deleted after fallback")
+}
+
+// TestReadSnapshotReturnsErrorOnEmptyFile expects readSnapshot to report a parse
+// error, rather than panic, for a zero-byte snapshot file.
+func TestReadSnapshotReturnsErrorOnEmptyFile(t *testing.T) {
+	fileSystem := fs.NewLocalFileSystem()
+	tmpPath, deferFn := test.Space(require.New(t))
+	defer deferFn()
+	tabDir := filepath.Join(tmpPath, "tab")
+	fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+	_, err := fileSystem.Write([]byte{}, filepath.Join(tabDir, snapshotName(1)), 0o600)
+	require.NoError(t, err)
+	reader := &tsTable{fileSystem: fileSystem, root: tabDir}
+	_, err = reader.readSnapshot(1)
+	require.Error(t, err)
+	require.Contains(t, err.Error(), "cannot parse")
+}
+
+// TestReadSnapshotReturnsErrorOnInvalidJSON expects readSnapshot to report a parse
+// error for a snapshot file that is not a JSON array of strings.
+func TestReadSnapshotReturnsErrorOnInvalidJSON(t *testing.T) {
+	fileSystem := fs.NewLocalFileSystem()
+	tmpPath, deferFn := test.Space(require.New(t))
+	defer deferFn()
+	tabDir := filepath.Join(tmpPath, "tab")
+	fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+	_, err := fileSystem.Write([]byte("{invalid}"), filepath.Join(tabDir, snapshotName(1)), 0o600)
+	require.NoError(t, err)
+	reader := &tsTable{fileSystem: fileSystem, root: tabDir}
+	_, err = reader.readSnapshot(1)
+	require.Error(t, err)
+	require.Contains(t, err.Error(), "cannot parse")
+}
+
+// TestReadSnapshotReturnsErrorOnInvalidPartName expects readSnapshot to reject a
+// well-formed JSON list whose entry is not a valid part name. The message check
+// matches the "cannot parse <name>" error that readSnapshot builds for part names.
+func TestReadSnapshotReturnsErrorOnInvalidPartName(t *testing.T) {
+	fileSystem := fs.NewLocalFileSystem()
+	tmpPath, deferFn := test.Space(require.New(t))
+	defer deferFn()
+	tabDir := filepath.Join(tmpPath, "tab")
+	fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+	partNamesData, marshalErr := json.Marshal([]string{"not-hex"})
+	require.NoError(t, marshalErr)
+	snpPath := filepath.Join(tabDir, snapshotName(1))
+	_, writeErr := fileSystem.Write(partNamesData, snpPath, 0o600)
+	require.NoError(t, writeErr)
+	tst := &tsTable{fileSystem: fileSystem, root: tabDir}
+	_, readErr := tst.readSnapshot(1)
+	require.Error(t, readErr)
+	require.Contains(t, readErr.Error(), "cannot parse")
+}
+
+// TestReadSnapshotSucceedsOnValidFile reads back a snapshot written by
+// mustWriteSnapshot and expects the part IDs in the order they were written.
+func TestReadSnapshotSucceedsOnValidFile(t *testing.T) {
+	fileSystem := fs.NewLocalFileSystem()
+	tmpPath, deferFn := test.Space(require.New(t))
+	defer deferFn()
+	tabDir := filepath.Join(tmpPath, "tab")
+	fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+	table := &tsTable{fileSystem: fileSystem, root: tabDir}
+	names := []string{partName(0xa), partName(0xb)}
+	table.mustWriteSnapshot(1, names)
+	got, err := table.readSnapshot(1)
+	require.NoError(t, err)
+	require.Equal(t, []uint64{0xa, 0xb}, got)
+}
+
+// TestMustReadSnapshotPanicsOnError confirms that the must-variant still panics
+// when the underlying readSnapshot fails (here: an empty file).
+func TestMustReadSnapshotPanicsOnError(t *testing.T) {
+	fileSystem := fs.NewLocalFileSystem()
+	tmpPath, deferFn := test.Space(require.New(t))
+	defer deferFn()
+	tabDir := filepath.Join(tmpPath, "tab")
+	fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+	_, err := fileSystem.Write([]byte{}, filepath.Join(tabDir, snapshotName(1)), 0o600)
+	require.NoError(t, err)
+	reader := &tsTable{fileSystem: fileSystem, root: tabDir}
+	require.Panics(t, func() { _ = reader.mustReadSnapshot(1) })
+}
+
+// TestInitTSTableEmptyDirectory expects initTSTable on an empty directory to
+// return a table with no snapshot and a non-zero starting epoch.
+func TestInitTSTableEmptyDirectory(t *testing.T) {
+	fileSystem := fs.NewLocalFileSystem()
+	tmpPath, deferFn := test.Space(require.New(t))
+	defer deferFn()
+	tabDir := filepath.Join(tmpPath, "tab")
+	fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+	table, startEpoch := initTSTable(fileSystem, tabDir, common.Position{}, logger.GetLogger("test"), testSnapshotOption(), nil)
+	require.NotNil(t, table)
+	require.Greater(t, startEpoch, uint64(0))
+	require.Nil(t, table.snapshot)
+}
+
+func TestInitTSTableEmptyTableWhenAllSnapshotsCorrupt(t *testing.T) {
+ fileSystem := fs.NewLocalFileSystem()
+ tmpPath, deferFn := test.Space(require.New(t))
+ defer deferFn()
+ tabDir := filepath.Join(tmpPath, "tab")
+ fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+ snpPath := filepath.Join(tabDir, snapshotName(1))
+ _, writeErr := fileSystem.Write([]byte{}, snpPath, 0o600)
+ require.NoError(t, writeErr)
+ tst, epoch := initTSTable(fileSystem, tabDir, common.Position{},
logger.GetLogger("test"), testSnapshotOption(), nil)
+ require.NotNil(t, tst)
+ require.Greater(t, epoch, uint64(0))
+ require.Nil(t, tst.snapshot)
+ require.False(t, fileSystem.IsExist(snpPath), "corrupt snapshot file
must be deleted")
+}
+
+// TestInitTSTableLoadsNewestWhenMultipleValid writes data, closes the table and
+// expects a fresh initTSTable to report the newest snapshot epoch found on disk.
+func TestInitTSTableLoadsNewestWhenMultipleValid(t *testing.T) {
+	fileSystem := fs.NewLocalFileSystem()
+	tmpPath, deferFn := test.Space(require.New(t))
+	defer deferFn()
+	tabDir := filepath.Join(tmpPath, "tab")
+	fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+	writer, err := newTSTable(fileSystem, tabDir, common.Position{}, logger.GetLogger("test"),
+		timestamp.TimeRange{}, testSnapshotOption(), nil)
+	require.NoError(t, err)
+	writer.mustAddDataPoints(dpsTS1)
+	time.Sleep(100 * time.Millisecond)
+	hasPartDir := func() bool {
+		for _, entry := range fileSystem.ReadDir(tabDir) {
+			if entry.IsDir() && entry.Name() != storage.FailedPartsDirName {
+				return true
+			}
+		}
+		return false
+	}
+	require.Eventually(t, hasPartDir, flags.EventuallyTimeout, time.Millisecond, "wait for part")
+	writer.Close()
+	var onDisk []uint64
+	for _, entry := range fileSystem.ReadDir(tabDir) {
+		if filepath.Ext(entry.Name()) != snapshotSuffix {
+			continue
+		}
+		if epoch, parseErr := parseSnapshot(entry.Name()); parseErr == nil {
+			onDisk = append(onDisk, epoch)
+		}
+	}
+	require.GreaterOrEqual(t, len(onDisk), 1)
+	reopened, loadedEpoch := initTSTable(fileSystem, tabDir, common.Position{}, logger.GetLogger("test"), testSnapshotOption(), nil)
+	require.NotNil(t, reopened)
+	sort.Slice(onDisk, func(i, j int) bool { return onDisk[i] > onDisk[j] })
+	require.Equal(t, onDisk[0], loadedEpoch, "must load newest valid snapshot")
+}
diff --git a/banyand/measure/tstable.go b/banyand/measure/tstable.go
index 5358b1634..ef191a4e0 100644
--- a/banyand/measure/tstable.go
+++ b/banyand/measure/tstable.go
@@ -117,9 +117,21 @@ func initTSTable(fileSystem fs.FileSystem, rootPath string, p common.Position,
sort.Slice(loadedSnapshots, func(i, j int) bool {
return loadedSnapshots[i] > loadedSnapshots[j]
})
- epoch := loadedSnapshots[0]
- tst.loadSnapshot(epoch, loadedParts)
- return &tst, epoch
+ var failedSnapshotIDs []uint64
+ for _, epoch := range loadedSnapshots {
+ loadErr := tst.loadSnapshot(epoch, loadedParts)
+ if loadErr != nil {
+ tst.l.Warn().Err(loadErr).Uint64("epoch",
epoch).Msg("cannot load snapshot, trying next older")
+ failedSnapshotIDs = append(failedSnapshotIDs, epoch)
+ continue
+ }
+ for _, id := range failedSnapshotIDs {
+ tst.l.Info().Str("path", filepath.Join(rootPath,
snapshotName(id))).Msg("delete unreadable snapshot file")
+ fileSystem.MustRMAll(filepath.Join(rootPath,
snapshotName(id)))
+ }
+ return &tst, epoch
+ }
+ return &tst, uint64(time.Now().UnixNano())
}
// newTSTable creates a new tsTable and starts the background loop.
@@ -165,8 +177,11 @@ type tsTable struct {
shardID common.ShardID
}
-func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts []uint64) {
- parts := tst.mustReadSnapshot(epoch)
+func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts []uint64) error {
+ parts, err := tst.readSnapshot(epoch)
+ if err != nil {
+ return err
+ }
snp := snapshot{
epoch: epoch,
}
@@ -200,13 +215,14 @@ func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts []uint64) {
tst.gc.registerSnapshot(&snp)
tst.gc.clean()
if len(snp.parts) < 1 {
- return
+ return nil
}
snp.incRef()
tst.snapshot = &snp
if needToPersist {
tst.persistSnapshot(&snp)
}
+ return nil
}
func (tst *tsTable) startLoop(cur uint64) {
@@ -235,38 +251,56 @@ func (tst *tsTable) mustWriteSnapshot(snapshot uint64, partNames []string) {
logger.Panicf("cannot marshal partNames to JSON: %s", err)
}
snapshotPath := filepath.Join(tst.root, snapshotName(snapshot))
- lf, err := tst.fileSystem.CreateLockFile(snapshotPath, storage.FilePerm)
+ snapshotTempPath := snapshotPath + ".tmp"
+ lf, err := tst.fileSystem.CreateLockFile(snapshotTempPath,
storage.FilePerm)
if err != nil {
- logger.Panicf("cannot create lock file %s: %s", snapshotPath,
err)
+ logger.Panicf("cannot create lock file %s: %s",
snapshotTempPath, err)
}
n, err := lf.Write(data)
if err != nil {
- logger.Panicf("cannot write snapshot %s: %s", snapshotPath, err)
+ _ = lf.Close()
+ logger.Panicf("cannot write snapshot %s: %s", snapshotTempPath,
err)
}
if n != len(data) {
- logger.Panicf("unexpected number of bytes written to %s; got
%d; want %d", snapshotPath, n, len(data))
+ _ = lf.Close()
+ logger.Panicf("unexpected number of bytes written to %s; got
%d; want %d", snapshotTempPath, n, len(data))
+ }
+ if closeErr := lf.Close(); closeErr != nil {
+ logger.Panicf("cannot close snapshot temp file %s: %s",
snapshotTempPath, closeErr)
+ }
+ if renameErr := tst.fileSystem.Rename(snapshotTempPath, snapshotPath);
renameErr != nil {
+ logger.Panicf("cannot rename snapshot %s to %s: %s",
snapshotTempPath, snapshotPath, renameErr)
}
+ tst.fileSystem.SyncPath(tst.root)
}
-func (tst *tsTable) mustReadSnapshot(snapshot uint64) []uint64 {
+// readSnapshot reads the snapshot file for the given epoch under tst.root and
+// returns the part IDs it lists, in file order. Instead of panicking, it returns
+// an error when the file cannot be read, when it is not a JSON string array, or
+// when an entry is rejected by parseEpoch. This lets initTSTable skip an
+// unreadable snapshot and try the next older one.
+// On success with an empty list the returned slice is nil, not an empty slice.
+func (tst *tsTable) readSnapshot(snapshot uint64) ([]uint64, error) {
snapshotPath := filepath.Join(tst.root, snapshotName(snapshot))
data, err := tst.fileSystem.Read(snapshotPath)
if err != nil {
- logger.Panicf("cannot read %s: %s", snapshotPath, err)
+ return nil, fmt.Errorf("cannot read %s: %w", snapshotPath, err)
}
var partNames []string
if err := json.Unmarshal(data, &partNames); err != nil {
- logger.Panicf("cannot parse %s: %s", snapshotPath, err)
+ return nil, fmt.Errorf("cannot parse %s: %w", snapshotPath, err)
}
var result []uint64
for i := range partNames {
- e, err := parseEpoch(partNames[i])
- if err != nil {
- logger.Panicf("cannot parse %s: %s", partNames[i], err)
+ e, parseErr := parseEpoch(partNames[i])
+ if parseErr != nil {
+ return nil, fmt.Errorf("cannot parse %s: %w",
partNames[i], parseErr)
}
result = append(result, e)
}
- return result
+ return result, nil
+}
+
+// mustReadSnapshot is the panicking counterpart of readSnapshot: it returns the
+// part IDs of the given snapshot epoch and panics with readSnapshot's error.
+func (tst *tsTable) mustReadSnapshot(snapshot uint64) []uint64 {
+	parts, readErr := tst.readSnapshot(snapshot)
+	if readErr == nil {
+		return parts
+	}
+	logger.Panicf("%s", readErr)
+	return nil
}
func (tst *tsTable) Close() error {
diff --git a/banyand/stream/snapshot_tstable_test.go b/banyand/stream/snapshot_tstable_test.go
new file mode 100644
index 000000000..e3527356f
--- /dev/null
+++ b/banyand/stream/snapshot_tstable_test.go
@@ -0,0 +1,297 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package stream
+
+import (
+ "encoding/json"
+ "path/filepath"
+ "sort"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+
+ "github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+ "github.com/apache/skywalking-banyandb/banyand/protector"
+ "github.com/apache/skywalking-banyandb/pkg/fs"
+ "github.com/apache/skywalking-banyandb/pkg/index/inverted"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/test"
+ "github.com/apache/skywalking-banyandb/pkg/test/flags"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+// streamSnapshotOption builds the stream tsTable option used by the snapshot
+// tests: zero flush timeout, the testing merge policy and a no-op protector.
+func streamSnapshotOption() option {
+	opt := option{
+		protector:    protector.Nop{},
+		mergePolicy:  newDefaultMergePolicyForTesting(),
+		flushTimeout: 0,
+	}
+	return opt
+}
+
+// TestStreamMustWriteSnapshotUsesTempThenRename checks that mustWriteSnapshot
+// leaves no ".tmp" file behind and that the final file decodes back to the part
+// names that were written.
+func TestStreamMustWriteSnapshotUsesTempThenRename(t *testing.T) {
+	const epoch = uint64(0x10)
+	fileSystem := fs.NewLocalFileSystem()
+	tmpPath, deferFn := test.Space(require.New(t))
+	defer deferFn()
+	tabDir := filepath.Join(tmpPath, "tab")
+	fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+	want := []string{partName(0x1)}
+	writer := &tsTable{fileSystem: fileSystem, root: tabDir}
+	writer.mustWriteSnapshot(epoch, want)
+	finalPath := filepath.Join(tabDir, snapshotName(epoch))
+	require.False(t, fileSystem.IsExist(finalPath+".tmp"), "temp file must be removed after rename")
+	require.True(t, fileSystem.IsExist(finalPath), "final snapshot file must exist")
+	raw, readErr := fileSystem.Read(finalPath)
+	require.NoError(t, readErr)
+	var got []string
+	require.NoError(t, json.Unmarshal(raw, &got))
+	require.Equal(t, want, got)
+}
+
+// TestStreamMustWriteSnapshotWithMultipleParts round-trips several part names
+// through mustWriteSnapshot and readSnapshot and expects the IDs in order.
+func TestStreamMustWriteSnapshotWithMultipleParts(t *testing.T) {
+	fileSystem := fs.NewLocalFileSystem()
+	tmpPath, deferFn := test.Space(require.New(t))
+	defer deferFn()
+	tabDir := filepath.Join(tmpPath, "tab")
+	fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+	table := &tsTable{fileSystem: fileSystem, root: tabDir}
+	table.mustWriteSnapshot(1, []string{partName(0x1), partName(0x2), partName(0xff)})
+	got, err := table.readSnapshot(1)
+	require.NoError(t, err)
+	require.Equal(t, []uint64{0x1, 0x2, 0xff}, got)
+}
+
+// TestStreamReadSnapshotReturnsErrorOnEmptyFile expects a parse error, rather
+// than a panic, from readSnapshot for a zero-byte snapshot file.
+func TestStreamReadSnapshotReturnsErrorOnEmptyFile(t *testing.T) {
+	fileSystem := fs.NewLocalFileSystem()
+	tmpPath, deferFn := test.Space(require.New(t))
+	defer deferFn()
+	tabDir := filepath.Join(tmpPath, "tab")
+	fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+	_, err := fileSystem.Write([]byte{}, filepath.Join(tabDir, snapshotName(1)), 0o600)
+	require.NoError(t, err)
+	reader := &tsTable{fileSystem: fileSystem, root: tabDir}
+	_, err = reader.readSnapshot(1)
+	require.Error(t, err)
+	require.Contains(t, err.Error(), "cannot parse")
+}
+
+// TestStreamReadSnapshotReturnsErrorOnInvalidJSON expects a parse error from
+// readSnapshot for a file that is not a JSON array of strings.
+func TestStreamReadSnapshotReturnsErrorOnInvalidJSON(t *testing.T) {
+	fileSystem := fs.NewLocalFileSystem()
+	tmpPath, deferFn := test.Space(require.New(t))
+	defer deferFn()
+	tabDir := filepath.Join(tmpPath, "tab")
+	fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+	_, err := fileSystem.Write([]byte("{invalid}"), filepath.Join(tabDir, snapshotName(1)), 0o600)
+	require.NoError(t, err)
+	reader := &tsTable{fileSystem: fileSystem, root: tabDir}
+	_, err = reader.readSnapshot(1)
+	require.Error(t, err)
+	require.Contains(t, err.Error(), "cannot parse")
+}
+
+// TestStreamReadSnapshotReturnsErrorOnInvalidPartName expects readSnapshot to
+// reject a valid JSON list whose only entry is not a valid part name.
+func TestStreamReadSnapshotReturnsErrorOnInvalidPartName(t *testing.T) {
+	fileSystem := fs.NewLocalFileSystem()
+	tmpPath, deferFn := test.Space(require.New(t))
+	defer deferFn()
+	tabDir := filepath.Join(tmpPath, "tab")
+	fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+	payload, err := json.Marshal([]string{"not-hex"})
+	require.NoError(t, err)
+	_, err = fileSystem.Write(payload, filepath.Join(tabDir, snapshotName(1)), 0o600)
+	require.NoError(t, err)
+	reader := &tsTable{fileSystem: fileSystem, root: tabDir}
+	_, err = reader.readSnapshot(1)
+	require.Error(t, err)
+}
+
+// TestStreamReadSnapshotSucceedsOnValidFile reads back a snapshot written by
+// mustWriteSnapshot and expects the part IDs in write order.
+func TestStreamReadSnapshotSucceedsOnValidFile(t *testing.T) {
+	fileSystem := fs.NewLocalFileSystem()
+	tmpPath, deferFn := test.Space(require.New(t))
+	defer deferFn()
+	tabDir := filepath.Join(tmpPath, "tab")
+	fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+	table := &tsTable{fileSystem: fileSystem, root: tabDir}
+	names := []string{partName(0xa), partName(0xb)}
+	table.mustWriteSnapshot(1, names)
+	got, err := table.readSnapshot(1)
+	require.NoError(t, err)
+	require.Equal(t, []uint64{0xa, 0xb}, got)
+}
+
+// TestStreamMustReadSnapshotPanicsOnError confirms that the must-variant still
+// panics when readSnapshot fails on an empty snapshot file.
+func TestStreamMustReadSnapshotPanicsOnError(t *testing.T) {
+	fileSystem := fs.NewLocalFileSystem()
+	tmpPath, deferFn := test.Space(require.New(t))
+	defer deferFn()
+	tabDir := filepath.Join(tmpPath, "tab")
+	fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+	_, err := fileSystem.Write([]byte{}, filepath.Join(tabDir, snapshotName(1)), 0o600)
+	require.NoError(t, err)
+	reader := &tsTable{fileSystem: fileSystem, root: tabDir}
+	require.Panics(t, func() { _ = reader.mustReadSnapshot(1) })
+}
+
+// TestStreamInitTSTableEmptyDirectory expects initTSTable on an empty directory
+// to succeed with no snapshot and a non-zero starting epoch.
+func TestStreamInitTSTableEmptyDirectory(t *testing.T) {
+	fileSystem := fs.NewLocalFileSystem()
+	tmpPath, deferFn := test.Space(require.New(t))
+	defer deferFn()
+	tabDir := filepath.Join(tmpPath, "tab")
+	fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+	table, startEpoch, err := initTSTable(fileSystem, tabDir, common.Position{}, logger.GetLogger("test"),
+		streamSnapshotOption(), nil, false)
+	require.NoError(t, err)
+	require.NotNil(t, table)
+	require.Greater(t, startEpoch, uint64(0))
+	require.Nil(t, table.snapshot)
+}
+
+func TestStreamInitTSTableEmptyTableWhenAllSnapshotsCorrupt(t *testing.T) {
+ fileSystem := fs.NewLocalFileSystem()
+ tmpPath, deferFn := test.Space(require.New(t))
+ defer deferFn()
+ tabDir := filepath.Join(tmpPath, "tab")
+ fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+ snpPath := filepath.Join(tabDir, snapshotName(1))
+ _, writeErr := fileSystem.Write([]byte{}, snpPath, 0o600)
+ require.NoError(t, writeErr)
+ tst, epoch, initErr := initTSTable(fileSystem, tabDir,
common.Position{}, logger.GetLogger("test"),
+ streamSnapshotOption(), nil, false)
+ require.NoError(t, initErr)
+ require.NotNil(t, tst)
+ require.Greater(t, epoch, uint64(0))
+ require.Nil(t, tst.snapshot)
+ require.False(t, fileSystem.IsExist(snpPath), "corrupt snapshot file
must be deleted")
+}
+
+func TestStreamTolerantLoaderFallbackToOlderSnapshot(t *testing.T) {
+ fileSystem := fs.NewLocalFileSystem()
+ tmpPath, deferFn := test.Space(require.New(t))
+ defer deferFn()
+ tabDir := filepath.Join(tmpPath, "tab")
+ fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+ tst, err := newTSTable(fileSystem, tabDir, common.Position{},
logger.GetLogger("test"),
+ timestamp.TimeRange{}, streamSnapshotOption(), nil)
+ require.NoError(t, err)
+ tst.mustAddElements(esTS1)
+ time.Sleep(100 * time.Millisecond)
+ require.Eventually(t, func() bool {
+ dd := fileSystem.ReadDir(tabDir)
+ for _, d := range dd {
+ if d.IsDir() &&
+ d.Name() != elementIndexFilename &&
+ d.Name() != inverted.ExternalSegmentTempDirName
&&
+ d.Name() != storage.FailedPartsDirName {
+ return true
+ }
+ }
+ return false
+ }, flags.EventuallyTimeout, time.Millisecond, "wait for part")
+ tst.Close()
+ snapshots := make([]uint64, 0)
+ for _, e := range fileSystem.ReadDir(tabDir) {
+ if filepath.Ext(e.Name()) == snapshotSuffix {
+ parsed, parseErr := parseSnapshot(e.Name())
+ if parseErr == nil {
+ snapshots = append(snapshots, parsed)
+ }
+ }
+ }
+ require.GreaterOrEqual(t, len(snapshots), 1)
+ sort.Slice(snapshots, func(i, j int) bool { return snapshots[i] >
snapshots[j] })
+ validEpoch := snapshots[0]
+ corruptEpoch := validEpoch + 1
+ corruptPath := filepath.Join(tabDir, snapshotName(corruptEpoch))
+ _, writeErr := fileSystem.Write([]byte{}, corruptPath, 0o600)
+ require.NoError(t, writeErr)
+ tst2, epoch2, initErr := initTSTable(fileSystem, tabDir,
common.Position{}, logger.GetLogger("test"),
+ streamSnapshotOption(), nil, true)
+ require.NoError(t, initErr)
+ require.NotNil(t, tst2)
+ require.Equal(t, validEpoch, epoch2, "should load older valid snapshot
when newest is corrupt")
+ require.NotNil(t, tst2.snapshot)
+ require.Equal(t, validEpoch, tst2.snapshot.epoch)
+ require.False(t, fileSystem.IsExist(corruptPath), "corrupt newer
snapshot must be deleted after fallback")
+ if tst2.index != nil {
+ _ = tst2.index.Close()
+ }
+}
+
+func TestStreamInitTSTableDeletesMultipleFailedSnapshotsOnFallback(t
*testing.T) {
+ fileSystem := fs.NewLocalFileSystem()
+ tmpPath, deferFn := test.Space(require.New(t))
+ defer deferFn()
+ tabDir := filepath.Join(tmpPath, "tab")
+ fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+ tst, err := newTSTable(fileSystem, tabDir, common.Position{},
logger.GetLogger("test"),
+ timestamp.TimeRange{}, streamSnapshotOption(), nil)
+ require.NoError(t, err)
+ tst.mustAddElements(esTS1)
+ time.Sleep(100 * time.Millisecond)
+ require.Eventually(t, func() bool {
+ dd := fileSystem.ReadDir(tabDir)
+ for _, d := range dd {
+ if d.IsDir() &&
+ d.Name() != elementIndexFilename &&
+ d.Name() != inverted.ExternalSegmentTempDirName
&&
+ d.Name() != storage.FailedPartsDirName {
+ return true
+ }
+ }
+ return false
+ }, flags.EventuallyTimeout, time.Millisecond, "wait for part")
+ tst.Close()
+ snapshots := make([]uint64, 0)
+ for _, e := range fileSystem.ReadDir(tabDir) {
+ if filepath.Ext(e.Name()) == snapshotSuffix {
+ parsed, parseErr := parseSnapshot(e.Name())
+ if parseErr == nil {
+ snapshots = append(snapshots, parsed)
+ }
+ }
+ }
+ require.GreaterOrEqual(t, len(snapshots), 1)
+ sort.Slice(snapshots, func(i, j int) bool { return snapshots[i] >
snapshots[j] })
+ validEpoch := snapshots[0]
+ // Create two corrupt newer snapshots
+ corruptEpoch1 := validEpoch + 1
+ corruptEpoch2 := validEpoch + 2
+ corruptPath1 := filepath.Join(tabDir, snapshotName(corruptEpoch1))
+ corruptPath2 := filepath.Join(tabDir, snapshotName(corruptEpoch2))
+ _, writeErr := fileSystem.Write([]byte{}, corruptPath1, 0o600)
+ require.NoError(t, writeErr)
+ _, writeErr = fileSystem.Write([]byte{}, corruptPath2, 0o600)
+ require.NoError(t, writeErr)
+ tst2, epoch2, initErr := initTSTable(fileSystem, tabDir,
common.Position{}, logger.GetLogger("test"),
+ streamSnapshotOption(), nil, true)
+ require.NoError(t, initErr)
+ require.NotNil(t, tst2)
+ require.Equal(t, validEpoch, epoch2, "should load older valid snapshot
when newer ones are corrupt")
+ require.NotNil(t, tst2.snapshot)
+ require.Equal(t, validEpoch, tst2.snapshot.epoch)
+ require.False(t, fileSystem.IsExist(corruptPath1), "first corrupt
snapshot must be deleted after fallback")
+ require.False(t, fileSystem.IsExist(corruptPath2), "second corrupt
snapshot must be deleted after fallback")
+ if tst2.index != nil {
+ _ = tst2.index.Close()
+ }
+}
diff --git a/banyand/stream/tstable.go b/banyand/stream/tstable.go
index c980a2e9c..dc4ea70ba 100644
--- a/banyand/stream/tstable.go
+++ b/banyand/stream/tstable.go
@@ -69,8 +69,11 @@ type tsTable struct {
shardID common.ShardID
}
-func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts []uint64) {
- parts := tst.mustReadSnapshot(epoch)
+func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts []uint64) error {
+ parts, err := tst.readSnapshot(epoch)
+ if err != nil {
+ return err
+ }
snp := snapshot{
epoch: epoch,
}
@@ -87,9 +90,9 @@ func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts []uint64) {
tst.gc.removePart(id)
continue
}
- err := validatePartMetadata(tst.fileSystem, partPath(tst.root,
id))
- if err != nil {
- tst.l.Info().Err(err).Uint64("id", id).Msg("cannot
validate part metadata. skip and delete it")
+ validateErr := validatePartMetadata(tst.fileSystem,
partPath(tst.root, id))
+ if validateErr != nil {
+ tst.l.Info().Err(validateErr).Uint64("id",
id).Msg("cannot validate part metadata. skip and delete it")
tst.gc.removePart(id)
needToPersist = true
continue
@@ -104,13 +107,14 @@ func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts []uint64) {
tst.gc.registerSnapshot(&snp)
tst.gc.clean()
if len(snp.parts) < 1 {
- return
+ return nil
}
snp.incRef()
tst.snapshot = &snp
if needToPersist {
tst.persistSnapshot(&snp)
}
+ return nil
}
func (tst *tsTable) startLoop(cur uint64) {
@@ -152,38 +156,56 @@ func (tst *tsTable) mustWriteSnapshot(snapshot uint64,
partNames []string) {
logger.Panicf("cannot marshal partNames to JSON: %s", err)
}
snapshotPath := filepath.Join(tst.root, snapshotName(snapshot))
- lf, err := tst.fileSystem.CreateLockFile(snapshotPath, storage.FilePerm)
+ snapshotTempPath := snapshotPath + ".tmp"
+ lf, err := tst.fileSystem.CreateLockFile(snapshotTempPath,
storage.FilePerm)
if err != nil {
- logger.Panicf("cannot create lock file %s: %s", snapshotPath,
err)
+ logger.Panicf("cannot create lock file %s: %s",
snapshotTempPath, err)
}
n, err := lf.Write(data)
if err != nil {
- logger.Panicf("cannot write snapshot %s: %s", snapshotPath, err)
+ _ = lf.Close()
+ logger.Panicf("cannot write snapshot %s: %s", snapshotTempPath,
err)
}
if n != len(data) {
- logger.Panicf("unexpected number of bytes written to %s; got
%d; want %d", snapshotPath, n, len(data))
+ _ = lf.Close()
+ logger.Panicf("unexpected number of bytes written to %s; got
%d; want %d", snapshotTempPath, n, len(data))
+ }
+ if closeErr := lf.Close(); closeErr != nil {
+ logger.Panicf("cannot close snapshot temp file %s: %s",
snapshotTempPath, closeErr)
+ }
+ if renameErr := tst.fileSystem.Rename(snapshotTempPath, snapshotPath);
renameErr != nil {
+ logger.Panicf("cannot rename snapshot %s to %s: %s",
snapshotTempPath, snapshotPath, renameErr)
}
+ tst.fileSystem.SyncPath(tst.root)
}
-func (tst *tsTable) mustReadSnapshot(snapshot uint64) []uint64 {
+func (tst *tsTable) readSnapshot(snapshot uint64) ([]uint64, error) {
snapshotPath := filepath.Join(tst.root, snapshotName(snapshot))
data, err := tst.fileSystem.Read(snapshotPath)
if err != nil {
- logger.Panicf("cannot read %s: %s", snapshotPath, err)
+ return nil, fmt.Errorf("cannot read %s: %w", snapshotPath, err)
}
var partNames []string
if err := json.Unmarshal(data, &partNames); err != nil {
- logger.Panicf("cannot parse %s: %s", snapshotPath, err)
+ return nil, fmt.Errorf("cannot parse %s: %w", snapshotPath, err)
}
var result []uint64
for i := range partNames {
- e, err := parseEpoch(partNames[i])
- if err != nil {
- logger.Panicf("cannot parse %s: %s", partNames[i], err)
+ e, parseErr := parseEpoch(partNames[i])
+ if parseErr != nil {
+ return nil, fmt.Errorf("cannot parse %s: %w",
partNames[i], parseErr)
}
result = append(result, e)
}
- return result
+ return result, nil
+}
+
+func (tst *tsTable) mustReadSnapshot(snapshot uint64) []uint64 {
+ parts, err := tst.readSnapshot(snapshot)
+ if err != nil {
+ logger.Panicf("%s", err)
+ }
+ return parts
}
// initTSTable initializes a tsTable and loads parts/snapshots, but does not
start any background loops.
@@ -273,9 +295,21 @@ func initTSTable(fileSystem fs.FileSystem, rootPath
string, p common.Position,
sort.Slice(loadedSnapshots, func(i, j int) bool {
return loadedSnapshots[i] > loadedSnapshots[j]
})
- epoch := loadedSnapshots[0]
- tst.loadSnapshot(epoch, loadedParts)
- return &tst, epoch, nil
+ var failedSnapshotIDs []uint64
+ for _, epoch := range loadedSnapshots {
+ loadErr := tst.loadSnapshot(epoch, loadedParts)
+ if loadErr != nil {
+ tst.l.Warn().Err(loadErr).Uint64("epoch",
epoch).Msg("cannot load snapshot, trying next older")
+ failedSnapshotIDs = append(failedSnapshotIDs, epoch)
+ continue
+ }
+ for _, id := range failedSnapshotIDs {
+ tst.l.Info().Str("path", filepath.Join(rootPath,
snapshotName(id))).Msg("delete unreadable snapshot file")
+ fileSystem.MustRMAll(filepath.Join(rootPath,
snapshotName(id)))
+ }
+ return &tst, epoch, nil
+ }
+ return &tst, uint64(time.Now().UnixNano()), nil
}
func newTSTable(fileSystem fs.FileSystem, rootPath string, p common.Position,
diff --git a/banyand/trace/snapshot_tstable_test.go
b/banyand/trace/snapshot_tstable_test.go
new file mode 100644
index 000000000..b4a1d97db
--- /dev/null
+++ b/banyand/trace/snapshot_tstable_test.go
@@ -0,0 +1,276 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package trace
+
+import (
+ "encoding/json"
+ "path/filepath"
+ "sort"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+
+ "github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+ "github.com/apache/skywalking-banyandb/banyand/protector"
+ "github.com/apache/skywalking-banyandb/pkg/fs"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/test"
+ "github.com/apache/skywalking-banyandb/pkg/test/flags"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+func traceSnapshotOption() option {
+ return option{
+ flushTimeout: 0,
+ mergePolicy: newDefaultMergePolicyForTesting(),
+ protector: protector.Nop{},
+ }
+}
+
+func TestTraceMustWriteSnapshotUsesTempThenRename(t *testing.T) {
+ fileSystem := fs.NewLocalFileSystem()
+ tmpPath, deferFn := test.Space(require.New(t))
+ defer deferFn()
+ tabDir := filepath.Join(tmpPath, "tab")
+ fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+ tst := &tsTable{fileSystem: fileSystem, root: tabDir}
+ const epoch = uint64(0x10)
+ partNames := []string{partName(0x1)}
+ tst.mustWriteSnapshot(epoch, partNames)
+ snpPath := filepath.Join(tabDir, snapshotName(epoch))
+ tmpPathFile := snpPath + ".tmp"
+ require.False(t, fileSystem.IsExist(tmpPathFile), "temp file must be
removed after rename")
+ require.True(t, fileSystem.IsExist(snpPath), "final snapshot file must
exist")
+ data, readErr := fileSystem.Read(snpPath)
+ require.NoError(t, readErr)
+ var decoded []string
+ require.NoError(t, json.Unmarshal(data, &decoded))
+ require.Equal(t, partNames, decoded)
+}
+
+func TestTraceMustWriteSnapshotWithMultipleParts(t *testing.T) {
+ fileSystem := fs.NewLocalFileSystem()
+ tmpPath, deferFn := test.Space(require.New(t))
+ defer deferFn()
+ tabDir := filepath.Join(tmpPath, "tab")
+ fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+ tst := &tsTable{fileSystem: fileSystem, root: tabDir}
+ partNames := []string{partName(0x1), partName(0x2), partName(0xff)}
+ tst.mustWriteSnapshot(1, partNames)
+ parts, readErr := tst.readSnapshot(1)
+ require.NoError(t, readErr)
+ require.Equal(t, []uint64{0x1, 0x2, 0xff}, parts)
+}
+
+func TestTraceReadSnapshotReturnsErrorOnEmptyFile(t *testing.T) {
+ fileSystem := fs.NewLocalFileSystem()
+ tmpPath, deferFn := test.Space(require.New(t))
+ defer deferFn()
+ tabDir := filepath.Join(tmpPath, "tab")
+ fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+ snpPath := filepath.Join(tabDir, snapshotName(1))
+ _, writeErr := fileSystem.Write([]byte{}, snpPath, 0o600)
+ require.NoError(t, writeErr)
+ tst := &tsTable{fileSystem: fileSystem, root: tabDir}
+ _, readErr := tst.readSnapshot(1)
+ require.Error(t, readErr)
+ require.Contains(t, readErr.Error(), "cannot parse")
+}
+
+func TestTraceReadSnapshotReturnsErrorOnInvalidJSON(t *testing.T) {
+ fileSystem := fs.NewLocalFileSystem()
+ tmpPath, deferFn := test.Space(require.New(t))
+ defer deferFn()
+ tabDir := filepath.Join(tmpPath, "tab")
+ fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+ snpPath := filepath.Join(tabDir, snapshotName(1))
+ _, writeErr := fileSystem.Write([]byte("{invalid}"), snpPath, 0o600)
+ require.NoError(t, writeErr)
+ tst := &tsTable{fileSystem: fileSystem, root: tabDir}
+ _, readErr := tst.readSnapshot(1)
+ require.Error(t, readErr)
+ require.Contains(t, readErr.Error(), "cannot parse")
+}
+
+func TestTraceReadSnapshotReturnsErrorOnInvalidPartName(t *testing.T) {
+ fileSystem := fs.NewLocalFileSystem()
+ tmpPath, deferFn := test.Space(require.New(t))
+ defer deferFn()
+ tabDir := filepath.Join(tmpPath, "tab")
+ fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+ partNamesData, marshalErr := json.Marshal([]string{"not-hex"})
+ require.NoError(t, marshalErr)
+ snpPath := filepath.Join(tabDir, snapshotName(1))
+ _, writeErr := fileSystem.Write(partNamesData, snpPath, 0o600)
+ require.NoError(t, writeErr)
+ tst := &tsTable{fileSystem: fileSystem, root: tabDir}
+ _, readErr := tst.readSnapshot(1)
+ require.Error(t, readErr)
+}
+
+func TestTraceReadSnapshotSucceedsOnValidFile(t *testing.T) {
+ fileSystem := fs.NewLocalFileSystem()
+ tmpPath, deferFn := test.Space(require.New(t))
+ defer deferFn()
+ tabDir := filepath.Join(tmpPath, "tab")
+ fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+ tst := &tsTable{fileSystem: fileSystem, root: tabDir}
+ tst.mustWriteSnapshot(1, []string{partName(0xa), partName(0xb)})
+ parts, readErr := tst.readSnapshot(1)
+ require.NoError(t, readErr)
+ require.Equal(t, []uint64{0xa, 0xb}, parts)
+}
+
+func TestTraceMustReadSnapshotPanicsOnError(t *testing.T) {
+ fileSystem := fs.NewLocalFileSystem()
+ tmpPath, deferFn := test.Space(require.New(t))
+ defer deferFn()
+ tabDir := filepath.Join(tmpPath, "tab")
+ fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+ snpPath := filepath.Join(tabDir, snapshotName(1))
+ _, writeErr := fileSystem.Write([]byte{}, snpPath, 0o600)
+ require.NoError(t, writeErr)
+ tst := &tsTable{fileSystem: fileSystem, root: tabDir}
+ require.Panics(t, func() { _ = tst.mustReadSnapshot(1) })
+}
+
+func TestTraceInitTSTableEmptyDirectory(t *testing.T) {
+ fileSystem := fs.NewLocalFileSystem()
+ tmpPath, deferFn := test.Space(require.New(t))
+ defer deferFn()
+ tabDir := filepath.Join(tmpPath, "tab")
+ fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+ tst, epoch := initTSTable(fileSystem, tabDir, common.Position{},
logger.GetLogger("test"), traceSnapshotOption(), nil)
+ require.NotNil(t, tst)
+ require.Greater(t, epoch, uint64(0))
+ require.Nil(t, tst.snapshot)
+}
+
+func TestTraceInitTSTableEmptyTableWhenAllSnapshotsCorrupt(t *testing.T) {
+ fileSystem := fs.NewLocalFileSystem()
+ tmpPath, deferFn := test.Space(require.New(t))
+ defer deferFn()
+ tabDir := filepath.Join(tmpPath, "tab")
+ fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+ snpPath := filepath.Join(tabDir, snapshotName(1))
+ _, writeErr := fileSystem.Write([]byte{}, snpPath, 0o600)
+ require.NoError(t, writeErr)
+ tst, epoch := initTSTable(fileSystem, tabDir, common.Position{},
logger.GetLogger("test"), traceSnapshotOption(), nil)
+ require.NotNil(t, tst)
+ require.Greater(t, epoch, uint64(0))
+ require.Nil(t, tst.snapshot)
+ require.False(t, fileSystem.IsExist(snpPath), "corrupt snapshot file
must be deleted")
+}
+
+func TestTraceTolerantLoaderFallbackToOlderSnapshot(t *testing.T) {
+ fileSystem := fs.NewLocalFileSystem()
+ tmpPath, deferFn := test.Space(require.New(t))
+ defer deferFn()
+ tabDir := filepath.Join(tmpPath, "tab")
+ fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+ tst, err := newTSTable(fileSystem, tabDir, common.Position{},
logger.GetLogger("test"),
+ timestamp.TimeRange{}, traceSnapshotOption(), nil)
+ require.NoError(t, err)
+ tst.mustAddTraces(tsTS1, nil)
+ time.Sleep(100 * time.Millisecond)
+ require.Eventually(t, func() bool {
+ dd := fileSystem.ReadDir(tabDir)
+ for _, d := range dd {
+ if d.IsDir() && d.Name() != sidxDirName && d.Name() !=
storage.FailedPartsDirName {
+ return true
+ }
+ }
+ return false
+ }, flags.EventuallyTimeout, time.Millisecond, "wait for part")
+ tst.Close()
+ snapshots := make([]uint64, 0)
+ for _, e := range fileSystem.ReadDir(tabDir) {
+ if filepath.Ext(e.Name()) == snapshotSuffix {
+ parsed, parseErr := parseSnapshot(e.Name())
+ if parseErr == nil {
+ snapshots = append(snapshots, parsed)
+ }
+ }
+ }
+ require.GreaterOrEqual(t, len(snapshots), 1)
+ sort.Slice(snapshots, func(i, j int) bool { return snapshots[i] >
snapshots[j] })
+ validEpoch := snapshots[0]
+ corruptEpoch := validEpoch + 1
+ corruptPath := filepath.Join(tabDir, snapshotName(corruptEpoch))
+ _, writeErr := fileSystem.Write([]byte{}, corruptPath, 0o600)
+ require.NoError(t, writeErr)
+ tst2, epoch2 := initTSTable(fileSystem, tabDir, common.Position{},
logger.GetLogger("test"), traceSnapshotOption(), nil)
+ require.NotNil(t, tst2)
+ require.Equal(t, validEpoch, epoch2, "should load older valid snapshot
when newest is corrupt")
+ require.NotNil(t, tst2.snapshot)
+ require.Equal(t, validEpoch, tst2.snapshot.epoch)
+ require.False(t, fileSystem.IsExist(corruptPath), "corrupt newer
snapshot must be deleted after fallback")
+}
+
+func TestTraceInitTSTableDeletesMultipleFailedSnapshotsOnFallback(t
*testing.T) {
+ fileSystem := fs.NewLocalFileSystem()
+ tmpPath, deferFn := test.Space(require.New(t))
+ defer deferFn()
+ tabDir := filepath.Join(tmpPath, "tab")
+ fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+ tst, err := newTSTable(fileSystem, tabDir, common.Position{},
logger.GetLogger("test"),
+ timestamp.TimeRange{}, traceSnapshotOption(), nil)
+ require.NoError(t, err)
+ tst.mustAddTraces(tsTS1, nil)
+ time.Sleep(100 * time.Millisecond)
+ require.Eventually(t, func() bool {
+ dd := fileSystem.ReadDir(tabDir)
+ for _, d := range dd {
+ if d.IsDir() && d.Name() != sidxDirName && d.Name() !=
storage.FailedPartsDirName {
+ return true
+ }
+ }
+ return false
+ }, flags.EventuallyTimeout, time.Millisecond, "wait for part")
+ tst.Close()
+ snapshots := make([]uint64, 0)
+ for _, e := range fileSystem.ReadDir(tabDir) {
+ if filepath.Ext(e.Name()) == snapshotSuffix {
+ parsed, parseErr := parseSnapshot(e.Name())
+ if parseErr == nil {
+ snapshots = append(snapshots, parsed)
+ }
+ }
+ }
+ require.GreaterOrEqual(t, len(snapshots), 1)
+ sort.Slice(snapshots, func(i, j int) bool { return snapshots[i] >
snapshots[j] })
+ validEpoch := snapshots[0]
+ // Create two corrupt newer snapshots
+ corruptEpoch1 := validEpoch + 1
+ corruptEpoch2 := validEpoch + 2
+ corruptPath1 := filepath.Join(tabDir, snapshotName(corruptEpoch1))
+ corruptPath2 := filepath.Join(tabDir, snapshotName(corruptEpoch2))
+ _, writeErr := fileSystem.Write([]byte{}, corruptPath1, 0o600)
+ require.NoError(t, writeErr)
+ _, writeErr = fileSystem.Write([]byte{}, corruptPath2, 0o600)
+ require.NoError(t, writeErr)
+ tst2, epoch2 := initTSTable(fileSystem, tabDir, common.Position{},
logger.GetLogger("test"), traceSnapshotOption(), nil)
+ require.NotNil(t, tst2)
+ require.Equal(t, validEpoch, epoch2, "should load older valid snapshot
when newer ones are corrupt")
+ require.NotNil(t, tst2.snapshot)
+ require.Equal(t, validEpoch, tst2.snapshot.epoch)
+ require.False(t, fileSystem.IsExist(corruptPath1), "first corrupt
snapshot must be deleted after fallback")
+ require.False(t, fileSystem.IsExist(corruptPath2), "second corrupt
snapshot must be deleted after fallback")
+}
diff --git a/banyand/trace/tstable.go b/banyand/trace/tstable.go
index cc47bae77..dcb68f9b9 100644
--- a/banyand/trace/tstable.go
+++ b/banyand/trace/tstable.go
@@ -69,8 +69,11 @@ type tsTable struct {
shardID common.ShardID
}
-func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts []uint64) {
- parts := tst.mustReadSnapshot(epoch)
+func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts []uint64) error {
+ parts, err := tst.readSnapshot(epoch)
+ if err != nil {
+ return err
+ }
snp := snapshot{
epoch: epoch,
}
@@ -87,9 +90,9 @@ func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts
[]uint64) {
tst.gc.removePart(id)
continue
}
- err := validatePartMetadata(tst.fileSystem, partPath(tst.root,
id))
- if err != nil {
- tst.l.Info().Err(err).Uint64("id", id).Msg("cannot
validate part metadata. skip and delete it")
+ validateErr := validatePartMetadata(tst.fileSystem,
partPath(tst.root, id))
+ if validateErr != nil {
+ tst.l.Info().Err(validateErr).Uint64("id",
id).Msg("cannot validate part metadata. skip and delete it")
tst.gc.removePart(id)
needToPersist = true
continue
@@ -104,7 +107,7 @@ func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts
[]uint64) {
tst.gc.registerSnapshot(&snp)
tst.gc.clean()
if len(snp.parts) < 1 {
- return
+ return nil
}
snp.incRef()
tst.snapshot = &snp
@@ -112,6 +115,7 @@ func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts
[]uint64) {
if needToPersist {
tst.persistSnapshot(&snp)
}
+ return nil
}
func (tst *tsTable) startLoop(cur uint64) {
@@ -153,38 +157,56 @@ func (tst *tsTable) mustWriteSnapshot(snapshot uint64,
partNames []string) {
logger.Panicf("cannot marshal partNames to JSON: %s", err)
}
snapshotPath := filepath.Join(tst.root, snapshotName(snapshot))
- lf, err := tst.fileSystem.CreateLockFile(snapshotPath, storage.FilePerm)
+ snapshotTempPath := snapshotPath + ".tmp"
+ lf, err := tst.fileSystem.CreateLockFile(snapshotTempPath,
storage.FilePerm)
if err != nil {
- logger.Panicf("cannot create lock file %s: %s", snapshotPath,
err)
+ logger.Panicf("cannot create lock file %s: %s",
snapshotTempPath, err)
}
n, err := lf.Write(data)
if err != nil {
- logger.Panicf("cannot write snapshot %s: %s", snapshotPath, err)
+ _ = lf.Close()
+ logger.Panicf("cannot write snapshot %s: %s", snapshotTempPath,
err)
}
if n != len(data) {
- logger.Panicf("unexpected number of bytes written to %s; got
%d; want %d", snapshotPath, n, len(data))
+ _ = lf.Close()
+ logger.Panicf("unexpected number of bytes written to %s; got
%d; want %d", snapshotTempPath, n, len(data))
}
+ if closeErr := lf.Close(); closeErr != nil {
+ logger.Panicf("cannot close snapshot temp file %s: %s",
snapshotTempPath, closeErr)
+ }
+ if renameErr := tst.fileSystem.Rename(snapshotTempPath, snapshotPath);
renameErr != nil {
+ logger.Panicf("cannot rename snapshot %s to %s: %s",
snapshotTempPath, snapshotPath, renameErr)
+ }
+ tst.fileSystem.SyncPath(tst.root)
}
-func (tst *tsTable) mustReadSnapshot(snapshot uint64) []uint64 {
+func (tst *tsTable) readSnapshot(snapshot uint64) ([]uint64, error) {
snapshotPath := filepath.Join(tst.root, snapshotName(snapshot))
data, err := tst.fileSystem.Read(snapshotPath)
if err != nil {
- logger.Panicf("cannot read %s: %s", snapshotPath, err)
+ return nil, fmt.Errorf("cannot read %s: %w", snapshotPath, err)
}
var partNames []string
if err := json.Unmarshal(data, &partNames); err != nil {
- logger.Panicf("cannot parse %s: %s", snapshotPath, err)
+ return nil, fmt.Errorf("cannot parse %s: %w", snapshotPath, err)
}
var result []uint64
for i := range partNames {
- e, err := parseEpoch(partNames[i])
- if err != nil {
- logger.Panicf("cannot parse %s: %s", partNames[i], err)
+ e, parseErr := parseEpoch(partNames[i])
+ if parseErr != nil {
+ return nil, fmt.Errorf("cannot parse %s: %w",
partNames[i], parseErr)
}
result = append(result, e)
}
- return result
+ return result, nil
+}
+
+func (tst *tsTable) mustReadSnapshot(snapshot uint64) []uint64 {
+ parts, err := tst.readSnapshot(snapshot)
+ if err != nil {
+ logger.Panicf("%s", err)
+ }
+ return parts
}
// initTSTable initializes a tsTable and loads parts/snapshots, but does not
start any background loops.
@@ -262,9 +284,21 @@ func initTSTable(fileSystem fs.FileSystem, rootPath
string, p common.Position,
sort.Slice(loadedSnapshots, func(i, j int) bool {
return loadedSnapshots[i] > loadedSnapshots[j]
})
- epoch := loadedSnapshots[0]
- tst.loadSnapshot(epoch, loadedParts)
- return &tst, epoch
+ var failedSnapshotIDs []uint64
+ for _, epoch := range loadedSnapshots {
+ loadErr := tst.loadSnapshot(epoch, loadedParts)
+ if loadErr != nil {
+ tst.l.Warn().Err(loadErr).Uint64("epoch",
epoch).Msg("cannot load snapshot, trying next older")
+ failedSnapshotIDs = append(failedSnapshotIDs, epoch)
+ continue
+ }
+ for _, id := range failedSnapshotIDs {
+ tst.l.Info().Str("path", filepath.Join(rootPath,
snapshotName(id))).Msg("delete unreadable snapshot file")
+ fileSystem.MustRMAll(filepath.Join(rootPath,
snapshotName(id)))
+ }
+ return &tst, epoch
+ }
+ return &tst, uint64(time.Now().UnixNano())
}
func newTSTable(fileSystem fs.FileSystem, rootPath string, p common.Position,
diff --git a/pkg/fs/file_system.go b/pkg/fs/file_system.go
index 9bd585e89..2254fb29b 100644
--- a/pkg/fs/file_system.go
+++ b/pkg/fs/file_system.go
@@ -114,6 +114,8 @@ type FileSystem interface {
Read(name string) ([]byte, error)
// Delete the file.
DeleteFile(name string) error
+ // Rename renames oldPath to newPath atomically.
+ Rename(oldPath, newPath string) error
// Delete the directory.
MustRMAll(path string)
// SyncPath the directory of file.
diff --git a/pkg/fs/local_file_system.go b/pkg/fs/local_file_system.go
index f025653f5..aa654050c 100644
--- a/pkg/fs/local_file_system.go
+++ b/pkg/fs/local_file_system.go
@@ -256,6 +256,29 @@ func (fs *localFileSystem) Read(name string) ([]byte,
error) {
}
}
+// Rename renames oldPath to newPath atomically.
+func (fs *localFileSystem) Rename(oldPath, newPath string) error {
+ if err := os.Rename(oldPath, newPath); err != nil {
+ if os.IsNotExist(err) {
+ return &FileSystemError{
+ Code: IsNotExistError,
+ Message: fmt.Sprintf("Rename failed, file does
not exist, old: %s, new: %s, error: %s", oldPath, newPath, err),
+ }
+ }
+ if os.IsPermission(err) {
+ return &FileSystemError{
+ Code: permissionError,
+ Message: fmt.Sprintf("Rename failed, permission
denied, old: %s, new: %s, error: %s", oldPath, newPath, err),
+ }
+ }
+ return &FileSystemError{
+ Code: otherError,
+ Message: fmt.Sprintf("Rename failed, old: %s, new: %s,
error: %s", oldPath, newPath, err),
+ }
+ }
+ return nil
+}
+
// DeleteFile is used to delete the file.
func (fs *localFileSystem) DeleteFile(name string) error {
err := os.Remove(name)