This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a1f3c30  Add parameter to list segments which can be queried (#7878)
a1f3c30 is described below

commit a1f3c30e2ac74dd0acb67087a86d8f03b416f5b2
Author: Jialiang Li <j...@linkedin.com>
AuthorDate: Mon Jan 3 13:41:02 2022 -0800

    Add parameter to list segments which can be queried (#7878)
    
    Co-authored-by: Jack Li(Analytics Engineering) <j...@jlli-mn1.linkedin.biz>
---
 .../broker/broker/HelixBrokerStarterTest.java      |  2 +-
 .../api/resources/PinotSegmentRestletResource.java | 17 +++++++++---
 .../helix/core/PinotHelixResourceManager.java      | 32 ++++++++++++++++------
 .../helix/core/retention/RetentionManager.java     |  3 +-
 ...PinotIngestionRestletResourceStatelessTest.java |  6 ++--
 .../helix/core/PinotHelixResourceManagerTest.java  |  2 +-
 .../core/retention/SegmentLineageCleanupTest.java  | 14 ++++++----
 .../tests/OfflineGRPCServerIntegrationTest.java    |  4 +--
 8 files changed, 53 insertions(+), 27 deletions(-)

diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
index b42fdf2..68cf4f1 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
@@ -222,7 +222,7 @@ public class HelixBrokerStarterTest extends ControllerTest {
     assertEquals(timeBoundaryInfo.getTimeValue(), Integer.toString(currentEndTime - 1));
 
     // Refresh a segment with a new end time
-    String segmentToRefresh = _helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME).get(0);
+    String segmentToRefresh = _helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME, true).get(0);
     int newEndTime = currentEndTime + 10;
     SegmentZKMetadata segmentZKMetadata =
         _helixResourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME, segmentToRefresh);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
index ba569d5..7078e60 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
@@ -163,17 +163,26 @@ public class PinotSegmentRestletResource {
   @GET
   @Produces(MediaType.APPLICATION_JSON)
   @Path("/segments/{tableName}")
-  @ApiOperation(value = "List all segments", notes = "List all segments")
+  @ApiOperation(value = "List all segments. An optional 'excludeReplacedSegments' parameter is used to get the"
+      + " list of segments which has not yet been replaced (determined by segment lineage entries) and can be queried"
+      + " from the table. The value is false by default.",
+      // TODO: more and more filters can be added later on, like excludeErrorSegments, excludeConsumingSegments, etc.
+      notes = "List all segments")
   public List<Map<TableType, List<String>>> getSegments(
       @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
-      @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr) {
+      @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr,
+      @ApiParam(value = "Whether to exclude replaced segments in the response, which have been replaced"
+          + " specified in the segment lineage entries and cannot be queried from the table")
+      @QueryParam("excludeReplacedSegments") String excludeReplacedSegments) {
     List<String> tableNamesWithType = ResourceUtils
         .getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, Constants.validateTableType(tableTypeStr),
             LOGGER);
+    boolean shouldExcludeReplacedSegments = Boolean.parseBoolean(excludeReplacedSegments);
     List<Map<TableType, List<String>>> resultList = new ArrayList<>(tableNamesWithType.size());
     for (String tableNameWithType : tableNamesWithType) {
       TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
-      List<String> segments = _pinotHelixResourceManager.getSegmentsFor(tableNameWithType);
+      List<String> segments =
+          _pinotHelixResourceManager.getSegmentsFor(tableNameWithType, shouldExcludeReplacedSegments);
       resultList.add(Collections.singletonMap(tableType, segments));
     }
     return resultList;
@@ -585,7 +594,7 @@ public class PinotSegmentRestletResource {
     }
     String tableNameWithType =
         
ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, 
tableName, tableType, LOGGER).get(0);
-    deleteSegmentsInternal(tableNameWithType, 
_pinotHelixResourceManager.getSegmentsFor(tableNameWithType));
+    deleteSegmentsInternal(tableNameWithType, 
_pinotHelixResourceManager.getSegmentsFor(tableNameWithType, false));
     return new SuccessResponse("All segments of table " + tableNameWithType + 
" deleted");
   }
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index f9d6973..9f719a9 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -587,10 +587,15 @@ public class PinotHelixResourceManager {
    * Returns the segments for the given table.
    *
    * @param tableNameWithType Table name with type suffix
+   * @param shouldExcludeReplacedSegments whether to return the list of segments that doesn't contain replaced segments.
    * @return List of segment names
    */
-  public List<String> getSegmentsFor(String tableNameWithType) {
-    return ZKMetadataProvider.getSegments(_propertyStore, tableNameWithType);
+  public List<String> getSegmentsFor(String tableNameWithType, boolean shouldExcludeReplacedSegments) {
+    List<String> segmentsFromPropertiesStore = ZKMetadataProvider.getSegments(_propertyStore, tableNameWithType);
+    if (shouldExcludeReplacedSegments) {
+      return excludeReplacedSegments(tableNameWithType, segmentsFromPropertiesStore);
+    }
+    return segmentsFromPropertiesStore;
   }
 
   /**
@@ -606,7 +611,7 @@ public class PinotHelixResourceManager {
     List<String> selectedSegments;
     // If no start and end timestamp specified, just select all the segments.
     if (startTimestamp == Long.MIN_VALUE && endTimestamp == Long.MAX_VALUE) {
-      selectedSegments = getSegmentsFor(tableNameWithType);
+      selectedSegments = getSegmentsFor(tableNameWithType, false);
     } else {
       selectedSegments = new ArrayList<>();
       List<SegmentZKMetadata> segmentZKMetadataList = 
getSegmentsZKMetadata(tableNameWithType);
@@ -617,12 +622,21 @@ public class PinotHelixResourceManager {
         }
       }
     }
+    return excludeReplacedSegments(tableNameWithType, selectedSegments);
+  }
+
+  /**
+   * Given the list of segment names, exclude all the replaced segments which cannot be queried.
+   * @param tableNameWithType table name with type
+   * @param segments list of input segment names
+   */
+  private List<String> excludeReplacedSegments(String tableNameWithType, List<String> segments) {
     // Fetch the segment lineage metadata, and filter segments based on segment lineage.
     SegmentLineage segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, tableNameWithType);
     if (segmentLineage == null) {
-      return selectedSegments;
+      return segments;
     } else {
-      Set<String> selectedSegmentSet = new HashSet<>(selectedSegments);
+      Set<String> selectedSegmentSet = new HashSet<>(segments);
       SegmentLineageUtils.filterSegmentsBasedOnLineageInPlace(selectedSegmentSet, segmentLineage);
       return new ArrayList<>(selectedSegmentSet);
     }
@@ -1618,7 +1632,7 @@ public class PinotHelixResourceManager {
     }
 
     // Remove all stored segments for the table
-    _segmentDeletionManager.removeSegmentsFromStore(offlineTableName, 
getSegmentsFor(offlineTableName));
+    _segmentDeletionManager.removeSegmentsFromStore(offlineTableName, 
getSegmentsFor(offlineTableName, false));
     LOGGER.info("Deleting table {}: Removed stored segments", 
offlineTableName);
 
     // Remove segment metadata
@@ -1662,7 +1676,7 @@ public class PinotHelixResourceManager {
     }
 
     // Remove all stored segments for the table
-    _segmentDeletionManager.removeSegmentsFromStore(realtimeTableName, 
getSegmentsFor(realtimeTableName));
+    _segmentDeletionManager.removeSegmentsFromStore(realtimeTableName, 
getSegmentsFor(realtimeTableName, false));
     LOGGER.info("Deleting table {}: Removed stored segments", 
realtimeTableName);
 
     // Remove segment metadata
@@ -2755,7 +2769,7 @@ public class PinotHelixResourceManager {
     String segmentLineageEntryId = 
SegmentLineageUtils.generateLineageEntryId();
 
     // Check that all the segments from 'segmentsFrom' exist in the table
-    Set<String> segmentsForTable = new 
HashSet<>(getSegmentsFor(tableNameWithType));
+    Set<String> segmentsForTable = new 
HashSet<>(getSegmentsFor(tableNameWithType, false));
     Preconditions.checkArgument(segmentsForTable.containsAll(segmentsFrom), 
String.format(
         "Not all segments from 'segmentsFrom' are available in the table. 
(tableName = '%s', segmentsFrom = '%s', "
             + "segmentsTo = '%s', segmentsFromTable = '%s')", 
tableNameWithType, segmentsFrom, segmentsTo,
@@ -2892,7 +2906,7 @@ public class PinotHelixResourceManager {
         }
 
         // Check that all the segments from 'segmentsTo' exist in the table
-        Set<String> segmentsForTable = new 
HashSet<>(getSegmentsFor(tableNameWithType));
+        Set<String> segmentsForTable = new 
HashSet<>(getSegmentsFor(tableNameWithType, false));
         
Preconditions.checkArgument(segmentsForTable.containsAll(lineageEntry.getSegmentsTo()),
 String.format(
             "Not all segments from 'segmentsTo' are available in the table. 
(tableName = '%s', segmentsTo = '%s', "
                 + "segmentsFromTable = '%s')", tableNameWithType, 
lineageEntry.getSegmentsTo(), segmentsForTable));
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
index 2512c78..9ed7a69 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
@@ -214,7 +214,8 @@ public class RetentionManager extends ControllerPeriodicTask<Void> {
         // 1. The original segments can be deleted once the merged segments 
are successfully uploaded
         // 2. The zombie lineage entry & merged segments should be deleted if 
the segment replacement failed in
         //    the middle
-        Set<String> segmentsForTable = new 
HashSet<>(_pinotHelixResourceManager.getSegmentsFor(tableNameWithType));
+        Set<String> segmentsForTable =
+            new 
HashSet<>(_pinotHelixResourceManager.getSegmentsFor(tableNameWithType, false));
         List<String> segmentsToDelete = new ArrayList<>();
         for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
           LineageEntry lineageEntry = 
segmentLineage.getLineageEntry(lineageEntryId);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotIngestionRestletResourceStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotIngestionRestletResourceStatelessTest.java
index f98ece1..ddee58c 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotIngestionRestletResourceStatelessTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotIngestionRestletResourceStatelessTest.java
@@ -85,7 +85,7 @@ public class PinotIngestionRestletResourceStatelessTest extends ControllerTest {
   public void testIngestEndpoint()
       throws Exception {
 
-    List<String> segments = 
_helixResourceManager.getSegmentsFor(TABLE_NAME_WITH_TYPE);
+    List<String> segments = 
_helixResourceManager.getSegmentsFor(TABLE_NAME_WITH_TYPE, false);
     Assert.assertEquals(segments.size(), 0);
 
     // ingest from file
@@ -93,13 +93,13 @@ public class PinotIngestionRestletResourceStatelessTest 
extends ControllerTest {
     batchConfigMap.put(BatchConfigProperties.INPUT_FORMAT, "csv");
     batchConfigMap.put(String.format("%s.delimiter", 
BatchConfigProperties.RECORD_READER_PROP_PREFIX), "|");
     
sendHttpPost(_controllerRequestURLBuilder.forIngestFromFile(TABLE_NAME_WITH_TYPE,
 batchConfigMap));
-    segments = _helixResourceManager.getSegmentsFor(TABLE_NAME_WITH_TYPE);
+    segments = _helixResourceManager.getSegmentsFor(TABLE_NAME_WITH_TYPE, 
false);
     Assert.assertEquals(segments.size(), 1);
 
     // ingest from URI
     
sendHttpPost(_controllerRequestURLBuilder.forIngestFromURI(TABLE_NAME_WITH_TYPE,
 batchConfigMap,
         String.format("file://%s", _inputFile.getAbsolutePath())));
-    segments = _helixResourceManager.getSegmentsFor(TABLE_NAME_WITH_TYPE);
+    segments = _helixResourceManager.getSegmentsFor(TABLE_NAME_WITH_TYPE, 
false);
     Assert.assertEquals(segments.size(), 2);
   }
 
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
index ae524a6..faf594a 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
@@ -464,7 +464,7 @@ public class PinotHelixResourceManagerTest {
           "downloadUrl");
     }
     List<String> segmentsForTable =
-        
ControllerTestUtils.getHelixResourceManager().getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
+        
ControllerTestUtils.getHelixResourceManager().getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
 false);
     Assert.assertEquals(segmentsForTable.size(), 5);
 
     List<String> segmentsFrom = new ArrayList<>();
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java
index 13423aa..b77e211 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java
@@ -114,7 +114,8 @@ public class SegmentLineageCleanupTest {
       
ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_TABLE_NAME,
           SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, 
"merged_" + i), "downloadUrl");
     }
-    
Assert.assertEquals(ControllerTestUtils.getHelixResourceManager().getSegmentsFor(OFFLINE_TABLE_NAME).size(),
 7);
+    
Assert.assertEquals(ControllerTestUtils.getHelixResourceManager().getSegmentsFor(OFFLINE_TABLE_NAME,
 false).size(),
+        7);
     long currentTimeInMillis = System.currentTimeMillis();
 
     // Validate the case when the lineage entry state is 'IN_PROGRESS'
@@ -126,7 +127,8 @@ public class SegmentLineageCleanupTest {
         
.writeSegmentLineage(ControllerTestUtils.getHelixResourceManager().getPropertyStore(),
 segmentLineage, -1);
     _retentionManager.processTable(OFFLINE_TABLE_NAME);
     waitForSegmentsToDelete(OFFLINE_TABLE_NAME, 7);
-    List<String> segmentsForTable = 
ControllerTestUtils.getHelixResourceManager().getSegmentsFor(OFFLINE_TABLE_NAME);
+    List<String> segmentsForTable =
+        
ControllerTestUtils.getHelixResourceManager().getSegmentsFor(OFFLINE_TABLE_NAME,
 false);
     Assert.assertEquals(segmentsForTable.size(), 7);
 
     // Validate the case when the lineage entry state is 'COMPLETED'
@@ -137,7 +139,7 @@ public class SegmentLineageCleanupTest {
         
.writeSegmentLineage(ControllerTestUtils.getHelixResourceManager().getPropertyStore(),
 segmentLineage, -1);
     _retentionManager.processTable(OFFLINE_TABLE_NAME);
     waitForSegmentsToDelete(OFFLINE_TABLE_NAME, 5);
-    segmentsForTable = 
ControllerTestUtils.getHelixResourceManager().getSegmentsFor(OFFLINE_TABLE_NAME);
+    segmentsForTable = 
ControllerTestUtils.getHelixResourceManager().getSegmentsFor(OFFLINE_TABLE_NAME,
 false);
     Assert.assertEquals(segmentsForTable.size(), 5);
     Assert.assertTrue(Collections.disjoint(segmentsForTable, 
Arrays.asList("segment_0", "segment_1")));
 
@@ -146,7 +148,7 @@ public class SegmentLineageCleanupTest {
     waitForSegmentsToDelete(OFFLINE_TABLE_NAME, 4);
     _retentionManager.processTable(OFFLINE_TABLE_NAME);
     waitForSegmentsToDelete(OFFLINE_TABLE_NAME, 4);
-    segmentsForTable = 
ControllerTestUtils.getHelixResourceManager().getSegmentsFor(OFFLINE_TABLE_NAME);
+    segmentsForTable = 
ControllerTestUtils.getHelixResourceManager().getSegmentsFor(OFFLINE_TABLE_NAME,
 false);
     Assert.assertEquals(segmentsForTable.size(), 4);
     Assert.assertTrue(Collections.disjoint(segmentsForTable, 
Arrays.asList("segment_0", "segment_1", "merged_0")));
     segmentLineage =
@@ -163,7 +165,7 @@ public class SegmentLineageCleanupTest {
         
.writeSegmentLineage(ControllerTestUtils.getHelixResourceManager().getPropertyStore(),
 segmentLineage, -1);
     _retentionManager.processTable(OFFLINE_TABLE_NAME);
     waitForSegmentsToDelete(OFFLINE_TABLE_NAME, 3);
-    segmentsForTable = 
ControllerTestUtils.getHelixResourceManager().getSegmentsFor(OFFLINE_TABLE_NAME);
+    segmentsForTable = 
ControllerTestUtils.getHelixResourceManager().getSegmentsFor(OFFLINE_TABLE_NAME,
 false);
     Assert.assertEquals(segmentsForTable.size(), 3);
     Assert.assertTrue(Collections.disjoint(segmentsForTable, 
Arrays.asList("merged_1", "merged_2")));
     segmentLineage =
@@ -176,7 +178,7 @@ public class SegmentLineageCleanupTest {
       throws InterruptedException {
     long endTimeMs = System.currentTimeMillis() + MAX_TIMEOUT_IN_MILLISECOND;
     do {
-      if 
(ControllerTestUtils.getHelixResourceManager().getSegmentsFor(tableNameWithType).size()
+      if 
(ControllerTestUtils.getHelixResourceManager().getSegmentsFor(tableNameWithType,
 false).size()
           == expectedNumSegmentsAfterDelete) {
         return;
       } else {
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java
index 99c7e7b..f3df4c9 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java
@@ -152,7 +152,7 @@ public class OfflineGRPCServerIntegrationTest extends BaseClusterIntegrationTest
     GrpcQueryClient queryClient = new GrpcQueryClient("localhost", 
CommonConstants.Server.DEFAULT_GRPC_PORT);
     String sql = "SELECT * FROM mytable_OFFLINE LIMIT 1000000";
     BrokerRequest brokerRequest = new 
Pql2Compiler().compileToBrokerRequest(sql);
-    List<String> segments = 
_helixResourceManager.getSegmentsFor("mytable_OFFLINE");
+    List<String> segments = 
_helixResourceManager.getSegmentsFor("mytable_OFFLINE", false);
 
     GrpcRequestBuilder requestBuilder = new 
GrpcRequestBuilder().setSegments(segments);
     
testNonStreamingRequest(queryClient.submit(requestBuilder.setSql(sql).build()));
@@ -167,7 +167,7 @@ public class OfflineGRPCServerIntegrationTest extends 
BaseClusterIntegrationTest
   public void testQueryingGrpcServer(String sql)
       throws Exception {
     GrpcQueryClient queryClient = new GrpcQueryClient("localhost", 
CommonConstants.Server.DEFAULT_GRPC_PORT);
-    List<String> segments = 
_helixResourceManager.getSegmentsFor("mytable_OFFLINE");
+    List<String> segments = 
_helixResourceManager.getSegmentsFor("mytable_OFFLINE", false);
 
     GrpcRequestBuilder requestBuilder = new 
GrpcRequestBuilder().setSegments(segments);
     DataTable dataTable = 
collectNonStreamingRequestResult(queryClient.submit(requestBuilder.setSql(sql).build()));

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to