This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 29e87c9dc Stable Segment Endtime (#1051)
29e87c9dc is described below

commit 29e87c9dc8077acf6883e663a35d0ac5f630a3e2
Author: Gao Hongtao <[email protected]>
AuthorDate: Wed Apr 8 18:06:01 2026 +0800

    Stable Segment Endtime (#1051)
---
 .gitignore                               |   5 +-
 CHANGES.md                               |   1 +
 banyand/internal/storage/segment.go      |  38 ++++--
 banyand/internal/storage/segment_test.go | 219 +++++++++++++++++++++++++++++++
 banyand/internal/storage/version.go      |  23 +++-
 banyand/internal/storage/version_test.go |  63 +++++++++
 banyand/internal/storage/versions.yml    |   1 +
 7 files changed, 334 insertions(+), 16 deletions(-)

diff --git a/.gitignore b/.gitignore
index e433fb61c..6f4a80ab7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -75,12 +75,9 @@ gomock_reflect*
 .cursor/
 
 # Claude
-Claude.md
 .claude/
-CLAUDE.md
+.omc/
 
 # eBPF generated files and binaries
 fodc/agent/internal/ktm/iomonitor/ebpf/generated/vmlinux.h
 
-# Codex
-AGENTS.md
diff --git a/CHANGES.md b/CHANGES.md
index e6c355c75..d7959c781 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -11,6 +11,7 @@ Release Notes.
 - Add log query e2e test.
 - Sync lifecycle e2e test from SkyWalking stages test.
 - Add periodic health check for property schema connection.
+- Persist segment end time in per-segment metadata so boundaries don't shift across restarts or config changes.
 
 ### Bug Fixes
 
diff --git a/banyand/internal/storage/segment.go b/banyand/internal/storage/segment.go
index f9ad6a122..3c88e7d6f 100644
--- a/banyand/internal/storage/segment.go
+++ b/banyand/internal/storage/segment.go
@@ -19,6 +19,7 @@ package storage
 
 import (
        "context"
+       "encoding/json"
        "fmt"
        "io/fs"
        "path"
@@ -33,7 +34,6 @@ import (
 
        "github.com/apache/skywalking-banyandb/api/common"
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
-       "github.com/apache/skywalking-banyandb/pkg/convert"
        banyanfs "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/index/inverted"
        "github.com/apache/skywalking-banyandb/pkg/logger"
@@ -482,23 +482,32 @@ func (sc *segmentController[T, O]) open() error {
                suffix := sc.format(start)
                segmentPath := path.Join(sc.location, fmt.Sprintf(segTemplate, 
suffix))
                metadataPath := path.Join(segmentPath, metadataFilename)
-               version, err := sc.lfs.Read(metadataPath)
-               if err != nil {
-                       if errors.Is(err, fs.ErrNotExist) {
+               rawMeta, readErr := sc.lfs.Read(metadataPath)
+               if readErr != nil {
+                       if errors.Is(readErr, fs.ErrNotExist) {
                                emptySegments = append(emptySegments, 
segmentPath)
                                return nil
                        }
-                       return err
+                       return readErr
                }
-               if len(version) == 0 {
+               if len(rawMeta) == 0 {
                        emptySegments = append(emptySegments, segmentPath)
                        return nil
                }
-               if err = checkVersion(convert.BytesToString(version)); err != 
nil {
-                       return err
+               meta, parseErr := readSegmentMeta(rawMeta)
+               if parseErr != nil {
+                       return parseErr
                }
-               _, err = sc.load(start, end, sc.location)
-               return err
+               segmentEnd := end
+               if meta.EndTime != "" {
+                       parsedEnd, timeErr := time.Parse(time.RFC3339Nano, 
meta.EndTime)
+                       if timeErr != nil {
+                               return timeErr
+                       }
+                       segmentEnd = parsedEnd
+               }
+               _, loadErr := sc.load(start, segmentEnd, sc.location)
+               return loadErr
        })
        if len(emptySegments) > 0 {
                sc.l.Warn().Strs("segments", emptySegments).Msg("empty segments found, removing them.")
@@ -539,7 +548,14 @@ func (sc *segmentController[T, O]) create(start time.Time) (*segment[T, O], erro
        }
        segPath := path.Join(sc.location, fmt.Sprintf(segTemplate, 
sc.format(start)))
        sc.lfs.MkdirPanicIfExist(segPath, DirPerm)
-       data := []byte(currentVersion)
+       meta := segmentMeta{
+               Version: currentVersion,
+               EndTime: end.Format(time.RFC3339Nano),
+       }
+       data, marshalErr := json.Marshal(meta)
+       if marshalErr != nil {
+               logger.Panicf("cannot marshal segment metadata: %s", marshalErr)
+       }
        metadataPath := filepath.Join(segPath, metadataFilename)
        lf, err := sc.lfs.CreateLockFile(metadataPath, FilePerm)
        if err != nil {
diff --git a/banyand/internal/storage/segment_test.go b/banyand/internal/storage/segment_test.go
index 7bcc1063b..9a55dd358 100644
--- a/banyand/internal/storage/segment_test.go
+++ b/banyand/internal/storage/segment_test.go
@@ -19,6 +19,7 @@ package storage
 
 import (
        "context"
+       "encoding/json"
        "fmt"
        "os"
        "path/filepath"
@@ -641,3 +642,221 @@ func TestDeleteExpiredSegmentsWithClosedSegments(t *testing.T) {
                        "Remaining segment %d should be from the expected date", i)
        }
 }
+
+func TestCreateSegmentWritesJSONMetadata(t *testing.T) {
+       tempDir, cleanup := setupTestEnvironment(t)
+       defer cleanup()
+
+       ctx := context.Background()
+       l := logger.GetLogger("test-segment-metadata")
+       ctx = context.WithValue(ctx, logger.ContextKey, l)
+       ctx = common.SetPosition(ctx, func(_ common.Position) common.Position {
+               return common.Position{
+                       Database: "test-db",
+                       Stage:    "test-stage",
+               }
+       })
+
+       opts := TSDBOpts[mockTSTable, mockTSTableOpener]{
+               TSTableCreator: func(_ fs.FileSystem, _ string, _ 
common.Position, _ *logger.Logger,
+                       _ timestamp.TimeRange, _ mockTSTableOpener, _ any,
+               ) (mockTSTable, error) {
+                       return mockTSTable{ID: common.ShardID(0)}, nil
+               },
+               ShardNum:                       1,
+               SegmentInterval:                IntervalRule{Unit: DAY, Num: 1},
+               TTL:                            IntervalRule{Unit: DAY, Num: 7},
+               SeriesIndexFlushTimeoutSeconds: 10,
+               SeriesIndexCacheMaxBytes:       1024 * 1024,
+       }
+
+       serviceCache := NewServiceCache().(*serviceCache)
+       sc := newSegmentController[mockTSTable, mockTSTableOpener](
+               ctx,
+               tempDir,
+               l,
+               opts,
+               nil,
+               nil,
+               5*time.Minute,
+               
fs.NewLocalFileSystemWithLoggerAndLimit(logger.GetLogger("storage"), 
opts.MemoryLimit),
+               serviceCache,
+               group,
+       )
+
+       now := time.Now().UTC()
+       startTime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, 
time.UTC)
+
+       seg, createErr := sc.create(startTime)
+       require.NoError(t, createErr)
+       require.NotNil(t, seg)
+
+       // Read metadata from disk and verify it's JSON with endTime
+       suffix := startTime.Format(dayFormat)
+       metadataPath := filepath.Join(tempDir, fmt.Sprintf("seg-%s", suffix), 
metadataFilename)
+       rawMeta, readErr := os.ReadFile(metadataPath)
+       require.NoError(t, readErr)
+
+       meta, parseErr := readSegmentMeta(rawMeta)
+       require.NoError(t, parseErr)
+       assert.Equal(t, currentVersion, meta.Version)
+       assert.NotEmpty(t, meta.EndTime, "endTime should be persisted in metadata")
+
+       // Verify the endTime matches the segment's End
+       expectedEnd := startTime.Add(24 * time.Hour)
+       assert.Equal(t, expectedEnd.Format(time.RFC3339Nano), meta.EndTime)
+
+       seg.DecRef()
+}
+
+func TestOpenReadsPersistedEndTime(t *testing.T) {
+       tempDir, cleanup := setupTestEnvironment(t)
+       defer cleanup()
+
+       ctx := context.Background()
+       l := logger.GetLogger("test-open-endtime")
+       ctx = context.WithValue(ctx, logger.ContextKey, l)
+       ctx = common.SetPosition(ctx, func(_ common.Position) common.Position {
+               return common.Position{
+                       Database: "test-db",
+                       Stage:    "test-stage",
+               }
+       })
+
+       group := "test-group"
+       opts := TSDBOpts[mockTSTable, mockTSTableOpener]{
+               TSTableCreator: func(_ fs.FileSystem, _ string, _ 
common.Position, _ *logger.Logger,
+                       _ timestamp.TimeRange, _ mockTSTableOpener, _ any,
+               ) (mockTSTable, error) {
+                       return mockTSTable{ID: common.ShardID(0)}, nil
+               },
+               ShardNum:                       1,
+               SegmentInterval:                IntervalRule{Unit: DAY, Num: 1},
+               TTL:                            IntervalRule{Unit: DAY, Num: 
30},
+               SeriesIndexFlushTimeoutSeconds: 10,
+               SeriesIndexCacheMaxBytes:       1024 * 1024,
+       }
+
+       serviceCache := NewServiceCache().(*serviceCache)
+       sc := newSegmentController[mockTSTable, mockTSTableOpener](
+               ctx,
+               tempDir,
+               l,
+               opts,
+               nil,
+               nil,
+               5*time.Minute,
+               
fs.NewLocalFileSystemWithLoggerAndLimit(logger.GetLogger("storage"), 
opts.MemoryLimit),
+               serviceCache,
+               group,
+       )
+
+       now := time.Now().UTC()
+       day1 := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, 
time.UTC)
+       day2 := day1.Add(24 * time.Hour)
+
+       // Manually create segment directories with JSON metadata.
+       // day1 with a specific endTime that differs from the default NextTime.
+       customEnd := day1.Add(12 * time.Hour)
+       segPath1 := filepath.Join(tempDir, "seg-"+day1.Format(dayFormat))
+       require.NoError(t, os.MkdirAll(segPath1, DirPerm))
+       meta1 := segmentMeta{Version: currentVersion, EndTime: 
customEnd.Format(time.RFC3339Nano)}
+       meta1Data, marshalErr := json.Marshal(meta1)
+       require.NoError(t, marshalErr)
+       require.NoError(t, os.WriteFile(filepath.Join(segPath1, 
metadataFilename), meta1Data, FilePerm))
+
+       // day2 with standard endTime.
+       segPath2 := filepath.Join(tempDir, "seg-"+day2.Format(dayFormat))
+       require.NoError(t, os.MkdirAll(segPath2, DirPerm))
+       meta2 := segmentMeta{Version: currentVersion, EndTime: day2.Add(24 * 
time.Hour).Format(time.RFC3339Nano)}
+       meta2Data, marshalErr := json.Marshal(meta2)
+       require.NoError(t, marshalErr)
+       require.NoError(t, os.WriteFile(filepath.Join(segPath2, 
metadataFilename), meta2Data, FilePerm))
+
+       // Open the controller (loads segments from disk).
+       openErr := sc.open()
+       require.NoError(t, openErr)
+
+       // Verify segments loaded correctly.
+       require.Len(t, sc.lst, 2)
+
+       // First segment should have the custom endTime from metadata.
+       assert.Equal(t, customEnd, sc.lst[0].End, "segment 0 should use persisted endTime")
+       // Second segment should have its endTime from metadata.
+       assert.Equal(t, day2.Add(24*time.Hour), sc.lst[1].End, "segment 1 should use persisted endTime")
+
+       for _, seg := range sc.lst {
+               seg.DecRef()
+       }
+}
+
+func TestOpenFallbackOldFormatMetadata(t *testing.T) {
+       tempDir, cleanup := setupTestEnvironment(t)
+       defer cleanup()
+
+       ctx := context.Background()
+       l := logger.GetLogger("test-open-fallback")
+       ctx = context.WithValue(ctx, logger.ContextKey, l)
+       ctx = common.SetPosition(ctx, func(_ common.Position) common.Position {
+               return common.Position{
+                       Database: "test-db",
+                       Stage:    "test-stage",
+               }
+       })
+
+       group := "test-group"
+       opts := TSDBOpts[mockTSTable, mockTSTableOpener]{
+               TSTableCreator: func(_ fs.FileSystem, _ string, _ 
common.Position, _ *logger.Logger,
+                       _ timestamp.TimeRange, _ mockTSTableOpener, _ any,
+               ) (mockTSTable, error) {
+                       return mockTSTable{ID: common.ShardID(0)}, nil
+               },
+               ShardNum:                       1,
+               SegmentInterval:                IntervalRule{Unit: DAY, Num: 1},
+               TTL:                            IntervalRule{Unit: DAY, Num: 
30},
+               SeriesIndexFlushTimeoutSeconds: 10,
+               SeriesIndexCacheMaxBytes:       1024 * 1024,
+       }
+
+       serviceCache := NewServiceCache().(*serviceCache)
+       sc := newSegmentController[mockTSTable, mockTSTableOpener](
+               ctx,
+               tempDir,
+               l,
+               opts,
+               nil,
+               nil,
+               5*time.Minute,
+               
fs.NewLocalFileSystemWithLoggerAndLimit(logger.GetLogger("storage"), 
opts.MemoryLimit),
+               serviceCache,
+               group,
+       )
+
+       now := time.Now()
+       day1 := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, 
time.Local)
+       day2 := day1.Add(24 * time.Hour)
+
+       // Create segments with OLD format metadata (plain version string).
+       segPath1 := filepath.Join(tempDir, "seg-"+day1.Format(dayFormat))
+       require.NoError(t, os.MkdirAll(segPath1, DirPerm))
+       require.NoError(t, os.WriteFile(filepath.Join(segPath1, 
metadataFilename), []byte("1.4.0"), FilePerm))
+
+       segPath2 := filepath.Join(tempDir, "seg-"+day2.Format(dayFormat))
+       require.NoError(t, os.MkdirAll(segPath2, DirPerm))
+       require.NoError(t, os.WriteFile(filepath.Join(segPath2, 
metadataFilename), []byte("1.4.0"), FilePerm))
+
+       // Open should succeed with fallback end time computation.
+       openErr := sc.open()
+       require.NoError(t, openErr)
+
+       require.Len(t, sc.lst, 2)
+
+       // First segment's end should be day2's start (fallback: end = next segment's start).
+       assert.Equal(t, day2, sc.lst[0].End, "old format should use fallback end time")
+       // Second segment's end should be day2 + 24h (fallback: end = NextTime(start)).
+       assert.Equal(t, day2.Add(24*time.Hour), sc.lst[1].End, "last segment should use NextTime fallback")
+
+       for _, seg := range sc.lst {
+               seg.DecRef()
+       }
+}
diff --git a/banyand/internal/storage/version.go b/banyand/internal/storage/version.go
index 55a92f04b..ddea79a33 100644
--- a/banyand/internal/storage/version.go
+++ b/banyand/internal/storage/version.go
@@ -28,7 +28,7 @@ import (
 
 const (
        metadataFilename           = "metadata"
-       currentVersion             = "1.4.0"
+       currentVersion             = "1.5.0"
        compatibleVersionsKey      = "versions"
        compatibleVersionsFilename = "versions.yml"
 )
@@ -49,6 +49,27 @@ func checkVersion(version string) error {
        return errors.WithMessagef(errVersionIncompatible, "incompatible version %s, supported versions: %s", version, strings.Join(compatibleVersions, ", "))
 }
 
+type segmentMeta struct {
+       Version string `json:"version"`
+       EndTime string `json:"endTime,omitempty"`
+}
+
+func readSegmentMeta(data []byte) (segmentMeta, error) {
+       var meta segmentMeta
+       trimmed := strings.TrimSpace(string(data))
+       if len(trimmed) > 0 && trimmed[0] == '{' {
+               if unmarshalErr := json.Unmarshal(data, &meta); unmarshalErr != 
nil {
+                       return segmentMeta{}, unmarshalErr
+               }
+       } else {
+               meta.Version = trimmed
+       }
+       if checkErr := checkVersion(meta.Version); checkErr != nil {
+               return segmentMeta{}, checkErr
+       }
+       return meta, nil
+}
+
 func readCompatibleVersions() []string {
        i, err := versionFS.ReadFile(compatibleVersionsFilename)
        if err != nil {
diff --git a/banyand/internal/storage/version_test.go b/banyand/internal/storage/version_test.go
new file mode 100644
index 000000000..726e6d8a2
--- /dev/null
+++ b/banyand/internal/storage/version_test.go
@@ -0,0 +1,63 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package storage
+
+import (
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+)
+
+func TestReadSegmentMeta_NewFormat(t *testing.T) {
+       data := 
[]byte(`{"version":"1.4.0","endTime":"2026-04-07T00:00:00+08:00"}`)
+       meta, err := readSegmentMeta(data)
+       require.NoError(t, err)
+       assert.Equal(t, "1.4.0", meta.Version)
+       assert.Equal(t, "2026-04-07T00:00:00+08:00", meta.EndTime)
+}
+
+func TestReadSegmentMeta_OldFormat(t *testing.T) {
+       data := []byte("1.4.0")
+       meta, err := readSegmentMeta(data)
+       require.NoError(t, err)
+       assert.Equal(t, "1.4.0", meta.Version)
+       assert.Equal(t, "", meta.EndTime)
+}
+
+func TestReadSegmentMeta_OldFormatWithNewline(t *testing.T) {
+       data := []byte("1.4.0\n")
+       meta, err := readSegmentMeta(data)
+       require.NoError(t, err)
+       assert.Equal(t, "1.4.0", meta.Version)
+       assert.Equal(t, "", meta.EndTime)
+}
+
+func TestReadSegmentMeta_IncompatibleVersion(t *testing.T) {
+       data := 
[]byte(`{"version":"0.1.0","endTime":"2026-04-07T00:00:00+08:00"}`)
+       _, err := readSegmentMeta(data)
+       assert.Error(t, err)
+}
+
+func TestReadSegmentMeta_NewFormatNoEndTime(t *testing.T) {
+       data := []byte(`{"version":"1.4.0"}`)
+       meta, err := readSegmentMeta(data)
+       require.NoError(t, err)
+       assert.Equal(t, "1.4.0", meta.Version)
+       assert.Equal(t, "", meta.EndTime)
+}
diff --git a/banyand/internal/storage/versions.yml b/banyand/internal/storage/versions.yml
index e5cb937c5..ddac13c49 100644
--- a/banyand/internal/storage/versions.yml
+++ b/banyand/internal/storage/versions.yml
@@ -15,3 +15,4 @@
 
 versions:
 - 1.4.0
+- 1.5.0

Reply via email to