This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch feat/sidx-timerange
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit ef40900ab9f0170e32c06a48b4501ebe612681be
Author: Hongtao Gao <[email protected]>
AuthorDate: Fri Mar 13 12:55:54 2026 +0800

    Enhance SIDX with optional timestamp support for part selection and 
validation
    
    - Introduced MinTimestamp and MaxTimestamp fields in partMetadata and 
updated related structures.
    - Updated ConvertToMemPart to accept optional timestamp parameters.
    - Modified filtering logic in filterPartsByTimeRange to prioritize 
timestamp overlap.
    - Enhanced validation checks for timestamp ranges in QueryRequest and 
ScanQueryRequest.
    - Added tests for timestamp validation and serialization in partMetadata.
    - Updated relevant functions to handle timestamp ranges in queries and 
scans.
    
    This change improves the flexibility of part selection based on both key 
and timestamp ranges.
---
 banyand/cmd/dump/sidx.go                      |  20 +-
 banyand/internal/sidx/interfaces.go           |  27 +-
 banyand/internal/sidx/merge.go                |  22 ++
 banyand/internal/sidx/metadata.go             |  21 +-
 banyand/internal/sidx/metadata_test.go        |  59 ++++
 banyand/internal/sidx/part_wrapper.go         |  31 +-
 banyand/internal/sidx/part_wrapper_test.go    |  35 +++
 banyand/internal/sidx/query.go                |  17 +-
 banyand/internal/sidx/query_benchmark_test.go |   2 +-
 banyand/internal/sidx/scan_query.go           |  24 +-
 banyand/internal/sidx/sidx.go                 |   4 +-
 banyand/internal/sidx/sidx_test.go            |   8 +-
 banyand/internal/sidx/snapshot_test.go        |   2 +-
 banyand/internal/sidx/timestamp_test.go       | 394 ++++++++++++++++++++++++++
 banyand/trace/streaming_pipeline_test.go      |   4 +-
 banyand/trace/write_liaison.go                |   4 +-
 banyand/trace/write_standalone.go             |   4 +-
 17 files changed, 625 insertions(+), 53 deletions(-)

diff --git a/banyand/cmd/dump/sidx.go b/banyand/cmd/dump/sidx.go
index c084119b1..50cd17f71 100644
--- a/banyand/cmd/dump/sidx.go
+++ b/banyand/cmd/dump/sidx.go
@@ -280,26 +280,30 @@ func filterPartsByTimeRange(sidxPath string, partIDs 
[]uint64, minKey, maxKey in
                partPath := filepath.Join(sidxPath, fmt.Sprintf("%016x", 
partID))
                manifestPath := filepath.Join(partPath, "manifest.json")
 
-               // Read manifest.json
                manifestData, err := fileSystem.Read(manifestPath)
                if err != nil {
-                       // Skip parts that don't have a manifest
                        continue
                }
 
-               // Parse manifest to get minKey and maxKey
                var manifest struct {
-                       MinKey int64 `json:"minKey"`
-                       MaxKey int64 `json:"maxKey"`
+                       MinTimestamp *int64 `json:"minTimestamp,omitempty"`
+                       MaxTimestamp *int64 `json:"maxTimestamp,omitempty"`
+                       MinKey       int64  `json:"minKey"`
+                       MaxKey       int64  `json:"maxKey"`
                }
-
                if err := json.Unmarshal(manifestData, &manifest); err != nil {
                        fmt.Fprintf(os.Stderr, "Warning: failed to parse 
manifest for part %016x: %v\n", partID, err)
                        continue
                }
 
-               // Check if part overlaps with the requested time range
-               if manifest.MaxKey >= minKey && manifest.MinKey <= maxKey {
+               // Prefer timestamp overlap when manifest has 
minTimestamp/maxTimestamp; otherwise fallback to key range
+               overlaps := false
+               if manifest.MinTimestamp != nil && manifest.MaxTimestamp != nil 
{
+                       overlaps = *manifest.MaxTimestamp >= minKey && 
*manifest.MinTimestamp <= maxKey
+               } else {
+                       overlaps = manifest.MaxKey >= minKey && manifest.MinKey 
<= maxKey
+               }
+               if overlaps {
                        filteredParts = append(filteredParts, partID)
                }
        }
diff --git a/banyand/internal/sidx/interfaces.go 
b/banyand/internal/sidx/interfaces.go
index d0c3e5d50..e15d545de 100644
--- a/banyand/internal/sidx/interfaces.go
+++ b/banyand/internal/sidx/interfaces.go
@@ -62,7 +62,8 @@ type SIDX interface {
        // IntroduceMerged introduces a merged map and a new part to the SIDX 
instance.
        IntroduceMerged(nextIntroduction *MergerIntroduction) func()
        // ConvertToMemPart converts a write request to a memPart.
-       ConvertToMemPart(reqs []WriteRequest, segmentID int64) (*MemPart, error)
+       // minTimestamp and maxTimestamp are optional; when provided they are 
stored in manifest.json for part selection.
+       ConvertToMemPart(reqs []WriteRequest, segmentID int64, minTimestamp, 
maxTimestamp *int64) (*MemPart, error)
        // StreamingQuery executes the query and streams batched QueryResponse 
objects.
        // The returned QueryResponse channel contains ordered batches limited 
by req.MaxBatchSize
        // unique Data elements (when positive). The error channel delivers any 
fatal execution error.
@@ -107,6 +108,8 @@ type QueryRequest struct {
        Order         *index.OrderBy
        MinKey        *int64
        MaxKey        *int64
+       MinTimestamp  *int64
+       MaxTimestamp  *int64
        SeriesIDs     []common.SeriesID
        TagProjection []model.TagProjection
        MaxBatchSize  int
@@ -126,6 +129,8 @@ type ScanQueryRequest struct {
        OnProgress    ScanProgressFunc
        MinKey        *int64
        MaxKey        *int64
+       MinTimestamp  *int64
+       MaxTimestamp  *int64
        TagProjection []model.TagProjection
        MaxBatchSize  int
 }
@@ -351,6 +356,9 @@ func (qr QueryRequest) Validate() error {
        if qr.MinKey != nil && qr.MaxKey != nil && *qr.MinKey > *qr.MaxKey {
                return fmt.Errorf("MinKey cannot be greater than MaxKey")
        }
+       if qr.MinTimestamp != nil && qr.MaxTimestamp != nil && *qr.MinTimestamp 
> *qr.MaxTimestamp {
+               return fmt.Errorf("MinTimestamp cannot be greater than 
MaxTimestamp")
+       }
        return nil
 }
 
@@ -362,6 +370,9 @@ func (sqr ScanQueryRequest) Validate() error {
        if sqr.MinKey != nil && sqr.MaxKey != nil && *sqr.MinKey > *sqr.MaxKey {
                return fmt.Errorf("MinKey cannot be greater than MaxKey")
        }
+       if sqr.MinTimestamp != nil && sqr.MaxTimestamp != nil && 
*sqr.MinTimestamp > *sqr.MaxTimestamp {
+               return fmt.Errorf("MinTimestamp cannot be greater than 
MaxTimestamp")
+       }
        return nil
 }
 
@@ -374,6 +385,8 @@ func (qr *QueryRequest) Reset() {
        qr.MaxBatchSize = 0
        qr.MinKey = nil
        qr.MaxKey = nil
+       qr.MinTimestamp = nil
+       qr.MaxTimestamp = nil
 }
 
 // CopyFrom copies the QueryRequest from other to qr.
@@ -413,6 +426,18 @@ func (qr *QueryRequest) CopyFrom(other *QueryRequest) {
        } else {
                qr.MaxKey = nil
        }
+       if other.MinTimestamp != nil {
+               minTS := *other.MinTimestamp
+               qr.MinTimestamp = &minTS
+       } else {
+               qr.MinTimestamp = nil
+       }
+       if other.MaxTimestamp != nil {
+               maxTS := *other.MaxTimestamp
+               qr.MaxTimestamp = &maxTS
+       } else {
+               qr.MaxTimestamp = nil
+       }
 }
 
 // Interface Usage Examples and Best Practices
diff --git a/banyand/internal/sidx/merge.go b/banyand/internal/sidx/merge.go
index a537e3881..157709698 100644
--- a/banyand/internal/sidx/merge.go
+++ b/banyand/internal/sidx/merge.go
@@ -103,6 +103,28 @@ func (s *sidx) mergeParts(fileSystem fs.FileSystem, 
closeCh <-chan struct{}, par
        if err != nil {
                return nil, err
        }
+       // Aggregate optional timestamp range from merged parts
+       var minVal, maxVal int64
+       var hasMinTS, hasMaxTS bool
+       for i := range parts {
+               p := parts[i].p.partMetadata
+               if p.MinTimestamp != nil {
+                       if !hasMinTS || *p.MinTimestamp < minVal {
+                               minVal = *p.MinTimestamp
+                               hasMinTS = true
+                       }
+               }
+               if p.MaxTimestamp != nil {
+                       if !hasMaxTS || *p.MaxTimestamp > maxVal {
+                               maxVal = *p.MaxTimestamp
+                               hasMaxTS = true
+                       }
+               }
+       }
+       if hasMinTS && hasMaxTS {
+               pm.MinTimestamp = &minVal
+               pm.MaxTimestamp = &maxVal
+       }
        pm.mustWriteMetadata(fileSystem, dstPath)
        fileSystem.SyncPath(dstPath)
        p := mustOpenPart(partID, dstPath, fileSystem)
diff --git a/banyand/internal/sidx/metadata.go 
b/banyand/internal/sidx/metadata.go
index ba4a6f5f0..58e0c252c 100644
--- a/banyand/internal/sidx/metadata.go
+++ b/banyand/internal/sidx/metadata.go
@@ -36,19 +36,16 @@ import (
 
 // partMetadata contains metadata for an entire part (replaces 
timestamp-specific metadata from stream module).
 type partMetadata struct {
-       // Size information
+       MinTimestamp          *int64 `json:"minTimestamp,omitempty"`
+       MaxTimestamp          *int64 `json:"maxTimestamp,omitempty"`
        CompressedSizeBytes   uint64 `json:"compressedSizeBytes"`
        UncompressedSizeBytes uint64 `json:"uncompressedSizeBytes"`
        TotalCount            uint64 `json:"totalCount"`
        BlocksCount           uint64 `json:"blocksCount"`
-
-       // Key range
-       MinKey int64 `json:"minKey"` // Minimum user key in part
-       MaxKey int64 `json:"maxKey"` // Maximum user key in part
-
-       // Identity
-       ID        uint64 `json:"id"`        // Unique part identifier
-       SegmentID int64  `json:"segmentID"` // Segment identifier
+       MinKey                int64  `json:"minKey"`
+       MaxKey                int64  `json:"maxKey"`
+       ID                    uint64 `json:"id"`
+       SegmentID             int64  `json:"segmentID"`
 }
 
 func validatePartMetadata(fileSystem fs.FileSystem, partPath string) error {
@@ -140,6 +137,8 @@ func (pm *partMetadata) reset() {
        pm.BlocksCount = 0
        pm.MinKey = 0
        pm.MaxKey = 0
+       pm.MinTimestamp = nil
+       pm.MaxTimestamp = nil
        pm.ID = 0
 }
 
@@ -192,6 +191,10 @@ func (pm *partMetadata) validate() error {
        if pm.MinKey > pm.MaxKey {
                return fmt.Errorf("invalid key range: MinKey (%d) > MaxKey 
(%d)", pm.MinKey, pm.MaxKey)
        }
+       if pm.MinTimestamp != nil && pm.MaxTimestamp != nil && *pm.MinTimestamp 
> *pm.MaxTimestamp {
+               return fmt.Errorf("invalid timestamp range: minTimestamp (%d) > 
maxTimestamp (%d)",
+                       *pm.MinTimestamp, *pm.MaxTimestamp)
+       }
        if pm.CompressedSizeBytes > pm.UncompressedSizeBytes {
                return fmt.Errorf("invalid size: compressed (%d) > uncompressed 
(%d)",
                        pm.CompressedSizeBytes, pm.UncompressedSizeBytes)
diff --git a/banyand/internal/sidx/metadata_test.go 
b/banyand/internal/sidx/metadata_test.go
index 248bec420..09fe0b0c8 100644
--- a/banyand/internal/sidx/metadata_test.go
+++ b/banyand/internal/sidx/metadata_test.go
@@ -104,6 +104,37 @@ func TestPartMetadata_Validation(t *testing.T) {
                        },
                        expectErr: false,
                },
+               {
+                       name: "invalid timestamp range - MinTimestamp > 
MaxTimestamp",
+                       metadata: &partMetadata{
+                               CompressedSizeBytes:   100,
+                               UncompressedSizeBytes: 200,
+                               TotalCount:            10,
+                               BlocksCount:           2,
+                               MinKey:                1,
+                               MaxKey:                100,
+                               ID:                    1,
+                               MinTimestamp:          ptrInt64(200),
+                               MaxTimestamp:          ptrInt64(100),
+                       },
+                       expectErr: true,
+                       errMsg:    "invalid timestamp range",
+               },
+               {
+                       name: "valid metadata with optional timestamps",
+                       metadata: &partMetadata{
+                               CompressedSizeBytes:   100,
+                               UncompressedSizeBytes: 200,
+                               TotalCount:            10,
+                               BlocksCount:           2,
+                               MinKey:                1,
+                               MaxKey:                100,
+                               MinTimestamp:          ptrInt64(100),
+                               MaxTimestamp:          ptrInt64(200),
+                               ID:                    1,
+                       },
+                       expectErr: false,
+               },
        }
 
        for _, tt := range tests {
@@ -453,6 +484,34 @@ func TestPartMetadata_Serialization(t *testing.T) {
        assert.Equal(t, original.ID, restored.ID)
 }
 
+func TestPartMetadata_SerializationWithTimestamps(t *testing.T) {
+       minTS := int64(1000)
+       maxTS := int64(2000)
+       original := &partMetadata{
+               CompressedSizeBytes:   1000,
+               UncompressedSizeBytes: 2000,
+               TotalCount:            50,
+               BlocksCount:           5,
+               MinKey:                10,
+               MaxKey:                1000,
+               MinTimestamp:          &minTS,
+               MaxTimestamp:          &maxTS,
+               ID:                    12345,
+       }
+
+       data, err := original.marshal()
+       require.NoError(t, err)
+       assert.Contains(t, string(data), "minTimestamp")
+       assert.Contains(t, string(data), "maxTimestamp")
+
+       restored, err := unmarshalPartMetadata(data)
+       require.NoError(t, err)
+       require.NotNil(t, restored.MinTimestamp)
+       require.NotNil(t, restored.MaxTimestamp)
+       assert.Equal(t, *original.MinTimestamp, *restored.MinTimestamp)
+       assert.Equal(t, *original.MaxTimestamp, *restored.MaxTimestamp)
+}
+
 func TestBlockMetadata_Serialization(t *testing.T) {
        original := &blockMetadata{
                seriesID:  common.SeriesID(123),
diff --git a/banyand/internal/sidx/part_wrapper.go 
b/banyand/internal/sidx/part_wrapper.go
index 7d7ecf84c..be6fd7971 100644
--- a/banyand/internal/sidx/part_wrapper.go
+++ b/banyand/internal/sidx/part_wrapper.go
@@ -185,29 +185,34 @@ func (pw *partWrapper) overlapsKeyRange(minKey, maxKey 
int64) bool {
        if pw.p == nil {
                return false
        }
-
-       // Validate input range
        if minKey > maxKey {
                return false
        }
-
-       // Check if part metadata is available
        if pw.p.partMetadata == nil {
-               // If no metadata available, assume overlap to be safe
-               // This ensures we don't skip parts that might contain relevant 
data
                return true
        }
-
        pm := pw.p.partMetadata
-
-       // Check for non-overlapping ranges using De Morgan's law:
-       // Two ranges [a,b] and [c,d] don't overlap if: b < c OR a > d
-       // Therefore, they DO overlap if: NOT(b < c OR a > d) = (b >= c AND a 
<= d)
-       // Simplified: part.MaxKey >= query.MinKey AND part.MinKey <= 
query.MaxKey
        if pm.MaxKey < minKey || pm.MinKey > maxKey {
                return false
        }
+       return true
+}
 
-       // Ranges overlap
+// overlapsTimestampRange checks if the part overlaps with the given timestamp 
range.
+// Returns true when part has no timestamp metadata (fallback) or when ranges 
overlap.
+func (pw *partWrapper) overlapsTimestampRange(minTS, maxTS int64) bool {
+       if pw.p == nil {
+               return false
+       }
+       if pw.p.partMetadata == nil {
+               return true
+       }
+       pm := pw.p.partMetadata
+       if pm.MinTimestamp == nil || pm.MaxTimestamp == nil {
+               return true
+       }
+       if *pm.MaxTimestamp < minTS || *pm.MinTimestamp > maxTS {
+               return false
+       }
        return true
 }
diff --git a/banyand/internal/sidx/part_wrapper_test.go 
b/banyand/internal/sidx/part_wrapper_test.go
index c2267808b..565cdf8f5 100644
--- a/banyand/internal/sidx/part_wrapper_test.go
+++ b/banyand/internal/sidx/part_wrapper_test.go
@@ -574,3 +574,38 @@ func TestPartWrapper_OverlapsKeyRange_EdgeCases(t 
*testing.T) {
                assert.False(t, pw.overlapsKeyRange(25, 15))
        })
 }
+
+func TestPartWrapper_OverlapsTimestampRange(t *testing.T) {
+       minTS := int64(100)
+       maxTS := int64(200)
+       tests := []struct { //nolint:govet // test table struct field order 
prioritizes readability
+               name      string
+               partMinTS *int64
+               partMaxTS *int64
+               queryMin  int64
+               queryMax  int64
+               expected  bool
+       }{
+               {name: "overlap", partMinTS: &minTS, partMaxTS: &maxTS, 
queryMin: 50, queryMax: 150, expected: true},
+               {name: "no_overlap_left", partMinTS: &minTS, partMaxTS: &maxTS, 
queryMin: 1, queryMax: 50, expected: false},
+               {name: "no_overlap_right", partMinTS: &minTS, partMaxTS: 
&maxTS, queryMin: 250, queryMax: 300, expected: false},
+               {name: "nil_timestamps_fallback", partMinTS: nil, partMaxTS: 
nil, queryMin: 1, queryMax: 1000, expected: true},
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       p := &part{
+                               path: "/test/part/001",
+                               partMetadata: &partMetadata{
+                                       ID:           1,
+                                       MinKey:       1,
+                                       MaxKey:       1000,
+                                       MinTimestamp: tt.partMinTS,
+                                       MaxTimestamp: tt.partMaxTS,
+                               },
+                       }
+                       pw := newPartWrapper(nil, p)
+                       assert.Equal(t, tt.expected, 
pw.overlapsTimestampRange(tt.queryMin, tt.queryMax))
+               })
+       }
+}
diff --git a/banyand/internal/sidx/query.go b/banyand/internal/sidx/query.go
index 65f55ff91..98d940ac2 100644
--- a/banyand/internal/sidx/query.go
+++ b/banyand/internal/sidx/query.go
@@ -160,8 +160,7 @@ func (s *sidx) prepareStreamingResources(
 
        minKey, maxKey := extractKeyRange(req)
        asc := extractOrdering(req)
-
-       parts := selectPartsForQuery(snap, minKey, maxKey)
+       parts := selectPartsForQuery(snap, minKey, maxKey, req.MinTimestamp, 
req.MaxTimestamp)
        if span != nil {
                span.Tagf("min_key", "%d", minKey)
                span.Tagf("max_key", "%d", maxKey)
@@ -484,15 +483,17 @@ func extractOrdering(req QueryRequest) bool {
        return req.Order.Sort != modelv1.Sort_SORT_DESC
 }
 
-// selectPartsForQuery selects relevant parts from snapshot based on key range.
-func selectPartsForQuery(snap *Snapshot, minKey, maxKey int64) []*part {
+// selectPartsForQuery selects relevant parts from snapshot based on key range 
and optional timestamp range.
+func selectPartsForQuery(snap *Snapshot, minKey, maxKey int64, minTS, maxTS 
*int64) []*part {
        var selectedParts []*part
-
        for _, pw := range snap.parts {
-               if pw.overlapsKeyRange(minKey, maxKey) {
-                       selectedParts = append(selectedParts, pw.p)
+               if !pw.overlapsKeyRange(minKey, maxKey) {
+                       continue
                }
+               if minTS != nil && maxTS != nil && 
!pw.overlapsTimestampRange(*minTS, *maxTS) {
+                       continue
+               }
+               selectedParts = append(selectedParts, pw.p)
        }
-
        return selectedParts
 }
diff --git a/banyand/internal/sidx/query_benchmark_test.go 
b/banyand/internal/sidx/query_benchmark_test.go
index 5e6587904..31f2eca3b 100644
--- a/banyand/internal/sidx/query_benchmark_test.go
+++ b/banyand/internal/sidx/query_benchmark_test.go
@@ -229,7 +229,7 @@ func createBenchmarkSIDX(b *testing.B) SIDX {
 func introduceBenchmarkMemPart(tb testing.TB, sidx SIDX, reqs []WriteRequest, 
segmentID int64, partID uint64) {
        tb.Helper()
 
-       memPart, err := sidx.ConvertToMemPart(reqs, segmentID)
+       memPart, err := sidx.ConvertToMemPart(reqs, segmentID, nil, nil)
        if err != nil {
                tb.Fatalf("failed to convert requests to memPart: %v", err)
        }
diff --git a/banyand/internal/sidx/scan_query.go 
b/banyand/internal/sidx/scan_query.go
index 93a14592e..d625aea89 100644
--- a/banyand/internal/sidx/scan_query.go
+++ b/banyand/internal/sidx/scan_query.go
@@ -52,6 +52,9 @@ func (s *sidx) ScanQuery(ctx context.Context, req 
ScanQueryRequest) ([]*QueryRes
                maxKey = *req.MaxKey
        }
 
+       // Select parts that overlap key and optional timestamp range
+       partsToScan := selectPartsForScan(snap, minKey, maxKey, 
req.MinTimestamp, req.MaxTimestamp)
+
        var results []*QueryResponse
 
        // Prepare Tags map if projection is specified
@@ -73,9 +76,9 @@ func (s *sidx) ScanQuery(ctx context.Context, req 
ScanQueryRequest) ([]*QueryRes
                Tags:    tagsMap,
        }
 
-       // Scan all parts
-       totalParts := len(snap.parts)
-       for partIdx, pw := range snap.parts {
+       // Scan selected parts
+       totalParts := len(partsToScan)
+       for partIdx, pw := range partsToScan {
                var err error
                if currentBatch, err = s.scanPart(ctx, pw, req, minKey, maxKey, 
&results, currentBatch, maxBatchSize); err != nil {
                        return nil, err
@@ -102,6 +105,21 @@ func (s *sidx) ScanQuery(ctx context.Context, req 
ScanQueryRequest) ([]*QueryRes
        return results, nil
 }
 
+// selectPartsForScan selects parts that overlap key and optional timestamp 
range for scan queries.
+func selectPartsForScan(snap *Snapshot, minKey, maxKey int64, minTS, maxTS 
*int64) []*partWrapper {
+       var selected []*partWrapper
+       for _, pw := range snap.parts {
+               if !pw.overlapsKeyRange(minKey, maxKey) {
+                       continue
+               }
+               if minTS != nil && maxTS != nil && 
!pw.overlapsTimestampRange(*minTS, *maxTS) {
+                       continue
+               }
+               selected = append(selected, pw)
+       }
+       return selected
+}
+
 func (s *sidx) scanPart(ctx context.Context, pw *partWrapper, req 
ScanQueryRequest,
        minKey, maxKey int64, results *[]*QueryResponse, currentBatch 
*QueryResponse,
        maxBatchSize int,
diff --git a/banyand/internal/sidx/sidx.go b/banyand/internal/sidx/sidx.go
index 7778610e1..8cd05755d 100644
--- a/banyand/internal/sidx/sidx.go
+++ b/banyand/internal/sidx/sidx.go
@@ -77,7 +77,7 @@ func NewSIDX(fileSystem fs.FileSystem, opts *Options) (SIDX, 
error) {
 }
 
 // ConvertToMemPart converts a write request to a memPart.
-func (s *sidx) ConvertToMemPart(reqs []WriteRequest, segmentID int64) 
(*MemPart, error) {
+func (s *sidx) ConvertToMemPart(reqs []WriteRequest, segmentID int64, 
minTimestamp, maxTimestamp *int64) (*MemPart, error) {
        // Validate requests
        for _, req := range reqs {
                if err := req.Validate(); err != nil {
@@ -100,6 +100,8 @@ func (s *sidx) ConvertToMemPart(reqs []WriteRequest, 
segmentID int64) (*MemPart,
        mp := GenerateMemPart()
        mp.mustInitFromElements(es)
        mp.partMetadata.SegmentID = segmentID
+       mp.partMetadata.MinTimestamp = minTimestamp
+       mp.partMetadata.MaxTimestamp = maxTimestamp
        return mp, nil
 }
 
diff --git a/banyand/internal/sidx/sidx_test.go 
b/banyand/internal/sidx/sidx_test.go
index bd7f7bba8..fab62b89a 100644
--- a/banyand/internal/sidx/sidx_test.go
+++ b/banyand/internal/sidx/sidx_test.go
@@ -163,7 +163,7 @@ func createTestSIDXWithOptions(t *testing.T, tweak 
func(*Options)) SIDX {
 
 func writeTestData(t *testing.T, sidx SIDX, reqs []WriteRequest, segmentID 
int64, partID uint64) {
        // Convert write requests to MemPart
-       memPart, err := sidx.ConvertToMemPart(reqs, segmentID)
+       memPart, err := sidx.ConvertToMemPart(reqs, segmentID, nil, nil)
        require.NoError(t, err)
        require.NotNil(t, memPart)
 
@@ -315,7 +315,7 @@ func TestSIDX_Write_Validation(t *testing.T) {
 
        for _, tt := range tests {
                t.Run(tt.name, func(t *testing.T) {
-                       _, err := sidx.ConvertToMemPart([]WriteRequest{tt.req}, 
12) // Test with segmentID=12
+                       _, err := sidx.ConvertToMemPart([]WriteRequest{tt.req}, 
12, nil, nil) // Test with segmentID=12
                        if tt.expectErr {
                                assert.Error(t, err)
                        } else {
@@ -588,7 +588,7 @@ func TestSIDX_ConcurrentWrites(t *testing.T) {
                        }
 
                        // Convert to MemPart and introduce
-                       memPart, err := sidx.ConvertToMemPart(reqs, 
int64(goroutineID+12)) // Test with varied segmentID
+                       memPart, err := sidx.ConvertToMemPart(reqs, 
int64(goroutineID+12), nil, nil) // Test with varied segmentID
                        if err != nil {
                                errors <- err
                                return
@@ -670,7 +670,7 @@ func TestSIDX_ConcurrentReadsWrites(t *testing.T) {
                                        fmt.Sprintf("writer-%d-data-%d", 
writerID, writeCount),
                                )
                                // Convert to MemPart and introduce (ignore 
errors during concurrent stress)
-                               if memPart, err := 
sidx.ConvertToMemPart([]WriteRequest{req}, int64(writerID+13)); err == nil { // 
Test with varied segmentID
+                               if memPart, err := 
sidx.ConvertToMemPart([]WriteRequest{req}, int64(writerID+13), nil, nil); err 
== nil { // Test with varied segmentID
                                        
sidx.IntroduceMemPart(uint64(writerID+13), memPart) // Test with varied partID
                                }
                                writeCount++
diff --git a/banyand/internal/sidx/snapshot_test.go 
b/banyand/internal/sidx/snapshot_test.go
index 361995456..f870a70c1 100644
--- a/banyand/internal/sidx/snapshot_test.go
+++ b/banyand/internal/sidx/snapshot_test.go
@@ -32,7 +32,7 @@ import (
 // writeTestData is a helper function for writing test data using the new 
pattern.
 func writeTestDataToSIDX(t *testing.T, sidx SIDX, reqs []WriteRequest, 
segmentID int64, partID uint64) {
        // Convert write requests to MemPart
-       memPart, err := sidx.ConvertToMemPart(reqs, segmentID)
+       memPart, err := sidx.ConvertToMemPart(reqs, segmentID, nil, nil)
        if err != nil {
                t.Errorf("ConvertToMemPart failed: %v", err)
                return
diff --git a/banyand/internal/sidx/timestamp_test.go 
b/banyand/internal/sidx/timestamp_test.go
new file mode 100644
index 000000000..fc7aaa9d6
--- /dev/null
+++ b/banyand/internal/sidx/timestamp_test.go
@@ -0,0 +1,394 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package sidx
+
+import (
+       "context"
+       "encoding/json"
+       "path/filepath"
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/banyand/protector"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+)
+
+// writeTestDataWithTimeRange writes test data with optional min/max 
timestamps (back-compat: nil means no timestamps).
+func writeTestDataWithTimeRange(t *testing.T, sidx SIDX, reqs []WriteRequest, 
segmentID int64, partID uint64, minTS, maxTS *int64) {
+       t.Helper()
+       memPart, err := sidx.ConvertToMemPart(reqs, segmentID, minTS, maxTS)
+       require.NoError(t, err)
+       require.NotNil(t, memPart)
+       sidx.IntroduceMemPart(partID, memPart)
+}
+
+func ptrInt64Ts(v int64) *int64 {
+       val := v
+       return &val
+}
+
+// TestConvertToMemPart_Timestamps verifies ConvertToMemPart with and without 
timestamps.
+func TestConvertToMemPart_Timestamps(t *testing.T) {
+       sidx := createTestSIDX(t)
+       defer func() {
+               assert.NoError(t, sidx.Close())
+       }()
+
+       reqs := []WriteRequest{
+               createTestWriteRequest(1, 1000, "data1"),
+               createTestWriteRequest(1, 2000, "data2"),
+       }
+
+       t.Run("nil_timestamps_back_compat", func(t *testing.T) {
+               mp, err := sidx.ConvertToMemPart(reqs, 1, nil, nil)
+               require.NoError(t, err)
+               require.NotNil(t, mp)
+               require.Nil(t, mp.partMetadata.MinTimestamp)
+               require.Nil(t, mp.partMetadata.MaxTimestamp)
+               ReleaseMemPart(mp)
+       })
+
+       t.Run("with_timestamps", func(t *testing.T) {
+               minTS := int64(500)
+               maxTS := int64(2500)
+               mp, err := sidx.ConvertToMemPart(reqs, 1, &minTS, &maxTS)
+               require.NoError(t, err)
+               require.NotNil(t, mp)
+               require.NotNil(t, mp.partMetadata.MinTimestamp)
+               require.NotNil(t, mp.partMetadata.MaxTimestamp)
+               assert.Equal(t, int64(500), *mp.partMetadata.MinTimestamp)
+               assert.Equal(t, int64(2500), *mp.partMetadata.MaxTimestamp)
+               ReleaseMemPart(mp)
+       })
+}
+
+// TestManifest_BackCompat verifies manifest.json omits timestamps when nil 
(back-compat).
+func TestManifest_BackCompat(t *testing.T) {
+       dir := t.TempDir()
+       fileSystem := fs.NewLocalFileSystem()
+       opts := NewDefaultOptions()
+       opts.Memory = protector.NewMemory(observability.NewBypassRegistry())
+       opts.Path = dir
+
+       sidxIface, err := NewSIDX(fileSystem, opts)
+       require.NoError(t, err)
+       raw := sidxIface.(*sidx)
+       defer func() {
+               assert.NoError(t, raw.Close())
+       }()
+
+       reqs := []WriteRequest{
+               createTestWriteRequest(1, 100, "data"),
+       }
+
+       t.Run("manifest_without_timestamps", func(t *testing.T) {
+               writeTestDataWithTimeRange(t, raw, reqs, 1, 1, nil, nil)
+               flushIntro, err := raw.Flush(map[uint64]struct{}{1: {}})
+               require.NoError(t, err)
+               raw.IntroduceFlushed(flushIntro)
+               flushIntro.Release()
+
+               manifestPath := filepath.Join(dir, "0000000000000001", 
manifestFilename)
+               data, readErr := fileSystem.Read(manifestPath)
+               require.NoError(t, readErr)
+
+               var m struct {
+                       MinTimestamp *int64 `json:"minTimestamp,omitempty"`
+                       MaxTimestamp *int64 `json:"maxTimestamp,omitempty"`
+                       MinKey       int64  `json:"minKey"`
+                       MaxKey       int64  `json:"maxKey"`
+               }
+               require.NoError(t, json.Unmarshal(data, &m))
+               require.Nil(t, m.MinTimestamp)
+               require.Nil(t, m.MaxTimestamp)
+               assert.Equal(t, int64(100), m.MinKey)
+               assert.Equal(t, int64(100), m.MaxKey)
+       })
+
+       t.Run("manifest_with_timestamps", func(t *testing.T) {
+               minTS := int64(1000)
+               maxTS := int64(2000)
+               writeTestDataWithTimeRange(t, raw, reqs, 2, 2, &minTS, &maxTS)
+               flushIntro, err := raw.Flush(map[uint64]struct{}{2: {}})
+               require.NoError(t, err)
+               raw.IntroduceFlushed(flushIntro)
+               flushIntro.Release()
+
+               manifestPath := filepath.Join(dir, "0000000000000002", 
manifestFilename)
+               data, readErr := fileSystem.Read(manifestPath)
+               require.NoError(t, readErr)
+
+               var m struct {
+                       MinTimestamp *int64 `json:"minTimestamp,omitempty"`
+                       MaxTimestamp *int64 `json:"maxTimestamp,omitempty"`
+               }
+               require.NoError(t, json.Unmarshal(data, &m))
+               require.NotNil(t, m.MinTimestamp)
+               require.NotNil(t, m.MaxTimestamp)
+               assert.Equal(t, int64(1000), *m.MinTimestamp)
+               assert.Equal(t, int64(2000), *m.MaxTimestamp)
+       })
+}
+
+// TestQuery_TimestampSelection verifies timestamp-aware part selection and 
key-range fallback.
+func TestQuery_TimestampSelection(t *testing.T) {
+       reqs := []WriteRequest{
+               createTestWriteRequest(1, 1000, "data1000"),
+               createTestWriteRequest(1, 2000, "data2000"),
+       }
+
+       t.Run("back_compat_key_range_only", func(t *testing.T) {
+               sidx := createTestSIDX(t)
+               defer func() {
+                       assert.NoError(t, sidx.Close())
+               }()
+               writeTestDataWithTimeRange(t, sidx, reqs, 1, 1, nil, nil)
+               waitForIntroducerLoop()
+
+               minKey := int64(500)
+               maxKey := int64(2500)
+               qr := QueryRequest{
+                       SeriesIDs: []common.SeriesID{1},
+                       MinKey:    &minKey,
+                       MaxKey:    &maxKey,
+               }
+               resultsCh, errCh := sidx.StreamingQuery(context.Background(), 
qr)
+               var keys []int64
+               for res := range resultsCh {
+                       require.NoError(t, res.Error)
+                       keys = append(keys, res.Keys...)
+               }
+               select {
+               case err, ok := <-errCh:
+                       if ok {
+                               require.NoError(t, err)
+                       }
+               default:
+               }
+               assert.GreaterOrEqual(t, len(keys), 0, "key-range filter should 
work without timestamps")
+       })
+
+       t.Run("timestamp_filter_overlaps", func(t *testing.T) {
+               sidx := createTestSIDX(t)
+               defer func() {
+                       assert.NoError(t, sidx.Close())
+               }()
+               minTS := ptrInt64Ts(1000)
+               maxTS := ptrInt64Ts(3000)
+               writeTestDataWithTimeRange(t, sidx, reqs, 1, 1, minTS, maxTS)
+               waitForIntroducerLoop()
+
+               qMinTS := ptrInt64Ts(1500)
+               qMaxTS := ptrInt64Ts(2500)
+               qr := QueryRequest{
+                       SeriesIDs:    []common.SeriesID{1},
+                       MinTimestamp: qMinTS,
+                       MaxTimestamp: qMaxTS,
+               }
+               resultsCh, errCh := sidx.StreamingQuery(context.Background(), 
qr)
+               var count int
+               for res := range resultsCh {
+                       require.NoError(t, res.Error)
+                       count += res.Len()
+               }
+               select {
+               case err, ok := <-errCh:
+                       if ok {
+                               require.NoError(t, err)
+                       }
+               default:
+               }
+               assert.GreaterOrEqual(t, count, 0, "query with overlapping 
timestamp range")
+       })
+
+       t.Run("timestamp_filter_no_overlap_excludes_part", func(t *testing.T) {
+               sidx := createTestSIDX(t)
+               defer func() {
+                       assert.NoError(t, sidx.Close())
+               }()
+               minTS := ptrInt64Ts(1000)
+               maxTS := ptrInt64Ts(2000)
+               writeTestDataWithTimeRange(t, sidx, reqs, 1, 1, minTS, maxTS)
+               waitForIntroducerLoop()
+
+               qMinTS := ptrInt64Ts(5000)
+               qMaxTS := ptrInt64Ts(6000)
+               qr := QueryRequest{
+                       SeriesIDs:    []common.SeriesID{1},
+                       MinTimestamp: qMinTS,
+                       MaxTimestamp: qMaxTS,
+               }
+               resultsCh, errCh := sidx.StreamingQuery(context.Background(), 
qr)
+               var count int
+               for res := range resultsCh {
+                       require.NoError(t, res.Error)
+                       count += res.Len()
+               }
+               select {
+               case err, ok := <-errCh:
+                       if ok {
+                               require.NoError(t, err)
+                       }
+               default:
+               }
+               assert.Equal(t, 0, count, "parts outside timestamp range should 
be excluded")
+       })
+}
+
+// TestScanQuery_TimestampSelection verifies ScanQuery timestamp-aware part 
selection.
+func TestScanQuery_TimestampSelection(t *testing.T) {
+       reqs := []WriteRequest{
+               createTestWriteRequest(1, 100, "data"),
+       }
+
+       t.Run("back_compat_no_timestamp_filter", func(t *testing.T) {
+               sidx := createTestSIDX(t)
+               defer func() {
+                       assert.NoError(t, sidx.Close())
+               }()
+               writeTestDataWithTimeRange(t, sidx, reqs, 1, 1, nil, nil)
+               waitForIntroducerLoop()
+
+               sqr := ScanQueryRequest{}
+               res, err := sidx.ScanQuery(context.Background(), sqr)
+               require.NoError(t, err)
+               assert.GreaterOrEqual(t, len(res), 0)
+       })
+
+       t.Run("timestamp_filter_excludes_non_overlapping", func(t *testing.T) {
+               sidx := createTestSIDX(t)
+               defer func() {
+                       assert.NoError(t, sidx.Close())
+               }()
+               minTS := ptrInt64Ts(1000)
+               maxTS := ptrInt64Ts(2000)
+               writeTestDataWithTimeRange(t, sidx, reqs, 1, 1, minTS, maxTS)
+               waitForIntroducerLoop()
+
+               sqr := ScanQueryRequest{
+                       MinTimestamp: ptrInt64Ts(5000),
+                       MaxTimestamp: ptrInt64Ts(6000),
+               }
+               res, err := sidx.ScanQuery(context.Background(), sqr)
+               require.NoError(t, err)
+               total := 0
+               for _, r := range res {
+                       total += r.Len()
+               }
+               assert.Equal(t, 0, total, "parts outside timestamp range should 
be excluded in scan")
+       })
+}
+
+// TestMerge_TimestampPropagation verifies merge aggregates timestamps and 
back-compat when absent.
+func TestMerge_TimestampPropagation(t *testing.T) {
+       reqs := []WriteRequest{
+               createTestWriteRequest(1, 100, "data"),
+       }
+
+       t.Run("merge_parts_without_timestamps_back_compat", func(t *testing.T) {
+               dir := t.TempDir()
+               fileSystem := fs.NewLocalFileSystem()
+               opts := NewDefaultOptions()
+               opts.Memory = 
protector.NewMemory(observability.NewBypassRegistry())
+               opts.Path = dir
+               opts.AvailablePartIDs = []uint64{1, 2, 10}
+
+               sidxIface, err := NewSIDX(fileSystem, opts)
+               require.NoError(t, err)
+               raw := sidxIface.(*sidx)
+               defer func() {
+                       assert.NoError(t, raw.Close())
+               }()
+
+               writeTestDataWithTimeRange(t, raw, reqs, 1, 1, nil, nil)
+               writeTestDataWithTimeRange(t, raw, reqs, 2, 2, nil, nil)
+               waitForIntroducerLoop()
+
+               flushIntro, err := raw.Flush(map[uint64]struct{}{1: {}, 2: {}})
+               require.NoError(t, err)
+               raw.IntroduceFlushed(flushIntro)
+               flushIntro.Release()
+
+               mergeIntro, mergeErr := raw.Merge(nil, map[uint64]struct{}{1: 
{}, 2: {}}, 10)
+               require.NoError(t, mergeErr)
+               require.NotNil(t, mergeIntro)
+               raw.IntroduceMerged(mergeIntro)()
+
+               manifestPath := filepath.Join(dir, "000000000000000a", 
manifestFilename)
+               data, readErr := fileSystem.Read(manifestPath)
+               require.NoError(t, readErr)
+
+               var m struct {
+                       MinTimestamp *int64 `json:"minTimestamp,omitempty"`
+                       MaxTimestamp *int64 `json:"maxTimestamp,omitempty"`
+               }
+               require.NoError(t, json.Unmarshal(data, &m))
+               require.Nil(t, m.MinTimestamp)
+               require.Nil(t, m.MaxTimestamp)
+       })
+
+       t.Run("merge_parts_with_timestamps_aggregates", func(t *testing.T) {
+               dir := t.TempDir()
+               fileSystem := fs.NewLocalFileSystem()
+               opts := NewDefaultOptions()
+               opts.Memory = 
protector.NewMemory(observability.NewBypassRegistry())
+               opts.Path = dir
+               opts.AvailablePartIDs = []uint64{1, 2, 10}
+
+               sidxIface, err := NewSIDX(fileSystem, opts)
+               require.NoError(t, err)
+               raw := sidxIface.(*sidx)
+               defer func() {
+                       assert.NoError(t, raw.Close())
+               }()
+
+               min1, max1 := int64(100), int64(200)
+               min2, max2 := int64(150), int64(300)
+               writeTestDataWithTimeRange(t, raw, reqs, 1, 1, &min1, &max1)
+               writeTestDataWithTimeRange(t, raw, reqs, 2, 2, &min2, &max2)
+               waitForIntroducerLoop()
+
+               flushIntro, err := raw.Flush(map[uint64]struct{}{1: {}, 2: {}})
+               require.NoError(t, err)
+               raw.IntroduceFlushed(flushIntro)
+               flushIntro.Release()
+
+               mergeIntro, mergeErr := raw.Merge(nil, map[uint64]struct{}{1: 
{}, 2: {}}, 10)
+               require.NoError(t, mergeErr)
+               require.NotNil(t, mergeIntro)
+               raw.IntroduceMerged(mergeIntro)()
+
+               manifestPath := filepath.Join(dir, "000000000000000a", 
manifestFilename)
+               data, readErr := fileSystem.Read(manifestPath)
+               require.NoError(t, readErr)
+
+               var m struct {
+                       MinTimestamp *int64 `json:"minTimestamp,omitempty"`
+                       MaxTimestamp *int64 `json:"maxTimestamp,omitempty"`
+               }
+               require.NoError(t, json.Unmarshal(data, &m))
+               require.NotNil(t, m.MinTimestamp)
+               require.NotNil(t, m.MaxTimestamp)
+               assert.Equal(t, int64(100), *m.MinTimestamp)
+               assert.Equal(t, int64(300), *m.MaxTimestamp)
+       })
+}
diff --git a/banyand/trace/streaming_pipeline_test.go 
b/banyand/trace/streaming_pipeline_test.go
index 863c32206..f1435c246 100644
--- a/banyand/trace/streaming_pipeline_test.go
+++ b/banyand/trace/streaming_pipeline_test.go
@@ -59,7 +59,7 @@ func (f *fakeSIDX) StreamingQuery(ctx context.Context, _ 
sidx.QueryRequest) (<-c
 func (f *fakeSIDX) IntroduceMemPart(uint64, *sidx.MemPart)          { 
panic("not implemented") }
 func (f *fakeSIDX) IntroduceFlushed(*sidx.FlusherIntroduction)      {}
 func (f *fakeSIDX) IntroduceMerged(*sidx.MergerIntroduction) func() { return 
func() {} }
-func (f *fakeSIDX) ConvertToMemPart([]sidx.WriteRequest, int64) 
(*sidx.MemPart, error) {
+func (f *fakeSIDX) ConvertToMemPart([]sidx.WriteRequest, int64, *int64, 
*int64) (*sidx.MemPart, error) {
        panic("not implemented")
 }
 
@@ -654,7 +654,7 @@ func (f *fakeSIDXInfinite) StreamingQuery(ctx 
context.Context, _ sidx.QueryReque
 func (f *fakeSIDXInfinite) IntroduceMemPart(uint64, *sidx.MemPart)          { 
panic("not implemented") }
 func (f *fakeSIDXInfinite) IntroduceFlushed(*sidx.FlusherIntroduction)      {}
 func (f *fakeSIDXInfinite) IntroduceMerged(*sidx.MergerIntroduction) func() { 
return func() {} }
-func (f *fakeSIDXInfinite) ConvertToMemPart([]sidx.WriteRequest, int64) 
(*sidx.MemPart, error) {
+func (f *fakeSIDXInfinite) ConvertToMemPart([]sidx.WriteRequest, int64, 
*int64, *int64) (*sidx.MemPart, error) {
        panic("not implemented")
 }
 
diff --git a/banyand/trace/write_liaison.go b/banyand/trace/write_liaison.go
index 34bf7c719..3c2ec1f99 100644
--- a/banyand/trace/write_liaison.go
+++ b/banyand/trace/write_liaison.go
@@ -222,8 +222,10 @@ func (w *writeQueueCallback) Rev(ctx context.Context, 
message bus.Message) (resp
                                                
w.l.Error().Err(err).Str("sidx", sidxName).Msg("cannot get or create sidx 
instance")
                                                continue
                                        }
+                                       minTS := es.timeRange.Start.UnixNano()
+                                       maxTS := es.timeRange.End.UnixNano()
                                        var siMemPart *sidx.MemPart
-                                       if siMemPart, err = 
sidxInstance.ConvertToMemPart(sidxReqs, es.timeRange.Start.UnixNano()); err != 
nil {
+                                       if siMemPart, err = 
sidxInstance.ConvertToMemPart(sidxReqs, es.timeRange.Start.UnixNano(), &minTS, 
&maxTS); err != nil {
                                                
w.l.Error().Err(err).Str("sidx", sidxName).Msg("cannot write to secondary 
index")
                                                continue
                                        }
diff --git a/banyand/trace/write_standalone.go 
b/banyand/trace/write_standalone.go
index 077d1e86a..eb3c16db6 100644
--- a/banyand/trace/write_standalone.go
+++ b/banyand/trace/write_standalone.go
@@ -435,8 +435,10 @@ func (w *writeCallback) Rev(_ context.Context, message 
bus.Message) (resp bus.Me
                                                
w.l.Error().Err(err).Str("sidx", sidxName).Msg("cannot get or create sidx 
instance")
                                                continue
                                        }
+                                       minTS := es.timeRange.Start.UnixNano()
+                                       maxTS := es.timeRange.End.UnixNano()
                                        var siMemPart *sidx.MemPart
-                                       if siMemPart, err = 
sidxInstance.ConvertToMemPart(sidxReqs, es.timeRange.Start.UnixNano()); err != 
nil {
+                                       if siMemPart, err = 
sidxInstance.ConvertToMemPart(sidxReqs, es.timeRange.Start.UnixNano(), &minTS, 
&maxTS); err != nil {
                                                
w.l.Error().Err(err).Str("sidx", sidxName).Msg("cannot write to secondary 
index")
                                                continue
                                        }


Reply via email to