This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 133a932ca fix(storage): prevent epoch segment creation from zero
timestamps (#1059)
133a932ca is described below
commit 133a932ca739110492078bb2d5ea4c9d06504529
Author: Gao Hongtao <[email protected]>
AuthorDate: Fri Apr 10 10:27:42 2026 +0800
fix(storage): prevent epoch segment creation from zero timestamps (#1059)
* fix(storage): prevent epoch segment creation from zero timestamps
Zero-value MinTimestamp in distributed sync paths caused seg-19700101
directories to be created with valid metadata, persisting across restarts
and corrupting retention logic.
Defense-in-depth fix:
- Reject zero or pre-epoch timestamps (UnixNano <= 0) in segmentController.create()
- Clean up existing epoch segments on TSDB open
- Validate MinTimestamp > 0 at all sync entry points (trace/measure/stream)
- Guard Tick() against zero timestamps in rotation
---
CHANGES.md | 1 +
banyand/internal/storage/rotation.go | 3 ++
banyand/internal/storage/rotation_test.go | 55 ++++++++++++++++++---
banyand/internal/storage/segment.go | 34 +++++++++----
banyand/internal/storage/tsdb_test.go | 81 +++++++++++++++++++++++--------
banyand/measure/write_data.go | 6 +++
banyand/measure/write_index.go | 5 ++
banyand/stream/write_data.go | 9 ++++
banyand/stream/write_index.go | 9 ++++
banyand/trace/write_data.go | 6 +++
banyand/trace/write_index.go | 5 ++
11 files changed, 176 insertions(+), 38 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 3bbc2e08c..1a0c07073 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -23,6 +23,7 @@ Release Notes.
- Fix snapshot error when there is no data in a segment.
- ui: fix query editor refresh/reset behavior and BydbQL keyword highlighting.
- Disable the rotation task on warm and cold nodes to prevent incorrect
segment boundaries during lifecycle migration.
+- Prevent epoch-dated segment directories (seg-19700101) from being created by
zero timestamps in distributed sync paths.
## 0.10.0
diff --git a/banyand/internal/storage/rotation.go
b/banyand/internal/storage/rotation.go
index fd30f3edd..3923cc99c 100644
--- a/banyand/internal/storage/rotation.go
+++ b/banyand/internal/storage/rotation.go
@@ -32,6 +32,9 @@ var (
)
func (d *database[T, O]) Tick(ts int64) {
+ if ts <= 0 {
+ return
+ }
if (ts - timeEventSnapDuration) < d.latestTickTime.Load() {
return
}
diff --git a/banyand/internal/storage/rotation_test.go
b/banyand/internal/storage/rotation_test.go
index b9251a357..9577c6736 100644
--- a/banyand/internal/storage/rotation_test.go
+++ b/banyand/internal/storage/rotation_test.go
@@ -208,25 +208,20 @@ var MockTSTableCreator = func(_ fs.FileSystem, _ string,
_ common.Position,
return &MockTSTable{}, nil
}
-type SnapshotMockTSTable struct {
- timeRange timestamp.TimeRange
-}
+type SnapshotMockTSTable struct{}
func (m *SnapshotMockTSTable) Close() error { return nil }
func (m *SnapshotMockTSTable) Collect(_ Metrics) {}
func (m *SnapshotMockTSTable) TakeFileSnapshot(_ string) (bool, error) {
- if m.timeRange.Start.Equal(time.Unix(0, 0)) {
- return false, ErrNoCurrentSnapshot
- }
return true, nil
}
var SnapshotMockTSTableCreator = func(_ fs.FileSystem, _ string, _
common.Position,
- _ *logger.Logger, timeRange timestamp.TimeRange, _, _ any,
+ _ *logger.Logger, _ timestamp.TimeRange, _, _ any,
) (*SnapshotMockTSTable, error) {
- return &SnapshotMockTSTable{timeRange: timeRange}, nil
+ return &SnapshotMockTSTable{}, nil
}
type MockMetrics struct{}
@@ -307,3 +302,47 @@ func TestRotationDisabled(t *testing.T) {
assert.Equal(t, "2024-05-07 00:00:00",
segments[1].End.Format("2006-01-02 15:04:05"))
})
}
+
+func TestTickRejectsZeroTimestamp(t *testing.T) {
+ logger.Init(logger.Logging{
+ Env: "dev",
+ Level: flags.LogLevel,
+ })
+
+ dir, defFn := test.Space(require.New(t))
+ defer defFn()
+
+ opts := TSDBOpts[*MockTSTable, any]{
+ Location: dir,
+ SegmentInterval: IntervalRule{Unit: DAY, Num: 1},
+ TTL: IntervalRule{Unit: DAY, Num: 7},
+ ShardNum: 1,
+ TSTableCreator: MockTSTableCreator,
+ }
+
+ ctx := context.Background()
+ mc := timestamp.NewMockClock()
+ ts, err := time.ParseInLocation("2006-01-02 15:04:05", "2024-05-01
00:00:00", time.Local)
+ require.NoError(t, err)
+ mc.Set(ts)
+ ctx = timestamp.SetClock(ctx, mc)
+
+ tsdb, err := OpenTSDB(ctx, opts, NewServiceCache(), group)
+ require.NoError(t, err)
+ defer tsdb.Close()
+
+ db := tsdb.(*database[*MockTSTable, any])
+
+ // Tick with a valid timestamp to set latestTickTime
+ validTS := ts.UnixNano()
+ tsdb.Tick(validTS)
+ require.Equal(t, validTS, db.latestTickTime.Load(), "valid tick should
update latestTickTime")
+
+ // Tick with zero should be rejected and not update latestTickTime
+ tsdb.Tick(0)
+ require.Equal(t, validTS, db.latestTickTime.Load(), "Tick(0) should not
update latestTickTime")
+
+ // Tick with negative should also be rejected
+ tsdb.Tick(-1)
+ require.Equal(t, validTS, db.latestTickTime.Load(), "Tick(-1) should
not update latestTickTime")
+}
diff --git a/banyand/internal/storage/segment.go
b/banyand/internal/storage/segment.go
index 3c88e7d6f..04a333507 100644
--- a/banyand/internal/storage/segment.go
+++ b/banyand/internal/storage/segment.go
@@ -40,8 +40,13 @@ import (
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
-// ErrSegmentClosed is returned when trying to access a closed segment.
-var ErrSegmentClosed = errors.New("segment closed")
+var (
+ // ErrSegmentClosed is returned when trying to access a closed segment.
+ ErrSegmentClosed = errors.New("segment closed")
+ // ErrInvalidSegmentTimestamp is returned when a segment timestamp is
zero or near-epoch,
+ // indicating a corrupted timestamp (e.g., unset MinTimestamp flowing
through sync paths).
+ ErrInvalidSegmentTimestamp = errors.New("invalid segment timestamp:
epoch or near-epoch time is not valid for APM data")
+)
var _ Cache = (*segmentCache)(nil)
@@ -477,21 +482,27 @@ func (sc *segmentController[T, O]) parse(value string)
(time.Time, error) {
func (sc *segmentController[T, O]) open() error {
sc.Lock()
defer sc.Unlock()
- emptySegments := make([]string, 0)
+ invalidSegments := make([]string, 0)
err := loadSegments(sc.location, segPathPrefix, sc,
sc.getOptions().SegmentInterval, func(start, end time.Time) error {
suffix := sc.format(start)
segmentPath := path.Join(sc.location, fmt.Sprintf(segTemplate,
suffix))
+ // Detect epoch/near-epoch segments created by zero
MinTimestamp in sync paths.
+ // BanyanDB is an APM database -- no legitimate data predates
the year 2000.
+ if start.UnixNano() <= 0 {
+ invalidSegments = append(invalidSegments, segmentPath)
+ return nil
+ }
metadataPath := path.Join(segmentPath, metadataFilename)
rawMeta, readErr := sc.lfs.Read(metadataPath)
if readErr != nil {
if errors.Is(readErr, fs.ErrNotExist) {
- emptySegments = append(emptySegments,
segmentPath)
+ invalidSegments = append(invalidSegments,
segmentPath)
return nil
}
return readErr
}
if len(rawMeta) == 0 {
- emptySegments = append(emptySegments, segmentPath)
+ invalidSegments = append(invalidSegments, segmentPath)
return nil
}
meta, parseErr := readSegmentMeta(rawMeta)
@@ -509,16 +520,21 @@ func (sc *segmentController[T, O]) open() error {
_, loadErr := sc.load(start, segmentEnd, sc.location)
return loadErr
})
- if len(emptySegments) > 0 {
- sc.l.Warn().Strs("segments", emptySegments).Msg("empty segments
found, removing them.")
- for i := range emptySegments {
- sc.lfs.MustRMAll(emptySegments[i])
+ if len(invalidSegments) > 0 {
+ sc.l.Warn().Strs("segments", invalidSegments).Msg("invalid
segments found (empty or epoch-dated), removing them")
+ for i := range invalidSegments {
+ sc.lfs.MustRMAll(invalidSegments[i])
}
}
return err
}
func (sc *segmentController[T, O]) create(start time.Time) (*segment[T, O],
error) {
+ // Reject epoch/near-epoch timestamps caused by zero MinTimestamp
flowing through sync paths.
+ // BanyanDB is an APM database -- no legitimate data predates the year
2000.
+ if start.UnixNano() <= 0 {
+ return nil, ErrInvalidSegmentTimestamp
+ }
sc.Lock()
defer sc.Unlock()
last := len(sc.lst) - 1
diff --git a/banyand/internal/storage/tsdb_test.go
b/banyand/internal/storage/tsdb_test.go
index 08b285d7a..cf87cb3f5 100644
--- a/banyand/internal/storage/tsdb_test.go
+++ b/banyand/internal/storage/tsdb_test.go
@@ -27,6 +27,7 @@ import (
"go.uber.org/mock/gomock"
commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ "github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/meter"
"github.com/apache/skywalking-banyandb/pkg/test"
@@ -309,12 +310,10 @@ func TestTakeFileSnapshot(t *testing.T) {
require.NoError(t, tsdb.Close())
})
- t.Run("Take snapshot skips shard with no current snapshot", func(t
*testing.T) {
+ t.Run("Epoch timestamp segment creation is rejected", func(t
*testing.T) {
dir, defFn := test.Space(require.New(t))
defer defFn()
- snapshotDir := filepath.Join(dir, "snapshot")
-
opts := TSDBOpts[*SnapshotMockTSTable, any]{
Location: dir,
SegmentInterval: IntervalRule{Unit: DAY, Num: 1},
@@ -335,29 +334,69 @@ func TestTakeFileSnapshot(t *testing.T) {
require.NoError(t, err)
defer tsdb.Close()
- normalSeg, err := tsdb.CreateSegmentIfNotExist(ts)
- require.NoError(t, err)
- normalSegLocation := normalSeg.(*segment[*SnapshotMockTSTable,
any]).location
+ normalSeg, segErr := tsdb.CreateSegmentIfNotExist(ts)
+ require.NoError(t, segErr)
normalSeg.DecRef()
- epochSeg, err := tsdb.CreateSegmentIfNotExist(time.Unix(0, 0))
- require.NoError(t, err)
- epochSegLocation := epochSeg.(*segment[*SnapshotMockTSTable,
any]).location
- epochSeg.DecRef()
+ _, epochErr := tsdb.CreateSegmentIfNotExist(time.Unix(0, 0))
+ require.ErrorIs(t, epochErr, ErrInvalidSegmentTimestamp,
+ "creating a segment with epoch timestamp should be
rejected")
+ })
+}
- created, snapshotErr := tsdb.TakeFileSnapshot(snapshotDir)
- require.NoError(t, snapshotErr,
- "snapshot should not fail due to empty shard in epoch
segment")
- require.True(t, created)
+func TestEpochSegmentCleanupOnOpen(t *testing.T) {
+ logger.Init(logger.Logging{
+ Env: "dev",
+ Level: flags.LogLevel,
+ })
- normalSegDir := filepath.Join(snapshotDir,
filepath.Base(normalSegLocation))
- require.DirExists(t, normalSegDir,
- "normal segment should be present in snapshot")
+ dir, defFn := test.Space(require.New(t))
+ defer defFn()
- epochSegDir := filepath.Join(snapshotDir,
filepath.Base(epochSegLocation))
- require.DirExists(t, epochSegDir,
- "epoch segment directory should still be created")
- })
+ opts := TSDBOpts[*MockTSTable, any]{
+ Location: dir,
+ SegmentInterval: IntervalRule{Unit: DAY, Num: 1},
+ TTL: IntervalRule{Unit: DAY, Num: 7},
+ ShardNum: 1,
+ TSTableCreator: MockTSTableCreator,
+ }
+
+ ctx := context.Background()
+ mc := timestamp.NewMockClock()
+ ts, err := time.ParseInLocation("2006-01-02 15:04:05", "2024-05-01
00:00:00", time.Local)
+ require.NoError(t, err)
+ mc.Set(ts)
+ ctx = timestamp.SetClock(ctx, mc)
+
+ // Create a TSDB with a valid segment
+ sc := NewServiceCache()
+ tsdb1, err := OpenTSDB(ctx, opts, sc, group)
+ require.NoError(t, err)
+
+ seg, segErr := tsdb1.CreateSegmentIfNotExist(ts)
+ require.NoError(t, segErr)
+ seg.DecRef()
+ require.NoError(t, tsdb1.Close())
+
+ // Manually create a fake epoch segment directory with metadata to
simulate the bug
+ epochSegDir := filepath.Join(dir, "seg-19700101")
+ lfs := fs.NewLocalFileSystem()
+ lfs.MkdirIfNotExist(epochSegDir, DirPerm)
+ metaPath := filepath.Join(epochSegDir, metadataFilename)
+ lf, lockErr := lfs.CreateLockFile(metaPath, FilePerm)
+ require.NoError(t, lockErr)
+ _, writeErr :=
lf.Write([]byte(`{"version":"v1","end_time":"1970-01-02T00:00:00Z"}`))
+ require.NoError(t, writeErr)
+ require.NoError(t, lf.Close())
+
+ require.DirExists(t, epochSegDir, "epoch segment should exist before
reopening")
+
+ // Reopen TSDB - it should clean up the epoch segment
+ tsdb2, err := OpenTSDB(ctx, opts, sc, group)
+ require.NoError(t, err)
+ defer tsdb2.Close()
+
+ require.NoDirExists(t, epochSegDir, "epoch segment should be removed on
open")
}
func TestTSDBCollect(t *testing.T) {
diff --git a/banyand/measure/write_data.go b/banyand/measure/write_data.go
index ef4fe160a..cc5b855b7 100644
--- a/banyand/measure/write_data.go
+++ b/banyand/measure/write_data.go
@@ -81,6 +81,9 @@ func (s *syncCallback) CheckHealth() *common.Error {
// CreatePartHandler implements queue.ChunkedSyncHandler.
func (s *syncCallback) CreatePartHandler(ctx *queue.ChunkedSyncPartContext)
(queue.PartHandler, error) {
+ if ctx.MinTimestamp <= 0 {
+ return nil, fmt.Errorf("invalid MinTimestamp %d in chunk sync
context for group %s", ctx.MinTimestamp, ctx.Group)
+ }
tsdb, err := s.schemaRepo.loadTSDB(ctx.Group)
if err != nil {
s.l.Error().Err(err).Str("group", ctx.Group).Msg("failed to
load TSDB for group")
@@ -236,6 +239,9 @@ func (s *syncSeriesCallback) CheckHealth() *common.Error {
// CreatePartHandler implements queue.ChunkedSyncHandler for series index
synchronization.
func (s *syncSeriesCallback) CreatePartHandler(ctx
*queue.ChunkedSyncPartContext) (queue.PartHandler, error) {
+ if ctx.MinTimestamp <= 0 {
+ return nil, fmt.Errorf("invalid MinTimestamp %d in series sync
context for group %s", ctx.MinTimestamp, ctx.Group)
+ }
tsdb, err := s.schemaRepo.loadTSDB(ctx.Group)
if err != nil {
s.l.Error().Err(err).Str("group", ctx.Group).Msg("failed to
load TSDB for group")
diff --git a/banyand/measure/write_index.go b/banyand/measure/write_index.go
index b7146074d..7babb7ba9 100644
--- a/banyand/measure/write_index.go
+++ b/banyand/measure/write_index.go
@@ -82,6 +82,11 @@ func (i *indexCallback) Rev(_ context.Context, message
bus.Message) (resp bus.Me
return
}
+ if timestamp <= 0 {
+ i.l.Error().Int64("timestamp", timestamp).Str("group",
group).Msg("invalid timestamp in index insert message, skipping")
+ return
+ }
+
// Get TSDB by group name using schemaRepo
tsdb, err := i.schemaRepo.loadTSDB(group)
if err != nil {
diff --git a/banyand/stream/write_data.go b/banyand/stream/write_data.go
index 93c5dfe08..1eeaadded 100644
--- a/banyand/stream/write_data.go
+++ b/banyand/stream/write_data.go
@@ -80,6 +80,9 @@ func (s *syncCallback) CheckHealth() *common.Error {
// CreatePartHandler implements queue.ChunkedSyncHandler.
func (s *syncCallback) CreatePartHandler(ctx *queue.ChunkedSyncPartContext)
(queue.PartHandler, error) {
+ if ctx.MinTimestamp <= 0 {
+ return nil, fmt.Errorf("invalid MinTimestamp %d in chunk sync
context for group %s", ctx.MinTimestamp, ctx.Group)
+ }
tsdb, err := s.schemaRepo.loadTSDB(ctx.Group)
if err != nil {
s.l.Error().Err(err).Str("group", ctx.Group).Msg("failed to
load TSDB for group")
@@ -201,6 +204,9 @@ func (s *syncSeriesCallback) CheckHealth() *common.Error {
// CreatePartHandler implements queue.ChunkedSyncHandler for series index
synchronization.
func (s *syncSeriesCallback) CreatePartHandler(ctx
*queue.ChunkedSyncPartContext) (queue.PartHandler, error) {
+ if ctx.MinTimestamp <= 0 {
+ return nil, fmt.Errorf("invalid MinTimestamp %d in series sync
context for group %s", ctx.MinTimestamp, ctx.Group)
+ }
tsdb, err := s.schemaRepo.loadTSDB(ctx.Group)
if err != nil {
s.l.Error().Err(err).Str("group", ctx.Group).Msg("failed to
load TSDB for group")
@@ -306,6 +312,9 @@ func (s *syncElementIndexCallback) CheckHealth()
*common.Error {
// CreatePartHandler implements queue.ChunkedSyncHandler for element index
synchronization.
func (s *syncElementIndexCallback) CreatePartHandler(ctx
*queue.ChunkedSyncPartContext) (queue.PartHandler, error) {
+ if ctx.MinTimestamp <= 0 {
+ return nil, fmt.Errorf("invalid MinTimestamp %d in element
index sync context for group %s", ctx.MinTimestamp, ctx.Group)
+ }
tsdb, err := s.schemaRepo.loadTSDB(ctx.Group)
if err != nil {
s.l.Error().Err(err).Str("group", ctx.Group).Msg("failed to
load TSDB for group")
diff --git a/banyand/stream/write_index.go b/banyand/stream/write_index.go
index 0d4e3200a..e39dfd6a6 100644
--- a/banyand/stream/write_index.go
+++ b/banyand/stream/write_index.go
@@ -79,6 +79,11 @@ func (s *seriesIndexCallback) Rev(_ context.Context, message
bus.Message) (resp
return
}
+ if timestamp <= 0 {
+ s.l.Error().Int64("timestamp", timestamp).Str("group",
group).Msg("invalid timestamp in series index message, skipping")
+ return
+ }
+
// Get TSDB by group name using schemaRepo
tsdb, err := s.schemaRepo.loadTSDB(group)
if err != nil {
@@ -170,6 +175,10 @@ func (l *localIndexCallback) Rev(_ context.Context,
message bus.Message) (resp b
}
// Use the first document's timestamp to get the segment
+ if documents[0].Timestamp <= 0 {
+ l.l.Error().Int64("timestamp",
documents[0].Timestamp).Str("group", group).Msg("invalid document timestamp in
local index message, skipping")
+ return
+ }
firstDocTime := time.Unix(0, documents[0].Timestamp)
segment, err := tsdb.CreateSegmentIfNotExist(firstDocTime)
if err != nil {
diff --git a/banyand/trace/write_data.go b/banyand/trace/write_data.go
index 63c56c2df..c7290c349 100644
--- a/banyand/trace/write_data.go
+++ b/banyand/trace/write_data.go
@@ -171,6 +171,9 @@ func (s *syncSeriesCallback) CheckHealth() *common.Error {
}
func (s *syncSeriesCallback) CreatePartHandler(ctx
*queue.ChunkedSyncPartContext) (queue.PartHandler, error) {
+ if ctx.MinTimestamp <= 0 {
+ return nil, fmt.Errorf("invalid MinTimestamp %d in series sync
context for group %s", ctx.MinTimestamp, ctx.Group)
+ }
tsdb, err := s.schemaRepo.loadTSDB(ctx.Group)
if err != nil {
s.l.Error().Err(err).Str("group", ctx.Group).Msg("failed to
load TSDB for group")
@@ -242,6 +245,9 @@ func (s *syncChunkCallback) CheckHealth() *common.Error {
// CreatePartHandler implements queue.ChunkedSyncHandler.
func (s *syncChunkCallback) CreatePartHandler(ctx
*queue.ChunkedSyncPartContext) (queue.PartHandler, error) {
+ if ctx.MinTimestamp <= 0 {
+ return nil, fmt.Errorf("invalid MinTimestamp %d in chunk sync
context for group %s", ctx.MinTimestamp, ctx.Group)
+ }
tsdb, err := s.schemaRepo.loadTSDB(ctx.Group)
if err != nil {
s.l.Error().Err(err).Str("group", ctx.Group).Msg("failed to
load TSDB for group")
diff --git a/banyand/trace/write_index.go b/banyand/trace/write_index.go
index c5235ed23..c032f79b3 100644
--- a/banyand/trace/write_index.go
+++ b/banyand/trace/write_index.go
@@ -79,6 +79,11 @@ func (s *seriesIndexCallback) Rev(_ context.Context, message
bus.Message) (resp
return
}
+ if timestamp <= 0 {
+ s.l.Error().Int64("timestamp", timestamp).Str("group",
group).Msg("invalid timestamp in series index message, skipping")
+ return
+ }
+
// Get TSDB by group name using schemaRepo
tsdb, err := s.schemaRepo.loadTSDB(group)
if err != nil {