This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 133a932ca fix(storage): prevent epoch segment creation from zero 
timestamps (#1059)
133a932ca is described below

commit 133a932ca739110492078bb2d5ea4c9d06504529
Author: Gao Hongtao <[email protected]>
AuthorDate: Fri Apr 10 10:27:42 2026 +0800

    fix(storage): prevent epoch segment creation from zero timestamps (#1059)
    
    * fix(storage): prevent epoch segment creation from zero timestamps
    
    Zero-value MinTimestamp in distributed sync paths caused seg-19700101
    directories to be created with valid metadata, persisting across restarts
    and corrupting retention logic.
    
    Defense-in-depth fix:
    - Reject epoch and pre-epoch timestamps (UnixNano() <= 0) in segmentController.create()
    - Clean up existing epoch segments on TSDB open
    - Validate MinTimestamp > 0 at all sync entry points (trace/measure/stream)
    - Guard Tick() against zero timestamps in rotation
---
 CHANGES.md                                |  1 +
 banyand/internal/storage/rotation.go      |  3 ++
 banyand/internal/storage/rotation_test.go | 55 ++++++++++++++++++---
 banyand/internal/storage/segment.go       | 34 +++++++++----
 banyand/internal/storage/tsdb_test.go     | 81 +++++++++++++++++++++++--------
 banyand/measure/write_data.go             |  6 +++
 banyand/measure/write_index.go            |  5 ++
 banyand/stream/write_data.go              |  9 ++++
 banyand/stream/write_index.go             |  9 ++++
 banyand/trace/write_data.go               |  6 +++
 banyand/trace/write_index.go              |  5 ++
 11 files changed, 176 insertions(+), 38 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 3bbc2e08c..1a0c07073 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -23,6 +23,7 @@ Release Notes.
 - Fix snapshot error when there is no data in a segment.
 - ui: fix query editor refresh/reset behavior and BydbQL keyword highlighting.
 - Disable the rotation task on warm and cold nodes to prevent incorrect 
segment boundaries during lifecycle migration.
+- Prevent epoch-dated segment directories (seg-19700101) from being created by 
zero timestamps in distributed sync paths.
 
 ## 0.10.0
 
diff --git a/banyand/internal/storage/rotation.go 
b/banyand/internal/storage/rotation.go
index fd30f3edd..3923cc99c 100644
--- a/banyand/internal/storage/rotation.go
+++ b/banyand/internal/storage/rotation.go
@@ -32,6 +32,9 @@ var (
 )
 
 func (d *database[T, O]) Tick(ts int64) {
+       if ts <= 0 {
+               return
+       }
        if (ts - timeEventSnapDuration) < d.latestTickTime.Load() {
                return
        }
diff --git a/banyand/internal/storage/rotation_test.go 
b/banyand/internal/storage/rotation_test.go
index b9251a357..9577c6736 100644
--- a/banyand/internal/storage/rotation_test.go
+++ b/banyand/internal/storage/rotation_test.go
@@ -208,25 +208,20 @@ var MockTSTableCreator = func(_ fs.FileSystem, _ string, 
_ common.Position,
        return &MockTSTable{}, nil
 }
 
-type SnapshotMockTSTable struct {
-       timeRange timestamp.TimeRange
-}
+type SnapshotMockTSTable struct{}
 
 func (m *SnapshotMockTSTable) Close() error { return nil }
 
 func (m *SnapshotMockTSTable) Collect(_ Metrics) {}
 
 func (m *SnapshotMockTSTable) TakeFileSnapshot(_ string) (bool, error) {
-       if m.timeRange.Start.Equal(time.Unix(0, 0)) {
-               return false, ErrNoCurrentSnapshot
-       }
        return true, nil
 }
 
 var SnapshotMockTSTableCreator = func(_ fs.FileSystem, _ string, _ 
common.Position,
-       _ *logger.Logger, timeRange timestamp.TimeRange, _, _ any,
+       _ *logger.Logger, _ timestamp.TimeRange, _, _ any,
 ) (*SnapshotMockTSTable, error) {
-       return &SnapshotMockTSTable{timeRange: timeRange}, nil
+       return &SnapshotMockTSTable{}, nil
 }
 
 type MockMetrics struct{}
@@ -307,3 +302,47 @@ func TestRotationDisabled(t *testing.T) {
                assert.Equal(t, "2024-05-07 00:00:00", 
segments[1].End.Format("2006-01-02 15:04:05"))
        })
 }
+
+func TestTickRejectsZeroTimestamp(t *testing.T) {
+       logger.Init(logger.Logging{
+               Env:   "dev",
+               Level: flags.LogLevel,
+       })
+
+       dir, defFn := test.Space(require.New(t))
+       defer defFn()
+
+       opts := TSDBOpts[*MockTSTable, any]{
+               Location:        dir,
+               SegmentInterval: IntervalRule{Unit: DAY, Num: 1},
+               TTL:             IntervalRule{Unit: DAY, Num: 7},
+               ShardNum:        1,
+               TSTableCreator:  MockTSTableCreator,
+       }
+
+       ctx := context.Background()
+       mc := timestamp.NewMockClock()
+       ts, err := time.ParseInLocation("2006-01-02 15:04:05", "2024-05-01 
00:00:00", time.Local)
+       require.NoError(t, err)
+       mc.Set(ts)
+       ctx = timestamp.SetClock(ctx, mc)
+
+       tsdb, err := OpenTSDB(ctx, opts, NewServiceCache(), group)
+       require.NoError(t, err)
+       defer tsdb.Close()
+
+       db := tsdb.(*database[*MockTSTable, any])
+
+       // Tick with a valid timestamp to set latestTickTime
+       validTS := ts.UnixNano()
+       tsdb.Tick(validTS)
+       require.Equal(t, validTS, db.latestTickTime.Load(), "valid tick should 
update latestTickTime")
+
+       // Tick with zero should be rejected and not update latestTickTime
+       tsdb.Tick(0)
+       require.Equal(t, validTS, db.latestTickTime.Load(), "Tick(0) should not 
update latestTickTime")
+
+       // Tick with negative should also be rejected
+       tsdb.Tick(-1)
+       require.Equal(t, validTS, db.latestTickTime.Load(), "Tick(-1) should 
not update latestTickTime")
+}
diff --git a/banyand/internal/storage/segment.go 
b/banyand/internal/storage/segment.go
index 3c88e7d6f..04a333507 100644
--- a/banyand/internal/storage/segment.go
+++ b/banyand/internal/storage/segment.go
@@ -40,8 +40,13 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
-// ErrSegmentClosed is returned when trying to access a closed segment.
-var ErrSegmentClosed = errors.New("segment closed")
+var (
+       // ErrSegmentClosed is returned when trying to access a closed segment.
+       ErrSegmentClosed = errors.New("segment closed")
+       // ErrInvalidSegmentTimestamp is returned when a segment timestamp is 
zero or near-epoch,
+       // indicating a corrupted timestamp (e.g., unset MinTimestamp flowing 
through sync paths).
+       ErrInvalidSegmentTimestamp = errors.New("invalid segment timestamp: 
epoch or near-epoch time is not valid for APM data")
+)
 
 var _ Cache = (*segmentCache)(nil)
 
@@ -477,21 +482,27 @@ func (sc *segmentController[T, O]) parse(value string) 
(time.Time, error) {
 func (sc *segmentController[T, O]) open() error {
        sc.Lock()
        defer sc.Unlock()
-       emptySegments := make([]string, 0)
+       invalidSegments := make([]string, 0)
        err := loadSegments(sc.location, segPathPrefix, sc, 
sc.getOptions().SegmentInterval, func(start, end time.Time) error {
                suffix := sc.format(start)
                segmentPath := path.Join(sc.location, fmt.Sprintf(segTemplate, 
suffix))
+               // Detect epoch/near-epoch segments created by zero 
MinTimestamp in sync paths.
+               // BanyanDB is an APM database -- no legitimate data predates 
the year 2000.
+               if start.UnixNano() <= 0 {
+                       invalidSegments = append(invalidSegments, segmentPath)
+                       return nil
+               }
                metadataPath := path.Join(segmentPath, metadataFilename)
                rawMeta, readErr := sc.lfs.Read(metadataPath)
                if readErr != nil {
                        if errors.Is(readErr, fs.ErrNotExist) {
-                               emptySegments = append(emptySegments, 
segmentPath)
+                               invalidSegments = append(invalidSegments, 
segmentPath)
                                return nil
                        }
                        return readErr
                }
                if len(rawMeta) == 0 {
-                       emptySegments = append(emptySegments, segmentPath)
+                       invalidSegments = append(invalidSegments, segmentPath)
                        return nil
                }
                meta, parseErr := readSegmentMeta(rawMeta)
@@ -509,16 +520,21 @@ func (sc *segmentController[T, O]) open() error {
                _, loadErr := sc.load(start, segmentEnd, sc.location)
                return loadErr
        })
-       if len(emptySegments) > 0 {
-               sc.l.Warn().Strs("segments", emptySegments).Msg("empty segments 
found, removing them.")
-               for i := range emptySegments {
-                       sc.lfs.MustRMAll(emptySegments[i])
+       if len(invalidSegments) > 0 {
+               sc.l.Warn().Strs("segments", invalidSegments).Msg("invalid 
segments found (empty or epoch-dated), removing them")
+               for i := range invalidSegments {
+                       sc.lfs.MustRMAll(invalidSegments[i])
                }
        }
        return err
 }
 
 func (sc *segmentController[T, O]) create(start time.Time) (*segment[T, O], 
error) {
+       // Reject epoch/near-epoch timestamps caused by zero MinTimestamp 
flowing through sync paths.
+       // BanyanDB is an APM database -- no legitimate data predates the year 
2000.
+       if start.UnixNano() <= 0 {
+               return nil, ErrInvalidSegmentTimestamp
+       }
        sc.Lock()
        defer sc.Unlock()
        last := len(sc.lst) - 1
diff --git a/banyand/internal/storage/tsdb_test.go 
b/banyand/internal/storage/tsdb_test.go
index 08b285d7a..cf87cb3f5 100644
--- a/banyand/internal/storage/tsdb_test.go
+++ b/banyand/internal/storage/tsdb_test.go
@@ -27,6 +27,7 @@ import (
        "go.uber.org/mock/gomock"
 
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/meter"
        "github.com/apache/skywalking-banyandb/pkg/test"
@@ -309,12 +310,10 @@ func TestTakeFileSnapshot(t *testing.T) {
                require.NoError(t, tsdb.Close())
        })
 
-       t.Run("Take snapshot skips shard with no current snapshot", func(t 
*testing.T) {
+       t.Run("Epoch timestamp segment creation is rejected", func(t 
*testing.T) {
                dir, defFn := test.Space(require.New(t))
                defer defFn()
 
-               snapshotDir := filepath.Join(dir, "snapshot")
-
                opts := TSDBOpts[*SnapshotMockTSTable, any]{
                        Location:        dir,
                        SegmentInterval: IntervalRule{Unit: DAY, Num: 1},
@@ -335,29 +334,69 @@ func TestTakeFileSnapshot(t *testing.T) {
                require.NoError(t, err)
                defer tsdb.Close()
 
-               normalSeg, err := tsdb.CreateSegmentIfNotExist(ts)
-               require.NoError(t, err)
-               normalSegLocation := normalSeg.(*segment[*SnapshotMockTSTable, 
any]).location
+               normalSeg, segErr := tsdb.CreateSegmentIfNotExist(ts)
+               require.NoError(t, segErr)
                normalSeg.DecRef()
 
-               epochSeg, err := tsdb.CreateSegmentIfNotExist(time.Unix(0, 0))
-               require.NoError(t, err)
-               epochSegLocation := epochSeg.(*segment[*SnapshotMockTSTable, 
any]).location
-               epochSeg.DecRef()
+               _, epochErr := tsdb.CreateSegmentIfNotExist(time.Unix(0, 0))
+               require.ErrorIs(t, epochErr, ErrInvalidSegmentTimestamp,
+                       "creating a segment with epoch timestamp should be 
rejected")
+       })
+}
 
-               created, snapshotErr := tsdb.TakeFileSnapshot(snapshotDir)
-               require.NoError(t, snapshotErr,
-                       "snapshot should not fail due to empty shard in epoch 
segment")
-               require.True(t, created)
+func TestEpochSegmentCleanupOnOpen(t *testing.T) {
+       logger.Init(logger.Logging{
+               Env:   "dev",
+               Level: flags.LogLevel,
+       })
 
-               normalSegDir := filepath.Join(snapshotDir, 
filepath.Base(normalSegLocation))
-               require.DirExists(t, normalSegDir,
-                       "normal segment should be present in snapshot")
+       dir, defFn := test.Space(require.New(t))
+       defer defFn()
 
-               epochSegDir := filepath.Join(snapshotDir, 
filepath.Base(epochSegLocation))
-               require.DirExists(t, epochSegDir,
-                       "epoch segment directory should still be created")
-       })
+       opts := TSDBOpts[*MockTSTable, any]{
+               Location:        dir,
+               SegmentInterval: IntervalRule{Unit: DAY, Num: 1},
+               TTL:             IntervalRule{Unit: DAY, Num: 7},
+               ShardNum:        1,
+               TSTableCreator:  MockTSTableCreator,
+       }
+
+       ctx := context.Background()
+       mc := timestamp.NewMockClock()
+       ts, err := time.ParseInLocation("2006-01-02 15:04:05", "2024-05-01 
00:00:00", time.Local)
+       require.NoError(t, err)
+       mc.Set(ts)
+       ctx = timestamp.SetClock(ctx, mc)
+
+       // Create a TSDB with a valid segment
+       sc := NewServiceCache()
+       tsdb1, err := OpenTSDB(ctx, opts, sc, group)
+       require.NoError(t, err)
+
+       seg, segErr := tsdb1.CreateSegmentIfNotExist(ts)
+       require.NoError(t, segErr)
+       seg.DecRef()
+       require.NoError(t, tsdb1.Close())
+
+       // Manually create a fake epoch segment directory with metadata to 
simulate the bug
+       epochSegDir := filepath.Join(dir, "seg-19700101")
+       lfs := fs.NewLocalFileSystem()
+       lfs.MkdirIfNotExist(epochSegDir, DirPerm)
+       metaPath := filepath.Join(epochSegDir, metadataFilename)
+       lf, lockErr := lfs.CreateLockFile(metaPath, FilePerm)
+       require.NoError(t, lockErr)
+       _, writeErr := 
lf.Write([]byte(`{"version":"v1","end_time":"1970-01-02T00:00:00Z"}`))
+       require.NoError(t, writeErr)
+       require.NoError(t, lf.Close())
+
+       require.DirExists(t, epochSegDir, "epoch segment should exist before 
reopening")
+
+       // Reopen TSDB - it should clean up the epoch segment
+       tsdb2, err := OpenTSDB(ctx, opts, sc, group)
+       require.NoError(t, err)
+       defer tsdb2.Close()
+
+       require.NoDirExists(t, epochSegDir, "epoch segment should be removed on 
open")
 }
 
 func TestTSDBCollect(t *testing.T) {
diff --git a/banyand/measure/write_data.go b/banyand/measure/write_data.go
index ef4fe160a..cc5b855b7 100644
--- a/banyand/measure/write_data.go
+++ b/banyand/measure/write_data.go
@@ -81,6 +81,9 @@ func (s *syncCallback) CheckHealth() *common.Error {
 
 // CreatePartHandler implements queue.ChunkedSyncHandler.
 func (s *syncCallback) CreatePartHandler(ctx *queue.ChunkedSyncPartContext) 
(queue.PartHandler, error) {
+       if ctx.MinTimestamp <= 0 {
+               return nil, fmt.Errorf("invalid MinTimestamp %d in chunk sync 
context for group %s", ctx.MinTimestamp, ctx.Group)
+       }
        tsdb, err := s.schemaRepo.loadTSDB(ctx.Group)
        if err != nil {
                s.l.Error().Err(err).Str("group", ctx.Group).Msg("failed to 
load TSDB for group")
@@ -236,6 +239,9 @@ func (s *syncSeriesCallback) CheckHealth() *common.Error {
 
 // CreatePartHandler implements queue.ChunkedSyncHandler for series index 
synchronization.
 func (s *syncSeriesCallback) CreatePartHandler(ctx 
*queue.ChunkedSyncPartContext) (queue.PartHandler, error) {
+       if ctx.MinTimestamp <= 0 {
+               return nil, fmt.Errorf("invalid MinTimestamp %d in series sync 
context for group %s", ctx.MinTimestamp, ctx.Group)
+       }
        tsdb, err := s.schemaRepo.loadTSDB(ctx.Group)
        if err != nil {
                s.l.Error().Err(err).Str("group", ctx.Group).Msg("failed to 
load TSDB for group")
diff --git a/banyand/measure/write_index.go b/banyand/measure/write_index.go
index b7146074d..7babb7ba9 100644
--- a/banyand/measure/write_index.go
+++ b/banyand/measure/write_index.go
@@ -82,6 +82,11 @@ func (i *indexCallback) Rev(_ context.Context, message 
bus.Message) (resp bus.Me
                return
        }
 
+       if timestamp <= 0 {
+               i.l.Error().Int64("timestamp", timestamp).Str("group", 
group).Msg("invalid timestamp in index insert message, skipping")
+               return
+       }
+
        // Get TSDB by group name using schemaRepo
        tsdb, err := i.schemaRepo.loadTSDB(group)
        if err != nil {
diff --git a/banyand/stream/write_data.go b/banyand/stream/write_data.go
index 93c5dfe08..1eeaadded 100644
--- a/banyand/stream/write_data.go
+++ b/banyand/stream/write_data.go
@@ -80,6 +80,9 @@ func (s *syncCallback) CheckHealth() *common.Error {
 
 // CreatePartHandler implements queue.ChunkedSyncHandler.
 func (s *syncCallback) CreatePartHandler(ctx *queue.ChunkedSyncPartContext) 
(queue.PartHandler, error) {
+       if ctx.MinTimestamp <= 0 {
+               return nil, fmt.Errorf("invalid MinTimestamp %d in chunk sync 
context for group %s", ctx.MinTimestamp, ctx.Group)
+       }
        tsdb, err := s.schemaRepo.loadTSDB(ctx.Group)
        if err != nil {
                s.l.Error().Err(err).Str("group", ctx.Group).Msg("failed to 
load TSDB for group")
@@ -201,6 +204,9 @@ func (s *syncSeriesCallback) CheckHealth() *common.Error {
 
 // CreatePartHandler implements queue.ChunkedSyncHandler for series index 
synchronization.
 func (s *syncSeriesCallback) CreatePartHandler(ctx 
*queue.ChunkedSyncPartContext) (queue.PartHandler, error) {
+       if ctx.MinTimestamp <= 0 {
+               return nil, fmt.Errorf("invalid MinTimestamp %d in series sync 
context for group %s", ctx.MinTimestamp, ctx.Group)
+       }
        tsdb, err := s.schemaRepo.loadTSDB(ctx.Group)
        if err != nil {
                s.l.Error().Err(err).Str("group", ctx.Group).Msg("failed to 
load TSDB for group")
@@ -306,6 +312,9 @@ func (s *syncElementIndexCallback) CheckHealth() 
*common.Error {
 
 // CreatePartHandler implements queue.ChunkedSyncHandler for element index 
synchronization.
 func (s *syncElementIndexCallback) CreatePartHandler(ctx 
*queue.ChunkedSyncPartContext) (queue.PartHandler, error) {
+       if ctx.MinTimestamp <= 0 {
+               return nil, fmt.Errorf("invalid MinTimestamp %d in element 
index sync context for group %s", ctx.MinTimestamp, ctx.Group)
+       }
        tsdb, err := s.schemaRepo.loadTSDB(ctx.Group)
        if err != nil {
                s.l.Error().Err(err).Str("group", ctx.Group).Msg("failed to 
load TSDB for group")
diff --git a/banyand/stream/write_index.go b/banyand/stream/write_index.go
index 0d4e3200a..e39dfd6a6 100644
--- a/banyand/stream/write_index.go
+++ b/banyand/stream/write_index.go
@@ -79,6 +79,11 @@ func (s *seriesIndexCallback) Rev(_ context.Context, message 
bus.Message) (resp
                return
        }
 
+       if timestamp <= 0 {
+               s.l.Error().Int64("timestamp", timestamp).Str("group", 
group).Msg("invalid timestamp in series index message, skipping")
+               return
+       }
+
        // Get TSDB by group name using schemaRepo
        tsdb, err := s.schemaRepo.loadTSDB(group)
        if err != nil {
@@ -170,6 +175,10 @@ func (l *localIndexCallback) Rev(_ context.Context, 
message bus.Message) (resp b
        }
 
        // Use the first document's timestamp to get the segment
+       if documents[0].Timestamp <= 0 {
+               l.l.Error().Int64("timestamp", 
documents[0].Timestamp).Str("group", group).Msg("invalid document timestamp in 
local index message, skipping")
+               return
+       }
        firstDocTime := time.Unix(0, documents[0].Timestamp)
        segment, err := tsdb.CreateSegmentIfNotExist(firstDocTime)
        if err != nil {
diff --git a/banyand/trace/write_data.go b/banyand/trace/write_data.go
index 63c56c2df..c7290c349 100644
--- a/banyand/trace/write_data.go
+++ b/banyand/trace/write_data.go
@@ -171,6 +171,9 @@ func (s *syncSeriesCallback) CheckHealth() *common.Error {
 }
 
 func (s *syncSeriesCallback) CreatePartHandler(ctx 
*queue.ChunkedSyncPartContext) (queue.PartHandler, error) {
+       if ctx.MinTimestamp <= 0 {
+               return nil, fmt.Errorf("invalid MinTimestamp %d in series sync 
context for group %s", ctx.MinTimestamp, ctx.Group)
+       }
        tsdb, err := s.schemaRepo.loadTSDB(ctx.Group)
        if err != nil {
                s.l.Error().Err(err).Str("group", ctx.Group).Msg("failed to 
load TSDB for group")
@@ -242,6 +245,9 @@ func (s *syncChunkCallback) CheckHealth() *common.Error {
 
 // CreatePartHandler implements queue.ChunkedSyncHandler.
 func (s *syncChunkCallback) CreatePartHandler(ctx 
*queue.ChunkedSyncPartContext) (queue.PartHandler, error) {
+       if ctx.MinTimestamp <= 0 {
+               return nil, fmt.Errorf("invalid MinTimestamp %d in chunk sync 
context for group %s", ctx.MinTimestamp, ctx.Group)
+       }
        tsdb, err := s.schemaRepo.loadTSDB(ctx.Group)
        if err != nil {
                s.l.Error().Err(err).Str("group", ctx.Group).Msg("failed to 
load TSDB for group")
diff --git a/banyand/trace/write_index.go b/banyand/trace/write_index.go
index c5235ed23..c032f79b3 100644
--- a/banyand/trace/write_index.go
+++ b/banyand/trace/write_index.go
@@ -79,6 +79,11 @@ func (s *seriesIndexCallback) Rev(_ context.Context, message 
bus.Message) (resp
                return
        }
 
+       if timestamp <= 0 {
+               s.l.Error().Int64("timestamp", timestamp).Str("group", 
group).Msg("invalid timestamp in series index message, skipping")
+               return
+       }
+
        // Get TSDB by group name using schemaRepo
        tsdb, err := s.schemaRepo.loadTSDB(group)
        if err != nil {

Reply via email to