This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch feat/sidx-timerange in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit ef40900ab9f0170e32c06a48b4501ebe612681be Author: Hongtao Gao <[email protected]> AuthorDate: Fri Mar 13 12:55:54 2026 +0800 Enhance SIDX with optional timestamp support for part selection and validation - Introduced MinTimestamp and MaxTimestamp fields in partMetadata and updated related structures. - Updated ConvertToMemPart to accept optional timestamp parameters. - Modified filtering logic in filterPartsByTimeRange to prioritize timestamp overlap. - Enhanced validation checks for timestamp ranges in QueryRequest and ScanQueryRequest. - Added tests for timestamp validation and serialization in partMetadata. - Updated relevant functions to handle timestamp ranges in queries and scans. This change improves the flexibility of part selection based on both key and timestamp ranges. --- banyand/cmd/dump/sidx.go | 20 +- banyand/internal/sidx/interfaces.go | 27 +- banyand/internal/sidx/merge.go | 22 ++ banyand/internal/sidx/metadata.go | 21 +- banyand/internal/sidx/metadata_test.go | 59 ++++ banyand/internal/sidx/part_wrapper.go | 31 +- banyand/internal/sidx/part_wrapper_test.go | 35 +++ banyand/internal/sidx/query.go | 17 +- banyand/internal/sidx/query_benchmark_test.go | 2 +- banyand/internal/sidx/scan_query.go | 24 +- banyand/internal/sidx/sidx.go | 4 +- banyand/internal/sidx/sidx_test.go | 8 +- banyand/internal/sidx/snapshot_test.go | 2 +- banyand/internal/sidx/timestamp_test.go | 394 ++++++++++++++++++++++++++ banyand/trace/streaming_pipeline_test.go | 4 +- banyand/trace/write_liaison.go | 4 +- banyand/trace/write_standalone.go | 4 +- 17 files changed, 625 insertions(+), 53 deletions(-) diff --git a/banyand/cmd/dump/sidx.go b/banyand/cmd/dump/sidx.go index c084119b1..50cd17f71 100644 --- a/banyand/cmd/dump/sidx.go +++ b/banyand/cmd/dump/sidx.go @@ -280,26 +280,30 @@ func filterPartsByTimeRange(sidxPath string, partIDs []uint64, minKey, maxKey in partPath := filepath.Join(sidxPath, fmt.Sprintf("%016x", partID)) manifestPath := filepath.Join(partPath, "manifest.json") - // Read manifest.json manifestData, err := fileSystem.Read(manifestPath) if err != nil { - // Skip parts that don't have a manifest continue } - // Parse manifest to get minKey and maxKey var manifest struct { - MinKey int64 `json:"minKey"` - MaxKey int64 `json:"maxKey"` + MinTimestamp *int64 `json:"minTimestamp,omitempty"` + MaxTimestamp *int64 `json:"maxTimestamp,omitempty"` + MinKey int64 `json:"minKey"` + MaxKey int64 `json:"maxKey"` } - if err := json.Unmarshal(manifestData, &manifest); err != nil { fmt.Fprintf(os.Stderr, "Warning: failed to parse manifest for part %016x: %v\n", partID, err) continue } - // Check if part overlaps with the requested time range - if manifest.MaxKey >= minKey && manifest.MinKey <= maxKey { + // Prefer timestamp overlap when manifest has minTimestamp/maxTimestamp; otherwise fallback to key range + overlaps := false + if manifest.MinTimestamp != nil && manifest.MaxTimestamp != nil { + overlaps = *manifest.MaxTimestamp >= minKey && *manifest.MinTimestamp <= maxKey + } else { + overlaps = manifest.MaxKey >= minKey && manifest.MinKey <= maxKey + } + if overlaps { filteredParts = append(filteredParts, partID) } } diff --git a/banyand/internal/sidx/interfaces.go b/banyand/internal/sidx/interfaces.go index d0c3e5d50..e15d545de 100644 --- a/banyand/internal/sidx/interfaces.go +++ b/banyand/internal/sidx/interfaces.go @@ -62,7 +62,8 @@ type SIDX interface { // IntroduceMerged introduces a merged map and a new part to the SIDX instance. IntroduceMerged(nextIntroduction *MergerIntroduction) func() // ConvertToMemPart converts a write request to a memPart. - ConvertToMemPart(reqs []WriteRequest, segmentID int64) (*MemPart, error) + // minTimestamp and maxTimestamp are optional; when provided they are stored in manifest.json for part selection. + ConvertToMemPart(reqs []WriteRequest, segmentID int64, minTimestamp, maxTimestamp *int64) (*MemPart, error) // StreamingQuery executes the query and streams batched QueryResponse objects. // The returned QueryResponse channel contains ordered batches limited by req.MaxBatchSize // unique Data elements (when positive). The error channel delivers any fatal execution error. @@ -107,6 +108,8 @@ type QueryRequest struct { Order *index.OrderBy MinKey *int64 MaxKey *int64 + MinTimestamp *int64 + MaxTimestamp *int64 SeriesIDs []common.SeriesID TagProjection []model.TagProjection MaxBatchSize int @@ -126,6 +129,8 @@ type ScanQueryRequest struct { OnProgress ScanProgressFunc MinKey *int64 MaxKey *int64 + MinTimestamp *int64 + MaxTimestamp *int64 TagProjection []model.TagProjection MaxBatchSize int } @@ -351,6 +356,9 @@ func (qr QueryRequest) Validate() error { if qr.MinKey != nil && qr.MaxKey != nil && *qr.MinKey > *qr.MaxKey { return fmt.Errorf("MinKey cannot be greater than MaxKey") } + if qr.MinTimestamp != nil && qr.MaxTimestamp != nil && *qr.MinTimestamp > *qr.MaxTimestamp { + return fmt.Errorf("MinTimestamp cannot be greater than MaxTimestamp") + } return nil } @@ -362,6 +370,9 @@ func (sqr ScanQueryRequest) Validate() error { if sqr.MinKey != nil && sqr.MaxKey != nil && *sqr.MinKey > *sqr.MaxKey { return fmt.Errorf("MinKey cannot be greater than MaxKey") } + if sqr.MinTimestamp != nil && sqr.MaxTimestamp != nil && *sqr.MinTimestamp > *sqr.MaxTimestamp { + return fmt.Errorf("MinTimestamp cannot be greater than MaxTimestamp") + } return nil } @@ -374,6 +385,8 @@ func (qr *QueryRequest) Reset() { qr.MaxBatchSize = 0 qr.MinKey = nil qr.MaxKey = nil + qr.MinTimestamp = nil + qr.MaxTimestamp = nil } // CopyFrom copies the QueryRequest from other to qr. @@ -413,6 +426,18 @@ func (qr *QueryRequest) CopyFrom(other *QueryRequest) { } else { qr.MaxKey = nil } + if other.MinTimestamp != nil { + minTS := *other.MinTimestamp + qr.MinTimestamp = &minTS + } else { + qr.MinTimestamp = nil + } + if other.MaxTimestamp != nil { + maxTS := *other.MaxTimestamp + qr.MaxTimestamp = &maxTS + } else { + qr.MaxTimestamp = nil + } } // Interface Usage Examples and Best Practices diff --git a/banyand/internal/sidx/merge.go b/banyand/internal/sidx/merge.go index a537e3881..157709698 100644 --- a/banyand/internal/sidx/merge.go +++ b/banyand/internal/sidx/merge.go @@ -103,6 +103,28 @@ func (s *sidx) mergeParts(fileSystem fs.FileSystem, closeCh <-chan struct{}, par if err != nil { return nil, err } + // Aggregate optional timestamp range from merged parts + var minVal, maxVal int64 + var hasMinTS, hasMaxTS bool + for i := range parts { + p := parts[i].p.partMetadata + if p.MinTimestamp != nil { + if !hasMinTS || *p.MinTimestamp < minVal { + minVal = *p.MinTimestamp + hasMinTS = true + } + } + if p.MaxTimestamp != nil { + if !hasMaxTS || *p.MaxTimestamp > maxVal { + maxVal = *p.MaxTimestamp + hasMaxTS = true + } + } + } + if hasMinTS && hasMaxTS { + pm.MinTimestamp = &minVal + pm.MaxTimestamp = &maxVal + } pm.mustWriteMetadata(fileSystem, dstPath) fileSystem.SyncPath(dstPath) p := mustOpenPart(partID, dstPath, fileSystem) diff --git a/banyand/internal/sidx/metadata.go b/banyand/internal/sidx/metadata.go index ba4a6f5f0..58e0c252c 100644 --- a/banyand/internal/sidx/metadata.go +++ b/banyand/internal/sidx/metadata.go @@ -36,19 +36,16 @@ import ( // partMetadata contains metadata for an entire part (replaces timestamp-specific metadata from stream module). type partMetadata struct { - // Size information + MinTimestamp *int64 `json:"minTimestamp,omitempty"` + MaxTimestamp *int64 `json:"maxTimestamp,omitempty"` CompressedSizeBytes uint64 `json:"compressedSizeBytes"` UncompressedSizeBytes uint64 `json:"uncompressedSizeBytes"` TotalCount uint64 `json:"totalCount"` BlocksCount uint64 `json:"blocksCount"` - - // Key range - MinKey int64 `json:"minKey"` // Minimum user key in part - MaxKey int64 `json:"maxKey"` // Maximum user key in part - - // Identity - ID uint64 `json:"id"` // Unique part identifier - SegmentID int64 `json:"segmentID"` // Segment identifier + MinKey int64 `json:"minKey"` + MaxKey int64 `json:"maxKey"` + ID uint64 `json:"id"` + SegmentID int64 `json:"segmentID"` } func validatePartMetadata(fileSystem fs.FileSystem, partPath string) error { @@ -140,6 +137,8 @@ func (pm *partMetadata) reset() { pm.BlocksCount = 0 pm.MinKey = 0 pm.MaxKey = 0 + pm.MinTimestamp = nil + pm.MaxTimestamp = nil pm.ID = 0 } @@ -192,6 +191,10 @@ func (pm *partMetadata) validate() error { if pm.MinKey > pm.MaxKey { return fmt.Errorf("invalid key range: MinKey (%d) > MaxKey (%d)", pm.MinKey, pm.MaxKey) } + if pm.MinTimestamp != nil && pm.MaxTimestamp != nil && *pm.MinTimestamp > *pm.MaxTimestamp { + return fmt.Errorf("invalid timestamp range: minTimestamp (%d) > maxTimestamp (%d)", + *pm.MinTimestamp, *pm.MaxTimestamp) + } if pm.CompressedSizeBytes > pm.UncompressedSizeBytes { return fmt.Errorf("invalid size: compressed (%d) > uncompressed (%d)", pm.CompressedSizeBytes, pm.UncompressedSizeBytes) diff --git a/banyand/internal/sidx/metadata_test.go b/banyand/internal/sidx/metadata_test.go index 248bec420..09fe0b0c8 100644 --- a/banyand/internal/sidx/metadata_test.go +++ b/banyand/internal/sidx/metadata_test.go @@ -104,6 +104,37 @@ func TestPartMetadata_Validation(t *testing.T) { }, expectErr: false, }, + { + name: "invalid timestamp range - MinTimestamp > MaxTimestamp", + metadata: &partMetadata{ + CompressedSizeBytes: 100, + UncompressedSizeBytes: 200, + TotalCount: 10, + BlocksCount: 2, + MinKey: 1, + MaxKey: 100, + ID: 1, + MinTimestamp: ptrInt64(200), + MaxTimestamp: ptrInt64(100), + }, + expectErr: true, + errMsg: "invalid timestamp range", + }, + { + name: "valid metadata with optional timestamps", + metadata: &partMetadata{ + CompressedSizeBytes: 100, + UncompressedSizeBytes: 200, + TotalCount: 10, + BlocksCount: 2, + MinKey: 1, + MaxKey: 100, + MinTimestamp: ptrInt64(100), + MaxTimestamp: ptrInt64(200), + ID: 1, + }, + expectErr: false, + }, } for _, tt := range tests { @@ -453,6 +484,34 @@ func TestPartMetadata_Serialization(t *testing.T) { assert.Equal(t, original.ID, restored.ID) } +func TestPartMetadata_SerializationWithTimestamps(t *testing.T) { + minTS := int64(1000) + maxTS := int64(2000) + original := &partMetadata{ + CompressedSizeBytes: 1000, + UncompressedSizeBytes: 2000, + TotalCount: 50, + BlocksCount: 5, + MinKey: 10, + MaxKey: 1000, + MinTimestamp: &minTS, + MaxTimestamp: &maxTS, + ID: 12345, + } + + data, err := original.marshal() + require.NoError(t, err) + assert.Contains(t, string(data), "minTimestamp") + assert.Contains(t, string(data), "maxTimestamp") + + restored, err := unmarshalPartMetadata(data) + require.NoError(t, err) + require.NotNil(t, restored.MinTimestamp) + require.NotNil(t, restored.MaxTimestamp) + assert.Equal(t, *original.MinTimestamp, *restored.MinTimestamp) + assert.Equal(t, *original.MaxTimestamp, *restored.MaxTimestamp) +} + func TestBlockMetadata_Serialization(t *testing.T) { original := &blockMetadata{ seriesID: common.SeriesID(123), diff --git a/banyand/internal/sidx/part_wrapper.go b/banyand/internal/sidx/part_wrapper.go index 7d7ecf84c..be6fd7971 100644 --- a/banyand/internal/sidx/part_wrapper.go +++ b/banyand/internal/sidx/part_wrapper.go @@ -185,29 +185,34 @@ func (pw *partWrapper) overlapsKeyRange(minKey, maxKey int64) bool { if pw.p == nil { return false } - - // Validate input range if minKey > maxKey { return false } - - // Check if part metadata is available if pw.p.partMetadata == nil { - // If no metadata available, assume overlap to be safe - // This ensures we don't skip parts that might contain relevant data return true } - pm := pw.p.partMetadata - - // Check for non-overlapping ranges using De Morgan's law: - // Two ranges [a,b] and [c,d] don't overlap if: b < c OR a > d - // Therefore, they DO overlap if: NOT(b < c OR a > d) = (b >= c AND a <= d) - // Simplified: part.MaxKey >= query.MinKey AND part.MinKey <= query.MaxKey if pm.MaxKey < minKey || pm.MinKey > maxKey { return false } + return true +} - // Ranges overlap +// overlapsTimestampRange checks if the part overlaps with the given timestamp range. +// Returns true when part has no timestamp metadata (fallback) or when ranges overlap. +func (pw *partWrapper) overlapsTimestampRange(minTS, maxTS int64) bool { + if pw.p == nil { + return false + } + if pw.p.partMetadata == nil { + return true + } + pm := pw.p.partMetadata + if pm.MinTimestamp == nil || pm.MaxTimestamp == nil { + return true + } + if *pm.MaxTimestamp < minTS || *pm.MinTimestamp > maxTS { + return false + } return true } diff --git a/banyand/internal/sidx/part_wrapper_test.go b/banyand/internal/sidx/part_wrapper_test.go index c2267808b..565cdf8f5 100644 --- a/banyand/internal/sidx/part_wrapper_test.go +++ b/banyand/internal/sidx/part_wrapper_test.go @@ -574,3 +574,38 @@ func TestPartWrapper_OverlapsKeyRange_EdgeCases(t *testing.T) { assert.False(t, pw.overlapsKeyRange(25, 15)) }) } + +func TestPartWrapper_OverlapsTimestampRange(t *testing.T) { + minTS := int64(100) + maxTS := int64(200) + tests := []struct { //nolint:govet // test table struct field order prioritizes readability + name string + partMinTS *int64 + partMaxTS *int64 + queryMin int64 + queryMax int64 + expected bool + }{ + {name: "overlap", partMinTS: &minTS, partMaxTS: &maxTS, queryMin: 50, queryMax: 150, expected: true}, + {name: "no_overlap_left", partMinTS: &minTS, partMaxTS: &maxTS, queryMin: 1, queryMax: 50, expected: false}, + {name: "no_overlap_right", partMinTS: &minTS, partMaxTS: &maxTS, queryMin: 250, queryMax: 300, expected: false}, + {name: "nil_timestamps_fallback", partMinTS: nil, partMaxTS: nil, queryMin: 1, queryMax: 1000, expected: true}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + p := &part{ + path: "/test/part/001", + partMetadata: &partMetadata{ + ID: 1, + MinKey: 1, + MaxKey: 1000, + MinTimestamp: tt.partMinTS, + MaxTimestamp: tt.partMaxTS, + }, + } + pw := newPartWrapper(nil, p) + assert.Equal(t, tt.expected, pw.overlapsTimestampRange(tt.queryMin, tt.queryMax)) + }) + } +} diff --git a/banyand/internal/sidx/query.go b/banyand/internal/sidx/query.go index 65f55ff91..98d940ac2 100644 --- a/banyand/internal/sidx/query.go +++ b/banyand/internal/sidx/query.go @@ -160,8 +160,7 @@ func (s *sidx) prepareStreamingResources( minKey, maxKey := extractKeyRange(req) asc := extractOrdering(req) - - parts := selectPartsForQuery(snap, minKey, maxKey) + parts := selectPartsForQuery(snap, minKey, maxKey, req.MinTimestamp, req.MaxTimestamp) if span != nil { span.Tagf("min_key", "%d", minKey) span.Tagf("max_key", "%d", maxKey) @@ -484,15 +483,17 @@ func extractOrdering(req QueryRequest) bool { return req.Order.Sort != modelv1.Sort_SORT_DESC } -// selectPartsForQuery selects relevant parts from snapshot based on key range. -func selectPartsForQuery(snap *Snapshot, minKey, maxKey int64) []*part { +// selectPartsForQuery selects relevant parts from snapshot based on key range and optional timestamp range. +func selectPartsForQuery(snap *Snapshot, minKey, maxKey int64, minTS, maxTS *int64) []*part { var selectedParts []*part - for _, pw := range snap.parts { - if pw.overlapsKeyRange(minKey, maxKey) { - selectedParts = append(selectedParts, pw.p) + if !pw.overlapsKeyRange(minKey, maxKey) { + continue } + if minTS != nil && maxTS != nil && !pw.overlapsTimestampRange(*minTS, *maxTS) { + continue + } + selectedParts = append(selectedParts, pw.p) } - return selectedParts } diff --git a/banyand/internal/sidx/query_benchmark_test.go b/banyand/internal/sidx/query_benchmark_test.go index 5e6587904..31f2eca3b 100644 --- a/banyand/internal/sidx/query_benchmark_test.go +++ b/banyand/internal/sidx/query_benchmark_test.go @@ -229,7 +229,7 @@ func createBenchmarkSIDX(b *testing.B) SIDX { func introduceBenchmarkMemPart(tb testing.TB, sidx SIDX, reqs []WriteRequest, segmentID int64, partID uint64) { tb.Helper() - memPart, err := sidx.ConvertToMemPart(reqs, segmentID) + memPart, err := sidx.ConvertToMemPart(reqs, segmentID, nil, nil) if err != nil { tb.Fatalf("failed to convert requests to memPart: %v", err) } diff --git a/banyand/internal/sidx/scan_query.go b/banyand/internal/sidx/scan_query.go index 93a14592e..d625aea89 100644 --- a/banyand/internal/sidx/scan_query.go +++ b/banyand/internal/sidx/scan_query.go @@ -52,6 +52,9 @@ func (s *sidx) ScanQuery(ctx context.Context, req ScanQueryRequest) ([]*QueryRes maxKey = *req.MaxKey } + // Select parts that overlap key and optional timestamp range + partsToScan := selectPartsForScan(snap, minKey, maxKey, req.MinTimestamp, req.MaxTimestamp) + var results []*QueryResponse // Prepare Tags map if projection is specified @@ -73,9 +76,9 @@ func (s *sidx) ScanQuery(ctx context.Context, req ScanQueryRequest) ([]*QueryRes Tags: tagsMap, } - // Scan all parts - totalParts := len(snap.parts) - for partIdx, pw := range snap.parts { + // Scan selected parts + totalParts := len(partsToScan) + for partIdx, pw := range partsToScan { var err error if currentBatch, err = s.scanPart(ctx, pw, req, minKey, maxKey, &results, currentBatch, maxBatchSize); err != nil { return nil, err @@ -102,6 +105,21 @@ func (s *sidx) ScanQuery(ctx context.Context, req ScanQueryRequest) ([]*QueryRes return results, nil } +// selectPartsForScan selects parts that overlap key and optional timestamp range for scan queries. +func selectPartsForScan(snap *Snapshot, minKey, maxKey int64, minTS, maxTS *int64) []*partWrapper { + var selected []*partWrapper + for _, pw := range snap.parts { + if !pw.overlapsKeyRange(minKey, maxKey) { + continue + } + if minTS != nil && maxTS != nil && !pw.overlapsTimestampRange(*minTS, *maxTS) { + continue + } + selected = append(selected, pw) + } + return selected +} + func (s *sidx) scanPart(ctx context.Context, pw *partWrapper, req ScanQueryRequest, minKey, maxKey int64, results *[]*QueryResponse, currentBatch *QueryResponse, maxBatchSize int, diff --git a/banyand/internal/sidx/sidx.go b/banyand/internal/sidx/sidx.go index 7778610e1..8cd05755d 100644 --- a/banyand/internal/sidx/sidx.go +++ b/banyand/internal/sidx/sidx.go @@ -77,7 +77,7 @@ func NewSIDX(fileSystem fs.FileSystem, opts *Options) (SIDX, error) { } // ConvertToMemPart converts a write request to a memPart. -func (s *sidx) ConvertToMemPart(reqs []WriteRequest, segmentID int64) (*MemPart, error) { +func (s *sidx) ConvertToMemPart(reqs []WriteRequest, segmentID int64, minTimestamp, maxTimestamp *int64) (*MemPart, error) { // Validate requests for _, req := range reqs { if err := req.Validate(); err != nil { @@ -100,6 +100,8 @@ func (s *sidx) ConvertToMemPart(reqs []WriteRequest, segmentID int64) (*MemPart, mp := GenerateMemPart() mp.mustInitFromElements(es) mp.partMetadata.SegmentID = segmentID + mp.partMetadata.MinTimestamp = minTimestamp + mp.partMetadata.MaxTimestamp = maxTimestamp return mp, nil } diff --git a/banyand/internal/sidx/sidx_test.go b/banyand/internal/sidx/sidx_test.go index bd7f7bba8..fab62b89a 100644 --- a/banyand/internal/sidx/sidx_test.go +++ b/banyand/internal/sidx/sidx_test.go @@ -163,7 +163,7 @@ func createTestSIDXWithOptions(t *testing.T, tweak func(*Options)) SIDX { func writeTestData(t *testing.T, sidx SIDX, reqs []WriteRequest, segmentID int64, partID uint64) { // Convert write requests to MemPart - memPart, err := sidx.ConvertToMemPart(reqs, segmentID) + memPart, err := sidx.ConvertToMemPart(reqs, segmentID, nil, nil) require.NoError(t, err) require.NotNil(t, memPart) @@ -315,7 +315,7 @@ func TestSIDX_Write_Validation(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, err := sidx.ConvertToMemPart([]WriteRequest{tt.req}, 12) // Test with segmentID=12 + _, err := sidx.ConvertToMemPart([]WriteRequest{tt.req}, 12, nil, nil) // Test with segmentID=12 if tt.expectErr { assert.Error(t, err) } else { @@ -588,7 +588,7 @@ func TestSIDX_ConcurrentWrites(t *testing.T) { } // Convert to MemPart and introduce - memPart, err := sidx.ConvertToMemPart(reqs, int64(goroutineID+12)) // Test with varied segmentID + memPart, err := sidx.ConvertToMemPart(reqs, int64(goroutineID+12), nil, nil) // Test with varied segmentID if err != nil { errors <- err return @@ -670,7 +670,7 @@ func TestSIDX_ConcurrentReadsWrites(t *testing.T) { fmt.Sprintf("writer-%d-data-%d", writerID, writeCount), ) // Convert to MemPart and introduce (ignore errors during concurrent stress) - if memPart, err := sidx.ConvertToMemPart([]WriteRequest{req}, int64(writerID+13)); err == nil { // Test with varied segmentID + if memPart, err := sidx.ConvertToMemPart([]WriteRequest{req}, int64(writerID+13), nil, nil); err == nil { // Test with varied segmentID sidx.IntroduceMemPart(uint64(writerID+13), memPart) // Test with varied partID } writeCount++ diff --git a/banyand/internal/sidx/snapshot_test.go b/banyand/internal/sidx/snapshot_test.go index 361995456..f870a70c1 100644 --- a/banyand/internal/sidx/snapshot_test.go +++ b/banyand/internal/sidx/snapshot_test.go @@ -32,7 +32,7 @@ import ( // writeTestData is a helper function for writing test data using the new pattern. func writeTestDataToSIDX(t *testing.T, sidx SIDX, reqs []WriteRequest, segmentID int64, partID uint64) { // Convert write requests to MemPart - memPart, err := sidx.ConvertToMemPart(reqs, segmentID) + memPart, err := sidx.ConvertToMemPart(reqs, segmentID, nil, nil) if err != nil { t.Errorf("ConvertToMemPart failed: %v", err) return diff --git a/banyand/internal/sidx/timestamp_test.go b/banyand/internal/sidx/timestamp_test.go new file mode 100644 index 000000000..fc7aaa9d6 --- /dev/null +++ b/banyand/internal/sidx/timestamp_test.go @@ -0,0 +1,394 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package sidx + +import ( + "context" + "encoding/json" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/banyand/observability" + "github.com/apache/skywalking-banyandb/banyand/protector" + "github.com/apache/skywalking-banyandb/pkg/fs" +) + +// writeTestDataWithTimeRange writes test data with optional min/max timestamps (back-compat: nil means no timestamps). +func writeTestDataWithTimeRange(t *testing.T, sidx SIDX, reqs []WriteRequest, segmentID int64, partID uint64, minTS, maxTS *int64) { + t.Helper() + memPart, err := sidx.ConvertToMemPart(reqs, segmentID, minTS, maxTS) + require.NoError(t, err) + require.NotNil(t, memPart) + sidx.IntroduceMemPart(partID, memPart) +} + +func ptrInt64Ts(v int64) *int64 { + val := v + return &val +} + +// TestConvertToMemPart_Timestamps verifies ConvertToMemPart with and without timestamps. +func TestConvertToMemPart_Timestamps(t *testing.T) { + sidx := createTestSIDX(t) + defer func() { + assert.NoError(t, sidx.Close()) + }() + + reqs := []WriteRequest{ + createTestWriteRequest(1, 1000, "data1"), + createTestWriteRequest(1, 2000, "data2"), + } + + t.Run("nil_timestamps_back_compat", func(t *testing.T) { + mp, err := sidx.ConvertToMemPart(reqs, 1, nil, nil) + require.NoError(t, err) + require.NotNil(t, mp) + require.Nil(t, mp.partMetadata.MinTimestamp) + require.Nil(t, mp.partMetadata.MaxTimestamp) + ReleaseMemPart(mp) + }) + + t.Run("with_timestamps", func(t *testing.T) { + minTS := int64(500) + maxTS := int64(2500) + mp, err := sidx.ConvertToMemPart(reqs, 1, &minTS, &maxTS) + require.NoError(t, err) + require.NotNil(t, mp) + require.NotNil(t, mp.partMetadata.MinTimestamp) + require.NotNil(t, mp.partMetadata.MaxTimestamp) + assert.Equal(t, int64(500), *mp.partMetadata.MinTimestamp) + assert.Equal(t, int64(2500), *mp.partMetadata.MaxTimestamp) + ReleaseMemPart(mp) + }) +} + +// TestManifest_BackCompat verifies manifest.json omits timestamps when nil (back-compat). +func TestManifest_BackCompat(t *testing.T) { + dir := t.TempDir() + fileSystem := fs.NewLocalFileSystem() + opts := NewDefaultOptions() + opts.Memory = protector.NewMemory(observability.NewBypassRegistry()) + opts.Path = dir + + sidxIface, err := NewSIDX(fileSystem, opts) + require.NoError(t, err) + raw := sidxIface.(*sidx) + defer func() { + assert.NoError(t, raw.Close()) + }() + + reqs := []WriteRequest{ + createTestWriteRequest(1, 100, "data"), + } + + t.Run("manifest_without_timestamps", func(t *testing.T) { + writeTestDataWithTimeRange(t, raw, reqs, 1, 1, nil, nil) + flushIntro, err := raw.Flush(map[uint64]struct{}{1: {}}) + require.NoError(t, err) + raw.IntroduceFlushed(flushIntro) + flushIntro.Release() + + manifestPath := filepath.Join(dir, "0000000000000001", manifestFilename) + data, readErr := fileSystem.Read(manifestPath) + require.NoError(t, readErr) + + var m struct { + MinTimestamp *int64 `json:"minTimestamp,omitempty"` + MaxTimestamp *int64 `json:"maxTimestamp,omitempty"` + MinKey int64 `json:"minKey"` + MaxKey int64 `json:"maxKey"` + } + require.NoError(t, json.Unmarshal(data, &m)) + require.Nil(t, m.MinTimestamp) + require.Nil(t, m.MaxTimestamp) + assert.Equal(t, int64(100), m.MinKey) + assert.Equal(t, int64(100), m.MaxKey) + }) + + t.Run("manifest_with_timestamps", func(t *testing.T) { + minTS := int64(1000) + maxTS := int64(2000) + writeTestDataWithTimeRange(t, raw, reqs, 2, 2, &minTS, &maxTS) + flushIntro, err := raw.Flush(map[uint64]struct{}{2: {}}) + require.NoError(t, err) + raw.IntroduceFlushed(flushIntro) + flushIntro.Release() + + manifestPath := filepath.Join(dir, "0000000000000002", manifestFilename) + data, readErr := fileSystem.Read(manifestPath) + require.NoError(t, readErr) + + var m struct { + MinTimestamp *int64 `json:"minTimestamp,omitempty"` + MaxTimestamp *int64 `json:"maxTimestamp,omitempty"` + } + require.NoError(t, json.Unmarshal(data, &m)) + require.NotNil(t, m.MinTimestamp) + require.NotNil(t, m.MaxTimestamp) + assert.Equal(t, int64(1000), *m.MinTimestamp) + assert.Equal(t, int64(2000), *m.MaxTimestamp) + }) +} + +// TestQuery_TimestampSelection verifies timestamp-aware part selection and key-range fallback. +func TestQuery_TimestampSelection(t *testing.T) { + reqs := []WriteRequest{ + createTestWriteRequest(1, 1000, "data1000"), + createTestWriteRequest(1, 2000, "data2000"), + } + + t.Run("back_compat_key_range_only", func(t *testing.T) { + sidx := createTestSIDX(t) + defer func() { + assert.NoError(t, sidx.Close()) + }() + writeTestDataWithTimeRange(t, sidx, reqs, 1, 1, nil, nil) + waitForIntroducerLoop() + + minKey := int64(500) + maxKey := int64(2500) + qr := QueryRequest{ + SeriesIDs: []common.SeriesID{1}, + MinKey: &minKey, + MaxKey: &maxKey, + } + resultsCh, errCh := sidx.StreamingQuery(context.Background(), qr) + var keys []int64 + for res := range resultsCh { + require.NoError(t, res.Error) + keys = append(keys, res.Keys...) + } + select { + case err, ok := <-errCh: + if ok { + require.NoError(t, err) + } + default: + } + assert.GreaterOrEqual(t, len(keys), 0, "key-range filter should work without timestamps") + }) + + t.Run("timestamp_filter_overlaps", func(t *testing.T) { + sidx := createTestSIDX(t) + defer func() { + assert.NoError(t, sidx.Close()) + }() + minTS := ptrInt64Ts(1000) + maxTS := ptrInt64Ts(3000) + writeTestDataWithTimeRange(t, sidx, reqs, 1, 1, minTS, maxTS) + waitForIntroducerLoop() + + qMinTS := ptrInt64Ts(1500) + qMaxTS := ptrInt64Ts(2500) + qr := QueryRequest{ + SeriesIDs: []common.SeriesID{1}, + MinTimestamp: qMinTS, + MaxTimestamp: qMaxTS, + } + resultsCh, errCh := sidx.StreamingQuery(context.Background(), qr) + var count int + for res := range resultsCh { + require.NoError(t, res.Error) + count += res.Len() + } + select { + case err, ok := <-errCh: + if ok { + require.NoError(t, err) + } + default: + } + assert.GreaterOrEqual(t, count, 0, "query with overlapping timestamp range") + }) + + t.Run("timestamp_filter_no_overlap_excludes_part", func(t *testing.T) { + sidx := createTestSIDX(t) + defer func() { + assert.NoError(t, sidx.Close()) + }() + minTS := ptrInt64Ts(1000) + maxTS := ptrInt64Ts(2000) + writeTestDataWithTimeRange(t, sidx, reqs, 1, 1, minTS, maxTS) + waitForIntroducerLoop() + + qMinTS := ptrInt64Ts(5000) + qMaxTS := ptrInt64Ts(6000) + qr := QueryRequest{ + SeriesIDs: []common.SeriesID{1}, + MinTimestamp: qMinTS, + MaxTimestamp: qMaxTS, + } + resultsCh, errCh := sidx.StreamingQuery(context.Background(), qr) + var count int + for res := range resultsCh { + require.NoError(t, res.Error) + count += res.Len() + } + select { + case err, ok := <-errCh: + if ok { + require.NoError(t, err) + } + default: + } + assert.Equal(t, 0, count, "parts outside timestamp range should be excluded") + }) +} + +// TestScanQuery_TimestampSelection verifies ScanQuery timestamp-aware part selection. +func TestScanQuery_TimestampSelection(t *testing.T) { + reqs := []WriteRequest{ + createTestWriteRequest(1, 100, "data"), + } + + t.Run("back_compat_no_timestamp_filter", func(t *testing.T) { + sidx := createTestSIDX(t) + defer func() { + assert.NoError(t, sidx.Close()) + }() + writeTestDataWithTimeRange(t, sidx, reqs, 1, 1, nil, nil) + waitForIntroducerLoop() + + sqr := ScanQueryRequest{} + res, err := sidx.ScanQuery(context.Background(), sqr) + require.NoError(t, err) + assert.GreaterOrEqual(t, len(res), 0) + }) + + t.Run("timestamp_filter_excludes_non_overlapping", func(t *testing.T) { + sidx := createTestSIDX(t) + defer func() { + assert.NoError(t, sidx.Close()) + }() + minTS := ptrInt64Ts(1000) + maxTS := ptrInt64Ts(2000) + writeTestDataWithTimeRange(t, sidx, reqs, 1, 1, minTS, maxTS) + waitForIntroducerLoop() + + sqr := ScanQueryRequest{ + MinTimestamp: ptrInt64Ts(5000), + MaxTimestamp: ptrInt64Ts(6000), + } + res, err := sidx.ScanQuery(context.Background(), sqr) + require.NoError(t, err) + total := 0 + for _, r := range res { + total += r.Len() + } + assert.Equal(t, 0, total, "parts outside timestamp range should be excluded in scan") + }) +} + +// TestMerge_TimestampPropagation verifies merge aggregates timestamps and back-compat when absent. +func TestMerge_TimestampPropagation(t *testing.T) { + reqs := []WriteRequest{ + createTestWriteRequest(1, 100, "data"), + } + + t.Run("merge_parts_without_timestamps_back_compat", func(t *testing.T) { + dir := t.TempDir() + fileSystem := fs.NewLocalFileSystem() + opts := NewDefaultOptions() + opts.Memory = protector.NewMemory(observability.NewBypassRegistry()) + opts.Path = dir + opts.AvailablePartIDs = []uint64{1, 2, 10} + + sidxIface, err := NewSIDX(fileSystem, opts) + require.NoError(t, err) + raw := sidxIface.(*sidx) + defer func() { + assert.NoError(t, raw.Close()) + }() + + writeTestDataWithTimeRange(t, raw, reqs, 1, 1, nil, nil) + writeTestDataWithTimeRange(t, raw, reqs, 2, 2, nil, nil) + waitForIntroducerLoop() + + flushIntro, err := raw.Flush(map[uint64]struct{}{1: {}, 2: {}}) + require.NoError(t, err) + raw.IntroduceFlushed(flushIntro) + flushIntro.Release() + + mergeIntro, mergeErr := raw.Merge(nil, map[uint64]struct{}{1: {}, 2: {}}, 10) + require.NoError(t, mergeErr) + require.NotNil(t, mergeIntro) + raw.IntroduceMerged(mergeIntro)() + + manifestPath := filepath.Join(dir, "000000000000000a", manifestFilename) + data, readErr := fileSystem.Read(manifestPath) + require.NoError(t, readErr) + + var m struct { + MinTimestamp *int64 `json:"minTimestamp,omitempty"` + MaxTimestamp *int64 `json:"maxTimestamp,omitempty"` + } + require.NoError(t, json.Unmarshal(data, &m)) + require.Nil(t, m.MinTimestamp) + require.Nil(t, m.MaxTimestamp) + }) + + t.Run("merge_parts_with_timestamps_aggregates", func(t *testing.T) { + dir := t.TempDir() + fileSystem := fs.NewLocalFileSystem() + opts := NewDefaultOptions() + opts.Memory = protector.NewMemory(observability.NewBypassRegistry()) + opts.Path = dir + opts.AvailablePartIDs = []uint64{1, 2, 10} + + sidxIface, err := NewSIDX(fileSystem, opts) + require.NoError(t, err) + raw := sidxIface.(*sidx) + defer func() { + assert.NoError(t, raw.Close()) + }() + + min1, max1 := int64(100), int64(200) + min2, max2 := int64(150), int64(300) + writeTestDataWithTimeRange(t, raw, reqs, 1, 1, &min1, &max1) + writeTestDataWithTimeRange(t, raw, reqs, 2, 2, &min2, &max2) + waitForIntroducerLoop() + + flushIntro, err := raw.Flush(map[uint64]struct{}{1: {}, 2: {}}) + require.NoError(t, err) + raw.IntroduceFlushed(flushIntro) + flushIntro.Release() + + mergeIntro, mergeErr := raw.Merge(nil, map[uint64]struct{}{1: {}, 2: {}}, 10) + require.NoError(t, mergeErr) + require.NotNil(t, mergeIntro) + raw.IntroduceMerged(mergeIntro)() + + manifestPath := filepath.Join(dir, "000000000000000a", manifestFilename) + data, readErr := fileSystem.Read(manifestPath) + require.NoError(t, readErr) + + var m struct { + MinTimestamp *int64 `json:"minTimestamp,omitempty"` + MaxTimestamp *int64 `json:"maxTimestamp,omitempty"` + } + require.NoError(t, json.Unmarshal(data, &m)) + require.NotNil(t, m.MinTimestamp) + require.NotNil(t, m.MaxTimestamp) + assert.Equal(t, int64(100), *m.MinTimestamp) + assert.Equal(t, int64(300), *m.MaxTimestamp) + }) +} diff --git a/banyand/trace/streaming_pipeline_test.go b/banyand/trace/streaming_pipeline_test.go index 863c32206..f1435c246 100644 --- a/banyand/trace/streaming_pipeline_test.go +++ b/banyand/trace/streaming_pipeline_test.go @@ -59,7 +59,7 @@ func (f *fakeSIDX) StreamingQuery(ctx context.Context, _ sidx.QueryRequest) (<-c func (f *fakeSIDX) IntroduceMemPart(uint64, *sidx.MemPart) { panic("not implemented") } func (f *fakeSIDX) IntroduceFlushed(*sidx.FlusherIntroduction) {} func (f *fakeSIDX) IntroduceMerged(*sidx.MergerIntroduction) func() { return func() {} } -func (f *fakeSIDX) ConvertToMemPart([]sidx.WriteRequest, int64) (*sidx.MemPart, error) { +func (f *fakeSIDX) ConvertToMemPart([]sidx.WriteRequest, int64, *int64, *int64) (*sidx.MemPart, error) { panic("not implemented") } @@ -654,7 +654,7 @@ func (f *fakeSIDXInfinite) StreamingQuery(ctx context.Context, _ sidx.QueryReque func (f *fakeSIDXInfinite) IntroduceMemPart(uint64, *sidx.MemPart) { panic("not implemented") } func (f *fakeSIDXInfinite) IntroduceFlushed(*sidx.FlusherIntroduction) {} func (f *fakeSIDXInfinite) IntroduceMerged(*sidx.MergerIntroduction) func() { return func() {} } -func (f *fakeSIDXInfinite) ConvertToMemPart([]sidx.WriteRequest, int64) (*sidx.MemPart, error) { +func (f *fakeSIDXInfinite) ConvertToMemPart([]sidx.WriteRequest, int64, *int64, *int64) (*sidx.MemPart, error) { panic("not implemented") } diff --git a/banyand/trace/write_liaison.go b/banyand/trace/write_liaison.go index 34bf7c719..3c2ec1f99 100644 --- a/banyand/trace/write_liaison.go +++ b/banyand/trace/write_liaison.go @@ -222,8 +222,10 @@ func (w *writeQueueCallback) Rev(ctx context.Context, message bus.Message) (resp w.l.Error().Err(err).Str("sidx", sidxName).Msg("cannot get or create sidx instance") continue } + minTS := es.timeRange.Start.UnixNano() + maxTS := es.timeRange.End.UnixNano() var siMemPart *sidx.MemPart - if siMemPart, err = sidxInstance.ConvertToMemPart(sidxReqs, es.timeRange.Start.UnixNano()); err != nil { + if siMemPart, err = sidxInstance.ConvertToMemPart(sidxReqs, es.timeRange.Start.UnixNano(), &minTS, &maxTS); err != nil { w.l.Error().Err(err).Str("sidx", sidxName).Msg("cannot write to secondary index") continue } diff --git a/banyand/trace/write_standalone.go b/banyand/trace/write_standalone.go index 077d1e86a..eb3c16db6 100644 --- a/banyand/trace/write_standalone.go +++ b/banyand/trace/write_standalone.go @@ -435,8 +435,10 @@ func (w *writeCallback) Rev(_ context.Context, message bus.Message) (resp bus.Me w.l.Error().Err(err).Str("sidx", sidxName).Msg("cannot get or create sidx instance") continue } + minTS := es.timeRange.Start.UnixNano() + maxTS := es.timeRange.End.UnixNano() var siMemPart *sidx.MemPart - if siMemPart, err = sidxInstance.ConvertToMemPart(sidxReqs, es.timeRange.Start.UnixNano()); err != nil { + if siMemPart, err = sidxInstance.ConvertToMemPart(sidxReqs, es.timeRange.Start.UnixNano(), &minTS, &maxTS); err != nil { w.l.Error().Err(err).Str("sidx", sidxName).Msg("cannot write to secondary index") continue }
