This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d04e361ef1 Improve dimension table handing (#13967)
d04e361ef1 is described below

commit d04e361ef1c7b4d424fbc451085c20a109e62f87
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Mon Sep 9 17:18:39 2024 -0700

    Improve dimension table handing (#13967)
---
 .../core/data/manager/offline/DimensionTable.java  |  15 +-
 .../manager/offline/DimensionTableDataManager.java |  20 +-
 .../manager/offline/FastLookupDimensionTable.java  |  38 ++-
 .../data/manager/offline/LookupRecordLocation.java |   4 +
 .../offline/MemoryOptimizedDimensionTable.java     |  39 ++-
 .../function/LookupTransformFunction.java          |   4 +-
 .../offline/DimensionTableDataManagerTest.java     |  92 ++++--
 .../function/BaseTransformFunctionTest.java        |   1 -
 .../function/LookupTransformFunctionTest.java      | 341 ++++++++++-----------
 .../segment/readers/PinotSegmentRecordReader.java  |   3 +
 10 files changed, 335 insertions(+), 222 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTable.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTable.java
index b98d1d51e0..92637a45fd 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTable.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTable.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.data.manager.offline;
 
 import java.io.Closeable;
 import java.util.List;
+import javax.annotation.Nullable;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.PrimaryKey;
@@ -29,9 +30,19 @@ public interface DimensionTable extends Closeable {
 
   List<String> getPrimaryKeyColumns();
 
-  GenericRow get(PrimaryKey pk);
+  @Nullable
+  FieldSpec getFieldSpecFor(String columnName);
 
   boolean isEmpty();
 
-  FieldSpec getFieldSpecFor(String columnName);
+  boolean containsKey(PrimaryKey pk);
+
+  @Nullable
+  GenericRow getRow(PrimaryKey pk);
+
+  @Nullable
+  Object getValue(PrimaryKey pk, String columnName);
+
+  @Nullable
+  Object[] getValues(PrimaryKey pk, String[] columnNames);
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
index a5ec45750e..20759934ca 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
@@ -283,10 +283,26 @@ public class DimensionTableDataManager extends 
OfflineTableDataManager {
     return !_dimensionTable.get().isEmpty();
   }
 
-  public GenericRow lookupRowByPrimaryKey(PrimaryKey pk) {
-    return _dimensionTable.get().get(pk);
+  public boolean containsKey(PrimaryKey pk) {
+    return _dimensionTable.get().containsKey(pk);
   }
 
+  @Nullable
+  public GenericRow lookupRow(PrimaryKey pk) {
+    return _dimensionTable.get().getRow(pk);
+  }
+
+  @Nullable
+  public Object lookupValue(PrimaryKey pk, String columnName) {
+    return _dimensionTable.get().getValue(pk, columnName);
+  }
+
+  @Nullable
+  public Object[] lookupValues(PrimaryKey pk, String[] columnNames) {
+    return _dimensionTable.get().getValues(pk, columnNames);
+  }
+
+  @Nullable
   public FieldSpec getColumnFieldSpec(String columnName) {
     return _dimensionTable.get().getFieldSpecFor(columnName);
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/FastLookupDimensionTable.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/FastLookupDimensionTable.java
index 81798e4cce..ccbce7439c 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/FastLookupDimensionTable.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/FastLookupDimensionTable.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.data.manager.offline;
 
 import java.util.List;
 import java.util.Map;
+import javax.annotation.Nullable;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
@@ -43,9 +44,10 @@ public class FastLookupDimensionTable implements 
DimensionTable {
     return _primaryKeyColumns;
   }
 
+  @Nullable
   @Override
-  public GenericRow get(PrimaryKey pk) {
-    return _lookupTable.get(pk);
+  public FieldSpec getFieldSpecFor(String columnName) {
+    return _tableSchema.getFieldSpecFor(columnName);
   }
 
   @Override
@@ -54,8 +56,36 @@ public class FastLookupDimensionTable implements 
DimensionTable {
   }
 
   @Override
-  public FieldSpec getFieldSpecFor(String columnName) {
-    return _tableSchema.getFieldSpecFor(columnName);
+  public boolean containsKey(PrimaryKey pk) {
+    return _lookupTable.containsKey(pk);
+  }
+
+  @Nullable
+  @Override
+  public GenericRow getRow(PrimaryKey pk) {
+    return _lookupTable.get(pk);
+  }
+
+  @Nullable
+  @Override
+  public Object getValue(PrimaryKey pk, String columnName) {
+    GenericRow row = _lookupTable.get(pk);
+    return row != null ? row.getValue(columnName) : null;
+  }
+
+  @Nullable
+  @Override
+  public Object[] getValues(PrimaryKey pk, String[] columnNames) {
+    GenericRow row = _lookupTable.get(pk);
+    if (row == null) {
+      return null;
+    }
+    int numColumns = columnNames.length;
+    Object[] values = new Object[numColumns];
+    for (int i = 0; i < numColumns; i++) {
+      values[i] = row.getValue(columnNames[i]);
+    }
+    return values;
   }
 
   @Override
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/LookupRecordLocation.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/LookupRecordLocation.java
index 483760b6a3..32f25ed969 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/LookupRecordLocation.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/LookupRecordLocation.java
@@ -43,4 +43,8 @@ public class LookupRecordLocation {
     _pinotSegmentRecordReader.getRecord(_docId, reuse);
     return reuse;
   }
+
+  public Object getValue(String column) {
+    return _pinotSegmentRecordReader.getValue(_docId, column);
+  }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/MemoryOptimizedDimensionTable.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/MemoryOptimizedDimensionTable.java
index 96fe847e54..1303bf3bd8 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/MemoryOptimizedDimensionTable.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/MemoryOptimizedDimensionTable.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.data.manager.offline;
 
 import java.util.List;
 import java.util.Map;
+import javax.annotation.Nullable;
 import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
@@ -59,7 +60,23 @@ public class MemoryOptimizedDimensionTable implements 
DimensionTable {
   }
 
   @Override
-  public GenericRow get(PrimaryKey pk) {
+  public FieldSpec getFieldSpecFor(String columnName) {
+    return _tableSchema.getFieldSpecFor(columnName);
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return _lookupTable.isEmpty();
+  }
+
+  @Override
+  public boolean containsKey(PrimaryKey pk) {
+    return _lookupTable.containsKey(pk);
+  }
+
+  @Nullable
+  @Override
+  public GenericRow getRow(PrimaryKey pk) {
     LookupRecordLocation lookupRecordLocation = _lookupTable.get(pk);
     if (lookupRecordLocation == null) {
       return null;
@@ -69,14 +86,26 @@ public class MemoryOptimizedDimensionTable implements 
DimensionTable {
     return lookupRecordLocation.getRecord(reuse);
   }
 
+  @Nullable
   @Override
-  public boolean isEmpty() {
-    return _lookupTable.isEmpty();
+  public Object getValue(PrimaryKey pk, String columnName) {
+    LookupRecordLocation lookupRecordLocation = _lookupTable.get(pk);
+    return lookupRecordLocation != null ? 
lookupRecordLocation.getValue(columnName) : null;
   }
 
+  @Nullable
   @Override
-  public FieldSpec getFieldSpecFor(String columnName) {
-    return _tableSchema.getFieldSpecFor(columnName);
+  public Object[] getValues(PrimaryKey pk, String[] columnNames) {
+    LookupRecordLocation lookupRecordLocation = _lookupTable.get(pk);
+    if (lookupRecordLocation == null) {
+      return null;
+    }
+    int numColumns = columnNames.length;
+    Object[] values = new Object[numColumns];
+    for (int i = 0; i < numColumns; i++) {
+      values[i] = lookupRecordLocation.getValue(columnNames[i]);
+    }
+    return values;
   }
 
   @Override
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/LookupTransformFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/LookupTransformFunction.java
index a3dec3e008..826380638c 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/LookupTransformFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/LookupTransformFunction.java
@@ -29,7 +29,6 @@ import org.apache.pinot.core.operator.blocks.ValueBlock;
 import org.apache.pinot.core.operator.transform.TransformResultMetadata;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
-import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.PrimaryKey;
 import org.apache.pinot.spi.utils.ByteArray;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -219,8 +218,7 @@ public class LookupTransformFunction extends 
BaseTransformFunction {
         }
       }
       // lookup
-      GenericRow row = _dataManager.lookupRowByPrimaryKey(primaryKey);
-      Object value = row == null ? null : row.getValue(_dimColumnName);
+      Object value = _dataManager.lookupValue(primaryKey, _dimColumnName);
       valueAcceptor.accept(i, value);
     }
   }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
index fd6769a0ad..7568f8be73 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
@@ -65,10 +65,7 @@ import org.testng.annotations.Test;
 
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertNull;
-import static org.testng.Assert.fail;
+import static org.testng.Assert.*;
 
 
 @SuppressWarnings("unchecked")
@@ -196,17 +193,29 @@ public class DimensionTableDataManagerTest {
     DimensionTableDataManager tableDataManager = 
makeTableDataManager(helixManager);
 
     // try fetching data BEFORE loading segment
-    GenericRow resp = tableDataManager.lookupRowByPrimaryKey(new 
PrimaryKey(new String[]{"SF"}));
-    assertNull(resp, "Response should be null if no segment is loaded");
+    PrimaryKey key = new PrimaryKey(new String[]{"SF"});
+    assertFalse(tableDataManager.containsKey(key));
+    assertNull(tableDataManager.lookupRow(key));
+    assertNull(tableDataManager.lookupValue(key, "teamID"));
+    assertNull(tableDataManager.lookupValue(key, "teamName"));
+    assertNull(tableDataManager.lookupValues(key, new String[]{"teamID", 
"teamName"}));
 
     tableDataManager.addSegment(ImmutableSegmentLoader.load(_indexDir, 
_indexLoadingConfig));
 
     // Confirm table is loaded and available for lookup
-    resp = tableDataManager.lookupRowByPrimaryKey(new PrimaryKey(new 
String[]{"SF"}));
-    assertNotNull(resp, "Should return response after segment load");
-    assertEquals(resp.getFieldToValueMap().size(), 2);
-    assertEquals(resp.getValue("teamID"), "SF");
-    assertEquals(resp.getValue("teamName"), "San Francisco Giants");
+    assertTrue(tableDataManager.containsKey(key));
+    GenericRow row = tableDataManager.lookupRow(key);
+    assertNotNull(row);
+    assertEquals(row.getFieldToValueMap().size(), 2);
+    assertEquals(row.getValue("teamID"), "SF");
+    assertEquals(row.getValue("teamName"), "San Francisco Giants");
+    assertEquals(tableDataManager.lookupValue(key, "teamID"), "SF");
+    assertEquals(tableDataManager.lookupValue(key, "teamName"), "San Francisco 
Giants");
+    Object[] values = tableDataManager.lookupValues(key, new 
String[]{"teamID", "teamName"});
+    assertNotNull(values);
+    assertEquals(values.length, 2);
+    assertEquals(values[0], "SF");
+    assertEquals(values[1], "San Francisco Giants");
 
     // Confirm we can get FieldSpec for loaded tables columns.
     FieldSpec spec = tableDataManager.getColumnFieldSpec("teamName");
@@ -224,8 +233,11 @@ public class DimensionTableDataManagerTest {
     String segmentName = segMgr.getSegmentName();
     tableDataManager.offloadSegment(segmentName);
     // confirm table is cleaned up
-    resp = tableDataManager.lookupRowByPrimaryKey(new PrimaryKey(new 
String[]{"SF"}));
-    assertNull(resp, "Response should be null if no segment is loaded");
+    assertFalse(tableDataManager.containsKey(key));
+    assertNull(tableDataManager.lookupRow(key));
+    assertNull(tableDataManager.lookupValue(key, "teamID"));
+    assertNull(tableDataManager.lookupValue(key, "teamName"));
+    assertNull(tableDataManager.lookupValues(key, new String[]{"teamID", 
"teamName"}));
   }
 
   @Test
@@ -240,11 +252,15 @@ public class DimensionTableDataManagerTest {
     tableDataManager.addSegment(ImmutableSegmentLoader.load(_indexDir, 
_indexLoadingConfig));
 
     // Confirm table is loaded and available for lookup
-    GenericRow resp = tableDataManager.lookupRowByPrimaryKey(new 
PrimaryKey(new String[]{"SF"}));
-    assertNotNull(resp, "Should return response after segment load");
-    assertEquals(resp.getFieldToValueMap().size(), 2);
-    assertEquals(resp.getValue("teamID"), "SF");
-    assertEquals(resp.getValue("teamName"), "San Francisco Giants");
+    PrimaryKey key = new PrimaryKey(new String[]{"SF"});
+    assertTrue(tableDataManager.containsKey(key));
+    GenericRow row = tableDataManager.lookupRow(key);
+    assertNotNull(row);
+    assertEquals(row.getFieldToValueMap().size(), 2);
+    assertEquals(row.getValue("teamID"), "SF");
+    assertEquals(row.getValue("teamName"), "San Francisco Giants");
+    assertEquals(tableDataManager.lookupValue(key, "teamID"), "SF");
+    assertEquals(tableDataManager.lookupValue(key, "teamName"), "San Francisco 
Giants");
 
     // Confirm the new column does not exist
     FieldSpec teamCitySpec = tableDataManager.getColumnFieldSpec("teamCity");
@@ -261,11 +277,16 @@ public class DimensionTableDataManagerTest {
     teamCitySpec = tableDataManager.getColumnFieldSpec("teamCity");
     assertNotNull(teamCitySpec, "Should return spec for existing column");
     assertEquals(teamCitySpec.getDataType(), DataType.STRING, "Should return 
correct data type for teamCity column");
-    resp = tableDataManager.lookupRowByPrimaryKey(new PrimaryKey(new 
String[]{"SF"}));
-    assertEquals(resp.getFieldToValueMap().size(), 3);
-    assertEquals(resp.getValue("teamID"), "SF");
-    assertEquals(resp.getValue("teamName"), "San Francisco Giants");
-    assertEquals(resp.getValue("teamCity"), "null");
+    assertTrue(tableDataManager.containsKey(key));
+    row = tableDataManager.lookupRow(key);
+    assertNotNull(row, "Should return response after segment reload");
+    assertEquals(row.getFieldToValueMap().size(), 3);
+    assertEquals(row.getValue("teamID"), "SF");
+    assertEquals(row.getValue("teamName"), "San Francisco Giants");
+    assertEquals(row.getValue("teamCity"), "null");
+    assertEquals(tableDataManager.lookupValue(key, "teamID"), "SF");
+    assertEquals(tableDataManager.lookupValue(key, "teamName"), "San Francisco 
Giants");
+    assertEquals(tableDataManager.lookupValue(key, "teamCity"), "null");
   }
 
   @Test
@@ -281,17 +302,21 @@ public class DimensionTableDataManagerTest {
     DimensionTableDataManager tableDataManager = 
makeTableDataManager(helixManager);
 
     // try fetching data BEFORE loading segment
-    GenericRow resp = tableDataManager.lookupRowByPrimaryKey(new 
PrimaryKey(new String[]{"SF"}));
-    assertNull(resp, "Response should be null if no segment is loaded");
+    PrimaryKey key = new PrimaryKey(new String[]{"SF"});
+    assertFalse(tableDataManager.containsKey(key));
+    assertNull(tableDataManager.lookupRow(key));
 
     tableDataManager.addSegment(ImmutableSegmentLoader.load(_indexDir, 
_indexLoadingConfig));
 
     // Confirm table is loaded and available for lookup
-    resp = tableDataManager.lookupRowByPrimaryKey(new PrimaryKey(new 
String[]{"SF"}));
-    assertNotNull(resp, "Should return response after segment load");
-    assertEquals(resp.getFieldToValueMap().size(), 2);
-    assertEquals(resp.getValue("teamID"), "SF");
-    assertEquals(resp.getValue("teamName"), "San Francisco Giants");
+    assertTrue(tableDataManager.containsKey(key));
+    GenericRow row = tableDataManager.lookupRow(key);
+    assertNotNull(row, "Should return response after segment load");
+    assertEquals(row.getFieldToValueMap().size(), 2);
+    assertEquals(row.getValue("teamID"), "SF");
+    assertEquals(row.getValue("teamName"), "San Francisco Giants");
+    assertEquals(tableDataManager.lookupValue(key, "teamID"), "SF");
+    assertEquals(tableDataManager.lookupValue(key, "teamName"), "San Francisco 
Giants");
 
     // Confirm we can get FieldSpec for loaded tables columns.
     FieldSpec spec = tableDataManager.getColumnFieldSpec("teamName");
@@ -309,8 +334,8 @@ public class DimensionTableDataManagerTest {
     String segmentName = segMgr.getSegmentName();
     tableDataManager.offloadSegment(segmentName);
     // confirm table is cleaned up
-    resp = tableDataManager.lookupRowByPrimaryKey(new PrimaryKey(new 
String[]{"SF"}));
-    assertNull(resp, "Response should be null if no segment is loaded");
+    assertFalse(tableDataManager.containsKey(key));
+    assertNull(tableDataManager.lookupRow(key));
   }
 
   @Test
@@ -326,8 +351,7 @@ public class DimensionTableDataManagerTest {
     DimensionTableDataManager tableDataManager = 
makeTableDataManager(helixManager);
 
     // try fetching data BEFORE loading segment
-    GenericRow resp = tableDataManager.lookupRowByPrimaryKey(new 
PrimaryKey(new String[]{"SF"}));
-    assertNull(resp, "Response should be null if no segment is loaded");
+    assertFalse(tableDataManager.containsKey(new PrimaryKey(new 
String[]{"SF"})));
 
     try {
       tableDataManager.addSegment(ImmutableSegmentLoader.load(_indexDir, 
_indexLoadingConfig));
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/BaseTransformFunctionTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/BaseTransformFunctionTest.java
index 165af0aec9..17816ae6b3 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/BaseTransformFunctionTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/BaseTransformFunctionTest.java
@@ -202,7 +202,6 @@ public abstract class BaseTransformFunctionTest {
         _doubleMV2Values[i][j] = 1.0;
       }
 
-      float range = 1.0f - 0.0f;
       Random random = new Random();
       for (int j = 0; j < VECTOR_DIM_SIZE; j++) {
         _vector1Values[i][j] = random.nextFloat();;
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/LookupTransformFunctionTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/LookupTransformFunctionTest.java
index a873744d96..9edfcece30 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/LookupTransformFunctionTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/LookupTransformFunctionTest.java
@@ -18,212 +18,217 @@
  */
 package org.apache.pinot.core.operator.transform.function;
 
-import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.request.context.RequestContextUtils;
 import org.apache.pinot.core.data.manager.offline.DimensionTableDataManager;
 import org.apache.pinot.spi.data.DimensionFieldSpec;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.readers.PrimaryKey;
 import org.apache.pinot.spi.exception.BadQueryRequestException;
 import org.apache.pinot.spi.utils.ByteArray;
-import org.testng.Assert;
-import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.assertTrue;
 import static org.testng.AssertJUnit.fail;
 
 
 public class LookupTransformFunctionTest extends BaseTransformFunctionTest {
   private static final String TABLE_NAME = "baseballTeams_OFFLINE";
-  private DimensionTableDataManager _tableManager;
 
-  @BeforeSuite
+  @BeforeClass
+  @Override
   public void setUp()
       throws Exception {
     super.setUp();
-
-    createTestableTableManager();
+    DimensionTableDataManager.registerDimensionTable(TABLE_NAME, 
mockDataManager());
   }
 
-  private void createTestableTableManager() {
-    _tableManager = mock(DimensionTableDataManager.class);
-    DimensionTableDataManager.registerDimensionTable(TABLE_NAME, 
_tableManager);
-
-    // Creating a mock table which looks like:
-    // TeamID (PK, str) | TeamName(str) | TeamName_MV(str[]) | 
TeamInteger(int) | TeamInteger_MV(int[]) | TeamFloat
-    // (float) | ...
-    //
-    // All values are dynamically created to be variations of the primary key.
-    // e.g
-    // lookupRowByPrimaryKey(['FOO']) -> (TeamID: 'foo', TeamName: 
'teamName_for_foo', TeamInteger: hashCode(['foo'])
-    // , ...
-    //
-    
when(_tableManager.getPrimaryKeyColumns()).thenReturn(Arrays.asList("teamID"));
-    when(_tableManager.isPopulated()).thenReturn(true);
-    when(_tableManager.getColumnFieldSpec("teamID"))
-        .thenReturn(new DimensionFieldSpec("teamID", 
FieldSpec.DataType.STRING, true));
-    when(_tableManager.getColumnFieldSpec("teamName"))
-        .thenReturn(new DimensionFieldSpec("teamName", 
FieldSpec.DataType.STRING, true));
-    when(_tableManager.getColumnFieldSpec("teamName_MV"))
-        .thenReturn(new DimensionFieldSpec("teamName_MV", 
FieldSpec.DataType.STRING, false));
-    when(_tableManager.getColumnFieldSpec("teamInteger"))
-        .thenReturn(new DimensionFieldSpec("teamInteger", 
FieldSpec.DataType.INT, true));
-    when(_tableManager.getColumnFieldSpec("teamInteger_MV"))
-        .thenReturn(new DimensionFieldSpec("teamInteger_MV", 
FieldSpec.DataType.INT, false));
-    when(_tableManager.getColumnFieldSpec("teamFloat"))
-        .thenReturn(new DimensionFieldSpec("teamFloat", 
FieldSpec.DataType.FLOAT, true));
-    when(_tableManager.getColumnFieldSpec("teamFloat_MV"))
-        .thenReturn(new DimensionFieldSpec("teamFloat_MV", 
FieldSpec.DataType.FLOAT, false));
-    when(_tableManager.getColumnFieldSpec("teamDouble"))
-        .thenReturn(new DimensionFieldSpec("teamDouble", 
FieldSpec.DataType.DOUBLE, true));
-    when(_tableManager.getColumnFieldSpec("teamDouble_MV"))
-        .thenReturn(new DimensionFieldSpec("teamDouble_MV", 
FieldSpec.DataType.DOUBLE, false));
-    when(_tableManager.getColumnFieldSpec("teamLong"))
-        .thenReturn(new DimensionFieldSpec("teamLong", 
FieldSpec.DataType.LONG, true));
-    when(_tableManager.getColumnFieldSpec("teamLong_MV"))
-        .thenReturn(new DimensionFieldSpec("teamLong_MV", 
FieldSpec.DataType.LONG, false));
-    when(_tableManager.getColumnFieldSpec("teamBytes"))
-        .thenReturn(new DimensionFieldSpec("teamNameBytes", 
FieldSpec.DataType.BYTES, true));
-    
when(_tableManager.lookupRowByPrimaryKey(any(PrimaryKey.class))).thenAnswer(invocation
 -> {
+  /**
+   * Mocks a {@link DimensionTableDataManager} which looks like:
+   * TeamID (PK, str) | TeamName(str) | TeamName_MV(str[]) | TeamInteger(int) 
| TeamInteger_MV(int[]) | TeamFloat
+   * (float) | ...
+   *
+   * All values are dynamically created to be variations of the primary key.
+   * e.g.
+   * lookupRowByPrimaryKey(['FOO']) -> (TeamID: 'foo', TeamName: 
'teamName_for_foo', TeamInteger: hashCode(['foo']), ...
+   */
+  private DimensionTableDataManager mockDataManager() {
+    DimensionTableDataManager tableManager = 
mock(DimensionTableDataManager.class);
+
+    when(tableManager.getPrimaryKeyColumns()).thenReturn(List.of("teamID"));
+    when(tableManager.isPopulated()).thenReturn(true);
+    when(tableManager.getColumnFieldSpec("teamID")).thenReturn(new 
DimensionFieldSpec("teamID", DataType.STRING, true));
+    when(tableManager.getColumnFieldSpec("teamName")).thenReturn(
+        new DimensionFieldSpec("teamName", DataType.STRING, true));
+    when(tableManager.getColumnFieldSpec("teamName_MV")).thenReturn(
+        new DimensionFieldSpec("teamName_MV", DataType.STRING, false));
+    when(tableManager.getColumnFieldSpec("teamInteger")).thenReturn(
+        new DimensionFieldSpec("teamInteger", DataType.INT, true));
+    when(tableManager.getColumnFieldSpec("teamInteger_MV")).thenReturn(
+        new DimensionFieldSpec("teamInteger_MV", DataType.INT, false));
+    when(tableManager.getColumnFieldSpec("teamFloat")).thenReturn(
+        new DimensionFieldSpec("teamFloat", DataType.FLOAT, true));
+    when(tableManager.getColumnFieldSpec("teamFloat_MV")).thenReturn(
+        new DimensionFieldSpec("teamFloat_MV", DataType.FLOAT, false));
+    when(tableManager.getColumnFieldSpec("teamDouble")).thenReturn(
+        new DimensionFieldSpec("teamDouble", DataType.DOUBLE, true));
+    when(tableManager.getColumnFieldSpec("teamDouble_MV")).thenReturn(
+        new DimensionFieldSpec("teamDouble_MV", DataType.DOUBLE, false));
+    when(tableManager.getColumnFieldSpec("teamLong")).thenReturn(
+        new DimensionFieldSpec("teamLong", DataType.LONG, true));
+    when(tableManager.getColumnFieldSpec("teamLong_MV")).thenReturn(
+        new DimensionFieldSpec("teamLong_MV", DataType.LONG, false));
+    when(tableManager.getColumnFieldSpec("teamBytes")).thenReturn(
+        new DimensionFieldSpec("teamNameBytes", DataType.BYTES, true));
+    when(tableManager.lookupValue(any(PrimaryKey.class), 
any(String.class))).thenAnswer(invocation -> {
       PrimaryKey key = invocation.getArgument(0);
-      GenericRow row = new GenericRow();
-      row.putValue("teamName", "teamName_for_" + key.toString());
-      row.putValue("teamName_MV",
-          new String[]{"teamName_for_" + key.toString() + "_1", 
"teamName_for_" + key.toString() + "_2"});
-      row.putValue("teamInteger", key.hashCode());
-      row.putValue("teamInteger_MV", new int[]{key.hashCode(), 
key.hashCode()});
-      row.putValue("teamFloat", (float) key.hashCode());
-      row.putValue("teamFloat_MV", new float[]{(float) key.hashCode(), (float) 
key.hashCode()});
-      row.putValue("teamDouble", (double) key.hashCode());
-      row.putValue("teamDouble_MV", new double[]{(double) key.hashCode(), 
(double) key.hashCode()});
-      row.putValue("teamLong", (long) key.hashCode());
-      row.putValue("teamLong_MV", new long[]{(long) key.hashCode(), (long) 
key.hashCode()});
-      row.putValue("teamBytes", ("teamBytes_for_" + 
key.toString()).getBytes());
-      return row;
+      String column = invocation.getArgument(1);
+      switch (column) {
+        case "teamName":
+          return String.format("teamName_for_%s", key);
+        case "teamName_MV":
+          return new String[]{String.format("teamName_for_%s_1", key), 
String.format("teamName_for_%s_2", key)};
+        case "teamInteger":
+          return key.hashCode();
+        case "teamInteger_MV":
+          return new int[]{key.hashCode(), key.hashCode()};
+        case "teamFloat":
+          return (float) key.hashCode();
+        case "teamFloat_MV":
+          return new float[]{(float) key.hashCode(), (float) key.hashCode()};
+        case "teamDouble":
+          return (double) key.hashCode();
+        case "teamDouble_MV":
+          return new double[]{(double) key.hashCode(), (double) 
key.hashCode()};
+        case "teamLong":
+          return (long) key.hashCode();
+        case "teamLong_MV":
+          return new long[]{(long) key.hashCode(), (long) key.hashCode()};
+        case "teamBytes":
+          return String.format("teamBytes_for_%s", key).getBytes();
+        default:
+          throw new IllegalArgumentException("Unknown column: " + column);
+      }
     });
+
+    return tableManager;
   }
 
   @Test
-  public void instantiationTests()
-      throws Exception {
+  public void instantiationTests() {
     // Success case
-    ExpressionContext expression = RequestContextUtils
-        
.getExpression(String.format("lookup('baseballTeams','teamName','teamID',%s)", 
STRING_SV_COLUMN));
+    ExpressionContext expression = RequestContextUtils.getExpression(
+        String.format("lookup('baseballTeams','teamName','teamID',%s)", 
STRING_SV_COLUMN));
     TransformFunction transformFunction = 
TransformFunctionFactory.get(expression, _dataSourceMap);
-    Assert.assertTrue(transformFunction instanceof LookupTransformFunction);
-    Assert.assertEquals(transformFunction.getName(), 
LookupTransformFunction.FUNCTION_NAME);
+    assertTrue(transformFunction instanceof LookupTransformFunction);
+    assertEquals(transformFunction.getName(), 
LookupTransformFunction.FUNCTION_NAME);
 
     // Wrong number of arguments
-    Assert.assertThrows(BadQueryRequestException.class, () -> {
-      TransformFunctionFactory
-          
.get(RequestContextUtils.getExpression(String.format("lookup('baseballTeams','teamName','teamID')")),
-              _dataSourceMap);
+    assertThrows(BadQueryRequestException.class, () -> {
+      
TransformFunctionFactory.get(RequestContextUtils.getExpression("lookup('baseballTeams','teamName','teamID')"),
+          _dataSourceMap);
     });
 
     // Wrong number of join keys
-    Assert.assertThrows(BadQueryRequestException.class, () -> {
+    assertThrows(BadQueryRequestException.class, () -> {
       TransformFunctionFactory.get(RequestContextUtils.getExpression(
-          String.format("lookup('baseballTeams','teamName','teamID', %s, 
'danglingKey')", STRING_SV_COLUMN)),
+              String.format("lookup('baseballTeams','teamName','teamID', %s, 
'danglingKey')", STRING_SV_COLUMN)),
           _dataSourceMap);
     });
 
     // Non literal tableName argument
-    Assert.assertThrows(BadQueryRequestException.class, () -> {
-      TransformFunctionFactory.get(RequestContextUtils
-              .getExpression(String.format("lookup(%s,'teamName','teamID', 
%s)", STRING_SV_COLUMN,
-                  INT_SV_COLUMN)),
-          _dataSourceMap);
+    assertThrows(BadQueryRequestException.class, () -> {
+      TransformFunctionFactory.get(RequestContextUtils.getExpression(
+          String.format("lookup(%s,'teamName','teamID', %s)", 
STRING_SV_COLUMN, INT_SV_COLUMN)), _dataSourceMap);
     });
 
     // Non literal lookup columnName argument
-    Assert.assertThrows(BadQueryRequestException.class, () -> {
+    assertThrows(BadQueryRequestException.class, () -> {
       TransformFunctionFactory.get(RequestContextUtils.getExpression(
           String.format("lookup('baseballTeams',%s,'teamID',%s)", 
STRING_SV_COLUMN, INT_SV_COLUMN)), _dataSourceMap);
     });
 
     // Non literal lookup columnName argument
-    Assert.assertThrows(BadQueryRequestException.class, () -> {
+    assertThrows(BadQueryRequestException.class, () -> {
       TransformFunctionFactory.get(RequestContextUtils.getExpression(
           String.format("lookup('baseballTeams','teamName',%s,%s)", 
STRING_SV_COLUMN, INT_SV_COLUMN)), _dataSourceMap);
     });
   }
 
   @Test
-  public void resultDataTypeTest()
-      throws Exception {
-    HashMap<String, FieldSpec.DataType> testCases = new HashMap<String, 
FieldSpec.DataType>() {{
-      put("teamName", FieldSpec.DataType.STRING);
-      put("teamInteger", FieldSpec.DataType.INT);
-      put("teamFloat", FieldSpec.DataType.FLOAT);
-      put("teamLong", FieldSpec.DataType.LONG);
-      put("teamDouble", FieldSpec.DataType.DOUBLE);
+  public void resultDataTypeTest() {
+    HashMap<String, DataType> testCases = new HashMap<>() {{
+      put("teamName", DataType.STRING);
+      put("teamInteger", DataType.INT);
+      put("teamFloat", DataType.FLOAT);
+      put("teamLong", DataType.LONG);
+      put("teamDouble", DataType.DOUBLE);
     }};
 
-    for (Map.Entry<String, FieldSpec.DataType> testCase : 
testCases.entrySet()) {
+    for (Map.Entry<String, DataType> testCase : testCases.entrySet()) {
       ExpressionContext expression = RequestContextUtils.getExpression(
           String.format("lookup('baseballTeams','%s','teamID',%s)", 
testCase.getKey(), STRING_SV_COLUMN));
       TransformFunction transformFunction = 
TransformFunctionFactory.get(expression, _dataSourceMap);
-      Assert.assertEquals(transformFunction.getResultMetadata().getDataType(), 
testCase.getValue(),
+      assertEquals(transformFunction.getResultMetadata().getDataType(), 
testCase.getValue(),
           String.format("Expecting %s data type for lookup column: '%s'", 
testCase.getKey(), testCase.getValue()));
     }
   }
 
   @Test
-  public void dimensionTableNotPopulated() throws Exception {
+  public void dimensionTableNotPopulated() {
     DimensionTableDataManager tableManager = 
mock(DimensionTableDataManager.class);
     when(tableManager.isPopulated()).thenReturn(false);
-    
when(tableManager.getPrimaryKeyColumns()).thenReturn(Arrays.asList("leagueID"));
-    when(tableManager.getColumnFieldSpec("leagueID"))
-        .thenReturn(new DimensionFieldSpec("leagueID", 
FieldSpec.DataType.STRING, true));
-    when(tableManager.getColumnFieldSpec("leagueName"))
-        .thenReturn(new DimensionFieldSpec("leagueName", 
FieldSpec.DataType.STRING, true));
+    when(tableManager.getPrimaryKeyColumns()).thenReturn(List.of("leagueID"));
+    when(tableManager.getColumnFieldSpec("leagueID")).thenReturn(
+        new DimensionFieldSpec("leagueID", DataType.STRING, true));
+    when(tableManager.getColumnFieldSpec("leagueName")).thenReturn(
+        new DimensionFieldSpec("leagueName", DataType.STRING, true));
 
     
DimensionTableDataManager.registerDimensionTable("baseballLeagues_OFFLINE", 
tableManager);
 
     try {
-      ExpressionContext expression = RequestContextUtils
-          
.getExpression(String.format("lookup('baseballLeagues','leagueName','leagueID',%s)",
-              STRING_SV_COLUMN));
+      ExpressionContext expression = RequestContextUtils.getExpression(
+          
String.format("lookup('baseballLeagues','leagueName','leagueID',%s)", 
STRING_SV_COLUMN));
       TransformFunctionFactory.get(expression, _dataSourceMap);
       fail("Should have thrown BadQueryRequestException");
     } catch (Exception ex) {
-      Assert.assertEquals(ex.getCause().getMessage(), "Dimension table is not 
populated: baseballLeagues_OFFLINE");
+      assertEquals(ex.getCause().getMessage(), "Dimension table is not 
populated: baseballLeagues_OFFLINE");
     }
   }
 
   @Test
-  public void dimensionTableIsPopulated() throws Exception {
+  public void dimensionTableIsPopulated() {
     DimensionTableDataManager tableManager = 
mock(DimensionTableDataManager.class);
     when(tableManager.isPopulated()).thenReturn(true);
-    
when(tableManager.getPrimaryKeyColumns()).thenReturn(Arrays.asList("playerID"));
-    when(tableManager.getColumnFieldSpec("playerID"))
-        .thenReturn(new DimensionFieldSpec("playerID", 
FieldSpec.DataType.STRING, true));
-    when(tableManager.getColumnFieldSpec("playerName"))
-        .thenReturn(new DimensionFieldSpec("playerName", 
FieldSpec.DataType.STRING, true));
+    when(tableManager.getPrimaryKeyColumns()).thenReturn(List.of("playerID"));
+    when(tableManager.getColumnFieldSpec("playerID")).thenReturn(
+        new DimensionFieldSpec("playerID", DataType.STRING, true));
+    when(tableManager.getColumnFieldSpec("playerName")).thenReturn(
+        new DimensionFieldSpec("playerName", DataType.STRING, true));
 
     
DimensionTableDataManager.registerDimensionTable("baseballPlayers_OFFLINE", 
tableManager);
 
-      ExpressionContext expression = RequestContextUtils
-          
.getExpression(String.format("lookup('baseballPlayers','playerName','playerID',%s)",
-              STRING_SV_COLUMN));
-      TransformFunctionFactory.get(expression, _dataSourceMap);
+    ExpressionContext expression = RequestContextUtils.getExpression(
+        String.format("lookup('baseballPlayers','playerName','playerID',%s)", 
STRING_SV_COLUMN));
+    TransformFunctionFactory.get(expression, _dataSourceMap);
   }
 
-
   @Test
-  public void basicLookupTests()
-      throws Exception {
+  public void basicLookupTests() {
     // Lookup col: StringSV
     // PK: [String]
-    ExpressionContext expression = RequestContextUtils
-        
.getExpression(String.format("lookup('baseballTeams','teamName','teamID',%s)", 
STRING_SV_COLUMN));
+    ExpressionContext expression = RequestContextUtils.getExpression(
+        String.format("lookup('baseballTeams','teamName','teamID',%s)", 
STRING_SV_COLUMN));
     TransformFunction transformFunction = 
TransformFunctionFactory.get(expression, _dataSourceMap);
     String[] expectedStringValues = new String[NUM_ROWS];
     for (int i = 0; i < NUM_ROWS; i++) {
@@ -233,8 +238,8 @@ public class LookupTransformFunctionTest extends 
BaseTransformFunctionTest {
 
     // Lookup col: IntSV
     // PK: [String]
-    expression = RequestContextUtils
-        
.getExpression(String.format("lookup('baseballTeams','teamInteger','teamID',%s)",
 STRING_SV_COLUMN));
+    expression = RequestContextUtils.getExpression(
+        String.format("lookup('baseballTeams','teamInteger','teamID',%s)", 
STRING_SV_COLUMN));
     transformFunction = TransformFunctionFactory.get(expression, 
_dataSourceMap);
     int[] expectedIntValues = new int[NUM_ROWS];
     for (int i = 0; i < NUM_ROWS; i++) {
@@ -244,19 +249,19 @@ public class LookupTransformFunctionTest extends 
BaseTransformFunctionTest {
 
     // Lookup col: DoubleSV
     // PK: [String]
-    expression = RequestContextUtils
-        
.getExpression(String.format("lookup('baseballTeams','teamDouble','teamID',%s)",
 STRING_SV_COLUMN));
+    expression = RequestContextUtils.getExpression(
+        String.format("lookup('baseballTeams','teamDouble','teamID',%s)", 
STRING_SV_COLUMN));
     transformFunction = TransformFunctionFactory.get(expression, 
_dataSourceMap);
     double[] expectedDoubleValues = new double[NUM_ROWS];
     for (int i = 0; i < NUM_ROWS; i++) {
-      expectedDoubleValues[i] = (double) (new PrimaryKey(new 
Object[]{_stringSVValues[i]})).hashCode();
+      expectedDoubleValues[i] = (new PrimaryKey(new 
Object[]{_stringSVValues[i]})).hashCode();
     }
     testTransformFunction(transformFunction, expectedDoubleValues);
 
     // Lookup col: BytesSV
     // PK: [String]
-    expression = RequestContextUtils
-        
.getExpression(String.format("lookup('baseballTeams','teamBytes','teamID',%s)", 
STRING_SV_COLUMN));
+    expression = RequestContextUtils.getExpression(
+        String.format("lookup('baseballTeams','teamBytes','teamID',%s)", 
STRING_SV_COLUMN));
     transformFunction = TransformFunctionFactory.get(expression, 
_dataSourceMap);
     byte[][] expectedBytesValues = new byte[NUM_ROWS][];
     for (int i = 0; i < NUM_ROWS; i++) {
@@ -266,106 +271,100 @@ public class LookupTransformFunctionTest extends 
BaseTransformFunctionTest {
   }
 
   @Test
-  public void multiValueLookupTests()
-      throws Exception {
+  public void multiValueLookupTests() {
     // Lookup col: StringMV
     // PK: [String]
-    ExpressionContext expression = RequestContextUtils
-        
.getExpression(String.format("lookup('baseballTeams','teamName_MV','teamID',%s)",
 STRING_SV_COLUMN));
+    ExpressionContext expression = RequestContextUtils.getExpression(
+        String.format("lookup('baseballTeams','teamName_MV','teamID',%s)", 
STRING_SV_COLUMN));
     TransformFunction transformFunction = 
TransformFunctionFactory.get(expression, _dataSourceMap);
     String[][] expectedStringMVValues = new String[NUM_ROWS][];
     for (int i = 0; i < NUM_ROWS; i++) {
       expectedStringMVValues[i] = new String[]{
-          String.format("teamName_for_[%s]_1", _stringSVValues[i]),
-          String.format("teamName_for_[%s]_2", _stringSVValues[i]),
+          String.format("teamName_for_[%s]_1", _stringSVValues[i]), 
String.format("teamName_for_[%s]_2",
+          _stringSVValues[i]),
       };
     }
     testTransformFunctionMV(transformFunction, expectedStringMVValues);
 
     // Lookup col: IntegerMV
     // PK: [String]
-    expression = RequestContextUtils
-        
.getExpression(String.format("lookup('baseballTeams','teamInteger_MV','teamID',%s)",
 STRING_SV_COLUMN));
+    expression = RequestContextUtils.getExpression(
+        String.format("lookup('baseballTeams','teamInteger_MV','teamID',%s)", 
STRING_SV_COLUMN));
     transformFunction = TransformFunctionFactory.get(expression, 
_dataSourceMap);
     int[][] expectedIntegerMVValues = new int[NUM_ROWS][0];
     for (int i = 0; i < NUM_ROWS; i++) {
       expectedIntegerMVValues[i] = new int[]{
-          (new PrimaryKey(new Object[]{_stringSVValues[i]})).hashCode(),
-          (new PrimaryKey(new Object[]{_stringSVValues[i]})).hashCode(),
+          (new PrimaryKey(new Object[]{_stringSVValues[i]})).hashCode(), (new 
PrimaryKey(
+          new Object[]{_stringSVValues[i]})).hashCode(),
       };
     }
     testTransformFunctionMV(transformFunction, expectedIntegerMVValues);
 
     // Lookup col: FloatMV
     // PK: [String]
-    expression = RequestContextUtils
-        
.getExpression(String.format("lookup('baseballTeams','teamFloat_MV','teamID',%s)",
 STRING_SV_COLUMN));
+    expression = RequestContextUtils.getExpression(
+        String.format("lookup('baseballTeams','teamFloat_MV','teamID',%s)", 
STRING_SV_COLUMN));
     transformFunction = TransformFunctionFactory.get(expression, 
_dataSourceMap);
     float[][] expectedFloatMVValues = new float[NUM_ROWS][0];
     for (int i = 0; i < NUM_ROWS; i++) {
       expectedFloatMVValues[i] = new float[]{
-          (float) (new PrimaryKey(new 
Object[]{_stringSVValues[i]})).hashCode(),
-          (float) (new PrimaryKey(new 
Object[]{_stringSVValues[i]})).hashCode(),
+          (float) (new PrimaryKey(new 
Object[]{_stringSVValues[i]})).hashCode(), (float) (new PrimaryKey(
+          new Object[]{_stringSVValues[i]})).hashCode(),
       };
     }
     testTransformFunctionMV(transformFunction, expectedFloatMVValues);
 
     // Lookup col: LongMV
     // PK: [String]
-    expression = RequestContextUtils
-        
.getExpression(String.format("lookup('baseballTeams','teamLong_MV','teamID',%s)",
 STRING_SV_COLUMN));
+    expression = RequestContextUtils.getExpression(
+        String.format("lookup('baseballTeams','teamLong_MV','teamID',%s)", 
STRING_SV_COLUMN));
     transformFunction = TransformFunctionFactory.get(expression, 
_dataSourceMap);
     long[][] expectedLongMVValues = new long[NUM_ROWS][0];
     for (int i = 0; i < NUM_ROWS; i++) {
       expectedLongMVValues[i] = new long[]{
-          (long) (new PrimaryKey(new Object[]{_stringSVValues[i]})).hashCode(),
-          (long) (new PrimaryKey(new Object[]{_stringSVValues[i]})).hashCode(),
+          (long) (new PrimaryKey(new 
Object[]{_stringSVValues[i]})).hashCode(), (long) (new PrimaryKey(
+          new Object[]{_stringSVValues[i]})).hashCode(),
       };
     }
     testTransformFunctionMV(transformFunction, expectedLongMVValues);
 
     // Lookup col: DoubleMV
     // PK: [String]
-    expression = RequestContextUtils
-        
.getExpression(String.format("lookup('baseballTeams','teamDouble_MV','teamID',%s)",
 STRING_SV_COLUMN));
+    expression = RequestContextUtils.getExpression(
+        String.format("lookup('baseballTeams','teamDouble_MV','teamID',%s)", 
STRING_SV_COLUMN));
     transformFunction = TransformFunctionFactory.get(expression, 
_dataSourceMap);
     double[][] expectedDoubleMVValues = new double[NUM_ROWS][0];
     for (int i = 0; i < NUM_ROWS; i++) {
       expectedDoubleMVValues[i] = new double[]{
-          (double) (new PrimaryKey(new 
Object[]{_stringSVValues[i]})).hashCode(),
-          (double) (new PrimaryKey(new 
Object[]{_stringSVValues[i]})).hashCode(),
+          (double) (new PrimaryKey(new 
Object[]{_stringSVValues[i]})).hashCode(), (double) (new PrimaryKey(
+          new Object[]{_stringSVValues[i]})).hashCode(),
       };
     }
     testTransformFunctionMV(transformFunction, expectedDoubleMVValues);
   }
 
   @Test
-  public void primaryKeyTypeTest()
-      throws Exception {
+  public void primaryKeyTypeTest() {
     // preparing simple tables for testing different primary key types (INT, 
STRING, LONG)
-    Map<String, FieldSpec.DataType> testTables = new HashMap<String, 
FieldSpec.DataType>() {{
-      put("dimTableWithIntPK_OFFLINE", FieldSpec.DataType.INT);
-      put("dimTableWithStringPK_OFFLINE", FieldSpec.DataType.STRING);
-      put("dimTableWithLongPK_OFFLINE", FieldSpec.DataType.LONG);
-      put("dimTableWithFloatPK_OFFLINE", FieldSpec.DataType.FLOAT);
-      put("dimTableWithDoublePK_OFFLINE", FieldSpec.DataType.DOUBLE);
-      put("dimTableWithBytesPK_OFFLINE", FieldSpec.DataType.BYTES);
+    Map<String, DataType> testTables = new HashMap<>() {{
+      put("dimTableWithIntPK_OFFLINE", DataType.INT);
+      put("dimTableWithStringPK_OFFLINE", DataType.STRING);
+      put("dimTableWithLongPK_OFFLINE", DataType.LONG);
+      put("dimTableWithFloatPK_OFFLINE", DataType.FLOAT);
+      put("dimTableWithDoublePK_OFFLINE", DataType.DOUBLE);
+      put("dimTableWithBytesPK_OFFLINE", DataType.BYTES);
     }};
-    for (Map.Entry<String, FieldSpec.DataType> table : testTables.entrySet()) {
+    for (Map.Entry<String, DataType> table : testTables.entrySet()) {
       DimensionTableDataManager mgr = mock(DimensionTableDataManager.class);
       DimensionTableDataManager.registerDimensionTable(table.getKey(), mgr);
       when(mgr.isPopulated()).thenReturn(true);
-      
when(mgr.getPrimaryKeyColumns()).thenReturn(Arrays.asList("primaryColumn"));
-      when(mgr.getColumnFieldSpec("primaryColumn"))
-          .thenReturn(new DimensionFieldSpec("primaryColumn", 
table.getValue(), true));
-      when(mgr.getColumnFieldSpec("lookupColumn"))
-          .thenReturn(new DimensionFieldSpec("lookupColumn", 
FieldSpec.DataType.STRING, true));
-      
when(mgr.lookupRowByPrimaryKey(any(PrimaryKey.class))).thenAnswer(invocation -> 
{
-        PrimaryKey key = invocation.getArgument(0);
-        GenericRow row = new GenericRow();
-        row.putValue("lookupColumn", String.format("lookup_value_for_[%s]", 
key.hashCode()));
-        return row;
-      });
+      when(mgr.getPrimaryKeyColumns()).thenReturn(List.of("primaryColumn"));
+      when(mgr.getColumnFieldSpec("primaryColumn")).thenReturn(
+          new DimensionFieldSpec("primaryColumn", table.getValue(), true));
+      when(mgr.getColumnFieldSpec("lookupColumn")).thenReturn(
+          new DimensionFieldSpec("lookupColumn", DataType.STRING, true));
+      when(mgr.lookupValue(any(PrimaryKey.class), 
eq("lookupColumn"))).thenAnswer(
+          invocation -> String.format("lookup_value_for_[%s]", 
invocation.getArgument(0).hashCode()));
     }
 
     // PK: [Int]
@@ -374,7 +373,7 @@ public class LookupTransformFunctionTest extends 
BaseTransformFunctionTest {
     TransformFunction transformFunction = 
TransformFunctionFactory.get(expression, _dataSourceMap);
     String[] expectedResults = new String[NUM_ROWS];
     for (int i = 0; i < NUM_ROWS; i++) {
-      PrimaryKey key = new PrimaryKey(new Object[]{(Integer) _intSVValues[i]});
+      PrimaryKey key = new PrimaryKey(new Object[]{_intSVValues[i]});
       expectedResults[i] = String.format("lookup_value_for_[%s]", 
key.hashCode());
     }
     testTransformFunction(transformFunction, expectedResults);
@@ -396,7 +395,7 @@ public class LookupTransformFunctionTest extends 
BaseTransformFunctionTest {
     transformFunction = TransformFunctionFactory.get(expression, 
_dataSourceMap);
     expectedResults = new String[NUM_ROWS];
     for (int i = 0; i < NUM_ROWS; i++) {
-      PrimaryKey key = new PrimaryKey(new Object[]{(Long) _longSVValues[i]});
+      PrimaryKey key = new PrimaryKey(new Object[]{_longSVValues[i]});
       expectedResults[i] = String.format("lookup_value_for_[%s]", 
key.hashCode());
     }
     testTransformFunction(transformFunction, expectedResults);
@@ -407,7 +406,7 @@ public class LookupTransformFunctionTest extends 
BaseTransformFunctionTest {
     transformFunction = TransformFunctionFactory.get(expression, 
_dataSourceMap);
     expectedResults = new String[NUM_ROWS];
     for (int i = 0; i < NUM_ROWS; i++) {
-      PrimaryKey key = new PrimaryKey(new Object[]{(Float) _floatSVValues[i]});
+      PrimaryKey key = new PrimaryKey(new Object[]{_floatSVValues[i]});
       expectedResults[i] = String.format("lookup_value_for_[%s]", 
key.hashCode());
     }
     testTransformFunction(transformFunction, expectedResults);
@@ -418,7 +417,7 @@ public class LookupTransformFunctionTest extends 
BaseTransformFunctionTest {
     transformFunction = TransformFunctionFactory.get(expression, 
_dataSourceMap);
     expectedResults = new String[NUM_ROWS];
     for (int i = 0; i < NUM_ROWS; i++) {
-      PrimaryKey key = new PrimaryKey(new Object[]{(Double) 
_doubleSVValues[i]});
+      PrimaryKey key = new PrimaryKey(new Object[]{_doubleSVValues[i]});
       expectedResults[i] = String.format("lookup_value_for_[%s]", 
key.hashCode());
     }
     testTransformFunction(transformFunction, expectedResults);
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentRecordReader.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentRecordReader.java
index 2b9f28e5ec..ff2ee59f9e 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentRecordReader.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentRecordReader.java
@@ -231,6 +231,9 @@ public class PinotSegmentRecordReader implements 
RecordReader {
     }
   }
 
+  // TODO:
+  //   - Currently there is no check on column existence
+  //   - Null value is not handled (default null value is returned)
   public Object getValue(int docId, String column) {
     return _columnReaderMap.get(column).getValue(docId);
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to