This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch feat/segmeta
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit a9f0e4167691636b6dddb74ede878633a6587cbb
Author: Hongtao Gao <[email protected]>
AuthorDate: Tue Apr 7 01:02:11 2026 +0000

    feat(storage): read persisted endTime from segment metadata on load
---
 banyand/internal/storage/segment.go      |  28 ++++--
 banyand/internal/storage/segment_test.go | 151 +++++++++++++++++++++++++++++++
 2 files changed, 169 insertions(+), 10 deletions(-)

diff --git a/banyand/internal/storage/segment.go b/banyand/internal/storage/segment.go
index 4de7ab50b..3c88e7d6f 100644
--- a/banyand/internal/storage/segment.go
+++ b/banyand/internal/storage/segment.go
@@ -34,7 +34,6 @@ import (
 
        "github.com/apache/skywalking-banyandb/api/common"
        commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
-       "github.com/apache/skywalking-banyandb/pkg/convert"
        banyanfs "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/index/inverted"
        "github.com/apache/skywalking-banyandb/pkg/logger"
@@ -483,23 +482,32 @@ func (sc *segmentController[T, O]) open() error {
                suffix := sc.format(start)
                segmentPath := path.Join(sc.location, fmt.Sprintf(segTemplate, suffix))
                metadataPath := path.Join(segmentPath, metadataFilename)
-               version, err := sc.lfs.Read(metadataPath)
-               if err != nil {
-                       if errors.Is(err, fs.ErrNotExist) {
+               rawMeta, readErr := sc.lfs.Read(metadataPath)
+               if readErr != nil {
+                       if errors.Is(readErr, fs.ErrNotExist) {
                                emptySegments = append(emptySegments, segmentPath)
                                return nil
                        }
-                       return err
+                       return readErr
                }
-               if len(version) == 0 {
+               if len(rawMeta) == 0 {
                        emptySegments = append(emptySegments, segmentPath)
                        return nil
                }
-               if err = checkVersion(convert.BytesToString(version)); err != nil {
-                       return err
+               meta, parseErr := readSegmentMeta(rawMeta)
+               if parseErr != nil {
+                       return parseErr
                }
-               _, err = sc.load(start, end, sc.location)
-               return err
+               segmentEnd := end
+               if meta.EndTime != "" {
+                       parsedEnd, timeErr := time.Parse(time.RFC3339Nano, meta.EndTime)
+                       if timeErr != nil {
+                               return timeErr
+                       }
+                       segmentEnd = parsedEnd
+               }
+               _, loadErr := sc.load(start, segmentEnd, sc.location)
+               return loadErr
        })
        if len(emptySegments) > 0 {
                sc.l.Warn().Strs("segments", emptySegments).Msg("empty segments found, removing them.")
diff --git a/banyand/internal/storage/segment_test.go b/banyand/internal/storage/segment_test.go
index 85f28c259..12d3dbd50 100644
--- a/banyand/internal/storage/segment_test.go
+++ b/banyand/internal/storage/segment_test.go
@@ -19,6 +19,7 @@ package storage
 
 import (
        "context"
+       "encoding/json"
        "fmt"
        "os"
        "path/filepath"
@@ -707,3 +708,153 @@ func TestCreateSegmentWritesJSONMetadata(t *testing.T) {
 
        seg.DecRef()
 }
+
+func TestOpenReadsPersistedEndTime(t *testing.T) {
+       tempDir, cleanup := setupTestEnvironment(t)
+       defer cleanup()
+
+       ctx := context.Background()
+       l := logger.GetLogger("test-open-endtime")
+       ctx = context.WithValue(ctx, logger.ContextKey, l)
+       ctx = common.SetPosition(ctx, func(_ common.Position) common.Position {
+               return common.Position{
+                       Database: "test-db",
+                       Stage:    "test-stage",
+               }
+       })
+
+       group := "test-group"
+       opts := TSDBOpts[mockTSTable, mockTSTableOpener]{
+               TSTableCreator: func(_ fs.FileSystem, _ string, _ common.Position, _ *logger.Logger,
+                       _ timestamp.TimeRange, _ mockTSTableOpener, _ any,
+               ) (mockTSTable, error) {
+                       return mockTSTable{ID: common.ShardID(0)}, nil
+               },
+               ShardNum:                       1,
+               SegmentInterval:                IntervalRule{Unit: DAY, Num: 1},
+               TTL:                            IntervalRule{Unit: DAY, Num: 30},
+               SeriesIndexFlushTimeoutSeconds: 10,
+               SeriesIndexCacheMaxBytes:       1024 * 1024,
+       }
+
+       serviceCache := NewServiceCache().(*serviceCache)
+       sc := newSegmentController[mockTSTable, mockTSTableOpener](
+               ctx,
+               tempDir,
+               l,
+               opts,
+               nil,
+               nil,
+               5*time.Minute,
+               fs.NewLocalFileSystemWithLoggerAndLimit(logger.GetLogger("storage"), opts.MemoryLimit),
+               serviceCache,
+               group,
+       )
+
+       now := time.Now().UTC()
+       day1 := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC)
+       day2 := day1.Add(24 * time.Hour)
+
+       // Manually create segment directories with JSON metadata.
+       // day1 with a specific endTime that differs from the default NextTime.
+       customEnd := day1.Add(12 * time.Hour)
+       segPath1 := filepath.Join(tempDir, "seg-"+day1.Format(dayFormat))
+       require.NoError(t, os.MkdirAll(segPath1, DirPerm))
+       meta1 := segmentMeta{Version: currentVersion, EndTime: customEnd.Format(time.RFC3339Nano)}
+       meta1Data, _ := json.Marshal(meta1)
+       require.NoError(t, os.WriteFile(filepath.Join(segPath1, metadataFilename), meta1Data, FilePerm))
+
+       // day2 with standard endTime.
+       segPath2 := filepath.Join(tempDir, "seg-"+day2.Format(dayFormat))
+       require.NoError(t, os.MkdirAll(segPath2, DirPerm))
+       meta2 := segmentMeta{Version: currentVersion, EndTime: day2.Add(24 * time.Hour).Format(time.RFC3339Nano)}
+       meta2Data, _ := json.Marshal(meta2)
+       require.NoError(t, os.WriteFile(filepath.Join(segPath2, metadataFilename), meta2Data, FilePerm))
+
+       // Open the controller (loads segments from disk).
+       openErr := sc.open()
+       require.NoError(t, openErr)
+
+       // Verify segments loaded correctly.
+       require.Len(t, sc.lst, 2)
+
+       // First segment should have the custom endTime from metadata.
+       assert.Equal(t, customEnd, sc.lst[0].End, "segment 0 should use persisted endTime")
+       // Second segment should have its endTime from metadata.
+       assert.Equal(t, day2.Add(24*time.Hour), sc.lst[1].End, "segment 1 should use persisted endTime")
+
+       for _, seg := range sc.lst {
+               seg.DecRef()
+       }
+}
+
+func TestOpenFallbackOldFormatMetadata(t *testing.T) {
+       tempDir, cleanup := setupTestEnvironment(t)
+       defer cleanup()
+
+       ctx := context.Background()
+       l := logger.GetLogger("test-open-fallback")
+       ctx = context.WithValue(ctx, logger.ContextKey, l)
+       ctx = common.SetPosition(ctx, func(_ common.Position) common.Position {
+               return common.Position{
+                       Database: "test-db",
+                       Stage:    "test-stage",
+               }
+       })
+
+       group := "test-group"
+       opts := TSDBOpts[mockTSTable, mockTSTableOpener]{
+               TSTableCreator: func(_ fs.FileSystem, _ string, _ common.Position, _ *logger.Logger,
+                       _ timestamp.TimeRange, _ mockTSTableOpener, _ any,
+               ) (mockTSTable, error) {
+                       return mockTSTable{ID: common.ShardID(0)}, nil
+               },
+               ShardNum:                       1,
+               SegmentInterval:                IntervalRule{Unit: DAY, Num: 1},
+               TTL:                            IntervalRule{Unit: DAY, Num: 30},
+               SeriesIndexFlushTimeoutSeconds: 10,
+               SeriesIndexCacheMaxBytes:       1024 * 1024,
+       }
+
+       serviceCache := NewServiceCache().(*serviceCache)
+       sc := newSegmentController[mockTSTable, mockTSTableOpener](
+               ctx,
+               tempDir,
+               l,
+               opts,
+               nil,
+               nil,
+               5*time.Minute,
+               fs.NewLocalFileSystemWithLoggerAndLimit(logger.GetLogger("storage"), opts.MemoryLimit),
+               serviceCache,
+               group,
+       )
+
+       now := time.Now().UTC()
+       day1 := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC)
+       day2 := day1.Add(24 * time.Hour)
+
+       // Create segments with OLD format metadata (plain version string).
+       segPath1 := filepath.Join(tempDir, "seg-"+day1.Format(dayFormat))
+       require.NoError(t, os.MkdirAll(segPath1, DirPerm))
+       require.NoError(t, os.WriteFile(filepath.Join(segPath1, metadataFilename), []byte("1.4.0"), FilePerm))
+
+       segPath2 := filepath.Join(tempDir, "seg-"+day2.Format(dayFormat))
+       require.NoError(t, os.MkdirAll(segPath2, DirPerm))
+       require.NoError(t, os.WriteFile(filepath.Join(segPath2, metadataFilename), []byte("1.4.0"), FilePerm))
+
+       // Open should succeed with fallback end time computation.
+       openErr := sc.open()
+       require.NoError(t, openErr)
+
+       require.Len(t, sc.lst, 2)
+
+       // First segment's end should be day2's start (fallback: end = next segment's start).
+       assert.Equal(t, day2.Local(), sc.lst[0].End, "old format should use fallback end time")
+       // Second segment's end should be day2 + 24h (fallback: end = NextTime(start)).
+       assert.Equal(t, day2.Add(24*time.Hour).Local(), sc.lst[1].End, "last segment should use NextTime fallback")
+
+       for _, seg := range sc.lst {
+               seg.DecRef()
+       }
+}

Reply via email to