This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch bug-empty-mints
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit c3d20cb13739e77e27258404e22571ff5cd4c4fc
Author: Hongtao Gao <[email protected]>
AuthorDate: Fri Apr 10 11:04:14 2026 +0000

    fix(sidx): use MinTimestamp/MaxTimestamp instead of SegmentID in streaming sync
    
    SIDX StreamingParts was incorrectly setting MinTimestamp from
    partMetadata.SegmentID and omitting MaxTimestamp entirely. When
    SegmentID was zero, this caused the receiving node to reject parts
    with "invalid MinTimestamp 0"; before validation was added in #1059,
    it instead led to corrupt seg-19700101 directories being created.
---
 CHANGES.md                         |   1 +
 banyand/internal/sidx/sync.go      |  12 ++-
 banyand/internal/sidx/sync_test.go | 166 +++++++++++++++++++++++++++++++++++++
 3 files changed, 176 insertions(+), 3 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 48c2d04fb..939f46a83 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -29,6 +29,7 @@ Release Notes.
 - ui: fix query editor refresh/reset behavior and BydbQL keyword highlighting.
 - Disable the rotation task on warm and cold nodes to prevent incorrect 
segment boundaries during lifecycle migration.
 - Prevent epoch-dated segment directories (seg-19700101) from being created by 
zero timestamps in distributed sync paths.
+- Fix SIDX streaming sync sending SegmentID as MinTimestamp instead of the 
actual timestamp, causing sync failures on the receiving node.
 
 ### Chores
 
diff --git a/banyand/internal/sidx/sync.go b/banyand/internal/sidx/sync.go
index 465df54eb..7a0ae0523 100644
--- a/banyand/internal/sidx/sync.go
+++ b/banyand/internal/sidx/sync.go
@@ -74,7 +74,7 @@ func (s *sidx) StreamingParts(partIDsToSync 
map[uint64]struct{}, group string, s
                        part := pw.p
                        files, release := createPartFileReaders(part)
                        releaseFuncs = append(releaseFuncs, release)
-                       streamingParts = append(streamingParts, 
queue.StreamingPartData{
+                       spd := queue.StreamingPartData{
                                ID:                    part.partMetadata.ID,
                                Group:                 group,
                                ShardID:               shardID,
@@ -84,11 +84,17 @@ func (s *sidx) StreamingParts(partIDsToSync 
map[uint64]struct{}, group string, s
                                UncompressedSizeBytes: 
part.partMetadata.UncompressedSizeBytes,
                                TotalCount:            
part.partMetadata.TotalCount,
                                BlocksCount:           
part.partMetadata.BlocksCount,
-                               MinTimestamp:          
part.partMetadata.SegmentID,
                                MinKey:                part.partMetadata.MinKey,
                                MaxKey:                part.partMetadata.MaxKey,
                                PartType:              name,
-                       })
+                       }
+                       if part.partMetadata.MinTimestamp != nil {
+                               spd.MinTimestamp = 
*part.partMetadata.MinTimestamp
+                       }
+                       if part.partMetadata.MaxTimestamp != nil {
+                               spd.MaxTimestamp = 
*part.partMetadata.MaxTimestamp
+                       }
+                       streamingParts = append(streamingParts, spd)
                }
        }
 
diff --git a/banyand/internal/sidx/sync_test.go 
b/banyand/internal/sidx/sync_test.go
new file mode 100644
index 000000000..0d73d0f64
--- /dev/null
+++ b/banyand/internal/sidx/sync_test.go
@@ -0,0 +1,166 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package sidx
+
+import (
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+
+       "github.com/apache/skywalking-banyandb/api/data"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/banyand/protector"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+)
+
+// TestStreamingParts_Timestamps verifies that StreamingParts propagates
+// MinTimestamp and MaxTimestamp from partMetadata (not SegmentID) to
+// StreamingPartData, which prevents zero-timestamp segments from being created on
+// the receiving node during sync. Subtests: set, nil, no-snapshot, and multi-part cases.
+func TestStreamingParts_Timestamps(t *testing.T) {
+       reqs := []WriteRequest{
+               createTestWriteRequest(1, 100, "data1"),
+               createTestWriteRequest(1, 200, "data2"),
+       }
+
+       t.Run("with_timestamps_set", func(t *testing.T) {
+               sidxIface := createTestSIDX(t)
+               raw := sidxIface.(*sidx) // concrete type is needed for Flush/IntroduceFlushed/StreamingParts below
+               defer func() {
+                       assert.NoError(t, raw.Close())
+               }()
+
+               minTS := int64(1700000000)
+               maxTS := int64(1700001000)
+               writeTestDataWithTimeRange(t, raw, reqs, 1, 1, &minTS, &maxTS)
+
+               flushIntro, err := raw.Flush(map[uint64]struct{}{1: {}})
+               require.NoError(t, err)
+               raw.IntroduceFlushed(flushIntro)
+               flushIntro.Release()
+
+               partIDs := map[uint64]struct{}{1: {}}
+               parts, releaseFuncs := raw.StreamingParts(partIDs, 
"test-group", 0, "test-sidx")
+               defer func() {
+                       for _, release := range releaseFuncs {
+                               release()
+                       }
+               }()
+
+               require.Len(t, parts, 1)
+               assert.Equal(t, uint64(1), parts[0].ID)
+               assert.Equal(t, int64(1700000000), parts[0].MinTimestamp,
+                       "MinTimestamp should come from 
partMetadata.MinTimestamp, not SegmentID")
+               assert.Equal(t, int64(1700001000), parts[0].MaxTimestamp,
+                       "MaxTimestamp should come from 
partMetadata.MaxTimestamp")
+               assert.Equal(t, "test-group", parts[0].Group)
+               assert.Equal(t, uint32(0), parts[0].ShardID)
+               assert.Equal(t, data.TopicTracePartSync.String(), 
parts[0].Topic)
+               assert.Equal(t, "test-sidx", parts[0].PartType)
+       })
+
+       t.Run("nil_timestamps_default_to_zero", func(t *testing.T) {
+               sidxIface := createTestSIDX(t)
+               raw := sidxIface.(*sidx)
+               defer func() {
+                       assert.NoError(t, raw.Close())
+               }()
+
+               writeTestDataWithTimeRange(t, raw, reqs, 1, 1, nil, nil)
+
+               flushIntro, err := raw.Flush(map[uint64]struct{}{1: {}})
+               require.NoError(t, err)
+               raw.IntroduceFlushed(flushIntro)
+               flushIntro.Release()
+
+               partIDs := map[uint64]struct{}{1: {}}
+               parts, releaseFuncs := raw.StreamingParts(partIDs, 
"test-group", 0, "test-sidx")
+               defer func() {
+                       for _, release := range releaseFuncs {
+                               release()
+                       }
+               }()
+
+               require.Len(t, parts, 1)
+               assert.Equal(t, int64(0), parts[0].MinTimestamp,
+                       "MinTimestamp should default to 0 when 
partMetadata.MinTimestamp is nil")
+               assert.Equal(t, int64(0), parts[0].MaxTimestamp,
+                       "MaxTimestamp should default to 0 when 
partMetadata.MaxTimestamp is nil")
+       })
+
+       t.Run("nil_snapshot_returns_nil", func(t *testing.T) {
+               sidxIface := createTestSIDX(t)
+               raw := sidxIface.(*sidx)
+               defer func() {
+                       assert.NoError(t, raw.Close())
+               }()
+
+               partIDs := map[uint64]struct{}{1: {}}
+               parts, releaseFuncs := raw.StreamingParts(partIDs, 
"test-group", 0, "test-sidx")
+               defer func() {
+                       for _, release := range releaseFuncs {
+                               release()
+                       }
+               }()
+               assert.Nil(t, parts) // nothing was written or flushed in this subtest, so no parts are expected
+       })
+
+       t.Run("multiple_parts_sorted_by_id", func(t *testing.T) {
+               dir := t.TempDir()
+               fileSystem := fs.NewLocalFileSystem()
+               opts := NewDefaultOptions()
+               opts.Memory = 
protector.NewMemory(observability.NewBypassRegistry())
+               opts.Path = dir
+
+               sidxIface, err := NewSIDX(fileSystem, opts)
+               require.NoError(t, err)
+               raw := sidxIface.(*sidx)
+               defer func() {
+                       assert.NoError(t, raw.Close())
+               }()
+
+               min1, max1 := int64(1700000000), int64(1700001000)
+               min2, max2 := int64(1800000000), int64(1800001000)
+               writeTestDataWithTimeRange(t, raw, reqs, 1, 2, &min1, &max1)
+               writeTestDataWithTimeRange(t, raw, reqs, 2, 3, &min2, &max2)
+
+               flushIntro, flushErr := raw.Flush(map[uint64]struct{}{2: {}, 3: 
{}})
+               require.NoError(t, flushErr)
+               raw.IntroduceFlushed(flushIntro)
+               flushIntro.Release()
+
+               partIDs := map[uint64]struct{}{2: {}, 3: {}}
+               parts, releaseFuncs := raw.StreamingParts(partIDs, 
"test-group", 0, "test-sidx")
+               defer func() {
+                       for _, release := range releaseFuncs {
+                               release()
+                       }
+               }()
+
+               require.Len(t, parts, 2)
+               // Parts are expected in ascending part-ID order: 2 first, then 3.
+               assert.Equal(t, uint64(2), parts[0].ID)
+               assert.Equal(t, int64(1700000000), parts[0].MinTimestamp)
+               assert.Equal(t, int64(1700001000), parts[0].MaxTimestamp)
+               assert.Equal(t, uint64(3), parts[1].ID)
+               assert.Equal(t, int64(1800000000), parts[1].MinTimestamp)
+               assert.Equal(t, int64(1800001000), parts[1].MaxTimestamp)
+       })
+}
+

Reply via email to