This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch bug-empty-segment in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit d7a49318cc566c55e3f59cac462398d5c1467f2c Author: Hongtao Gao <[email protected]> AuthorDate: Thu Apr 9 07:46:51 2026 +0000 fix(storage): prevent epoch segment creation from zero timestamps Zero-value MinTimestamp in distributed sync paths caused seg-19700101 directories to be created with valid metadata, persisting across restarts and corrupting retention logic. Defense-in-depth fix: - Reject timestamps before year 2000 in segmentController.create() - Clean up existing epoch segments on TSDB open - Validate MinTimestamp > 0 at all sync entry points (trace/measure/stream) - Guard Tick() against zero timestamps in rotation Closes: apache/skywalking#13792 --- banyand/internal/storage/rotation.go | 3 ++ banyand/internal/storage/rotation_test.go | 11 ++--- banyand/internal/storage/segment.go | 29 +++++++---- banyand/internal/storage/tsdb_test.go | 80 +++++++++++++++++++++++-------- banyand/measure/write_data.go | 6 +++ banyand/measure/write_index.go | 5 ++ banyand/stream/write_data.go | 9 ++++ banyand/stream/write_index.go | 9 ++++ banyand/trace/write_data.go | 6 +++ banyand/trace/write_index.go | 5 ++ 10 files changed, 125 insertions(+), 38 deletions(-) diff --git a/banyand/internal/storage/rotation.go b/banyand/internal/storage/rotation.go index fd30f3edd..3923cc99c 100644 --- a/banyand/internal/storage/rotation.go +++ b/banyand/internal/storage/rotation.go @@ -32,6 +32,9 @@ var ( ) func (d *database[T, O]) Tick(ts int64) { + if ts <= 0 { + return + } if (ts - timeEventSnapDuration) < d.latestTickTime.Load() { return } diff --git a/banyand/internal/storage/rotation_test.go b/banyand/internal/storage/rotation_test.go index b9251a357..4acb789e8 100644 --- a/banyand/internal/storage/rotation_test.go +++ b/banyand/internal/storage/rotation_test.go @@ -208,25 +208,20 @@ var MockTSTableCreator = func(_ fs.FileSystem, _ string, _ common.Position, return &MockTSTable{}, nil } -type SnapshotMockTSTable struct { - timeRange timestamp.TimeRange -} +type SnapshotMockTSTable struct{} func (m *SnapshotMockTSTable) Close() error { return nil } func (m *SnapshotMockTSTable) Collect(_ Metrics) {} func (m *SnapshotMockTSTable) TakeFileSnapshot(_ string) (bool, error) { - if m.timeRange.Start.Equal(time.Unix(0, 0)) { - return false, ErrNoCurrentSnapshot - } return true, nil } var SnapshotMockTSTableCreator = func(_ fs.FileSystem, _ string, _ common.Position, - _ *logger.Logger, timeRange timestamp.TimeRange, _, _ any, + _ *logger.Logger, _ timestamp.TimeRange, _, _ any, ) (*SnapshotMockTSTable, error) { - return &SnapshotMockTSTable{timeRange: timeRange}, nil + return &SnapshotMockTSTable{}, nil } type MockMetrics struct{} diff --git a/banyand/internal/storage/segment.go b/banyand/internal/storage/segment.go index 3c88e7d6f..8818d8dd0 100644 --- a/banyand/internal/storage/segment.go +++ b/banyand/internal/storage/segment.go @@ -40,8 +40,12 @@ import ( "github.com/apache/skywalking-banyandb/pkg/timestamp" ) -// ErrSegmentClosed is returned when trying to access a closed segment. -var ErrSegmentClosed = errors.New("segment closed") +var ( + // ErrSegmentClosed is returned when trying to access a closed segment. + ErrSegmentClosed = errors.New("segment closed") + // ErrInvalidSegmentTimestamp is returned when a segment timestamp is clearly invalid (e.g., epoch). + ErrInvalidSegmentTimestamp = errors.New("invalid segment timestamp: before year 2000") +) var _ Cache = (*segmentCache)(nil) @@ -477,21 +481,25 @@ func (sc *segmentController[T, O]) parse(value string) (time.Time, error) { func (sc *segmentController[T, O]) open() error { sc.Lock() defer sc.Unlock() - emptySegments := make([]string, 0) + invalidSegments := make([]string, 0) err := loadSegments(sc.location, segPathPrefix, sc, sc.getOptions().SegmentInterval, func(start, end time.Time) error { suffix := sc.format(start) segmentPath := path.Join(sc.location, fmt.Sprintf(segTemplate, suffix)) + if start.Year() < 2000 { + invalidSegments = append(invalidSegments, segmentPath) + return nil + } metadataPath := path.Join(segmentPath, metadataFilename) rawMeta, readErr := sc.lfs.Read(metadataPath) if readErr != nil { if errors.Is(readErr, fs.ErrNotExist) { - emptySegments = append(emptySegments, segmentPath) + invalidSegments = append(invalidSegments, segmentPath) return nil } return readErr } if len(rawMeta) == 0 { - emptySegments = append(emptySegments, segmentPath) + invalidSegments = append(invalidSegments, segmentPath) return nil } meta, parseErr := readSegmentMeta(rawMeta) @@ -509,16 +517,19 @@ func (sc *segmentController[T, O]) open() error { _, loadErr := sc.load(start, segmentEnd, sc.location) return loadErr }) - if len(emptySegments) > 0 { - sc.l.Warn().Strs("segments", emptySegments).Msg("empty segments found, removing them.") - for i := range emptySegments { - sc.lfs.MustRMAll(emptySegments[i]) + if len(invalidSegments) > 0 { + sc.l.Warn().Strs("segments", invalidSegments).Msg("invalid segments found (empty or epoch-dated), removing them") + for i := range invalidSegments { + sc.lfs.MustRMAll(invalidSegments[i]) } } return err } func (sc *segmentController[T, O]) create(start time.Time) (*segment[T, O], error) { + if start.Year() < 2000 { + return nil, ErrInvalidSegmentTimestamp + } sc.Lock() defer sc.Unlock() last := len(sc.lst) - 1 diff --git a/banyand/internal/storage/tsdb_test.go b/banyand/internal/storage/tsdb_test.go index 08b285d7a..ac8089ba7 100644 --- a/banyand/internal/storage/tsdb_test.go +++ b/banyand/internal/storage/tsdb_test.go @@ -27,6 +27,7 @@ import ( "go.uber.org/mock/gomock" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/meter" "github.com/apache/skywalking-banyandb/pkg/test" @@ -309,12 +310,10 @@ func TestTakeFileSnapshot(t *testing.T) { require.NoError(t, tsdb.Close()) }) - t.Run("Take snapshot skips shard with no current snapshot", func(t *testing.T) { + t.Run("Epoch timestamp segment creation is rejected", func(t *testing.T) { dir, defFn := test.Space(require.New(t)) defer defFn() - snapshotDir := filepath.Join(dir, "snapshot") - opts := TSDBOpts[*SnapshotMockTSTable, any]{ Location: dir, SegmentInterval: IntervalRule{Unit: DAY, Num: 1}, @@ -335,29 +334,68 @@ func TestTakeFileSnapshot(t *testing.T) { require.NoError(t, err) defer tsdb.Close() - normalSeg, err := tsdb.CreateSegmentIfNotExist(ts) - require.NoError(t, err) - normalSegLocation := normalSeg.(*segment[*SnapshotMockTSTable, any]).location + normalSeg, segErr := tsdb.CreateSegmentIfNotExist(ts) + require.NoError(t, segErr) normalSeg.DecRef() - epochSeg, err := tsdb.CreateSegmentIfNotExist(time.Unix(0, 0)) - require.NoError(t, err) - epochSegLocation := epochSeg.(*segment[*SnapshotMockTSTable, any]).location - epochSeg.DecRef() + _, epochErr := tsdb.CreateSegmentIfNotExist(time.Unix(0, 0)) + require.ErrorIs(t, epochErr, ErrInvalidSegmentTimestamp, + "creating a segment with epoch timestamp should be rejected") + }) +} - created, snapshotErr := tsdb.TakeFileSnapshot(snapshotDir) - require.NoError(t, snapshotErr, - "snapshot should not fail due to empty shard in epoch segment") - require.True(t, created) +func TestEpochSegmentCleanupOnOpen(t *testing.T) { + logger.Init(logger.Logging{ + Env: "dev", + Level: flags.LogLevel, + }) - normalSegDir := filepath.Join(snapshotDir, filepath.Base(normalSegLocation)) - require.DirExists(t, normalSegDir, - "normal segment should be present in snapshot") + dir, defFn := test.Space(require.New(t)) + defer defFn() - epochSegDir := filepath.Join(snapshotDir, filepath.Base(epochSegLocation)) - require.DirExists(t, epochSegDir, - "epoch segment directory should still be created") - }) + opts := TSDBOpts[*MockTSTable, any]{ + Location: dir, + SegmentInterval: IntervalRule{Unit: DAY, Num: 1}, + TTL: IntervalRule{Unit: DAY, Num: 7}, + ShardNum: 1, + TSTableCreator: MockTSTableCreator, + } + + ctx := context.Background() + mc := timestamp.NewMockClock() + ts, err := time.ParseInLocation("2006-01-02 15:04:05", "2024-05-01 00:00:00", time.Local) + require.NoError(t, err) + mc.Set(ts) + ctx = timestamp.SetClock(ctx, mc) + + // Create a TSDB with a valid segment + sc := NewServiceCache() + tsdb1, err := OpenTSDB(ctx, opts, sc, group) + require.NoError(t, err) + + seg, segErr := tsdb1.CreateSegmentIfNotExist(ts) + require.NoError(t, segErr) + seg.DecRef() + require.NoError(t, tsdb1.Close()) + + // Manually create a fake epoch segment directory with metadata to simulate the bug + epochSegDir := filepath.Join(dir, "seg-19700101") + lfs := fs.NewLocalFileSystem() + lfs.MkdirIfNotExist(epochSegDir, DirPerm) + metaPath := filepath.Join(epochSegDir, metadataFilename) + lf, lockErr := lfs.CreateLockFile(metaPath, FilePerm) + require.NoError(t, lockErr) + _, writeErr := lf.Write([]byte(`{"version":"v1","end_time":"1970-01-02T00:00:00Z"}`)) + require.NoError(t, writeErr) + + require.DirExists(t, epochSegDir, "epoch segment should exist before reopening") + + // Reopen TSDB - it should clean up the epoch segment + tsdb2, err := OpenTSDB(ctx, opts, sc, group) + require.NoError(t, err) + defer tsdb2.Close() + + require.NoDirExists(t, epochSegDir, "epoch segment should be removed on open") } func TestTSDBCollect(t *testing.T) { diff --git a/banyand/measure/write_data.go b/banyand/measure/write_data.go index ef4fe160a..cc5b855b7 100644 --- a/banyand/measure/write_data.go +++ b/banyand/measure/write_data.go @@ -81,6 +81,9 @@ func (s *syncCallback) CheckHealth() *common.Error { // CreatePartHandler implements queue.ChunkedSyncHandler. func (s *syncCallback) CreatePartHandler(ctx *queue.ChunkedSyncPartContext) (queue.PartHandler, error) { + if ctx.MinTimestamp <= 0 { + return nil, fmt.Errorf("invalid MinTimestamp %d in chunk sync context for group %s", ctx.MinTimestamp, ctx.Group) + } tsdb, err := s.schemaRepo.loadTSDB(ctx.Group) if err != nil { s.l.Error().Err(err).Str("group", ctx.Group).Msg("failed to load TSDB for group") @@ -236,6 +239,9 @@ func (s *syncSeriesCallback) CheckHealth() *common.Error { // CreatePartHandler implements queue.ChunkedSyncHandler for series index synchronization. func (s *syncSeriesCallback) CreatePartHandler(ctx *queue.ChunkedSyncPartContext) (queue.PartHandler, error) { + if ctx.MinTimestamp <= 0 { + return nil, fmt.Errorf("invalid MinTimestamp %d in series sync context for group %s", ctx.MinTimestamp, ctx.Group) + } tsdb, err := s.schemaRepo.loadTSDB(ctx.Group) if err != nil { s.l.Error().Err(err).Str("group", ctx.Group).Msg("failed to load TSDB for group") diff --git a/banyand/measure/write_index.go b/banyand/measure/write_index.go index b7146074d..7babb7ba9 100644 --- a/banyand/measure/write_index.go +++ b/banyand/measure/write_index.go @@ -82,6 +82,11 @@ func (i *indexCallback) Rev(_ context.Context, message bus.Message) (resp bus.Me return } + if timestamp <= 0 { + i.l.Error().Int64("timestamp", timestamp).Str("group", group).Msg("invalid timestamp in index insert message, skipping") + return + } + // Get TSDB by group name using schemaRepo tsdb, err := i.schemaRepo.loadTSDB(group) if err != nil { diff --git a/banyand/stream/write_data.go b/banyand/stream/write_data.go index 93c5dfe08..1eeaadded 100644 --- a/banyand/stream/write_data.go +++ b/banyand/stream/write_data.go @@ -80,6 +80,9 @@ func (s *syncCallback) CheckHealth() *common.Error { // CreatePartHandler implements queue.ChunkedSyncHandler. func (s *syncCallback) CreatePartHandler(ctx *queue.ChunkedSyncPartContext) (queue.PartHandler, error) { + if ctx.MinTimestamp <= 0 { + return nil, fmt.Errorf("invalid MinTimestamp %d in chunk sync context for group %s", ctx.MinTimestamp, ctx.Group) + } tsdb, err := s.schemaRepo.loadTSDB(ctx.Group) if err != nil { s.l.Error().Err(err).Str("group", ctx.Group).Msg("failed to load TSDB for group") @@ -201,6 +204,9 @@ func (s *syncSeriesCallback) CheckHealth() *common.Error { // CreatePartHandler implements queue.ChunkedSyncHandler for series index synchronization. func (s *syncSeriesCallback) CreatePartHandler(ctx *queue.ChunkedSyncPartContext) (queue.PartHandler, error) { + if ctx.MinTimestamp <= 0 { + return nil, fmt.Errorf("invalid MinTimestamp %d in series sync context for group %s", ctx.MinTimestamp, ctx.Group) + } tsdb, err := s.schemaRepo.loadTSDB(ctx.Group) if err != nil { s.l.Error().Err(err).Str("group", ctx.Group).Msg("failed to load TSDB for group") @@ -306,6 +312,9 @@ func (s *syncElementIndexCallback) CheckHealth() *common.Error { // CreatePartHandler implements queue.ChunkedSyncHandler for element index synchronization. func (s *syncElementIndexCallback) CreatePartHandler(ctx *queue.ChunkedSyncPartContext) (queue.PartHandler, error) { + if ctx.MinTimestamp <= 0 { + return nil, fmt.Errorf("invalid MinTimestamp %d in element index sync context for group %s", ctx.MinTimestamp, ctx.Group) + } tsdb, err := s.schemaRepo.loadTSDB(ctx.Group) if err != nil { s.l.Error().Err(err).Str("group", ctx.Group).Msg("failed to load TSDB for group") diff --git a/banyand/stream/write_index.go b/banyand/stream/write_index.go index 0d4e3200a..e39dfd6a6 100644 --- a/banyand/stream/write_index.go +++ b/banyand/stream/write_index.go @@ -79,6 +79,11 @@ func (s *seriesIndexCallback) Rev(_ context.Context, message bus.Message) (resp return } + if timestamp <= 0 { + s.l.Error().Int64("timestamp", timestamp).Str("group", group).Msg("invalid timestamp in series index message, skipping") + return + } + // Get TSDB by group name using schemaRepo tsdb, err := s.schemaRepo.loadTSDB(group) if err != nil { @@ -170,6 +175,10 @@ func (l *localIndexCallback) Rev(_ context.Context, message bus.Message) (resp b } // Use the first document's timestamp to get the segment + if documents[0].Timestamp <= 0 { + l.l.Error().Int64("timestamp", documents[0].Timestamp).Str("group", group).Msg("invalid document timestamp in local index message, skipping") + return + } firstDocTime := time.Unix(0, documents[0].Timestamp) segment, err := tsdb.CreateSegmentIfNotExist(firstDocTime) if err != nil { diff --git a/banyand/trace/write_data.go b/banyand/trace/write_data.go index 63c56c2df..c7290c349 100644 --- a/banyand/trace/write_data.go +++ b/banyand/trace/write_data.go @@ -171,6 +171,9 @@ func (s *syncSeriesCallback) CheckHealth() *common.Error { } func (s *syncSeriesCallback) CreatePartHandler(ctx *queue.ChunkedSyncPartContext) (queue.PartHandler, error) { + if ctx.MinTimestamp <= 0 { + return nil, fmt.Errorf("invalid MinTimestamp %d in series sync context for group %s", ctx.MinTimestamp, ctx.Group) + } tsdb, err := s.schemaRepo.loadTSDB(ctx.Group) if err != nil { s.l.Error().Err(err).Str("group", ctx.Group).Msg("failed to load TSDB for group") @@ -242,6 +245,9 @@ func (s *syncChunkCallback) CheckHealth() *common.Error { // CreatePartHandler implements queue.ChunkedSyncHandler. func (s *syncChunkCallback) CreatePartHandler(ctx *queue.ChunkedSyncPartContext) (queue.PartHandler, error) { + if ctx.MinTimestamp <= 0 { + return nil, fmt.Errorf("invalid MinTimestamp %d in chunk sync context for group %s", ctx.MinTimestamp, ctx.Group) + } tsdb, err := s.schemaRepo.loadTSDB(ctx.Group) if err != nil { s.l.Error().Err(err).Str("group", ctx.Group).Msg("failed to load TSDB for group") diff --git a/banyand/trace/write_index.go b/banyand/trace/write_index.go index c5235ed23..c032f79b3 100644 --- a/banyand/trace/write_index.go +++ b/banyand/trace/write_index.go @@ -79,6 +79,11 @@ func (s *seriesIndexCallback) Rev(_ context.Context, message bus.Message) (resp return } + if timestamp <= 0 { + s.l.Error().Int64("timestamp", timestamp).Str("group", group).Msg("invalid timestamp in series index message, skipping") + return + } + // Get TSDB by group name using schemaRepo tsdb, err := s.schemaRepo.loadTSDB(group) if err != nil {
