This is an automated email from the ASF dual-hosted git repository. snlee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 71e28a2 Wire EmptySegmentPruner to routing config (#8067) 71e28a2 is described below commit 71e28a2313a0e175e64398b195e488b0fd67d49b Author: Liang Mingqiang <mili...@linkedin.com> AuthorDate: Mon Jan 31 23:53:10 2022 -0800 Wire EmptySegmentPruner to routing config (#8067) * Remove EmptySegmentPruner from the default path and wire it to the config * fix unit test * rename isKinesisEnabled as needsEmptySegmentPruner and move to TableConfigUtils * add pinot-kinesis dependency to pinot-segment-local * hardcode kinesis in TableConfigUtils to avoid pulling pinot-kinesis module as dependency * Minor change on the comments --- .../segmentpruner/SegmentPrunerFactory.java | 28 +++++-- .../routing/segmentpruner/SegmentPrunerTest.java | 85 ++++++++++++++-------- .../segment/local/utils/TableConfigUtils.java | 57 ++++++++++++++- .../pinot/spi/config/table/RoutingConfig.java | 1 + 4 files changed, 131 insertions(+), 40 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java index 9837db3..b5adeba 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java @@ -24,6 +24,7 @@ import java.util.Map; import javax.annotation.Nullable; import org.apache.helix.ZNRecord; import org.apache.helix.store.zk.ZkHelixPropertyStore; +import org.apache.pinot.segment.local.utils.TableConfigUtils; import org.apache.pinot.spi.config.table.ColumnPartitionConfig; import org.apache.pinot.spi.config.table.RoutingConfig; import org.apache.pinot.spi.config.table.SegmentPartitionConfig; @@ -45,11 +46,14 @@ public class SegmentPrunerFactory { public static List<SegmentPruner> getSegmentPruners(TableConfig 
tableConfig, ZkHelixPropertyStore<ZNRecord> propertyStore) { - RoutingConfig routingConfig = tableConfig.getRoutingConfig(); List<SegmentPruner> segmentPruners = new ArrayList<>(); - // Always prune out empty segments first - segmentPruners.add(new EmptySegmentPruner(tableConfig, propertyStore)); + boolean needsEmptySegment = TableConfigUtils.needsEmptySegmentPruner(tableConfig); + if (needsEmptySegment) { + // Add EmptySegmentPruner if needed + segmentPruners.add(new EmptySegmentPruner(tableConfig, propertyStore)); + } + RoutingConfig routingConfig = tableConfig.getRoutingConfig(); if (routingConfig != null) { List<String> segmentPrunerTypes = routingConfig.getSegmentPrunerTypes(); if (segmentPrunerTypes != null) { @@ -61,7 +65,6 @@ public class SegmentPrunerFactory { configuredSegmentPruners.add(partitionSegmentPruner); } } - if (RoutingConfig.TIME_SEGMENT_PRUNER_TYPE.equalsIgnoreCase(segmentPrunerType)) { TimeSegmentPruner timeSegmentPruner = getTimeSegmentPruner(tableConfig, propertyStore); if (timeSegmentPruner != null) { @@ -69,13 +72,16 @@ public class SegmentPrunerFactory { } } } + // Sort all segment pruners in order of: empty -> time -> partition. 
We are trying to sort them in this order + // to improve performance; this order may not be optimal -- ideally the pruner that will + // potentially prune the most segments should be moved to the front. segmentPruners.addAll(sortSegmentPruners(configuredSegmentPruners)); } else { // Handle legacy configs for backward-compatibility TableType tableType = tableConfig.getTableType(); String routingTableBuilderName = routingConfig.getRoutingTableBuilderName(); - if ((tableType == TableType.OFFLINE && LEGACY_PARTITION_AWARE_OFFLINE_ROUTING - .equalsIgnoreCase(routingTableBuilderName)) || (tableType == TableType.REALTIME + if ((tableType == TableType.OFFLINE && LEGACY_PARTITION_AWARE_OFFLINE_ROUTING.equalsIgnoreCase( + routingTableBuilderName)) || (tableType == TableType.REALTIME && LEGACY_PARTITION_AWARE_REALTIME_ROUTING.equalsIgnoreCase(routingTableBuilderName))) { PartitionSegmentPruner partitionSegmentPruner = getPartitionSegmentPruner(tableConfig, propertyStore); if (partitionSegmentPruner != null) { @@ -129,17 +135,23 @@ public class SegmentPrunerFactory { } private static List<SegmentPruner> sortSegmentPruners(List<SegmentPruner> pruners) { - // If there's multiple pruners, move time range pruners to the front。 + // If there are multiple pruners, always prune empty segments first. After that, prune based on time range, + // followed by partition pruners. // Partition pruner run time is proportional to input # of segments while time range pruner is not, // Prune based on time range first will have a smaller input size for partition pruners, so have better performance. 
List<SegmentPruner> sortedPruners = new ArrayList<>(); for (SegmentPruner pruner : pruners) { + if (pruner instanceof EmptySegmentPruner) { + sortedPruners.add(pruner); + } + } + for (SegmentPruner pruner : pruners) { if (pruner instanceof TimeSegmentPruner) { sortedPruners.add(pruner); } } for (SegmentPruner pruner : pruners) { - if (!(pruner instanceof TimeSegmentPruner)) { + if (pruner instanceof PartitionSegmentPruner) { sortedPruners.add(pruner); } } diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java index c09705d..f06798f 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java @@ -48,9 +48,11 @@ import org.apache.pinot.spi.config.table.SegmentPartitionConfig; import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig; import org.apache.pinot.spi.data.DateTimeFormatSpec; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.stream.StreamConfigProperties; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.apache.pinot.sql.parsers.CalciteSqlCompiler; @@ -95,6 +97,10 @@ public class SegmentPrunerTest extends ControllerTest { private static final String SDF_QUERY_5 = "SELECT * FROM testTable where timeColumn in (20200101, 20200102) AND timeColumn >= 20200530"; + // this is duplicate with KinesisConfig.STREAM_TYPE, while instead of use KinesisConfig.STREAM_TYPE directly, we + // hardcode the value here to avoid pulling the entire 
pinot-kinesis module as dependency. + private static final String KINESIS_STREAM_TYPE = "kinesis"; + private ZkClient _zkClient; private ZkHelixPropertyStore<ZNRecord> _propertyStore; @@ -121,63 +127,53 @@ public class SegmentPrunerTest extends ControllerTest { // Routing config is missing List<SegmentPruner> segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore); - assertEquals(segmentPruners.size(), 1); - assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner); + assertEquals(segmentPruners.size(), 0); // Segment pruner type is not configured RoutingConfig routingConfig = mock(RoutingConfig.class); when(tableConfig.getRoutingConfig()).thenReturn(routingConfig); segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore); - assertEquals(segmentPruners.size(), 1); - assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner); + assertEquals(segmentPruners.size(), 0); // Segment partition config is missing when(routingConfig.getSegmentPrunerTypes()).thenReturn( Collections.singletonList(RoutingConfig.PARTITION_SEGMENT_PRUNER_TYPE)); segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore); - assertEquals(segmentPruners.size(), 1); - assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner); + assertEquals(segmentPruners.size(), 0); // Column partition config is missing Map<String, ColumnPartitionConfig> columnPartitionConfigMap = new HashMap<>(); when(indexingConfig.getSegmentPartitionConfig()).thenReturn(new SegmentPartitionConfig(columnPartitionConfigMap)); segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore); - assertEquals(segmentPruners.size(), 1); - assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner); + assertEquals(segmentPruners.size(), 0); // Partition-aware segment pruner should be returned columnPartitionConfigMap.put(PARTITION_COLUMN, new ColumnPartitionConfig("Modulo", 5)); segmentPruners = 
SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore); - assertEquals(segmentPruners.size(), 2); - assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner); - assertTrue(segmentPruners.get(1) instanceof PartitionSegmentPruner); + assertEquals(segmentPruners.size(), 1); + assertTrue(segmentPruners.get(0) instanceof PartitionSegmentPruner); // Do not allow multiple partition columns columnPartitionConfigMap.put("anotherPartitionColumn", new ColumnPartitionConfig("Modulo", 5)); segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore); - assertEquals(segmentPruners.size(), 1); - assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner); + assertEquals(segmentPruners.size(), 0); // Should be backward-compatible with legacy config columnPartitionConfigMap.remove("anotherPartitionColumn"); when(routingConfig.getSegmentPrunerTypes()).thenReturn(null); segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore); - assertEquals(segmentPruners.size(), 1); - assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner); + assertEquals(segmentPruners.size(), 0); when(tableConfig.getTableType()).thenReturn(TableType.OFFLINE); when(routingConfig.getRoutingTableBuilderName()).thenReturn( SegmentPrunerFactory.LEGACY_PARTITION_AWARE_OFFLINE_ROUTING); segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore); - assertEquals(segmentPruners.size(), 2); - assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner); - assertTrue(segmentPruners.get(1) instanceof PartitionSegmentPruner); + assertTrue(segmentPruners.get(0) instanceof PartitionSegmentPruner); when(tableConfig.getTableType()).thenReturn(TableType.REALTIME); when(routingConfig.getRoutingTableBuilderName()).thenReturn( SegmentPrunerFactory.LEGACY_PARTITION_AWARE_REALTIME_ROUTING); segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore); - assertEquals(segmentPruners.size(), 2); - 
assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner); - assertTrue(segmentPruners.get(1) instanceof PartitionSegmentPruner); + assertEquals(segmentPruners.size(), 1); + assertTrue(segmentPruners.get(0) instanceof PartitionSegmentPruner); } @Test @@ -188,36 +184,63 @@ public class SegmentPrunerTest extends ControllerTest { // Routing config is missing List<SegmentPruner> segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore); - assertEquals(segmentPruners.size(), 1); - assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner); + assertEquals(segmentPruners.size(), 0); // Segment pruner type is not configured RoutingConfig routingConfig = mock(RoutingConfig.class); when(tableConfig.getRoutingConfig()).thenReturn(routingConfig); segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore); - assertEquals(segmentPruners.size(), 1); - assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner); + assertEquals(segmentPruners.size(), 0); // Validation config is missing when(routingConfig.getSegmentPrunerTypes()).thenReturn( Collections.singletonList(RoutingConfig.TIME_SEGMENT_PRUNER_TYPE)); segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore); - assertEquals(segmentPruners.size(), 1); - assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner); + assertEquals(segmentPruners.size(), 0); // Time column is missing SegmentsValidationAndRetentionConfig validationConfig = mock(SegmentsValidationAndRetentionConfig.class); when(tableConfig.getValidationConfig()).thenReturn(validationConfig); segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore); - assertEquals(segmentPruners.size(), 1); - assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner); + assertEquals(segmentPruners.size(), 0); // Time range pruner should be returned when(validationConfig.getTimeColumnName()).thenReturn(TIME_COLUMN); segmentPruners = 
SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore); - assertEquals(segmentPruners.size(), 2); + assertEquals(segmentPruners.size(), 1); + assertTrue(segmentPruners.get(0) instanceof TimeSegmentPruner); + } + + @Test + public void testEnablingEmptySegmentPruner() { + TableConfig tableConfig = mock(TableConfig.class); + IndexingConfig indexingConfig = mock(IndexingConfig.class); + RoutingConfig routingConfig = mock(RoutingConfig.class); + StreamIngestionConfig streamIngestionConfig = mock(StreamIngestionConfig.class); + + // When routingConfig is configured with EmptySegmentPruner, EmptySegmentPruner should be returned. + when(tableConfig.getRoutingConfig()).thenReturn(routingConfig); + when(routingConfig.getSegmentPrunerTypes()).thenReturn( + Collections.singletonList(RoutingConfig.EMPTY_SEGMENT_PRUNER_TYPE)); + List<SegmentPruner> segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore); + assertEquals(segmentPruners.size(), 1); + assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner); + + // When indexingConfig is configured with Kinesis streaming, EmptySegmentPruner should be returned. + when(indexingConfig.getStreamConfigs()).thenReturn( + Collections.singletonMap(StreamConfigProperties.STREAM_TYPE, KINESIS_STREAM_TYPE)); + segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore); + assertEquals(segmentPruners.size(), 1); + assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner); + + // When streamIngestionConfig is configured with Kinesis streaming, EmptySegmentPruner should be returned. 
+ when(streamIngestionConfig.getStreamConfigMaps()).thenReturn(Collections.singletonList( + Collections.singletonMap(StreamConfigProperties.STREAM_TYPE, KINESIS_STREAM_TYPE))); + when(indexingConfig.getStreamConfigs()).thenReturn( + Collections.singletonMap(StreamConfigProperties.STREAM_TYPE, KINESIS_STREAM_TYPE)); + segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore); + assertEquals(segmentPruners.size(), 1); assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner); - assertTrue(segmentPruners.get(1) instanceof TimeSegmentPruner); } @DataProvider diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java index 60f6bda..d5cc083 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java @@ -53,12 +53,14 @@ import org.apache.pinot.spi.config.table.UpsertConfig; import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig; import org.apache.pinot.spi.config.table.ingestion.FilterConfig; import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; +import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig; import org.apache.pinot.spi.config.table.ingestion.TransformConfig; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.FieldSpec.DataType; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.ingestion.batch.BatchConfig; import org.apache.pinot.spi.stream.StreamConfig; +import org.apache.pinot.spi.stream.StreamConfigProperties; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.DataSizeUtils; import org.apache.pinot.spi.utils.IngestionConfigUtils; @@ -83,6 +85,10 @@ public final class TableConfigUtils { // supported TableTaskTypes, must 
be identical to the one return in the impl of {@link PinotTaskGenerator}. private static final String REALTIME_TO_OFFLINE_TASK_TYPE = "RealtimeToOfflineSegmentsTask"; + // This duplicates KinesisConfig.STREAM_TYPE; instead of using KinesisConfig.STREAM_TYPE directly, we + // hardcode the value here to avoid pulling in the entire pinot-kinesis module as a dependency. + private static final String KINESIS_STREAM_TYPE = "kinesis"; + /** * @see TableConfigUtils#validate(TableConfig, Schema, String) */ @@ -107,7 +113,7 @@ public final class TableConfigUtils { } // Sanitize the table config before validation sanitize(tableConfig); - // skip all validation if skip type ALL is selected. + // skip all validation if skip type ALL is selected. if (!skipTypes.contains(ValidationType.ALL)) { validateValidationConfig(tableConfig, schema); validateIngestionConfig(tableConfig, schema); @@ -857,4 +863,53 @@ public final class TableConfigUtils { public enum ValidationType { ALL, TASK, UPSERT } + + /** + * needsEmptySegmentPruner checks if EmptySegmentPruner is needed for a TableConfig. + * @param tableConfig Input table config. 
+ */ + public static boolean needsEmptySegmentPruner(TableConfig tableConfig) { + if (isKinesisConfigured(tableConfig)) { + return true; + } + RoutingConfig routingConfig = tableConfig.getRoutingConfig(); + if (routingConfig == null) { + return false; + } + List<String> segmentPrunerTypes = routingConfig.getSegmentPrunerTypes(); + if (segmentPrunerTypes == null || segmentPrunerTypes.isEmpty()) { + return false; + } + for (String segmentPrunerType : segmentPrunerTypes) { + if (RoutingConfig.EMPTY_SEGMENT_PRUNER_TYPE.equalsIgnoreCase(segmentPrunerType)) { + return true; + } + } + return false; + } + + private static boolean isKinesisConfigured(TableConfig tableConfig) { + IndexingConfig indexingConfig = tableConfig.getIndexingConfig(); + if (indexingConfig != null) { + Map<String, String> streamConfig = indexingConfig.getStreamConfigs(); + if (streamConfig != null && KINESIS_STREAM_TYPE.equals( + streamConfig.get(StreamConfigProperties.STREAM_TYPE))) { + return true; + } + } + IngestionConfig ingestionConfig = tableConfig.getIngestionConfig(); + if (ingestionConfig == null) { + return false; + } + StreamIngestionConfig streamIngestionConfig = ingestionConfig.getStreamIngestionConfig(); + if (streamIngestionConfig == null) { + return false; + } + for (Map<String, String> config : streamIngestionConfig.getStreamConfigMaps()) { + if (config != null && KINESIS_STREAM_TYPE.equals(config.get(StreamConfigProperties.STREAM_TYPE))) { + return true; + } + } + return false; + } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/RoutingConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/RoutingConfig.java index 4a9127f..2c238aa 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/RoutingConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/RoutingConfig.java @@ -28,6 +28,7 @@ import org.apache.pinot.spi.config.BaseJsonConfig; public class RoutingConfig extends BaseJsonConfig { public static final 
String PARTITION_SEGMENT_PRUNER_TYPE = "partition"; public static final String TIME_SEGMENT_PRUNER_TYPE = "time"; + public static final String EMPTY_SEGMENT_PRUNER_TYPE = "empty"; public static final String REPLICA_GROUP_INSTANCE_SELECTOR_TYPE = "replicaGroup"; public static final String STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE = "strictReplicaGroup"; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org