This is an automated email from the ASF dual-hosted git repository. jlli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new a1f3c30 Add parameter to list segments which can be queried (#7878) a1f3c30 is described below commit a1f3c30e2ac74dd0acb67087a86d8f03b416f5b2 Author: Jialiang Li <j...@linkedin.com> AuthorDate: Mon Jan 3 13:41:02 2022 -0800 Add parameter to list segments which can be queried (#7878) Co-authored-by: Jack Li(Analytics Engineering) <j...@jlli-mn1.linkedin.biz> --- .../broker/broker/HelixBrokerStarterTest.java | 2 +- .../api/resources/PinotSegmentRestletResource.java | 17 +++++++++--- .../helix/core/PinotHelixResourceManager.java | 32 ++++++++++++++++------ .../helix/core/retention/RetentionManager.java | 3 +- ...PinotIngestionRestletResourceStatelessTest.java | 6 ++-- .../helix/core/PinotHelixResourceManagerTest.java | 2 +- .../core/retention/SegmentLineageCleanupTest.java | 14 ++++++---- .../tests/OfflineGRPCServerIntegrationTest.java | 4 +-- 8 files changed, 53 insertions(+), 27 deletions(-) diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java index b42fdf2..68cf4f1 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java @@ -222,7 +222,7 @@ public class HelixBrokerStarterTest extends ControllerTest { assertEquals(timeBoundaryInfo.getTimeValue(), Integer.toString(currentEndTime - 1)); // Refresh a segment with a new end time - String segmentToRefresh = _helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME).get(0); + String segmentToRefresh = _helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME, true).get(0); int newEndTime = currentEndTime + 10; SegmentZKMetadata segmentZKMetadata = _helixResourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME, segmentToRefresh); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java index ba569d5..7078e60 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java @@ -163,17 +163,26 @@ public class PinotSegmentRestletResource { @GET @Produces(MediaType.APPLICATION_JSON) @Path("/segments/{tableName}") - @ApiOperation(value = "List all segments", notes = "List all segments") + @ApiOperation(value = "List all segments. An optional 'excludeReplacedSegments' parameter is used to get the" + + " list of segments which has not yet been replaced (determined by segment lineage entries) and can be queried" + + " from the table. The value is false by default.", + // TODO: more and more filters can be added later on, like excludeErrorSegments, excludeConsumingSegments, etc. + notes = "List all segments") public List<Map<TableType, List<String>>> getSegments( @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName, - @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr) { + @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr, + @ApiParam(value = "Whether to exclude replaced segments in the response, which have been replaced" + + " specified in the segment lineage entries and cannot be queried from the table") + @QueryParam("excludeReplacedSegments") String excludeReplacedSegments) { List<String> tableNamesWithType = ResourceUtils .getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, Constants.validateTableType(tableTypeStr), LOGGER); + boolean shouldExcludeReplacedSegments = Boolean.parseBoolean(excludeReplacedSegments); List<Map<TableType, List<String>>> resultList = new ArrayList<>(tableNamesWithType.size()); for (String tableNameWithType : tableNamesWithType) { TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); - List<String> segments = _pinotHelixResourceManager.getSegmentsFor(tableNameWithType); + List<String> segments = + _pinotHelixResourceManager.getSegmentsFor(tableNameWithType, shouldExcludeReplacedSegments); resultList.add(Collections.singletonMap(tableType, segments)); } return resultList; @@ -585,7 +594,7 @@ public class PinotSegmentRestletResource { } String tableNameWithType = ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0); - deleteSegmentsInternal(tableNameWithType, _pinotHelixResourceManager.getSegmentsFor(tableNameWithType)); + deleteSegmentsInternal(tableNameWithType, _pinotHelixResourceManager.getSegmentsFor(tableNameWithType, false)); return new SuccessResponse("All segments of table " + tableNameWithType + " deleted"); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index f9d6973..9f719a9 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -587,10 +587,15 @@ public class PinotHelixResourceManager { * Returns the segments for the given table. * * @param tableNameWithType Table name with type suffix + * @param shouldExcludeReplacedSegments whether to return the list of segments that doesn't contain replaced segments. * @return List of segment names */ - public List<String> getSegmentsFor(String tableNameWithType) { - return ZKMetadataProvider.getSegments(_propertyStore, tableNameWithType); + public List<String> getSegmentsFor(String tableNameWithType, boolean shouldExcludeReplacedSegments) { + List<String> segmentsFromPropertiesStore = ZKMetadataProvider.getSegments(_propertyStore, tableNameWithType); + if (shouldExcludeReplacedSegments) { + return excludeReplacedSegments(tableNameWithType, segmentsFromPropertiesStore); + } + return segmentsFromPropertiesStore; } /** @@ -606,7 +611,7 @@ public class PinotHelixResourceManager { List<String> selectedSegments; // If no start and end timestamp specified, just select all the segments. if (startTimestamp == Long.MIN_VALUE && endTimestamp == Long.MAX_VALUE) { - selectedSegments = getSegmentsFor(tableNameWithType); + selectedSegments = getSegmentsFor(tableNameWithType, false); } else { selectedSegments = new ArrayList<>(); List<SegmentZKMetadata> segmentZKMetadataList = getSegmentsZKMetadata(tableNameWithType); @@ -617,12 +622,21 @@ public class PinotHelixResourceManager { } } } + return excludeReplacedSegments(tableNameWithType, selectedSegments); + } + + /** + * Given the list of segment names, exclude all the replaced segments which cannot be queried. + * @param tableNameWithType table name with type + * @param segments list of input segment names + */ + private List<String> excludeReplacedSegments(String tableNameWithType, List<String> segments) { // Fetch the segment lineage metadata, and filter segments based on segment lineage. SegmentLineage segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, tableNameWithType); if (segmentLineage == null) { - return selectedSegments; + return segments; } else { - Set<String> selectedSegmentSet = new HashSet<>(selectedSegments); + Set<String> selectedSegmentSet = new HashSet<>(segments); SegmentLineageUtils.filterSegmentsBasedOnLineageInPlace(selectedSegmentSet, segmentLineage); return new ArrayList<>(selectedSegmentSet); } @@ -1618,7 +1632,7 @@ public class PinotHelixResourceManager { } // Remove all stored segments for the table - _segmentDeletionManager.removeSegmentsFromStore(offlineTableName, getSegmentsFor(offlineTableName)); + _segmentDeletionManager.removeSegmentsFromStore(offlineTableName, getSegmentsFor(offlineTableName, false)); LOGGER.info("Deleting table {}: Removed stored segments", offlineTableName); // Remove segment metadata @@ -1662,7 +1676,7 @@ public class PinotHelixResourceManager { } // Remove all stored segments for the table - _segmentDeletionManager.removeSegmentsFromStore(realtimeTableName, getSegmentsFor(realtimeTableName)); + _segmentDeletionManager.removeSegmentsFromStore(realtimeTableName, getSegmentsFor(realtimeTableName, false)); LOGGER.info("Deleting table {}: Removed stored segments", realtimeTableName); // Remove segment metadata @@ -2755,7 +2769,7 @@ public class PinotHelixResourceManager { String segmentLineageEntryId = SegmentLineageUtils.generateLineageEntryId(); // Check that all the segments from 'segmentsFrom' exist in the table - Set<String> segmentsForTable = new HashSet<>(getSegmentsFor(tableNameWithType)); + Set<String> segmentsForTable = new HashSet<>(getSegmentsFor(tableNameWithType, false)); Preconditions.checkArgument(segmentsForTable.containsAll(segmentsFrom), String.format( "Not all segments from 'segmentsFrom' are available in the table. (tableName = '%s', segmentsFrom = '%s', " + "segmentsTo = '%s', segmentsFromTable = '%s')", tableNameWithType, segmentsFrom, segmentsTo, @@ -2892,7 +2906,7 @@ public class PinotHelixResourceManager { } // Check that all the segments from 'segmentsTo' exist in the table - Set<String> segmentsForTable = new HashSet<>(getSegmentsFor(tableNameWithType)); + Set<String> segmentsForTable = new HashSet<>(getSegmentsFor(tableNameWithType, false)); Preconditions.checkArgument(segmentsForTable.containsAll(lineageEntry.getSegmentsTo()), String.format( "Not all segments from 'segmentsTo' are available in the table. (tableName = '%s', segmentsTo = '%s', " + "segmentsFromTable = '%s')", tableNameWithType, lineageEntry.getSegmentsTo(), segmentsForTable)); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java index 2512c78..9ed7a69 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java @@ -214,7 +214,8 @@ public class RetentionManager extends ControllerPeriodicTask<Void> { // 1. The original segments can be deleted once the merged segments are successfully uploaded // 2. The zombie lineage entry & merged segments should be deleted if the segment replacement failed in // the middle - Set<String> segmentsForTable = new HashSet<>(_pinotHelixResourceManager.getSegmentsFor(tableNameWithType)); + Set<String> segmentsForTable = + new HashSet<>(_pinotHelixResourceManager.getSegmentsFor(tableNameWithType, false)); List<String> segmentsToDelete = new ArrayList<>(); for (String lineageEntryId : segmentLineage.getLineageEntryIds()) { LineageEntry lineageEntry = segmentLineage.getLineageEntry(lineageEntryId); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotIngestionRestletResourceStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotIngestionRestletResourceStatelessTest.java index f98ece1..ddee58c 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotIngestionRestletResourceStatelessTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotIngestionRestletResourceStatelessTest.java @@ -85,7 +85,7 @@ public class PinotIngestionRestletResourceStatelessTest extends ControllerTest { public void testIngestEndpoint() throws Exception { - List<String> segments = _helixResourceManager.getSegmentsFor(TABLE_NAME_WITH_TYPE); + List<String> segments = _helixResourceManager.getSegmentsFor(TABLE_NAME_WITH_TYPE, false); Assert.assertEquals(segments.size(), 0); // ingest from file @@ -93,13 +93,13 @@ public class PinotIngestionRestletResourceStatelessTest extends ControllerTest { batchConfigMap.put(BatchConfigProperties.INPUT_FORMAT, "csv"); batchConfigMap.put(String.format("%s.delimiter", BatchConfigProperties.RECORD_READER_PROP_PREFIX), "|"); sendHttpPost(_controllerRequestURLBuilder.forIngestFromFile(TABLE_NAME_WITH_TYPE, batchConfigMap)); - segments = _helixResourceManager.getSegmentsFor(TABLE_NAME_WITH_TYPE); + segments = _helixResourceManager.getSegmentsFor(TABLE_NAME_WITH_TYPE, false); Assert.assertEquals(segments.size(), 1); // ingest from URI sendHttpPost(_controllerRequestURLBuilder.forIngestFromURI(TABLE_NAME_WITH_TYPE, batchConfigMap, String.format("file://%s", _inputFile.getAbsolutePath()))); - segments = _helixResourceManager.getSegmentsFor(TABLE_NAME_WITH_TYPE); + segments = _helixResourceManager.getSegmentsFor(TABLE_NAME_WITH_TYPE, false); Assert.assertEquals(segments.size(), 2); } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java index ae524a6..faf594a 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java @@ -464,7 +464,7 @@ public class PinotHelixResourceManagerTest { "downloadUrl"); } List<String> segmentsForTable = - ControllerTestUtils.getHelixResourceManager().getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME); + ControllerTestUtils.getHelixResourceManager().getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, false); Assert.assertEquals(segmentsForTable.size(), 5); List<String> segmentsFrom = new ArrayList<>(); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java index 13423aa..b77e211 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java @@ -114,7 +114,8 @@ public class SegmentLineageCleanupTest { ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_TABLE_NAME, SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "merged_" + i), "downloadUrl"); } - Assert.assertEquals(ControllerTestUtils.getHelixResourceManager().getSegmentsFor(OFFLINE_TABLE_NAME).size(), 7); + Assert.assertEquals(ControllerTestUtils.getHelixResourceManager().getSegmentsFor(OFFLINE_TABLE_NAME, false).size(), + 7); long currentTimeInMillis = System.currentTimeMillis(); // Validate the case when the lineage entry state is 'IN_PROGRESS' @@ -126,7 +127,8 @@ public class SegmentLineageCleanupTest { .writeSegmentLineage(ControllerTestUtils.getHelixResourceManager().getPropertyStore(), segmentLineage, -1); _retentionManager.processTable(OFFLINE_TABLE_NAME); waitForSegmentsToDelete(OFFLINE_TABLE_NAME, 7); - List<String> segmentsForTable = ControllerTestUtils.getHelixResourceManager().getSegmentsFor(OFFLINE_TABLE_NAME); + List<String> segmentsForTable = + ControllerTestUtils.getHelixResourceManager().getSegmentsFor(OFFLINE_TABLE_NAME, false); Assert.assertEquals(segmentsForTable.size(), 7); // Validate the case when the lineage entry state is 'COMPLETED' @@ -137,7 +139,7 @@ public class SegmentLineageCleanupTest { .writeSegmentLineage(ControllerTestUtils.getHelixResourceManager().getPropertyStore(), segmentLineage, -1); _retentionManager.processTable(OFFLINE_TABLE_NAME); waitForSegmentsToDelete(OFFLINE_TABLE_NAME, 5); - segmentsForTable = ControllerTestUtils.getHelixResourceManager().getSegmentsFor(OFFLINE_TABLE_NAME); + segmentsForTable = ControllerTestUtils.getHelixResourceManager().getSegmentsFor(OFFLINE_TABLE_NAME, false); Assert.assertEquals(segmentsForTable.size(), 5); Assert.assertTrue(Collections.disjoint(segmentsForTable, Arrays.asList("segment_0", "segment_1"))); @@ -146,7 +148,7 @@ public class SegmentLineageCleanupTest { waitForSegmentsToDelete(OFFLINE_TABLE_NAME, 4); _retentionManager.processTable(OFFLINE_TABLE_NAME); waitForSegmentsToDelete(OFFLINE_TABLE_NAME, 4); - segmentsForTable = ControllerTestUtils.getHelixResourceManager().getSegmentsFor(OFFLINE_TABLE_NAME); + segmentsForTable = ControllerTestUtils.getHelixResourceManager().getSegmentsFor(OFFLINE_TABLE_NAME, false); Assert.assertEquals(segmentsForTable.size(), 4); Assert.assertTrue(Collections.disjoint(segmentsForTable, Arrays.asList("segment_0", "segment_1", "merged_0"))); segmentLineage = @@ -163,7 +165,7 @@ public class SegmentLineageCleanupTest { .writeSegmentLineage(ControllerTestUtils.getHelixResourceManager().getPropertyStore(), segmentLineage, -1); _retentionManager.processTable(OFFLINE_TABLE_NAME); waitForSegmentsToDelete(OFFLINE_TABLE_NAME, 3); - segmentsForTable = ControllerTestUtils.getHelixResourceManager().getSegmentsFor(OFFLINE_TABLE_NAME); + segmentsForTable = ControllerTestUtils.getHelixResourceManager().getSegmentsFor(OFFLINE_TABLE_NAME, false); Assert.assertEquals(segmentsForTable.size(), 3); Assert.assertTrue(Collections.disjoint(segmentsForTable, Arrays.asList("merged_1", "merged_2"))); segmentLineage = @@ -176,7 +178,7 @@ public class SegmentLineageCleanupTest { throws InterruptedException { long endTimeMs = System.currentTimeMillis() + MAX_TIMEOUT_IN_MILLISECOND; do { - if (ControllerTestUtils.getHelixResourceManager().getSegmentsFor(tableNameWithType).size() + if (ControllerTestUtils.getHelixResourceManager().getSegmentsFor(tableNameWithType, false).size() == expectedNumSegmentsAfterDelete) { return; } else { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java index 99c7e7b..f3df4c9 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java @@ -152,7 +152,7 @@ public class OfflineGRPCServerIntegrationTest extends BaseClusterIntegrationTest GrpcQueryClient queryClient = new GrpcQueryClient("localhost", CommonConstants.Server.DEFAULT_GRPC_PORT); String sql = "SELECT * FROM mytable_OFFLINE LIMIT 1000000"; BrokerRequest brokerRequest = new Pql2Compiler().compileToBrokerRequest(sql); - List<String> segments = _helixResourceManager.getSegmentsFor("mytable_OFFLINE"); + List<String> segments = _helixResourceManager.getSegmentsFor("mytable_OFFLINE", false); GrpcRequestBuilder requestBuilder = new GrpcRequestBuilder().setSegments(segments); testNonStreamingRequest(queryClient.submit(requestBuilder.setSql(sql).build())); @@ -167,7 +167,7 @@ public class OfflineGRPCServerIntegrationTest extends BaseClusterIntegrationTest public void testQueryingGrpcServer(String sql) throws Exception { GrpcQueryClient queryClient = new GrpcQueryClient("localhost", CommonConstants.Server.DEFAULT_GRPC_PORT); - List<String> segments = _helixResourceManager.getSegmentsFor("mytable_OFFLINE"); + List<String> segments = _helixResourceManager.getSegmentsFor("mytable_OFFLINE", false); GrpcRequestBuilder requestBuilder = new GrpcRequestBuilder().setSegments(segments); DataTable dataTable = collectNonStreamingRequestResult(queryClient.submit(requestBuilder.setSql(sql).build())); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org