This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch feat/segmeta in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit a9f0e4167691636b6dddb74ede878633a6587cbb Author: Hongtao Gao <[email protected]> AuthorDate: Tue Apr 7 01:02:11 2026 +0000 feat(storage): read persisted endTime from segment metadata on load --- banyand/internal/storage/segment.go | 28 ++++-- banyand/internal/storage/segment_test.go | 151 +++++++++++++++++++++++++++++++ 2 files changed, 169 insertions(+), 10 deletions(-) diff --git a/banyand/internal/storage/segment.go b/banyand/internal/storage/segment.go index 4de7ab50b..3c88e7d6f 100644 --- a/banyand/internal/storage/segment.go +++ b/banyand/internal/storage/segment.go @@ -34,7 +34,6 @@ import ( "github.com/apache/skywalking-banyandb/api/common" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" - "github.com/apache/skywalking-banyandb/pkg/convert" banyanfs "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/index/inverted" "github.com/apache/skywalking-banyandb/pkg/logger" @@ -483,23 +482,32 @@ func (sc *segmentController[T, O]) open() error { suffix := sc.format(start) segmentPath := path.Join(sc.location, fmt.Sprintf(segTemplate, suffix)) metadataPath := path.Join(segmentPath, metadataFilename) - version, err := sc.lfs.Read(metadataPath) - if err != nil { - if errors.Is(err, fs.ErrNotExist) { + rawMeta, readErr := sc.lfs.Read(metadataPath) + if readErr != nil { + if errors.Is(readErr, fs.ErrNotExist) { emptySegments = append(emptySegments, segmentPath) return nil } - return err + return readErr } - if len(version) == 0 { + if len(rawMeta) == 0 { emptySegments = append(emptySegments, segmentPath) return nil } - if err = checkVersion(convert.BytesToString(version)); err != nil { - return err + meta, parseErr := readSegmentMeta(rawMeta) + if parseErr != nil { + return parseErr } - _, err = sc.load(start, end, sc.location) - return err + segmentEnd := end + if meta.EndTime != "" { + parsedEnd, timeErr := time.Parse(time.RFC3339Nano, meta.EndTime) + if timeErr != nil { + return timeErr + } + segmentEnd = parsedEnd + } + _, loadErr := sc.load(start, segmentEnd, sc.location) + return loadErr }) if len(emptySegments) > 0 { sc.l.Warn().Strs("segments", emptySegments).Msg("empty segments found, removing them.") diff --git a/banyand/internal/storage/segment_test.go b/banyand/internal/storage/segment_test.go index 85f28c259..12d3dbd50 100644 --- a/banyand/internal/storage/segment_test.go +++ b/banyand/internal/storage/segment_test.go @@ -19,6 +19,7 @@ package storage import ( "context" + "encoding/json" "fmt" "os" "path/filepath" @@ -707,3 +708,153 @@ func TestCreateSegmentWritesJSONMetadata(t *testing.T) { seg.DecRef() } + +func TestOpenReadsPersistedEndTime(t *testing.T) { + tempDir, cleanup := setupTestEnvironment(t) + defer cleanup() + + ctx := context.Background() + l := logger.GetLogger("test-open-endtime") + ctx = context.WithValue(ctx, logger.ContextKey, l) + ctx = common.SetPosition(ctx, func(_ common.Position) common.Position { + return common.Position{ + Database: "test-db", + Stage: "test-stage", + } + }) + + group := "test-group" + opts := TSDBOpts[mockTSTable, mockTSTableOpener]{ + TSTableCreator: func(_ fs.FileSystem, _ string, _ common.Position, _ *logger.Logger, + _ timestamp.TimeRange, _ mockTSTableOpener, _ any, + ) (mockTSTable, error) { + return mockTSTable{ID: common.ShardID(0)}, nil + }, + ShardNum: 1, + SegmentInterval: IntervalRule{Unit: DAY, Num: 1}, + TTL: IntervalRule{Unit: DAY, Num: 30}, + SeriesIndexFlushTimeoutSeconds: 10, + SeriesIndexCacheMaxBytes: 1024 * 1024, + } + + serviceCache := NewServiceCache().(*serviceCache) + sc := newSegmentController[mockTSTable, mockTSTableOpener]( + ctx, + tempDir, + l, + opts, + nil, + nil, + 5*time.Minute, + fs.NewLocalFileSystemWithLoggerAndLimit(logger.GetLogger("storage"), opts.MemoryLimit), + serviceCache, + group, + ) + + now := time.Now().UTC() + day1 := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC) + day2 := day1.Add(24 * time.Hour) + + // Manually create segment directories with JSON metadata. + // day1 with a specific endTime that differs from the default NextTime. + customEnd := day1.Add(12 * time.Hour) + segPath1 := filepath.Join(tempDir, "seg-"+day1.Format(dayFormat)) + require.NoError(t, os.MkdirAll(segPath1, DirPerm)) + meta1 := segmentMeta{Version: currentVersion, EndTime: customEnd.Format(time.RFC3339Nano)} + meta1Data, _ := json.Marshal(meta1) + require.NoError(t, os.WriteFile(filepath.Join(segPath1, metadataFilename), meta1Data, FilePerm)) + + // day2 with standard endTime. + segPath2 := filepath.Join(tempDir, "seg-"+day2.Format(dayFormat)) + require.NoError(t, os.MkdirAll(segPath2, DirPerm)) + meta2 := segmentMeta{Version: currentVersion, EndTime: day2.Add(24 * time.Hour).Format(time.RFC3339Nano)} + meta2Data, _ := json.Marshal(meta2) + require.NoError(t, os.WriteFile(filepath.Join(segPath2, metadataFilename), meta2Data, FilePerm)) + + // Open the controller (loads segments from disk). + openErr := sc.open() + require.NoError(t, openErr) + + // Verify segments loaded correctly. + require.Len(t, sc.lst, 2) + + // First segment should have the custom endTime from metadata. + assert.Equal(t, customEnd, sc.lst[0].End, "segment 0 should use persisted endTime") + // Second segment should have its endTime from metadata. + assert.Equal(t, day2.Add(24*time.Hour), sc.lst[1].End, "segment 1 should use persisted endTime") + + for _, seg := range sc.lst { + seg.DecRef() + } +} + +func TestOpenFallbackOldFormatMetadata(t *testing.T) { + tempDir, cleanup := setupTestEnvironment(t) + defer cleanup() + + ctx := context.Background() + l := logger.GetLogger("test-open-fallback") + ctx = context.WithValue(ctx, logger.ContextKey, l) + ctx = common.SetPosition(ctx, func(_ common.Position) common.Position { + return common.Position{ + Database: "test-db", + Stage: "test-stage", + } + }) + + group := "test-group" + opts := TSDBOpts[mockTSTable, mockTSTableOpener]{ + TSTableCreator: func(_ fs.FileSystem, _ string, _ common.Position, _ *logger.Logger, + _ timestamp.TimeRange, _ mockTSTableOpener, _ any, + ) (mockTSTable, error) { + return mockTSTable{ID: common.ShardID(0)}, nil + }, + ShardNum: 1, + SegmentInterval: IntervalRule{Unit: DAY, Num: 1}, + TTL: IntervalRule{Unit: DAY, Num: 30}, + SeriesIndexFlushTimeoutSeconds: 10, + SeriesIndexCacheMaxBytes: 1024 * 1024, + } + + serviceCache := NewServiceCache().(*serviceCache) + sc := newSegmentController[mockTSTable, mockTSTableOpener]( + ctx, + tempDir, + l, + opts, + nil, + nil, + 5*time.Minute, + fs.NewLocalFileSystemWithLoggerAndLimit(logger.GetLogger("storage"), opts.MemoryLimit), + serviceCache, + group, + ) + + now := time.Now().UTC() + day1 := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC) + day2 := day1.Add(24 * time.Hour) + + // Create segments with OLD format metadata (plain version string). + segPath1 := filepath.Join(tempDir, "seg-"+day1.Format(dayFormat)) + require.NoError(t, os.MkdirAll(segPath1, DirPerm)) + require.NoError(t, os.WriteFile(filepath.Join(segPath1, metadataFilename), []byte("1.4.0"), FilePerm)) + + segPath2 := filepath.Join(tempDir, "seg-"+day2.Format(dayFormat)) + require.NoError(t, os.MkdirAll(segPath2, DirPerm)) + require.NoError(t, os.WriteFile(filepath.Join(segPath2, metadataFilename), []byte("1.4.0"), FilePerm)) + + // Open should succeed with fallback end time computation. + openErr := sc.open() + require.NoError(t, openErr) + + require.Len(t, sc.lst, 2) + + // First segment's end should be day2's start (fallback: end = next segment's start). + assert.Equal(t, day2.Local(), sc.lst[0].End, "old format should use fallback end time") + // Second segment's end should be day2 + 24h (fallback: end = NextTime(start)). + assert.Equal(t, day2.Add(24*time.Hour).Local(), sc.lst[1].End, "last segment should use NextTime fallback") + + for _, seg := range sc.lst { + seg.DecRef() + } +}
