This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 29e87c9dc Stable Segment Endtime (#1051)
29e87c9dc is described below
commit 29e87c9dc8077acf6883e663a35d0ac5f630a3e2
Author: Gao Hongtao <[email protected]>
AuthorDate: Wed Apr 8 18:06:01 2026 +0800
Stable Segment Endtime (#1051)
---
.gitignore | 5 +-
CHANGES.md | 1 +
banyand/internal/storage/segment.go | 38 ++++--
banyand/internal/storage/segment_test.go | 219 +++++++++++++++++++++++++++++++
banyand/internal/storage/version.go | 23 +++-
banyand/internal/storage/version_test.go | 63 +++++++++
banyand/internal/storage/versions.yml | 1 +
7 files changed, 334 insertions(+), 16 deletions(-)
diff --git a/.gitignore b/.gitignore
index e433fb61c..6f4a80ab7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -75,12 +75,9 @@ gomock_reflect*
.cursor/
# Claude
-Claude.md
.claude/
-CLAUDE.md
+.omc/
# eBPF generated files and binaries
fodc/agent/internal/ktm/iomonitor/ebpf/generated/vmlinux.h
-# Codex
-AGENTS.md
diff --git a/CHANGES.md b/CHANGES.md
index e6c355c75..d7959c781 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -11,6 +11,7 @@ Release Notes.
- Add log query e2e test.
- Sync lifecycle e2e test from SkyWalking stages test.
- Add periodic health check for property schema connection.
+- Persist segment end time in per-segment metadata so boundaries don't shift across restarts or config changes.
### Bug Fixes
diff --git a/banyand/internal/storage/segment.go b/banyand/internal/storage/segment.go
index f9ad6a122..3c88e7d6f 100644
--- a/banyand/internal/storage/segment.go
+++ b/banyand/internal/storage/segment.go
@@ -19,6 +19,7 @@ package storage
import (
"context"
+ "encoding/json"
"fmt"
"io/fs"
"path"
@@ -33,7 +34,6 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
- "github.com/apache/skywalking-banyandb/pkg/convert"
banyanfs "github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/index/inverted"
"github.com/apache/skywalking-banyandb/pkg/logger"
@@ -482,23 +482,32 @@ func (sc *segmentController[T, O]) open() error {
suffix := sc.format(start)
segmentPath := path.Join(sc.location, fmt.Sprintf(segTemplate,
suffix))
metadataPath := path.Join(segmentPath, metadataFilename)
- version, err := sc.lfs.Read(metadataPath)
- if err != nil {
- if errors.Is(err, fs.ErrNotExist) {
+ rawMeta, readErr := sc.lfs.Read(metadataPath)
+ if readErr != nil {
+ if errors.Is(readErr, fs.ErrNotExist) {
emptySegments = append(emptySegments,
segmentPath)
return nil
}
- return err
+ return readErr
}
- if len(version) == 0 {
+ if len(rawMeta) == 0 {
emptySegments = append(emptySegments, segmentPath)
return nil
}
- if err = checkVersion(convert.BytesToString(version)); err !=
nil {
- return err
+ meta, parseErr := readSegmentMeta(rawMeta)
+ if parseErr != nil {
+ return parseErr
}
- _, err = sc.load(start, end, sc.location)
- return err
+ segmentEnd := end
+ if meta.EndTime != "" {
+ parsedEnd, timeErr := time.Parse(time.RFC3339Nano,
meta.EndTime)
+ if timeErr != nil {
+ return timeErr
+ }
+ segmentEnd = parsedEnd
+ }
+ _, loadErr := sc.load(start, segmentEnd, sc.location)
+ return loadErr
})
if len(emptySegments) > 0 {
sc.l.Warn().Strs("segments", emptySegments).Msg("empty segments
found, removing them.")
@@ -539,7 +548,14 @@ func (sc *segmentController[T, O]) create(start time.Time) (*segment[T, O], erro
}
segPath := path.Join(sc.location, fmt.Sprintf(segTemplate,
sc.format(start)))
sc.lfs.MkdirPanicIfExist(segPath, DirPerm)
- data := []byte(currentVersion)
+ meta := segmentMeta{
+ Version: currentVersion,
+ EndTime: end.Format(time.RFC3339Nano),
+ }
+ data, marshalErr := json.Marshal(meta)
+ if marshalErr != nil {
+ logger.Panicf("cannot marshal segment metadata: %s", marshalErr)
+ }
metadataPath := filepath.Join(segPath, metadataFilename)
lf, err := sc.lfs.CreateLockFile(metadataPath, FilePerm)
if err != nil {
diff --git a/banyand/internal/storage/segment_test.go b/banyand/internal/storage/segment_test.go
index 7bcc1063b..9a55dd358 100644
--- a/banyand/internal/storage/segment_test.go
+++ b/banyand/internal/storage/segment_test.go
@@ -19,6 +19,7 @@ package storage
import (
"context"
+ "encoding/json"
"fmt"
"os"
"path/filepath"
@@ -641,3 +642,221 @@ func TestDeleteExpiredSegmentsWithClosedSegments(t *testing.T) {
"Remaining segment %d should be from the expected
date", i)
}
}
+
+func TestCreateSegmentWritesJSONMetadata(t *testing.T) {
+ tempDir, cleanup := setupTestEnvironment(t)
+ defer cleanup()
+
+ ctx := context.Background()
+ l := logger.GetLogger("test-segment-metadata")
+ ctx = context.WithValue(ctx, logger.ContextKey, l)
+ ctx = common.SetPosition(ctx, func(_ common.Position) common.Position {
+ return common.Position{
+ Database: "test-db",
+ Stage: "test-stage",
+ }
+ })
+
+ opts := TSDBOpts[mockTSTable, mockTSTableOpener]{
+ TSTableCreator: func(_ fs.FileSystem, _ string, _
common.Position, _ *logger.Logger,
+ _ timestamp.TimeRange, _ mockTSTableOpener, _ any,
+ ) (mockTSTable, error) {
+ return mockTSTable{ID: common.ShardID(0)}, nil
+ },
+ ShardNum: 1,
+ SegmentInterval: IntervalRule{Unit: DAY, Num: 1},
+ TTL: IntervalRule{Unit: DAY, Num: 7},
+ SeriesIndexFlushTimeoutSeconds: 10,
+ SeriesIndexCacheMaxBytes: 1024 * 1024,
+ }
+
+ serviceCache := NewServiceCache().(*serviceCache)
+ sc := newSegmentController[mockTSTable, mockTSTableOpener](
+ ctx,
+ tempDir,
+ l,
+ opts,
+ nil,
+ nil,
+ 5*time.Minute,
+
fs.NewLocalFileSystemWithLoggerAndLimit(logger.GetLogger("storage"),
opts.MemoryLimit),
+ serviceCache,
+ group,
+ )
+
+ now := time.Now().UTC()
+ startTime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0,
time.UTC)
+
+ seg, createErr := sc.create(startTime)
+ require.NoError(t, createErr)
+ require.NotNil(t, seg)
+
+ // Read metadata from disk and verify it's JSON with endTime
+ suffix := startTime.Format(dayFormat)
+ metadataPath := filepath.Join(tempDir, fmt.Sprintf("seg-%s", suffix),
metadataFilename)
+ rawMeta, readErr := os.ReadFile(metadataPath)
+ require.NoError(t, readErr)
+
+ meta, parseErr := readSegmentMeta(rawMeta)
+ require.NoError(t, parseErr)
+ assert.Equal(t, currentVersion, meta.Version)
+ assert.NotEmpty(t, meta.EndTime, "endTime should be persisted in
metadata")
+
+ // Verify the endTime matches the segment's End
+ expectedEnd := startTime.Add(24 * time.Hour)
+ assert.Equal(t, expectedEnd.Format(time.RFC3339Nano), meta.EndTime)
+
+ seg.DecRef()
+}
+
+func TestOpenReadsPersistedEndTime(t *testing.T) {
+ tempDir, cleanup := setupTestEnvironment(t)
+ defer cleanup()
+
+ ctx := context.Background()
+ l := logger.GetLogger("test-open-endtime")
+ ctx = context.WithValue(ctx, logger.ContextKey, l)
+ ctx = common.SetPosition(ctx, func(_ common.Position) common.Position {
+ return common.Position{
+ Database: "test-db",
+ Stage: "test-stage",
+ }
+ })
+
+ group := "test-group"
+ opts := TSDBOpts[mockTSTable, mockTSTableOpener]{
+ TSTableCreator: func(_ fs.FileSystem, _ string, _
common.Position, _ *logger.Logger,
+ _ timestamp.TimeRange, _ mockTSTableOpener, _ any,
+ ) (mockTSTable, error) {
+ return mockTSTable{ID: common.ShardID(0)}, nil
+ },
+ ShardNum: 1,
+ SegmentInterval: IntervalRule{Unit: DAY, Num: 1},
+ TTL: IntervalRule{Unit: DAY, Num:
30},
+ SeriesIndexFlushTimeoutSeconds: 10,
+ SeriesIndexCacheMaxBytes: 1024 * 1024,
+ }
+
+ serviceCache := NewServiceCache().(*serviceCache)
+ sc := newSegmentController[mockTSTable, mockTSTableOpener](
+ ctx,
+ tempDir,
+ l,
+ opts,
+ nil,
+ nil,
+ 5*time.Minute,
+
fs.NewLocalFileSystemWithLoggerAndLimit(logger.GetLogger("storage"),
opts.MemoryLimit),
+ serviceCache,
+ group,
+ )
+
+ now := time.Now().UTC()
+ day1 := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0,
time.UTC)
+ day2 := day1.Add(24 * time.Hour)
+
+ // Manually create segment directories with JSON metadata.
+ // day1 with a specific endTime that differs from the default NextTime.
+ customEnd := day1.Add(12 * time.Hour)
+ segPath1 := filepath.Join(tempDir, "seg-"+day1.Format(dayFormat))
+ require.NoError(t, os.MkdirAll(segPath1, DirPerm))
+ meta1 := segmentMeta{Version: currentVersion, EndTime:
customEnd.Format(time.RFC3339Nano)}
+ meta1Data, marshalErr := json.Marshal(meta1)
+ require.NoError(t, marshalErr)
+ require.NoError(t, os.WriteFile(filepath.Join(segPath1,
metadataFilename), meta1Data, FilePerm))
+
+ // day2 with standard endTime.
+ segPath2 := filepath.Join(tempDir, "seg-"+day2.Format(dayFormat))
+ require.NoError(t, os.MkdirAll(segPath2, DirPerm))
+ meta2 := segmentMeta{Version: currentVersion, EndTime: day2.Add(24 *
time.Hour).Format(time.RFC3339Nano)}
+ meta2Data, marshalErr := json.Marshal(meta2)
+ require.NoError(t, marshalErr)
+ require.NoError(t, os.WriteFile(filepath.Join(segPath2,
metadataFilename), meta2Data, FilePerm))
+
+ // Open the controller (loads segments from disk).
+ openErr := sc.open()
+ require.NoError(t, openErr)
+
+ // Verify segments loaded correctly.
+ require.Len(t, sc.lst, 2)
+
+ // First segment should have the custom endTime from metadata.
+ assert.Equal(t, customEnd, sc.lst[0].End, "segment 0 should use
persisted endTime")
+ // Second segment should have its endTime from metadata.
+ assert.Equal(t, day2.Add(24*time.Hour), sc.lst[1].End, "segment 1
should use persisted endTime")
+
+ for _, seg := range sc.lst {
+ seg.DecRef()
+ }
+}
+
+func TestOpenFallbackOldFormatMetadata(t *testing.T) {
+ tempDir, cleanup := setupTestEnvironment(t)
+ defer cleanup()
+
+ ctx := context.Background()
+ l := logger.GetLogger("test-open-fallback")
+ ctx = context.WithValue(ctx, logger.ContextKey, l)
+ ctx = common.SetPosition(ctx, func(_ common.Position) common.Position {
+ return common.Position{
+ Database: "test-db",
+ Stage: "test-stage",
+ }
+ })
+
+ group := "test-group"
+ opts := TSDBOpts[mockTSTable, mockTSTableOpener]{
+ TSTableCreator: func(_ fs.FileSystem, _ string, _
common.Position, _ *logger.Logger,
+ _ timestamp.TimeRange, _ mockTSTableOpener, _ any,
+ ) (mockTSTable, error) {
+ return mockTSTable{ID: common.ShardID(0)}, nil
+ },
+ ShardNum: 1,
+ SegmentInterval: IntervalRule{Unit: DAY, Num: 1},
+ TTL: IntervalRule{Unit: DAY, Num:
30},
+ SeriesIndexFlushTimeoutSeconds: 10,
+ SeriesIndexCacheMaxBytes: 1024 * 1024,
+ }
+
+ serviceCache := NewServiceCache().(*serviceCache)
+ sc := newSegmentController[mockTSTable, mockTSTableOpener](
+ ctx,
+ tempDir,
+ l,
+ opts,
+ nil,
+ nil,
+ 5*time.Minute,
+
fs.NewLocalFileSystemWithLoggerAndLimit(logger.GetLogger("storage"),
opts.MemoryLimit),
+ serviceCache,
+ group,
+ )
+
+ now := time.Now()
+ day1 := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0,
time.Local)
+ day2 := day1.Add(24 * time.Hour)
+
+ // Create segments with OLD format metadata (plain version string).
+ segPath1 := filepath.Join(tempDir, "seg-"+day1.Format(dayFormat))
+ require.NoError(t, os.MkdirAll(segPath1, DirPerm))
+ require.NoError(t, os.WriteFile(filepath.Join(segPath1,
metadataFilename), []byte("1.4.0"), FilePerm))
+
+ segPath2 := filepath.Join(tempDir, "seg-"+day2.Format(dayFormat))
+ require.NoError(t, os.MkdirAll(segPath2, DirPerm))
+ require.NoError(t, os.WriteFile(filepath.Join(segPath2,
metadataFilename), []byte("1.4.0"), FilePerm))
+
+ // Open should succeed with fallback end time computation.
+ openErr := sc.open()
+ require.NoError(t, openErr)
+
+ require.Len(t, sc.lst, 2)
+
+ // First segment's end should be day2's start (fallback: end = next
segment's start).
+ assert.Equal(t, day2, sc.lst[0].End, "old format should use fallback
end time")
+ // Second segment's end should be day2 + 24h (fallback: end =
NextTime(start)).
+ assert.Equal(t, day2.Add(24*time.Hour), sc.lst[1].End, "last segment
should use NextTime fallback")
+
+ for _, seg := range sc.lst {
+ seg.DecRef()
+ }
+}
diff --git a/banyand/internal/storage/version.go b/banyand/internal/storage/version.go
index 55a92f04b..ddea79a33 100644
--- a/banyand/internal/storage/version.go
+++ b/banyand/internal/storage/version.go
@@ -28,7 +28,7 @@ import (
const (
metadataFilename = "metadata"
- currentVersion = "1.4.0"
+ currentVersion = "1.5.0"
compatibleVersionsKey = "versions"
compatibleVersionsFilename = "versions.yml"
)
@@ -49,6 +49,27 @@ func checkVersion(version string) error {
return errors.WithMessagef(errVersionIncompatible, "incompatible
version %s, supported versions: %s", version, strings.Join(compatibleVersions,
", "))
}
+type segmentMeta struct {
+ Version string `json:"version"`
+ EndTime string `json:"endTime,omitempty"`
+}
+
+func readSegmentMeta(data []byte) (segmentMeta, error) {
+ var meta segmentMeta
+ trimmed := strings.TrimSpace(string(data))
+ if len(trimmed) > 0 && trimmed[0] == '{' {
+ if unmarshalErr := json.Unmarshal(data, &meta); unmarshalErr !=
nil {
+ return segmentMeta{}, unmarshalErr
+ }
+ } else {
+ meta.Version = trimmed
+ }
+ if checkErr := checkVersion(meta.Version); checkErr != nil {
+ return segmentMeta{}, checkErr
+ }
+ return meta, nil
+}
+
func readCompatibleVersions() []string {
i, err := versionFS.ReadFile(compatibleVersionsFilename)
if err != nil {
diff --git a/banyand/internal/storage/version_test.go b/banyand/internal/storage/version_test.go
new file mode 100644
index 000000000..726e6d8a2
--- /dev/null
+++ b/banyand/internal/storage/version_test.go
@@ -0,0 +1,63 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package storage
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+func TestReadSegmentMeta_NewFormat(t *testing.T) {
+ data :=
[]byte(`{"version":"1.4.0","endTime":"2026-04-07T00:00:00+08:00"}`)
+ meta, err := readSegmentMeta(data)
+ require.NoError(t, err)
+ assert.Equal(t, "1.4.0", meta.Version)
+ assert.Equal(t, "2026-04-07T00:00:00+08:00", meta.EndTime)
+}
+
+func TestReadSegmentMeta_OldFormat(t *testing.T) {
+ data := []byte("1.4.0")
+ meta, err := readSegmentMeta(data)
+ require.NoError(t, err)
+ assert.Equal(t, "1.4.0", meta.Version)
+ assert.Equal(t, "", meta.EndTime)
+}
+
+func TestReadSegmentMeta_OldFormatWithNewline(t *testing.T) {
+ data := []byte("1.4.0\n")
+ meta, err := readSegmentMeta(data)
+ require.NoError(t, err)
+ assert.Equal(t, "1.4.0", meta.Version)
+ assert.Equal(t, "", meta.EndTime)
+}
+
+func TestReadSegmentMeta_IncompatibleVersion(t *testing.T) {
+ data :=
[]byte(`{"version":"0.1.0","endTime":"2026-04-07T00:00:00+08:00"}`)
+ _, err := readSegmentMeta(data)
+ assert.Error(t, err)
+}
+
+func TestReadSegmentMeta_NewFormatNoEndTime(t *testing.T) {
+ data := []byte(`{"version":"1.4.0"}`)
+ meta, err := readSegmentMeta(data)
+ require.NoError(t, err)
+ assert.Equal(t, "1.4.0", meta.Version)
+ assert.Equal(t, "", meta.EndTime)
+}
diff --git a/banyand/internal/storage/versions.yml b/banyand/internal/storage/versions.yml
index e5cb937c5..ddac13c49 100644
--- a/banyand/internal/storage/versions.yml
+++ b/banyand/internal/storage/versions.yml
@@ -15,3 +15,4 @@
versions:
- 1.4.0
+- 1.5.0