Jackie-Jiang commented on a change in pull request #5681: URL: https://github.com/apache/incubator-pinot/pull/5681#discussion_r453846803
########## File path: pinot-core/src/main/java/org/apache/pinot/core/util/TableConfigUtils.java ########## @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.util; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.pinot.common.utils.CommonConstants; +import org.apache.pinot.core.data.function.FunctionEvaluator; +import org.apache.pinot.core.data.function.FunctionEvaluatorFactory; +import org.apache.pinot.spi.config.table.FieldConfig; +import org.apache.pinot.spi.config.table.IngestionConfig; +import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.config.table.ingestion.FilterConfig; +import org.apache.pinot.spi.config.table.ingestion.TransformConfig; + + +/** + * Utils related to table config operations + * FIXME: Merge this TableConfigUtils with the TableConfigUtils from pinot-common when merging of modules is done + */ +public final class TableConfigUtils { + + private TableConfigUtils() { + + } + + /** + * Validates the table config with the following rules: + * <ul> + * <li>Text index column must be raw</li> + * <li>peerSegmentDownloadScheme in ValidationConfig must be http or https</li> + * </ul> + */ + public static void validate(TableConfig tableConfig) { + validateFieldConfigList(tableConfig); + validateValidationConfig(tableConfig); + validateIngestionConfig(tableConfig.getIngestionConfig()); + } + + private static void validateFieldConfigList(TableConfig tableConfig) { + List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList(); + if (fieldConfigList != null) { + List<String> noDictionaryColumns = tableConfig.getIndexingConfig().getNoDictionaryColumns(); + for (FieldConfig fieldConfig : fieldConfigList) { + if (fieldConfig.getIndexType() == FieldConfig.IndexType.TEXT) { + // For Text index column, it must be raw (no-dictionary) + // NOTE: Check both encodingType and noDictionaryColumns before migrating indexing configs into field configs + String column = fieldConfig.getName(); + if (fieldConfig.getEncodingType() != FieldConfig.EncodingType.RAW || noDictionaryColumns == null + || !noDictionaryColumns.contains(column)) { + throw new IllegalStateException( + "Text index column: " + column + " must be raw (no-dictionary) in both FieldConfig and IndexingConfig"); + } + } + } + } + } + + private static void validateValidationConfig(TableConfig tableConfig) { + SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig(); + if (validationConfig != null) { + if (tableConfig.getTableType() == 
TableType.REALTIME && validationConfig.getTimeColumnName() == null) { + throw new IllegalStateException("Must provide time column in real-time table config"); + } + String peerSegmentDownloadScheme = validationConfig.getPeerSegmentDownloadScheme(); + if (peerSegmentDownloadScheme != null) { + if (!CommonConstants.HTTP_PROTOCOL.equalsIgnoreCase(peerSegmentDownloadScheme) && !CommonConstants.HTTPS_PROTOCOL.equalsIgnoreCase(peerSegmentDownloadScheme)) { + throw new IllegalStateException("Invalid value '" + peerSegmentDownloadScheme + "' for peerSegmentDownloadScheme. Must be one of http or https"); + } + } + } + + /** + * Validates the following: + * 1. validity of filter function + * 2. checks for duplicate transform configs + * 3. checks for null column name or transform function in transform config + * 4. validity of transform function string + * 5. checks for source fields used in destination columns + */ + private static void validateIngestionConfig(@Nullable IngestionConfig ingestionConfig) { + if (ingestionConfig != null) { + FilterConfig filterConfig = ingestionConfig.getFilterConfig(); + if (filterConfig != null) { + String filterFunction = filterConfig.getFilterFunction(); + if (filterFunction != null) { + try { + FunctionEvaluatorFactory.getExpressionEvaluator(filterFunction); + } catch (Exception e) { + throw new IllegalStateException("Invalid filter function " + filterFunction, e); + } + } + } + List<TransformConfig> transformConfigs = ingestionConfig.getTransformConfigs(); + if (transformConfigs != null) { + Set<String> transformColumns = new HashSet<>(); + for (TransformConfig transformConfig : transformConfigs) { + String columnName = transformConfig.getColumnName(); + if (transformColumns.contains(columnName)) { + throw new IllegalStateException("Duplicate transform config found for column '" + columnName + "'"); + } + transformColumns.add(columnName);

Review comment:
```suggestion
      if (!transformColumns.add(columnName)) {
        throw new IllegalStateException("Duplicate transform config found for column '" + columnName + "'");
      }
```

########## File path: pinot-core/src/test/java/org/apache/pinot/core/data/recordtransformer/ExpressionTransformerTest.java ########## @@ -43,14 +43,28 @@ public class ExpressionTransformerTest { @Test - public void testGroovyExpressionTransformer() - throws IOException { - URL resource = AbstractRecordExtractorTest.class.getClassLoader()

Review comment: Should we keep a test for transforms defined in the schema, to ensure this change is backward compatible? We can remove that test once schema transform support is removed.
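Not part of the PR, but a minimal sketch of what such a backward-compatibility test could look like, assuming `FieldSpec#setTransformFunction` is still the legacy hook and using the constructor order as it currently stands in this PR (builder and setter names here are assumptions, not taken from the diff):

```java
// Sketch: a transform declared on the schema's FieldSpec (legacy style) should still be
// applied when no TransformConfig is present in the table config.
@Test
public void testSchemaTransformBackwardCompatible() {
  Schema schema = new Schema.SchemaBuilder().setSchemaName("mySchema")
      .addSingleValueDimension("fullName", FieldSpec.DataType.STRING).build();
  // Legacy path: transform function lives on the field spec, not in IngestionConfig
  schema.getFieldSpecFor("fullName")
      .setTransformFunction("Groovy({firstName + ' ' + lastName}, firstName, lastName)");
  TableConfig tableConfig =
      new TableConfigBuilder(TableType.OFFLINE).setTableName("mytable").build();

  ExpressionTransformer expressionTransformer = new ExpressionTransformer(schema, tableConfig);
  GenericRow row = new GenericRow();
  row.putValue("firstName", "John");
  row.putValue("lastName", "Doe");
  expressionTransformer.transform(row);
  assertEquals(row.getValue("fullName"), "John Doe");
}
```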
########## File path: pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/Temp.java ########## @@ -0,0 +1,224 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.integration.tests; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import java.io.File; +import java.io.FileNotFoundException; +import java.util.ArrayList; +import java.util.List; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.spi.config.table.IngestionConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.ingestion.FilterConfig; +import org.apache.pinot.spi.config.table.ingestion.TransformConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.util.TestUtils; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + + +/** + * Hybrid cluster integration test that uses one of the DateTimeFieldSpec as primary time column + */ +public class Temp extends BaseClusterIntegrationTest {

Review comment: Remove this class

########## File path: pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/ExpressionTransformer.java ########## @@ -36,12 +38,19 @@ private final Map<String, FunctionEvaluator> _expressionEvaluators = new HashMap<>(); - public ExpressionTransformer(Schema schema) { + public ExpressionTransformer(Schema schema, TableConfig tableConfig) {

Review comment: (nit) Let's put `tableConfig` in front of `schema`

########## File path: pinot-integration-tests/src/test/resources/On_Time_On_Time_Performance_2014_100k_subset_nonulls_ingestion_config.schema ########## @@ -0,0 +1,342 @@ +{

Review comment: Why do we need this new schema?

########## File path: pinot-core/src/main/java/org/apache/pinot/core/util/TableConfigUtils.java ########## @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.pinot.core.util; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.pinot.common.utils.CommonConstants; +import org.apache.pinot.core.data.function.FunctionEvaluator; +import org.apache.pinot.core.data.function.FunctionEvaluatorFactory; +import org.apache.pinot.spi.config.table.FieldConfig; +import org.apache.pinot.spi.config.table.IngestionConfig; +import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.config.table.ingestion.FilterConfig; +import org.apache.pinot.spi.config.table.ingestion.TransformConfig; + + +/** + * Utils related to table config operations + * FIXME: Merge this TableConfigUtils with the TableConfigUtils from pinot-common when merging of modules is done + */ +public final class TableConfigUtils { + + private TableConfigUtils() { + + } + + /** + * Validates the table config with the following rules: + * <ul> + * <li>Text index column must be raw</li> + * <li>peerSegmentDownloadScheme in ValidationConfig must be http or https</li> + * </ul> + */ + public static void validate(TableConfig tableConfig) { + validateFieldConfigList(tableConfig); + validateValidationConfig(tableConfig); + validateIngestionConfig(tableConfig.getIngestionConfig()); + } + + private static void validateFieldConfigList(TableConfig tableConfig) { + List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList(); + if (fieldConfigList != null) { + List<String> noDictionaryColumns = tableConfig.getIndexingConfig().getNoDictionaryColumns(); + for (FieldConfig fieldConfig : fieldConfigList) { + if (fieldConfig.getIndexType() == FieldConfig.IndexType.TEXT) { + // For Text index column, it must be raw (no-dictionary) + // NOTE: Check both encodingType and noDictionaryColumns before migrating indexing configs into field configs + String column = fieldConfig.getName(); + if (fieldConfig.getEncodingType() != FieldConfig.EncodingType.RAW || noDictionaryColumns == null + || !noDictionaryColumns.contains(column)) { + throw new IllegalStateException( + "Text index column: " + column + " must be raw (no-dictionary) in both FieldConfig and IndexingConfig"); + } + } + } + } + } + + private static void validateValidationConfig(TableConfig tableConfig) { + SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig(); + if (validationConfig != null) { + if (tableConfig.getTableType() == TableType.REALTIME && validationConfig.getTimeColumnName() == null) { + throw new IllegalStateException("Must provide time column in real-time table config"); + } + String peerSegmentDownloadScheme = validationConfig.getPeerSegmentDownloadScheme(); + if (peerSegmentDownloadScheme != null) { + if (!CommonConstants.HTTP_PROTOCOL.equalsIgnoreCase(peerSegmentDownloadScheme) && !CommonConstants.HTTPS_PROTOCOL.equalsIgnoreCase(peerSegmentDownloadScheme)) { + throw new IllegalStateException("Invalid value '" + peerSegmentDownloadScheme + "' for peerSegmentDownloadScheme. Must be one of http or https"); + } + } + } + } + + /** + * Validates the following: + * 1. validity of filter function + * 2. checks for duplicate transform configs + * 3. checks for null column name or transform function in transform config + * 4. validity of transform function string + * 5.
checks for source fields used in destination columns + */ + private static void validateIngestionConfig(@Nullable IngestionConfig ingestionConfig) { + if (ingestionConfig != null) { + FilterConfig filterConfig = ingestionConfig.getFilterConfig(); + if (filterConfig != null) { + String filterFunction = filterConfig.getFilterFunction(); + if (filterFunction != null) { + try { + FunctionEvaluatorFactory.getExpressionEvaluator(filterFunction); + } catch (Exception e) { + throw new IllegalStateException("Invalid filter function " + filterFunction, e); + } + } + } + List<TransformConfig> transformConfigs = ingestionConfig.getTransformConfigs(); + if (transformConfigs != null) { + Set<String> transformColumns = new HashSet<>(); + for (TransformConfig transformConfig : transformConfigs) { + String columnName = transformConfig.getColumnName(); + if (transformColumns.contains(columnName)) { + throw new IllegalStateException("Duplicate transform config found for column '" + columnName + "'"); + } + transformColumns.add(columnName); + String transformFunction = transformConfig.getTransformFunction(); + if (columnName == null || transformFunction == null) { + throw new IllegalStateException("columnName/transformFunction cannot be null in TransformConfig " + transformConfig); + } + FunctionEvaluator expressionEvaluator; + try { + expressionEvaluator = FunctionEvaluatorFactory.getExpressionEvaluator(transformFunction); + } catch (Exception e) { + throw new IllegalStateException( + "Invalid transform function '" + transformFunction + "' for column '" + columnName + "'"); + } + List<String> arguments = expressionEvaluator.getArguments();

Review comment: Should we check that the arguments are not contained in `transformColumns`? We do not support chained transforms currently.
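The suggested guard could look roughly like this inside the loop above (a sketch, not code from the PR). Note that `transformColumns` only contains the destination columns seen so far in the list, so a complete check would first collect all destination column names in a separate pass:

```java
// Reject a transform whose source arguments reference a column that is itself produced
// by a transform config, since chained transforms are not supported yet.
List<String> arguments = expressionEvaluator.getArguments();
for (String argument : arguments) {
  if (transformColumns.contains(argument)) {
    throw new IllegalStateException("Transform function for column '" + columnName
        + "' cannot reference another transformed column: " + argument);
  }
}
```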
########## File path: pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/IngestionConfigIntegrationTest.java ########## @@ -0,0 +1,190 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.integration.tests; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.Lists; +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.spi.config.table.IngestionConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.ingestion.FilterConfig; +import org.apache.pinot.spi.config.table.ingestion.TransformConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.util.TestUtils; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; + + +/** + * Integration test that converts Avro data for 12 segments and runs queries against it. + */ +public class IngestionConfigIntegrationTest extends BaseClusterIntegrationTestSet { + + private static final String TIME_COLUMN_NAME = "millisSinceEpoch"; + private static final String SCHEMA_FILE_NAME = "On_Time_On_Time_Performance_2014_100k_subset_nonulls_ingestion_config.schema"; + + @Override + protected String getSchemaFileName() { + return SCHEMA_FILE_NAME; + } + + @Override + protected String getTimeColumnName() { + return TIME_COLUMN_NAME; + } + + @Override + protected long getCountStarResult() { + return 22300;

Review comment: This result should be the same as `select count(*) from mytable where AirlineID != 19393 AND ArrDelayMinutes > 5` in the other integration tests, where I got 24047. Please document how this number is calculated. When we add a test, we should not run the test and directly use its output as the expected value, because that won't catch bugs in the code or in the test logic.

########## File path: pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/IngestionConfigIntegrationTest.java ########## @@ -0,0 +1,190 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.pinot.integration.tests; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.Lists; +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.spi.config.table.IngestionConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.ingestion.FilterConfig; +import org.apache.pinot.spi.config.table.ingestion.TransformConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.util.TestUtils; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; + + +/** + * Integration test that converts Avro data for 12 segments and runs queries against it. + */ +public class IngestionConfigIntegrationTest extends BaseClusterIntegrationTestSet { + + private static final String TIME_COLUMN_NAME = "millisSinceEpoch"; + private static final String SCHEMA_FILE_NAME = "On_Time_On_Time_Performance_2014_100k_subset_nonulls_ingestion_config.schema"; + + @Override + protected String getSchemaFileName() { + return SCHEMA_FILE_NAME; + } + + @Override + protected String getTimeColumnName() { + return TIME_COLUMN_NAME; + } + + @Override + protected long getCountStarResult() { + return 22300; + } + + @Override + protected boolean useLlc() { + return true; + } + + @Override + protected IngestionConfig getIngestionConfig() { + FilterConfig filterConfig = new FilterConfig("Groovy({AirlineID == 19393 || ArrDelayMinutes <= 5 }, AirlineID, ArrDelayMinutes)"); + List<TransformConfig> transformConfigs = new ArrayList<>(); + transformConfigs.add(new TransformConfig("AmPm", "Groovy({DepTime < 1200 ? 
\"AM\": \"PM\"}, DepTime)")); + transformConfigs.add(new TransformConfig("millisSinceEpoch", "fromEpochDays(DaysSinceEpoch)")); + transformConfigs.add(new TransformConfig("lowerCaseDestCityName", "lower(DestCityName)")); + return new IngestionConfig(filterConfig, transformConfigs); + } + + @BeforeClass + public void setUp() + throws Exception { + TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir); + + // Start the Pinot cluster + startZk(); + startController(); + startBroker(); + startServer(); + startKafka(); + + // Create and upload the schema and table config + Schema schema = createSchema(); + addSchema(schema); + TableConfig tableConfig = createOfflineTableConfig(); + addTableConfig(tableConfig); + + // Unpack the Avro files + List<File> avroFiles = unpackAvroData(_tempDir); + + // Create and upload segments + ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles.subList(0, avroFiles.size() -1), tableConfig, schema, 0, _segmentDir, _tarDir); + uploadSegments(getTableName(), _tarDir); + + List<File> realtimeAvroFile = Lists.newArrayList(avroFiles.get(avroFiles.size() - 1)); + addTableConfig(createRealtimeTableConfig(realtimeAvroFile.get(0))); + pushAvroIntoKafka(realtimeAvroFile); + + // Wait for all documents loaded + waitForAllDocsLoaded(600_000L); + } + + @Test + public void testQueries() + throws Exception { + // Select column created with transform function + String sqlQuery = "Select millisSinceEpoch from " + DEFAULT_TABLE_NAME; + JsonNode response = postSqlQuery(sqlQuery); + assertEquals(response.get("resultTable").get("dataSchema").get("columnNames").get(0).asText(), "millisSinceEpoch"); + assertEquals(response.get("resultTable").get("dataSchema").get("columnDataTypes").get(0).asText(), "LONG"); + + // Select column created with transform function + sqlQuery = "Select AmPm, DepTime from " + DEFAULT_TABLE_NAME; + response = postSqlQuery(sqlQuery); + assertEquals(response.get("resultTable").get("dataSchema").get("columnNames").get(0).asText(), "AmPm"); + assertEquals(response.get("resultTable").get("dataSchema").get("columnNames").get(1).asText(), "DepTime"); + assertEquals(response.get("resultTable").get("dataSchema").get("columnDataTypes").get(0).asText(), "STRING"); + assertEquals(response.get("resultTable").get("dataSchema").get("columnDataTypes").get(1).asText(), "INT"); + for (int i = 0; i < response.get("resultTable").get("rows").size(); i++) { + String amPm = response.get("resultTable").get("rows").get(i).get(0).asText(); + int depTime = response.get("resultTable").get("rows").get(i).get(1).asInt(); + Assert.assertEquals(amPm, (depTime < 1200) ? 
"AM" : "PM"); + } + + // Select column created with transform function - offline table + sqlQuery = "Select AmPm, DepTime from " + DEFAULT_TABLE_NAME + "_OFFLINE"; + response = postSqlQuery(sqlQuery); + assertEquals(response.get("resultTable").get("dataSchema").get("columnNames").get(0).asText(), "AmPm"); + assertEquals(response.get("resultTable").get("dataSchema").get("columnNames").get(1).asText(), "DepTime"); + assertEquals(response.get("resultTable").get("dataSchema").get("columnDataTypes").get(0).asText(), "STRING"); + assertEquals(response.get("resultTable").get("dataSchema").get("columnDataTypes").get(1).asText(), "INT"); + for (int i = 0; i < response.get("resultTable").get("rows").size(); i++) { + String amPm = response.get("resultTable").get("rows").get(i).get(0).asText(); + int depTime = response.get("resultTable").get("rows").get(i).get(1).asInt(); + Assert.assertEquals(amPm, (depTime < 1200) ? "AM" : "PM"); + } + + // Select column created with transform - realtime table + sqlQuery = "Select AmPm, DepTime from " + DEFAULT_TABLE_NAME + "_REALTIME"; + response = postSqlQuery(sqlQuery); + assertEquals(response.get("resultTable").get("dataSchema").get("columnNames").get(0).asText(), "AmPm"); + assertEquals(response.get("resultTable").get("dataSchema").get("columnNames").get(1).asText(), "DepTime"); + assertEquals(response.get("resultTable").get("dataSchema").get("columnDataTypes").get(0).asText(), "STRING"); + assertEquals(response.get("resultTable").get("dataSchema").get("columnDataTypes").get(1).asText(), "INT"); + for (int i = 0; i < response.get("resultTable").get("rows").size(); i++) { + String amPm = response.get("resultTable").get("rows").get(i).get(0).asText(); + int depTime = response.get("resultTable").get("rows").get(i).get(1).asInt(); + Assert.assertEquals(amPm, (depTime < 1200) ? "AM" : "PM"); + } + + // Check there's no values that should've been filtered + sqlQuery = "Select * from " + DEFAULT_TABLE_NAME + + " where AirlineID = 19393 or ArrDelayMinutes <= 5"; + response = postSqlQuery(sqlQuery); + Assert.assertEquals(response.get("resultTable").get("rows").size(), 0); + + // Check there's no values that should've been filtered - realtime table + sqlQuery = "Select * from " + DEFAULT_TABLE_NAME + "_REALTIME" + + " where AirlineID = 19393 or ArrDelayMinutes <= 5"; + response = postSqlQuery(sqlQuery); + Assert.assertEquals(response.get("resultTable").get("rows").size(), 0); + + // Check there's no values that should've been filtered - offline table + sqlQuery = "Select * from " + DEFAULT_TABLE_NAME + "_OFFLINE" + + " where AirlineID = 19393 or ArrDelayMinutes <= 5"; + response = postSqlQuery(sqlQuery); + Assert.assertEquals(response.get("resultTable").get("rows").size(), 0); + } + + @AfterClass + public void tearDown() + throws Exception { + Review comment: (nit) remove empty line ########## File path: pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/IngestionConfigIntegrationTest.java ########## @@ -0,0 +1,190 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.integration.tests; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.Lists; +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.spi.config.table.IngestionConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.ingestion.FilterConfig; +import org.apache.pinot.spi.config.table.ingestion.TransformConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.util.TestUtils; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; + + +/** + * Integration test that converts Avro data for 12 segments and runs queries against it. + */ +public class IngestionConfigIntegrationTest extends BaseClusterIntegrationTestSet { + + private static final String TIME_COLUMN_NAME = "millisSinceEpoch"; + private static final String SCHEMA_FILE_NAME = "On_Time_On_Time_Performance_2014_100k_subset_nonulls_ingestion_config.schema";

Review comment: Make a simplified schema (containing only the columns needed for the test). You can directly override `createSchema()`; a sketch follows.
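A sketch of that override, assuming the usual `Schema.SchemaBuilder` API is available here; the column set and data types below are inferred from the filter/transform functions in this test and would need to be checked against the Avro data:

```java
// Hypothetical simplified schema: only the source columns read by the filter/transform
// functions plus the derived destination columns, instead of the full On_Time schema.
@Override
protected Schema createSchema() {
  return new Schema.SchemaBuilder().setSchemaName(DEFAULT_TABLE_NAME)
      .addSingleValueDimension("AirlineID", FieldSpec.DataType.LONG)
      .addSingleValueDimension("DepTime", FieldSpec.DataType.INT)
      .addSingleValueDimension("DaysSinceEpoch", FieldSpec.DataType.INT)
      .addSingleValueDimension("DestCityName", FieldSpec.DataType.STRING)
      .addMetric("ArrDelayMinutes", FieldSpec.DataType.DOUBLE)
      // Destination columns produced by the transform configs
      .addSingleValueDimension("AmPm", FieldSpec.DataType.STRING)
      .addSingleValueDimension("lowerCaseDestCityName", FieldSpec.DataType.STRING)
      .addDateTime("millisSinceEpoch", FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
      .build();
}
```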
########## File path: pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/IngestionConfigIntegrationTest.java ########## @@ -0,0 +1,190 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.integration.tests; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.Lists; +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.spi.config.table.IngestionConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.ingestion.FilterConfig; +import org.apache.pinot.spi.config.table.ingestion.TransformConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.util.TestUtils; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; + + +/** + * Integration test that converts Avro data for 12 segments and runs queries against it. + */ +public class IngestionConfigIntegrationTest extends BaseClusterIntegrationTestSet { + + private static final String TIME_COLUMN_NAME = "millisSinceEpoch"; + private static final String SCHEMA_FILE_NAME = "On_Time_On_Time_Performance_2014_100k_subset_nonulls_ingestion_config.schema"; + + @Override + protected String getSchemaFileName() { + return SCHEMA_FILE_NAME; + } + + @Override + protected String getTimeColumnName() { + return TIME_COLUMN_NAME; + } + + @Override + protected long getCountStarResult() { + return 22300; + } + + @Override + protected boolean useLlc() { + return true; + } + + @Override + protected IngestionConfig getIngestionConfig() { + FilterConfig filterConfig = new FilterConfig("Groovy({AirlineID == 19393 || ArrDelayMinutes <= 5 }, AirlineID, ArrDelayMinutes)"); + List<TransformConfig> transformConfigs = new ArrayList<>(); + transformConfigs.add(new TransformConfig("AmPm", "Groovy({DepTime < 1200 ? \"AM\": \"PM\"}, DepTime)")); + transformConfigs.add(new TransformConfig("millisSinceEpoch", "fromEpochDays(DaysSinceEpoch)")); + transformConfigs.add(new TransformConfig("lowerCaseDestCityName", "lower(DestCityName)")); + return new IngestionConfig(filterConfig, transformConfigs); + } + + @BeforeClass + public void setUp() + throws Exception { + TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir); + + // Start the Pinot cluster + startZk(); + startController(); + startBroker(); + startServer(); + startKafka(); + + // Create and upload the schema and table config + Schema schema = createSchema(); + addSchema(schema); + TableConfig tableConfig = createOfflineTableConfig(); + addTableConfig(tableConfig); + + // Unpack the Avro files + List<File> avroFiles = unpackAvroData(_tempDir); + + // Create and upload segments + ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles.subList(0, avroFiles.size() -1), tableConfig, schema, 0, _segmentDir, _tarDir); + uploadSegments(getTableName(), _tarDir); + + List<File> realtimeAvroFile = Lists.newArrayList(avroFiles.get(avroFiles.size() - 1));

Review comment: No overlapping segments? The result won't be correct. Please use the setup logic from `HybridClusterIntegrationTest`.
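For reference, the hybrid split could look roughly like this (an illustrative sketch; the exact file counts and overlap should be taken from `HybridClusterIntegrationTest`, and the numbers below are assumptions):

```java
// Offline segments cover the first files; the realtime stream re-consumes the last few
// offline days so the two tables overlap, and the broker's time boundary removes the
// duplicated rows at query time.
int numOfflineAvroFiles = 8;    // assumed split, not the verified constant
int firstRealtimeAvroFile = 6;  // assumed overlap of two files with the offline table
List<File> offlineAvroFiles = avroFiles.subList(0, numOfflineAvroFiles);
List<File> realtimeAvroFiles = avroFiles.subList(firstRealtimeAvroFile, avroFiles.size());

ClusterIntegrationTestUtils.buildSegmentsFromAvro(offlineAvroFiles, tableConfig, schema, 0, _segmentDir, _tarDir);
uploadSegments(getTableName(), _tarDir);
pushAvroIntoKafka(realtimeAvroFiles);
```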
########## File path: pinot-core/src/main/java/org/apache/pinot/core/util/TableConfigUtils.java ########## @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.util; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.pinot.common.utils.CommonConstants; +import org.apache.pinot.core.data.function.FunctionEvaluator; +import org.apache.pinot.core.data.function.FunctionEvaluatorFactory; +import org.apache.pinot.spi.config.table.FieldConfig; +import org.apache.pinot.spi.config.table.IngestionConfig; +import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.config.table.ingestion.FilterConfig; +import org.apache.pinot.spi.config.table.ingestion.TransformConfig; + + +/** + * Utils related to table config operations + * FIXME: Merge this TableConfigUtils with the TableConfigUtils from pinot-common when merging of modules is done + */ +public final class TableConfigUtils {

Review comment: This will conflict with #5667. Let's figure out the order in which to merge these two PRs.

########## File path: pinot-core/src/main/java/org/apache/pinot/core/util/IngestionUtils.java ########## @@ -67,12 +70,24 @@ private static void extractFieldsFromSchema(Schema schema, Set<String> fields) { * Extracts the fields needed by a RecordExtractor from given {@link IngestionConfig} */ private static void extractFieldsFromIngestionConfig(@Nullable IngestionConfig ingestionConfig, Set<String> fields) { - if (ingestionConfig != null && ingestionConfig.getFilterConfig() != null) { - String filterFunction = ingestionConfig.getFilterConfig().getFilterFunction(); - if (filterFunction != null) { - FunctionEvaluator functionEvaluator = FunctionEvaluatorFactory.getExpressionEvaluator(filterFunction); - if (functionEvaluator != null) { - fields.addAll(functionEvaluator.getArguments()); + if (ingestionConfig != null) { + FilterConfig filterConfig = ingestionConfig.getFilterConfig(); + if (filterConfig != null) { + String filterFunction = filterConfig.getFilterFunction(); + if (filterFunction != null) { + FunctionEvaluator functionEvaluator = FunctionEvaluatorFactory.getExpressionEvaluator(filterFunction); + if (functionEvaluator != null) { + fields.addAll(functionEvaluator.getArguments()); + } + } + } + List<TransformConfig> transformConfigs = ingestionConfig.getTransformConfigs(); + if (transformConfigs != null) { + for (TransformConfig transformConfig : transformConfigs) { + FunctionEvaluator expressionEvaluator = + FunctionEvaluatorFactory.getExpressionEvaluator(transformConfig.getTransformFunction()); + fields.addAll(expressionEvaluator.getArguments()); + fields.add(transformConfig.getColumnName());

Review comment: Please add a comment explaining why we extract both the input and output columns.
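One plausible version of that comment, based on the `ExpressionTransformer` behavior of skipping a transform when the destination column already carries a value (a sketch; the wording is ours, not from the PR):

```java
List<TransformConfig> transformConfigs = ingestionConfig.getTransformConfigs();
if (transformConfigs != null) {
  for (TransformConfig transformConfig : transformConfigs) {
    FunctionEvaluator expressionEvaluator =
        FunctionEvaluatorFactory.getExpressionEvaluator(transformConfig.getTransformFunction());
    // Input columns: the record extractor must read every source field referenced by the
    // transform function, or the evaluator will have nothing to operate on.
    fields.addAll(expressionEvaluator.getArguments());
    // Output column: extract it as well, because the incoming record may already contain
    // a value for it, in which case the transform is skipped and that value is used as-is.
    fields.add(transformConfig.getColumnName());
  }
}
```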
########## File path: pinot-core/src/main/java/org/apache/pinot/core/util/TableConfigUtils.java ########## @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.util; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.pinot.common.utils.CommonConstants; +import org.apache.pinot.core.data.function.FunctionEvaluator; +import org.apache.pinot.core.data.function.FunctionEvaluatorFactory; +import org.apache.pinot.spi.config.table.FieldConfig; +import org.apache.pinot.spi.config.table.IngestionConfig; +import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.config.table.ingestion.FilterConfig; +import org.apache.pinot.spi.config.table.ingestion.TransformConfig; + + +/** + * Utils related to table config operations + * FIXME: Merge this TableConfigUtils with the TableConfigUtils from pinot-common when merging of modules is done + */ +public final class TableConfigUtils { + + private TableConfigUtils() { + + } + + /** + * Validates the table config with the following rules: + * <ul> + * <li>Text index column must be raw</li> + * <li>peerSegmentDownloadScheme in ValidationConfig must be http or https</li> + * </ul> + */ + public static void validate(TableConfig tableConfig) { + validateFieldConfigList(tableConfig); + validateValidationConfig(tableConfig); + validateIngestionConfig(tableConfig.getIngestionConfig()); + } + + private static void validateFieldConfigList(TableConfig tableConfig) { + List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList(); + if (fieldConfigList != null) { + List<String> noDictionaryColumns = tableConfig.getIndexingConfig().getNoDictionaryColumns(); + for (FieldConfig fieldConfig : fieldConfigList) { + if (fieldConfig.getIndexType() == FieldConfig.IndexType.TEXT) { + // For Text index column, it must be raw (no-dictionary) + // NOTE: Check both encodingType and noDictionaryColumns before migrating indexing configs into field configs + String column = fieldConfig.getName(); + if (fieldConfig.getEncodingType() != FieldConfig.EncodingType.RAW || noDictionaryColumns == null + || !noDictionaryColumns.contains(column)) { + throw new IllegalStateException( + "Text index column: " + column + " must be raw (no-dictionary) in both FieldConfig and IndexingConfig"); + } + } + } + } + } + + private static void validateValidationConfig(TableConfig tableConfig) { + SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig(); + if (validationConfig != null) { + if (tableConfig.getTableType() == TableType.REALTIME && validationConfig.getTimeColumnName() == null) { + throw new IllegalStateException("Must provide time column in real-time table config"); + } + String
peerSegmentDownloadScheme = validationConfig.getPeerSegmentDownloadScheme(); + if (peerSegmentDownloadScheme != null) { + if (!CommonConstants.HTTP_PROTOCOL.equalsIgnoreCase(peerSegmentDownloadScheme) && !CommonConstants.HTTPS_PROTOCOL.equalsIgnoreCase(peerSegmentDownloadScheme)) { + throw new IllegalStateException("Invalid value '" + peerSegmentDownloadScheme + "' for peerSegmentDownloadScheme. Must be one of http or https"); + } + } + } + } + + /** + * Validates the following: + * 1. validity of filter function + * 2. checks for duplicate transform configs + * 3. checks for null column name or transform function in transform config + * 4. validity of transform function string + * 5. checks for source fields used in destination columns + */ + private static void validateIngestionConfig(@Nullable IngestionConfig ingestionConfig) { + if (ingestionConfig != null) { + FilterConfig filterConfig = ingestionConfig.getFilterConfig(); + if (filterConfig != null) { + String filterFunction = filterConfig.getFilterFunction(); + if (filterFunction != null) { + try { + FunctionEvaluatorFactory.getExpressionEvaluator(filterFunction); + } catch (Exception e) { + throw new IllegalStateException("Invalid filter function " + filterFunction, e); + } + } + } + List<TransformConfig> transformConfigs = ingestionConfig.getTransformConfigs(); + if (transformConfigs != null) { + Set<String> transformColumns = new HashSet<>(); + for (TransformConfig transformConfig : transformConfigs) { + String columnName = transformConfig.getColumnName(); + if (transformColumns.contains(columnName)) { + throw new IllegalStateException("Duplicate transform config found for column '" + columnName + "'"); + } + transformColumns.add(columnName); + String transformFunction = transformConfig.getTransformFunction(); + if (columnName == null || transformFunction == null) {

Review comment: Perform the null check before the set check; a sketch follows.
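A sketch of the reordered checks, folding in the earlier `Set#add` suggestion so the null validation runs before any set lookup:

```java
for (TransformConfig transformConfig : transformConfigs) {
  String columnName = transformConfig.getColumnName();
  String transformFunction = transformConfig.getTransformFunction();
  // Validate nullability first ...
  if (columnName == null || transformFunction == null) {
    throw new IllegalStateException(
        "columnName/transformFunction cannot be null in TransformConfig " + transformConfig);
  }
  // ... then detect duplicates via the Set#add return value
  if (!transformColumns.add(columnName)) {
    throw new IllegalStateException("Duplicate transform config found for column '" + columnName + "'");
  }
  // Remaining per-transform validation (function parsing, argument checks) continues here
}
```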
########## File path: pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/IngestionConfigIntegrationTest.java ########## @@ -0,0 +1,190 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.integration.tests; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.Lists; +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.spi.config.table.IngestionConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.ingestion.FilterConfig; +import org.apache.pinot.spi.config.table.ingestion.TransformConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.util.TestUtils; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; + + +/** + * Integration test that converts Avro data for 12 segments and runs queries against it. + */ +public class IngestionConfigIntegrationTest extends BaseClusterIntegrationTestSet {

Review comment: extend `BaseClusterIntegrationTest` instead of `BaseClusterIntegrationTestSet`

---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org