This is an automated email from the ASF dual-hosted git repository.

swaminathanmanish pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new bd76ff0ee27 Query both segments of a multi-segment SegmentDataManager 
(#18484)
bd76ff0ee27 is described below

commit bd76ff0ee27f5554a4021e3c2a7ec769eb02617d
Author: Krishan Goyal <[email protected]>
AuthorDate: Wed May 13 14:57:03 2026 +0530

    Query both segments of a multi-segment SegmentDataManager (#18484)
    
    DuoSegmentDataManager wraps two IndexSegments under one segment name (e.g.
    the committed immutable and the still-mutable consuming segment during a
    realtime commit window). SingleTableExecutionInfo.create() previously only
    took the primary via getSegment(), so the secondary's rows were silently
    missed during the duo window. Use hasMultiSegments()/getSegments() to expand
    all live segments into the query plan, for both upsert and non-upsert 
tables.
    Extracted the expansion into a private helper to keep the two branches in
    sync.
    
    Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
 .../query/executor/SingleTableExecutionInfo.java   |  28 ++--
 .../executor/SingleTableExecutionInfoTest.java     | 182 +++++++++++++++++++++
 2 files changed, 198 insertions(+), 12 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/SingleTableExecutionInfo.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/SingleTableExecutionInfo.java
index 6cdb6c0614a..df484ea40d0 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/SingleTableExecutionInfo.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/SingleTableExecutionInfo.java
@@ -75,10 +75,7 @@ public class SingleTableExecutionInfo implements 
TableExecutionInfo {
 
     if (!tableDataManager.isUpsertEnabled()) {
       segmentDataManagers = tableDataManager.acquireSegments(segmentsToQuery, 
optionalSegments, notAcquiredSegments);
-      indexSegments = new ArrayList<>(segmentDataManagers.size());
-      for (SegmentDataManager segmentDataManager : segmentDataManagers) {
-        indexSegments.add(segmentDataManager.getSegment());
-      }
+      indexSegments = collectIndexSegments(segmentDataManagers);
     } else {
       TableUpsertMetadataManager tumm = 
tableDataManager.getTableUpsertMetadataManager();
       Preconditions.checkState(tumm != null,
@@ -101,14 +98,7 @@ public class SingleTableExecutionInfo implements 
TableExecutionInfo {
           }
         }
         segmentDataManagers = 
tableDataManager.acquireSegments(segmentsToQuery, optionalSegments, 
notAcquiredSegments);
-        indexSegments = new ArrayList<>(segmentDataManagers.size());
-        for (SegmentDataManager segmentDataManager : segmentDataManagers) {
-          if (segmentDataManager.hasMultiSegments()) {
-            indexSegments.addAll(segmentDataManager.getSegments());
-          } else {
-            indexSegments.add(segmentDataManager.getSegment());
-          }
-        }
+        indexSegments = collectIndexSegments(segmentDataManagers);
         if (isUsingConsistencyMode) {
           List<SegmentContext> segmentContexts =
               tableDataManager.getSegmentContexts(indexSegments, 
queryContext.getQueryOptions());
@@ -128,6 +118,20 @@ public class SingleTableExecutionInfo implements 
TableExecutionInfo {
         segmentsToQuery, optionalSegments, notAcquiredSegments);
   }
 
+  // A SegmentDataManager may expose more than one IndexSegment (e.g. 
DuoSegmentDataManager during a commit window,
+  // where both the committed immutable and the still-mutable consuming 
segment must be queried together).
+  private static List<IndexSegment> 
collectIndexSegments(List<SegmentDataManager> segmentDataManagers) {
+    List<IndexSegment> indexSegments = new 
ArrayList<>(segmentDataManagers.size());
+    for (SegmentDataManager segmentDataManager : segmentDataManagers) {
+      if (segmentDataManager.hasMultiSegments()) {
+        indexSegments.addAll(segmentDataManager.getSegments());
+      } else {
+        indexSegments.add(segmentDataManager.getSegment());
+      }
+    }
+    return indexSegments;
+  }
+
   private SingleTableExecutionInfo(TableDataManager tableDataManager, 
List<SegmentDataManager> segmentDataManagers,
       List<IndexSegment> indexSegments, Map<IndexSegment, SegmentContext> 
providedSegmentContexts,
       List<String> segmentsToQuery, List<String> optionalSegments, 
List<String> notAcquiredSegments) {
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/SingleTableExecutionInfoTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/SingleTableExecutionInfoTest.java
new file mode 100644
index 00000000000..34f23e9a5ff
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/SingleTableExecutionInfoTest.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.executor;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.pinot.common.exception.TableNotFoundException;
+import org.apache.pinot.core.data.manager.DuoSegmentDataManager;
+import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
+import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.MutableSegment;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertSame;
+import static org.testng.Assert.expectThrows;
+
+
+public class SingleTableExecutionInfoTest {
+
+  private static final String TABLE_NAME_WITH_TYPE = "testTable_REALTIME";
+
+  /**
+   * For a {@link DuoSegmentDataManager}, both the primary (immutable) segment 
and the secondary (mutable) segment
+   * must appear in the index segment list used for query execution. The 
primary segment alone is not enough — a
+   * mutable consuming segment continues to receive rows after the immutable 
was committed, and those rows must
+   * still be queryable for the duration of the duo window.
+   */
+  @Test
+  public void testCreateExpandsDuoSegmentDataManagerForNonUpsertTable()
+      throws TableNotFoundException {
+    SegmentDataManager singleSegmentMgr = 
mockImmutableSegmentDataManager("seg00");
+    IndexSegment singleSegment = singleSegmentMgr.getSegment();
+
+    SegmentDataManager primary = mockImmutableSegmentDataManager("seg01");
+    SegmentDataManager secondary = mockMutableSegmentDataManager("seg01");
+    IndexSegment primarySegment = primary.getSegment();
+    IndexSegment secondarySegment = secondary.getSegment();
+    DuoSegmentDataManager duo = new DuoSegmentDataManager(primary, secondary);
+
+    List<SegmentDataManager> acquired = Arrays.asList(singleSegmentMgr, duo);
+    InstanceDataManager instanceDataManager =
+        mockInstanceDataManager(acquired, /*isUpsertEnabled=*/ false);
+
+    SingleTableExecutionInfo info = 
SingleTableExecutionInfo.create(instanceDataManager, TABLE_NAME_WITH_TYPE,
+        Arrays.asList("seg00", "seg01"), null, mock(QueryContext.class));
+
+    // Single-segment manager contributes 1 segment; duo contributes 2 -> 3 
total
+    List<IndexSegment> indexSegments = info.getIndexSegments();
+    assertEquals(indexSegments.size(), 3);
+    assertSame(indexSegments.get(0), singleSegment);
+    assertSame(indexSegments.get(1), primarySegment);
+    assertSame(indexSegments.get(2), secondarySegment);
+
+    // Acquired segment data managers list still tracks the duo once (one 
reference to release)
+    assertEquals(info.getSegmentDataManagers().size(), 2);
+    assertEquals(info.getNumSegmentsAcquired(), 2);
+  }
+
+  /**
+   * When a {@link DuoSegmentDataManager}'s secondary segment has been 
released (refCount=0),
+   * {@link DuoSegmentDataManager#getSegments()} only returns the live primary 
segment. The execution info should
+   * include just that one segment for the duo manager — no nulls, no stale 
references.
+   */
+  @Test
+  public void testCreateExpandsDuoSegmentDataManagerWithReleasedSecondary()
+      throws TableNotFoundException {
+    SegmentDataManager primary = mockImmutableSegmentDataManager("seg01");
+    SegmentDataManager secondary = mockMutableSegmentDataManager("seg01");
+    // Simulate the secondary already released after duo manager construction
+    when(secondary.getReferenceCount()).thenReturn(0);
+    IndexSegment primarySegment = primary.getSegment();
+    DuoSegmentDataManager duo = new DuoSegmentDataManager(primary, secondary);
+
+    InstanceDataManager instanceDataManager =
+        mockInstanceDataManager(Collections.singletonList(duo), 
/*isUpsertEnabled=*/ false);
+
+    SingleTableExecutionInfo info = 
SingleTableExecutionInfo.create(instanceDataManager, TABLE_NAME_WITH_TYPE,
+        Collections.singletonList("seg01"), null, mock(QueryContext.class));
+
+    List<IndexSegment> indexSegments = info.getIndexSegments();
+    assertEquals(indexSegments.size(), 1);
+    assertSame(indexSegments.get(0), primarySegment);
+  }
+
+  /**
+   * Plain (non-duo) {@link SegmentDataManager} instances are still handled 
correctly: each one contributes
+   * exactly one segment to the index segment list, matching the 
pre-multi-segment behavior.
+   */
+  @Test
+  public void testCreateKeepsSingleSegmentBehaviorWhenNoMultiSegments()
+      throws TableNotFoundException {
+    SegmentDataManager sdm1 = mockImmutableSegmentDataManager("seg01");
+    SegmentDataManager sdm2 = mockImmutableSegmentDataManager("seg02");
+    IndexSegment seg1 = sdm1.getSegment();
+    IndexSegment seg2 = sdm2.getSegment();
+
+    InstanceDataManager instanceDataManager =
+        mockInstanceDataManager(Arrays.asList(sdm1, sdm2), 
/*isUpsertEnabled=*/ false);
+
+    SingleTableExecutionInfo info = 
SingleTableExecutionInfo.create(instanceDataManager, TABLE_NAME_WITH_TYPE,
+        Arrays.asList("seg01", "seg02"), null, mock(QueryContext.class));
+
+    List<IndexSegment> indexSegments = info.getIndexSegments();
+    assertEquals(indexSegments.size(), 2);
+    assertSame(indexSegments.get(0), seg1);
+    assertSame(indexSegments.get(1), seg2);
+  }
+
+  /**
+   * {@link SingleTableExecutionInfo#create} throws {@link 
TableNotFoundException} when the requested table
+   * is not registered with the instance data manager.
+   */
+  @Test
+  public void testCreateThrowsTableNotFoundWhenTableMissing() {
+    InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
+    
when(instanceDataManager.getTableDataManager(TABLE_NAME_WITH_TYPE)).thenReturn(null);
+
+    expectThrows(TableNotFoundException.class,
+        () -> SingleTableExecutionInfo.create(instanceDataManager, 
TABLE_NAME_WITH_TYPE,
+            Collections.emptyList(), null, mock(QueryContext.class)));
+  }
+
+  private static InstanceDataManager 
mockInstanceDataManager(List<SegmentDataManager> acquired,
+      boolean isUpsertEnabled) {
+    TableDataManager tableDataManager = mock(TableDataManager.class);
+    when(tableDataManager.isUpsertEnabled()).thenReturn(isUpsertEnabled);
+    when(tableDataManager.acquireSegments(anyList(), any(), 
anyList())).thenAnswer(invocation -> {
+      // Return the prebuilt list of acquired managers; tests do not exercise 
notAcquiredSegments
+      return new ArrayList<>(acquired);
+    });
+    InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
+    
when(instanceDataManager.getTableDataManager(TABLE_NAME_WITH_TYPE)).thenReturn(tableDataManager);
+    return instanceDataManager;
+  }
+
+  private static SegmentDataManager mockImmutableSegmentDataManager(String 
segmentName) {
+    SegmentDataManager sdm = mock(ImmutableSegmentDataManager.class);
+    IndexSegment segment = mock(ImmutableSegment.class);
+    when(sdm.getSegmentName()).thenReturn(segmentName);
+    when(sdm.getSegment()).thenReturn(segment);
+    when(sdm.getReferenceCount()).thenReturn(1);
+    return sdm;
+  }
+
+  private static SegmentDataManager mockMutableSegmentDataManager(String 
segmentName) {
+    SegmentDataManager sdm = mock(RealtimeSegmentDataManager.class);
+    IndexSegment segment = mock(MutableSegment.class);
+    when(sdm.getSegmentName()).thenReturn(segmentName);
+    when(sdm.getSegment()).thenReturn(segment);
+    when(sdm.getReferenceCount()).thenReturn(1);
+    return sdm;
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to