This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new b8e1f72d3 fix(sidx): use MinTimestamp/MaxTimestamp instead of 
SegmentID in streaming sync (#1060)
b8e1f72d3 is described below

commit b8e1f72d3dd4c7026e752a17e875211ead4ccb21
Author: Gao Hongtao <[email protected]>
AuthorDate: Sat Apr 11 07:57:50 2026 +0800

    fix(sidx): use MinTimestamp/MaxTimestamp instead of SegmentID in streaming 
sync (#1060)
    
    * fix(sidx): use MinTimestamp/MaxTimestamp instead of SegmentID in 
streaming sync
    
    SIDX StreamingParts was incorrectly setting MinTimestamp from
    partMetadata.SegmentID and omitting MaxTimestamp entirely. This caused
    the receiving node to reject parts with "invalid MinTimestamp 0" when
    SegmentID was zero, and previously created corrupt seg-19700101
    directories before validation was added in #1059.
---
 CHANGES.md                         |   1 +
 banyand/internal/sidx/sync.go      |  19 +++-
 banyand/internal/sidx/sync_test.go | 191 +++++++++++++++++++++++++++++++++++++
 3 files changed, 208 insertions(+), 3 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 48c2d04fb..939f46a83 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -29,6 +29,7 @@ Release Notes.
 - ui: fix query editor refresh/reset behavior and BydbQL keyword highlighting.
 - Disable the rotation task on warm and cold nodes to prevent incorrect 
segment boundaries during lifecycle migration.
 - Prevent epoch-dated segment directories (seg-19700101) from being created by 
zero timestamps in distributed sync paths.
+- Fix SIDX streaming sync sending SegmentID as MinTimestamp instead of the 
actual timestamp, causing sync failures on the receiving node.
 
 ### Chores
 
diff --git a/banyand/internal/sidx/sync.go b/banyand/internal/sidx/sync.go
index 465df54eb..96352364f 100644
--- a/banyand/internal/sidx/sync.go
+++ b/banyand/internal/sidx/sync.go
@@ -74,7 +74,7 @@ func (s *sidx) StreamingParts(partIDsToSync 
map[uint64]struct{}, group string, s
                        part := pw.p
                        files, release := createPartFileReaders(part)
                        releaseFuncs = append(releaseFuncs, release)
-                       streamingParts = append(streamingParts, 
queue.StreamingPartData{
+                       spd := queue.StreamingPartData{
                                ID:                    part.partMetadata.ID,
                                Group:                 group,
                                ShardID:               shardID,
@@ -84,11 +84,24 @@ func (s *sidx) StreamingParts(partIDsToSync 
map[uint64]struct{}, group string, s
                                UncompressedSizeBytes: 
part.partMetadata.UncompressedSizeBytes,
                                TotalCount:            
part.partMetadata.TotalCount,
                                BlocksCount:           
part.partMetadata.BlocksCount,
-                               MinTimestamp:          
part.partMetadata.SegmentID,
                                MinKey:                part.partMetadata.MinKey,
                                MaxKey:                part.partMetadata.MaxKey,
                                PartType:              name,
-                       })
+                       }
+                       switch {
+                       case part.partMetadata.MinTimestamp != nil:
+                               spd.MinTimestamp = 
*part.partMetadata.MinTimestamp
+                       case part.partMetadata.SegmentID > 0:
+                               spd.MinTimestamp = part.partMetadata.SegmentID
+                       default:
+                               logger.Panicf("sidx streaming parts: %s, part 
%d has no valid timestamp (MinTimestamp=nil, SegmentID=0)", name, 
part.partMetadata.ID)
+                       }
+                       if part.partMetadata.MaxTimestamp != nil {
+                               spd.MaxTimestamp = 
*part.partMetadata.MaxTimestamp
+                       } else {
+                               spd.MaxTimestamp = spd.MinTimestamp
+                       }
+                       streamingParts = append(streamingParts, spd)
                }
        }
 
diff --git a/banyand/internal/sidx/sync_test.go 
b/banyand/internal/sidx/sync_test.go
new file mode 100644
index 000000000..ff6a32435
--- /dev/null
+++ b/banyand/internal/sidx/sync_test.go
@@ -0,0 +1,191 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package sidx
+
+import (
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+
+       "github.com/apache/skywalking-banyandb/api/data"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/banyand/protector"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+)
+
+// TestStreamingParts_Timestamps verifies that StreamingParts propagates
+// MinTimestamp and MaxTimestamp from partMetadata (not SegmentID) to
+// StreamingPartData. This prevents zero-timestamp segments from being
+// created on the receiving node during distributed sync.
+func TestStreamingParts_Timestamps(t *testing.T) {
+       reqs := []WriteRequest{
+               createTestWriteRequest(1, 100, "data1"),
+               createTestWriteRequest(1, 200, "data2"),
+       }
+
+       t.Run("with_timestamps_set", func(t *testing.T) {
+               sidxIface := createTestSIDX(t)
+               raw := sidxIface.(*sidx)
+               defer func() {
+                       assert.NoError(t, raw.Close())
+               }()
+
+               minTS := int64(1700000000)
+               maxTS := int64(1700001000)
+               writeTestDataWithTimeRange(t, raw, reqs, 1, 1, &minTS, &maxTS)
+
+               flushIntro, err := raw.Flush(map[uint64]struct{}{1: {}})
+               require.NoError(t, err)
+               raw.IntroduceFlushed(flushIntro)
+               flushIntro.Release()
+
+               partIDs := map[uint64]struct{}{1: {}}
+               parts, releaseFuncs := raw.StreamingParts(partIDs, 
"test-group", 0, "test-sidx")
+               defer func() {
+                       for _, release := range releaseFuncs {
+                               release()
+                       }
+               }()
+
+               require.Len(t, parts, 1)
+               assert.Equal(t, uint64(1), parts[0].ID)
+               assert.Equal(t, int64(1700000000), parts[0].MinTimestamp,
+                       "MinTimestamp should come from 
partMetadata.MinTimestamp, not SegmentID")
+               assert.Equal(t, int64(1700001000), parts[0].MaxTimestamp,
+                       "MaxTimestamp should come from 
partMetadata.MaxTimestamp")
+               assert.Equal(t, "test-group", parts[0].Group)
+               assert.Equal(t, uint32(0), parts[0].ShardID)
+               assert.Equal(t, data.TopicTracePartSync.String(), 
parts[0].Topic)
+               assert.Equal(t, "test-sidx", parts[0].PartType)
+       })
+
+       t.Run("nil_timestamps_fallback_to_segment_id", func(t *testing.T) {
+               sidxIface := createTestSIDX(t)
+               raw := sidxIface.(*sidx)
+               defer func() {
+                       assert.NoError(t, raw.Close())
+               }()
+
+               // segmentID=1000, nil timestamps — should fall back to 
SegmentID
+               writeTestDataWithTimeRange(t, raw, reqs, 1000, 1, nil, nil)
+
+               flushIntro, err := raw.Flush(map[uint64]struct{}{1: {}})
+               require.NoError(t, err)
+               raw.IntroduceFlushed(flushIntro)
+               flushIntro.Release()
+
+               partIDs := map[uint64]struct{}{1: {}}
+               parts, releaseFuncs := raw.StreamingParts(partIDs, 
"test-group", 0, "test-sidx")
+               defer func() {
+                       for _, release := range releaseFuncs {
+                               release()
+                       }
+               }()
+
+               require.Len(t, parts, 1)
+               assert.Equal(t, int64(1000), parts[0].MinTimestamp,
+                       "MinTimestamp should fall back to SegmentID when 
partMetadata.MinTimestamp is nil")
+               assert.Equal(t, int64(1000), parts[0].MaxTimestamp,
+                       "MaxTimestamp should fall back to MinTimestamp when 
partMetadata.MaxTimestamp is nil")
+       })
+
+       t.Run("nil_timestamps_zero_segment_id_panics", func(t *testing.T) {
+               sidxIface := createTestSIDX(t)
+               raw := sidxIface.(*sidx)
+               defer func() {
+                       assert.NoError(t, raw.Close())
+               }()
+
+               // segmentID=0, nil timestamps — should panic
+               writeTestDataWithTimeRange(t, raw, reqs, 0, 1, nil, nil)
+
+               flushIntro, err := raw.Flush(map[uint64]struct{}{1: {}})
+               require.NoError(t, err)
+               raw.IntroduceFlushed(flushIntro)
+               flushIntro.Release()
+
+               partIDs := map[uint64]struct{}{1: {}}
+               assert.Panics(t, func() {
+                       parts, releaseFuncs := raw.StreamingParts(partIDs, 
"test-group", 0, "test-sidx")
+                       for _, release := range releaseFuncs {
+                               release()
+                       }
+                       _ = parts
+               }, "Should panic when both MinTimestamp and SegmentID are zero")
+       })
+
+       t.Run("nil_snapshot_returns_nil", func(t *testing.T) {
+               sidxIface := createTestSIDX(t)
+               raw := sidxIface.(*sidx)
+               defer func() {
+                       assert.NoError(t, raw.Close())
+               }()
+
+               partIDs := map[uint64]struct{}{1: {}}
+               parts, releaseFuncs := raw.StreamingParts(partIDs, 
"test-group", 0, "test-sidx")
+               defer func() {
+                       for _, release := range releaseFuncs {
+                               release()
+                       }
+               }()
+               assert.Nil(t, parts)
+       })
+
+       t.Run("multiple_parts_sorted_by_id", func(t *testing.T) {
+               dir := t.TempDir()
+               fileSystem := fs.NewLocalFileSystem()
+               opts := NewDefaultOptions()
+               opts.Memory = 
protector.NewMemory(observability.NewBypassRegistry())
+               opts.Path = dir
+
+               sidxIface, err := NewSIDX(fileSystem, opts)
+               require.NoError(t, err)
+               raw := sidxIface.(*sidx)
+               defer func() {
+                       assert.NoError(t, raw.Close())
+               }()
+
+               min1, max1 := int64(1700000000), int64(1700001000)
+               min2, max2 := int64(1800000000), int64(1800001000)
+               writeTestDataWithTimeRange(t, raw, reqs, 1, 2, &min1, &max1)
+               writeTestDataWithTimeRange(t, raw, reqs, 2, 3, &min2, &max2)
+
+               flushIntro, flushErr := raw.Flush(map[uint64]struct{}{2: {}, 3: 
{}})
+               require.NoError(t, flushErr)
+               raw.IntroduceFlushed(flushIntro)
+               flushIntro.Release()
+
+               partIDs := map[uint64]struct{}{2: {}, 3: {}}
+               parts, releaseFuncs := raw.StreamingParts(partIDs, 
"test-group", 0, "test-sidx")
+               defer func() {
+                       for _, release := range releaseFuncs {
+                               release()
+                       }
+               }()
+
+               require.Len(t, parts, 2)
+               // Parts should be sorted by ID
+               assert.Equal(t, uint64(2), parts[0].ID)
+               assert.Equal(t, int64(1700000000), parts[0].MinTimestamp)
+               assert.Equal(t, int64(1700001000), parts[0].MaxTimestamp)
+               assert.Equal(t, uint64(3), parts[1].ID)
+               assert.Equal(t, int64(1800000000), parts[1].MinTimestamp)
+               assert.Equal(t, int64(1800001000), parts[1].MaxTimestamp)
+       })
+}

Reply via email to