This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch feat/segmeta
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit c9a45dc9cd121263ead466a01a154b4448ee6715
Author: Hongtao Gao <[email protected]>
AuthorDate: Tue Apr 7 00:28:59 2026 +0000

    docs: add implementation plan for stable segment end time
---
 .../plans/2026-04-07-stable-segment-end-time.md    | 545 +++++++++++++++++++++
 1 file changed, 545 insertions(+)

diff --git a/docs/superpowers/plans/2026-04-07-stable-segment-end-time.md 
b/docs/superpowers/plans/2026-04-07-stable-segment-end-time.md
new file mode 100644
index 000000000..495886235
--- /dev/null
+++ b/docs/superpowers/plans/2026-04-07-stable-segment-end-time.md
@@ -0,0 +1,545 @@
+# Stable Segment End Time Implementation Plan
+
+> **For agentic workers:** REQUIRED SUB-SKILL: Use 
superpowers:subagent-driven-development (recommended) or 
superpowers:executing-plans to implement this plan task-by-task. Steps use 
checkbox (`- [ ]`) syntax for tracking.
+
+**Goal:** Persist segment end time in per-segment metadata so boundaries don't 
shift across restarts or config changes.
+
+**Architecture:** Extend the existing `metadata` file from a plain version 
string to JSON containing both version and end time. New `readSegmentMeta()` 
function handles both old and new formats. The write path serializes JSON; the 
read path reads it with fallback to current computation for old segments.
+
+**Tech Stack:** Go standard library (`encoding/json`, `time`), existing test 
framework (`testify`).
+
+---
+
+### Task 1: Add `segmentMeta` and `readSegmentMeta()` to version.go
+
+**Files:**
+- Modify: `banyand/internal/storage/version.go`
+- Test: `banyand/internal/storage/version_test.go` (new file)
+
+- [ ] **Step 1: Write tests for `readSegmentMeta`**
+
+Create `banyand/internal/storage/version_test.go`:
+
+```go
+package storage
+
+import (
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+)
+
+func TestReadSegmentMeta_NewFormat(t *testing.T) {
+       data := 
[]byte(`{"version":"1.5.0","endTime":"2026-04-07T00:00:00+08:00"}`)
+       meta, err := readSegmentMeta(data)
+       require.NoError(t, err)
+       assert.Equal(t, "1.5.0", meta.Version)
+       assert.Equal(t, "2026-04-07T00:00:00+08:00", meta.EndTime)
+}
+
+func TestReadSegmentMeta_OldFormat(t *testing.T) {
+       data := []byte("1.4.0")
+       meta, err := readSegmentMeta(data)
+       require.NoError(t, err)
+       assert.Equal(t, "1.4.0", meta.Version)
+       assert.Equal(t, "", meta.EndTime)
+}
+
+func TestReadSegmentMeta_OldFormatWithNewline(t *testing.T) {
+       data := []byte("1.4.0\n")
+       meta, err := readSegmentMeta(data)
+       require.NoError(t, err)
+       assert.Equal(t, "1.4.0", meta.Version)
+       assert.Equal(t, "", meta.EndTime)
+}
+
+func TestReadSegmentMeta_IncompatibleVersion(t *testing.T) {
+       data := 
[]byte(`{"version":"0.1.0","endTime":"2026-04-07T00:00:00+08:00"}`)
+       _, err := readSegmentMeta(data)
+       assert.Error(t, err)
+}
+
+func TestReadSegmentMeta_NewFormatNoEndTime(t *testing.T) {
+       data := []byte(`{"version":"1.5.0"}`)
+       meta, err := readSegmentMeta(data)
+       require.NoError(t, err)
+       assert.Equal(t, "1.5.0", meta.Version)
+       assert.Equal(t, "", meta.EndTime)
+}
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+Run: `go test ./banyand/internal/storage/ -run TestReadSegmentMeta -v`
+Expected: FAIL — `readSegmentMeta` undefined.
+
+- [ ] **Step 3: Add `segmentMeta` struct and `readSegmentMeta()` to 
`version.go`**
+
+Add after the `checkVersion` function in `banyand/internal/storage/version.go`:
+
+```go
+type segmentMeta struct {
+       Version string `json:"version"`
+       EndTime string `json:"endTime,omitempty"`
+}
+
+func readSegmentMeta(data []byte) (segmentMeta, error) {
+       var meta segmentMeta
+       trimmed := strings.TrimSpace(string(data))
+       if len(trimmed) > 0 && trimmed[0] == '{' {
+               if unmarshalErr := json.Unmarshal(data, &meta); unmarshalErr != 
nil {
+                       return segmentMeta{}, unmarshalErr
+               }
+       } else {
+               meta.Version = trimmed
+       }
+       if checkErr := checkVersion(meta.Version); checkErr != nil {
+               return segmentMeta{}, checkErr
+       }
+       return meta, nil
+}
+```
+
+Note: `encoding/json` and `strings` are already imported in `version.go`.
+
+- [ ] **Step 4: Run tests to verify they pass**
+
+Run: `go test ./banyand/internal/storage/ -run TestReadSegmentMeta -v`
+Expected: PASS — all 5 tests green.
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add banyand/internal/storage/version.go 
banyand/internal/storage/version_test.go
+git commit -m "feat(storage): add segmentMeta struct and readSegmentMeta 
helper"
+```
+
+---
+
+### Task 2: Bump version to 1.5.0
+
+**Files:**
+- Modify: `banyand/internal/storage/version.go:31`
+- Modify: `banyand/internal/storage/versions.yml`
+
+- [ ] **Step 1: Update `currentVersion` in `version.go`**
+
+Change line 31 from:
+```go
+currentVersion             = "1.4.0"
+```
+to:
+```go
+currentVersion             = "1.5.0"
+```
+
+- [ ] **Step 2: Add `1.5.0` to `versions.yml`**
+
+Read the current `versions.yml` and append `"1.5.0"` to the versions list.
+
+- [ ] **Step 3: Verify existing tests still pass**
+
+Run: `go test ./banyand/internal/storage/ -run TestReadSegmentMeta -v`
+Expected: PASS — old-format test still passes because `"1.4.0"` remains in the 
compatible list.
+
+- [ ] **Step 4: Commit**
+
+```bash
+git add banyand/internal/storage/version.go 
banyand/internal/storage/versions.yml
+git commit -m "feat(storage): bump metadata version to 1.5.0"
+```
+
+---
+
+### Task 3: Update `create()` to write JSON metadata
+
+**Files:**
+- Modify: `banyand/internal/storage/segment.go:540-544`
+
+- [ ] **Step 1: Write a test that creates a segment and verifies metadata 
format**
+
+Add to `banyand/internal/storage/segment_test.go`:
+
+```go
+func TestCreateSegmentWritesJSONMetadata(t *testing.T) {
+       tempDir, cleanup := setupTestEnvironment(t)
+       defer cleanup()
+
+       ctx := context.Background()
+       l := logger.GetLogger("test-segment-metadata")
+       ctx = context.WithValue(ctx, logger.ContextKey, l)
+       ctx = common.SetPosition(ctx, func(_ common.Position) common.Position {
+               return common.Position{
+                       Database: "test-db",
+                       Stage:    "test-stage",
+               }
+       })
+
+       group := "test-group"
+       opts := TSDBOpts[mockTSTable, mockTSTableOpener]{
+               TSTableCreator: func(_ fs.FileSystem, _ string, _ 
common.Position, _ *logger.Logger,
+                       _ timestamp.TimeRange, _ mockTSTableOpener, _ any,
+               ) (mockTSTable, error) {
+                       return mockTSTable{ID: common.ShardID(0)}, nil
+               },
+               ShardNum:                       1,
+               SegmentInterval:                IntervalRule{Unit: DAY, Num: 1},
+               TTL:                            IntervalRule{Unit: DAY, Num: 7},
+               SeriesIndexFlushTimeoutSeconds: 10,
+               SeriesIndexCacheMaxBytes:       1024 * 1024,
+       }
+
+       serviceCache := NewServiceCache().(*serviceCache)
+       sc := newSegmentController[mockTSTable, mockTSTableOpener](
+               ctx,
+               tempDir,
+               l,
+               opts,
+               nil,
+               nil,
+               5*time.Minute,
+               
fs.NewLocalFileSystemWithLoggerAndLimit(logger.GetLogger("storage"), 
opts.MemoryLimit),
+               serviceCache,
+               group,
+       )
+
+       now := time.Now().UTC()
+       startTime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, 
time.UTC)
+
+       seg, err := sc.create(startTime)
+       require.NoError(t, err)
+       require.NotNil(t, seg)
+
+       // Read metadata from disk and verify it's JSON with endTime
+       suffix := startTime.Format(dayFormat)
+       metadataPath := filepath.Join(tempDir, fmt.Sprintf("seg-%s", suffix), 
metadataFilename)
+       rawMeta, readErr := os.ReadFile(metadataPath)
+       require.NoError(t, readErr)
+
+       meta, parseErr := readSegmentMeta(rawMeta)
+       require.NoError(t, parseErr)
+       assert.Equal(t, currentVersion, meta.Version)
+       assert.NotEmpty(t, meta.EndTime, "endTime should be persisted in 
metadata")
+
+       // Verify the endTime matches the segment's End
+       expectedEnd := startTime.Add(24 * time.Hour)
+       assert.Equal(t, expectedEnd.Format(time.RFC3339Nano), meta.EndTime)
+
+       seg.DecRef()
+}
+```
+
+- [ ] **Step 2: Run test to verify it fails**
+
+Run: `go test ./banyand/internal/storage/ -run 
TestCreateSegmentWritesJSONMetadata -v`
+Expected: FAIL — metadata is still plain text, `readSegmentMeta` parses it as 
old format so `EndTime` is empty.
+
+- [ ] **Step 3: Update `create()` to write JSON metadata**
+
+In `banyand/internal/storage/segment.go`, replace lines 540-544:
+
+```go
+       segPath := path.Join(sc.location, fmt.Sprintf(segTemplate, 
sc.format(start)))
+       sc.lfs.MkdirPanicIfExist(segPath, DirPerm)
+       data := []byte(currentVersion)
+       metadataPath := filepath.Join(segPath, metadataFilename)
+       lf, err := sc.lfs.CreateLockFile(metadataPath, FilePerm)
+```
+
+with:
+
+```go
+       segPath := path.Join(sc.location, fmt.Sprintf(segTemplate, 
sc.format(start)))
+       sc.lfs.MkdirPanicIfExist(segPath, DirPerm)
+       meta := segmentMeta{
+               Version: currentVersion,
+               EndTime: end.Format(time.RFC3339Nano),
+       }
+       data, marshalErr := json.Marshal(meta)
+       if marshalErr != nil {
+               logger.Panicf("cannot marshal segment metadata: %s", marshalErr)
+       }
+       metadataPath := filepath.Join(segPath, metadataFilename)
+       lf, err := sc.lfs.CreateLockFile(metadataPath, FilePerm)
+```
+
+Add `"encoding/json"` to the imports in `segment.go` if not already present.
+
+- [ ] **Step 4: Run test to verify it passes**
+
+Run: `go test ./banyand/internal/storage/ -run 
TestCreateSegmentWritesJSONMetadata -v`
+Expected: PASS.
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add banyand/internal/storage/segment.go 
banyand/internal/storage/segment_test.go
+git commit -m "feat(storage): write JSON metadata with endTime on segment 
creation"
+```
+
+---
+
+### Task 4: Update `open()` to read `endTime` from metadata
+
+**Files:**
+- Modify: `banyand/internal/storage/segment.go:477-510`
+
+- [ ] **Step 1: Write a test that loads segments with persisted endTime**
+
+Add to `banyand/internal/storage/segment_test.go`:
+
+```go
+func TestOpenReadsPersistedEndTime(t *testing.T) {
+       tempDir, cleanup := setupTestEnvironment(t)
+       defer cleanup()
+
+       ctx := context.Background()
+       l := logger.GetLogger("test-open-endtime")
+       ctx = context.WithValue(ctx, logger.ContextKey, l)
+       ctx = common.SetPosition(ctx, func(_ common.Position) common.Position {
+               return common.Position{
+                       Database: "test-db",
+                       Stage:    "test-stage",
+               }
+       })
+
+       group := "test-group"
+       opts := TSDBOpts[mockTSTable, mockTSTableOpener]{
+               TSTableCreator: func(_ fs.FileSystem, _ string, _ 
common.Position, _ *logger.Logger,
+                       _ timestamp.TimeRange, _ mockTSTableOpener, _ any,
+               ) (mockTSTable, error) {
+                       return mockTSTable{ID: common.ShardID(0)}, nil
+               },
+               ShardNum:                       1,
+               SegmentInterval:                IntervalRule{Unit: DAY, Num: 1},
+               TTL:                            IntervalRule{Unit: DAY, Num: 
30},
+               SeriesIndexFlushTimeoutSeconds: 10,
+               SeriesIndexCacheMaxBytes:       1024 * 1024,
+       }
+
+       serviceCache := NewServiceCache().(*serviceCache)
+       sc := newSegmentController[mockTSTable, mockTSTableOpener](
+               ctx,
+               tempDir,
+               l,
+               opts,
+               nil,
+               nil,
+               5*time.Minute,
+               
fs.NewLocalFileSystemWithLoggerAndLimit(logger.GetLogger("storage"), 
opts.MemoryLimit),
+               serviceCache,
+               group,
+       )
+
+       now := time.Now().UTC()
+       day1 := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, 
time.UTC)
+       day2 := day1.Add(24 * time.Hour)
+
+       // Manually create segment directories with JSON metadata
+       // day1 with a specific endTime that differs from the default NextTime
+       customEnd := day1.Add(12 * time.Hour) // 12 hours instead of 24
+       segPath1 := filepath.Join(tempDir, "seg-"+day1.Format(dayFormat))
+       require.NoError(t, os.MkdirAll(segPath1, DirPerm))
+       meta1 := segmentMeta{Version: currentVersion, EndTime: 
customEnd.Format(time.RFC3339Nano)}
+       meta1Data, _ := json.Marshal(meta1)
+       require.NoError(t, os.WriteFile(filepath.Join(segPath1, 
metadataFilename), meta1Data, FilePerm))
+
+       // day2 with standard endTime
+       segPath2 := filepath.Join(tempDir, "seg-"+day2.Format(dayFormat))
+       require.NoError(t, os.MkdirAll(segPath2, DirPerm))
+       meta2 := segmentMeta{Version: currentVersion, EndTime: day2.Add(24 * 
time.Hour).Format(time.RFC3339Nano)}
+       meta2Data, _ := json.Marshal(meta2)
+       require.NoError(t, os.WriteFile(filepath.Join(segPath2, 
metadataFilename), meta2Data, FilePerm))
+
+       // Open the controller (loads segments from disk)
+       openErr := sc.open()
+       require.NoError(t, openErr)
+
+       // Verify segments loaded correctly
+       require.Len(t, sc.lst, 2)
+
+       // First segment should have the custom endTime from metadata
+       assert.Equal(t, customEnd, sc.lst[0].End, "segment 0 should use 
persisted endTime")
+       // Second segment should have its endTime from metadata
+       assert.Equal(t, day2.Add(24*time.Hour), sc.lst[1].End, "segment 1 
should use persisted endTime")
+
+       for _, seg := range sc.lst {
+               seg.DecRef()
+       }
+}
+
+func TestOpenFallbackOldFormatMetadata(t *testing.T) {
+       tempDir, cleanup := setupTestEnvironment(t)
+       defer cleanup()
+
+       ctx := context.Background()
+       l := logger.GetLogger("test-open-fallback")
+       ctx = context.WithValue(ctx, logger.ContextKey, l)
+       ctx = common.SetPosition(ctx, func(_ common.Position) common.Position {
+               return common.Position{
+                       Database: "test-db",
+                       Stage:    "test-stage",
+               }
+       })
+
+       group := "test-group"
+       opts := TSDBOpts[mockTSTable, mockTSTableOpener]{
+               TSTableCreator: func(_ fs.FileSystem, _ string, _ 
common.Position, _ *logger.Logger,
+                       _ timestamp.TimeRange, _ mockTSTableOpener, _ any,
+               ) (mockTSTable, error) {
+                       return mockTSTable{ID: common.ShardID(0)}, nil
+               },
+               ShardNum:                       1,
+               SegmentInterval:                IntervalRule{Unit: DAY, Num: 1},
+               TTL:                            IntervalRule{Unit: DAY, Num: 
30},
+               SeriesIndexFlushTimeoutSeconds: 10,
+               SeriesIndexCacheMaxBytes:       1024 * 1024,
+       }
+
+       serviceCache := NewServiceCache().(*serviceCache)
+       sc := newSegmentController[mockTSTable, mockTSTableOpener](
+               ctx,
+               tempDir,
+               l,
+               opts,
+               nil,
+               nil,
+               5*time.Minute,
+               
fs.NewLocalFileSystemWithLoggerAndLimit(logger.GetLogger("storage"), 
opts.MemoryLimit),
+               serviceCache,
+               group,
+       )
+
+       now := time.Now().UTC()
+       day1 := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, 
time.UTC)
+       day2 := day1.Add(24 * time.Hour)
+
+       // Create segment with OLD format metadata (plain version string)
+       segPath1 := filepath.Join(tempDir, "seg-"+day1.Format(dayFormat))
+       require.NoError(t, os.MkdirAll(segPath1, DirPerm))
+       require.NoError(t, os.WriteFile(filepath.Join(segPath1, 
metadataFilename), []byte("1.4.0"), FilePerm))
+
+       segPath2 := filepath.Join(tempDir, "seg-"+day2.Format(dayFormat))
+       require.NoError(t, os.MkdirAll(segPath2, DirPerm))
+       require.NoError(t, os.WriteFile(filepath.Join(segPath2, 
metadataFilename), []byte("1.4.0"), FilePerm))
+
+       // Open should succeed with fallback end time computation
+       openErr := sc.open()
+       require.NoError(t, openErr)
+
+       require.Len(t, sc.lst, 2)
+
+       // First segment's end should be day2's start (fallback: end = next 
segment's start)
+       assert.Equal(t, day2, sc.lst[0].End, "old format should use fallback 
end time")
+       // Second segment's end should be day2 + 24h (fallback: end = 
NextTime(start))
+       assert.Equal(t, day2.Add(24*time.Hour), sc.lst[1].End, "last segment 
should use NextTime fallback")
+
+       for _, seg := range sc.lst {
+               seg.DecRef()
+       }
+}
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+Run: `go test ./banyand/internal/storage/ -run 
"TestOpenReadsPersistedEndTime|TestOpenFallbackOldFormatMetadata" -v`
+Expected: FAIL — `open()` still computes end times from adjacent segments, 
ignoring metadata.
+
+- [ ] **Step 3: Update `open()` to read `endTime` from metadata**
+
+In `banyand/internal/storage/segment.go`, replace the callback body inside 
`open()` (lines 482-501):
+
+```go
+               suffix := sc.format(start)
+               segmentPath := path.Join(sc.location, fmt.Sprintf(segTemplate, 
suffix))
+               metadataPath := path.Join(segmentPath, metadataFilename)
+               version, err := sc.lfs.Read(metadataPath)
+               if err != nil {
+                       if errors.Is(err, fs.ErrNotExist) {
+                               emptySegments = append(emptySegments, 
segmentPath)
+                               return nil
+                       }
+                       return err
+               }
+               if len(version) == 0 {
+                       emptySegments = append(emptySegments, segmentPath)
+                       return nil
+               }
+               if err = checkVersion(convert.BytesToString(version)); err != 
nil {
+                       return err
+               }
+               _, err = sc.load(start, end, sc.location)
+               return err
+```
+
+with:
+
+```go
+               suffix := sc.format(start)
+               segmentPath := path.Join(sc.location, fmt.Sprintf(segTemplate, 
suffix))
+               metadataPath := path.Join(segmentPath, metadataFilename)
+               rawMeta, readErr := sc.lfs.Read(metadataPath)
+               if readErr != nil {
+                       if errors.Is(readErr, fs.ErrNotExist) {
+                               emptySegments = append(emptySegments, 
segmentPath)
+                               return nil
+                       }
+                       return readErr
+               }
+               if len(rawMeta) == 0 {
+                       emptySegments = append(emptySegments, segmentPath)
+                       return nil
+               }
+               meta, parseErr := readSegmentMeta(rawMeta)
+               if parseErr != nil {
+                       return parseErr
+               }
+               segmentEnd := end
+               if meta.EndTime != "" {
+                       parsedEnd, timeErr := time.Parse(time.RFC3339Nano, 
meta.EndTime)
+                       if timeErr != nil {
+                               return timeErr
+                       }
+                       segmentEnd = parsedEnd
+               }
+               _, err = sc.load(start, segmentEnd, sc.location)
+               return err
+```
+
+- [ ] **Step 4: Run the new tests to verify they pass**
+
+Run: `go test ./banyand/internal/storage/ -run 
"TestOpenReadsPersistedEndTime|TestOpenFallbackOldFormatMetadata" -v`
+Expected: PASS.
+
+- [ ] **Step 5: Run all existing segment tests to verify no regression**
+
+Run: `go test ./banyand/internal/storage/ -run TestSegment -v`
+Expected: PASS — existing tests use `openSegment()` directly, not `open()`, so 
they're unaffected.
+
+- [ ] **Step 6: Commit**
+
+```bash
+git add banyand/internal/storage/segment.go 
banyand/internal/storage/segment_test.go
+git commit -m "feat(storage): read persisted endTime from segment metadata on 
load"
+```
+
+---
+
+### Task 5: Run full test suite and verify
+
+**Files:** None changed — verification only.
+
+- [ ] **Step 1: Run the full storage package test suite**
+
+Run: `go test ./banyand/internal/storage/ -v -count=1`
+Expected: All tests PASS.
+
+- [ ] **Step 2: Run the linter**
+
+Run: `make lint`
+Expected: No new lint errors.
+
+- [ ] **Step 3: Final commit (if any fixes needed)**
+
+Only if lint or test fixes were needed.

Reply via email to