This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new ae02ecefa4  Ingestion Aggregation Feature (#8611)
ae02ecefa4 is described below

commit ae02ecefa4b4367fc3547fa5bfb718d63b45b477
Author: noon-stripe <89272166+noon-str...@users.noreply.github.com>
AuthorDate: Tue May 17 20:09:14 2022 -0700

    Ingestion Aggregation Feature (#8611)

    This feature aggregates values at ingestion time, which reduces the number
    of rows stored (thus speeding up queries), using the same strategy as the
    'aggregateMetrics' feature. This expands the feature to include COUNT, MIN,
    and MAX, with the ability to expand further.
---
 .../common/utils/config/TableConfigUtils.java      |   2 +-
 .../common/utils/config/TableConfigSerDeTest.java  |   5 +-
 .../connector/flink/http/PinotConnectionUtils.java |   6 +-
 .../flink/sink/PinotSinkIntegrationTest.java       |   2 +-
 .../pinot/controller/util/FileIngestionHelper.java |   2 +-
 .../helix/core/PinotHelixResourceManagerTest.java  |   2 +-
 .../core/retention/SegmentLineageCleanupTest.java  |   2 +-
 .../realtime/LLRealtimeSegmentDataManager.java     |   4 +-
 .../apache/pinot/core/util/SchemaUtilsTest.java    |   7 +-
 .../IngestionConfigHybridIntegrationTest.java      |   3 +-
 .../tests/JsonPathClusterIntegrationTest.java      |   2 +-
 .../tests/MapTypeClusterIntegrationTest.java       |   2 +-
 .../tests/OfflineClusterIntegrationTest.java       |   2 +-
 .../SegmentWriterUploaderIntegrationTest.java      |   2 +-
 .../preprocess/DataPreprocessingHelperTest.java    |   2 +-
 .../mergerollup/MergeRollupTaskGeneratorTest.java  |   2 +-
 .../RealtimeToOfflineSegmentsTaskExecutorTest.java |   7 +-
 .../filebased/FileBasedSegmentWriterTest.java      |  20 +-
 .../indexsegment/mutable/MutableSegmentImpl.java   | 245 +++++++++++++++++----
 .../local/realtime/impl/RealtimeSegmentConfig.java |  18 +-
 .../pinot/segment/local/utils/IngestionUtils.java  |  11 +
 .../segment/local/utils/TableConfigUtils.java      |  87 +++++++-
 ...MutableSegmentImplIngestionAggregationTest.java | 230 +++++++++++++++++++
 .../mutable/MutableSegmentImplTestUtils.java       |  24 +-
 .../ExpressionTransformerTest.java                 |  16 +-
 .../recordtransformer/RecordTransformerTest.java   |  40 ++--
 .../SegmentGenerationWithFilterRecordsTest.java    |   3 +-
 .../index/loader/SegmentPreProcessorTest.java      |   6 +-
 .../segment/local/utils/IngestionUtilsTest.java    |  43 +++-
 .../segment/local/utils/TableConfigUtilsTest.java  | 242 +++++++++++++++++---
 .../config/table/ingestion/AggregationConfig.java  |  49 +++++
 .../config/table/ingestion/IngestionConfig.java    |  17 +-
 .../pinot/spi/utils/IngestionConfigUtils.java      |  16 +-
 .../spi/utils/builder/TableConfigBuilder.java      |   7 +
 .../pinot/spi/utils/IngestionConfigUtilsTest.java  |  10 +-
 35 files changed, 973 insertions(+), 165 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java
index 44e0ac7f9a..e515249761 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java
@@ -268,7 +268,7 @@ public class TableConfigUtils {
 
     if (ingestionConfig == null) {
       if (batchIngestionConfig != null || streamIngestionConfig != null) {
-        ingestionConfig = new IngestionConfig(batchIngestionConfig, streamIngestionConfig, null, null, null);
+        ingestionConfig = new IngestionConfig(batchIngestionConfig, streamIngestionConfig, null, null, null, null);
       }
     } else {
       if (batchIngestionConfig != null) {
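For context on how a table opts into this feature: the new aggregationConfigs list is the sixth argument of the expanded IngestionConfig constructor that this diff threads through every call site. A minimal sketch in Java, reusing the AggregationConfig values from the SerDe test below; the table name and the null-ed batch/stream/filter/transform/complex-type arguments are illustrative, not part of this commit:

    import java.util.Arrays;
    import java.util.List;
    import org.apache.pinot.spi.config.table.TableConfig;
    import org.apache.pinot.spi.config.table.TableType;
    import org.apache.pinot.spi.config.table.ingestion.AggregationConfig;
    import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
    import org.apache.pinot.spi.utils.builder.TableConfigBuilder;

    // Each AggregationConfig maps a destination metric column to an aggregation
    // over a source column: "SUM__bar" stores SUM(bar), "MIN_foo" stores MIN(foo).
    List<AggregationConfig> aggregationConfigs = Arrays.asList(
        new AggregationConfig("SUM__bar", "SUM(bar)"),
        new AggregationConfig("MIN_foo", "MIN(foo)"));

    // aggregationConfigs is the new sixth constructor argument; the other
    // ingestion configs are left null in this sketch.
    IngestionConfig ingestionConfig =
        new IngestionConfig(null, null, null, null, null, aggregationConfigs);

    // Schematic only: a usable REALTIME table would also carry stream configs.
    TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME)
        .setTableName("myTable")
        .setIngestionConfig(ingestionConfig)
        .build();

Per the validation added in pinot-segment-local TableConfigUtils below, each destination must be a metric column, all metric columns in the schema must be aggregated, the function must be one of SUM/MIN/MAX/COUNT over a single identifier argument, and the feature cannot be combined with aggregateMetrics or upsert.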
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
index 1a2e39531d..05f12d04a0 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
@@ -47,6 +47,7 @@ import org.apache.pinot.spi.config.table.assignment.InstanceConstraintConfig;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
 import org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig;
+import org.apache.pinot.spi.config.table.ingestion.AggregationConfig;
 import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
 import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig;
 import org.apache.pinot.spi.config.table.ingestion.FilterConfig;
@@ -267,6 +268,8 @@ public class TableConfigSerDeTest {
     }
     {
       // With ingestion config
+      List<AggregationConfig> aggregationConfigs = Lists.newArrayList(new AggregationConfig("SUM__bar", "SUM(bar)"),
+          new AggregationConfig("MIN_foo", "MIN(foo)"));
       List<TransformConfig> transformConfigs = Lists.newArrayList(new TransformConfig("bar", "func(moo)"),
           new TransformConfig("zoo", "myfunc()"));
       Map<String, String> batchConfigMap = new HashMap<>();
@@ -283,7 +286,7 @@ public class TableConfigSerDeTest {
           new IngestionConfig(new BatchIngestionConfig(batchConfigMaps, "APPEND", "HOURLY"),
               new StreamIngestionConfig(streamConfigMaps), new FilterConfig("filterFunc(foo)"), transformConfigs,
               new ComplexTypeConfig(fieldsToUnnest, ".", ComplexTypeConfig.CollectionNotUnnestedToJson.NON_PRIMITIVE,
-                  prefixesToRename));
+                  prefixesToRename), aggregationConfigs);
       TableConfig tableConfig = tableConfigBuilder.setIngestionConfig(ingestionConfig).build();

       checkIngestionConfig(tableConfig);
diff --git a/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/http/PinotConnectionUtils.java b/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/http/PinotConnectionUtils.java
index cdefa67c83..500b8fed26 100644
--- a/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/http/PinotConnectionUtils.java
+++ b/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/http/PinotConnectionUtils.java
@@ -66,14 +66,14 @@ public final class PinotConnectionUtils {
     if (ingestionConfig == null) {
       tableConfig.setIngestionConfig(
           new IngestionConfig(new BatchIngestionConfig(Lists.newArrayList(newBatchConfigMaps), "APPEND", "HOURLY"),
-              null, null, null, null));
+              null, null, null, null, null));
       return tableConfig;
     }
     if (ingestionConfig.getBatchIngestionConfig() == null) {
       tableConfig.setIngestionConfig(
           new IngestionConfig(new BatchIngestionConfig(Lists.newArrayList(newBatchConfigMaps), "APPEND", "HOURLY"),
               null, ingestionConfig.getFilterConfig(), ingestionConfig.getTransformConfigs(),
-              ingestionConfig.getComplexTypeConfig()));
+              ingestionConfig.getComplexTypeConfig(), ingestionConfig.getAggregationConfigs()));
       return tableConfig;
     }
 
@@ -86,7 +86,7 @@
         new BatchIngestionConfig(batchConfigMaps, ingestionConfig.getBatchIngestionConfig().getSegmentIngestionType(),
             ingestionConfig.getBatchIngestionConfig().getSegmentIngestionFrequency()), null,
ingestionConfig.getFilterConfig(), ingestionConfig.getTransformConfigs(), - ingestionConfig.getComplexTypeConfig())); + ingestionConfig.getComplexTypeConfig(), ingestionConfig.getAggregationConfigs())); return tableConfig; } diff --git a/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/sink/PinotSinkIntegrationTest.java b/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/sink/PinotSinkIntegrationTest.java index 6b340a7918..a9a9086716 100644 --- a/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/sink/PinotSinkIntegrationTest.java +++ b/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/sink/PinotSinkIntegrationTest.java @@ -81,7 +81,7 @@ public class PinotSinkIntegrationTest extends BaseClusterIntegrationTest { batchConfigs.put(BatchConfigProperties.PUSH_CONTROLLER_URI, _controllerBaseApiUrl); IngestionConfig ingestionConfig = new IngestionConfig(new BatchIngestionConfig(Collections.singletonList(batchConfigs), "APPEND", "HOURLY"), null, - null, null, null); + null, null, null, null); _tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setIngestionConfig(ingestionConfig) .build(); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java index 961240edbb..1b1238c4a2 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java @@ -148,7 +148,7 @@ public class FileIngestionHelper { // Upload segment IngestionConfig ingestionConfigOverride = - new IngestionConfig(batchIngestionConfigOverride, null, null, null, null); + new IngestionConfig(batchIngestionConfigOverride, null, null, null, null, null); TableConfig tableConfigOverride = new TableConfigBuilder(_tableConfig.getTableType()).setTableName(_tableConfig.getTableName()) .setIngestionConfig(ingestionConfigOverride).build(); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java index 4cb3d5c89d..f00b717c8d 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java @@ -789,7 +789,7 @@ public class PinotHelixResourceManagerTest { new TableConfigBuilder(TableType.OFFLINE).setTableName(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME) .setNumReplicas(2).setBrokerTenant(BROKER_TENANT_NAME).setServerTenant(SERVER_TENANT_NAME) .setIngestionConfig( - new IngestionConfig(new BatchIngestionConfig(null, "REFRESH", "DAILY"), null, null, null, null)) + new IngestionConfig(new BatchIngestionConfig(null, "REFRESH", "DAILY"), null, null, null, null, null)) .build(); ControllerTestUtils.getHelixResourceManager().addTable(tableConfig); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java index 4a8cb0a8f8..a54515269e 100644 --- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java @@ -100,7 +100,7 @@ public class SegmentLineageCleanupTest { ControllerTestUtils.getHelixResourceManager().addTable(tableConfig); IngestionConfig ingestionConfig = - new IngestionConfig(new BatchIngestionConfig(null, "REFRESH", "DAILY"), null, null, null, null); + new IngestionConfig(new BatchIngestionConfig(null, "REFRESH", "DAILY"), null, null, null, null, null); TableConfig refreshTableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(REFRESH_OFFLINE_TABLE_NAME) .setBrokerTenant(BROKER_TENANT_NAME).setServerTenant(SERVER_TENANT_NAME).setNumReplicas(1) .setRetentionTimeUnit(RETENTION_TIME_UNIT).setRetentionTimeValue(RETENTION_TIME_VALUE) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java index 48fa30e959..31cf25a55b 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java @@ -1310,7 +1310,9 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { .setH3IndexConfigs(indexLoadingConfig.getH3IndexConfigs()).setSegmentZKMetadata(segmentZKMetadata) .setOffHeap(_isOffHeap).setMemoryManager(_memoryManager) .setStatsHistory(realtimeTableDataManager.getStatsHistory()) - .setAggregateMetrics(indexingConfig.isAggregateMetrics()).setNullHandlingEnabled(_nullHandlingEnabled) + .setAggregateMetrics(indexingConfig.isAggregateMetrics()) + .setIngestionAggregationConfigs(IngestionConfigUtils.getAggregationConfigs(tableConfig)) + .setNullHandlingEnabled(_nullHandlingEnabled) .setConsumerDir(consumerDir).setUpsertMode(tableConfig.getUpsertMode()) .setPartitionUpsertMetadataManager(partitionUpsertMetadataManager) .setHashFunction(tableConfig.getHashFunction()) diff --git a/pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java index 36d9eb29b2..af3d226708 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java @@ -93,7 +93,7 @@ public class SchemaUtilsTest { schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).build(); tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig( new IngestionConfig(null, null, null, Lists.newArrayList(new TransformConfig("colA", "round(colB, 1000)")), - null)).build(); + null, null)).build(); try { SchemaUtils.validate(schema, Lists.newArrayList(tableConfig)); Assert.fail("Should fail schema validation, as colA is not present in schema"); @@ -140,9 +140,8 @@ public class SchemaUtilsTest { schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME) .addDateTime(TIME_COLUMN, DataType.LONG, "1:MILLISECONDS:EPOCH", "1:HOURS").build(); tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN) - .setIngestionConfig( - new IngestionConfig(null, null, null, Lists.newArrayList(new TransformConfig("colA", "round(colB, 1000)")), - null)).build(); + .setIngestionConfig(new 
IngestionConfig(null, null, null, + Lists.newArrayList(new TransformConfig("colA", "round(colB, 1000)")), null, null)).build(); try { SchemaUtils.validate(schema, Lists.newArrayList(tableConfig)); Assert.fail("Should fail schema validation, as colA is not present in schema"); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/IngestionConfigHybridIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/IngestionConfigHybridIntegrationTest.java index 5378425c4e..9fdd12e940 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/IngestionConfigHybridIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/IngestionConfigHybridIntegrationTest.java @@ -69,7 +69,8 @@ public class IngestionConfigHybridIntegrationTest extends BaseClusterIntegration List<Map<String, String>> streamConfigMaps = new ArrayList<>(); streamConfigMaps.add(getStreamConfigMap()); - return new IngestionConfig(null, new StreamIngestionConfig(streamConfigMaps), filterConfig, transformConfigs, null); + return new IngestionConfig(null, new StreamIngestionConfig(streamConfigMaps), filterConfig, transformConfigs, null, + null); } @Override diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/JsonPathClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/JsonPathClusterIntegrationTest.java index ba5bcd30eb..065cca63de 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/JsonPathClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/JsonPathClusterIntegrationTest.java @@ -95,7 +95,7 @@ public class JsonPathClusterIntegrationTest extends BaseClusterIntegrationTest { new TransformConfig(COMPLEX_MAP_STR_K3_FIELD_NAME, "jsonPathArray(" + COMPLEX_MAP_STR_FIELD_NAME + ", '$.k3')")); TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(rawTableName) - .setIngestionConfig(new IngestionConfig(null, null, null, transformConfigs, null)).build(); + .setIngestionConfig(new IngestionConfig(null, null, null, transformConfigs, null, null)).build(); addTableConfig(tableConfig); // Create and upload segments diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MapTypeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MapTypeClusterIntegrationTest.java index 02e7a73964..726572a7ca 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MapTypeClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MapTypeClusterIntegrationTest.java @@ -84,7 +84,7 @@ public class MapTypeClusterIntegrationTest extends BaseClusterIntegrationTest { new TransformConfig(STRING_KEY_MAP_STR_FIELD_NAME, "toJsonMapStr(" + STRING_KEY_MAP_FIELD_NAME + ")"), new TransformConfig(INT_KEY_MAP_STR_FIELD_NAME, "toJsonMapStr(" + INT_KEY_MAP_FIELD_NAME + ")")); TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(rawTableName) - .setIngestionConfig(new IngestionConfig(null, null, null, transformConfigs, null)).build(); + .setIngestionConfig(new IngestionConfig(null, null, null, transformConfigs, null, null)).build(); addTableConfig(tableConfig); // Create and upload segments diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java index 30873869e5..df672f20e8 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java @@ -948,7 +948,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet tableConfig.setIngestionConfig(new IngestionConfig(null, null, null, Arrays.asList(new TransformConfig("NewAddedDerivedHoursSinceEpoch", "times(DaysSinceEpoch, 24)"), new TransformConfig("NewAddedDerivedSecondsSinceEpoch", "times(times(DaysSinceEpoch, 24), 3600)"), - new TransformConfig("NewAddedDerivedMVStringDimension", "split(DestCityName, ', ')")), null)); + new TransformConfig("NewAddedDerivedMVStringDimension", "split(DestCityName, ', ')")), null, null)); updateTableConfig(tableConfig); // Trigger reload diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentWriterUploaderIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentWriterUploaderIntegrationTest.java index 28f538ed08..fa67d8c62a 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentWriterUploaderIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentWriterUploaderIntegrationTest.java @@ -90,7 +90,7 @@ public class SegmentWriterUploaderIntegrationTest extends BaseClusterIntegration batchConfigMap.put(BatchConfigProperties.OVERWRITE_OUTPUT, "false"); batchConfigMap.put(BatchConfigProperties.PUSH_CONTROLLER_URI, _controllerBaseApiUrl); return new IngestionConfig(new BatchIngestionConfig(Lists.newArrayList(batchConfigMap), "APPEND", "HOURLY"), null, - null, null, null); + null, null, null, null); } /** diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperTest.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperTest.java index 2f97f720f8..9cbb840997 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperTest.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperTest.java @@ -55,7 +55,7 @@ public class DataPreprocessingHelperTest { DataPreprocessingHelper dataPreprocessingHelper = new AvroDataPreprocessingHelper(inputPaths, outputPath); BatchIngestionConfig batchIngestionConfig = new BatchIngestionConfig(null, "APPEND", "DAILY"); - IngestionConfig ingestionConfig = new IngestionConfig(batchIngestionConfig, null, null, null, null); + IngestionConfig ingestionConfig = new IngestionConfig(batchIngestionConfig, null, null, null, null, null); TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("testTableName").setIngestionConfig(ingestionConfig) diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java index 5e503729fc..1fbfcc59d1 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java @@ -97,7 +97,7 @@ public class MergeRollupTaskGeneratorTest { // Skip task generation, if REFRESH table IngestionConfig ingestionConfig = - new IngestionConfig(new BatchIngestionConfig(null, "REFRESH", null), null, null, null, null); + new IngestionConfig(new BatchIngestionConfig(null, "REFRESH", null), null, null, null, null, null); offlineTableConfig = getOfflineTableConfig(new HashMap<>()); offlineTableConfig.setIngestionConfig(ingestionConfig); pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig)); diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutorTest.java index 4fcdbf6824..be3118a51b 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutorTest.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutorTest.java @@ -100,13 +100,12 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest { .setSortedColumn(D1).build(); TableConfig tableConfigEpochHours = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_EPOCH_HOURS).setTimeColumnName(T_TRX) - .setSortedColumn(D1).setIngestionConfig( - new IngestionConfig(null, null, null, Lists.newArrayList(new TransformConfig(T_TRX, "toEpochHours(t)")), - null)).build(); + .setSortedColumn(D1).setIngestionConfig(new IngestionConfig(null, null, null, + Lists.newArrayList(new TransformConfig(T_TRX, "toEpochHours(t)")), null, null)).build(); TableConfig tableConfigSDF = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_SDF).setTimeColumnName(T_TRX) .setSortedColumn(D1).setIngestionConfig(new IngestionConfig(null, null, null, - Lists.newArrayList(new TransformConfig(T_TRX, "toDateTime(t, 'yyyyMMddHH')")), null)).build(); + Lists.newArrayList(new TransformConfig(T_TRX, "toDateTime(t, 'yyyyMMddHH')")), null, null)).build(); Schema schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension(D1, FieldSpec.DataType.STRING) .addMetric(M1, FieldSpec.DataType.INT) diff --git a/pinot-plugins/pinot-segment-writer/pinot-segment-writer-file-based/src/test/java/org/apache/pinot/plugin/segmentwriter/filebased/FileBasedSegmentWriterTest.java b/pinot-plugins/pinot-segment-writer/pinot-segment-writer-file-based/src/test/java/org/apache/pinot/plugin/segmentwriter/filebased/FileBasedSegmentWriterTest.java index 73833787e2..f48eb2195a 100644 --- a/pinot-plugins/pinot-segment-writer/pinot-segment-writer-file-based/src/test/java/org/apache/pinot/plugin/segmentwriter/filebased/FileBasedSegmentWriterTest.java +++ 
b/pinot-plugins/pinot-segment-writer/pinot-segment-writer-file-based/src/test/java/org/apache/pinot/plugin/segmentwriter/filebased/FileBasedSegmentWriterTest.java @@ -76,7 +76,7 @@ public class FileBasedSegmentWriterTest { batchConfigMap.put(BatchConfigProperties.OUTPUT_DIR_URI, _outputDir.getAbsolutePath()); _ingestionConfig = new IngestionConfig(new BatchIngestionConfig(Lists.newArrayList(batchConfigMap), "APPEND", "HOURLY"), null, - null, transformConfigs, null); + null, transformConfigs, null, null); _tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(_ingestionConfig) .setTimeColumnName(TIME_COLUMN_NAME).build(); @@ -112,7 +112,7 @@ public class FileBasedSegmentWriterTest { // expected } - tableConfig.setIngestionConfig(new IngestionConfig(null, null, null, null, null)); + tableConfig.setIngestionConfig(new IngestionConfig(null, null, null, null, null, null)); try { segmentWriter.init(tableConfig, _schema); Assert.fail("Should fail due to missing batchIngestionConfig"); @@ -121,7 +121,7 @@ public class FileBasedSegmentWriterTest { } tableConfig.setIngestionConfig( - new IngestionConfig(new BatchIngestionConfig(null, "APPEND", "HOURLY"), null, null, null, null)); + new IngestionConfig(new BatchIngestionConfig(null, "APPEND", "HOURLY"), null, null, null, null, null)); try { segmentWriter.init(tableConfig, _schema); Assert.fail("Should fail due to missing batchConfigMaps"); @@ -131,7 +131,7 @@ public class FileBasedSegmentWriterTest { tableConfig.setIngestionConfig( new IngestionConfig(new BatchIngestionConfig(Collections.emptyList(), "APPEND", "HOURLY"), null, null, null, - null)); + null, null)); try { segmentWriter.init(tableConfig, _schema); Assert.fail("Should fail due to missing batchConfigMaps"); @@ -141,7 +141,7 @@ public class FileBasedSegmentWriterTest { tableConfig.setIngestionConfig( new IngestionConfig(new BatchIngestionConfig(Lists.newArrayList(Collections.emptyMap()), "APPEND", "HOURLY"), - null, null, null, null)); + null, null, null, null, null)); try { segmentWriter.init(tableConfig, _schema); Assert.fail("Should fail due to missing outputDirURI in batchConfigMap"); @@ -153,7 +153,7 @@ public class FileBasedSegmentWriterTest { batchConfigMap.put(BatchConfigProperties.OUTPUT_DIR_URI, _outputDir.getAbsolutePath()); tableConfig.setIngestionConfig( new IngestionConfig(new BatchIngestionConfig(Lists.newArrayList(batchConfigMap), "APPEND", "HOURLY"), null, - null, null, null)); + null, null, null, null)); segmentWriter.init(tableConfig, _schema); segmentWriter.close(); } @@ -255,7 +255,7 @@ public class FileBasedSegmentWriterTest { .setIngestionConfig(new IngestionConfig(new BatchIngestionConfig(Lists.newArrayList(batchConfigMapOverride), _ingestionConfig.getBatchIngestionConfig().getSegmentIngestionType(), _ingestionConfig.getBatchIngestionConfig().getSegmentIngestionFrequency()), null, null, - _ingestionConfig.getTransformConfigs(), null)).build(); + _ingestionConfig.getTransformConfigs(), null, null)).build(); SegmentWriter segmentWriter = new FileBasedSegmentWriter(); segmentWriter.init(tableConfig, _schema); @@ -281,7 +281,7 @@ public class FileBasedSegmentWriterTest { new BatchIngestionConfig(Lists.newArrayList(batchConfigMapOverride), _ingestionConfig.getBatchIngestionConfig().getSegmentIngestionType(), _ingestionConfig.getBatchIngestionConfig().getSegmentIngestionFrequency()), null, null, - _ingestionConfig.getTransformConfigs(), null)); + _ingestionConfig.getTransformConfigs(), null, null)); 
segmentWriter.init(tableConfig, _schema); // write 2 records @@ -305,7 +305,7 @@ public class FileBasedSegmentWriterTest { new BatchIngestionConfig(Lists.newArrayList(batchConfigMapOverride), _ingestionConfig.getBatchIngestionConfig().getSegmentIngestionType(), _ingestionConfig.getBatchIngestionConfig().getSegmentIngestionFrequency()), null, null, - _ingestionConfig.getTransformConfigs(), null)); + _ingestionConfig.getTransformConfigs(), null, null)); segmentWriter.init(tableConfig, _schema); // write 2 records @@ -338,7 +338,7 @@ public class FileBasedSegmentWriterTest { .setIngestionConfig(new IngestionConfig(new BatchIngestionConfig(Lists.newArrayList(batchConfigMapOverride), _ingestionConfig.getBatchIngestionConfig().getSegmentIngestionType(), _ingestionConfig.getBatchIngestionConfig().getSegmentIngestionFrequency()), null, null, - _ingestionConfig.getTransformConfigs(), null)).build(); + _ingestionConfig.getTransformConfigs(), null, null)).build(); segmentWriter.init(tableConfig, _schema); // write 3 records with same timestamp diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java index ecab67ed5d..a27be0b2f7 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java @@ -34,9 +34,16 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import javax.annotation.Nullable; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.metrics.ServerMeter; import org.apache.pinot.common.metrics.ServerMetrics; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.request.context.FunctionContext; +import org.apache.pinot.common.request.context.RequestContextUtils; +import org.apache.pinot.segment.local.aggregator.ValueAggregator; +import org.apache.pinot.segment.local.aggregator.ValueAggregatorFactory; import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig; import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory; import org.apache.pinot.segment.local.realtime.impl.dictionary.BaseOffHeapMutableDictionary; @@ -56,6 +63,8 @@ import org.apache.pinot.segment.local.utils.FixedIntArrayOffHeapIdMap; import org.apache.pinot.segment.local.utils.GeometrySerializer; import org.apache.pinot.segment.local.utils.IdMap; import org.apache.pinot.segment.local.utils.IngestionUtils; +import org.apache.pinot.segment.local.utils.TableConfigUtils; +import org.apache.pinot.segment.spi.AggregationFunctionType; import org.apache.pinot.segment.spi.MutableSegment; import org.apache.pinot.segment.spi.SegmentMetadata; import org.apache.pinot.segment.spi.datasource.DataSource; @@ -79,6 +88,7 @@ import org.apache.pinot.spi.config.table.ColumnPartitionConfig; import org.apache.pinot.spi.config.table.FieldConfig; import org.apache.pinot.spi.config.table.SegmentPartitionConfig; import org.apache.pinot.spi.config.table.UpsertConfig; +import org.apache.pinot.spi.config.table.ingestion.AggregationConfig; import org.apache.pinot.spi.data.DimensionFieldSpec; import org.apache.pinot.spi.data.FieldSpec; import 
org.apache.pinot.spi.data.FieldSpec.DataType; @@ -127,7 +137,6 @@ public class MutableSegmentImpl implements MutableSegment { private final Map<String, IndexContainer> _indexContainerMap = new HashMap<>(); private final IdMap<FixedIntArray> _recordIdMap; - private boolean _aggregateMetrics; private volatile int _numDocsIndexed = 0; private final int _numKeyColumns; @@ -196,7 +205,6 @@ public class MutableSegmentImpl implements MutableSegment { _partitionColumn = config.getPartitionColumn(); _partitionFunction = config.getPartitionFunction(); _nullHandlingEnabled = config.isNullHandlingEnabled(); - _aggregateMetrics = config.aggregateMetrics(); Collection<FieldSpec> allFieldSpecs = _schema.getAllFieldSpecs(); List<FieldSpec> physicalFieldSpecs = new ArrayList<>(allFieldSpecs.size()); @@ -237,6 +245,15 @@ public class MutableSegmentImpl implements MutableSegment { int avgNumMultiValues = config.getAvgNumMultiValues(); + // Metric aggregation can be enabled only if config is specified, and all dimensions have dictionary, + // and no metrics have dictionary. If not enabled, the map returned is null. + _recordIdMap = enableMetricsAggregationIfPossible(config, noDictionaryColumns); + + Map<String, Pair<String, ValueAggregator>> metricsAggregators = Collections.emptyMap(); + if (_recordIdMap != null) { + metricsAggregators = getMetricsAggregators(config); + } + // Initialize for each column for (FieldSpec fieldSpec : _physicalFieldSpecs) { String column = fieldSpec.getName(); @@ -336,10 +353,16 @@ public class MutableSegmentImpl implements MutableSegment { // Null value vector MutableNullValueVector nullValueVector = _nullHandlingEnabled ? new MutableNullValueVector() : null; + Pair<String, ValueAggregator> columnAggregatorPair = + metricsAggregators.getOrDefault(column, Pair.of(column, null)); + String sourceColumn = columnAggregatorPair.getLeft(); + ValueAggregator valueAggregator = columnAggregatorPair.getRight(); + // TODO: Support range index and bloom filter for mutable segment _indexContainerMap.put(column, new IndexContainer(fieldSpec, partitionFunction, partitions, new NumValuesInfo(), forwardIndex, dictionary, - invertedIndexReader, null, textIndex, jsonIndex, h3Index, null, nullValueVector)); + invertedIndexReader, null, textIndex, jsonIndex, h3Index, null, nullValueVector, sourceColumn, + valueAggregator)); } // TODO separate concerns: this logic does not belong here @@ -349,14 +372,11 @@ public class MutableSegmentImpl implements MutableSegment { realtimeLuceneIndexRefreshState.addRealtimeReadersToQueue(_realtimeLuceneReaders); } - // Metric aggregation can be enabled only if config is specified, and all dimensions have dictionary, - // and no metrics have dictionary. If not enabled, the map returned is null. - _recordIdMap = enableMetricsAggregationIfPossible(config, noDictionaryColumns); - // init upsert-related data structure _upsertMode = config.getUpsertMode(); if (isUpsertEnabled()) { - Preconditions.checkState(!_aggregateMetrics, "Metrics aggregation and upsert cannot be enabled together"); + Preconditions.checkState(!isAggregateMetricsEnabled(), + "Metrics aggregation and upsert cannot be enabled together"); _partitionUpsertMetadataManager = config.getPartitionUpsertMetadataManager(); _validDocIds = new ThreadSafeMutableRoaringBitmap(); String upsertComparisonColumn = config.getUpsertComparisonColumn(); @@ -390,7 +410,8 @@ public class MutableSegmentImpl implements MutableSegment { // of noDict on STRING/BYTES. Without metrics aggregation, memory pressure increases. 
// So to continue aggregating metrics for such cases, we will create dictionary even // if the column is part of noDictionary set from table config - if (fieldSpec instanceof DimensionFieldSpec && _aggregateMetrics && (dataType == STRING || dataType == BYTES)) { + if (fieldSpec instanceof DimensionFieldSpec && isAggregateMetricsEnabled() && (dataType == STRING + || dataType == BYTES)) { _logger.info( "Aggregate metrics is enabled. Will create dictionary in consuming segment for column {} of type {}", column, dataType.toString()); @@ -478,7 +499,7 @@ public class MutableSegmentImpl implements MutableSegment { // Update number of documents indexed at last to make the latest row queryable canTakeMore = numDocsIndexed++ < _capacity; } else { - assert _aggregateMetrics; + assert isAggregateMetricsEnabled(); aggregateMetrics(row, docId); canTakeMore = true; } @@ -508,13 +529,16 @@ public class MutableSegmentImpl implements MutableSegment { private void updateDictionary(GenericRow row) { for (Map.Entry<String, IndexContainer> entry : _indexContainerMap.entrySet()) { - String column = entry.getKey(); IndexContainer indexContainer = entry.getValue(); - Object value = row.getValue(column); MutableDictionary dictionary = indexContainer._dictionary; + if (dictionary == null) { + continue; + } + + Object value = row.getValue(entry.getKey()); if (value == null) { recordIndexingError("DICTIONARY"); - } else if (dictionary != null) { + } else { if (indexContainer._fieldSpec.isSingleValueField()) { indexContainer._dictId = dictionary.index(value); } else { @@ -533,6 +557,38 @@ public class MutableSegmentImpl implements MutableSegment { String column = entry.getKey(); IndexContainer indexContainer = entry.getValue(); + // aggregate metrics is enabled. + if (indexContainer._valueAggregator != null) { + Object value = row.getValue(indexContainer._sourceColumn); + + // Update numValues info + indexContainer._numValuesInfo.updateSVEntry(); + + MutableForwardIndex forwardIndex = indexContainer._forwardIndex; + FieldSpec fieldSpec = indexContainer._fieldSpec; + + DataType dataType = fieldSpec.getDataType(); + value = indexContainer._valueAggregator.getInitialAggregatedValue(value); + switch (dataType.getStoredType()) { + case INT: + forwardIndex.setInt(docId, ((Number) value).intValue()); + break; + case LONG: + forwardIndex.setLong(docId, ((Number) value).longValue()); + break; + case FLOAT: + forwardIndex.setFloat(docId, ((Number) value).floatValue()); + break; + case DOUBLE: + forwardIndex.setDouble(docId, ((Number) value).doubleValue()); + break; + default: + throw new UnsupportedOperationException( + "Unsupported data type: " + dataType + " for aggregation: " + column); + } + continue; + } + // Update the null value vector even if a null value is somehow produced if (_nullHandlingEnabled && row.isNullValue(column)) { indexContainer._nullValueVector.setNull(docId); @@ -612,7 +668,7 @@ public class MutableSegmentImpl implements MutableSegment { // Update min/max value from raw value // NOTE: Skip updating min/max value for aggregated metrics because the value will change over time. 
- if (!_aggregateMetrics || fieldSpec.getFieldType() != FieldSpec.FieldType.METRIC) { + if (!isAggregateMetricsEnabled() || fieldSpec.getFieldType() != FieldSpec.FieldType.METRIC) { Comparable comparable; if (dataType == BYTES) { comparable = new ByteArray((byte[]) value); @@ -706,26 +762,75 @@ public class MutableSegmentImpl implements MutableSegment { private void aggregateMetrics(GenericRow row, int docId) { for (MetricFieldSpec metricFieldSpec : _physicalMetricFieldSpecs) { - String column = metricFieldSpec.getName(); - Object value = row.getValue(column); - MutableForwardIndex forwardIndex = _indexContainerMap.get(column)._forwardIndex; + IndexContainer indexContainer = _indexContainerMap.get(metricFieldSpec.getName()); + Object value = row.getValue(indexContainer._sourceColumn); + MutableForwardIndex forwardIndex = indexContainer._forwardIndex; DataType dataType = metricFieldSpec.getDataType(); - switch (dataType) { - case INT: - forwardIndex.setInt(docId, (Integer) value + forwardIndex.getInt(docId)); + + Double oldDoubleValue; + Double newDoubleValue; + Long oldLongValue; + Long newLongValue; + ValueAggregator valueAggregator = indexContainer._valueAggregator; + switch (valueAggregator.getAggregatedValueType()) { + case DOUBLE: + switch (dataType) { + case INT: + oldDoubleValue = ((Integer) forwardIndex.getInt(docId)).doubleValue(); + newDoubleValue = (Double) valueAggregator.applyRawValue(oldDoubleValue, value); + forwardIndex.setInt(docId, newDoubleValue.intValue()); + break; + case LONG: + oldDoubleValue = ((Long) forwardIndex.getLong(docId)).doubleValue(); + newDoubleValue = (Double) valueAggregator.applyRawValue(oldDoubleValue, value); + forwardIndex.setLong(docId, newDoubleValue.longValue()); + break; + case FLOAT: + oldDoubleValue = ((Float) forwardIndex.getFloat(docId)).doubleValue(); + newDoubleValue = (Double) valueAggregator.applyRawValue(oldDoubleValue, value); + forwardIndex.setFloat(docId, newDoubleValue.floatValue()); + break; + case DOUBLE: + oldDoubleValue = forwardIndex.getDouble(docId); + newDoubleValue = (Double) valueAggregator.applyRawValue(oldDoubleValue, value); + forwardIndex.setDouble(docId, newDoubleValue); + break; + default: + throw new UnsupportedOperationException(String.format("Aggregation type %s of %s not supported for %s", + valueAggregator.getAggregatedValueType(), valueAggregator.getAggregationType(), dataType)); + } break; case LONG: - forwardIndex.setLong(docId, (Long) value + forwardIndex.getLong(docId)); - break; - case FLOAT: - forwardIndex.setFloat(docId, (Float) value + forwardIndex.getFloat(docId)); - break; - case DOUBLE: - forwardIndex.setDouble(docId, (Double) value + forwardIndex.getDouble(docId)); + switch (dataType) { + case INT: + oldLongValue = ((Integer) forwardIndex.getInt(docId)).longValue(); + newLongValue = (Long) valueAggregator.applyRawValue(oldLongValue, value); + forwardIndex.setInt(docId, newLongValue.intValue()); + break; + case LONG: + oldLongValue = forwardIndex.getLong(docId); + newLongValue = (Long) valueAggregator.applyRawValue(oldLongValue, value); + forwardIndex.setLong(docId, newLongValue); + break; + case FLOAT: + oldLongValue = ((Float) forwardIndex.getFloat(docId)).longValue(); + newLongValue = (Long) valueAggregator.applyRawValue(oldLongValue, value); + forwardIndex.setFloat(docId, newLongValue.floatValue()); + break; + case DOUBLE: + oldLongValue = ((Double) forwardIndex.getDouble(docId)).longValue(); + newLongValue = (Long) valueAggregator.applyRawValue(oldLongValue, value); + 
forwardIndex.setDouble(docId, newLongValue.doubleValue()); + break; + default: + throw new UnsupportedOperationException(String.format("Aggregation type %s of %s not supported for %s", + valueAggregator.getAggregatedValueType(), valueAggregator.getAggregationType(), dataType)); + } break; default: throw new UnsupportedOperationException( - "Unsupported data type: " + dataType + " for aggregate metric column: " + column); + String.format("Aggregation type %s of %s not supported for %s", valueAggregator.getAggregatedValueType(), + valueAggregator.getAggregationType(), dataType)); } } } @@ -969,7 +1074,7 @@ public class MutableSegmentImpl implements MutableSegment { * * */ private int getOrCreateDocId() { - if (!_aggregateMetrics) { + if (!isAggregateMetricsEnabled()) { return _numDocsIndexed; } @@ -1005,7 +1110,7 @@ public class MutableSegmentImpl implements MutableSegment { */ private IdMap<FixedIntArray> enableMetricsAggregationIfPossible(RealtimeSegmentConfig config, Set<String> noDictionaryColumns) { - if (!_aggregateMetrics) { + if (!config.aggregateMetrics() && CollectionUtils.isEmpty(config.getIngestionAggregationConfigs())) { _logger.info("Metrics aggregation is disabled."); return null; } @@ -1017,15 +1122,13 @@ public class MutableSegmentImpl implements MutableSegment { if (!noDictionaryColumns.contains(metric)) { _logger.warn("Metrics aggregation cannot be turned ON in presence of dictionary encoded metrics, eg: {}", metric); - _aggregateMetrics = false; - break; + return null; } if (!fieldSpec.isSingleValueField()) { _logger.warn("Metrics aggregation cannot be turned ON in presence of multi-value metric columns, eg: {}", metric); - _aggregateMetrics = false; - break; + return null; } } @@ -1036,15 +1139,13 @@ public class MutableSegmentImpl implements MutableSegment { if (noDictionaryColumns.contains(dimension)) { _logger.warn("Metrics aggregation cannot be turned ON in presence of no-dictionary dimensions, eg: {}", dimension); - _aggregateMetrics = false; - break; + return null; } if (!fieldSpec.isSingleValueField()) { _logger.warn("Metrics aggregation cannot be turned ON in presence of multi-value dimension columns, eg: {}", dimension); - _aggregateMetrics = false; - break; + return null; } } @@ -1054,15 +1155,10 @@ public class MutableSegmentImpl implements MutableSegment { _logger.warn( "Metrics aggregation cannot be turned ON in presence of no-dictionary datetime/time columns, eg: {}", timeColumnName); - _aggregateMetrics = false; - break; + return null; } } - if (!_aggregateMetrics) { - return null; - } - int estimatedRowsToIndex; if (_statsHistory.isEmpty()) { // Choose estimated rows to index as maxNumRowsPerSegment / EXPECTED_COMPRESSION (1000, to be conservative in @@ -1082,6 +1178,10 @@ public class MutableSegmentImpl implements MutableSegment { RECORD_ID_MAP); } + private boolean isAggregateMetricsEnabled() { + return _recordIdMap != null; + } + // NOTE: Okay for single-writer @SuppressWarnings("NonAtomicOperationOnVolatileField") private static class NumValuesInfo { @@ -1098,6 +1198,56 @@ public class MutableSegmentImpl implements MutableSegment { } } + private static Map<String, Pair<String, ValueAggregator>> getMetricsAggregators(RealtimeSegmentConfig segmentConfig) { + if (segmentConfig.aggregateMetrics()) { + return fromAggregateMetrics(segmentConfig); + } else if (!CollectionUtils.isEmpty(segmentConfig.getIngestionAggregationConfigs())) { + return fromAggregationConfig(segmentConfig); + } else { + return Collections.emptyMap(); + } + } + + private static 
Map<String, Pair<String, ValueAggregator>> fromAggregateMetrics(RealtimeSegmentConfig segmentConfig) { + Preconditions.checkState(CollectionUtils.isEmpty(segmentConfig.getIngestionAggregationConfigs()), + "aggregateMetrics cannot be enabled if AggregationConfig is set"); + + Map<String, Pair<String, ValueAggregator>> columnNameToAggregator = new HashMap<>(); + for (String metricName : segmentConfig.getSchema().getMetricNames()) { + columnNameToAggregator.put(metricName, + Pair.of(metricName, ValueAggregatorFactory.getValueAggregator(AggregationFunctionType.SUM))); + } + return columnNameToAggregator; + } + + private static Map<String, Pair<String, ValueAggregator>> fromAggregationConfig(RealtimeSegmentConfig segmentConfig) { + Map<String, Pair<String, ValueAggregator>> columnNameToAggregator = new HashMap<>(); + + Preconditions.checkState(!segmentConfig.aggregateMetrics(), + "aggregateMetrics cannot be enabled if AggregationConfig is set"); + for (AggregationConfig config : segmentConfig.getIngestionAggregationConfigs()) { + ExpressionContext expressionContext = RequestContextUtils.getExpression(config.getAggregationFunction()); + // validation is also done when the table is created, this is just a sanity check. + Preconditions.checkState(expressionContext.getType() == ExpressionContext.Type.FUNCTION, + "aggregation function must be a function: %s", config); + FunctionContext functionContext = expressionContext.getFunction(); + TableConfigUtils.validateIngestionAggregation(functionContext.getFunctionName()); + Preconditions.checkState(functionContext.getArguments().size() == 1, + "aggregation function can only have one argument: %s", config); + ExpressionContext argument = functionContext.getArguments().get(0); + Preconditions.checkState(argument.getType() == ExpressionContext.Type.IDENTIFIER, + "aggregator function argument must be a identifier: %s", config); + + AggregationFunctionType functionType = + AggregationFunctionType.getAggregationFunctionType(functionContext.getFunctionName()); + + columnNameToAggregator.put(config.getColumnName(), + Pair.of(argument.getLiteral(), ValueAggregatorFactory.getValueAggregator(functionType))); + } + + return columnNameToAggregator; + } + private class IndexContainer implements Closeable { final FieldSpec _fieldSpec; final PartitionFunction _partitionFunction; @@ -1112,6 +1262,8 @@ public class MutableSegmentImpl implements MutableSegment { final MutableJsonIndex _jsonIndex; final BloomFilterReader _bloomFilter; final MutableNullValueVector _nullValueVector; + final String _sourceColumn; + final ValueAggregator _valueAggregator; volatile Comparable _minValue; volatile Comparable _maxValue; @@ -1125,7 +1277,8 @@ public class MutableSegmentImpl implements MutableSegment { @Nullable MutableDictionary dictionary, @Nullable MutableInvertedIndex invertedIndex, @Nullable RangeIndexReader rangeIndex, @Nullable MutableTextIndex textIndex, @Nullable MutableJsonIndex jsonIndex, @Nullable MutableH3Index h3Index, @Nullable BloomFilterReader bloomFilter, - @Nullable MutableNullValueVector nullValueVector) { + @Nullable MutableNullValueVector nullValueVector, @Nullable String sourceColumn, + @Nullable ValueAggregator valueAggregator) { _fieldSpec = fieldSpec; _partitionFunction = partitionFunction; _partitions = partitions; @@ -1140,6 +1293,8 @@ public class MutableSegmentImpl implements MutableSegment { _jsonIndex = jsonIndex; _bloomFilter = bloomFilter; _nullValueVector = nullValueVector; + _sourceColumn = sourceColumn; + _valueAggregator = valueAggregator; } 
DataSource toDataSource() { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java index 5ac3189ea0..dcc89db811 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java @@ -31,6 +31,7 @@ import org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager; import org.apache.pinot.segment.spi.partition.PartitionFunction; import org.apache.pinot.spi.config.table.FieldConfig; import org.apache.pinot.spi.config.table.UpsertConfig; +import org.apache.pinot.spi.config.table.ingestion.AggregationConfig; import org.apache.pinot.spi.data.Schema; @@ -64,6 +65,7 @@ public class RealtimeSegmentConfig { private final PartitionUpsertMetadataManager _partitionUpsertMetadataManager; private final String _consumerDir; private final List<FieldConfig> _fieldConfigList; + private final List<AggregationConfig> _ingestionAggregationConfigs; // TODO: Clean up this constructor. Most of these things can be extracted from tableConfig. private RealtimeSegmentConfig(String tableNameWithType, String segmentName, String streamName, Schema schema, @@ -74,7 +76,8 @@ public class RealtimeSegmentConfig { RealtimeSegmentStatsHistory statsHistory, String partitionColumn, PartitionFunction partitionFunction, int partitionId, boolean aggregateMetrics, boolean nullHandlingEnabled, String consumerDir, UpsertConfig.Mode upsertMode, String upsertComparisonColumn, UpsertConfig.HashFunction hashFunction, - PartitionUpsertMetadataManager partitionUpsertMetadataManager, List<FieldConfig> fieldConfigList) { + PartitionUpsertMetadataManager partitionUpsertMetadataManager, List<FieldConfig> fieldConfigList, + List<AggregationConfig> ingestionAggregationConfigs) { _tableNameWithType = tableNameWithType; _segmentName = segmentName; _streamName = streamName; @@ -104,6 +107,7 @@ public class RealtimeSegmentConfig { _upsertComparisonColumn = upsertComparisonColumn; _partitionUpsertMetadataManager = partitionUpsertMetadataManager; _fieldConfigList = fieldConfigList; + _ingestionAggregationConfigs = ingestionAggregationConfigs; } public String getTableNameWithType() { @@ -227,6 +231,10 @@ public class RealtimeSegmentConfig { return _fieldConfigList; } + public List<AggregationConfig> getIngestionAggregationConfigs() { + return _ingestionAggregationConfigs; + } + public static class Builder { private String _tableNameWithType; private String _segmentName; @@ -257,6 +265,7 @@ public class RealtimeSegmentConfig { private String _upsertComparisonColumn; private PartitionUpsertMetadataManager _partitionUpsertMetadataManager; private List<FieldConfig> _fieldConfigList; + private List<AggregationConfig> _ingestionAggregationConfigs; public Builder() { } @@ -414,13 +423,18 @@ public class RealtimeSegmentConfig { return this; } + public Builder setIngestionAggregationConfigs(List<AggregationConfig> ingestionAggregationConfigs) { + _ingestionAggregationConfigs = ingestionAggregationConfigs; + return this; + } + public RealtimeSegmentConfig build() { return new RealtimeSegmentConfig(_tableNameWithType, _segmentName, _streamName, _schema, _timeColumnName, _capacity, _avgNumMultiValues, _noDictionaryColumns, _varLengthDictionaryColumns, _invertedIndexColumns, _textIndexColumns, _fstIndexColumns, _jsonIndexColumns, 
_h3IndexConfigs, _segmentZKMetadata, _offHeap, _memoryManager, _statsHistory, _partitionColumn, _partitionFunction, _partitionId, _aggregateMetrics, _nullHandlingEnabled, _consumerDir, _upsertMode, _upsertComparisonColumn, _hashFunction, - _partitionUpsertMetadataManager, _fieldConfigList); + _partitionUpsertMetadataManager, _fieldConfigList, _ingestionAggregationConfigs); } } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java index eea20f0687..1a4695f9a0 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java @@ -30,6 +30,8 @@ import java.util.stream.Collectors; import javax.annotation.Nullable; import org.apache.commons.lang3.StringUtils; import org.apache.pinot.common.auth.AuthProviderUtils; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.request.context.RequestContextUtils; import org.apache.pinot.segment.local.function.FunctionEvaluator; import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory; import org.apache.pinot.segment.local.recordtransformer.ComplexTypeTransformer; @@ -41,6 +43,7 @@ import org.apache.pinot.segment.spi.creator.name.SegmentNameGenerator; import org.apache.pinot.segment.spi.creator.name.SimpleSegmentNameGenerator; import org.apache.pinot.spi.auth.AuthProvider; import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.ingestion.AggregationConfig; import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig; import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig; import org.apache.pinot.spi.config.table.ingestion.FilterConfig; @@ -361,6 +364,14 @@ public final class IngestionUtils { fields.addAll(functionEvaluator.getArguments()); } } + List<AggregationConfig> aggregationConfigs = ingestionConfig.getAggregationConfigs(); + if (aggregationConfigs != null) { + for (AggregationConfig aggregationConfig : aggregationConfigs) { + ExpressionContext expressionContext = + RequestContextUtils.getExpression(aggregationConfig.getAggregationFunction()); + expressionContext.getColumns(fields); + } + } List<TransformConfig> transformConfigs = ingestionConfig.getTransformConfigs(); if (transformConfigs != null) { for (TransformConfig transformConfig : transformConfigs) { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java index fd831e70d1..381c74dcb5 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java @@ -24,6 +24,7 @@ import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ImmutableSet; import java.util.Arrays; import java.util.Collections; +import java.util.EnumSet; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -34,10 +35,14 @@ import javax.annotation.Nullable; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.common.request.context.ExpressionContext; +import 
org.apache.pinot.common.request.context.FunctionContext; +import org.apache.pinot.common.request.context.RequestContextUtils; import org.apache.pinot.common.tier.TierFactory; import org.apache.pinot.common.utils.config.TagNameUtils; import org.apache.pinot.segment.local.function.FunctionEvaluator; import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory; +import org.apache.pinot.segment.spi.AggregationFunctionType; import org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair; import org.apache.pinot.spi.config.table.FieldConfig; import org.apache.pinot.spi.config.table.IndexingConfig; @@ -50,6 +55,7 @@ import org.apache.pinot.spi.config.table.TableTaskConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.config.table.TierConfig; import org.apache.pinot.spi.config.table.UpsertConfig; +import org.apache.pinot.spi.config.table.ingestion.AggregationConfig; import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig; import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig; import org.apache.pinot.spi.config.table.ingestion.FilterConfig; @@ -89,6 +95,9 @@ public final class TableConfigUtils { // this is duplicate with KinesisConfig.STREAM_TYPE, while instead of use KinesisConfig.STREAM_TYPE directly, we // hardcode the value here to avoid pulling the entire pinot-kinesis module as dependency. private static final String KINESIS_STREAM_TYPE = "kinesis"; + private static final EnumSet<AggregationFunctionType> SUPPORTED_INGESTION_AGGREGATIONS = + EnumSet.of(AggregationFunctionType.SUM, AggregationFunctionType.MIN, AggregationFunctionType.MAX, + AggregationFunctionType.COUNT); /** * @see TableConfigUtils#validate(TableConfig, Schema, String, boolean) @@ -306,6 +315,60 @@ public final class TableConfigUtils { } } + // Aggregation configs + List<AggregationConfig> aggregationConfigs = ingestionConfig.getAggregationConfigs(); + Set<String> aggregationSourceColumns = new HashSet<>(); + if (!CollectionUtils.isEmpty(aggregationConfigs)) { + Preconditions.checkState( + !tableConfig.getIndexingConfig().isAggregateMetrics(), + "aggregateMetrics cannot be set with AggregationConfig"); + Set<String> aggregationColumns = new HashSet<>(); + for (AggregationConfig aggregationConfig : aggregationConfigs) { + String columnName = aggregationConfig.getColumnName(); + String aggregationFunction = aggregationConfig.getAggregationFunction(); + if (columnName == null || aggregationFunction == null) { + throw new IllegalStateException( + "columnName/aggregationFunction cannot be null in AggregationConfig " + aggregationConfig); + } + + if (schema != null) { + FieldSpec fieldSpec = schema.getFieldSpecFor(columnName); + Preconditions.checkState(fieldSpec != null, "The destination column '" + columnName + + "' of the aggregation function must be present in the schema"); + Preconditions.checkState(fieldSpec.getFieldType() == FieldSpec.FieldType.METRIC, + "The destination column '" + columnName + "' of the aggregation function must be a metric column"); + } + + if (!aggregationColumns.add(columnName)) { + throw new IllegalStateException("Duplicate aggregation config found for column '" + columnName + "'"); + } + ExpressionContext expressionContext; + try { + expressionContext = RequestContextUtils.getExpression(aggregationConfig.getAggregationFunction()); + } catch (Exception e) { + throw new IllegalStateException( + "Invalid aggregation function '" + aggregationFunction + "' for column '" + columnName + "'", e); + } + 
Preconditions.checkState(expressionContext.getType() == ExpressionContext.Type.FUNCTION, + "aggregation function must be a function for: %s", aggregationConfig); + + FunctionContext functionContext = expressionContext.getFunction(); + validateIngestionAggregation(functionContext.getFunctionName()); + Preconditions.checkState(functionContext.getArguments().size() == 1, + "aggregation function can only have one argument: %s", aggregationConfig); + + ExpressionContext argument = functionContext.getArguments().get(0); + Preconditions.checkState(argument.getType() == ExpressionContext.Type.IDENTIFIER, + "aggregator function argument must be an identifier: %s", aggregationConfig); + + aggregationSourceColumns.add(argument.getIdentifier()); + } + if (schema != null) { + Preconditions.checkState(new HashSet<>(schema.getMetricNames()).equals(aggregationColumns), + "all metric columns must be aggregated"); + } + } + // Transform configs List<TransformConfig> transformConfigs = ingestionConfig.getTransformConfigs(); if (transformConfigs != null) { @@ -313,8 +376,11 @@ for (TransformConfig transformConfig : transformConfigs) { String columnName = transformConfig.getColumnName(); if (schema != null) { - Preconditions.checkState(schema.getFieldSpecFor(columnName) != null, - "The destination column '" + columnName + "' of the transform function must be present in the schema"); + Preconditions.checkState( + schema.getFieldSpecFor(columnName) != null || aggregationSourceColumns.contains(columnName), + "The destination column '" + columnName + + "' of the transform function must be present in the schema or as a source column for " + + "aggregations"); } String transformFunction = transformConfig.getTransformFunction(); if (columnName == null || transformFunction == null) { @@ -363,6 +429,23 @@ } } + /** + * Currently, only ValueAggregators with fixed-width types are allowed: MIN, MAX, SUM, and COUNT. The reason + * is that only the {@link org.apache.pinot.segment.local.realtime.impl.forward.FixedByteSVMutableForwardIndex} + * supports random inserts and lookups, while the + * {@link org.apache.pinot.segment.local.realtime.impl.forward.VarByteSVMutableForwardIndex} only supports + * sequential inserts. + */ + public static void validateIngestionAggregation(String name) { + for (AggregationFunctionType functionType : SUPPORTED_INGESTION_AGGREGATIONS) { + if (functionType.getName().equals(name)) { + return; + } + } + throw new IllegalStateException( + String.format("aggregation function %s must be one of %s", name, SUPPORTED_INGESTION_AGGREGATIONS)); + } + @VisibleForTesting static void validateTaskConfigs(TableConfig tableConfig, Schema schema) { TableTaskConfig taskConfig = tableConfig.getTaskConfig(); diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplIngestionAggregationTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplIngestionAggregationTest.java new file mode 100644 index 0000000000..5c5e2beeec --- /dev/null +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplIngestionAggregationTest.java @@ -0,0 +1,230 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.indexsegment.mutable; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import org.apache.pinot.spi.config.table.ingestion.AggregationConfig; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.stream.StreamMessageMetadata; +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class MutableSegmentImplIngestionAggregationTest { + private static final String DIMENSION_1 = "dim1"; + private static final String DIMENSION_2 = "dim2"; + + private static final String METRIC = "metric"; + private static final String METRIC_2 = "metric_2"; + + private static final String TIME_COLUMN1 = "time1"; + private static final String TIME_COLUMN2 = "time2"; + private static final String KEY_SEPARATOR = "\t\t"; + private static final int NUM_ROWS = 10001; + + private static final Schema.SchemaBuilder getSchemaBuilder() { + return new Schema.SchemaBuilder().setSchemaName("testSchema") + .addSingleValueDimension(DIMENSION_1, FieldSpec.DataType.INT) + .addSingleValueDimension(DIMENSION_2, FieldSpec.DataType.STRING) + .addDateTime(TIME_COLUMN1, FieldSpec.DataType.INT, "1:DAYS:EPOCH", "1:DAYS") + .addDateTime(TIME_COLUMN2, FieldSpec.DataType.INT, "1:HOURS:EPOCH", "1:HOURS"); + } + + private static final Set<String> VAR_LENGTH_SET = Collections.singleton(DIMENSION_2); + private static final Set<String> INVERTED_INDEX_SET = + new HashSet<>(Arrays.asList(DIMENSION_1, DIMENSION_2, TIME_COLUMN1, TIME_COLUMN2)); + + private static final List<String> STRING_VALUES = + Collections.unmodifiableList(Arrays.asList("aa", "bbb", "cc", "ddd", "ee", "fff", "gg", "hhh", "ii", "jjj")); + + @Test + public void testSameSrcDifferentAggregations() + throws Exception { + String m1 = "metric_MAX"; + String m2 = "metric_MIN"; + + Schema schema = + getSchemaBuilder().addMetric(m2, FieldSpec.DataType.DOUBLE).addMetric(m1, FieldSpec.DataType.DOUBLE).build(); + MutableSegmentImpl mutableSegmentImpl = + MutableSegmentImplTestUtils.createMutableSegmentImpl(schema, new HashSet<>(Arrays.asList(m2, m1)), + VAR_LENGTH_SET, INVERTED_INDEX_SET, + Arrays.asList(new AggregationConfig(m1, "MAX(metric)"), new AggregationConfig(m2, "MIN(metric)"))); + + Map<String, Double> expectedMin = new HashMap<>(); + Map<String, Double> expectedMax = new HashMap<>(); + for (List<Metric> metrics : addRows(1, mutableSegmentImpl)) { + expectedMin.put(metrics.get(0).getKey(), + Math.min(expectedMin.getOrDefault(metrics.get(0).getKey(), Double.POSITIVE_INFINITY), + metrics.get(0).getValue())); + expectedMax.put(metrics.get(0).getKey(), + Math.max(expectedMax.getOrDefault(metrics.get(0).getKey(), Double.NEGATIVE_INFINITY), + 
metrics.get(0).getValue())); + } + + GenericRow reuse = new GenericRow(); + for (int docId = 0; docId < expectedMax.size(); docId++) { + GenericRow row = mutableSegmentImpl.getRecord(docId, reuse); + String key = buildKey(row); + Assert.assertEquals(row.getValue(m2), expectedMin.get(key), key); + Assert.assertEquals(row.getValue(m1), expectedMax.get(key), key); + } + + mutableSegmentImpl.destroy(); + } + + @Test + public void testSameAggregationDifferentSrc() + throws Exception { + String m1 = "sum1"; + String m2 = "sum2"; + + Schema schema = + getSchemaBuilder().addMetric(m1, FieldSpec.DataType.INT).addMetric(m2, FieldSpec.DataType.LONG).build(); + MutableSegmentImpl mutableSegmentImpl = + MutableSegmentImplTestUtils.createMutableSegmentImpl(schema, new HashSet<>(Arrays.asList(m2, m1)), + VAR_LENGTH_SET, INVERTED_INDEX_SET, + Arrays.asList(new AggregationConfig(m1, "SUM(metric)"), new AggregationConfig(m2, "SUM(metric_2)"))); + + Map<String, Integer> expectedSum1 = new HashMap<>(); + Map<String, Long> expectedSum2 = new HashMap<>(); + for (List<Metric> metrics : addRows(2, mutableSegmentImpl)) { + expectedSum1.put(metrics.get(0).getKey(), + expectedSum1.getOrDefault(metrics.get(0).getKey(), 0) + metrics.get(0).getValue()); + expectedSum2.put(metrics.get(1).getKey(), + expectedSum2.getOrDefault(metrics.get(1).getKey(), 0L) + metrics.get(1).getValue().longValue()); + } + + GenericRow reuse = new GenericRow(); + for (int docId = 0; docId < expectedSum1.size(); docId++) { + GenericRow row = mutableSegmentImpl.getRecord(docId, reuse); + String key = buildKey(row); + Assert.assertEquals(row.getValue(m1), expectedSum1.get(key), key); + Assert.assertEquals(row.getValue(m2), expectedSum2.get(key), key); + } + + mutableSegmentImpl.destroy(); + } + + @Test + public void testCOUNT() + throws Exception { + String m1 = "count1"; + String m2 = "count2"; + + Schema schema = + getSchemaBuilder().addMetric(m1, FieldSpec.DataType.LONG).addMetric(m2, FieldSpec.DataType.LONG).build(); + MutableSegmentImpl mutableSegmentImpl = + MutableSegmentImplTestUtils.createMutableSegmentImpl(schema, new HashSet<>(Arrays.asList(m1, m2)), + VAR_LENGTH_SET, INVERTED_INDEX_SET, + Arrays.asList(new AggregationConfig(m1, "COUNT(metric)"), new AggregationConfig(m2, "COUNT(*)"))); + + Map<String, Long> expectedCount = new HashMap<>(); + for (List<Metric> metrics : addRows(3, mutableSegmentImpl)) { + expectedCount.put(metrics.get(0).getKey(), + expectedCount.getOrDefault(metrics.get(0).getKey(), 0L) + 1L); + } + + GenericRow reuse = new GenericRow(); + for (int docId = 0; docId < expectedCount.size(); docId++) { + GenericRow row = mutableSegmentImpl.getRecord(docId, reuse); + String key = buildKey(row); + Assert.assertEquals(row.getValue(m1), expectedCount.get(key), key); + Assert.assertEquals(row.getValue(m2), expectedCount.get(key), key); + } + + mutableSegmentImpl.destroy(); + } + + private String buildKey(GenericRow row) { + return row.getValue(DIMENSION_1) + KEY_SEPARATOR + row.getValue(DIMENSION_2) + KEY_SEPARATOR + row.getValue( + TIME_COLUMN1) + KEY_SEPARATOR + row.getValue(TIME_COLUMN2); + } + + private GenericRow getRow(Random random) { + GenericRow row = new GenericRow(); + + row.putValue(DIMENSION_1, random.nextInt(10)); + row.putValue(DIMENSION_2, STRING_VALUES.get(random.nextInt(STRING_VALUES.size()))); + row.putValue(TIME_COLUMN1, random.nextInt(10)); + row.putValue(TIME_COLUMN2, random.nextInt(5)); + + return row; + } + + private class Metric { + private final String _key; + private final Integer _value; + + 
Metric(String key, Integer value) { + _key = key; + _value = value; + } + + public String getKey() { + return _key; + } + + public Integer getValue() { + return _value; + } + } + + private List<List<Metric>> addRows(long seed, MutableSegmentImpl mutableSegmentImpl) + throws Exception { + List<List<Metric>> metrics = new ArrayList<>(); + Set<String> keys = new HashSet<>(); + + + Random random = new Random(seed); + StreamMessageMetadata defaultMetadata = new StreamMessageMetadata(System.currentTimeMillis()); + + for (int i = 0; i < NUM_ROWS; i++) { + GenericRow row = getRow(random); + // This needs to be relatively low since it will tend to overflow with the Int-to-Double conversion. + Integer metricValue = random.nextInt(10000); + Integer metric2Value = random.nextInt(); + row.putValue(METRIC, metricValue); + row.putValue(METRIC_2, metric2Value); + + mutableSegmentImpl.index(row, defaultMetadata); + + String key = buildKey(row); + metrics.add(Arrays.asList(new Metric(key, metricValue), new Metric(key, metric2Value))); + keys.add(key); + } + + int numDocsIndexed = mutableSegmentImpl.getNumDocsIndexed(); + Assert.assertEquals(numDocsIndexed, keys.size()); + + // Assert that aggregation happened. + Assert.assertTrue(numDocsIndexed < NUM_ROWS); + + return metrics; + } +} diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java index e715cdd22a..20d15334e4 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java @@ -19,6 +19,7 @@ package org.apache.pinot.segment.local.indexsegment.mutable; import java.util.Collections; +import java.util.List; import java.util.Set; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.metrics.ServerMetrics; @@ -27,6 +28,7 @@ import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig; import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory; import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager; import org.apache.pinot.spi.config.table.UpsertConfig; +import org.apache.pinot.spi.config.table.ingestion.AggregationConfig; import org.apache.pinot.spi.data.Schema; import static org.mockito.Mockito.anyString; @@ -40,7 +42,14 @@ public class MutableSegmentImplTestUtils { private static final String TABLE_NAME_WITH_TYPE = "testTable_REALTIME"; private static final String SEGMENT_NAME = "testSegment__0__0__155555"; - private static final String STEAM_NAME = "testStream"; + private static final String STREAM_NAME = "testStream"; + + public static MutableSegmentImpl createMutableSegmentImpl(Schema schema, Set<String> noDictionaryColumns, + Set<String> varLengthDictionaryColumns, Set<String> invertedIndexColumns, + List<AggregationConfig> preAggregationConfigs) { + return createMutableSegmentImpl(schema, noDictionaryColumns, varLengthDictionaryColumns, invertedIndexColumns, + Collections.emptySet(), false, false, null, null, null, null, preAggregationConfigs); + } public static MutableSegmentImpl createMutableSegmentImpl(Schema schema, Set<String> noDictionaryColumns, Set<String> varLengthDictionaryColumns, Set<String> invertedIndexColumns, boolean aggregateMetrics) { @@ -61,20 +70,22 
@@ public class MutableSegmentImplTestUtils { PartitionUpsertMetadataManager partitionUpsertMetadataManager) { return createMutableSegmentImpl(schema, noDictionaryColumns, varLengthDictionaryColumns, invertedIndexColumns, Collections.emptySet(), aggregateMetrics, nullHandlingEnabled, upsertConfig, timeColumnName, - partitionUpsertMetadataManager, null); + partitionUpsertMetadataManager, null, Collections.emptyList()); } public static MutableSegmentImpl createMutableSegmentImpl(Schema schema, Set<String> noDictionaryColumns, Set<String> varLengthDictionaryColumns, Set<String> invertedIndexColumns, Set<String> jsonIndexColumns, ServerMetrics serverMetrics) { return createMutableSegmentImpl(schema, noDictionaryColumns, varLengthDictionaryColumns, invertedIndexColumns, - jsonIndexColumns, false, true, null, null, null, serverMetrics); + jsonIndexColumns, false, true, null, null, null, serverMetrics, Collections.emptyList()); } public static MutableSegmentImpl createMutableSegmentImpl(Schema schema, Set<String> noDictionaryColumns, Set<String> varLengthDictionaryColumns, Set<String> invertedIndexColumns, Set<String> jsonIndexColumns, boolean aggregateMetrics, boolean nullHandlingEnabled, UpsertConfig upsertConfig, String timeColumnName, - PartitionUpsertMetadataManager partitionUpsertMetadataManager, ServerMetrics serverMetrics) { + PartitionUpsertMetadataManager partitionUpsertMetadataManager, ServerMetrics serverMetrics, + List<AggregationConfig> aggregationConfigs) { + RealtimeSegmentStatsHistory statsHistory = mock(RealtimeSegmentStatsHistory.class); when(statsHistory.getEstimatedCardinality(anyString())).thenReturn(200); when(statsHistory.getEstimatedAvgColSize(anyString())).thenReturn(32); @@ -85,14 +96,15 @@ public class MutableSegmentImplTestUtils { upsertConfig == null ? 
UpsertConfig.HashFunction.NONE : upsertConfig.getHashFunction(); RealtimeSegmentConfig realtimeSegmentConfig = new RealtimeSegmentConfig.Builder().setTableNameWithType(TABLE_NAME_WITH_TYPE).setSegmentName(SEGMENT_NAME) - .setStreamName(STEAM_NAME).setSchema(schema).setTimeColumnName(timeColumnName).setCapacity(100000) + .setStreamName(STREAM_NAME).setSchema(schema).setTimeColumnName(timeColumnName).setCapacity(100000) .setAvgNumMultiValues(2).setNoDictionaryColumns(noDictionaryColumns).setJsonIndexColumns(jsonIndexColumns) .setVarLengthDictionaryColumns(varLengthDictionaryColumns).setInvertedIndexColumns(invertedIndexColumns) .setSegmentZKMetadata(new SegmentZKMetadata(SEGMENT_NAME)) .setMemoryManager(new DirectMemoryManager(SEGMENT_NAME)).setStatsHistory(statsHistory) .setAggregateMetrics(aggregateMetrics).setNullHandlingEnabled(nullHandlingEnabled).setUpsertMode(upsertMode) .setUpsertComparisonColumn(comparisonColumn).setHashFunction(hashFunction) - .setPartitionUpsertMetadataManager(partitionUpsertMetadataManager).build(); + .setPartitionUpsertMetadataManager(partitionUpsertMetadataManager) + .setIngestionAggregationConfigs(aggregationConfigs).build(); return new MutableSegmentImpl(realtimeSegmentConfig, serverMetrics); } } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformerTest.java index 2f38f39a02..3415a46995 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformerTest.java @@ -61,7 +61,7 @@ public class ExpressionTransformerTest { transformConfigs.add(new TransformConfig("map2_values", "Groovy({map2.sort()*.value}, map2)")); transformConfigs.add(new TransformConfig("hoursSinceEpoch", "Groovy({timestamp/(1000*60*60)}, timestamp)")); TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("testTransformFunctions") - .setIngestionConfig(new IngestionConfig(null, null, null, transformConfigs, null)).build(); + .setIngestionConfig(new IngestionConfig(null, null, null, transformConfigs, null, null)).build(); ExpressionTransformer expressionTransformer = new ExpressionTransformer(tableConfig, pinotSchema); DataTypeTransformer dataTypeTransformer = new DataTypeTransformer(pinotSchema); @@ -150,7 +150,7 @@ public class ExpressionTransformerTest { transformConfigs.add(new TransformConfig("fullName", "Groovy({firstName+' '+lastName}, firstName, lastName)")); transformConfigs.add(new TransformConfig("hoursSinceEpoch", "Groovy({timestamp/(1000*60*60)}, timestamp)")); TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("testTransformFunctions") - .setIngestionConfig(new IngestionConfig(null, null, null, transformConfigs, null)).build(); + .setIngestionConfig(new IngestionConfig(null, null, null, transformConfigs, null, null)).build(); ExpressionTransformer expressionTransformer = new ExpressionTransformer(tableConfig, pinotSchema); @@ -204,7 +204,7 @@ public class ExpressionTransformerTest { List<TransformConfig> transformConfigs = new ArrayList<>(); transformConfigs.add(new TransformConfig("fullName", "Groovy({firstName + ' ' + lastName}, firstName, lastName)")); TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName("testValueExists") - 
.setIngestionConfig(new IngestionConfig(null, null, null, transformConfigs, null)).build(); + .setIngestionConfig(new IngestionConfig(null, null, null, transformConfigs, null, null)).build(); ExpressionTransformer expressionTransformer = new ExpressionTransformer(tableConfig, pinotSchema); GenericRow genericRow = new GenericRow(); @@ -220,7 +220,7 @@ public class ExpressionTransformerTest { .addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG, TimeUnit.MILLISECONDS, "incoming"), new TimeGranularitySpec(FieldSpec.DataType.INT, TimeUnit.DAYS, "outgoing")).build(); tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName("testValueExists") - .setIngestionConfig(new IngestionConfig(null, null, null, null, null)).build(); + .setIngestionConfig(new IngestionConfig(null, null, null, null, null, null)).build(); expressionTransformer = new ExpressionTransformer(tableConfig, pinotSchema); genericRow = new GenericRow(); @@ -244,7 +244,7 @@ public class ExpressionTransformerTest { transformConfigs.add(new TransformConfig("a", "plus(b, 10)")); transformConfigs.add(new TransformConfig("c", "plus(a, d)")); transformConfigs.add(new TransformConfig("f", "plus(e, 10)")); - IngestionConfig ingestionConfig = new IngestionConfig(null, null, null, transformConfigs, null); + IngestionConfig ingestionConfig = new IngestionConfig(null, null, null, transformConfigs, null, null); TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("testDerivedFunctions") .setIngestionConfig(ingestionConfig).build(); ExpressionTransformer expressionTransformer = new ExpressionTransformer(tableConfig, schema); @@ -272,7 +272,7 @@ public class ExpressionTransformerTest { transformConfigs.add(new TransformConfig("a", "plus(b,10)")); transformConfigs.add(new TransformConfig("a", "plus(c,10)")); - IngestionConfig ingestionConfig = new IngestionConfig(null, null, null, transformConfigs, null); + IngestionConfig ingestionConfig = new IngestionConfig(null, null, null, transformConfigs, null, null); TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("testMultipleTransformFunctionSortOrder") .setIngestionConfig(ingestionConfig).build(); @@ -295,7 +295,7 @@ public class ExpressionTransformerTest { transformConfigs.add(new TransformConfig("d", "plus(e,10)")); transformConfigs.add(new TransformConfig("c", "plus(d,e)")); - IngestionConfig ingestionConfig = new IngestionConfig(null, null, null, transformConfigs, null); + IngestionConfig ingestionConfig = new IngestionConfig(null, null, null, transformConfigs, null, null); TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("testNonCyclicTransformFunctionSortOrder") .setIngestionConfig(ingestionConfig).build(); @@ -323,7 +323,7 @@ public class ExpressionTransformerTest { transformConfigs.add(new TransformConfig("b", "plus(c,10)")); transformConfigs.add(new TransformConfig("c", "plus(a,10)")); - IngestionConfig ingestionConfig = new IngestionConfig(null, null, null, transformConfigs, null); + IngestionConfig ingestionConfig = new IngestionConfig(null, null, null, transformConfigs, null, null); TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("testRecrusiveTransformFunctionSortOrder") .setIngestionConfig(ingestionConfig).build(); diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java index 
f76551c0f4..f479fc351b 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java @@ -92,7 +92,7 @@ public class RecordTransformerTest { // expression false, not filtered GenericRow genericRow = getRecord(); tableConfig.setIngestionConfig( - new IngestionConfig(null, null, new FilterConfig("Groovy({svInt > 123}, svInt)"), null, null)); + new IngestionConfig(null, null, new FilterConfig("Groovy({svInt > 123}, svInt)"), null, null, null)); RecordTransformer transformer = new FilterTransformer(tableConfig); transformer.transform(genericRow); Assert.assertFalse(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY)); @@ -100,7 +100,7 @@ public class RecordTransformerTest { // expression true, filtered genericRow = getRecord(); tableConfig.setIngestionConfig( - new IngestionConfig(null, null, new FilterConfig("Groovy({svInt <= 123}, svInt)"), null, null)); + new IngestionConfig(null, null, new FilterConfig("Groovy({svInt <= 123}, svInt)"), null, null, null)); transformer = new FilterTransformer(tableConfig); transformer.transform(genericRow); Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY)); @@ -108,14 +108,14 @@ public class RecordTransformerTest { // value not found genericRow = getRecord(); tableConfig.setIngestionConfig( - new IngestionConfig(null, null, new FilterConfig("Groovy({notPresent == 123}, notPresent)"), null, null)); + new IngestionConfig(null, null, new FilterConfig("Groovy({notPresent == 123}, notPresent)"), null, null, null)); transformer = new FilterTransformer(tableConfig); transformer.transform(genericRow); Assert.assertFalse(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY)); // invalid function tableConfig.setIngestionConfig( - new IngestionConfig(null, null, new FilterConfig("Groovy(svInt == 123)"), null, null)); + new IngestionConfig(null, null, new FilterConfig("Groovy(svInt == 123)"), null, null, null)); try { new FilterTransformer(tableConfig); Assert.fail("Should have failed constructing FilterTransformer"); @@ -126,7 +126,7 @@ public class RecordTransformerTest { // multi value column genericRow = getRecord(); tableConfig.setIngestionConfig( - new IngestionConfig(null, null, new FilterConfig("Groovy({svFloat.max() < 500}, svFloat)"), null, null)); + new IngestionConfig(null, null, new FilterConfig("Groovy({svFloat.max() < 500}, svFloat)"), null, null, null)); transformer = new FilterTransformer(tableConfig); transformer.transform(genericRow); Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY)); @@ -187,7 +187,7 @@ public class RecordTransformerTest { GenericRow genericRow = getRecord(); tableConfig.setIngestionConfig( new IngestionConfig(null, null, - new FilterConfig("svInt = 123"), null, null)); + new FilterConfig("svInt = 123"), null, null, null)); RecordTransformer transformer = new FilterTransformer(tableConfig); transformer.transform(genericRow); Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY)); @@ -196,7 +196,7 @@ public class RecordTransformerTest { genericRow = getRecord(); tableConfig.setIngestionConfig( new IngestionConfig(null, null, - new FilterConfig("svDouble > 120"), null, null)); + new FilterConfig("svDouble > 120"), null, null, null)); transformer = new FilterTransformer(tableConfig); 
transformer.transform(genericRow); Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY)); @@ -205,7 +205,7 @@ public class RecordTransformerTest { genericRow = getRecord(); tableConfig.setIngestionConfig( new IngestionConfig(null, null, - new FilterConfig("svDouble >= 123"), null, null)); + new FilterConfig("svDouble >= 123"), null, null, null)); transformer = new FilterTransformer(tableConfig); transformer.transform(genericRow); Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY)); @@ -214,7 +214,7 @@ public class RecordTransformerTest { genericRow = getRecord(); tableConfig.setIngestionConfig( new IngestionConfig(null, null, - new FilterConfig("svDouble < 200"), null, null)); + new FilterConfig("svDouble < 200"), null, null, null)); transformer = new FilterTransformer(tableConfig); transformer.transform(genericRow); Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY)); @@ -223,7 +223,7 @@ public class RecordTransformerTest { genericRow = getRecord(); tableConfig.setIngestionConfig( new IngestionConfig(null, null, - new FilterConfig("svDouble <= 123"), null, null)); + new FilterConfig("svDouble <= 123"), null, null, null)); transformer = new FilterTransformer(tableConfig); transformer.transform(genericRow); Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY)); @@ -232,7 +232,7 @@ public class RecordTransformerTest { genericRow = getRecord(); tableConfig.setIngestionConfig( new IngestionConfig(null, null, - new FilterConfig("svLong != 125"), null, null)); + new FilterConfig("svLong != 125"), null, null, null)); transformer = new FilterTransformer(tableConfig); transformer.transform(genericRow); Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY)); @@ -241,7 +241,7 @@ public class RecordTransformerTest { genericRow = getRecord(); tableConfig.setIngestionConfig( new IngestionConfig(null, null, - new FilterConfig("svLong = 123"), null, null)); + new FilterConfig("svLong = 123"), null, null, null)); transformer = new FilterTransformer(tableConfig); transformer.transform(genericRow); Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY)); @@ -249,7 +249,7 @@ public class RecordTransformerTest { // expression true, filtered genericRow = getRecord(); tableConfig.setIngestionConfig( - new IngestionConfig(null, null, new FilterConfig("between(svLong, 100, 125)"), null, null)); + new IngestionConfig(null, null, new FilterConfig("between(svLong, 100, 125)"), null, null, null)); transformer = new FilterTransformer(tableConfig); transformer.transform(genericRow); Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY)); @@ -272,21 +272,23 @@ public class RecordTransformerTest { // expression true, filtered GenericRow genericRow = getNullColumnsRecord(); tableConfig.setIngestionConfig( - new IngestionConfig(null, null, new FilterConfig("svNullString is null"), null, null)); + new IngestionConfig(null, null, new FilterConfig("svNullString is null"), null, null, null)); RecordTransformer transformer = new FilterTransformer(tableConfig); transformer.transform(genericRow); Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY)); // expression true, filtered genericRow = getNullColumnsRecord(); - tableConfig.setIngestionConfig(new IngestionConfig(null, null, new FilterConfig("svInt is not null"), null, null)); + 
tableConfig.setIngestionConfig( + new IngestionConfig(null, null, new FilterConfig("svInt is not null"), null, null, null)); transformer = new FilterTransformer(tableConfig); transformer.transform(genericRow); Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY)); // expression true, filtered genericRow = getNullColumnsRecord(); - tableConfig.setIngestionConfig(new IngestionConfig(null, null, new FilterConfig("mvLong is not null"), null, null)); + tableConfig.setIngestionConfig( + new IngestionConfig(null, null, new FilterConfig("mvLong is not null"), null, null, null)); transformer = new FilterTransformer(tableConfig); transformer.transform(genericRow); Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY)); @@ -294,7 +296,7 @@ public class RecordTransformerTest { // expression true, filtered genericRow = getNullColumnsRecord(); tableConfig.setIngestionConfig( - new IngestionConfig(null, null, new FilterConfig("mvNullFloat is null"), null, null)); + new IngestionConfig(null, null, new FilterConfig("mvNullFloat is null"), null, null, null)); transformer = new FilterTransformer(tableConfig); transformer.transform(genericRow); Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY)); @@ -308,7 +310,7 @@ public class RecordTransformerTest { GenericRow genericRow = getRecord(); tableConfig.setIngestionConfig( new IngestionConfig(null, null, - new FilterConfig("svInt = 123 AND svDouble <= 200"), null, null)); + new FilterConfig("svInt = 123 AND svDouble <= 200"), null, null, null)); RecordTransformer transformer = new FilterTransformer(tableConfig); transformer.transform(genericRow); Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY)); @@ -317,7 +319,7 @@ public class RecordTransformerTest { genericRow = getRecord(); tableConfig.setIngestionConfig( new IngestionConfig(null, null, - new FilterConfig("svInt = 125 OR svLong <= 200"), null, null)); + new FilterConfig("svInt = 125 OR svLong <= 200"), null, null, null)); transformer = new FilterTransformer(tableConfig); transformer.transform(genericRow); Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY)); diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/SegmentGenerationWithFilterRecordsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/SegmentGenerationWithFilterRecordsTest.java index 0c5600c758..be0462dae3 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/SegmentGenerationWithFilterRecordsTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/SegmentGenerationWithFilterRecordsTest.java @@ -67,7 +67,8 @@ public class SegmentGenerationWithFilterRecordsTest { public void setup() { String filterFunction = "Groovy({((col2 < 1589007600000L) && (col3.max() < 4)) || col1 == \"B\"}, col1, col2, col3)"; - IngestionConfig ingestionConfig = new IngestionConfig(null, null, new FilterConfig(filterFunction), null, null); + IngestionConfig ingestionConfig = + new IngestionConfig(null, null, new FilterConfig(filterFunction), null, null, null); _tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setIngestionConfig(ingestionConfig).build(); _schema = new Schema.SchemaBuilder().addSingleValueDimension(STRING_COLUMN, FieldSpec.DataType.STRING) diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java index c21bc47a90..8afb04880c 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java @@ -591,7 +591,8 @@ public class SegmentPreProcessorTest { throws Exception { constructV1Segment(); _tableConfig.setIngestionConfig(new IngestionConfig(null, null, null, - Collections.singletonList(new TransformConfig(NEW_INT_SV_DIMENSION_COLUMN_NAME, "plus(column1, 1)")), null)); + Collections.singletonList(new TransformConfig(NEW_INT_SV_DIMENSION_COLUMN_NAME, "plus(column1, 1)")), null, + null)); _indexLoadingConfig.getInvertedIndexColumns().add(NEW_COLUMN_INVERTED_INDEX); checkUpdateDefaultColumns(); @@ -636,7 +637,8 @@ public class SegmentPreProcessorTest { assertEquals(segmentMetadata.getVersion(), SegmentVersion.v3); _tableConfig.setIngestionConfig(new IngestionConfig(null, null, null, - Collections.singletonList(new TransformConfig(NEW_INT_SV_DIMENSION_COLUMN_NAME, "plus(column1, 1)")), null)); + Collections.singletonList(new TransformConfig(NEW_INT_SV_DIMENSION_COLUMN_NAME, "plus(column1, 1)")), null, + null)); _indexLoadingConfig.getInvertedIndexColumns().add(NEW_COLUMN_INVERTED_INDEX); checkUpdateDefaultColumns(); diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/IngestionUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/IngestionUtilsTest.java index f9b7cf09da..d1c5e660ee 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/IngestionUtilsTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/IngestionUtilsTest.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import org.apache.pinot.spi.config.table.ingestion.AggregationConfig; import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig; import org.apache.pinot.spi.config.table.ingestion.FilterConfig; import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; @@ -138,7 +139,7 @@ public class IngestionUtilsTest { // filter config IngestionConfig ingestionConfig = - new IngestionConfig(null, null, new FilterConfig("Groovy({x > 100}, x)"), null, null); + new IngestionConfig(null, null, new FilterConfig("Groovy({x > 100}, x)"), null, null, null); Set<String> fields = IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, schema); Assert.assertEquals(fields.size(), 1); Assert.assertTrue(fields.containsAll(Sets.newHashSet("x"))); @@ -152,14 +153,14 @@ public class IngestionUtilsTest { schema = new Schema.SchemaBuilder().addSingleValueDimension("d1", FieldSpec.DataType.STRING).build(); List<TransformConfig> transformConfigs = Lists.newArrayList(new TransformConfig("d1", "Groovy({function}, argument1, argument2)")); - ingestionConfig = new IngestionConfig(null, null, null, transformConfigs, null); + ingestionConfig = new IngestionConfig(null, null, null, transformConfigs, null, null); List<String> extract = new ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, schema)); Assert.assertEquals(extract.size(), 3); Assert.assertTrue(extract.containsAll(Arrays.asList("d1", "argument1", 
"argument2"))); // groovy function, no arguments transformConfigs = Lists.newArrayList(new TransformConfig("d1", "Groovy({function})")); - ingestionConfig = new IngestionConfig(null, null, null, transformConfigs, null); + ingestionConfig = new IngestionConfig(null, null, null, transformConfigs, null, null); extract = new ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, schema)); Assert.assertEquals(extract.size(), 1); Assert.assertTrue(extract.contains("d1")); @@ -167,7 +168,7 @@ public class IngestionUtilsTest { // inbuilt functions schema = new Schema.SchemaBuilder().addSingleValueDimension("hoursSinceEpoch", FieldSpec.DataType.LONG).build(); transformConfigs = Lists.newArrayList(new TransformConfig("hoursSinceEpoch", "toEpochHours(timestampColumn)")); - ingestionConfig = new IngestionConfig(null, null, null, transformConfigs, null); + ingestionConfig = new IngestionConfig(null, null, null, transformConfigs, null, null); extract = new ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, schema)); Assert.assertEquals(extract.size(), 2); Assert.assertTrue(extract.containsAll(Arrays.asList("timestampColumn", "hoursSinceEpoch"))); @@ -177,7 +178,7 @@ public class IngestionUtilsTest { new Schema.SchemaBuilder().addSingleValueDimension("tenMinutesSinceEpoch", FieldSpec.DataType.LONG).build(); transformConfigs = Lists.newArrayList(new TransformConfig("tenMinutesSinceEpoch", "toEpochMinutesBucket(timestampColumn, 10)")); - ingestionConfig = new IngestionConfig(null, null, null, transformConfigs, null); + ingestionConfig = new IngestionConfig(null, null, null, transformConfigs, null, null); extract = new ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, schema)); Assert.assertEquals(extract.size(), 2); Assert.assertTrue(extract.containsAll(Lists.newArrayList("tenMinutesSinceEpoch", "timestampColumn"))); @@ -187,7 +188,7 @@ public class IngestionUtilsTest { .addDateTime("dateColumn", FieldSpec.DataType.STRING, "1:DAYS:SIMPLE_DATE_FORMAT:yyyy-MM-dd", "1:DAYS").build(); transformConfigs = Lists.newArrayList(new TransformConfig("dateColumn", "toDateTime(timestampColumn, 'yyyy-MM-dd')")); - ingestionConfig = new IngestionConfig(null, null, null, transformConfigs, null); + ingestionConfig = new IngestionConfig(null, null, null, transformConfigs, null, null); extract = new ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, schema)); Assert.assertEquals(extract.size(), 2); Assert.assertTrue(extract.containsAll(Lists.newArrayList("dateColumn", "timestampColumn"))); @@ -200,7 +201,7 @@ public class IngestionUtilsTest { transformConfigs = Lists.newArrayList(new TransformConfig("dateColumn", "toDateTime(timestampColumn, 'yyyy-MM-dd')")); ingestionConfig = - new IngestionConfig(null, null, new FilterConfig("Groovy({d1 == \"10\"}, d1)"), transformConfigs, null); + new IngestionConfig(null, null, new FilterConfig("Groovy({d1 == \"10\"}, d1)"), transformConfigs, null, null); extract = new ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, schema)); Assert.assertEquals(extract.size(), 6); Assert.assertTrue(extract.containsAll(Lists.newArrayList("d1", "d2", "m1", "dateColumn", "xy", "timestampColumn"))); @@ -220,11 +221,37 @@ public class IngestionUtilsTest { ComplexTypeConfig complexTypeConfigs = new ComplexTypeConfig(fieldsToUnnest, ".", ComplexTypeConfig.CollectionNotUnnestedToJson.NON_PRIMITIVE, prefixesToRename); FilterConfig filterConfig = new FilterConfig("Groovy({d1 == \"10\"}, d1)"); - ingestionConfig 
= new IngestionConfig(null, null, filterConfig, transformConfigs, complexTypeConfigs); + ingestionConfig = new IngestionConfig(null, null, filterConfig, transformConfigs, complexTypeConfigs, null); extract = new ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, schema)); Assert.assertEquals(extract.size(), 8); List<String> expectedColumns = Lists.newArrayList("d1", "d2", "m1", "dateColumn", "xy", "timestampColumn", "before", "after"); Assert.assertTrue(extract.containsAll(expectedColumns)); } + + @Test + public void testExtractFieldsAggregationConfig() { + Schema schema = new Schema(); + + List<AggregationConfig> aggregationConfigs = Arrays.asList(new AggregationConfig("d1", "SUM(s1)")); + IngestionConfig ingestionConfig = new IngestionConfig(null, null, null, null, null, aggregationConfigs); + + Set<String> fields = IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, schema); + Assert.assertEquals(fields.size(), 1); + Assert.assertTrue(fields.containsAll(Sets.newHashSet("s1"))); + + aggregationConfigs = Arrays.asList(new AggregationConfig("d1", "MIN(s1)")); + ingestionConfig = new IngestionConfig(null, null, null, null, null, aggregationConfigs); + + fields = IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, schema); + Assert.assertEquals(fields.size(), 1); + Assert.assertTrue(fields.containsAll(Sets.newHashSet("s1"))); + + aggregationConfigs = Arrays.asList(new AggregationConfig("d1", "MAX(s1)")); + ingestionConfig = new IngestionConfig(null, null, null, null, null, aggregationConfigs); + + fields = IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, schema); + Assert.assertEquals(fields.size(), 1); + Assert.assertTrue(fields.containsAll(Sets.newHashSet("s1"))); + } } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java index b1910b540d..69ad516bfb 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java @@ -38,6 +38,7 @@ import org.apache.pinot.spi.config.table.TableTaskConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.config.table.TierConfig; import org.apache.pinot.spi.config.table.UpsertConfig; +import org.apache.pinot.spi.config.table.ingestion.AggregationConfig; import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig; import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig; import org.apache.pinot.spi.config.table.ingestion.FilterConfig; @@ -229,28 +230,30 @@ public class TableConfigUtilsTest { // null filter config, transform config tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME) - .setIngestionConfig(new IngestionConfig(null, null, null, null, null)).build(); + .setIngestionConfig(new IngestionConfig(null, null, null, null, null, null)).build(); TableConfigUtils.validate(tableConfig, schema); // null filter function tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME) - .setIngestionConfig(new IngestionConfig(null, null, new FilterConfig(null), null, null)).build(); + .setIngestionConfig(new IngestionConfig(null, null, new FilterConfig(null), null, null, null)).build(); TableConfigUtils.validate(tableConfig, schema); // valid filterFunction tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig( - new IngestionConfig(null, null, new FilterConfig("startsWith(columnX, \"myPrefix\")"), null, null)).build(); + new IngestionConfig(null, null, new FilterConfig("startsWith(columnX, \"myPrefix\")"), null, null, null)) + .build(); TableConfigUtils.validate(tableConfig, schema); // valid filterFunction tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME) - .setIngestionConfig(new IngestionConfig(null, null, new FilterConfig("Groovy({x == 10}, x)"), null, null)) + .setIngestionConfig(new IngestionConfig(null, null, new FilterConfig("Groovy({x == 10}, x)"), null, null, null)) .build(); TableConfigUtils.validate(tableConfig, schema); // invalid filter function tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME) - .setIngestionConfig(new IngestionConfig(null, null, new FilterConfig("Groovy(badExpr)"), null, null)).build(); + .setIngestionConfig(new IngestionConfig(null, null, new FilterConfig("Groovy(badExpr)"), null, null, null)) + .build(); try { TableConfigUtils.validate(tableConfig, schema); Assert.fail("Should fail on invalid filter function string"); @@ -259,7 +262,8 @@ public class TableConfigUtilsTest { } tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME) - .setIngestionConfig(new IngestionConfig(null, null, new FilterConfig("fakeFunction(xx)"), null, null)).build(); + .setIngestionConfig(new IngestionConfig(null, null, new FilterConfig("fakeFunction(xx)"), null, null, null)) + .build(); try { TableConfigUtils.validate(tableConfig, schema); Assert.fail("Should fail for invalid filter function"); @@ -269,13 +273,13 @@ public class TableConfigUtilsTest { // empty transform configs tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME) - .setIngestionConfig(new IngestionConfig(null, null, null, Collections.emptyList(), null)).build(); + .setIngestionConfig(new IngestionConfig(null, null, null, Collections.emptyList(), null, null)).build(); TableConfigUtils.validate(tableConfig, schema); // transformed column not in schema tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig( new IngestionConfig(null, null, null, Lists.newArrayList(new TransformConfig("myCol", "reverse(anotherCol)")), - null)).build(); + null, null)).build(); try { TableConfigUtils.validate(tableConfig, schema); Assert.fail("Should fail for transformedColumn not present in schema"); @@ -283,13 +287,21 @@ public class TableConfigUtilsTest { // expected } + // using a transformation column in an aggregation + schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME) + .addMetric("twiceSum", FieldSpec.DataType.DOUBLE).build(); + tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig( + new IngestionConfig(null, null, null, Lists.newArrayList(new TransformConfig("twice", "col * 2")), + null, Lists.newArrayList((new AggregationConfig("twiceSum", "SUM(twice)"))))).build(); + TableConfigUtils.validate(tableConfig, schema); + schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol", FieldSpec.DataType.STRING) .build(); // valid transform configs tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig( new IngestionConfig(null, null, null, Lists.newArrayList(new TransformConfig("myCol", "reverse(anotherCol)")), - null)).build(); + null, null)).build(); 
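The new test case above ("using a transformation column in an aggregation") exercises the relaxed destination-column check: a transform's output no longer has to exist in the schema, provided an aggregation consumes it as a source. A minimal sketch of that chaining, reusing the test's own column names:

import java.util.Collections;
import org.apache.pinot.spi.config.table.ingestion.AggregationConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.TransformConfig;

public class TransformIntoAggregationSketch {
  // 'twice' is an intermediate column: produced by the transform, consumed by
  // SUM(...), and never declared in the schema; only the destination metric
  // 'twiceSum' needs a FieldSpec.
  static final IngestionConfig CHAINED = new IngestionConfig(null, null, null,
      Collections.singletonList(new TransformConfig("twice", "col * 2")),
      null,
      Collections.singletonList(new AggregationConfig("twiceSum", "SUM(twice)")));
}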
TableConfigUtils.validate(tableConfig, schema); schema = @@ -298,7 +310,7 @@ public class TableConfigUtilsTest { // valid transform configs tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig( new IngestionConfig(null, null, null, Lists.newArrayList(new TransformConfig("myCol", "reverse(anotherCol)"), - new TransformConfig("transformedCol", "Groovy({x+y}, x, y)")), null)).build(); + new TransformConfig("transformedCol", "Groovy({x+y}, x, y)")), null, null)).build(); TableConfigUtils.validate(tableConfig, schema); // invalid transform config since Groovy is disabled @@ -311,7 +323,7 @@ public class TableConfigUtilsTest { // invalid filter config since Groovy is disabled tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig( - new IngestionConfig(null, null, new FilterConfig("Groovy({timestamp > 0}, timestamp)"), null, null)) + new IngestionConfig(null, null, new FilterConfig("Groovy({timestamp > 0}, timestamp)"), null, null, null)) .build(); try { TableConfigUtils.validate(tableConfig, schema, null, true); @@ -323,7 +335,7 @@ public class TableConfigUtilsTest { // null transform column name tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig( new IngestionConfig(null, null, null, Lists.newArrayList(new TransformConfig(null, "reverse(anotherCol)")), - null)).build(); + null, null)).build(); try { TableConfigUtils.validate(tableConfig, schema); Assert.fail("Should fail for null column name in transform config"); @@ -333,7 +345,8 @@ public class TableConfigUtilsTest { // null transform function string tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig( - new IngestionConfig(null, null, null, Lists.newArrayList(new TransformConfig("myCol", null)), null)).build(); + new IngestionConfig(null, null, null, Lists.newArrayList(new TransformConfig("myCol", null)), null, null)) + .build(); try { TableConfigUtils.validate(tableConfig, schema); Assert.fail("Should fail for null transform function in transform config"); @@ -344,7 +357,7 @@ public class TableConfigUtilsTest { // invalid function tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig( new IngestionConfig(null, null, null, Lists.newArrayList(new TransformConfig("myCol", "fakeFunction(col)")), - null)).build(); + null, null)).build(); try { TableConfigUtils.validate(tableConfig, schema); Assert.fail("Should fail for invalid transform function in transform config"); @@ -354,7 +367,7 @@ public class TableConfigUtilsTest { // invalid function tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig( - new IngestionConfig(null, null, null, Lists.newArrayList(new TransformConfig("myCol", "Groovy(badExpr)")), + new IngestionConfig(null, null, null, Lists.newArrayList(new TransformConfig("myCol", "Groovy(badExpr)")), null, null)).build(); try { TableConfigUtils.validate(tableConfig, schema); @@ -365,8 +378,8 @@ public class TableConfigUtilsTest { // input field name used as destination field tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig( - new IngestionConfig(null, null, null, Lists.newArrayList(new TransformConfig("myCol", "reverse(myCol)")), null)) - .build(); + new IngestionConfig(null, null, null, Lists.newArrayList(new TransformConfig("myCol", "reverse(myCol)")), null, + null)).build(); try { 
TableConfigUtils.validate(tableConfig, schema); Assert.fail("Should fail due to use of myCol as arguments and columnName"); @@ -376,8 +389,9 @@ public class TableConfigUtilsTest { // input field name used as destination field tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig( - new IngestionConfig(null, null, null, - Lists.newArrayList(new TransformConfig("myCol", "Groovy({x + y + myCol}, x, myCol, y)")), null)).build(); + new IngestionConfig(null, null, null, + Lists.newArrayList(new TransformConfig("myCol", "Groovy({x + y + myCol}, x, myCol, y)")), null, null)) + .build(); try { TableConfigUtils.validate(tableConfig, schema); Assert.fail("Should fail due to use of myCol as arguments and columnName"); @@ -389,7 +403,7 @@ public class TableConfigUtilsTest { tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig( new IngestionConfig(null, null, null, Lists.newArrayList(new TransformConfig("myCol", "reverse(x)"), new TransformConfig("myCol", "lower(y)")), - null)).build(); + null, null)).build(); try { TableConfigUtils.validate(tableConfig, schema); Assert.fail("Should fail due to duplicate transform config"); @@ -400,16 +414,15 @@ public class TableConfigUtilsTest { // derived columns - should pass tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig( new IngestionConfig(null, null, null, Lists.newArrayList(new TransformConfig("transformedCol", "reverse(x)"), - new TransformConfig("myCol", "lower(transformedCol)")), null)).build(); + new TransformConfig("myCol", "lower(transformedCol)")), null, null)).build(); TableConfigUtils.validate(tableConfig, schema); // invalid field name in schema with matching prefix from complexConfigType's prefixesToRename HashMap<String, String> prefixesToRename = new HashMap<>(); prefixesToRename.put("after.", ""); ComplexTypeConfig complexConfig = new ComplexTypeConfig(null, ".", null, prefixesToRename); - tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig( - new IngestionConfig(null, null, null, - null, complexConfig)).build(); + tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME) + .setIngestionConfig(new IngestionConfig(null, null, null, null, complexConfig, null)).build(); schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME) .addMultiValueDimension("after.test", FieldSpec.DataType.STRING).build(); try { @@ -420,12 +433,181 @@ public class TableConfigUtilsTest { } } + @Test + public void ingestionAggregationConfigsTest() { + List<AggregationConfig> aggregationConfigs = Arrays.asList(new AggregationConfig("d1", "SUM(s1)")); + IngestionConfig ingestionConfig = + new IngestionConfig(null, null, null, null, null, aggregationConfigs); + + Schema schema = + new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addMetric("d1", FieldSpec.DataType.DOUBLE).build(); + TableConfig tableConfig = + new TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn") + .setIngestionConfig(ingestionConfig).setAggregateMetrics(true).build(); + + try { + TableConfigUtils.validateIngestionConfig(tableConfig, schema); + Assert.fail("Should fail due to aggregateMetrics being set"); + } catch (IllegalStateException e) { + // expected + } + + schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).build(); + tableConfig = + new 
TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn") + .setIngestionConfig(ingestionConfig).build(); + try { + TableConfigUtils.validateIngestionConfig(tableConfig, schema); + Assert.fail("Should fail due to destination column not being in schema"); + } catch (IllegalStateException e) { + // expected + } + + schema = + new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("d1", FieldSpec.DataType.DOUBLE) + .build(); + aggregationConfigs = Arrays.asList(new AggregationConfig("d1", "SUM(s1)")); + ingestionConfig = + new IngestionConfig(null, null, null, null, null, aggregationConfigs); + tableConfig = + new TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn") + .setIngestionConfig(ingestionConfig).build(); + + try { + TableConfigUtils.validateIngestionConfig(tableConfig, schema); + Assert.fail("Should fail due to aggregation column being a dimension"); + } catch (IllegalStateException e) { + // expected + } + + schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addMetric("d1", FieldSpec.DataType.DOUBLE).build(); + aggregationConfigs = Arrays.asList(new AggregationConfig(null, null)); + ingestionConfig = + new IngestionConfig(null, null, null, null, null, aggregationConfigs); + tableConfig = + new TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn") + .setIngestionConfig(ingestionConfig).build(); + + try { + TableConfigUtils.validateIngestionConfig(tableConfig, schema); + Assert.fail("Should fail due to null columnName/aggregationFunction"); + } catch (IllegalStateException e) { + // expected + } + + aggregationConfigs = Arrays.asList(new AggregationConfig("d1", "SUM(s1)"), new AggregationConfig("d1", "SUM(s2)")); + ingestionConfig = + new IngestionConfig(null, null, null, null, null, aggregationConfigs); + tableConfig = + new TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn") + .setIngestionConfig(ingestionConfig).build(); + + try { + TableConfigUtils.validateIngestionConfig(tableConfig, schema); + Assert.fail("Should fail due to duplicate destination column"); + } catch (IllegalStateException e) { + // expected + } + + aggregationConfigs = Arrays.asList(new AggregationConfig("d1", "SUM s1")); + ingestionConfig = + new IngestionConfig(null, null, null, null, null, aggregationConfigs); + tableConfig = + new TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn") + .setIngestionConfig(ingestionConfig).build(); + + try { + TableConfigUtils.validateIngestionConfig(tableConfig, schema); + Assert.fail("Should fail due to invalid aggregation function"); + } catch (IllegalStateException e) { + // expected + } + + aggregationConfigs = Arrays.asList(new AggregationConfig("d1", "DISTINCTCOUNTHLL(s1)")); + ingestionConfig = + new IngestionConfig(null, null, null, null, null, aggregationConfigs); + tableConfig = + new TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn") + .setIngestionConfig(ingestionConfig).build(); + + try { + TableConfigUtils.validateIngestionConfig(tableConfig, schema); + Assert.fail("Should fail due to not supported aggregation function"); + } catch (IllegalStateException e) { + // expected + } + + aggregationConfigs = Arrays.asList(new AggregationConfig("d1", "s1 + s2")); + ingestionConfig = + new IngestionConfig(null, null, null, null, null, 
aggregationConfigs); + tableConfig = + new TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn") + .setIngestionConfig(ingestionConfig).build(); + + try { + TableConfigUtils.validateIngestionConfig(tableConfig, schema); + Assert.fail("Should fail due to multiple arguments"); + } catch (IllegalStateException e) { + // expected + } + + aggregationConfigs = Arrays.asList(new AggregationConfig("d1", "SUM(s1 - s2)")); + ingestionConfig = + new IngestionConfig(null, null, null, null, null, aggregationConfigs); + tableConfig = + new TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn") + .setIngestionConfig(ingestionConfig).build(); + + try { + TableConfigUtils.validateIngestionConfig(tableConfig, schema); + Assert.fail("Should fail due to inner value not being a column"); + } catch (IllegalStateException e) { + // expected + } + + schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addMetric("d1", FieldSpec.DataType.DOUBLE).build(); + aggregationConfigs = Arrays.asList(new AggregationConfig("d1", "SUM(d1)")); + ingestionConfig = + new IngestionConfig(null, null, null, null, null, aggregationConfigs); + tableConfig = + new TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn") + .setIngestionConfig(ingestionConfig).build(); + + TableConfigUtils.validateIngestionConfig(tableConfig, schema); + + schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addMetric("d1", FieldSpec.DataType.DOUBLE) + .addMetric("d2", FieldSpec.DataType.DOUBLE).build(); + aggregationConfigs = Arrays.asList(new AggregationConfig("d1", "SUM(s1)")); + ingestionConfig = + new IngestionConfig(null, null, null, null, null, aggregationConfigs); + tableConfig = + new TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn") + .setIngestionConfig(ingestionConfig).build(); + + try { + TableConfigUtils.validateIngestionConfig(tableConfig, schema); + Assert.fail("Should fail due to one metric column not being aggregated"); + } catch (IllegalStateException e) { + // expected + } + + + schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addMetric("d1", FieldSpec.DataType.DOUBLE).build(); + aggregationConfigs = Arrays.asList(new AggregationConfig("d1", "SUM(s1)")); + ingestionConfig = + new IngestionConfig(null, null, null, null, null, aggregationConfigs); + tableConfig = + new TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn") + .setIngestionConfig(ingestionConfig).build(); + TableConfigUtils.validateIngestionConfig(tableConfig, schema); + } + @Test public void ingestionStreamConfigsTest() { Map<String, String> streamConfigs = getStreamConfigs(); IngestionConfig ingestionConfig = new IngestionConfig(null, new StreamIngestionConfig(Lists.newArrayList(streamConfigs, streamConfigs)), null, - null, null); + null, null, null); TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn") .setIngestionConfig(ingestionConfig).build(); @@ -440,7 +622,7 @@ public class TableConfigUtilsTest { // stream config should be valid ingestionConfig = - new IngestionConfig(null, new StreamIngestionConfig(Lists.newArrayList(streamConfigs)), null, null, null); + new IngestionConfig(null, new StreamIngestionConfig(Lists.newArrayList(streamConfigs)), null, null, null, null); 
tableConfig.setIngestionConfig(ingestionConfig); TableConfigUtils.validateIngestionConfig(tableConfig, null); @@ -465,7 +647,7 @@ public class TableConfigUtilsTest { IngestionConfig ingestionConfig = new IngestionConfig(new BatchIngestionConfig(Lists.newArrayList(batchConfigMap, batchConfigMap), null, null), - null, null, null, null); + null, null, null, null, null); TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable_OFFLINE").setIngestionConfig(ingestionConfig) .build(); @@ -486,12 +668,12 @@ public class TableConfigUtilsTest { TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIsDimTable(true) .setIngestionConfig(new IngestionConfig( new BatchIngestionConfig(Lists.newArrayList(batchConfigMap, batchConfigMap), "REFRESH", null), null, null, - null, null)).build(); + null, null, null)).build(); TableConfigUtils.validateIngestionConfig(tableConfig, null); // dimension tables should have batch ingestion config tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIsDimTable(true) - .setIngestionConfig(new IngestionConfig(null, null, null, null, null)).build(); + .setIngestionConfig(new IngestionConfig(null, null, null, null, null, null)).build(); try { TableConfigUtils.validateIngestionConfig(tableConfig, null); Assert.fail("Should fail for Dimension table without batch ingestion config"); @@ -503,7 +685,7 @@ public class TableConfigUtilsTest { tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIsDimTable(true) .setIngestionConfig(new IngestionConfig( new BatchIngestionConfig(Lists.newArrayList(batchConfigMap, batchConfigMap), "APPEND", null), null, null, - null, null)).build(); + null, null, null)).build(); try { TableConfigUtils.validateIngestionConfig(tableConfig, null); Assert.fail("Should fail for Dimension table with ingestion type APPEND (should be REFRESH)"); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/AggregationConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/AggregationConfig.java new file mode 100644 index 0000000000..811af8bbbd --- /dev/null +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/AggregationConfig.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.spi.config.table.ingestion; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyDescription; +import org.apache.pinot.spi.config.BaseJsonConfig; + + +public class AggregationConfig extends BaseJsonConfig { + + @JsonPropertyDescription("Aggregated column name") + private final String _columnName; + + @JsonPropertyDescription("Aggregation function") + private final String _aggregationFunction; + + @JsonCreator + public AggregationConfig(@JsonProperty("columnName") String columnName, + @JsonProperty("aggregationFunction") String aggregationFunction) { + _columnName = columnName; + _aggregationFunction = aggregationFunction; + } + + public String getColumnName() { + return _columnName; + } + + public String getAggregationFunction() { + return _aggregationFunction; + } +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java index 8a1ea11b67..6b8329410a 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java @@ -47,17 +47,23 @@ public class IngestionConfig extends BaseJsonConfig { @JsonPropertyDescription("Config related to handling complex type") private ComplexTypeConfig _complexTypeConfig; + @JsonPropertyDescription("Configs related to record aggregation function applied during ingestion") + private List<AggregationConfig> _aggregationConfigs; + + @JsonCreator public IngestionConfig(@JsonProperty("batchIngestionConfig") @Nullable BatchIngestionConfig batchIngestionConfig, @JsonProperty("streamIngestionConfig") @Nullable StreamIngestionConfig streamIngestionConfig, @JsonProperty("filterConfig") @Nullable FilterConfig filterConfig, @JsonProperty("transformConfigs") @Nullable List<TransformConfig> transformConfigs, - @JsonProperty("complexTypeConfig") @Nullable ComplexTypeConfig complexTypeConfig) { + @JsonProperty("complexTypeConfig") @Nullable ComplexTypeConfig complexTypeConfig, + @JsonProperty("aggregationConfigs") @Nullable List<AggregationConfig> aggregationConfigs) { _batchIngestionConfig = batchIngestionConfig; _streamIngestionConfig = streamIngestionConfig; _filterConfig = filterConfig; _transformConfigs = transformConfigs; _complexTypeConfig = complexTypeConfig; + _aggregationConfigs = aggregationConfigs; } public IngestionConfig() { @@ -88,6 +94,11 @@ public class IngestionConfig extends BaseJsonConfig { return _complexTypeConfig; } + @Nullable + public List<AggregationConfig> getAggregationConfigs() { + return _aggregationConfigs; + } + public void setBatchIngestionConfig(BatchIngestionConfig batchIngestionConfig) { _batchIngestionConfig = batchIngestionConfig; } @@ -107,4 +118,8 @@ public class IngestionConfig extends BaseJsonConfig { public void setComplexTypeConfig(ComplexTypeConfig complexTypeConfig) { _complexTypeConfig = complexTypeConfig; } + + public void setAggregationConfigs(List<AggregationConfig> aggregationConfigs) { + _aggregationConfigs = aggregationConfigs; + } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java index e89b28e831..ba8aefb1ee 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java +++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java @@ -25,6 +25,7 @@ import java.util.Map; import org.apache.commons.lang3.StringUtils; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.config.table.ingestion.AggregationConfig; import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties; @@ -73,6 +74,17 @@ public final class IngestionConfigUtils { return streamConfigMap; } + public static List<AggregationConfig> getAggregationConfigs(TableConfig tableConfig) { + String tableNameWithType = tableConfig.getTableName(); + Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME, + "aggregationConfigs are only supported in REALTIME tables. Found an OFFLINE table: %s", tableNameWithType); + + if (tableConfig.getIngestionConfig() != null) { + return tableConfig.getIngestionConfig().getAggregationConfigs(); + } + return null; + } + /** * Fetches the configured segmentIngestionType (APPEND/REFRESH) from the table config * First checks in the ingestionConfig. If not found, checks in the segmentsConfig (has been deprecated from here @@ -178,8 +190,8 @@ public final class IngestionConfigUtils { * Extracts the segment name generator type from the batchConfigMap, or returns default value if not found */ public static String getSegmentNameGeneratorType(Map<String, String> batchConfigMap) { - return batchConfigMap - .getOrDefault(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE, DEFAULT_SEGMENT_NAME_GENERATOR_TYPE); + return batchConfigMap.getOrDefault(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE, + DEFAULT_SEGMENT_NAME_GENERATOR_TYPE); } /** diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java index cb0dc7ba9e..159b3c0d6b 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java @@ -99,6 +99,7 @@ public class TableConfigBuilder { private List<String> _varLengthDictionaryColumns; private List<StarTreeIndexConfig> _starTreeIndexConfigs; private List<String> _jsonIndexColumns; + private boolean _aggregateMetrics; private TableCustomConfig _customConfig; private QuotaConfig _quotaConfig; @@ -284,6 +285,11 @@ public class TableConfigBuilder { return this; } + public TableConfigBuilder setAggregateMetrics(boolean aggregateMetrics) { + _aggregateMetrics = aggregateMetrics; + return this; + } + public TableConfigBuilder setStreamConfigs(Map<String, String> streamConfigs) { Preconditions.checkState(_tableType == TableType.REALTIME); _streamConfigs = streamConfigs; @@ -404,6 +410,7 @@ public class TableConfigBuilder { indexingConfig.setVarLengthDictionaryColumns(_varLengthDictionaryColumns); indexingConfig.setStarTreeIndexConfigs(_starTreeIndexConfigs); indexingConfig.setJsonIndexColumns(_jsonIndexColumns); + indexingConfig.setAggregateMetrics(_aggregateMetrics); if (_customConfig == null) { _customConfig = new TableCustomConfig(null); diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java index 1121f87f90..e9831ea622 100--- 
a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java @@ -56,7 +56,8 @@ public class IngestionConfigUtilsTest { Map<String, String> streamConfigMap = new HashMap<>(); streamConfigMap.put("streamType", "kafka"); tableConfig.setIngestionConfig( - new IngestionConfig(null, new StreamIngestionConfig(Lists.newArrayList(streamConfigMap)), null, null, null)); + new IngestionConfig(null, new StreamIngestionConfig(Lists.newArrayList(streamConfigMap)), null, null, null, + null)); Map<String, String> actualStreamConfigsMap = IngestionConfigUtils.getStreamConfigMap(tableConfig); Assert.assertEquals(actualStreamConfigsMap.size(), 1); Assert.assertEquals(actualStreamConfigsMap.get("streamType"), "kafka"); @@ -74,7 +75,8 @@ public class IngestionConfigUtilsTest { // fail if multiple found tableConfig.setIngestionConfig(new IngestionConfig(null, - new StreamIngestionConfig(Lists.newArrayList(streamConfigMap, deprecatedStreamConfigMap)), null, null, null)); + new StreamIngestionConfig(Lists.newArrayList(streamConfigMap, deprecatedStreamConfigMap)), null, null, null, + null)); try { IngestionConfigUtils.getStreamConfigMap(tableConfig); Assert.fail("Should fail for multiple stream configs"); @@ -103,7 +105,7 @@ public class IngestionConfigUtilsTest { // get from ingestion config, when not present in segmentsConfig TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").build(); tableConfig.setIngestionConfig( - new IngestionConfig(new BatchIngestionConfig(null, "APPEND", "HOURLY"), null, null, null, null)); + new IngestionConfig(new BatchIngestionConfig(null, "APPEND", "HOURLY"), null, null, null, null, null)); Assert.assertEquals(IngestionConfigUtils.getBatchSegmentIngestionFrequency(tableConfig), "HOURLY"); // get from ingestion config, even if present in segmentsConfig @@ -128,7 +130,7 @@ public class IngestionConfigUtilsTest { // get from ingestion config, when not present in segmentsConfig TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").build(); tableConfig.setIngestionConfig( - new IngestionConfig(new BatchIngestionConfig(null, "APPEND", "HOURLY"), null, null, null, null)); + new IngestionConfig(new BatchIngestionConfig(null, "APPEND", "HOURLY"), null, null, null, null, null)); Assert.assertEquals(IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig), "APPEND"); // get from ingestion config, even if present in segmentsConfig
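
For reference, a minimal sketch of how the new aggregationConfigs pieces fit together end to end, using only the constructors and the validateIngestionConfig(tableConfig, schema) entry point exercised in the diff above. The class name IngestionAggregationExample and the column names d1/s1 are illustrative, not part of the commit, and the TableConfigUtils import assumes the pinot-segment-local variant validated in the tests:

import java.util.Collections;

import org.apache.pinot.segment.local.utils.TableConfigUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.ingestion.AggregationConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;

public class IngestionAggregationExample {
  public static void main(String[] args) {
    // Destination column "d1" must be a metric in the schema. Per the validation
    // rules tested above, every metric column needs exactly one aggregation, and
    // the function must wrap a single source column, e.g. SUM(s1).
    IngestionConfig ingestionConfig = new IngestionConfig(null, null, null, null, null,
        Collections.singletonList(new AggregationConfig("d1", "SUM(s1)")));

    Schema schema = new Schema.SchemaBuilder().setSchemaName("myTable")
        .addMetric("d1", FieldSpec.DataType.DOUBLE)
        .build();

    // aggregationConfigs are only supported on REALTIME tables and must not be
    // combined with the legacy aggregateMetrics indexing flag.
    TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME)
        .setTableName("myTable_REALTIME")
        .setTimeColumnName("timeColumn")
        .setIngestionConfig(ingestionConfig)
        .build();

    // Throws IllegalStateException if any of the rules above is violated.
    TableConfigUtils.validateIngestionConfig(tableConfig, schema);
  }
}

The equivalent JSON in a table config would use the property names declared on AggregationConfig, i.e. "columnName" and "aggregationFunction", nested under "ingestionConfig"."aggregationConfigs".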