This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new d04785c Introduce 'LOOKUP' Transform Function (#6383) d04785c is described below commit d04785c83f5740a5cec0a2c30d570949304cb8ad Author: Caner Balci <canerba...@gmail.com> AuthorDate: Thu Jan 7 18:00:33 2021 -0800 Introduce 'LOOKUP' Transform Function (#6383) LOOKUP is a regular transform function which uses the previously added DimensionTableDataManager to execute a lookup from a Dimension table. Call signature is as follows: LOOKUP(TableName, ColumnName, JoinKey, JoinValue [, JoinKey2, JoinValue2 ...]) - TableName: name of the dimension table which will be used - ColumnName: column name from the dimension table to look up - JoinKey: primary key column name for the dimension table. Note: Only primary key columns are supported for JoinKey - JoinValue: primary key value - *If the dimension table has more than one primary key column (composite PK), you can add more keys and values for the rest of the args: JoinKey2, JoinValue2 ... etc. 
--- .../common/function/TransformFunctionType.java | 1 + .../manager/offline/DimensionTableDataManager.java | 11 + .../function/LookupTransformFunction.java | 332 ++++++++++++++++++ .../function/TransformFunctionFactory.java | 1 + .../offline/DimensionTableDataManagerTest.java | 5 + .../function/BaseTransformFunctionTest.java | 21 ++ .../function/LookupTransformFunctionTest.java | 386 +++++++++++++++++++++ .../org/apache/pinot/tools/JoinQuickStart.java | 7 + 8 files changed, 764 insertions(+) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java b/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java index eabd94b..666d116 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java @@ -63,6 +63,7 @@ public enum TransformFunctionType { VALUEIN("valueIn"), MAPVALUE("mapValue"), INIDSET("inIdSet"), + LOOKUP("lookUp"), GROOVY("groovy"), // Special type for annotation based scalar functions SCALAR("scalar"), diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java index eea60f8..fafb23c 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java @@ -18,7 +18,9 @@ */ package org.apache.pinot.core.data.manager.offline; +import com.google.common.annotations.VisibleForTesting; import java.io.File; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -64,6 +66,11 @@ public class DimensionTableDataManager extends OfflineTableDataManager { return _instances.computeIfAbsent(tableNameWithType, k -> new 
DimensionTableDataManager()); } + @VisibleForTesting + public static DimensionTableDataManager registerDimensionTable(String tableNameWithType, DimensionTableDataManager instance) { + return _instances.computeIfAbsent(tableNameWithType, k -> instance); + } + public static DimensionTableDataManager getInstanceByTableName(String tableNameWithType) { return _instances.get(tableNameWithType); } @@ -161,4 +168,8 @@ public class DimensionTableDataManager extends OfflineTableDataManager { public FieldSpec getColumnFieldSpec(String columnName) { return _tableSchema.getFieldSpecFor(columnName); } + + public List<String> getPrimaryKeyColumns() { + return _primaryKeyColumns; + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/LookupTransformFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/LookupTransformFunction.java new file mode 100644 index 0000000..eb1aeba --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/LookupTransformFunction.java @@ -0,0 +1,332 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.operator.transform.function; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.ArrayList; +import com.google.common.base.Preconditions; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.pinot.core.common.DataSource; +import org.apache.pinot.core.data.manager.offline.DimensionTableDataManager; +import org.apache.pinot.core.operator.blocks.ProjectionBlock; +import org.apache.pinot.core.operator.transform.TransformResultMetadata; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.data.readers.PrimaryKey; +import org.apache.pinot.spi.utils.ByteArray; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; + + +/** + * LOOKUP function takes 4 or more arguments: + * <ul> + * <li><b>TableName:</b> name of the dimension table which will be used</li> + * <li><b>ColumnName:</b> column name from the dimension table to look up</li> + * <li><b>JoinKey:</b> primary key column name for the dimension table. Note: Only primary key[s] are supported for JoinKey</li> + * <li><b>JoinValue:</b> primary key value</li> + * ...<br> + * *[If the dimension table has more then one primary keys (composite pk)] + * <li><b>JoinKey2</b></li> + * <li><b>JoinValue2</b></li> + * ... + * </ul> + * <br> + * Example: + * <pre>{@code SELECT + * baseballStats.playerName, + * baseballStats.teamID, + * LOOKUP('dimBaseballTeams', 'teamName', 'teamID', baseballStats.teamID) + * FROM + * baseballStats + * LIMIT 10}</pre> + * <br> + * Above example joins the dimension table 'baseballTeams' into regular table 'baseballStats' on 'teamID' key. + * Lookup function returns the value of the column 'teamName'. 
+ */ +public class LookupTransformFunction extends BaseTransformFunction { + public static final String FUNCTION_NAME = "lookUp"; + + // Lookup parameters + private String _dimTableName; + private String _dimColumnName; + private final List<String> _joinKeys = new ArrayList<>(); + private final List<FieldSpec> _joinValueFieldSpecs = new ArrayList<>(); + private final List<TransformFunction> _joinValueFunctions = new ArrayList<>(); + + private DimensionTableDataManager _dataManager; + private FieldSpec _lookupColumnFieldSpec; + + @Override + public String getName() { + return FUNCTION_NAME; + } + + @Override + public void init(List<TransformFunction> arguments, Map<String, DataSource> dataSourceMap) { + // Check that there are correct number of arguments + Preconditions + .checkArgument(arguments.size() >= 4, + "At least 4 arguments are required for LOOKUP transform function: " + + "LOOKUP(TableName, ColumnName, JoinKey, JoinValue [, JoinKey2, JoinValue2 ...])"); + Preconditions + .checkArgument(arguments.size() % 2 == 0, "Should have the same number of JoinKey and JoinValue arguments"); + + TransformFunction dimTableNameFunction = arguments.get(0); + Preconditions.checkArgument(dimTableNameFunction instanceof LiteralTransformFunction, + "First argument must be a literal(string) representing the dimension table name"); + _dimTableName = TableNameBuilder.OFFLINE.tableNameWithType( + ((LiteralTransformFunction) dimTableNameFunction).getLiteral()); + + TransformFunction dimColumnFunction = arguments.get(1); + Preconditions.checkArgument(dimColumnFunction instanceof LiteralTransformFunction, + "Second argument must be a literal(string) representing the column name from dimension table to lookup"); + _dimColumnName = ((LiteralTransformFunction) dimColumnFunction).getLiteral(); + + List<TransformFunction> joinArguments = arguments.subList(2, arguments.size()); + int numJoinArguments = joinArguments.size(); + for (int i = 0; i < numJoinArguments / 2; i++) { + 
TransformFunction dimJoinKeyFunction = joinArguments.get((i * 2)); + Preconditions.checkArgument(dimJoinKeyFunction instanceof LiteralTransformFunction, + "JoinKey argument must be a literal(string) representing the primary key for the dimension table"); + _joinKeys.add(((LiteralTransformFunction) dimJoinKeyFunction).getLiteral()); + + TransformFunction factJoinValueFunction = joinArguments.get((i * 2) + 1); + TransformResultMetadata factJoinValueFunctionResultMetadata = factJoinValueFunction.getResultMetadata(); + Preconditions.checkArgument(factJoinValueFunctionResultMetadata.isSingleValue(), + "JoinValue argument must be a single value expression"); + _joinValueFunctions.add(factJoinValueFunction); + } + + // Validate lookup table and relevant columns + _dataManager = DimensionTableDataManager.getInstanceByTableName(_dimTableName); + Preconditions.checkArgument(_dataManager != null, + "Dimension table does not exist: %s", _dimTableName); + + _lookupColumnFieldSpec = _dataManager.getColumnFieldSpec(_dimColumnName); + Preconditions.checkArgument(_lookupColumnFieldSpec != null, + "Column does not exist in dimension table: %s:%s", _dimTableName, _dimColumnName); + + for (String joinKey : _joinKeys) { + FieldSpec pkColumnSpec = _dataManager.getColumnFieldSpec(joinKey); + Preconditions.checkArgument(pkColumnSpec != null, + "Primary key column doesn't exist in dimension table: %s:%s", _dimTableName, joinKey); + _joinValueFieldSpecs.add(pkColumnSpec); + } + + List<String> tablePrimaryKeyColumns = _dataManager.getPrimaryKeyColumns(); + Preconditions.checkArgument(_joinKeys.equals(tablePrimaryKeyColumns), + "Provided join keys (%s) must be the same as table primary keys: %s", _joinKeys, tablePrimaryKeyColumns); + } + + @Override + public TransformResultMetadata getResultMetadata() { + return new TransformResultMetadata(_lookupColumnFieldSpec.getDataType(), + _lookupColumnFieldSpec.isSingleValueField(), false); + } + + private Object[] lookup(ProjectionBlock 
projectionBlock) { + int numPkColumns = _joinKeys.size(); + int numDocuments = projectionBlock.getNumDocs(); + Object[][] pkColumns = new Object[numPkColumns][]; + for (int c = 0; c < numPkColumns; c++) { + FieldSpec.DataType colType = _joinValueFieldSpecs.get(c).getDataType(); + TransformFunction tf = _joinValueFunctions.get(c); + switch (colType) { + case INT: + pkColumns[c] = ArrayUtils.toObject(tf.transformToIntValuesSV(projectionBlock)); + break; + case LONG: + pkColumns[c] = ArrayUtils.toObject(tf.transformToLongValuesSV(projectionBlock)); + break; + case FLOAT: + pkColumns[c] = ArrayUtils.toObject(tf.transformToFloatValuesSV(projectionBlock)); + break; + case DOUBLE: + pkColumns[c] = ArrayUtils.toObject(tf.transformToDoubleValuesSV(projectionBlock)); + break; + case STRING: + pkColumns[c] = tf.transformToStringValuesSV(projectionBlock); + break; + case BYTES: + byte[][] primitiveValues = tf.transformToBytesValuesSV(projectionBlock); + pkColumns[c] = new ByteArray[numDocuments]; + for (int i = 0; i < numDocuments; i++) { + pkColumns[c][i] = new ByteArray(primitiveValues[i]); + } + break; + default: + throw new IllegalStateException("Unknown column type for primary key"); + } + } + + Object[] resultSet = new Object[numDocuments]; + Object[] pkValues = new Object[numPkColumns]; + for (int i = 0; i < numDocuments; i++) { + // prepare pk + for (int c = 0; c < numPkColumns; c++) { + pkValues[c] = pkColumns[c][i]; + } + // lookup + GenericRow row = _dataManager.lookupRowByPrimaryKey(new PrimaryKey(pkValues)); + if (row != null) { + resultSet[i] = row.getValue(_dimColumnName); + } + } + return resultSet; + } + + @Override + public int[] transformToIntValuesSV(ProjectionBlock projectionBlock) { + Object[] lookupObjects = lookup(projectionBlock); + int[] resultSet = new int[lookupObjects.length]; + Arrays.fill(resultSet, ((Number) _lookupColumnFieldSpec.getDefaultNullValue()).intValue()); + for (int i = 0; i < lookupObjects.length; i++) { + if (lookupObjects[i] != 
null) { + resultSet[i] = ((Number) lookupObjects[i]).intValue(); + } + } + return resultSet; + } + + @Override + public long[] transformToLongValuesSV(ProjectionBlock projectionBlock) { + Object[] lookupObjects = lookup(projectionBlock); + long[] resultSet = new long[lookupObjects.length]; + Arrays.fill(resultSet, ((Number) _lookupColumnFieldSpec.getDefaultNullValue()).longValue()); + for (int i = 0; i < lookupObjects.length; i++) { + if (lookupObjects[i] != null) { + resultSet[i] = ((Number) lookupObjects[i]).longValue(); + } + } + return resultSet; + } + + @Override + public float[] transformToFloatValuesSV(ProjectionBlock projectionBlock) { + Object[] lookupObjects = lookup(projectionBlock); + float[] resultSet = new float[lookupObjects.length]; + Arrays.fill(resultSet, ((Number) _lookupColumnFieldSpec.getDefaultNullValue()).floatValue()); + for (int i = 0; i < lookupObjects.length; i++) { + if (lookupObjects[i] != null) { + resultSet[i] = ((Number) lookupObjects[i]).floatValue(); + } + } + return resultSet; + } + + @Override + public double[] transformToDoubleValuesSV(ProjectionBlock projectionBlock) { + Object[] lookupObjects = lookup(projectionBlock); + double[] resultSet = new double[lookupObjects.length]; + Arrays.fill(resultSet, ((Number) _lookupColumnFieldSpec.getDefaultNullValue()).doubleValue()); + for (int i = 0; i < lookupObjects.length; i++) { + if (lookupObjects[i] != null) { + resultSet[i] = ((Number) lookupObjects[i]).doubleValue(); + } + } + return resultSet; + } + + @Override + public String[] transformToStringValuesSV(ProjectionBlock projectionBlock) { + Object[] lookupObjects = lookup(projectionBlock); + String[] resultSet = new String[lookupObjects.length]; + Arrays.fill(resultSet, _lookupColumnFieldSpec.getDefaultNullValueString()); + for (int i = 0; i < lookupObjects.length; i++) { + if (lookupObjects[i] != null) { + resultSet[i] = lookupObjects[i].toString(); + } + } + return resultSet; + } + + @Override + public byte[][] 
transformToBytesValuesSV(ProjectionBlock projectionBlock) { + Object[] lookupObjects = lookup(projectionBlock); + byte[][] resultSet = new byte[lookupObjects.length][0]; + for (int i = 0; i < lookupObjects.length; i++) { + if (lookupObjects[i] != null) { + resultSet[i] = (byte[]) lookupObjects[i]; + } + } + return resultSet; + } + + @Override + public int[][] transformToIntValuesMV(ProjectionBlock projectionBlock) { + Object[] lookupObjects = lookup(projectionBlock); + int[][] resultSet = new int[lookupObjects.length][0]; + for (int i = 0; i < lookupObjects.length; i++) { + if (lookupObjects[i] != null) { + resultSet[i] = (int[]) lookupObjects[i]; + } + } + return resultSet; + } + + @Override + public long[][] transformToLongValuesMV(ProjectionBlock projectionBlock) { + Object[] lookupObjects = lookup(projectionBlock); + long[][] resultSet = new long[lookupObjects.length][0]; + for (int i = 0; i < lookupObjects.length; i++) { + if (lookupObjects[i] != null) { + resultSet[i] = (long[]) lookupObjects[i]; + } + } + return resultSet; + } + + @Override + public float[][] transformToFloatValuesMV(ProjectionBlock projectionBlock) { + Object[] lookupObjects = lookup(projectionBlock); + float[][] resultSet = new float[lookupObjects.length][0]; + for (int i = 0; i < lookupObjects.length; i++) { + if (lookupObjects[i] != null) { + resultSet[i] = (float[]) lookupObjects[i]; + } + } + return resultSet; + } + + @Override + public double[][] transformToDoubleValuesMV(ProjectionBlock projectionBlock) { + Object[] lookupObjects = lookup(projectionBlock); + double[][] resultSet = new double[lookupObjects.length][0]; + for (int i = 0; i < lookupObjects.length; i++) { + if (lookupObjects[i] != null) { + resultSet[i] = (double[]) lookupObjects[i]; + } + } + return resultSet; + } + + @Override + public String[][] transformToStringValuesMV(ProjectionBlock projectionBlock) { + Object[] lookupObjects = lookup(projectionBlock); + String[][] resultSet = new String[lookupObjects.length][0]; + 
for (int i = 0; i < lookupObjects.length; i++) { + if (lookupObjects[i] != null) { + resultSet[i] = (String[]) lookupObjects[i]; + } + } + return resultSet; + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java index 3d6ff94..fb0c826 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java @@ -92,6 +92,7 @@ public class TransformFunctionFactory { put(canonicalize(TransformFunctionType.VALUEIN.getName().toLowerCase()), ValueInTransformFunction.class); put(canonicalize(TransformFunctionType.MAPVALUE.getName().toLowerCase()), MapValueTransformFunction.class); put(canonicalize(TransformFunctionType.INIDSET.getName().toLowerCase()), InIdSetTransformFunction.class); + put(canonicalize(TransformFunctionType.LOOKUP.getName().toLowerCase()), LookupTransformFunction.class); // Array functions put(canonicalize(TransformFunctionType.ARRAYAVERAGE.getName().toLowerCase()), ArrayAverageTransformFunction.class); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java index d489144..23dd01f 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java @@ -21,6 +21,7 @@ package org.apache.pinot.core.data.manager.offline; import com.yammer.metrics.core.MetricsRegistry; import java.io.File; import java.net.URL; +import java.util.Arrays; import java.util.List; import org.apache.commons.io.FileUtils; import 
org.apache.helix.AccessOption; @@ -160,6 +161,10 @@ public class DimensionTableDataManagerTest { Assert.assertEquals(spec.getDataType(), FieldSpec.DataType.STRING, "Should return correct data type for teamName column"); + // Confirm we can read primary column list + List<String> pkColumns = mgr.getPrimaryKeyColumns(); + Assert.assertEquals(pkColumns, Arrays.asList("teamID"), "Should return PK column list"); + // Remove the segment List<SegmentDataManager> segmentManagers = mgr.acquireAllSegments(); Assert.assertEquals(segmentManagers.size(), 1, "Should have exactly one segment manager"); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/BaseTransformFunctionTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/BaseTransformFunctionTest.java index 91eb467..6beca8a 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/BaseTransformFunctionTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/BaseTransformFunctionTest.java @@ -247,6 +247,27 @@ public abstract class BaseTransformFunctionTest { } } + protected void testTransformFunctionMV(TransformFunction transformFunction, long[][] expectedValues) { + long[][] longMVValues = transformFunction.transformToLongValuesMV(_projectionBlock); + for (int i = 0; i < NUM_ROWS; i++) { + Assert.assertEquals(longMVValues[i], expectedValues[i]); + } + } + + protected void testTransformFunctionMV(TransformFunction transformFunction, float[][] expectedValues) { + float[][] floatMVValues = transformFunction.transformToFloatValuesMV(_projectionBlock); + for (int i = 0; i < NUM_ROWS; i++) { + Assert.assertEquals(floatMVValues[i], expectedValues[i]); + } + } + + protected void testTransformFunctionMV(TransformFunction transformFunction, double[][] expectedValues) { + double[][] doubleMVValues = transformFunction.transformToDoubleValuesMV(_projectionBlock); + for (int i = 0; i < NUM_ROWS; i++) { + 
Assert.assertEquals(doubleMVValues[i], expectedValues[i]); + } + } + protected void testTransformFunctionMV(TransformFunction transformFunction, String[][] expectedValues) { String[][] stringMVValues = transformFunction.transformToStringValuesMV(_projectionBlock); for (int i = 0; i < NUM_ROWS; i++) { diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/LookupTransformFunctionTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/LookupTransformFunctionTest.java new file mode 100644 index 0000000..93e3633 --- /dev/null +++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/LookupTransformFunctionTest.java @@ -0,0 +1,386 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.operator.transform.function; + +import java.util.Arrays; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.pinot.common.utils.PrimitiveArrayUtils; +import org.apache.pinot.core.data.manager.offline.DimensionTableDataManager; +import org.apache.pinot.core.query.exception.BadQueryRequestException; +import org.apache.pinot.core.query.request.context.ExpressionContext; +import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils; +import org.apache.pinot.spi.data.DimensionFieldSpec; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.data.readers.PrimaryKey; +import org.apache.pinot.spi.utils.ByteArray; +import org.testng.Assert; +import org.testng.annotations.BeforeSuite; +import org.testng.annotations.Test; +import java.util.HashMap; +import java.util.Map; + +import static org.mockito.Mockito.*; + + +public class LookupTransformFunctionTest extends BaseTransformFunctionTest { + private static final String TABLE_NAME = "baseballTeams_OFFLINE"; + private DimensionTableDataManager tableManager; + + @BeforeSuite + public void setUp() + throws Exception { + super.setUp(); + + createTestableTableManager(); + } + + private void createTestableTableManager() { + tableManager = mock(DimensionTableDataManager.class); + DimensionTableDataManager.registerDimensionTable(TABLE_NAME, tableManager); + + // Creating a mock table which looks like: + // TeamID (PK, str) | TeamName(str) | TeamName_MV(str[]) | TeamInteger(int) | TeamInteger_MV(int[]) | TeamFloat(float) | ... + // + // All values are dynamically created to be variations of the primary key. + // e.g + // lookupRowByPrimaryKey(['FOO']) -> (TeamID: 'foo', TeamName: 'teamName_for_foo', TeamInteger: hashCode(['foo']), ... 
+ // + when(tableManager.getPrimaryKeyColumns()).thenReturn(Arrays.asList("teamID")); + when(tableManager.getColumnFieldSpec("teamID")) + .thenReturn(new DimensionFieldSpec("teamID", FieldSpec.DataType.STRING, true)); + when(tableManager.getColumnFieldSpec("teamName")) + .thenReturn(new DimensionFieldSpec("teamName", FieldSpec.DataType.STRING, true)); + when(tableManager.getColumnFieldSpec("teamName_MV")) + .thenReturn(new DimensionFieldSpec("teamName_MV", FieldSpec.DataType.STRING, false)); + when(tableManager.getColumnFieldSpec("teamInteger")) + .thenReturn(new DimensionFieldSpec("teamInteger", FieldSpec.DataType.INT, true)); + when(tableManager.getColumnFieldSpec("teamInteger_MV")) + .thenReturn(new DimensionFieldSpec("teamInteger_MV", FieldSpec.DataType.INT, false)); + when(tableManager.getColumnFieldSpec("teamFloat")) + .thenReturn(new DimensionFieldSpec("teamFloat", FieldSpec.DataType.FLOAT, true)); + when(tableManager.getColumnFieldSpec("teamFloat_MV")) + .thenReturn(new DimensionFieldSpec("teamFloat_MV", FieldSpec.DataType.FLOAT, false)); + when(tableManager.getColumnFieldSpec("teamDouble")) + .thenReturn(new DimensionFieldSpec("teamDouble", FieldSpec.DataType.DOUBLE, true)); + when(tableManager.getColumnFieldSpec("teamDouble_MV")) + .thenReturn(new DimensionFieldSpec("teamDouble_MV", FieldSpec.DataType.DOUBLE, false)); + when(tableManager.getColumnFieldSpec("teamLong")) + .thenReturn(new DimensionFieldSpec("teamLong", FieldSpec.DataType.LONG, true)); + when(tableManager.getColumnFieldSpec("teamLong_MV")) + .thenReturn(new DimensionFieldSpec("teamLong_MV", FieldSpec.DataType.LONG, false)); + when(tableManager.getColumnFieldSpec("teamBytes")) + .thenReturn(new DimensionFieldSpec("teamNameBytes", FieldSpec.DataType.BYTES, true)); + when(tableManager.lookupRowByPrimaryKey(any(PrimaryKey.class))).thenAnswer(invocation -> { + PrimaryKey key = invocation.getArgument(0); + GenericRow row = new GenericRow(); + row.putValue("teamName", "teamName_for_" + 
key.toString()); + row.putValue("teamName_MV", + new String[]{"teamName_for_" + key.toString() + "_1", "teamName_for_" + key.toString() + "_2",}); + row.putValue("teamInteger", key.hashCode()); + row.putValue("teamInteger_MV", new int[]{key.hashCode(), key.hashCode()}); + row.putValue("teamFloat", (float) key.hashCode()); + row.putValue("teamFloat_MV", new float[]{(float) key.hashCode(), (float) key.hashCode()}); + row.putValue("teamDouble", (double) key.hashCode()); + row.putValue("teamDouble_MV", new double[]{(double) key.hashCode(), (double) key.hashCode()}); + row.putValue("teamLong", (long) key.hashCode()); + row.putValue("teamLong_MV", new long[]{(long) key.hashCode(), (long) key.hashCode()}); + row.putValue("teamBytes", ("teamBytes_for_" + key.toString()).getBytes()); + return row; + }); + } + + @Test + public void instantiationTests() + throws Exception { + // Success case + ExpressionContext expression = QueryContextConverterUtils + .getExpression(String.format("lookup('baseballTeams','teamName','teamID',%s)", STRING_SV_COLUMN)); + TransformFunction transformFunction = TransformFunctionFactory.get(expression, _dataSourceMap); + Assert.assertTrue(transformFunction instanceof LookupTransformFunction); + Assert.assertEquals(transformFunction.getName(), LookupTransformFunction.FUNCTION_NAME); + + // Wrong number of arguments + Assert.assertThrows(BadQueryRequestException.class, () -> { + TransformFunctionFactory + .get(QueryContextConverterUtils.getExpression(String.format("lookup('baseballTeams','teamName','teamID')")), + _dataSourceMap); + }); + + // Wrong number of join keys + Assert.assertThrows(BadQueryRequestException.class, () -> { + TransformFunctionFactory.get(QueryContextConverterUtils.getExpression( + String.format("lookup('baseballTeams','teamName','teamID', %s, 'danglingKey')", STRING_SV_COLUMN)), + _dataSourceMap); + }); + + // Non literal tableName argument + Assert.assertThrows(BadQueryRequestException.class, () -> { + 
TransformFunctionFactory.get(QueryContextConverterUtils + .getExpression(String.format("lookup(%s,'teamName','teamID', %s)", STRING_SV_COLUMN, INT_SV_COLUMN)), + _dataSourceMap); + }); + + // Non literal lookup columnName argument + Assert.assertThrows(BadQueryRequestException.class, () -> { + TransformFunctionFactory.get(QueryContextConverterUtils + .getExpression(String.format("lookup('baseballTeams',%s,'teamID',%s)", STRING_SV_COLUMN, INT_SV_COLUMN)), + _dataSourceMap); + }); + + // Non literal lookup columnName argument + Assert.assertThrows(BadQueryRequestException.class, () -> { + TransformFunctionFactory.get(QueryContextConverterUtils + .getExpression(String.format("lookup('baseballTeams','teamName',%s,%s)", STRING_SV_COLUMN, INT_SV_COLUMN)), + _dataSourceMap); + }); + } + + @Test + public void resultDataTypeTest() + throws Exception { + HashMap<String, FieldSpec.DataType> testCases = new HashMap<String, FieldSpec.DataType>() {{ + put("teamName", FieldSpec.DataType.STRING); + put("teamInteger", FieldSpec.DataType.INT); + put("teamFloat", FieldSpec.DataType.FLOAT); + put("teamLong", FieldSpec.DataType.LONG); + put("teamDouble", FieldSpec.DataType.DOUBLE); + }}; + + for (Map.Entry<String, FieldSpec.DataType> testCase : testCases.entrySet()) { + ExpressionContext expression = QueryContextConverterUtils.getExpression( + String.format("lookup('baseballTeams','%s','teamID',%s)", testCase.getKey(), STRING_SV_COLUMN)); + TransformFunction transformFunction = TransformFunctionFactory.get(expression, _dataSourceMap); + Assert.assertEquals(transformFunction.getResultMetadata().getDataType(), testCase.getValue(), + String.format("Expecting %s data type for lookup column: '%s'", testCase.getKey(), testCase.getValue())); + } + } + + @Test + public void basicLookupTests() + throws Exception { + // Lookup col: StringSV + // PK: [String] + ExpressionContext expression = QueryContextConverterUtils + 
.getExpression(String.format("lookup('baseballTeams','teamName','teamID',%s)", STRING_SV_COLUMN)); + TransformFunction transformFunction = TransformFunctionFactory.get(expression, _dataSourceMap); + String[] expectedStringValues = new String[NUM_ROWS]; + for (int i = 0; i < NUM_ROWS; i++) { + expectedStringValues[i] = String.format("teamName_for_[%s]", _stringSVValues[i]); + } + testTransformFunction(transformFunction, expectedStringValues); + + // Lookup col: IntSV + // PK: [String] + expression = QueryContextConverterUtils + .getExpression(String.format("lookup('baseballTeams','teamInteger','teamID',%s)", STRING_SV_COLUMN)); + transformFunction = TransformFunctionFactory.get(expression, _dataSourceMap); + int[] expectedIntValues = new int[NUM_ROWS]; + for (int i = 0; i < NUM_ROWS; i++) { + expectedIntValues[i] = (new PrimaryKey(new Object[]{_stringSVValues[i]})).hashCode(); + } + testTransformFunction(transformFunction, expectedIntValues); + + // Lookup col: DoubleSV + // PK: [String] + expression = QueryContextConverterUtils + .getExpression(String.format("lookup('baseballTeams','teamDouble','teamID',%s)", STRING_SV_COLUMN)); + transformFunction = TransformFunctionFactory.get(expression, _dataSourceMap); + double[] expectedDoubleValues = new double[NUM_ROWS]; + for (int i = 0; i < NUM_ROWS; i++) { + expectedDoubleValues[i] = (double) (new PrimaryKey(new Object[]{_stringSVValues[i]})).hashCode(); + } + testTransformFunction(transformFunction, expectedDoubleValues); + + // Lookup col: BytesSV + // PK: [String] + expression = QueryContextConverterUtils + .getExpression(String.format("lookup('baseballTeams','teamBytes','teamID',%s)", STRING_SV_COLUMN)); + transformFunction = TransformFunctionFactory.get(expression, _dataSourceMap); + byte[][] expectedBytesValues = new byte[NUM_ROWS][]; + for (int i = 0; i < NUM_ROWS; i++) { + expectedBytesValues[i] = String.format("teamBytes_for_[%s]", _stringSVValues[i]).getBytes(); + } + testTransformFunction(transformFunction, 
expectedBytesValues); + } + + @Test + public void multiValueLookupTests() + throws Exception { + // Lookup col: StringMV + // PK: [String] + // Each row is expected to hold two strings derived from the join value, suffixed _1 and _2 + ExpressionContext expression = QueryContextConverterUtils + .getExpression(String.format("lookup('baseballTeams','teamName_MV','teamID',%s)", STRING_SV_COLUMN)); + TransformFunction transformFunction = TransformFunctionFactory.get(expression, _dataSourceMap); + String[][] expectedStringMVValues = new String[NUM_ROWS][]; + for (int i = 0; i < NUM_ROWS; i++) { + expectedStringMVValues[i] = + new String[]{String.format("teamName_for_[%s]_1", _stringSVValues[i]), String.format("teamName_for_[%s]_2", + _stringSVValues[i]),}; + } + testTransformFunctionMV(transformFunction, expectedStringMVValues); + + // Lookup col: IntegerMV + // PK: [String] + // Each row is expected to hold the primary key hash code twice + expression = QueryContextConverterUtils + .getExpression(String.format("lookup('baseballTeams','teamInteger_MV','teamID',%s)", STRING_SV_COLUMN)); + transformFunction = TransformFunctionFactory.get(expression, _dataSourceMap); + int[][] expectedIntegerMVValues = new int[NUM_ROWS][0]; + for (int i = 0; i < NUM_ROWS; i++) { + expectedIntegerMVValues[i] = + new int[]{(new PrimaryKey(new Object[]{_stringSVValues[i]})).hashCode(), (new PrimaryKey( + new Object[]{_stringSVValues[i]})).hashCode(),}; + } + testTransformFunctionMV(transformFunction, expectedIntegerMVValues); + + // Lookup col: FloatMV + // PK: [String] + // Same two-element expectation as IntegerMV, with the hash code cast to float + expression = QueryContextConverterUtils + .getExpression(String.format("lookup('baseballTeams','teamFloat_MV','teamID',%s)", STRING_SV_COLUMN)); + transformFunction = TransformFunctionFactory.get(expression, _dataSourceMap); + float[][] expectedFloatMVValues = new float[NUM_ROWS][0]; + for (int i = 0; i < NUM_ROWS; i++) { + expectedFloatMVValues[i] = + new float[]{(float) (new PrimaryKey(new Object[]{_stringSVValues[i]})).hashCode(), (float) (new PrimaryKey( + new Object[]{_stringSVValues[i]})).hashCode(),}; + } + testTransformFunctionMV(transformFunction, expectedFloatMVValues); +
+ // Lookup col: LongMV + // PK: [String] + expression = QueryContextConverterUtils + .getExpression(String.format("lookup('baseballTeams','teamLong_MV','teamID',%s)", STRING_SV_COLUMN)); + transformFunction = TransformFunctionFactory.get(expression, _dataSourceMap); + long[][] expectedLongMVValues = new long[NUM_ROWS][0]; + for (int i = 0; i < NUM_ROWS; i++) { + expectedLongMVValues[i] = + new long[]{(long) (new PrimaryKey(new Object[]{_stringSVValues[i]})).hashCode(), (long) (new PrimaryKey( + new Object[]{_stringSVValues[i]})).hashCode(),}; + } + testTransformFunctionMV(transformFunction, expectedLongMVValues); + + // Lookup col: DoubleMV + // PK: [String] + expression = QueryContextConverterUtils + .getExpression(String.format("lookup('baseballTeams','teamDouble_MV','teamID',%s)", STRING_SV_COLUMN)); + transformFunction = TransformFunctionFactory.get(expression, _dataSourceMap); + double[][] expectedDoubleMVValues = new double[NUM_ROWS][0]; + for (int i = 0; i < NUM_ROWS; i++) { + expectedDoubleMVValues[i] = + new double[]{(double) (new PrimaryKey(new Object[]{_stringSVValues[i]})).hashCode(), (double) (new PrimaryKey( + new Object[]{_stringSVValues[i]})).hashCode(),}; + } + testTransformFunctionMV(transformFunction, expectedDoubleMVValues); + } + + @Test + public void primaryKeyTypeTest() + throws Exception { + // Prepare one simple dimension table per primary key type under test (INT, STRING, LONG, FLOAT, DOUBLE, BYTES) + Map<String, FieldSpec.DataType> testTables = new HashMap<String, FieldSpec.DataType>() {{ + put("dimTableWithIntPK_OFFLINE", FieldSpec.DataType.INT); + put("dimTableWithStringPK_OFFLINE", FieldSpec.DataType.STRING); + put("dimTableWithLongPK_OFFLINE", FieldSpec.DataType.LONG); + put("dimTableWithFloatPK_OFFLINE", FieldSpec.DataType.FLOAT); + put("dimTableWithDoublePK_OFFLINE", FieldSpec.DataType.DOUBLE); + put("dimTableWithBytesPK_OFFLINE", FieldSpec.DataType.BYTES); + }}; + // Each table is registered as a mock whose lookup result embeds the primary key's hash code, so the expected values below can be recomputed from the input column values + for (Map.Entry<String, FieldSpec.DataType> table : testTables.entrySet()) { +
DimensionTableDataManager mgr = mock(DimensionTableDataManager.class); + DimensionTableDataManager.registerDimensionTable(table.getKey(), mgr); + when(mgr.getPrimaryKeyColumns()).thenReturn(Arrays.asList("primaryColumn")); + when(mgr.getColumnFieldSpec("primaryColumn")) + .thenReturn(new DimensionFieldSpec("primaryColumn", table.getValue(), true)); + when(mgr.getColumnFieldSpec("lookupColumn")) + .thenReturn(new DimensionFieldSpec("lookupColumn", FieldSpec.DataType.STRING, true)); + when(mgr.lookupRowByPrimaryKey(any(PrimaryKey.class))).thenAnswer(invocation -> { + PrimaryKey key = invocation.getArgument(0); + GenericRow row = new GenericRow(); + row.putValue("lookupColumn", String.format("lookup_value_for_[%s]", key.hashCode())); + return row; + }); + } + + // PK: [Int] + ExpressionContext expression = QueryContextConverterUtils.getExpression( + String.format("lookup('dimTableWithIntPK', 'lookupColumn', 'primaryColumn', %s)", INT_SV_COLUMN)); + TransformFunction transformFunction = TransformFunctionFactory.get(expression, _dataSourceMap); + String[] expectedResults = new String[NUM_ROWS]; + for (int i = 0; i < NUM_ROWS; i++) { + PrimaryKey key = new PrimaryKey(new Object[]{(Integer)_intSVValues[i]}); + expectedResults[i] = String.format("lookup_value_for_[%s]", key.hashCode()); + } + testTransformFunction(transformFunction, expectedResults); + + // PK: [String] + expression = QueryContextConverterUtils.getExpression( + String.format("lookup('dimTableWithStringPK', 'lookupColumn', 'primaryColumn', %s)", STRING_SV_COLUMN)); + transformFunction = TransformFunctionFactory.get(expression, _dataSourceMap); + expectedResults = new String[NUM_ROWS]; + for (int i = 0; i < NUM_ROWS; i++) { + PrimaryKey key = new PrimaryKey(new Object[]{_stringSVValues[i]}); + expectedResults[i] = String.format("lookup_value_for_[%s]", key.hashCode()); + } + testTransformFunction(transformFunction, expectedResults); + + // PK: [Long] + expression = QueryContextConverterUtils.getExpression( + 
String.format("lookup('dimTableWithLongPK', 'lookupColumn', 'primaryColumn', %s)", LONG_SV_COLUMN)); + transformFunction = TransformFunctionFactory.get(expression, _dataSourceMap); + expectedResults = new String[NUM_ROWS]; + for (int i = 0; i < NUM_ROWS; i++) { + PrimaryKey key = new PrimaryKey(new Object[]{(Long)_longSVValues[i]}); + expectedResults[i] = String.format("lookup_value_for_[%s]", key.hashCode()); + } + testTransformFunction(transformFunction, expectedResults); + + // PK: [Float] + expression = QueryContextConverterUtils.getExpression( + String.format("lookup('dimTableWithFloatPK', 'lookupColumn', 'primaryColumn', %s)", FLOAT_SV_COLUMN)); + transformFunction = TransformFunctionFactory.get(expression, _dataSourceMap); + expectedResults = new String[NUM_ROWS]; + for (int i = 0; i < NUM_ROWS; i++) { + PrimaryKey key = new PrimaryKey(new Object[]{(Float)_floatSVValues[i]}); + expectedResults[i] = String.format("lookup_value_for_[%s]", key.hashCode()); + } + testTransformFunction(transformFunction, expectedResults); + + // PK: [Double] + expression = QueryContextConverterUtils.getExpression( + String.format("lookup('dimTableWithDoublePK', 'lookupColumn', 'primaryColumn', %s)", DOUBLE_SV_COLUMN)); + transformFunction = TransformFunctionFactory.get(expression, _dataSourceMap); + expectedResults = new String[NUM_ROWS]; + for (int i = 0; i < NUM_ROWS; i++) { + PrimaryKey key = new PrimaryKey(new Object[]{(Double)_doubleSVValues[i]}); + expectedResults[i] = String.format("lookup_value_for_[%s]", key.hashCode()); + } + testTransformFunction(transformFunction, expectedResults); + + // PK: [Byte[]] + expression = QueryContextConverterUtils.getExpression( + String.format("lookup('dimTableWithBytesPK', 'lookupColumn', 'primaryColumn', %s)", BYTES_SV_COLUMN)); + transformFunction = TransformFunctionFactory.get(expression, _dataSourceMap); + expectedResults = new String[NUM_ROWS]; + for (int i = 0; i < NUM_ROWS; i++) { + PrimaryKey key = new PrimaryKey(new Object[]{new 
ByteArray(_bytesSVValues[i])}); + expectedResults[i] = String.format("lookup_value_for_[%s]", key.hashCode()); + } + testTransformFunction(transformFunction, expectedResults); + } +} diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/JoinQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/JoinQuickStart.java index a4e6340..01fc669 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/JoinQuickStart.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/JoinQuickStart.java @@ -118,6 +118,13 @@ public class JoinQuickStart printStatus(Quickstart.Color.YELLOW, prettyPrintResponse(runner.runQuery(q2))); printStatus(Quickstart.Color.GREEN, "***************************************************"); + String q3 = "select playerName, teamID, lookup('dimBaseballTeams', 'teamName', 'teamID', teamID) from baseballStats limit 10"; + printStatus(Quickstart.Color.YELLOW, "Baseball Stats with joined team names"); + printStatus(Quickstart.Color.CYAN, "Query : " + q3); + printStatus(Quickstart.Color.YELLOW, prettyPrintResponse(runner.runQuery(q3))); + printStatus(Quickstart.Color.GREEN, "***************************************************"); + + printStatus(Quickstart.Color.GREEN, "You can always go to http://localhost:9000 to play around in the query console"); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org