This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch bug/snapshot-crash in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 548d147014ef9165d5a03a43997b2d8c4f01c80a Author: Gao Hongtao <[email protected]> AuthorDate: Thu Mar 12 12:49:00 2026 +0000 Fix snapshot handling to prevent crashes and improve error handling in tsTable. --- banyand/measure/snapshot_test.go | 240 ++++++++++++++++++++++++++++++++ banyand/measure/tstable.go | 67 ++++++--- banyand/stream/snapshot_tstable_test.go | 237 +++++++++++++++++++++++++++++++ banyand/stream/tstable.go | 73 +++++++--- banyand/trace/snapshot_tstable_test.go | 224 +++++++++++++++++++++++++++++ banyand/trace/tstable.go | 73 +++++++--- pkg/fs/file_system.go | 2 + pkg/fs/local_file_system.go | 23 +++ 8 files changed, 882 insertions(+), 57 deletions(-) diff --git a/banyand/measure/snapshot_test.go b/banyand/measure/snapshot_test.go index c5c717628..1ad40c2e9 100644 --- a/banyand/measure/snapshot_test.go +++ b/banyand/measure/snapshot_test.go @@ -18,8 +18,10 @@ package measure import ( + "encoding/json" "path/filepath" "runtime" + "sort" "testing" "time" @@ -27,6 +29,7 @@ import ( "github.com/stretchr/testify/require" "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/banyand/internal/storage" "github.com/apache/skywalking-banyandb/banyand/protector" "github.com/apache/skywalking-banyandb/pkg/bytes" "github.com/apache/skywalking-banyandb/pkg/fs" @@ -520,3 +523,240 @@ func TestSnapshotFunctionality(t *testing.T) { } } } + +func testSnapshotOption() option { + return option{ + flushTimeout: 0, + mergePolicy: newDefaultMergePolicyForTesting(), + protector: protector.Nop{}, + } +} + +func TestMustWriteSnapshotUsesTempThenRename(t *testing.T) { + fileSystem := fs.NewLocalFileSystem() + tmpPath, deferFn := test.Space(require.New(t)) + defer deferFn() + tabDir := filepath.Join(tmpPath, "tab") + fileSystem.MkdirPanicIfExist(tabDir, 0o755) + tst := &tsTable{fileSystem: fileSystem, root: tabDir} + const epoch = uint64(0x10) + partNames := []string{partName(0x1)} + tst.mustWriteSnapshot(epoch, partNames) + snpPath := filepath.Join(tabDir, snapshotName(epoch)) + tmpPathFile := snpPath + ".tmp" + require.False(t, fileSystem.IsExist(tmpPathFile), "temp file must be removed after rename") + require.True(t, fileSystem.IsExist(snpPath), "final snapshot file must exist") + data, err := fileSystem.Read(snpPath) + require.NoError(t, err) + var decoded []string + require.NoError(t, json.Unmarshal(data, &decoded)) + require.Equal(t, partNames, decoded) +} + +func TestMustWriteSnapshotWithMultipleParts(t *testing.T) { + fileSystem := fs.NewLocalFileSystem() + tmpPath, deferFn := test.Space(require.New(t)) + defer deferFn() + tabDir := filepath.Join(tmpPath, "tab") + fileSystem.MkdirPanicIfExist(tabDir, 0o755) + tst := &tsTable{fileSystem: fileSystem, root: tabDir} + partNames := []string{partName(0x1), partName(0x2), partName(0xff)} + tst.mustWriteSnapshot(1, partNames) + parts, readErr := tst.readSnapshot(1) + require.NoError(t, readErr) + require.Equal(t, []uint64{0x1, 0x2, 0xff}, parts) +} + +func TestMustWriteSnapshotEmptyPartList(t *testing.T) { + fileSystem := fs.NewLocalFileSystem() + tmpPath, deferFn := test.Space(require.New(t)) + defer deferFn() + tabDir := filepath.Join(tmpPath, "tab") + fileSystem.MkdirPanicIfExist(tabDir, 0o755) + tst := &tsTable{fileSystem: fileSystem, root: tabDir} + tst.mustWriteSnapshot(1, []string{}) + parts, readErr := tst.readSnapshot(1) + require.NoError(t, readErr) + require.Empty(t, parts) +} + +func TestTolerantLoaderFallbackToOlderSnapshot(t *testing.T) { + fileSystem := fs.NewLocalFileSystem() + tmpPath, deferFn := test.Space(require.New(t)) + defer deferFn() + tabDir := filepath.Join(tmpPath, "tab") + fileSystem.MkdirPanicIfExist(tabDir, 0o755) + tst, err := newTSTable(fileSystem, tabDir, common.Position{}, logger.GetLogger("test"), + timestamp.TimeRange{}, option{flushTimeout: 0, mergePolicy: newDefaultMergePolicyForTesting(), protector: protector.Nop{}}, nil) + require.NoError(t, err) + tst.mustAddDataPoints(dpsTS1) + time.Sleep(100 * time.Millisecond) + require.Eventually(t, func() bool { + dd := fileSystem.ReadDir(tabDir) + for _, d := range dd { + if d.IsDir() && d.Name() != storage.FailedPartsDirName { + return true + } + } + return false + }, flags.EventuallyTimeout, time.Millisecond, "wait for part") + tst.Close() + snapshots := make([]uint64, 0) + for _, e := range fileSystem.ReadDir(tabDir) { + if filepath.Ext(e.Name()) == snapshotSuffix { + parsed, parseErr := parseSnapshot(e.Name()) + if parseErr == nil { + snapshots = append(snapshots, parsed) + } + } + } + require.GreaterOrEqual(t, len(snapshots), 1) + sort.Slice(snapshots, func(i, j int) bool { return snapshots[i] > snapshots[j] }) + validEpoch := snapshots[0] + corruptEpoch := validEpoch + 1 + corruptPath := filepath.Join(tabDir, snapshotName(corruptEpoch)) + _, writeErr := fileSystem.Write([]byte{}, corruptPath, 0o600) + require.NoError(t, writeErr) + tst2, epoch2 := initTSTable(fileSystem, tabDir, common.Position{}, logger.GetLogger("test"), testSnapshotOption(), nil) + require.NotNil(t, tst2) + require.Equal(t, validEpoch, epoch2, "should load older valid snapshot when newest is corrupt") + require.NotNil(t, tst2.snapshot) + require.Equal(t, validEpoch, tst2.snapshot.epoch) +} + +func TestReadSnapshotReturnsErrorOnEmptyFile(t *testing.T) { + fileSystem := fs.NewLocalFileSystem() + tmpPath, deferFn := test.Space(require.New(t)) + defer deferFn() + tabDir := filepath.Join(tmpPath, "tab") + fileSystem.MkdirPanicIfExist(tabDir, 0o755) + snpPath := filepath.Join(tabDir, snapshotName(1)) + _, writeErr := fileSystem.Write([]byte{}, snpPath, 0o600) + require.NoError(t, writeErr) + tst := &tsTable{fileSystem: fileSystem, root: tabDir} + _, readErr := tst.readSnapshot(1) + require.Error(t, readErr) + require.Contains(t, readErr.Error(), "cannot parse") +} + +func TestReadSnapshotReturnsErrorOnInvalidJSON(t *testing.T) { + fileSystem := fs.NewLocalFileSystem() + tmpPath, deferFn := test.Space(require.New(t)) + defer deferFn() + tabDir := filepath.Join(tmpPath, "tab") + fileSystem.MkdirPanicIfExist(tabDir, 0o755) + snpPath := filepath.Join(tabDir, snapshotName(1)) + _, writeErr := fileSystem.Write([]byte("{invalid}"), snpPath, 0o600) + require.NoError(t, writeErr) + tst := &tsTable{fileSystem: fileSystem, root: tabDir} + _, readErr := tst.readSnapshot(1) + require.Error(t, readErr) + require.Contains(t, readErr.Error(), "cannot parse") +} + +func TestReadSnapshotReturnsErrorOnInvalidPartName(t *testing.T) { + fileSystem := fs.NewLocalFileSystem() + tmpPath, deferFn := test.Space(require.New(t)) + defer deferFn() + tabDir := filepath.Join(tmpPath, "tab") + fileSystem.MkdirPanicIfExist(tabDir, 0o755) + partNamesData, marshalErr := json.Marshal([]string{"not-hex"}) + require.NoError(t, marshalErr) + snpPath := filepath.Join(tabDir, snapshotName(1)) + _, writeErr := fileSystem.Write(partNamesData, snpPath, 0o600) + require.NoError(t, writeErr) + tst := &tsTable{fileSystem: fileSystem, root: tabDir} + _, readErr := tst.readSnapshot(1) + require.Error(t, readErr) +} + +func TestReadSnapshotSucceedsOnValidFile(t *testing.T) { + fileSystem := fs.NewLocalFileSystem() + tmpPath, deferFn := test.Space(require.New(t)) + defer deferFn() + tabDir := filepath.Join(tmpPath, "tab") + fileSystem.MkdirPanicIfExist(tabDir, 0o755) + tst := &tsTable{fileSystem: fileSystem, root: tabDir} + tst.mustWriteSnapshot(1, []string{partName(0xa), partName(0xb)}) + parts, readErr := tst.readSnapshot(1) + require.NoError(t, readErr) + require.Equal(t, []uint64{0xa, 0xb}, parts) +} + +func TestMustReadSnapshotPanicsOnError(t *testing.T) { + fileSystem := fs.NewLocalFileSystem() + tmpPath, deferFn := test.Space(require.New(t)) + defer deferFn() + tabDir := filepath.Join(tmpPath, "tab") + fileSystem.MkdirPanicIfExist(tabDir, 0o755) + snpPath := filepath.Join(tabDir, snapshotName(1)) + _, writeErr := fileSystem.Write([]byte{}, snpPath, 0o600) + require.NoError(t, writeErr) + tst := &tsTable{fileSystem: fileSystem, root: tabDir} + require.Panics(t, func() { _ = tst.mustReadSnapshot(1) }) +} + +func TestInitTSTableEmptyDirectory(t *testing.T) { + fileSystem := fs.NewLocalFileSystem() + tmpPath, deferFn := test.Space(require.New(t)) + defer deferFn() + tabDir := filepath.Join(tmpPath, "tab") + fileSystem.MkdirPanicIfExist(tabDir, 0o755) + tst, epoch := initTSTable(fileSystem, tabDir, common.Position{}, logger.GetLogger("test"), testSnapshotOption(), nil) + require.NotNil(t, tst) + require.Greater(t, epoch, uint64(0)) + require.Nil(t, tst.snapshot) +} + +func TestInitTSTableEmptyTableWhenAllSnapshotsCorrupt(t *testing.T) { + fileSystem := fs.NewLocalFileSystem() + tmpPath, deferFn := test.Space(require.New(t)) + defer deferFn() + tabDir := filepath.Join(tmpPath, "tab") + fileSystem.MkdirPanicIfExist(tabDir, 0o755) + snpPath := filepath.Join(tabDir, snapshotName(1)) + _, writeErr := fileSystem.Write([]byte{}, snpPath, 0o600) + require.NoError(t, writeErr) + tst, epoch := initTSTable(fileSystem, tabDir, common.Position{}, logger.GetLogger("test"), testSnapshotOption(), nil) + require.NotNil(t, tst) + require.Greater(t, epoch, uint64(0)) + require.Nil(t, tst.snapshot) + require.False(t, fileSystem.IsExist(snpPath), "corrupt snapshot file must be deleted") +} + +func TestInitTSTableLoadsNewestWhenMultipleValid(t *testing.T) { + fileSystem := fs.NewLocalFileSystem() + tmpPath, deferFn := test.Space(require.New(t)) + defer deferFn() + tabDir := filepath.Join(tmpPath, "tab") + fileSystem.MkdirPanicIfExist(tabDir, 0o755) + tst, err := newTSTable(fileSystem, tabDir, common.Position{}, logger.GetLogger("test"), + timestamp.TimeRange{}, testSnapshotOption(), nil) + require.NoError(t, err) + tst.mustAddDataPoints(dpsTS1) + time.Sleep(100 * time.Millisecond) + require.Eventually(t, func() bool { + dd := fileSystem.ReadDir(tabDir) + for _, d := range dd { + if d.IsDir() && d.Name() != storage.FailedPartsDirName { + return true + } + } + return false + }, flags.EventuallyTimeout, time.Millisecond, "wait for part") + tst.Close() + snapshots := make([]uint64, 0) + for _, e := range fileSystem.ReadDir(tabDir) { + if filepath.Ext(e.Name()) == snapshotSuffix { + parsed, parseErr := parseSnapshot(e.Name()) + if parseErr == nil { + snapshots = append(snapshots, parsed) + } + } + } + require.GreaterOrEqual(t, len(snapshots), 1) + tst2, epoch2 := initTSTable(fileSystem, tabDir, common.Position{}, logger.GetLogger("test"), testSnapshotOption(), nil) + require.NotNil(t, tst2) + sort.Slice(snapshots, func(i, j int) bool { return snapshots[i] > snapshots[j] }) + require.Equal(t, snapshots[0], epoch2, "must load newest valid snapshot") +} diff --git a/banyand/measure/tstable.go b/banyand/measure/tstable.go index 5358b1634..693d86ce5 100644 --- a/banyand/measure/tstable.go +++ b/banyand/measure/tstable.go @@ -117,9 +117,20 @@ func initTSTable(fileSystem fs.FileSystem, rootPath string, p common.Position, sort.Slice(loadedSnapshots, func(i, j int) bool { return loadedSnapshots[i] > loadedSnapshots[j] }) - epoch := loadedSnapshots[0] - tst.loadSnapshot(epoch, loadedParts) - return &tst, epoch + var failedSnapshots []uint64 + for _, epoch := range loadedSnapshots { + loadErr := tst.loadSnapshot(epoch, loadedParts) + if loadErr == nil { + return &tst, epoch + } + tst.l.Warn().Err(loadErr).Uint64("epoch", epoch).Msg("cannot load snapshot, trying next older") + failedSnapshots = append(failedSnapshots, epoch) + } + for _, id := range failedSnapshots { + tst.l.Info().Str("path", filepath.Join(rootPath, snapshotName(id))).Msg("delete unreadable snapshot file") + fileSystem.MustRMAll(filepath.Join(rootPath, snapshotName(id))) + } + return &tst, uint64(time.Now().UnixNano()) } // newTSTable creates a new tsTable and starts the background loop. @@ -165,8 +176,11 @@ type tsTable struct { shardID common.ShardID } -func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts []uint64) { - parts := tst.mustReadSnapshot(epoch) +func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts []uint64) error { + parts, err := tst.readSnapshot(epoch) + if err != nil { + return err + } snp := snapshot{ epoch: epoch, } @@ -200,13 +214,14 @@ func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts []uint64) { tst.gc.registerSnapshot(&snp) tst.gc.clean() if len(snp.parts) < 1 { - return + return nil } snp.incRef() tst.snapshot = &snp if needToPersist { tst.persistSnapshot(&snp) } + return nil } func (tst *tsTable) startLoop(cur uint64) { @@ -235,38 +250,56 @@ func (tst *tsTable) mustWriteSnapshot(snapshot uint64, partNames []string) { logger.Panicf("cannot marshal partNames to JSON: %s", err) } snapshotPath := filepath.Join(tst.root, snapshotName(snapshot)) - lf, err := tst.fileSystem.CreateLockFile(snapshotPath, storage.FilePerm) + snapshotTempPath := snapshotPath + ".tmp" + lf, err := tst.fileSystem.CreateLockFile(snapshotTempPath, storage.FilePerm) if err != nil { - logger.Panicf("cannot create lock file %s: %s", snapshotPath, err) + logger.Panicf("cannot create lock file %s: %s", snapshotTempPath, err) } n, err := lf.Write(data) if err != nil { - logger.Panicf("cannot write snapshot %s: %s", snapshotPath, err) + _ = lf.Close() + logger.Panicf("cannot write snapshot %s: %s", snapshotTempPath, err) } if n != len(data) { - logger.Panicf("unexpected number of bytes written to %s; got %d; want %d", snapshotPath, n, len(data)) + _ = lf.Close() + logger.Panicf("unexpected number of bytes written to %s; got %d; want %d", snapshotTempPath, n, len(data)) } + if closeErr := lf.Close(); closeErr != nil { + logger.Panicf("cannot close snapshot temp file %s: %s", snapshotTempPath, closeErr) + } + if renameErr := tst.fileSystem.Rename(snapshotTempPath, snapshotPath); renameErr != nil { + logger.Panicf("cannot rename snapshot %s to %s: %s", snapshotTempPath, snapshotPath, renameErr) + } + tst.fileSystem.SyncPath(tst.root) } -func (tst *tsTable) mustReadSnapshot(snapshot uint64) []uint64 { +func (tst *tsTable) readSnapshot(snapshot uint64) ([]uint64, error) { snapshotPath := filepath.Join(tst.root, snapshotName(snapshot)) data, err := tst.fileSystem.Read(snapshotPath) if err != nil { - logger.Panicf("cannot read %s: %s", snapshotPath, err) + return nil, fmt.Errorf("cannot read %s: %w", snapshotPath, err) } var partNames []string if err := json.Unmarshal(data, &partNames); err != nil { - logger.Panicf("cannot parse %s: %s", snapshotPath, err) + return nil, fmt.Errorf("cannot parse %s: %w", snapshotPath, err) } var result []uint64 for i := range partNames { - e, err := parseEpoch(partNames[i]) - if err != nil { - logger.Panicf("cannot parse %s: %s", partNames[i], err) + e, parseErr := parseEpoch(partNames[i]) + if parseErr != nil { + return nil, fmt.Errorf("cannot parse %s: %w", partNames[i], parseErr) } result = append(result, e) } - return result + return result, nil +} + +func (tst *tsTable) mustReadSnapshot(snapshot uint64) []uint64 { + parts, err := tst.readSnapshot(snapshot) + if err != nil { + logger.Panicf("%s", err) + } + return parts } func (tst *tsTable) Close() error { diff --git a/banyand/stream/snapshot_tstable_test.go b/banyand/stream/snapshot_tstable_test.go new file mode 100644 index 000000000..0401764f6 --- /dev/null +++ b/banyand/stream/snapshot_tstable_test.go @@ -0,0 +1,237 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package stream + +import ( + "encoding/json" + "path/filepath" + "sort" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/banyand/internal/storage" + "github.com/apache/skywalking-banyandb/banyand/protector" + "github.com/apache/skywalking-banyandb/pkg/fs" + "github.com/apache/skywalking-banyandb/pkg/index/inverted" + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/test" + "github.com/apache/skywalking-banyandb/pkg/test/flags" + "github.com/apache/skywalking-banyandb/pkg/timestamp" +) + +func streamSnapshotOption() option { + return option{ + flushTimeout: 0, + mergePolicy: newDefaultMergePolicyForTesting(), + protector: protector.Nop{}, + } +} + +func TestStreamMustWriteSnapshotUsesTempThenRename(t *testing.T) { + fileSystem := fs.NewLocalFileSystem() + tmpPath, deferFn := test.Space(require.New(t)) + defer deferFn() + tabDir := filepath.Join(tmpPath, "tab") + fileSystem.MkdirPanicIfExist(tabDir, 0o755) + tst := &tsTable{fileSystem: fileSystem, root: tabDir} + const epoch = uint64(0x10) + partNames := []string{partName(0x1)} + tst.mustWriteSnapshot(epoch, partNames) + snpPath := filepath.Join(tabDir, snapshotName(epoch)) + tmpPathFile := snpPath + ".tmp" + require.False(t, fileSystem.IsExist(tmpPathFile), "temp file must be removed after rename") + require.True(t, fileSystem.IsExist(snpPath), "final snapshot file must exist") + data, readErr := fileSystem.Read(snpPath) + require.NoError(t, readErr) + var decoded []string + require.NoError(t, json.Unmarshal(data, &decoded)) + require.Equal(t, partNames, decoded) +} + +func TestStreamMustWriteSnapshotWithMultipleParts(t *testing.T) { + fileSystem := fs.NewLocalFileSystem() + tmpPath, deferFn := test.Space(require.New(t)) + defer deferFn() + tabDir := filepath.Join(tmpPath, "tab") + fileSystem.MkdirPanicIfExist(tabDir, 0o755) + tst := &tsTable{fileSystem: fileSystem, root: tabDir} + partNames := []string{partName(0x1), partName(0x2), partName(0xff)} + tst.mustWriteSnapshot(1, partNames) + parts, readErr := tst.readSnapshot(1) + require.NoError(t, readErr) + require.Equal(t, []uint64{0x1, 0x2, 0xff}, parts) +} + +func TestStreamReadSnapshotReturnsErrorOnEmptyFile(t *testing.T) { + fileSystem := fs.NewLocalFileSystem() + tmpPath, deferFn := test.Space(require.New(t)) + defer deferFn() + tabDir := filepath.Join(tmpPath, "tab") + fileSystem.MkdirPanicIfExist(tabDir, 0o755) + snpPath := filepath.Join(tabDir, snapshotName(1)) + _, writeErr := fileSystem.Write([]byte{}, snpPath, 0o600) + require.NoError(t, writeErr) + tst := &tsTable{fileSystem: fileSystem, root: tabDir} + _, readErr := tst.readSnapshot(1) + require.Error(t, readErr) + require.Contains(t, readErr.Error(), "cannot parse") +} + +func TestStreamReadSnapshotReturnsErrorOnInvalidJSON(t *testing.T) { + fileSystem := fs.NewLocalFileSystem() + tmpPath, deferFn := test.Space(require.New(t)) + defer deferFn() + tabDir := filepath.Join(tmpPath, "tab") + fileSystem.MkdirPanicIfExist(tabDir, 0o755) + snpPath := filepath.Join(tabDir, snapshotName(1)) + _, writeErr := fileSystem.Write([]byte("{invalid}"), snpPath, 0o600) + require.NoError(t, writeErr) + tst := &tsTable{fileSystem: fileSystem, root: tabDir} + _, readErr := tst.readSnapshot(1) + require.Error(t, readErr) + require.Contains(t, readErr.Error(), "cannot parse") +} + +func TestStreamReadSnapshotReturnsErrorOnInvalidPartName(t *testing.T) { + fileSystem := fs.NewLocalFileSystem() + tmpPath, deferFn := test.Space(require.New(t)) + defer deferFn() + tabDir := filepath.Join(tmpPath, "tab") + fileSystem.MkdirPanicIfExist(tabDir, 0o755) + partNamesData, marshalErr := json.Marshal([]string{"not-hex"}) + require.NoError(t, marshalErr) + snpPath := filepath.Join(tabDir, snapshotName(1)) + _, writeErr := fileSystem.Write(partNamesData, snpPath, 0o600) + require.NoError(t, writeErr) + tst := &tsTable{fileSystem: fileSystem, root: tabDir} + _, readErr := tst.readSnapshot(1) + require.Error(t, readErr) +} + +func TestStreamReadSnapshotSucceedsOnValidFile(t *testing.T) { + fileSystem := fs.NewLocalFileSystem() + tmpPath, deferFn := test.Space(require.New(t)) + defer deferFn() + tabDir := filepath.Join(tmpPath, "tab") + fileSystem.MkdirPanicIfExist(tabDir, 0o755) + tst := &tsTable{fileSystem: fileSystem, root: tabDir} + tst.mustWriteSnapshot(1, []string{partName(0xa), partName(0xb)}) + parts, readErr := tst.readSnapshot(1) + require.NoError(t, readErr) + require.Equal(t, []uint64{0xa, 0xb}, parts) +} + +func TestStreamMustReadSnapshotPanicsOnError(t *testing.T) { + fileSystem := fs.NewLocalFileSystem() + tmpPath, deferFn := test.Space(require.New(t)) + defer deferFn() + tabDir := filepath.Join(tmpPath, "tab") + fileSystem.MkdirPanicIfExist(tabDir, 0o755) + snpPath := filepath.Join(tabDir, snapshotName(1)) + _, writeErr := fileSystem.Write([]byte{}, snpPath, 0o600) + require.NoError(t, writeErr) + tst := &tsTable{fileSystem: fileSystem, root: tabDir} + require.Panics(t, func() { _ = tst.mustReadSnapshot(1) }) +} + +func TestStreamInitTSTableEmptyDirectory(t *testing.T) { + fileSystem := fs.NewLocalFileSystem() + tmpPath, deferFn := test.Space(require.New(t)) + defer deferFn() + tabDir := filepath.Join(tmpPath, "tab") + fileSystem.MkdirPanicIfExist(tabDir, 0o755) + tst, epoch, initErr := initTSTable(fileSystem, tabDir, common.Position{}, logger.GetLogger("test"), + streamSnapshotOption(), nil, false) + require.NoError(t, initErr) + require.NotNil(t, tst) + require.Greater(t, epoch, uint64(0)) + require.Nil(t, tst.snapshot) +} + +func TestStreamInitTSTableEmptyTableWhenAllSnapshotsCorrupt(t *testing.T) { + fileSystem := fs.NewLocalFileSystem() + tmpPath, deferFn := test.Space(require.New(t)) + defer deferFn() + tabDir := filepath.Join(tmpPath, "tab") + fileSystem.MkdirPanicIfExist(tabDir, 0o755) + snpPath := filepath.Join(tabDir, snapshotName(1)) + _, writeErr := fileSystem.Write([]byte{}, snpPath, 0o600) + require.NoError(t, writeErr) + tst, epoch, initErr := initTSTable(fileSystem, tabDir, common.Position{}, logger.GetLogger("test"), + streamSnapshotOption(), nil, false) + require.NoError(t, initErr) + require.NotNil(t, tst) + require.Greater(t, epoch, uint64(0)) + require.Nil(t, tst.snapshot) + require.False(t, fileSystem.IsExist(snpPath), "corrupt snapshot file must be deleted") +} + +func TestStreamTolerantLoaderFallbackToOlderSnapshot(t *testing.T) { + fileSystem := fs.NewLocalFileSystem() + tmpPath, deferFn := test.Space(require.New(t)) + defer deferFn() + tabDir := filepath.Join(tmpPath, "tab") + fileSystem.MkdirPanicIfExist(tabDir, 0o755) + tst, err := newTSTable(fileSystem, tabDir, common.Position{}, logger.GetLogger("test"), + timestamp.TimeRange{}, streamSnapshotOption(), nil) + require.NoError(t, err) + tst.mustAddElements(esTS1) + time.Sleep(100 * time.Millisecond) + require.Eventually(t, func() bool { + dd := fileSystem.ReadDir(tabDir) + for _, d := range dd { + if d.IsDir() && + d.Name() != elementIndexFilename && + d.Name() != inverted.ExternalSegmentTempDirName && + d.Name() != storage.FailedPartsDirName { + return true + } + } + return false + }, flags.EventuallyTimeout, time.Millisecond, "wait for part") + tst.Close() + snapshots := make([]uint64, 0) + for _, e := range fileSystem.ReadDir(tabDir) { + if filepath.Ext(e.Name()) == snapshotSuffix { + parsed, parseErr := parseSnapshot(e.Name()) + if parseErr == nil { + snapshots = append(snapshots, parsed) + } + } + } + require.GreaterOrEqual(t, len(snapshots), 1) + sort.Slice(snapshots, func(i, j int) bool { return snapshots[i] > snapshots[j] }) + validEpoch := snapshots[0] + corruptEpoch := validEpoch + 1 + corruptPath := filepath.Join(tabDir, snapshotName(corruptEpoch)) + _, writeErr := fileSystem.Write([]byte{}, corruptPath, 0o600) + require.NoError(t, writeErr) + tst2, epoch2, initErr := initTSTable(fileSystem, tabDir, common.Position{}, logger.GetLogger("test"), + streamSnapshotOption(), nil, true) + require.NoError(t, initErr) + require.NotNil(t, tst2) + require.Equal(t, validEpoch, epoch2, "should load older valid snapshot when newest is corrupt") + require.NotNil(t, tst2.snapshot) + require.Equal(t, validEpoch, tst2.snapshot.epoch) + if tst2.index != nil { + _ = tst2.index.Close() + } +} diff --git a/banyand/stream/tstable.go b/banyand/stream/tstable.go index c980a2e9c..dfda961d5 100644 --- a/banyand/stream/tstable.go +++ b/banyand/stream/tstable.go @@ -69,8 +69,11 @@ type tsTable struct { shardID common.ShardID } -func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts []uint64) { - parts := tst.mustReadSnapshot(epoch) +func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts []uint64) error { + parts, err := tst.readSnapshot(epoch) + if err != nil { + return err + } snp := snapshot{ epoch: epoch, } @@ -87,9 +90,9 @@ func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts []uint64) { tst.gc.removePart(id) continue } - err := validatePartMetadata(tst.fileSystem, partPath(tst.root, id)) - if err != nil { - tst.l.Info().Err(err).Uint64("id", id).Msg("cannot validate part metadata. skip and delete it") + validateErr := validatePartMetadata(tst.fileSystem, partPath(tst.root, id)) + if validateErr != nil { + tst.l.Info().Err(validateErr).Uint64("id", id).Msg("cannot validate part metadata. skip and delete it") tst.gc.removePart(id) needToPersist = true continue @@ -104,13 +107,14 @@ func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts []uint64) { tst.gc.registerSnapshot(&snp) tst.gc.clean() if len(snp.parts) < 1 { - return + return nil } snp.incRef() tst.snapshot = &snp if needToPersist { tst.persistSnapshot(&snp) } + return nil } func (tst *tsTable) startLoop(cur uint64) { @@ -152,38 +156,56 @@ func (tst *tsTable) mustWriteSnapshot(snapshot uint64, partNames []string) { logger.Panicf("cannot marshal partNames to JSON: %s", err) } snapshotPath := filepath.Join(tst.root, snapshotName(snapshot)) - lf, err := tst.fileSystem.CreateLockFile(snapshotPath, storage.FilePerm) + snapshotTempPath := snapshotPath + ".tmp" + lf, err := tst.fileSystem.CreateLockFile(snapshotTempPath, storage.FilePerm) if err != nil { - logger.Panicf("cannot create lock file %s: %s", snapshotPath, err) + logger.Panicf("cannot create lock file %s: %s", snapshotTempPath, err) } n, err := lf.Write(data) if err != nil { - logger.Panicf("cannot write snapshot %s: %s", snapshotPath, err) + _ = lf.Close() + logger.Panicf("cannot write snapshot %s: %s", snapshotTempPath, err) } if n != len(data) { - logger.Panicf("unexpected number of bytes written to %s; got %d; want %d", snapshotPath, n, len(data)) + _ = lf.Close() + logger.Panicf("unexpected number of bytes written to %s; got %d; want %d", snapshotTempPath, n, len(data)) + } + if closeErr := lf.Close(); closeErr != nil { + logger.Panicf("cannot close snapshot temp file %s: %s", snapshotTempPath, closeErr) } + if renameErr := tst.fileSystem.Rename(snapshotTempPath, snapshotPath); renameErr != nil { + logger.Panicf("cannot rename snapshot %s to %s: %s", snapshotTempPath, snapshotPath, renameErr) + } + tst.fileSystem.SyncPath(tst.root) } -func (tst *tsTable) mustReadSnapshot(snapshot uint64) []uint64 { +func (tst *tsTable) readSnapshot(snapshot uint64) ([]uint64, error) { snapshotPath := filepath.Join(tst.root, snapshotName(snapshot)) data, err := tst.fileSystem.Read(snapshotPath) if err != nil { - logger.Panicf("cannot read %s: %s", snapshotPath, err) + return nil, fmt.Errorf("cannot read %s: %w", snapshotPath, err) } var partNames []string if err := json.Unmarshal(data, &partNames); err != nil { - logger.Panicf("cannot parse %s: %s", snapshotPath, err) + return nil, fmt.Errorf("cannot parse %s: %w", snapshotPath, err) } var result []uint64 for i := range partNames { - e, err := parseEpoch(partNames[i]) - if err != nil { - logger.Panicf("cannot parse %s: %s", partNames[i], err) + e, parseErr := parseEpoch(partNames[i]) + if parseErr != nil { + return nil, fmt.Errorf("cannot parse %s: %w", partNames[i], parseErr) } result = append(result, e) } - return result + return result, nil +} + +func (tst *tsTable) mustReadSnapshot(snapshot uint64) []uint64 { + parts, err := tst.readSnapshot(snapshot) + if err != nil { + logger.Panicf("%s", err) + } + return parts } // initTSTable initializes a tsTable and loads parts/snapshots, but does not start any background loops. @@ -273,9 +295,20 @@ func initTSTable(fileSystem fs.FileSystem, rootPath string, p common.Position, sort.Slice(loadedSnapshots, func(i, j int) bool { return loadedSnapshots[i] > loadedSnapshots[j] }) - epoch := loadedSnapshots[0] - tst.loadSnapshot(epoch, loadedParts) - return &tst, epoch, nil + var failedSnapshots []uint64 + for _, epoch := range loadedSnapshots { + loadErr := tst.loadSnapshot(epoch, loadedParts) + if loadErr == nil { + return &tst, epoch, nil + } + tst.l.Warn().Err(loadErr).Uint64("epoch", epoch).Msg("cannot load snapshot, trying next older") + failedSnapshots = append(failedSnapshots, epoch) + } + for _, id := range failedSnapshots { + tst.l.Info().Str("path", filepath.Join(rootPath, snapshotName(id))).Msg("delete unreadable snapshot file") + fileSystem.MustRMAll(filepath.Join(rootPath, snapshotName(id))) + } + return &tst, uint64(time.Now().UnixNano()), nil } func newTSTable(fileSystem fs.FileSystem, rootPath string, p common.Position, diff --git a/banyand/trace/snapshot_tstable_test.go b/banyand/trace/snapshot_tstable_test.go new file mode 100644 index 000000000..a0caccc82 --- /dev/null +++ b/banyand/trace/snapshot_tstable_test.go @@ -0,0 +1,224 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package trace + +import ( + "encoding/json" + "path/filepath" + "sort" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/banyand/internal/storage" + "github.com/apache/skywalking-banyandb/banyand/protector" + "github.com/apache/skywalking-banyandb/pkg/fs" + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/test" + "github.com/apache/skywalking-banyandb/pkg/test/flags" + "github.com/apache/skywalking-banyandb/pkg/timestamp" +) + +func traceSnapshotOption() option { + return option{ + flushTimeout: 0, + mergePolicy: newDefaultMergePolicyForTesting(), + protector: protector.Nop{}, + } +} + +func TestTraceMustWriteSnapshotUsesTempThenRename(t *testing.T) { + fileSystem := fs.NewLocalFileSystem() + tmpPath, deferFn := test.Space(require.New(t)) + defer deferFn() + tabDir := filepath.Join(tmpPath, "tab") + fileSystem.MkdirPanicIfExist(tabDir, 0o755) + tst := &tsTable{fileSystem: fileSystem, root: tabDir} + const epoch = uint64(0x10) + partNames := []string{partName(0x1)} + tst.mustWriteSnapshot(epoch, partNames) + snpPath := filepath.Join(tabDir, snapshotName(epoch)) + tmpPathFile := snpPath + ".tmp" + require.False(t, fileSystem.IsExist(tmpPathFile), "temp file must be removed after rename") + require.True(t, fileSystem.IsExist(snpPath), "final snapshot file must exist") + data, readErr := fileSystem.Read(snpPath) + require.NoError(t, readErr) + var decoded []string + require.NoError(t, json.Unmarshal(data, &decoded)) + require.Equal(t, partNames, decoded) +} + +func TestTraceMustWriteSnapshotWithMultipleParts(t *testing.T) { + fileSystem := fs.NewLocalFileSystem() + tmpPath, deferFn := test.Space(require.New(t)) + defer deferFn() + tabDir := filepath.Join(tmpPath, "tab") + fileSystem.MkdirPanicIfExist(tabDir, 0o755) + tst := &tsTable{fileSystem: fileSystem, root: tabDir} + partNames := []string{partName(0x1), partName(0x2), partName(0xff)} + tst.mustWriteSnapshot(1, partNames) + parts, readErr := tst.readSnapshot(1) + require.NoError(t, readErr) + require.Equal(t, []uint64{0x1, 0x2, 0xff}, parts) +} + +func TestTraceReadSnapshotReturnsErrorOnEmptyFile(t *testing.T) { + fileSystem := fs.NewLocalFileSystem() + tmpPath, deferFn := test.Space(require.New(t)) + defer deferFn() + tabDir := filepath.Join(tmpPath, "tab") + fileSystem.MkdirPanicIfExist(tabDir, 0o755) + snpPath := filepath.Join(tabDir, snapshotName(1)) + _, writeErr := fileSystem.Write([]byte{}, snpPath, 0o600) + require.NoError(t, writeErr) + tst := &tsTable{fileSystem: fileSystem, root: tabDir} + _, readErr := tst.readSnapshot(1) + require.Error(t, readErr) + require.Contains(t, readErr.Error(), "cannot parse") +} + +func TestTraceReadSnapshotReturnsErrorOnInvalidJSON(t *testing.T) { + fileSystem := fs.NewLocalFileSystem() + tmpPath, deferFn := test.Space(require.New(t)) + defer deferFn() + tabDir := filepath.Join(tmpPath, "tab") + fileSystem.MkdirPanicIfExist(tabDir, 0o755) + snpPath := filepath.Join(tabDir, snapshotName(1)) + _, writeErr := fileSystem.Write([]byte("{invalid}"), snpPath, 0o600) + require.NoError(t, writeErr) + tst := &tsTable{fileSystem: fileSystem, root: tabDir} + _, readErr := tst.readSnapshot(1) + require.Error(t, readErr) + require.Contains(t, readErr.Error(), "cannot parse") +} + +func TestTraceReadSnapshotReturnsErrorOnInvalidPartName(t *testing.T) { + fileSystem := fs.NewLocalFileSystem() + tmpPath, deferFn := test.Space(require.New(t)) + defer deferFn() + tabDir := filepath.Join(tmpPath, "tab") + fileSystem.MkdirPanicIfExist(tabDir, 0o755) + partNamesData, marshalErr := json.Marshal([]string{"not-hex"}) + require.NoError(t, marshalErr) + snpPath := filepath.Join(tabDir, snapshotName(1)) + _, writeErr := fileSystem.Write(partNamesData, snpPath, 0o600) + require.NoError(t, writeErr) + tst := &tsTable{fileSystem: fileSystem, root: tabDir} + _, readErr := tst.readSnapshot(1) + require.Error(t, readErr) +} + +func TestTraceReadSnapshotSucceedsOnValidFile(t *testing.T) { + fileSystem := fs.NewLocalFileSystem() + tmpPath, deferFn := test.Space(require.New(t)) + defer deferFn() + tabDir := filepath.Join(tmpPath, "tab") + fileSystem.MkdirPanicIfExist(tabDir, 0o755) + tst := &tsTable{fileSystem: fileSystem, root: tabDir} + tst.mustWriteSnapshot(1, []string{partName(0xa), partName(0xb)}) + parts, readErr := tst.readSnapshot(1) + require.NoError(t, readErr) + require.Equal(t, []uint64{0xa, 0xb}, parts) +} + +func TestTraceMustReadSnapshotPanicsOnError(t *testing.T) { + fileSystem := fs.NewLocalFileSystem() + tmpPath, deferFn := test.Space(require.New(t)) + defer deferFn() + tabDir := filepath.Join(tmpPath, "tab") + fileSystem.MkdirPanicIfExist(tabDir, 0o755) + snpPath := filepath.Join(tabDir, snapshotName(1)) + _, writeErr := fileSystem.Write([]byte{}, snpPath, 0o600) + require.NoError(t, writeErr) + tst := &tsTable{fileSystem: fileSystem, root: tabDir} + require.Panics(t, func() { _ = tst.mustReadSnapshot(1) }) +} + +func TestTraceInitTSTableEmptyDirectory(t *testing.T) { + fileSystem := fs.NewLocalFileSystem() + tmpPath, deferFn := test.Space(require.New(t)) + defer deferFn() + tabDir := filepath.Join(tmpPath, "tab") + fileSystem.MkdirPanicIfExist(tabDir, 0o755) + tst, epoch := initTSTable(fileSystem, tabDir, common.Position{}, logger.GetLogger("test"), traceSnapshotOption(), nil) + require.NotNil(t, tst) + require.Greater(t, epoch, uint64(0)) + require.Nil(t, tst.snapshot) +} + +func TestTraceInitTSTableEmptyTableWhenAllSnapshotsCorrupt(t *testing.T) { + fileSystem := fs.NewLocalFileSystem() + tmpPath, deferFn := test.Space(require.New(t)) + defer deferFn() + tabDir := filepath.Join(tmpPath, "tab") + fileSystem.MkdirPanicIfExist(tabDir, 0o755) + snpPath := filepath.Join(tabDir, snapshotName(1)) + _, writeErr := fileSystem.Write([]byte{}, snpPath, 0o600) + require.NoError(t, writeErr) + tst, epoch := initTSTable(fileSystem, tabDir, common.Position{}, logger.GetLogger("test"), traceSnapshotOption(), nil) + require.NotNil(t, tst) + require.Greater(t, epoch, uint64(0)) + require.Nil(t, tst.snapshot) + require.False(t, fileSystem.IsExist(snpPath), "corrupt snapshot file must be deleted") +} + +func TestTraceTolerantLoaderFallbackToOlderSnapshot(t *testing.T) { + fileSystem := fs.NewLocalFileSystem() + tmpPath, deferFn := test.Space(require.New(t)) + defer deferFn() + tabDir := filepath.Join(tmpPath, "tab") + fileSystem.MkdirPanicIfExist(tabDir, 0o755) + tst, err := newTSTable(fileSystem, tabDir, common.Position{}, logger.GetLogger("test"), + timestamp.TimeRange{}, traceSnapshotOption(), nil) + require.NoError(t, err) + tst.mustAddTraces(tsTS1, nil) + time.Sleep(100 * time.Millisecond) + require.Eventually(t, func() bool { + dd := fileSystem.ReadDir(tabDir) + for _, d := range dd { + if d.IsDir() && d.Name() != sidxDirName && d.Name() != storage.FailedPartsDirName { + return true + } + } + return false + }, flags.EventuallyTimeout, time.Millisecond, "wait for part") + tst.Close() + snapshots := make([]uint64, 0) + for _, e := range fileSystem.ReadDir(tabDir) { + if filepath.Ext(e.Name()) == snapshotSuffix { + parsed, parseErr := parseSnapshot(e.Name()) + if parseErr == nil { + snapshots = append(snapshots, parsed) + } + } + } + require.GreaterOrEqual(t, len(snapshots), 1) + sort.Slice(snapshots, func(i, j int) bool { return snapshots[i] > snapshots[j] }) + validEpoch := snapshots[0] + corruptEpoch := validEpoch + 1 + corruptPath := filepath.Join(tabDir, snapshotName(corruptEpoch)) + _, writeErr := fileSystem.Write([]byte{}, corruptPath, 0o600) + require.NoError(t, writeErr) + tst2, epoch2 := initTSTable(fileSystem, tabDir, common.Position{}, logger.GetLogger("test"), traceSnapshotOption(), nil) + require.NotNil(t, tst2) + require.Equal(t, validEpoch, epoch2, "should load older valid snapshot when newest is corrupt") + require.NotNil(t, tst2.snapshot) + require.Equal(t, validEpoch, tst2.snapshot.epoch) +} diff --git a/banyand/trace/tstable.go b/banyand/trace/tstable.go index cc47bae77..0fce82c54 100644 --- a/banyand/trace/tstable.go +++ b/banyand/trace/tstable.go @@ -69,8 +69,11 @@ type tsTable struct { shardID common.ShardID } -func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts []uint64) { - parts := tst.mustReadSnapshot(epoch) +func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts []uint64) error { + parts, err := tst.readSnapshot(epoch) + if err != nil { + return err + } snp := snapshot{ epoch: epoch, } @@ -87,9 +90,9 @@ func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts []uint64) { tst.gc.removePart(id) continue } - err := validatePartMetadata(tst.fileSystem, partPath(tst.root, id)) - if err != nil { - tst.l.Info().Err(err).Uint64("id", id).Msg("cannot validate part metadata. skip and delete it") + validateErr := validatePartMetadata(tst.fileSystem, partPath(tst.root, id)) + if validateErr != nil { + tst.l.Info().Err(validateErr).Uint64("id", id).Msg("cannot validate part metadata. skip and delete it") tst.gc.removePart(id) needToPersist = true continue @@ -104,7 +107,7 @@ func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts []uint64) { tst.gc.registerSnapshot(&snp) tst.gc.clean() if len(snp.parts) < 1 { - return + return nil } snp.incRef() tst.snapshot = &snp @@ -112,6 +115,7 @@ func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts []uint64) { if needToPersist { tst.persistSnapshot(&snp) } + return nil } func (tst *tsTable) startLoop(cur uint64) { @@ -153,38 +157,56 @@ func (tst *tsTable) mustWriteSnapshot(snapshot uint64, partNames []string) { logger.Panicf("cannot marshal partNames to JSON: %s", err) } snapshotPath := filepath.Join(tst.root, snapshotName(snapshot)) - lf, err := tst.fileSystem.CreateLockFile(snapshotPath, storage.FilePerm) + snapshotTempPath := snapshotPath + ".tmp" + lf, err := tst.fileSystem.CreateLockFile(snapshotTempPath, storage.FilePerm) if err != nil { - logger.Panicf("cannot create lock file %s: %s", snapshotPath, err) + logger.Panicf("cannot create lock file %s: %s", snapshotTempPath, err) } n, err := lf.Write(data) if err != nil { - logger.Panicf("cannot write snapshot %s: %s", snapshotPath, err) + _ = lf.Close() + logger.Panicf("cannot write snapshot %s: %s", snapshotTempPath, err) } if n != len(data) { - logger.Panicf("unexpected number of bytes written to %s; got %d; want %d", snapshotPath, n, len(data)) + _ = lf.Close() + logger.Panicf("unexpected number of bytes written to %s; got %d; want %d", snapshotTempPath, n, len(data)) } + if closeErr := lf.Close(); closeErr != nil { + logger.Panicf("cannot close snapshot temp file %s: %s", snapshotTempPath, closeErr) + } + if renameErr := tst.fileSystem.Rename(snapshotTempPath, snapshotPath); renameErr != nil { + logger.Panicf("cannot rename snapshot %s to %s: %s", snapshotTempPath, snapshotPath, renameErr) + } + tst.fileSystem.SyncPath(tst.root) } -func (tst *tsTable) mustReadSnapshot(snapshot uint64) []uint64 { +func (tst *tsTable) readSnapshot(snapshot uint64) ([]uint64, error) { snapshotPath := filepath.Join(tst.root, snapshotName(snapshot)) data, err := tst.fileSystem.Read(snapshotPath) if err != nil { - logger.Panicf("cannot read %s: %s", snapshotPath, err) + return nil, fmt.Errorf("cannot read %s: %w", snapshotPath, err) } var partNames []string if err := json.Unmarshal(data, &partNames); err != nil { - logger.Panicf("cannot parse %s: %s", snapshotPath, err) + return nil, fmt.Errorf("cannot parse %s: %w", snapshotPath, err) } var result []uint64 for i := range partNames { - e, err := parseEpoch(partNames[i]) - if err != nil { - logger.Panicf("cannot parse %s: %s", partNames[i], err) + e, parseErr := parseEpoch(partNames[i]) + if parseErr != nil { + return nil, fmt.Errorf("cannot parse %s: %w", partNames[i], parseErr) } result = append(result, e) } - return result + return result, nil +} + +func (tst *tsTable) mustReadSnapshot(snapshot uint64) []uint64 { + parts, err := tst.readSnapshot(snapshot) + if err != nil { + logger.Panicf("%s", err) + } + return parts } // initTSTable initializes a tsTable and loads parts/snapshots, but does not start any background loops. @@ -262,9 +284,20 @@ func initTSTable(fileSystem fs.FileSystem, rootPath string, p common.Position, sort.Slice(loadedSnapshots, func(i, j int) bool { return loadedSnapshots[i] > loadedSnapshots[j] }) - epoch := loadedSnapshots[0] - tst.loadSnapshot(epoch, loadedParts) - return &tst, epoch + var failedSnapshots []uint64 + for _, epoch := range loadedSnapshots { + loadErr := tst.loadSnapshot(epoch, loadedParts) + if loadErr == nil { + return &tst, epoch + } + tst.l.Warn().Err(loadErr).Uint64("epoch", epoch).Msg("cannot load snapshot, trying next older") + failedSnapshots = append(failedSnapshots, epoch) + } + for _, id := range failedSnapshots { + tst.l.Info().Str("path", filepath.Join(rootPath, snapshotName(id))).Msg("delete unreadable snapshot file") + fileSystem.MustRMAll(filepath.Join(rootPath, snapshotName(id))) + } + return &tst, uint64(time.Now().UnixNano()) } func newTSTable(fileSystem fs.FileSystem, rootPath string, p common.Position, diff --git a/pkg/fs/file_system.go b/pkg/fs/file_system.go index 9bd585e89..2254fb29b 100644 --- a/pkg/fs/file_system.go +++ b/pkg/fs/file_system.go @@ -114,6 +114,8 @@ type FileSystem interface { Read(name string) ([]byte, error) // Delete the file. DeleteFile(name string) error + // Rename renames oldPath to newPath atomically. + Rename(oldPath, newPath string) error // Delete the directory. MustRMAll(path string) // SyncPath the directory of file. diff --git a/pkg/fs/local_file_system.go b/pkg/fs/local_file_system.go index f025653f5..aa654050c 100644 --- a/pkg/fs/local_file_system.go +++ b/pkg/fs/local_file_system.go @@ -256,6 +256,29 @@ func (fs *localFileSystem) Read(name string) ([]byte, error) { } } +// Rename renames oldPath to newPath atomically. +func (fs *localFileSystem) Rename(oldPath, newPath string) error { + if err := os.Rename(oldPath, newPath); err != nil { + if os.IsNotExist(err) { + return &FileSystemError{ + Code: IsNotExistError, + Message: fmt.Sprintf("Rename failed, file does not exist, old: %s, new: %s, error: %s", oldPath, newPath, err), + } + } + if os.IsPermission(err) { + return &FileSystemError{ + Code: permissionError, + Message: fmt.Sprintf("Rename failed, permission denied, old: %s, new: %s, error: %s", oldPath, newPath, err), + } + } + return &FileSystemError{ + Code: otherError, + Message: fmt.Sprintf("Rename failed, old: %s, new: %s, error: %s", oldPath, newPath, err), + } + } + return nil +} + // DeleteFile is used to delete the file. func (fs *localFileSystem) DeleteFile(name string) error { err := os.Remove(name)
