This is an automated email from the ASF dual-hosted git repository. rongr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new a949f9553b [Feature] Support Coalesce for Column Names (#9327) a949f9553b is described below commit a949f9553b239e690ff44ba0406d795c684eb1b4 Author: Yao Liu <y...@startree.ai> AuthorDate: Wed Sep 14 16:50:13 2022 -0700 [Feature] Support Coalesce for Column Names (#9327) --- .../common/function/TransformFunctionType.java | 1 + .../function/CoalesceTransformFunction.java | 369 +++++++++++++++++ .../function/TransformFunctionFactory.java | 1 + .../function/CoalesceTransformFunctionTest.java | 446 +++++++++++++++++++++ 4 files changed, 817 insertions(+) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java b/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java index 75e339c6d5..93a87ba41c 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java @@ -62,6 +62,7 @@ public enum TransformFunctionType { IS_NULL("is_null"), IS_NOT_NULL("is_not_null"), + COALESCE("coalesce"), IS_DISTINCT_FROM("is_distinct_from"), IS_NOT_DISTINCT_FROM("is_not_distinct_from"), diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/CoalesceTransformFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/CoalesceTransformFunction.java new file mode 100644 index 0000000000..dfc45a8036 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/CoalesceTransformFunction.java @@ -0,0 +1,369 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator.transform.function; + +import com.google.common.base.Preconditions; +import java.math.BigDecimal; +import java.util.List; +import java.util.Map; +import org.apache.pinot.common.function.TransformFunctionType; +import org.apache.pinot.core.operator.blocks.ProjectionBlock; +import org.apache.pinot.core.operator.transform.TransformResultMetadata; +import org.apache.pinot.segment.spi.datasource.DataSource; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.utils.NullValueUtils; +import org.roaringbitmap.RoaringBitmap; + + +/** + * The <code>CoalesceTransformFunction</code> implements the Coalesce operator. + * + * Returns the first non-null value in the argument list. + * If all arguments are null, returns the default null value for the stored type. + * Note: arguments have to be column names and of the same stored type. The type can be either numeric or string. + * The number of arguments has to be greater than 0.
+ * + * Expected result: + * Coalesce(nullColumn, columnA): columnA + * Coalesce(columnA, nullColumn): columnA + * Coalesce(nullColumnA, nullColumnB): default null value of the stored type + * + * Note that this operator only takes column names for now. + * SQL Syntax: + * Coalesce(columnA, columnB) + */ +public class CoalesceTransformFunction extends BaseTransformFunction { + private TransformFunction[] _transformFunctions; + private FieldSpec.DataType _storedType; + private TransformResultMetadata _resultMetadata; + + /** + * Returns the null bitmap of each argument column. + * An entry is null when the null option is disabled for that column. + */ + private static RoaringBitmap[] getNullBitMaps(ProjectionBlock projectionBlock, + TransformFunction[] transformFunctions) { + RoaringBitmap[] roaringBitmaps = new RoaringBitmap[transformFunctions.length]; + for (int i = 0; i < roaringBitmaps.length; i++) { + TransformFunction func = transformFunctions[i]; + String columnName = ((IdentifierTransformFunction) func).getColumnName(); + RoaringBitmap nullBitmap = projectionBlock.getBlockValueSet(columnName).getNullBitmap(); + roaringBitmaps[i] = nullBitmap; + } + return roaringBitmaps; + } + + /** + * Get transform int results based on store type. + * @param projectionBlock + */ + private int[] getIntTransformResults(ProjectionBlock projectionBlock) { + int length = projectionBlock.getNumDocs(); + int[] results = new int[length]; + int width = _transformFunctions.length; + RoaringBitmap[] nullBitMaps = getNullBitMaps(projectionBlock, _transformFunctions); + int[][] data = new int[width][length]; + RoaringBitmap filledData = new RoaringBitmap(); // indicates whether a certain column has been filled in data. + for (int i = 0; i < length; i++) { + boolean hasNonNullValue = false; + for (int j = 0; j < width; j++) { + // Consider value as null only when null option is enabled. + if (nullBitMaps[j] != null && nullBitMaps[j].contains(i)) { + continue; + } + if (!filledData.contains(j)) { + filledData.add(j); + data[j] = _transformFunctions[j].transformToIntValuesSV(projectionBlock); + } + hasNonNullValue = true; + results[i] = data[j][i]; + break; + } + if (!hasNonNullValue) { + results[i] = (int) NullValueUtils.getDefaultNullValue(_storedType); + } + } + return results; + } + + /** + * Get transform long results based on store type. + * @param projectionBlock + */ + private long[] getLongTransformResults(ProjectionBlock projectionBlock) { + int length = projectionBlock.getNumDocs(); + long[] results = new long[length]; + int width = _transformFunctions.length; + RoaringBitmap[] nullBitMaps = getNullBitMaps(projectionBlock, _transformFunctions); + long[][] data = new long[width][length]; + RoaringBitmap filledData = new RoaringBitmap(); // indicates whether a certain column has been filled in data. + for (int i = 0; i < length; i++) { + boolean hasNonNullValue = false; + for (int j = 0; j < width; j++) { + // Consider value as null only when null option is enabled. + if (nullBitMaps[j] != null && nullBitMaps[j].contains(i)) { + continue; + } + if (!filledData.contains(j)) { + filledData.add(j); + data[j] = _transformFunctions[j].transformToLongValuesSV(projectionBlock); + } + hasNonNullValue = true; + results[i] = data[j][i]; + break; + } + if (!hasNonNullValue) { + results[i] = (long) NullValueUtils.getDefaultNullValue(_storedType); + } + } + return results; + } + + /** + * Get transform float results based on store type.
+ * @param projectionBlock + */ + private float[] getFloatTransformResults(ProjectionBlock projectionBlock) { + int length = projectionBlock.getNumDocs(); + float[] results = new float[length]; + int width = _transformFunctions.length; + RoaringBitmap[] nullBitMaps = getNullBitMaps(projectionBlock, _transformFunctions); + float[][] data = new float[width][length]; + RoaringBitmap filledData = new RoaringBitmap(); // indicates whether certain column has be filled in data. + for (int i = 0; i < length; i++) { + boolean hasNonNullValue = false; + for (int j = 0; j < width; j++) { + // Consider value as null only when null option is enabled. + if (nullBitMaps[j] != null && nullBitMaps[j].contains(i)) { + continue; + } + if (!filledData.contains(j)) { + filledData.add(j); + data[j] = _transformFunctions[j].transformToFloatValuesSV(projectionBlock); + } + hasNonNullValue = true; + results[i] = data[j][i]; + break; + } + if (!hasNonNullValue) { + results[i] = (float) NullValueUtils.getDefaultNullValue(_storedType); + } + } + return results; + } + + /** + * Get transform double results based on store type. + * @param projectionBlock + */ + private double[] getDoublelTransformResults(ProjectionBlock projectionBlock) { + int length = projectionBlock.getNumDocs(); + double[] results = new double[length]; + int width = _transformFunctions.length; + RoaringBitmap[] nullBitMaps = getNullBitMaps(projectionBlock, _transformFunctions); + double[][] data = new double[width][length]; + RoaringBitmap filledData = new RoaringBitmap(); // indicates whether certain column has be filled in data. + for (int i = 0; i < length; i++) { + boolean hasNonNullValue = false; + for (int j = 0; j < width; j++) { + // Consider value as null only when null option is enabled. + if (nullBitMaps[j] != null && nullBitMaps[j].contains(i)) { + continue; + } + if (!filledData.contains(j)) { + filledData.add(j); + data[j] = _transformFunctions[j].transformToDoubleValuesSV(projectionBlock); + } + hasNonNullValue = true; + results[i] = data[j][i]; + break; + } + if (!hasNonNullValue) { + results[i] = (double) NullValueUtils.getDefaultNullValue(_storedType); + } + } + return results; + } + + /** + * Get transform BigDecimal results based on store type. + * @param projectionBlock + */ + private BigDecimal[] getBigDecimalTransformResults(ProjectionBlock projectionBlock) { + int length = projectionBlock.getNumDocs(); + BigDecimal[] results = new BigDecimal[length]; + int width = _transformFunctions.length; + RoaringBitmap[] nullBitMaps = getNullBitMaps(projectionBlock, _transformFunctions); + BigDecimal[][] data = new BigDecimal[width][length]; + RoaringBitmap filledData = new RoaringBitmap(); // indicates whether certain column has be filled in data. + for (int i = 0; i < length; i++) { + boolean hasNonNullValue = false; + for (int j = 0; j < width; j++) { + // Consider value as null only when null option is enabled. + if (nullBitMaps[j] != null && nullBitMaps[j].contains(i)) { + continue; + } + if (!filledData.contains(j)) { + filledData.add(j); + data[j] = _transformFunctions[j].transformToBigDecimalValuesSV(projectionBlock); + } + hasNonNullValue = true; + results[i] = data[j][i]; + break; + } + if (!hasNonNullValue) { + results[i] = (BigDecimal) NullValueUtils.getDefaultNullValue(_storedType); + } + } + return results; + } + + + /** + * Get transform String results based on store type. 
+ * @param projectionBlock + */ + private String[] getStringTransformResults(ProjectionBlock projectionBlock) { + int length = projectionBlock.getNumDocs(); + String[] results = new String[length]; + int width = _transformFunctions.length; + RoaringBitmap[] nullBitMaps = getNullBitMaps(projectionBlock, _transformFunctions); + String[][] data = new String[width][length]; + RoaringBitmap filledData = new RoaringBitmap(); // indicates whether certain column has be filled in data. + for (int i = 0; i < length; i++) { + boolean hasNonNullValue = false; + for (int j = 0; j < width; j++) { + // Consider value as null only when null option is enabled. + if (nullBitMaps[j] != null && nullBitMaps[j].contains(i)) { + continue; + } + if (!filledData.contains(j)) { + filledData.add(j); + data[j] = _transformFunctions[j].transformToStringValuesSV(projectionBlock); + } + hasNonNullValue = true; + results[i] = data[j][i]; + break; + } + if (!hasNonNullValue) { + results[i] = (String) NullValueUtils.getDefaultNullValue(_storedType); + } + } + return results; + } + + @Override + public String getName() { + return TransformFunctionType.COALESCE.getName(); + } + + @Override + public void init(List<TransformFunction> arguments, Map<String, DataSource> dataSourceMap) { + int argSize = arguments.size(); + Preconditions.checkArgument(argSize > 0, "COALESCE needs to have at least one argument."); + _transformFunctions = new TransformFunction[argSize]; + for (int i = 0; i < argSize; i++) { + TransformFunction func = arguments.get(i); + Preconditions.checkArgument(func instanceof IdentifierTransformFunction, + "Only column names are supported in COALESCE."); + FieldSpec.DataType storedType = func.getResultMetadata().getDataType().getStoredType(); + if (_storedType == null) { + _storedType = storedType; + } else { + Preconditions.checkArgument(storedType.equals(_storedType), "Argument types have to be the same."); + } + _transformFunctions[i] = func; + } + switch (_storedType) { + case INT: + _resultMetadata = INT_SV_NO_DICTIONARY_METADATA; + break; + case LONG: + _resultMetadata = LONG_SV_NO_DICTIONARY_METADATA; + break; + case FLOAT: + _resultMetadata = FLOAT_SV_NO_DICTIONARY_METADATA; + break; + case DOUBLE: + _resultMetadata = DOUBLE_SV_NO_DICTIONARY_METADATA; + break; + case BIG_DECIMAL: + _resultMetadata = BIG_DECIMAL_SV_NO_DICTIONARY_METADATA; + break; + case STRING: + _resultMetadata = STRING_SV_NO_DICTIONARY_METADATA; + break; + default: + throw new UnsupportedOperationException("Coalesce only supports numerical and string data type"); + } + } + + @Override + public TransformResultMetadata getResultMetadata() { + return _resultMetadata; + } + + @Override + public String[] transformToStringValuesSV(ProjectionBlock projectionBlock) { + if (_storedType != FieldSpec.DataType.STRING) { + return super.transformToStringValuesSV(projectionBlock); + } + return getStringTransformResults(projectionBlock); + } + + @Override + public int[] transformToIntValuesSV(ProjectionBlock projectionBlock) { + if (_storedType != FieldSpec.DataType.INT) { + return super.transformToIntValuesSV(projectionBlock); + } + return getIntTransformResults(projectionBlock); + } + + @Override + public long[] transformToLongValuesSV(ProjectionBlock projectionBlock) { + if (_storedType != FieldSpec.DataType.LONG) { + return super.transformToLongValuesSV(projectionBlock); + } + return getLongTransformResults(projectionBlock); + } + + @Override + public BigDecimal[] transformToBigDecimalValuesSV(ProjectionBlock projectionBlock) { + if (_storedType 
!= FieldSpec.DataType.BIG_DECIMAL) { + return super.transformToBigDecimalValuesSV(projectionBlock); + } + return getBigDecimalTransformResults(projectionBlock); + } + + @Override + public double[] transformToDoubleValuesSV(ProjectionBlock projectionBlock) { + if (_storedType != FieldSpec.DataType.DOUBLE) { + return super.transformToDoubleValuesSV(projectionBlock); + } + return getDoublelTransformResults(projectionBlock); + } + + @Override + public float[] transformToFloatValuesSV(ProjectionBlock projectionBlock) { + if (_storedType != FieldSpec.DataType.FLOAT) { + return super.transformToFloatValuesSV(projectionBlock); + } + return getFloatTransformResults(projectionBlock); + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java index dee41efde3..74a2763345 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java @@ -202,6 +202,7 @@ public class TransformFunctionFactory { typeToImplementation.put(TransformFunctionType.IS_NULL, IsNullTransformFunction.class); typeToImplementation.put(TransformFunctionType.IS_NOT_NULL, IsNotNullTransformFunction.class); + typeToImplementation.put(TransformFunctionType.COALESCE, CoalesceTransformFunction.class); typeToImplementation.put(TransformFunctionType.IS_DISTINCT_FROM, IsDistinctFromTransformFunction.class); typeToImplementation.put(TransformFunctionType.IS_NOT_DISTINCT_FROM, IsNotDistinctFromTransformFunction.class); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/CoalesceTransformFunctionTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/CoalesceTransformFunctionTest.java new file mode 100644 index 0000000000..8f7584388e --- /dev/null +++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/CoalesceTransformFunctionTest.java @@ -0,0 +1,446 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.operator.transform.function; + +import java.io.File; +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.request.context.RequestContextUtils; +import org.apache.pinot.core.operator.DocIdSetOperator; +import org.apache.pinot.core.operator.ProjectionOperator; +import org.apache.pinot.core.operator.blocks.ProjectionBlock; +import org.apache.pinot.core.operator.filter.MatchAllFilterOperator; +import org.apache.pinot.core.plan.DocIdSetPlanNode; +import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader; +import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; +import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader; +import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; +import org.apache.pinot.segment.spi.datasource.DataSource; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.utils.NullValueUtils; +import org.apache.pinot.spi.utils.ReadMode; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +public class CoalesceTransformFunctionTest extends BaseTransformFunctionTest { + private static final String ENABLE_NULL_SEGMENT_NAME = "testSegment1"; + private static final String DISABLE_NULL_SEGMENT_NAME = "testSegment2"; + private static final Random RANDOM = new Random(); + + private static final int NUM_ROWS = 1000; + private static final String INT_SV_COLUMN1 = "intSV1"; + private static final String INT_SV_COLUMN2 = "intSV2"; + private static final String STRING_SV_COLUMN1 = "StringSV1"; + private static final String STRING_SV_COLUMN2 = "StringSV2"; + private static final String BIG_DECIMAL_SV_COLUMN1 = "BigDecimalSV1"; + private static final String BIG_DECIMAL_SV_COLUMN2 = "BigDecimalSV2"; + private static final String LONG_SV_COLUMN1 = "LongSV1"; + private static final String LONG_SV_COLUMN2 = "LongSV2"; + private static final String DOUBLE_SV_COLUMN1 = "DoubleSV1"; + private static final String DOUBLE_SV_COLUMN2 = "DoubleSV2"; + + private static final String FLOAT_SV_COLUMN1 = "FloatSV1"; + private static final String FLOAT_SV_COLUMN2 = "FLoatSV2"; + private final int[] _intSVValues = new int[NUM_ROWS]; + private final double[] _doubleValues = new double[NUM_ROWS]; + private final float[] _floatValues = new float[NUM_ROWS]; + private final String[] _stringSVValues = new String[NUM_ROWS]; + private Map<String, DataSource> _enableNullDataSourceMap; + private Map<String, DataSource> _disableNullDataSourceMap; + private ProjectionBlock _enableNullProjectionBlock; + private ProjectionBlock _disableNullProjectionBlock; + // Mod decides whether the first column of the same type should be null. + private static final int NULL_MOD1 = 3; + // Mod decides whether the second column of the same type should be null. + private static final int NULL_MOD2 = 5; + // Difference between two same type numeric columns. 
+ private static final int INT_VALUE_SHIFT = 2; + private static final double DOUBLE_VALUE_SHIFT = 0.1; + private static final float FLOAT_VALUE_SHIFT = 0.1f; + + // Suffix for second string column. + private static final String SUFFIX = "column2"; + + private static String getIndexDirPath(String segmentName) { + return FileUtils.getTempDirectoryPath() + File.separator + segmentName; + } + + private static Map<String, DataSource> getDataSourceMap(Schema schema, List<GenericRow> rows, String segmentName) + throws Exception { + TableConfig tableConfig = + new TableConfigBuilder(TableType.OFFLINE).setTableName(segmentName).setNullHandlingEnabled(true).build(); + SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema); + config.setOutDir(getIndexDirPath(segmentName)); + config.setSegmentName(segmentName); + SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl(); + driver.init(config, new GenericRowRecordReader(rows)); + driver.build(); + IndexSegment indexSegment = + ImmutableSegmentLoader.load(new File(getIndexDirPath(segmentName), segmentName), ReadMode.heap); + Set<String> columnNames = indexSegment.getPhysicalColumnNames(); + Map<String, DataSource> enableNullDataSourceMap = new HashMap<>(columnNames.size()); + for (String columnName : columnNames) { + enableNullDataSourceMap.put(columnName, indexSegment.getDataSource(columnName)); + } + return enableNullDataSourceMap; + } + + private static ProjectionBlock getProjectionBlock(Map<String, DataSource> dataSourceMap) { + return new ProjectionOperator(dataSourceMap, + new DocIdSetOperator(new MatchAllFilterOperator(NUM_ROWS), DocIdSetPlanNode.MAX_DOC_PER_CALL)).nextBlock(); + } + + private static boolean isColumn1Null(int i) { + return i % NULL_MOD1 == 0; + } + + private static boolean isColumn2Null(int i) { + return i % NULL_MOD2 == 0; + } + + @BeforeClass + public void setup() + throws Exception { + // Set up two tables: one with null option enable, the other with null option disable. + // Each table one string column, and one int column with some rows set to null. 
+ FileUtils.deleteQuietly(new File(getIndexDirPath(DISABLE_NULL_SEGMENT_NAME))); + FileUtils.deleteQuietly(new File(getIndexDirPath(ENABLE_NULL_SEGMENT_NAME))); + for (int i = 0; i < NUM_ROWS; i++) { + _intSVValues[i] = RANDOM.nextInt(); + _doubleValues[i] = RANDOM.nextDouble(); + _floatValues[i] = RANDOM.nextFloat(); + _stringSVValues[i] = "a" + RANDOM.nextInt(); + } + List<GenericRow> rows = new ArrayList<>(NUM_ROWS); + for (int i = 0; i < NUM_ROWS; i++) { + Map<String, Object> map = new HashMap<>(); + map.put(INT_SV_COLUMN1, _intSVValues[i]); + map.put(INT_SV_COLUMN2, _intSVValues[i] + INT_VALUE_SHIFT); + map.put(DOUBLE_SV_COLUMN1, _doubleValues[i]); + map.put(DOUBLE_SV_COLUMN2, _doubleValues[i] + DOUBLE_VALUE_SHIFT); + map.put(FLOAT_SV_COLUMN1, _floatValues[i]); + map.put(FLOAT_SV_COLUMN2, _floatValues[i] + FLOAT_VALUE_SHIFT); + map.put(STRING_SV_COLUMN1, _stringSVValues[i]); + map.put(STRING_SV_COLUMN2, _stringSVValues[i] + SUFFIX); + map.put(BIG_DECIMAL_SV_COLUMN1, BigDecimal.valueOf(_intSVValues[i])); + map.put(BIG_DECIMAL_SV_COLUMN2, BigDecimal.valueOf(_intSVValues[i] + INT_VALUE_SHIFT)); + map.put(LONG_SV_COLUMN1, _intSVValues[i]); + map.put(LONG_SV_COLUMN2, _intSVValues[i] + INT_VALUE_SHIFT); + + if (isColumn1Null(i)) { + map.put(INT_SV_COLUMN1, null); + map.put(STRING_SV_COLUMN1, null); + map.put(BIG_DECIMAL_SV_COLUMN1, null); + map.put(LONG_SV_COLUMN1, null); + map.put(DOUBLE_SV_COLUMN1, null); + map.put(FLOAT_SV_COLUMN1, null); + } + if (isColumn2Null(i)) { + map.put(INT_SV_COLUMN2, null); + map.put(STRING_SV_COLUMN2, null); + map.put(LONG_SV_COLUMN2, null); + map.put(BIG_DECIMAL_SV_COLUMN2, null); + map.put(DOUBLE_SV_COLUMN2, null); + map.put(FLOAT_SV_COLUMN2, null); + } + GenericRow row = new GenericRow(); + row.init(map); + rows.add(row); + } + Schema schema = new Schema.SchemaBuilder().addSingleValueDimension(INT_SV_COLUMN1, FieldSpec.DataType.INT) + .addSingleValueDimension(INT_SV_COLUMN2, FieldSpec.DataType.INT) + .addSingleValueDimension(STRING_SV_COLUMN1, FieldSpec.DataType.STRING) + .addSingleValueDimension(STRING_SV_COLUMN2, FieldSpec.DataType.STRING) + .addSingleValueDimension(LONG_SV_COLUMN1, FieldSpec.DataType.LONG) + .addSingleValueDimension(LONG_SV_COLUMN2, FieldSpec.DataType.LONG) + .addSingleValueDimension(DOUBLE_SV_COLUMN1, FieldSpec.DataType.DOUBLE) + .addSingleValueDimension(DOUBLE_SV_COLUMN2, FieldSpec.DataType.DOUBLE) + .addSingleValueDimension(FLOAT_SV_COLUMN1, FieldSpec.DataType.FLOAT) + .addSingleValueDimension(FLOAT_SV_COLUMN2, FieldSpec.DataType.FLOAT) + .addMetric(BIG_DECIMAL_SV_COLUMN1, FieldSpec.DataType.BIG_DECIMAL) + .addMetric(BIG_DECIMAL_SV_COLUMN2, FieldSpec.DataType.BIG_DECIMAL).build(); + _enableNullDataSourceMap = getDataSourceMap(schema, rows, ENABLE_NULL_SEGMENT_NAME); + _enableNullProjectionBlock = getProjectionBlock(_enableNullDataSourceMap); + _disableNullDataSourceMap = getDataSourceMap(schema, rows, DISABLE_NULL_SEGMENT_NAME); + _disableNullProjectionBlock = getProjectionBlock(_disableNullDataSourceMap); + } + + private static void testIntTransformFunction(ExpressionContext expression, int[] expectedValues, + ProjectionBlock projectionBlock, Map<String, DataSource> dataSourceMap) + throws Exception { + int[] actualValues = + TransformFunctionFactory.get(expression, dataSourceMap).transformToIntValuesSV(projectionBlock); + for (int i = 0; i < NUM_ROWS; i++) { + Assert.assertEquals(actualValues[i], expectedValues[i]); + } + } + + private static void testStringTransformFunction(ExpressionContext expression, String[] expectedValues, + 
ProjectionBlock projectionBlock, Map<String, DataSource> dataSourceMap) + throws Exception { + String[] actualValues = + TransformFunctionFactory.get(expression, dataSourceMap).transformToStringValuesSV(projectionBlock); + for (int i = 0; i < NUM_ROWS; i++) { + Assert.assertEquals(actualValues[i], expectedValues[i]); + } + } + + private static void testLongTransformFunction(ExpressionContext expression, long[] expectedValues, + ProjectionBlock projectionBlock, Map<String, DataSource> dataSourceMap) + throws Exception { + long[] actualValues = + TransformFunctionFactory.get(expression, dataSourceMap).transformToLongValuesSV(projectionBlock); + for (int i = 0; i < NUM_ROWS; i++) { + Assert.assertEquals(actualValues[i], expectedValues[i]); + } + } + + private static void testDoubleTransformFunction(ExpressionContext expression, double[] expectedValues, + ProjectionBlock projectionBlock, Map<String, DataSource> dataSourceMap) + throws Exception { + double[] actualValues = + TransformFunctionFactory.get(expression, dataSourceMap).transformToDoubleValuesSV(projectionBlock); + for (int i = 0; i < NUM_ROWS; i++) { + Assert.assertEquals(actualValues[i], expectedValues[i]); + } + } + + private static void testFloatTransformFunction(ExpressionContext expression, float[] expectedValues, + ProjectionBlock projectionBlock, Map<String, DataSource> dataSourceMap) + throws Exception { + float[] actualValues = + TransformFunctionFactory.get(expression, dataSourceMap).transformToFloatValuesSV(projectionBlock); + for (int i = 0; i < NUM_ROWS; i++) { + Assert.assertEquals(actualValues[i], expectedValues[i]); + } + } + + private static void testBigDecimalTransformFunction(ExpressionContext expression, BigDecimal[] expectedValues, + ProjectionBlock projectionBlock, Map<String, DataSource> dataSourceMap) + throws Exception { + BigDecimal[] actualValues = + TransformFunctionFactory.get(expression, dataSourceMap).transformToBigDecimalValuesSV(projectionBlock); + for (int i = 0; i < NUM_ROWS; i++) { + Assert.assertEquals(actualValues[i], expectedValues[i]); + } + } + + // Test the Coalesce on two Int columns where one or the other or both can be null. + @Test + public void testCoalesceIntColumns() + throws Exception { + ExpressionContext coalesceExpr = + RequestContextUtils.getExpression(String.format("COALESCE(%s,%s)", INT_SV_COLUMN1, INT_SV_COLUMN2)); + TransformFunction coalesceTransformFunction = TransformFunctionFactory.get(coalesceExpr, _enableNullDataSourceMap); + Assert.assertEquals(coalesceTransformFunction.getName(), "coalesce"); + int[] expectedResults = new int[NUM_ROWS]; + for (int i = 0; i < NUM_ROWS; i++) { + if (isColumn1Null(i) && isColumn2Null(i)) { + expectedResults[i] = (int) NullValueUtils.getDefaultNullValue(FieldSpec.DataType.INT); + } else if (isColumn1Null(i)) { + expectedResults[i] = _intSVValues[i] + INT_VALUE_SHIFT; + } else if (isColumn2Null(i)) { + expectedResults[i] = _intSVValues[i]; + } else { + expectedResults[i] = _intSVValues[i]; + } + } + testIntTransformFunction(coalesceExpr, expectedResults, _enableNullProjectionBlock, _enableNullDataSourceMap); + testIntTransformFunction(coalesceExpr, expectedResults, _disableNullProjectionBlock, _disableNullDataSourceMap); + } + + // Test the Coalesce on two String columns where one or the other or both can be null. 
+ @Test + public void testCoalesceStringColumns() + throws Exception { + ExpressionContext coalesceExpr = + RequestContextUtils.getExpression(String.format("COALESCE(%s,%s)", STRING_SV_COLUMN1, STRING_SV_COLUMN2)); + TransformFunction coalesceTransformFunction = TransformFunctionFactory.get(coalesceExpr, _enableNullDataSourceMap); + Assert.assertEquals(coalesceTransformFunction.getName(), "coalesce"); + String[] expectedResults = new String[NUM_ROWS]; + for (int i = 0; i < NUM_ROWS; i++) { + if (isColumn1Null(i) && isColumn2Null(i)) { + expectedResults[i] = (String) NullValueUtils.getDefaultNullValue(FieldSpec.DataType.STRING); + } else if (isColumn1Null(i)) { + expectedResults[i] = _stringSVValues[i] + SUFFIX; + } else if (isColumn2Null(i)) { + expectedResults[i] = _stringSVValues[i]; + } else { + expectedResults[i] = _stringSVValues[i]; + } + } + testStringTransformFunction(coalesceExpr, expectedResults, _enableNullProjectionBlock, _enableNullDataSourceMap); + testStringTransformFunction(coalesceExpr, expectedResults, _disableNullProjectionBlock, _disableNullDataSourceMap); + } + + // Test the Coalesce on two big decimal columns where one or the other or both can be null. + @Test + public void testCoalesceBigDecimalColumns() + throws Exception { + ExpressionContext coalesceExpr = RequestContextUtils.getExpression( + String.format("COALESCE(%s,%s)", BIG_DECIMAL_SV_COLUMN1, BIG_DECIMAL_SV_COLUMN2)); + TransformFunction coalesceTransformFunction = TransformFunctionFactory.get(coalesceExpr, _enableNullDataSourceMap); + Assert.assertEquals(coalesceTransformFunction.getName(), "coalesce"); + BigDecimal[] expectedResults = new BigDecimal[NUM_ROWS]; + for (int i = 0; i < NUM_ROWS; i++) { + if (isColumn1Null(i) && isColumn2Null(i)) { + expectedResults[i] = (BigDecimal) NullValueUtils.getDefaultNullValue(FieldSpec.DataType.BIG_DECIMAL); + } else if (isColumn1Null(i)) { + expectedResults[i] = BigDecimal.valueOf(_intSVValues[i] + INT_VALUE_SHIFT); + } else if (isColumn2Null(i)) { + expectedResults[i] = BigDecimal.valueOf(_intSVValues[i]); + } else { + expectedResults[i] = BigDecimal.valueOf(_intSVValues[i]); + } + } + testBigDecimalTransformFunction(coalesceExpr, expectedResults, _enableNullProjectionBlock, + _enableNullDataSourceMap); + testBigDecimalTransformFunction(coalesceExpr, expectedResults, _disableNullProjectionBlock, + _disableNullDataSourceMap); + } + + // Test the Coalesce on two long columns where one or the other or both can be null. 
+ @Test + public void testCoalesceLongColumns() + throws Exception { + ExpressionContext coalesceExpr = + RequestContextUtils.getExpression(String.format("COALESCE(%s,%s)", LONG_SV_COLUMN1, LONG_SV_COLUMN2)); + TransformFunction coalesceTransformFunction = TransformFunctionFactory.get(coalesceExpr, _enableNullDataSourceMap); + Assert.assertEquals(coalesceTransformFunction.getName(), "coalesce"); + long[] expectedResults = new long[NUM_ROWS]; + for (int i = 0; i < NUM_ROWS; i++) { + if (isColumn1Null(i) && isColumn2Null(i)) { + expectedResults[i] = (long) NullValueUtils.getDefaultNullValue(FieldSpec.DataType.LONG); + } else if (isColumn1Null(i)) { + expectedResults[i] = _intSVValues[i] + INT_VALUE_SHIFT; + } else if (isColumn2Null(i)) { + expectedResults[i] = _intSVValues[i]; + } else { + expectedResults[i] = _intSVValues[i]; + } + } + testLongTransformFunction(coalesceExpr, expectedResults, _enableNullProjectionBlock, _enableNullDataSourceMap); + testLongTransformFunction(coalesceExpr, expectedResults, _disableNullProjectionBlock, _disableNullDataSourceMap); + } + + // Test the Coalesce on two double columns where one or the other or both can be null. + @Test + public void testCoalesceDoubleColumns() + throws Exception { + ExpressionContext coalesceExpr = + RequestContextUtils.getExpression(String.format("COALESCE(%s,%s)", DOUBLE_SV_COLUMN1, DOUBLE_SV_COLUMN2)); + TransformFunction coalesceTransformFunction = TransformFunctionFactory.get(coalesceExpr, _enableNullDataSourceMap); + Assert.assertEquals(coalesceTransformFunction.getName(), "coalesce"); + double[] expectedResults = new double[NUM_ROWS]; + for (int i = 0; i < NUM_ROWS; i++) { + if (isColumn1Null(i) && isColumn2Null(i)) { + expectedResults[i] = (double) NullValueUtils.getDefaultNullValue(FieldSpec.DataType.DOUBLE); + } else if (isColumn1Null(i)) { + expectedResults[i] = _doubleValues[i] + DOUBLE_VALUE_SHIFT; + } else if (isColumn2Null(i)) { + expectedResults[i] = _doubleValues[i]; + } else { + expectedResults[i] = _doubleValues[i]; + } + } + testDoubleTransformFunction(coalesceExpr, expectedResults, _enableNullProjectionBlock, _enableNullDataSourceMap); + testDoubleTransformFunction(coalesceExpr, expectedResults, _disableNullProjectionBlock, _disableNullDataSourceMap); + } + + // Test the Coalesce on two float columns where one or the other or both can be null. + @Test + public void testCoalesceFloatColumns() + throws Exception { + ExpressionContext coalesceExpr = + RequestContextUtils.getExpression(String.format("COALESCE(%s,%s)", FLOAT_SV_COLUMN1, FLOAT_SV_COLUMN2)); + TransformFunction coalesceTransformFunction = TransformFunctionFactory.get(coalesceExpr, _enableNullDataSourceMap); + Assert.assertEquals(coalesceTransformFunction.getName(), "coalesce"); + float[] expectedResults = new float[NUM_ROWS]; + for (int i = 0; i < NUM_ROWS; i++) { + if (isColumn1Null(i) && isColumn2Null(i)) { + expectedResults[i] = (float) NullValueUtils.getDefaultNullValue(FieldSpec.DataType.FLOAT); + } else if (isColumn1Null(i)) { + expectedResults[i] = _floatValues[i] + FLOAT_VALUE_SHIFT; + } else if (isColumn2Null(i)) { + expectedResults[i] = _floatValues[i]; + } else { + expectedResults[i] = _floatValues[i]; + } + } + testFloatTransformFunction(coalesceExpr, expectedResults, _enableNullProjectionBlock, _enableNullDataSourceMap); + testFloatTransformFunction(coalesceExpr, expectedResults, _disableNullProjectionBlock, _disableNullDataSourceMap); + } + + // Test that non-column-names appear in one of the argument. 
+ @Test + public void testIllegalColumnName() + throws Exception { + ExpressionContext coalesceExpr = + RequestContextUtils.getExpression(String.format("COALESCE(%s,%s)", _stringSVValues[0], STRING_SV_COLUMN1)); + Assert.assertThrows(RuntimeException.class, () -> { + TransformFunctionFactory.get(coalesceExpr, _enableNullDataSourceMap); + }); + Assert.assertThrows(RuntimeException.class, () -> { + TransformFunctionFactory.get(coalesceExpr, _disableNullDataSourceMap); + }); + } + + // Test that a wrong data type is rejected as an illegal argument. + @Test + public void testIllegalArgType() + throws Exception { + ExpressionContext coalesceExpr = + RequestContextUtils.getExpression(String.format("COALESCE(%s,%s)", TIMESTAMP_COLUMN, STRING_SV_COLUMN)); + Assert.assertThrows(RuntimeException.class, () -> { + TransformFunctionFactory.get(coalesceExpr, _enableNullDataSourceMap); + }); + Assert.assertThrows(RuntimeException.class, () -> { + TransformFunctionFactory.get(coalesceExpr, _disableNullDataSourceMap); + }); + } + + // Test that all arguments have to be the same type. + @Test + public void testDifferentArgumentType() + throws Exception { + ExpressionContext coalesceExpr = + RequestContextUtils.getExpression(String.format("COALESCE(%s,%s)", INT_SV_COLUMN1, STRING_SV_COLUMN1)); + Assert.assertThrows(RuntimeException.class, () -> { + TransformFunctionFactory.get(coalesceExpr, _enableNullDataSourceMap); + }); + Assert.assertThrows(RuntimeException.class, () -> { + TransformFunctionFactory.get(coalesceExpr, _disableNullDataSourceMap); + }); + } +}
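In a query, the new transform is invoked as in the SQL Syntax note of the class javadoc, e.g. SELECT COALESCE(columnA, columnB) FROM myTable, where columnA, columnB, and myTable are hypothetical names; both columns must share the same stored type, and the first non-null value is returned per row (the stored type's default null value when all arguments are null).

The per-row evaluation strategy in the transform above (probe each argument's null bitmap in order, materialize an argument's value block only the first time it is actually needed, fall back to the stored type's default null value when every argument is null) can also be sketched in isolation. The following is a minimal, self-contained sketch rather than the committed code: it substitutes java.util.BitSet for RoaringBitmap, and CoalesceSketch, coalesceInts, columns, and nullBitmaps are hypothetical names introduced only for illustration.

import java.util.Arrays;
import java.util.BitSet;
import java.util.List;
import java.util.function.Supplier;

public final class CoalesceSketch {

  // Per row: take the first argument whose null bitmap does not mark the row,
  // materializing that argument's value array only on first use; if every
  // argument is null for the row, fall back to the supplied default null value.
  static int[] coalesceInts(int numDocs, List<Supplier<int[]>> columns, List<BitSet> nullBitmaps,
      int defaultNullValue) {
    int width = columns.size();
    int[] results = new int[numDocs];
    int[][] data = new int[width][];          // lazily filled, mirrors "filledData" in the commit
    for (int i = 0; i < numDocs; i++) {
      boolean hasNonNullValue = false;
      for (int j = 0; j < width; j++) {
        BitSet nulls = nullBitmaps.get(j);
        if (nulls != null && nulls.get(i)) {
          continue;                           // argument j is null for row i
        }
        if (data[j] == null) {
          data[j] = columns.get(j).get();     // materialize column j once, on first use
        }
        results[i] = data[j][i];
        hasNonNullValue = true;
        break;
      }
      if (!hasNonNullValue) {
        results[i] = defaultNullValue;        // every argument is null for row i
      }
    }
    return results;
  }

  public static void main(String[] args) {
    BitSet columnANulls = new BitSet();
    columnANulls.set(1);                      // row 1 of the first column is null
    int[] out = coalesceInts(3,
        List.of(() -> new int[]{10, 11, 12}, () -> new int[]{20, 21, 22}),
        Arrays.asList(columnANulls, null),    // second column: no null bitmap
        Integer.MIN_VALUE);
    System.out.println(Arrays.toString(out)); // prints [10, 21, 12]
  }
}

Deferring the transformTo*ValuesSV call until some row actually needs a given argument is the point of the filledData bookkeeping in the commit: when the first argument is rarely null, the remaining argument columns may never be decoded at all.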