This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d04785c  Introduce 'LOOKUP' Transform Function (#6383)
d04785c is described below

commit d04785c83f5740a5cec0a2c30d570949304cb8ad
Author: Caner Balci <canerba...@gmail.com>
AuthorDate: Thu Jan 7 18:00:33 2021 -0800

    Introduce 'LOOKUP' Transform Function (#6383)
    
    LOOKUP is a regular transform function that uses the previously added
    DimensionTableDataManager to look up values in a dimension table. The call
    signature is as follows:
    
    LOOKUP(TableName, ColumnName, JoinKey, JoinValue [, JoinKey2, JoinValue2 
...])
    
    - TableName: name of the dimension table which will be used
    - ColumnName: column name from the dimension table to look up
    - JoinKey: primary key column name for the dimension table. Note: only primary
      key columns are supported as JoinKey
    - JoinValue: primary key value
    - If the dimension table has more than one primary key column (composite PK),
      pass one additional JoinKey/JoinValue pair per remaining column: JoinKey2,
      JoinValue2, etc.
---
 .../common/function/TransformFunctionType.java     |   1 +
 .../manager/offline/DimensionTableDataManager.java |  11 +
 .../function/LookupTransformFunction.java          | 332 ++++++++++++++++++
 .../function/TransformFunctionFactory.java         |   1 +
 .../offline/DimensionTableDataManagerTest.java     |   5 +
 .../function/BaseTransformFunctionTest.java        |  21 ++
 .../function/LookupTransformFunctionTest.java      | 386 +++++++++++++++++++++
 .../org/apache/pinot/tools/JoinQuickStart.java     |   7 +
 8 files changed, 764 insertions(+)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java
index eabd94b..666d116 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java
@@ -63,6 +63,7 @@ public enum TransformFunctionType {
   VALUEIN("valueIn"),
   MAPVALUE("mapValue"),
   INIDSET("inIdSet"),
+  LOOKUP("lookUp"),
   GROOVY("groovy"),
   // Special type for annotation based scalar functions
   SCALAR("scalar"),
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
index eea60f8..fafb23c 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
@@ -18,7 +18,9 @@
  */
 package org.apache.pinot.core.data.manager.offline;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.io.File;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -64,6 +66,11 @@ public class DimensionTableDataManager extends 
OfflineTableDataManager {
     return _instances.computeIfAbsent(tableNameWithType, k -> new 
DimensionTableDataManager());
   }
 
+  /**
+   * Registers {@code instance} as the data manager for {@code tableNameWithType} unless a data manager is already
+   * registered under that name, in which case the existing registration is kept.
+   *
+   * @param tableNameWithType table name including its type suffix
+   * @param instance data manager to register (e.g. a mock in tests)
+   * @return the data manager registered for the table once this call returns
+   */
+  @VisibleForTesting
+  public static DimensionTableDataManager registerDimensionTable(String tableNameWithType,
+      DimensionTableDataManager instance) {
+    return _instances.computeIfAbsent(tableNameWithType, ignored -> instance);
+  }
+
   public static DimensionTableDataManager getInstanceByTableName(String 
tableNameWithType) {
     return _instances.get(tableNameWithType);
   }
@@ -161,4 +168,8 @@ public class DimensionTableDataManager extends 
OfflineTableDataManager {
   public FieldSpec getColumnFieldSpec(String columnName) {
     return _tableSchema.getFieldSpecFor(columnName);
   }
+
+  /**
+   * Returns the primary key column names of the dimension table.
+   *
+   * <p>The list is a read-only view so callers cannot mutate this manager's internal state; {@code null} is returned
+   * unchanged if the primary key columns have not been set.
+   */
+  public List<String> getPrimaryKeyColumns() {
+    return _primaryKeyColumns == null ? null : java.util.Collections.unmodifiableList(_primaryKeyColumns);
+  }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/LookupTransformFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/LookupTransformFunction.java
new file mode 100644
index 0000000..eb1aeba
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/LookupTransformFunction.java
@@ -0,0 +1,332 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.transform.function;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.ArrayList;
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.data.manager.offline.DimensionTableDataManager;
+import org.apache.pinot.core.operator.blocks.ProjectionBlock;
+import org.apache.pinot.core.operator.transform.TransformResultMetadata;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.PrimaryKey;
+import org.apache.pinot.spi.utils.ByteArray;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+
+
+/**
+ * LOOKUP function takes 4 or more arguments:
+ * <ul>
+ *   <li><b>TableName:</b> name of the dimension table which will be used</li>
+ *   <li><b>ColumnName:</b> column name from the dimension table to look up</li>
+ *   <li><b>JoinKey:</b> primary key column name for the dimension table. Note: only primary key columns are
+ *   supported as JoinKey</li>
+ *   <li><b>JoinValue:</b> primary key value</li>
+ *   <li><b>JoinKey2</b>, <b>JoinValue2</b>, ...: if the dimension table has more than one primary key column
+ *   (composite PK), one additional JoinKey/JoinValue pair per remaining primary key column</li>
+ * </ul>
+ * The JoinKey arguments must name exactly the dimension table's primary key columns, in the same order.
+ * <br>
+ * Example:
+ * <pre>{@code SELECT
+ *    baseballStats.playerName,
+ *    baseballStats.teamID,
+ *    LOOKUP('dimBaseballTeams', 'teamName', 'teamID', baseballStats.teamID)
+ * FROM
+ *    baseballStats
+ * LIMIT 10}</pre>
+ * <br>
+ * Above example joins the dimension table 'dimBaseballTeams' into regular table 'baseballStats' on 'teamID' key.
+ * Lookup function returns the value of the column 'teamName'.
+ * <p>
+ * Documents without a matching dimension row get the lookup column's default null value for single-value numeric
+ * and string results, and an empty value for bytes and multi-value results.
+ */
+public class LookupTransformFunction extends BaseTransformFunction {
+  public static final String FUNCTION_NAME = "lookUp";
+
+  // Lookup parameters
+  private String _dimTableName;
+  private String _dimColumnName;
+  // Index-aligned lists: join key column name, its field spec in the dimension table, and the expression producing
+  // its value from the queried table. init() requires the keys to equal the table's primary key column list.
+  private final List<String> _joinKeys = new ArrayList<>();
+  private final List<FieldSpec> _joinValueFieldSpecs = new ArrayList<>();
+  private final List<TransformFunction> _joinValueFunctions = new ArrayList<>();
+
+  private DimensionTableDataManager _dataManager;
+  private FieldSpec _lookupColumnFieldSpec;
+
+  @Override
+  public String getName() {
+    return FUNCTION_NAME;
+  }
+
+  /**
+   * Parses and validates the LOOKUP arguments and resolves the dimension table data manager.
+   *
+   * @param arguments LOOKUP(TableName, ColumnName, JoinKey, JoinValue [, JoinKey2, JoinValue2 ...])
+   * @param dataSourceMap data sources of the queried table (not used directly by this function)
+   * @throws IllegalArgumentException if the arguments are malformed, the dimension table or a referenced column does
+   *         not exist, or the join keys differ from the dimension table's primary key columns
+   */
+  @Override
+  public void init(List<TransformFunction> arguments, Map<String, DataSource> dataSourceMap) {
+    // Check that there are correct number of arguments
+    Preconditions
+        .checkArgument(arguments.size() >= 4,
+            "At least 4 arguments are required for LOOKUP transform function: " +
+            "LOOKUP(TableName, ColumnName, JoinKey, JoinValue [, JoinKey2, JoinValue2 ...])");
+    Preconditions
+        .checkArgument(arguments.size() % 2 == 0, "Should have the same number of JoinKey and JoinValue arguments");
+
+    TransformFunction dimTableNameFunction = arguments.get(0);
+    Preconditions.checkArgument(dimTableNameFunction instanceof LiteralTransformFunction,
+        "First argument must be a literal(string) representing the dimension table name");
+    // Dimension table data managers are registered under the OFFLINE table name (with type suffix)
+    _dimTableName = TableNameBuilder.OFFLINE.tableNameWithType(
+        ((LiteralTransformFunction) dimTableNameFunction).getLiteral());
+
+    TransformFunction dimColumnFunction = arguments.get(1);
+    Preconditions.checkArgument(dimColumnFunction instanceof LiteralTransformFunction,
+        "Second argument must be a literal(string) representing the column name from dimension table to lookup");
+    _dimColumnName = ((LiteralTransformFunction) dimColumnFunction).getLiteral();
+
+    // Remaining arguments come in (JoinKey, JoinValue) pairs
+    List<TransformFunction> joinArguments = arguments.subList(2, arguments.size());
+    int numJoinArguments = joinArguments.size();
+    for (int i = 0; i < numJoinArguments / 2; i++) {
+      TransformFunction dimJoinKeyFunction = joinArguments.get((i * 2));
+      Preconditions.checkArgument(dimJoinKeyFunction instanceof LiteralTransformFunction,
+          "JoinKey argument must be a literal(string) representing the primary key for the dimension table");
+      _joinKeys.add(((LiteralTransformFunction) dimJoinKeyFunction).getLiteral());
+
+      TransformFunction factJoinValueFunction = joinArguments.get((i * 2) + 1);
+      TransformResultMetadata factJoinValueFunctionResultMetadata = factJoinValueFunction.getResultMetadata();
+      Preconditions.checkArgument(factJoinValueFunctionResultMetadata.isSingleValue(),
+          "JoinValue argument must be a single value expression");
+      _joinValueFunctions.add(factJoinValueFunction);
+    }
+
+    // Validate lookup table and relevant columns
+    _dataManager = DimensionTableDataManager.getInstanceByTableName(_dimTableName);
+    Preconditions.checkArgument(_dataManager != null,
+        "Dimension table does not exist: %s", _dimTableName);
+
+    _lookupColumnFieldSpec = _dataManager.getColumnFieldSpec(_dimColumnName);
+    Preconditions.checkArgument(_lookupColumnFieldSpec != null,
+        "Column does not exist in dimension table: %s:%s", _dimTableName, _dimColumnName);
+
+    for (String joinKey : _joinKeys) {
+      FieldSpec pkColumnSpec = _dataManager.getColumnFieldSpec(joinKey);
+      Preconditions.checkArgument(pkColumnSpec != null,
+          "Primary key column doesn't exist in dimension table: %s:%s", _dimTableName, joinKey);
+      _joinValueFieldSpecs.add(pkColumnSpec);
+    }
+
+    // List equality is order-sensitive: the join keys must follow the table's primary key order because lookup()
+    // builds the primary key positionally from the join values
+    List<String> tablePrimaryKeyColumns = _dataManager.getPrimaryKeyColumns();
+    Preconditions.checkArgument(_joinKeys.equals(tablePrimaryKeyColumns),
+        "Provided join keys (%s) must be the same as table primary keys: %s", _joinKeys, tablePrimaryKeyColumns);
+  }
+
+  /**
+   * The result has the data type and single/multi-value cardinality of the looked-up dimension column.
+   */
+  @Override
+  public TransformResultMetadata getResultMetadata() {
+    return new TransformResultMetadata(_lookupColumnFieldSpec.getDataType(),
+        _lookupColumnFieldSpec.isSingleValueField(), false);
+  }
+
+  /**
+   * Looks up the dimension row for every document in the block and extracts the lookup column from it.
+   *
+   * @return one raw value per document; {@code null} where no dimension row matches the join values
+   */
+  private Object[] lookup(ProjectionBlock projectionBlock) {
+    int numPkColumns = _joinKeys.size();
+    int numDocuments = projectionBlock.getNumDocs();
+    // Evaluate each join value expression, converted to the data type of the matching primary key column
+    Object[][] pkColumns = new Object[numPkColumns][];
+    for (int c = 0; c < numPkColumns; c++) {
+      FieldSpec.DataType colType = _joinValueFieldSpecs.get(c).getDataType();
+      TransformFunction tf = _joinValueFunctions.get(c);
+      switch (colType) {
+        case INT:
+          pkColumns[c] = ArrayUtils.toObject(tf.transformToIntValuesSV(projectionBlock));
+          break;
+        case LONG:
+          pkColumns[c] = ArrayUtils.toObject(tf.transformToLongValuesSV(projectionBlock));
+          break;
+        case FLOAT:
+          pkColumns[c] = ArrayUtils.toObject(tf.transformToFloatValuesSV(projectionBlock));
+          break;
+        case DOUBLE:
+          pkColumns[c] = ArrayUtils.toObject(tf.transformToDoubleValuesSV(projectionBlock));
+          break;
+        case STRING:
+          pkColumns[c] = tf.transformToStringValuesSV(projectionBlock);
+          break;
+        case BYTES:
+          // Wrapped in ByteArray, presumably so that primary key equality compares byte contents rather than
+          // array references -- NOTE(review): confirm against PrimaryKey/ByteArray equals semantics
+          byte[][] primitiveValues = tf.transformToBytesValuesSV(projectionBlock);
+          pkColumns[c] = new ByteArray[numDocuments];
+          for (int i = 0; i < numDocuments; i++) {
+            pkColumns[c][i] = new ByteArray(primitiveValues[i]);
+          }
+          break;
+        default:
+          throw new IllegalStateException("Unsupported data type for primary key column: " + colType);
+      }
+    }
+
+    Object[] resultSet = new Object[numDocuments];
+    Object[] pkValues = new Object[numPkColumns];
+    for (int i = 0; i < numDocuments; i++) {
+      // prepare pk
+      for (int c = 0; c < numPkColumns; c++) {
+        pkValues[c] = pkColumns[c][i];
+      }
+      // lookup
+      GenericRow row = _dataManager.lookupRowByPrimaryKey(new PrimaryKey(pkValues));
+      if (row != null) {
+        resultSet[i] = row.getValue(_dimColumnName);
+      }
+    }
+    return resultSet;
+  }
+
+  @Override
+  public int[] transformToIntValuesSV(ProjectionBlock projectionBlock) {
+    Object[] lookupObjects = lookup(projectionBlock);
+    int[] resultSet = new int[lookupObjects.length];
+    Arrays.fill(resultSet, ((Number) _lookupColumnFieldSpec.getDefaultNullValue()).intValue());
+    for (int i = 0; i < lookupObjects.length; i++) {
+      if (lookupObjects[i] != null) {
+        resultSet[i] = ((Number) lookupObjects[i]).intValue();
+      }
+    }
+    return resultSet;
+  }
+
+  @Override
+  public long[] transformToLongValuesSV(ProjectionBlock projectionBlock) {
+    Object[] lookupObjects = lookup(projectionBlock);
+    long[] resultSet = new long[lookupObjects.length];
+    Arrays.fill(resultSet, ((Number) _lookupColumnFieldSpec.getDefaultNullValue()).longValue());
+    for (int i = 0; i < lookupObjects.length; i++) {
+      if (lookupObjects[i] != null) {
+        resultSet[i] = ((Number) lookupObjects[i]).longValue();
+      }
+    }
+    return resultSet;
+  }
+
+  @Override
+  public float[] transformToFloatValuesSV(ProjectionBlock projectionBlock) {
+    Object[] lookupObjects = lookup(projectionBlock);
+    float[] resultSet = new float[lookupObjects.length];
+    Arrays.fill(resultSet, ((Number) _lookupColumnFieldSpec.getDefaultNullValue()).floatValue());
+    for (int i = 0; i < lookupObjects.length; i++) {
+      if (lookupObjects[i] != null) {
+        resultSet[i] = ((Number) lookupObjects[i]).floatValue();
+      }
+    }
+    return resultSet;
+  }
+
+  @Override
+  public double[] transformToDoubleValuesSV(ProjectionBlock projectionBlock) {
+    Object[] lookupObjects = lookup(projectionBlock);
+    double[] resultSet = new double[lookupObjects.length];
+    Arrays.fill(resultSet, ((Number) _lookupColumnFieldSpec.getDefaultNullValue()).doubleValue());
+    for (int i = 0; i < lookupObjects.length; i++) {
+      if (lookupObjects[i] != null) {
+        resultSet[i] = ((Number) lookupObjects[i]).doubleValue();
+      }
+    }
+    return resultSet;
+  }
+
+  @Override
+  public String[] transformToStringValuesSV(ProjectionBlock projectionBlock) {
+    Object[] lookupObjects = lookup(projectionBlock);
+    String[] resultSet = new String[lookupObjects.length];
+    Arrays.fill(resultSet, _lookupColumnFieldSpec.getDefaultNullValueString());
+    for (int i = 0; i < lookupObjects.length; i++) {
+      Object value = lookupObjects[i];
+      if (value instanceof byte[]) {
+        // byte[].toString() yields an identity string such as "[B@1b6d3586"; render the contents as hex instead
+        resultSet[i] = new ByteArray((byte[]) value).toHexString();
+      } else if (value != null) {
+        resultSet[i] = value.toString();
+      }
+    }
+    return resultSet;
+  }
+
+  @Override
+  public byte[][] transformToBytesValuesSV(ProjectionBlock projectionBlock) {
+    Object[] lookupObjects = lookup(projectionBlock);
+    byte[][] resultSet = new byte[lookupObjects.length][0];
+    for (int i = 0; i < lookupObjects.length; i++) {
+      Object value = lookupObjects[i];
+      if (value instanceof ByteArray) {
+        resultSet[i] = ((ByteArray) value).getBytes();
+      } else if (value != null) {
+        resultSet[i] = (byte[]) value;
+      }
+    }
+    return resultSet;
+  }
+
+  @Override
+  public int[][] transformToIntValuesMV(ProjectionBlock projectionBlock) {
+    Object[] lookupObjects = lookup(projectionBlock);
+    int[][] resultSet = new int[lookupObjects.length][0];
+    for (int i = 0; i < lookupObjects.length; i++) {
+      if (lookupObjects[i] != null) {
+        resultSet[i] = toIntArray(lookupObjects[i]);
+      }
+    }
+    return resultSet;
+  }
+
+  @Override
+  public long[][] transformToLongValuesMV(ProjectionBlock projectionBlock) {
+    Object[] lookupObjects = lookup(projectionBlock);
+    long[][] resultSet = new long[lookupObjects.length][0];
+    for (int i = 0; i < lookupObjects.length; i++) {
+      if (lookupObjects[i] != null) {
+        resultSet[i] = toLongArray(lookupObjects[i]);
+      }
+    }
+    return resultSet;
+  }
+
+  @Override
+  public float[][] transformToFloatValuesMV(ProjectionBlock projectionBlock) {
+    Object[] lookupObjects = lookup(projectionBlock);
+    float[][] resultSet = new float[lookupObjects.length][0];
+    for (int i = 0; i < lookupObjects.length; i++) {
+      if (lookupObjects[i] != null) {
+        resultSet[i] = toFloatArray(lookupObjects[i]);
+      }
+    }
+    return resultSet;
+  }
+
+  @Override
+  public double[][] transformToDoubleValuesMV(ProjectionBlock projectionBlock) {
+    Object[] lookupObjects = lookup(projectionBlock);
+    double[][] resultSet = new double[lookupObjects.length][0];
+    for (int i = 0; i < lookupObjects.length; i++) {
+      if (lookupObjects[i] != null) {
+        resultSet[i] = toDoubleArray(lookupObjects[i]);
+      }
+    }
+    return resultSet;
+  }
+
+  @Override
+  public String[][] transformToStringValuesMV(ProjectionBlock projectionBlock) {
+    Object[] lookupObjects = lookup(projectionBlock);
+    String[][] resultSet = new String[lookupObjects.length][0];
+    for (int i = 0; i < lookupObjects.length; i++) {
+      if (lookupObjects[i] != null) {
+        resultSet[i] = toStringArray(lookupObjects[i]);
+      }
+    }
+    return resultSet;
+  }
+
+  // The helpers below accept a multi-value lookup result either as a primitive/typed array or as an Object[] of
+  // boxed values, converting the latter element-wise instead of failing with a ClassCastException.
+  // NOTE(review): which representation dimension table rows hold depends on how they are loaded -- confirm.
+
+  /** Returns {@code value} as an int[], converting element-wise if it is an Object[] of numbers. */
+  private static int[] toIntArray(Object value) {
+    if (!(value instanceof Object[])) {
+      return (int[]) value;
+    }
+    Object[] values = (Object[]) value;
+    int[] result = new int[values.length];
+    for (int j = 0; j < values.length; j++) {
+      result[j] = ((Number) values[j]).intValue();
+    }
+    return result;
+  }
+
+  /** Returns {@code value} as a long[], converting element-wise if it is an Object[] of numbers. */
+  private static long[] toLongArray(Object value) {
+    if (!(value instanceof Object[])) {
+      return (long[]) value;
+    }
+    Object[] values = (Object[]) value;
+    long[] result = new long[values.length];
+    for (int j = 0; j < values.length; j++) {
+      result[j] = ((Number) values[j]).longValue();
+    }
+    return result;
+  }
+
+  /** Returns {@code value} as a float[], converting element-wise if it is an Object[] of numbers. */
+  private static float[] toFloatArray(Object value) {
+    if (!(value instanceof Object[])) {
+      return (float[]) value;
+    }
+    Object[] values = (Object[]) value;
+    float[] result = new float[values.length];
+    for (int j = 0; j < values.length; j++) {
+      result[j] = ((Number) values[j]).floatValue();
+    }
+    return result;
+  }
+
+  /** Returns {@code value} as a double[], converting element-wise if it is an Object[] of numbers. */
+  private static double[] toDoubleArray(Object value) {
+    if (!(value instanceof Object[])) {
+      return (double[]) value;
+    }
+    Object[] values = (Object[]) value;
+    double[] result = new double[values.length];
+    for (int j = 0; j < values.length; j++) {
+      result[j] = ((Number) values[j]).doubleValue();
+    }
+    return result;
+  }
+
+  /** Returns {@code value} as a String[], converting element-wise (via toString) if it is another Object[]. */
+  private static String[] toStringArray(Object value) {
+    if (value instanceof String[]) {
+      return (String[]) value;
+    }
+    Object[] values = (Object[]) value;
+    String[] result = new String[values.length];
+    for (int j = 0; j < values.length; j++) {
+      result[j] = values[j].toString();
+    }
+    return result;
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java
index 3d6ff94..fb0c826 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java
@@ -92,6 +92,7 @@ public class TransformFunctionFactory {
           
put(canonicalize(TransformFunctionType.VALUEIN.getName().toLowerCase()), 
ValueInTransformFunction.class);
           
put(canonicalize(TransformFunctionType.MAPVALUE.getName().toLowerCase()), 
MapValueTransformFunction.class);
           
put(canonicalize(TransformFunctionType.INIDSET.getName().toLowerCase()), 
InIdSetTransformFunction.class);
+          
put(canonicalize(TransformFunctionType.LOOKUP.getName().toLowerCase()), 
LookupTransformFunction.class);
 
           // Array functions
           
put(canonicalize(TransformFunctionType.ARRAYAVERAGE.getName().toLowerCase()), 
ArrayAverageTransformFunction.class);
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
index d489144..23dd01f 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.data.manager.offline;
 import com.yammer.metrics.core.MetricsRegistry;
 import java.io.File;
 import java.net.URL;
+import java.util.Arrays;
 import java.util.List;
 import org.apache.commons.io.FileUtils;
 import org.apache.helix.AccessOption;
@@ -160,6 +161,10 @@ public class DimensionTableDataManagerTest {
     Assert.assertEquals(spec.getDataType(), FieldSpec.DataType.STRING,
         "Should return correct data type for teamName column");
 
+    // Confirm we can read primary column list
+    List<String> pkColumns = mgr.getPrimaryKeyColumns();
+    Assert.assertEquals(pkColumns, Arrays.asList("teamID"), "Should return PK 
column list");
+
     // Remove the segment
     List<SegmentDataManager> segmentManagers = mgr.acquireAllSegments();
     Assert.assertEquals(segmentManagers.size(), 1, "Should have exactly one 
segment manager");
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/BaseTransformFunctionTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/BaseTransformFunctionTest.java
index 91eb467..6beca8a 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/BaseTransformFunctionTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/BaseTransformFunctionTest.java
@@ -247,6 +247,27 @@ public abstract class BaseTransformFunctionTest {
     }
   }
 
+  /** Asserts, row by row, that the long multi-value output of {@code transformFunction} equals the expectation. */
+  protected void testTransformFunctionMV(TransformFunction transformFunction, long[][] expectedValues) {
+    long[][] actual = transformFunction.transformToLongValuesMV(_projectionBlock);
+    for (int row = 0; row < NUM_ROWS; row++) {
+      Assert.assertEquals(actual[row], expectedValues[row]);
+    }
+  }
+
+  /** Asserts, row by row, that the float multi-value output of {@code transformFunction} equals the expectation. */
+  protected void testTransformFunctionMV(TransformFunction transformFunction, float[][] expectedValues) {
+    float[][] actual = transformFunction.transformToFloatValuesMV(_projectionBlock);
+    for (int row = 0; row < NUM_ROWS; row++) {
+      Assert.assertEquals(actual[row], expectedValues[row]);
+    }
+  }
+
+  /** Asserts, row by row, that the double multi-value output of {@code transformFunction} equals the expectation. */
+  protected void testTransformFunctionMV(TransformFunction transformFunction, double[][] expectedValues) {
+    double[][] actual = transformFunction.transformToDoubleValuesMV(_projectionBlock);
+    for (int row = 0; row < NUM_ROWS; row++) {
+      Assert.assertEquals(actual[row], expectedValues[row]);
+    }
+  }
+
   protected void testTransformFunctionMV(TransformFunction transformFunction, 
String[][] expectedValues) {
     String[][] stringMVValues = 
transformFunction.transformToStringValuesMV(_projectionBlock);
     for (int i = 0; i < NUM_ROWS; i++) {
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/LookupTransformFunctionTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/LookupTransformFunctionTest.java
new file mode 100644
index 0000000..93e3633
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/LookupTransformFunctionTest.java
@@ -0,0 +1,386 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.transform.function;
+
+import java.util.Arrays;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.pinot.common.utils.PrimitiveArrayUtils;
+import org.apache.pinot.core.data.manager.offline.DimensionTableDataManager;
+import org.apache.pinot.core.query.exception.BadQueryRequestException;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import 
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.PrimaryKey;
+import org.apache.pinot.spi.utils.ByteArray;
+import org.testng.Assert;
+import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.Test;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.mockito.Mockito.*;
+
+
+public class LookupTransformFunctionTest extends BaseTransformFunctionTest {
+  // Name (with OFFLINE type suffix) under which the mocked dimension table is registered; the queries below refer to
+  // it as 'baseballTeams' and LookupTransformFunction appends the OFFLINE suffix when resolving it
+  private static final String TABLE_NAME = "baseballTeams_OFFLINE";
+  // Mocked dimension table data manager, configured in createTestableTableManager()
+  private DimensionTableDataManager tableManager;
+
+  /**
+   * Runs the base class set-up, then creates and registers the mocked dimension table used by these tests.
+   */
+  @BeforeSuite
+  public void setUp()
+      throws Exception {
+    super.setUp();
+
+    createTestableTableManager();
+  }
+
+  /**
+   * Creates a mocked dimension table data manager and registers it under {@link #TABLE_NAME}.
+   *
+   * <p>The mocked table looks like:
+   * <pre>
+   * teamID (PK, str) | teamName (str) | teamName_MV (str[]) | teamInteger (int) | teamInteger_MV (int[]) | ...
+   * </pre>
+   * All values are derived from the primary key. For example, for the primary key ['foo']:
+   * teamName = 'teamName_for_[foo]', teamInteger = hashCode(['foo']), teamFloat = (float) hashCode(['foo']), ...
+   */
+  private void createTestableTableManager() {
+    tableManager = mock(DimensionTableDataManager.class);
+    // registerDimensionTable keeps an existing registration; make sure the mock is the one actually in use
+    Assert.assertSame(DimensionTableDataManager.registerDimensionTable(TABLE_NAME, tableManager), tableManager,
+        "Mock dimension table manager should be registered for " + TABLE_NAME);
+
+    when(tableManager.getPrimaryKeyColumns()).thenReturn(Arrays.asList("teamID"));
+    when(tableManager.getColumnFieldSpec("teamID"))
+        .thenReturn(new DimensionFieldSpec("teamID", FieldSpec.DataType.STRING, true));
+    when(tableManager.getColumnFieldSpec("teamName"))
+        .thenReturn(new DimensionFieldSpec("teamName", FieldSpec.DataType.STRING, true));
+    when(tableManager.getColumnFieldSpec("teamName_MV"))
+        .thenReturn(new DimensionFieldSpec("teamName_MV", FieldSpec.DataType.STRING, false));
+    when(tableManager.getColumnFieldSpec("teamInteger"))
+        .thenReturn(new DimensionFieldSpec("teamInteger", FieldSpec.DataType.INT, true));
+    when(tableManager.getColumnFieldSpec("teamInteger_MV"))
+        .thenReturn(new DimensionFieldSpec("teamInteger_MV", FieldSpec.DataType.INT, false));
+    when(tableManager.getColumnFieldSpec("teamFloat"))
+        .thenReturn(new DimensionFieldSpec("teamFloat", FieldSpec.DataType.FLOAT, true));
+    when(tableManager.getColumnFieldSpec("teamFloat_MV"))
+        .thenReturn(new DimensionFieldSpec("teamFloat_MV", FieldSpec.DataType.FLOAT, false));
+    when(tableManager.getColumnFieldSpec("teamDouble"))
+        .thenReturn(new DimensionFieldSpec("teamDouble", FieldSpec.DataType.DOUBLE, true));
+    when(tableManager.getColumnFieldSpec("teamDouble_MV"))
+        .thenReturn(new DimensionFieldSpec("teamDouble_MV", FieldSpec.DataType.DOUBLE, false));
+    when(tableManager.getColumnFieldSpec("teamLong"))
+        .thenReturn(new DimensionFieldSpec("teamLong", FieldSpec.DataType.LONG, true));
+    when(tableManager.getColumnFieldSpec("teamLong_MV"))
+        .thenReturn(new DimensionFieldSpec("teamLong_MV", FieldSpec.DataType.LONG, false));
+    // Field spec name must match the column it describes
+    when(tableManager.getColumnFieldSpec("teamBytes"))
+        .thenReturn(new DimensionFieldSpec("teamBytes", FieldSpec.DataType.BYTES, true));
+    when(tableManager.lookupRowByPrimaryKey(any(PrimaryKey.class))).thenAnswer(invocation -> {
+      PrimaryKey key = invocation.getArgument(0);
+      String keyString = key.toString();
+      int keyHash = key.hashCode();
+      GenericRow row = new GenericRow();
+      row.putValue("teamName", "teamName_for_" + keyString);
+      row.putValue("teamName_MV", new String[]{"teamName_for_" + keyString + "_1", "teamName_for_" + keyString + "_2",});
+      row.putValue("teamInteger", keyHash);
+      row.putValue("teamInteger_MV", new int[]{keyHash, keyHash});
+      row.putValue("teamFloat", (float) keyHash);
+      row.putValue("teamFloat_MV", new float[]{(float) keyHash, (float) keyHash});
+      row.putValue("teamDouble", (double) keyHash);
+      row.putValue("teamDouble_MV", new double[]{(double) keyHash, (double) keyHash});
+      row.putValue("teamLong", (long) keyHash);
+      row.putValue("teamLong_MV", new long[]{(long) keyHash, (long) keyHash});
+      row.putValue("teamBytes", ("teamBytes_for_" + keyString).getBytes());
+      return row;
+    });
+  }
+
+  /**
+   * Checks that a well-formed LOOKUP call creates a LookupTransformFunction, and that malformed calls are rejected
+   * with a BadQueryRequestException.
+   */
+  @Test
+  public void instantiationTests()
+      throws Exception {
+    // Success case
+    ExpressionContext expression = QueryContextConverterUtils
+        .getExpression(String.format("lookup('baseballTeams','teamName','teamID',%s)", STRING_SV_COLUMN));
+    TransformFunction transformFunction = TransformFunctionFactory.get(expression, _dataSourceMap);
+    Assert.assertTrue(transformFunction instanceof LookupTransformFunction);
+    Assert.assertEquals(transformFunction.getName(), LookupTransformFunction.FUNCTION_NAME);
+
+    // Wrong number of arguments
+    Assert.assertThrows(BadQueryRequestException.class, () -> {
+      TransformFunctionFactory
+          .get(QueryContextConverterUtils.getExpression("lookup('baseballTeams','teamName','teamID')"),
+              _dataSourceMap);
+    });
+
+    // Wrong number of join keys
+    Assert.assertThrows(BadQueryRequestException.class, () -> {
+      TransformFunctionFactory.get(QueryContextConverterUtils.getExpression(
+          String.format("lookup('baseballTeams','teamName','teamID', %s, 'danglingKey')", STRING_SV_COLUMN)),
+          _dataSourceMap);
+    });
+
+    // Non literal tableName argument
+    Assert.assertThrows(BadQueryRequestException.class, () -> {
+      TransformFunctionFactory.get(QueryContextConverterUtils
+              .getExpression(String.format("lookup(%s,'teamName','teamID', %s)", STRING_SV_COLUMN, INT_SV_COLUMN)),
+          _dataSourceMap);
+    });
+
+    // Non literal lookup columnName argument
+    Assert.assertThrows(BadQueryRequestException.class, () -> {
+      TransformFunctionFactory.get(QueryContextConverterUtils
+              .getExpression(String.format("lookup('baseballTeams',%s,'teamID',%s)", STRING_SV_COLUMN, INT_SV_COLUMN)),
+          _dataSourceMap);
+    });
+
+    // Non literal join key argument
+    Assert.assertThrows(BadQueryRequestException.class, () -> {
+      TransformFunctionFactory.get(QueryContextConverterUtils
+              .getExpression(String.format("lookup('baseballTeams','teamName',%s,%s)", STRING_SV_COLUMN, INT_SV_COLUMN)),
+          _dataSourceMap);
+    });
+
+    // Dimension table does not exist
+    Assert.assertThrows(BadQueryRequestException.class, () -> {
+      TransformFunctionFactory.get(QueryContextConverterUtils
+              .getExpression(String.format("lookup('notExists','teamName','teamID',%s)", STRING_SV_COLUMN)),
+          _dataSourceMap);
+    });
+
+    // Lookup column does not exist in the dimension table
+    Assert.assertThrows(BadQueryRequestException.class, () -> {
+      TransformFunctionFactory.get(QueryContextConverterUtils
+              .getExpression(String.format("lookup('baseballTeams','notExists','teamID',%s)", STRING_SV_COLUMN)),
+          _dataSourceMap);
+    });
+  }
+
+  /**
+   * Checks that the result data type of LOOKUP is the data type of the looked-up dimension column.
+   */
+  @Test
+  public void resultDataTypeTest()
+      throws Exception {
+    // Lookup column name -> expected result data type
+    Map<String, FieldSpec.DataType> testCases = new HashMap<>();
+    testCases.put("teamName", FieldSpec.DataType.STRING);
+    testCases.put("teamInteger", FieldSpec.DataType.INT);
+    testCases.put("teamFloat", FieldSpec.DataType.FLOAT);
+    testCases.put("teamLong", FieldSpec.DataType.LONG);
+    testCases.put("teamDouble", FieldSpec.DataType.DOUBLE);
+    testCases.put("teamBytes", FieldSpec.DataType.BYTES);
+
+    for (Map.Entry<String, FieldSpec.DataType> testCase : testCases.entrySet()) {
+      String lookupColumn = testCase.getKey();
+      FieldSpec.DataType expectedDataType = testCase.getValue();
+      ExpressionContext expression = QueryContextConverterUtils.getExpression(
+          String.format("lookup('baseballTeams','%s','teamID',%s)", lookupColumn, STRING_SV_COLUMN));
+      TransformFunction transformFunction = TransformFunctionFactory.get(expression, _dataSourceMap);
+      Assert.assertEquals(transformFunction.getResultMetadata().getDataType(), expectedDataType,
+          String.format("Expecting %s data type for lookup column: '%s'", expectedDataType, lookupColumn));
+    }
+  }
+
+  /**
+   * Looks up single-value columns of 'baseballTeams' (teamName, teamInteger, teamDouble, teamBytes), using each
+   * row's STRING column value as the 'teamID' primary key. Expected values are derived from that key:
+   * "teamName_for_[key]" for teamName, the PrimaryKey hash code for teamInteger/teamDouble, and the bytes of
+   * "teamBytes_for_[key]" for teamBytes.
+   */
+  @Test
+  public void basicLookupTests()
+      throws Exception {
+    // Compute every expected column value for each row in a single pass over the keys.
+    String[] expectedNames = new String[NUM_ROWS];
+    int[] expectedInts = new int[NUM_ROWS];
+    double[] expectedDoubles = new double[NUM_ROWS];
+    byte[][] expectedBytes = new byte[NUM_ROWS][];
+    for (int row = 0; row < NUM_ROWS; row++) {
+      int keyHash = new PrimaryKey(new Object[]{_stringSVValues[row]}).hashCode();
+      expectedNames[row] = String.format("teamName_for_[%s]", _stringSVValues[row]);
+      expectedInts[row] = keyHash;
+      expectedDoubles[row] = keyHash;
+      expectedBytes[row] = String.format("teamBytes_for_[%s]", _stringSVValues[row]).getBytes();
+    }
+
+    TransformFunction lookup;
+
+    // Lookup col: StringSV, PK: [String]
+    lookup = TransformFunctionFactory.get(QueryContextConverterUtils
+        .getExpression(String.format("lookup('baseballTeams','teamName','teamID',%s)", STRING_SV_COLUMN)),
+        _dataSourceMap);
+    testTransformFunction(lookup, expectedNames);
+
+    // Lookup col: IntSV, PK: [String]
+    lookup = TransformFunctionFactory.get(QueryContextConverterUtils
+        .getExpression(String.format("lookup('baseballTeams','teamInteger','teamID',%s)", STRING_SV_COLUMN)),
+        _dataSourceMap);
+    testTransformFunction(lookup, expectedInts);
+
+    // Lookup col: DoubleSV, PK: [String]
+    lookup = TransformFunctionFactory.get(QueryContextConverterUtils
+        .getExpression(String.format("lookup('baseballTeams','teamDouble','teamID',%s)", STRING_SV_COLUMN)),
+        _dataSourceMap);
+    testTransformFunction(lookup, expectedDoubles);
+
+    // Lookup col: BytesSV, PK: [String]
+    lookup = TransformFunctionFactory.get(QueryContextConverterUtils
+        .getExpression(String.format("lookup('baseballTeams','teamBytes','teamID',%s)", STRING_SV_COLUMN)),
+        _dataSourceMap);
+    testTransformFunction(lookup, expectedBytes);
+  }
+
+  /**
+   * Looks up multi-value columns of 'baseballTeams' (teamName_MV, teamInteger_MV, teamFloat_MV, teamLong_MV,
+   * teamDouble_MV), using each row's STRING column value as the 'teamID' primary key. Every expected row has two
+   * entries: "teamName_for_[key]_1"/"_2" for the STRING column, and the PrimaryKey hash code of the key twice
+   * (widened to the column type) for the numeric columns.
+   */
+  @Test
+  public void multiValueLookupTests()
+      throws Exception {
+    // Rows are assigned below, so the inner arrays are left unallocated ([]) rather than allocating throwaway
+    // zero-length arrays; this matches how the String and bytes expectations are declared elsewhere.
+    String[][] expectedStringMVValues = new String[NUM_ROWS][];
+    int[][] expectedIntegerMVValues = new int[NUM_ROWS][];
+    float[][] expectedFloatMVValues = new float[NUM_ROWS][];
+    long[][] expectedLongMVValues = new long[NUM_ROWS][];
+    double[][] expectedDoubleMVValues = new double[NUM_ROWS][];
+    for (int i = 0; i < NUM_ROWS; i++) {
+      // The numeric expectations all derive from the same hash, so compute it once per row.
+      int keyHash = new PrimaryKey(new Object[]{_stringSVValues[i]}).hashCode();
+      expectedStringMVValues[i] = new String[]{
+          String.format("teamName_for_[%s]_1", _stringSVValues[i]),
+          String.format("teamName_for_[%s]_2", _stringSVValues[i])
+      };
+      expectedIntegerMVValues[i] = new int[]{keyHash, keyHash};
+      expectedFloatMVValues[i] = new float[]{keyHash, keyHash};
+      expectedLongMVValues[i] = new long[]{keyHash, keyHash};
+      expectedDoubleMVValues[i] = new double[]{keyHash, keyHash};
+    }
+
+    // Lookup col: StringMV, PK: [String]
+    ExpressionContext expression = QueryContextConverterUtils
+        .getExpression(String.format("lookup('baseballTeams','teamName_MV','teamID',%s)", STRING_SV_COLUMN));
+    TransformFunction transformFunction = TransformFunctionFactory.get(expression, _dataSourceMap);
+    testTransformFunctionMV(transformFunction, expectedStringMVValues);
+
+    // Lookup col: IntegerMV, PK: [String]
+    expression = QueryContextConverterUtils
+        .getExpression(String.format("lookup('baseballTeams','teamInteger_MV','teamID',%s)", STRING_SV_COLUMN));
+    transformFunction = TransformFunctionFactory.get(expression, _dataSourceMap);
+    testTransformFunctionMV(transformFunction, expectedIntegerMVValues);
+
+    // Lookup col: FloatMV, PK: [String]
+    expression = QueryContextConverterUtils
+        .getExpression(String.format("lookup('baseballTeams','teamFloat_MV','teamID',%s)", STRING_SV_COLUMN));
+    transformFunction = TransformFunctionFactory.get(expression, _dataSourceMap);
+    testTransformFunctionMV(transformFunction, expectedFloatMVValues);
+
+    // Lookup col: LongMV, PK: [String]
+    expression = QueryContextConverterUtils
+        .getExpression(String.format("lookup('baseballTeams','teamLong_MV','teamID',%s)", STRING_SV_COLUMN));
+    transformFunction = TransformFunctionFactory.get(expression, _dataSourceMap);
+    testTransformFunctionMV(transformFunction, expectedLongMVValues);
+
+    // Lookup col: DoubleMV, PK: [String]
+    expression = QueryContextConverterUtils
+        .getExpression(String.format("lookup('baseballTeams','teamDouble_MV','teamID',%s)", STRING_SV_COLUMN));
+    transformFunction = TransformFunctionFactory.get(expression, _dataSourceMap);
+    testTransformFunctionMV(transformFunction, expectedDoubleMVValues);
+  }
+
+  @Test
+  public void primaryKeyTypeTest()
+      throws Exception {
+    // preparing simple tables for testing different primary key types (INT, 
STRING, LONG)
+    Map<String, FieldSpec.DataType> testTables = new HashMap<String, 
FieldSpec.DataType>() {{
+      put("dimTableWithIntPK_OFFLINE", FieldSpec.DataType.INT);
+      put("dimTableWithStringPK_OFFLINE", FieldSpec.DataType.STRING);
+      put("dimTableWithLongPK_OFFLINE", FieldSpec.DataType.LONG);
+      put("dimTableWithFloatPK_OFFLINE", FieldSpec.DataType.FLOAT);
+      put("dimTableWithDoublePK_OFFLINE", FieldSpec.DataType.DOUBLE);
+      put("dimTableWithBytesPK_OFFLINE", FieldSpec.DataType.BYTES);
+    }};
+    for (Map.Entry<String, FieldSpec.DataType> table : testTables.entrySet()) {
+      DimensionTableDataManager mgr = mock(DimensionTableDataManager.class);
+      DimensionTableDataManager.registerDimensionTable(table.getKey(), mgr);
+      
when(mgr.getPrimaryKeyColumns()).thenReturn(Arrays.asList("primaryColumn"));
+      when(mgr.getColumnFieldSpec("primaryColumn"))
+          .thenReturn(new DimensionFieldSpec("primaryColumn", 
table.getValue(), true));
+      when(mgr.getColumnFieldSpec("lookupColumn"))
+          .thenReturn(new DimensionFieldSpec("lookupColumn", 
FieldSpec.DataType.STRING, true));
+      
when(mgr.lookupRowByPrimaryKey(any(PrimaryKey.class))).thenAnswer(invocation -> 
{
+        PrimaryKey key = invocation.getArgument(0);
+        GenericRow row = new GenericRow();
+        row.putValue("lookupColumn", String.format("lookup_value_for_[%s]", 
key.hashCode()));
+        return row;
+      });
+    }
+
+    // PK: [Int]
+    ExpressionContext expression = QueryContextConverterUtils.getExpression(
+        String.format("lookup('dimTableWithIntPK', 'lookupColumn', 
'primaryColumn', %s)", INT_SV_COLUMN));
+    TransformFunction transformFunction = 
TransformFunctionFactory.get(expression, _dataSourceMap);
+    String[] expectedResults = new String[NUM_ROWS];
+    for (int i = 0; i < NUM_ROWS; i++) {
+      PrimaryKey key = new PrimaryKey(new Object[]{(Integer)_intSVValues[i]});
+      expectedResults[i] = String.format("lookup_value_for_[%s]", 
key.hashCode());
+    }
+    testTransformFunction(transformFunction, expectedResults);
+
+    // PK: [String]
+    expression = QueryContextConverterUtils.getExpression(
+        String.format("lookup('dimTableWithStringPK', 'lookupColumn', 
'primaryColumn', %s)", STRING_SV_COLUMN));
+    transformFunction = TransformFunctionFactory.get(expression, 
_dataSourceMap);
+    expectedResults = new String[NUM_ROWS];
+    for (int i = 0; i < NUM_ROWS; i++) {
+      PrimaryKey key = new PrimaryKey(new Object[]{_stringSVValues[i]});
+      expectedResults[i] = String.format("lookup_value_for_[%s]", 
key.hashCode());
+    }
+    testTransformFunction(transformFunction, expectedResults);
+
+    // PK: [Long]
+    expression = QueryContextConverterUtils.getExpression(
+        String.format("lookup('dimTableWithLongPK', 'lookupColumn', 
'primaryColumn', %s)", LONG_SV_COLUMN));
+    transformFunction = TransformFunctionFactory.get(expression, 
_dataSourceMap);
+    expectedResults = new String[NUM_ROWS];
+    for (int i = 0; i < NUM_ROWS; i++) {
+      PrimaryKey key = new PrimaryKey(new Object[]{(Long)_longSVValues[i]});
+      expectedResults[i] = String.format("lookup_value_for_[%s]", 
key.hashCode());
+    }
+    testTransformFunction(transformFunction, expectedResults);
+
+    // PK: [Float]
+    expression = QueryContextConverterUtils.getExpression(
+        String.format("lookup('dimTableWithFloatPK', 'lookupColumn', 
'primaryColumn', %s)", FLOAT_SV_COLUMN));
+    transformFunction = TransformFunctionFactory.get(expression, 
_dataSourceMap);
+    expectedResults = new String[NUM_ROWS];
+    for (int i = 0; i < NUM_ROWS; i++) {
+      PrimaryKey key = new PrimaryKey(new Object[]{(Float)_floatSVValues[i]});
+      expectedResults[i] = String.format("lookup_value_for_[%s]", 
key.hashCode());
+    }
+    testTransformFunction(transformFunction, expectedResults);
+
+    // PK: [Double]
+    expression = QueryContextConverterUtils.getExpression(
+        String.format("lookup('dimTableWithDoublePK', 'lookupColumn', 
'primaryColumn', %s)", DOUBLE_SV_COLUMN));
+    transformFunction = TransformFunctionFactory.get(expression, 
_dataSourceMap);
+    expectedResults = new String[NUM_ROWS];
+    for (int i = 0; i < NUM_ROWS; i++) {
+      PrimaryKey key = new PrimaryKey(new 
Object[]{(Double)_doubleSVValues[i]});
+      expectedResults[i] = String.format("lookup_value_for_[%s]", 
key.hashCode());
+    }
+    testTransformFunction(transformFunction, expectedResults);
+
+    // PK: [Byte[]]
+    expression = QueryContextConverterUtils.getExpression(
+        String.format("lookup('dimTableWithBytesPK', 'lookupColumn', 
'primaryColumn', %s)", BYTES_SV_COLUMN));
+    transformFunction = TransformFunctionFactory.get(expression, 
_dataSourceMap);
+    expectedResults = new String[NUM_ROWS];
+    for (int i = 0; i < NUM_ROWS; i++) {
+      PrimaryKey key = new PrimaryKey(new Object[]{new 
ByteArray(_bytesSVValues[i])});
+      expectedResults[i] = String.format("lookup_value_for_[%s]", 
key.hashCode());
+    }
+    testTransformFunction(transformFunction, expectedResults);
+  }
+}
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/JoinQuickStart.java 
b/pinot-tools/src/main/java/org/apache/pinot/tools/JoinQuickStart.java
index a4e6340..01fc669 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/JoinQuickStart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/JoinQuickStart.java
@@ -118,6 +118,13 @@ public class JoinQuickStart
         printStatus(Quickstart.Color.YELLOW, 
prettyPrintResponse(runner.runQuery(q2)));
         printStatus(Quickstart.Color.GREEN, 
"***************************************************");
 
+        String q3 = "select playerName, teamID, lookup('dimBaseballTeams', 
'teamName', 'teamID', teamID) from baseballStats limit 10";
+        printStatus(Quickstart.Color.YELLOW, "Baseball Stats with joined team 
names");
+        printStatus(Quickstart.Color.CYAN, "Query : " + q3);
+        printStatus(Quickstart.Color.YELLOW, 
prettyPrintResponse(runner.runQuery(q3)));
+        printStatus(Quickstart.Color.GREEN, 
"***************************************************");
+
+
         printStatus(Quickstart.Color.GREEN, "You can always go to 
http://localhost:9000 to play around in the query console");
 
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to