This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f3ec75972a Optimize initialization time and memory usage of dimension 
table (#15130)
f3ec75972a is described below

commit f3ec75972aae72aeffef833d4a8d5bc62fa461e7
Author: Bolek Ziobrowski <26925920+bziobrow...@users.noreply.github.com>
AuthorDate: Tue Mar 25 11:52:24 2025 +0100

    Optimize initialization time and memory usage of dimension table (#15130)
    
    * Optimize creation and memory usage of eagerly-loaded dimension table.
    
    * Add missing header.
    
    * Trigger CI.
    
    * Optimize memory usage and initialization time of memory-optimized
dimension table.
    
    * Fix bad conditional.
    
    * Fixed dimension table shutdown.
---
 .../controller/helix/ControllerRequestClient.java  |   8 +-
 .../pinot/controller/helix/ControllerTest.java     |   5 +
 .../core/data/manager/offline/DimensionTable.java  |   7 +
 .../manager/offline/DimensionTableDataManager.java | 126 +++++++--
 .../manager/offline/FastLookupDimensionTable.java  |  83 +++++-
 .../offline/MemoryOptimizedDimensionTable.java     |  51 +++-
 .../offline/DimensionTableDataManagerTest.java     |  47 +++-
 .../tests/DimensionTableIntegrationTest.java       | 169 ++++++++++++
 .../perf/BenchmarkDimensionTableOverhead.java      | 291 +++++++++++++++++++++
 .../query/runtime/operator/LookupJoinOperator.java |  17 +-
 .../segment/readers/PinotSegmentRecordReader.java  |  45 +++-
 .../utils/builder/ControllerRequestURLBuilder.java |  10 +-
 12 files changed, 790 insertions(+), 69 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
index 311a1caada..754367fef5 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
@@ -139,9 +139,15 @@ public class ControllerRequestClient {
 
   public void deleteTable(String tableNameWithType)
       throws IOException {
+    deleteTable(tableNameWithType, null);
+  }
+
+  public void deleteTable(String tableNameWithType, String retentionPeriod)
+      throws IOException {
     try {
       HttpClient.wrapAndThrowHttpException(
-          _httpClient.sendDeleteRequest(new 
URI(_controllerRequestURLBuilder.forTableDelete(tableNameWithType)),
+          _httpClient.sendDeleteRequest(
+              new 
URI(_controllerRequestURLBuilder.forTableDelete(tableNameWithType, 
retentionPeriod)),
               _headers));
     } catch (HttpErrorStatusException | URISyntaxException e) {
       throw new IOException(e);
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
index 5b213da026..7a00a4d8e4 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
@@ -702,6 +702,11 @@ public class ControllerTest {
     
getControllerRequestClient().deleteTable(TableNameBuilder.OFFLINE.tableNameWithType(tableName));
   }
 
+  public void dropOfflineTable(String tableName, String retentionPeriod)
+      throws IOException {
+    
getControllerRequestClient().deleteTable(TableNameBuilder.OFFLINE.tableNameWithType(tableName),
 retentionPeriod);
+  }
+
   public void dropRealtimeTable(String tableName)
       throws IOException {
     
getControllerRequestClient().deleteTable(TableNameBuilder.REALTIME.tableNameWithType(tableName));
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTable.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTable.java
index 92637a45fd..07c2f7ade8 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTable.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTable.java
@@ -37,6 +37,13 @@ public interface DimensionTable extends Closeable {
 
   boolean containsKey(PrimaryKey pk);
 
+  /**
+   * Deprecated because GenericRow is an inefficient data structure.
+   * Use getValue() or getValues() instead.
+   * @param pk primary key
+   * @return primary key and value as GenericRow.
+   */
+  @Deprecated
   @Nullable
   GenericRow getRow(PrimaryKey pk);
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
index 6ed4edfd48..5e36d847e4 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
@@ -20,11 +20,15 @@ package org.apache.pinot.core.data.manager.offline;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import it.unimi.dsi.fastutil.Hash;
+import it.unimi.dsi.fastutil.objects.Object2LongOpenCustomHashMap;
+import it.unimi.dsi.fastutil.objects.Object2ObjectOpenCustomHashMap;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -59,6 +63,17 @@ public class DimensionTableDataManager extends 
OfflineTableDataManager {
 
   // Storing singletons per table in a map
   private static final Map<String, DimensionTableDataManager> INSTANCES = new 
ConcurrentHashMap<>();
+  public static final Hash.Strategy<Object[]> HASH_STRATEGY = new 
Hash.Strategy<>() {
+    @Override
+    public int hashCode(Object[] o) {
+      return Arrays.hashCode(o);
+    }
+
+    @Override
+    public boolean equals(Object[] a, Object[] b) {
+      return Arrays.equals(a, b);
+    }
+  };
 
   private DimensionTableDataManager() {
   }
@@ -110,11 +125,19 @@ public class DimensionTableDataManager extends 
OfflineTableDataManager {
     }
 
     if (_disablePreload) {
+      Object2LongOpenCustomHashMap<Object[]> lookupTable = new 
Object2LongOpenCustomHashMap<>(HASH_STRATEGY);
+      lookupTable.defaultReturnValue(Long.MIN_VALUE);
+
       _dimensionTable.set(
-          new MemoryOptimizedDimensionTable(schema, primaryKeyColumns, 
Collections.emptyMap(), Collections.emptyList(),
+          new MemoryOptimizedDimensionTable(schema, primaryKeyColumns, 
lookupTable, Collections.emptyList(),
               Collections.emptyList(), this));
     } else {
-      _dimensionTable.set(new FastLookupDimensionTable(schema, 
primaryKeyColumns, new HashMap<>()));
+      List<String> valueColumns = getValueColumns(schema.getColumnNames(), 
primaryKeyColumns);
+
+      Object2ObjectOpenCustomHashMap<Object[], Object[]> lookupTable =
+          new Object2ObjectOpenCustomHashMap<>(HASH_STRATEGY);
+
+      _dimensionTable.set(new FastLookupDimensionTable(schema, 
primaryKeyColumns, valueColumns, lookupTable));
     }
   }
 
@@ -151,8 +174,10 @@ public class DimensionTableDataManager extends 
OfflineTableDataManager {
   protected void doShutdown() {
     releaseAndRemoveAllSegments();
     closeDimensionTable(_dimensionTable.get());
+    INSTANCES.remove(_tableNameWithType);
   }
 
+
   private void closeDimensionTable(DimensionTable dimensionTable) {
     try {
       dimensionTable.close();
@@ -187,36 +212,53 @@ public class DimensionTableDataManager extends 
OfflineTableDataManager {
     Preconditions.checkState(CollectionUtils.isNotEmpty(primaryKeyColumns),
         "Primary key columns must be configured for dimension table: %s", 
_tableNameWithType);
 
-    Map<PrimaryKey, GenericRow> lookupTable = new HashMap<>();
     List<SegmentDataManager> segmentDataManagers = acquireAllSegments();
     try {
+      // count all documents to limit map re-sizings
+      int totalDocs = 0;
+      for (SegmentDataManager segmentManager : segmentDataManagers) {
+        IndexSegment indexSegment = segmentManager.getSegment();
+        totalDocs += indexSegment.getSegmentMetadata().getTotalDocs();
+      }
+
+      Object2ObjectOpenCustomHashMap<Object[], Object[]> lookupTable =
+          new Object2ObjectOpenCustomHashMap<>(totalDocs, HASH_STRATEGY);
+
+      List<String> valueColumns = getValueColumns(schema.getColumnNames(), 
primaryKeyColumns);
+
       for (SegmentDataManager segmentManager : segmentDataManagers) {
         IndexSegment indexSegment = segmentManager.getSegment();
         int numTotalDocs = indexSegment.getSegmentMetadata().getTotalDocs();
         if (numTotalDocs > 0) {
           try (PinotSegmentRecordReader recordReader = new 
PinotSegmentRecordReader()) {
             recordReader.init(indexSegment);
+
+            int[] pkIndexes = 
recordReader.getIndexesForColumns(primaryKeyColumns);
+            int[] valIndexes = recordReader.getIndexesForColumns(valueColumns);
+
             for (int i = 0; i < numTotalDocs; i++) {
               if (_loadToken.get() != token) {
                 // Token changed during the loading, abort the loading
                 return null;
               }
-              GenericRow row = new GenericRow();
-              recordReader.getRecord(i, row);
-              GenericRow previousRow = 
lookupTable.put(row.getPrimaryKey(primaryKeyColumns), row);
-              if (_errorOnDuplicatePrimaryKey && previousRow != null) {
+
+              Object[] primaryKey = recordReader.getRecordValues(i, pkIndexes);
+              Object[] values = recordReader.getRecordValues(i, valIndexes);
+
+              Object[] previousValue = lookupTable.put(primaryKey, values);
+              if (_errorOnDuplicatePrimaryKey && previousValue != null) {
                 throw new IllegalStateException(
                     "Caught exception while reading records from segment: " + 
indexSegment.getSegmentName()
-                        + "primary key already exist for: " + 
row.getPrimaryKey(primaryKeyColumns));
+                        + "primary key already exist for: " + 
Arrays.toString(primaryKey));
               }
             }
           } catch (Exception e) {
             throw new RuntimeException(
-                "Caught exception while reading records from segment: " + 
indexSegment.getSegmentName());
+                "Caught exception while reading records from segment: " + 
indexSegment.getSegmentName(), e);
           }
         }
       }
-      return new FastLookupDimensionTable(schema, primaryKeyColumns, 
lookupTable);
+      return new FastLookupDimensionTable(schema, primaryKeyColumns, 
valueColumns, lookupTable);
     } finally {
       for (SegmentDataManager segmentManager : segmentDataManagers) {
         releaseSegment(segmentManager);
@@ -224,6 +266,16 @@ public class DimensionTableDataManager extends 
OfflineTableDataManager {
     }
   }
 
+  private static List<String> getValueColumns(NavigableSet<String> 
columnNames, List<String> primaryKeyColumns) {
+    List<String> nonPkColumns = new ArrayList<>(columnNames.size() - 
primaryKeyColumns.size());
+    for (String columnName : columnNames) {
+      if (!primaryKeyColumns.contains(columnName)) {
+        nonPkColumns.add(columnName);
+      }
+    }
+    return nonPkColumns;
+  }
+
   @Nullable
   private DimensionTable createMemOptimisedDimensionTable() {
     // Acquire a token in the beginning. Abort the loading and return null 
when the token changes because another
@@ -235,40 +287,41 @@ public class DimensionTableDataManager extends 
OfflineTableDataManager {
     List<String> primaryKeyColumns = schema.getPrimaryKeyColumns();
     Preconditions.checkState(CollectionUtils.isNotEmpty(primaryKeyColumns),
         "Primary key columns must be configured for dimension table: %s", 
_tableNameWithType);
-    int numPrimaryKeyColumns = primaryKeyColumns.size();
 
-    Map<PrimaryKey, LookupRecordLocation> lookupTable = new HashMap<>();
     List<SegmentDataManager> segmentDataManagers = acquireAllSegments();
     List<PinotSegmentRecordReader> recordReaders = new 
ArrayList<>(segmentDataManagers.size());
+
+    int totalDocs = 0;
+    for (SegmentDataManager segmentManager : segmentDataManagers) {
+      IndexSegment indexSegment = segmentManager.getSegment();
+      totalDocs += indexSegment.getSegmentMetadata().getTotalDocs();
+    }
+
+    Object2LongOpenCustomHashMap<Object[]> lookupTable = new 
Object2LongOpenCustomHashMap<>(totalDocs, HASH_STRATEGY);
+    lookupTable.defaultReturnValue(Long.MIN_VALUE);
+
     for (SegmentDataManager segmentManager : segmentDataManagers) {
       IndexSegment indexSegment = segmentManager.getSegment();
       int numTotalDocs = indexSegment.getSegmentMetadata().getTotalDocs();
       if (numTotalDocs > 0) {
         try {
+          int readerIdx = recordReaders.size();
           PinotSegmentRecordReader recordReader = new 
PinotSegmentRecordReader();
           recordReader.init(indexSegment);
           recordReaders.add(recordReader);
+          int[] pkIndexes = 
recordReader.getIndexesForColumns(primaryKeyColumns);
+
           for (int i = 0; i < numTotalDocs; i++) {
             if (_loadToken.get() != token) {
               // Token changed during the loading, abort the loading
-              for (PinotSegmentRecordReader reader : recordReaders) {
-                try {
-                  reader.close();
-                } catch (Exception e) {
-                  _logger.error("Caught exception while closing record reader 
for segment: {}", reader.getSegmentName(),
-                      e);
-                }
-              }
-              for (SegmentDataManager dataManager : segmentDataManagers) {
-                releaseSegment(dataManager);
-              }
+              releaseResources(recordReaders, segmentDataManagers);
               return null;
             }
-            Object[] values = new Object[numPrimaryKeyColumns];
-            for (int j = 0; j < numPrimaryKeyColumns; j++) {
-              values[j] = recordReader.getValue(i, primaryKeyColumns.get(j));
-            }
-            lookupTable.put(new PrimaryKey(values), new 
LookupRecordLocation(recordReader, i));
+
+            Object[] primaryKey = recordReader.getRecordValues(i, pkIndexes);
+
+            long readerIdxAndDocId = (((long) readerIdx) << 32) | (i & 
0xffffffffL);
+            lookupTable.put(primaryKey, readerIdxAndDocId);
           }
         } catch (Exception e) {
           throw new RuntimeException(
@@ -280,6 +333,21 @@ public class DimensionTableDataManager extends 
OfflineTableDataManager {
         this);
   }
 
+  private void releaseResources(List<PinotSegmentRecordReader> recordReaders,
+      List<SegmentDataManager> segmentDataManagers) {
+    for (PinotSegmentRecordReader reader : recordReaders) {
+      try {
+        reader.close();
+      } catch (Exception e) {
+        _logger.error("Caught exception while closing record reader for 
segment: {}", reader.getSegmentName(),
+            e);
+      }
+    }
+    for (SegmentDataManager dataManager : segmentDataManagers) {
+      releaseSegment(dataManager);
+    }
+  }
+
   public boolean isPopulated() {
     return !_dimensionTable.get().isEmpty();
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/FastLookupDimensionTable.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/FastLookupDimensionTable.java
index ccbce7439c..a764d73c3b 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/FastLookupDimensionTable.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/FastLookupDimensionTable.java
@@ -18,8 +18,9 @@
  */
 package org.apache.pinot.core.data.manager.offline;
 
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+import it.unimi.dsi.fastutil.objects.Object2ObjectOpenCustomHashMap;
 import java.util.List;
-import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
@@ -28,15 +29,35 @@ import org.apache.pinot.spi.data.readers.PrimaryKey;
 
 
 public class FastLookupDimensionTable implements DimensionTable {
-  private final Map<PrimaryKey, GenericRow> _lookupTable;
+
+  private final Object2ObjectOpenCustomHashMap<Object[], Object[]> 
_lookupTable;
   private final Schema _tableSchema;
   private final List<String> _primaryKeyColumns;
+  private final List<String> _valueColumns;
+  private final int _keysNum;
+
+  private final Object2IntOpenHashMap<String> _columnNamesToIdx;
 
-  FastLookupDimensionTable(Schema tableSchema, List<String> primaryKeyColumns,
-      Map<PrimaryKey, GenericRow> lookupTable) {
+  FastLookupDimensionTable(Schema tableSchema,
+      List<String> primaryKeyColumns,
+      List<String> valueColumns,
+      Object2ObjectOpenCustomHashMap<Object[], Object[]> lookupTable) {
     _lookupTable = lookupTable;
     _tableSchema = tableSchema;
     _primaryKeyColumns = primaryKeyColumns;
+    _keysNum = _primaryKeyColumns.size();
+    _valueColumns = valueColumns;
+
+    _columnNamesToIdx = new Object2IntOpenHashMap<>(_primaryKeyColumns.size() 
+ valueColumns.size());
+    _columnNamesToIdx.defaultReturnValue(Integer.MIN_VALUE);
+
+    int idx = 0;
+    for (String column : primaryKeyColumns) {
+      _columnNamesToIdx.put(column, idx++);
+    }
+    for (String column : valueColumns) {
+      _columnNamesToIdx.put(column, idx++);
+    }
   }
 
   @Override
@@ -57,35 +78,71 @@ public class FastLookupDimensionTable implements 
DimensionTable {
 
   @Override
   public boolean containsKey(PrimaryKey pk) {
-    return _lookupTable.containsKey(pk);
+    return _lookupTable.containsKey(pk.getValues());
   }
 
+  /**
+   * This method returns GenericRow, which has big memory and cpu overhead.
+   */
+  @Deprecated
   @Nullable
   @Override
   public GenericRow getRow(PrimaryKey pk) {
-    return _lookupTable.get(pk);
+    Object[] rawPk = pk.getValues();
+    Object[] value = _lookupTable.get(rawPk);
+    if (value == null) {
+      return null;
+    }
+
+    GenericRow row = new GenericRow();
+    int pIdx = 0;
+    for (String column : _primaryKeyColumns) {
+      row.putValue(column, rawPk[pIdx++]);
+    }
+
+    int vIdx = 0;
+    for (String column : _valueColumns) {
+      row.putValue(column, value[vIdx++]);
+    }
+
+    return row;
   }
 
   @Nullable
   @Override
   public Object getValue(PrimaryKey pk, String columnName) {
-    GenericRow row = _lookupTable.get(pk);
-    return row != null ? row.getValue(columnName) : null;
+    Object[] value = _lookupTable.get(pk.getValues());
+    if (value == null) {
+      return null;
+    }
+
+    return getValue(pk, columnName, value);
+  }
+
+  private Object getValue(PrimaryKey pk, String columnName, Object[] values) {
+    int idx = _columnNamesToIdx.getInt(columnName);
+    if (idx < 0) {
+      return null;
+    } else if (idx < _keysNum) {
+      return pk.getValues()[idx];
+    } else {
+      return values[idx - _keysNum];
+    }
   }
 
   @Nullable
   @Override
   public Object[] getValues(PrimaryKey pk, String[] columnNames) {
-    GenericRow row = _lookupTable.get(pk);
-    if (row == null) {
+    Object[] rowValues = _lookupTable.get(pk.getValues());
+    if (rowValues == null) {
       return null;
     }
     int numColumns = columnNames.length;
-    Object[] values = new Object[numColumns];
+    Object[] result = new Object[numColumns];
     for (int i = 0; i < numColumns; i++) {
-      values[i] = row.getValue(columnNames[i]);
+      result[i] = getValue(pk, columnNames[i], rowValues);
     }
-    return values;
+    return result;
   }
 
   @Override
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/MemoryOptimizedDimensionTable.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/MemoryOptimizedDimensionTable.java
index 1303bf3bd8..b04d9a7805 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/MemoryOptimizedDimensionTable.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/MemoryOptimizedDimensionTable.java
@@ -18,8 +18,8 @@
  */
 package org.apache.pinot.core.data.manager.offline;
 
+import it.unimi.dsi.fastutil.objects.Object2LongOpenCustomHashMap;
 import java.util.List;
-import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
@@ -35,7 +35,7 @@ import org.slf4j.LoggerFactory;
 public class MemoryOptimizedDimensionTable implements DimensionTable {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(MemoryOptimizedDimensionTable.class);
 
-  private final Map<PrimaryKey, LookupRecordLocation> _lookupTable;
+  private final Object2LongOpenCustomHashMap<Object[]> _lookupTable;
   private final Schema _tableSchema;
   private final List<String> _primaryKeyColumns;
   private final ThreadLocal<GenericRow> _reuse = 
ThreadLocal.withInitial(GenericRow::new);
@@ -43,9 +43,12 @@ public class MemoryOptimizedDimensionTable implements 
DimensionTable {
   private final List<PinotSegmentRecordReader> _recordReaders;
   private final TableDataManager _tableDataManager;
 
-  MemoryOptimizedDimensionTable(Schema tableSchema, List<String> 
primaryKeyColumns,
-      Map<PrimaryKey, LookupRecordLocation> lookupTable, 
List<SegmentDataManager> segmentDataManagers,
-      List<PinotSegmentRecordReader> recordReaders, TableDataManager 
tableDataManager) {
+  MemoryOptimizedDimensionTable(Schema tableSchema,
+      List<String> primaryKeyColumns,
+      Object2LongOpenCustomHashMap<Object[]> lookupTable,
+      List<SegmentDataManager> segmentDataManagers,
+      List<PinotSegmentRecordReader> recordReaders,
+      TableDataManager tableDataManager) {
     _tableSchema = tableSchema;
     _primaryKeyColumns = primaryKeyColumns;
     _lookupTable = lookupTable;
@@ -71,39 +74,59 @@ public class MemoryOptimizedDimensionTable implements 
DimensionTable {
 
   @Override
   public boolean containsKey(PrimaryKey pk) {
-    return _lookupTable.containsKey(pk);
+    return _lookupTable.containsKey(pk.getValues());
   }
 
   @Nullable
   @Override
   public GenericRow getRow(PrimaryKey pk) {
-    LookupRecordLocation lookupRecordLocation = _lookupTable.get(pk);
-    if (lookupRecordLocation == null) {
+    long readerIdxAndDocId = _lookupTable.getLong(pk.getValues());
+    if (readerIdxAndDocId < 0) {
       return null;
     }
+
     GenericRow reuse = _reuse.get();
     reuse.clear();
-    return lookupRecordLocation.getRecord(reuse);
+
+    int docId = (int) (readerIdxAndDocId & 0xffffffffL);
+    int readerIdx = (int) (readerIdxAndDocId >> 32);
+
+    PinotSegmentRecordReader recordReader = _recordReaders.get(readerIdx);
+    recordReader.getRecord(docId, reuse);
+    return reuse;
   }
 
   @Nullable
   @Override
   public Object getValue(PrimaryKey pk, String columnName) {
-    LookupRecordLocation lookupRecordLocation = _lookupTable.get(pk);
-    return lookupRecordLocation != null ? 
lookupRecordLocation.getValue(columnName) : null;
+    long readerIdxAndDocId = _lookupTable.getLong(pk.getValues());
+    if (readerIdxAndDocId < 0) {
+      return null;
+    }
+
+    int docId = (int) (readerIdxAndDocId & 0xffffffffL);
+    int readerIdx = (int) (readerIdxAndDocId >> 32);
+
+    PinotSegmentRecordReader recordReader = _recordReaders.get(readerIdx);
+    return recordReader.getValue(docId, columnName);
   }
 
   @Nullable
   @Override
   public Object[] getValues(PrimaryKey pk, String[] columnNames) {
-    LookupRecordLocation lookupRecordLocation = _lookupTable.get(pk);
-    if (lookupRecordLocation == null) {
+    long readerIdxAndDocId = _lookupTable.getLong(pk.getValues());
+    if (readerIdxAndDocId < 0) {
       return null;
     }
+
+    int docId = (int) (readerIdxAndDocId & 0xffffffffL);
+    int readerIdx = (int) (readerIdxAndDocId >> 32);
+    PinotSegmentRecordReader recordReader = _recordReaders.get(readerIdx);
+
     int numColumns = columnNames.length;
     Object[] values = new Object[numColumns];
     for (int i = 0; i < numColumns; i++) {
-      values[i] = lookupRecordLocation.getValue(columnNames[i]);
+      values[i] = recordReader.getValue(docId, columnNames[i]);
     }
     return values;
   }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
index 5c8edb06b6..469fd658fc 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
@@ -65,6 +65,7 @@ import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 import static org.mockito.Mockito.mock;
@@ -129,21 +130,28 @@ public class DimensionTableDataManagerTest {
   }
 
   private Schema getSchema() {
-    return new Schema.SchemaBuilder().setSchemaName("dimBaseballTeams")
-        .addSingleValueDimension("teamID", 
DataType.STRING).addSingleValueDimension("teamName", DataType.STRING)
-        .setPrimaryKeyColumns(Collections.singletonList("teamID")).build();
+    return new Schema.SchemaBuilder()
+        .setSchemaName("dimBaseballTeams")
+        .addSingleValueDimension("teamID", DataType.STRING)
+        .addSingleValueDimension("teamName", DataType.STRING)
+        .setPrimaryKeyColumns(Collections.singletonList("teamID"))
+        .build();
   }
 
   private TableConfig getTableConfig(boolean disablePreload, boolean 
errorOnDuplicatePrimaryKey) {
     DimensionTableConfig dimensionTableConfig = new 
DimensionTableConfig(disablePreload, errorOnDuplicatePrimaryKey);
-    return new 
TableConfigBuilder(TableType.OFFLINE).setTableName("dimBaseballTeams")
-        .setDimensionTableConfig(dimensionTableConfig).build();
+    return new TableConfigBuilder(TableType.OFFLINE)
+        .setTableName("dimBaseballTeams")
+        .setDimensionTableConfig(dimensionTableConfig)
+        .build();
   }
 
   private Schema getSchemaWithExtraColumn() {
     return new Schema.SchemaBuilder().setSchemaName("dimBaseballTeams")
-        .addSingleValueDimension("teamID", 
DataType.STRING).addSingleValueDimension("teamName", DataType.STRING)
-        .addSingleValueDimension("teamCity", 
DataType.STRING).setPrimaryKeyColumns(Collections.singletonList("teamID"))
+        .addSingleValueDimension("teamID", DataType.STRING)
+        .addSingleValueDimension("teamName", DataType.STRING)
+        .addSingleValueDimension("teamCity", DataType.STRING)
+        .setPrimaryKeyColumns(Collections.singletonList("teamID"))
         .build();
   }
 
@@ -350,6 +358,31 @@ public class DimensionTableDataManagerTest {
     assertNull(tableDataManager.lookupRow(key));
   }
 
+  @DataProvider(name = "options")
+  private Object[] getOptions() {
+    return new Boolean[]{true, false};
+  }
+
+  @Test(dataProvider = "options")
+  public void testDeleteTableRemovesManagerFromMemory(boolean disablePreload)
+      throws Exception {
+    HelixManager helixManager = mock(HelixManager.class);
+    ZkHelixPropertyStore<ZNRecord> propertyStore = 
mock(ZkHelixPropertyStore.class);
+    when(propertyStore.get("/SCHEMAS/dimBaseballTeams", null, 
AccessOption.PERSISTENT)).thenReturn(
+        SchemaUtils.toZNRecord(getSchema()));
+    when(propertyStore.get("/CONFIGS/TABLE/dimBaseballTeams_OFFLINE", null, 
AccessOption.PERSISTENT)).thenReturn(
+        TableConfigUtils.toZNRecord(getTableConfig(disablePreload, false)));
+    when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore);
+    DimensionTableDataManager tableDataManager = 
makeTableDataManager(helixManager);
+
+    tableDataManager.addSegment(ImmutableSegmentLoader.load(_indexDir, 
_indexLoadingConfig,
+        SEGMENT_OPERATIONS_THROTTLER));
+
+    tableDataManager.shutDown();
+
+    
Assert.assertNull(DimensionTableDataManager.getInstanceByTableName(tableDataManager.getTableName()));
+  }
+
   @Test
   public void testLookupErrorOnDuplicatePrimaryKey()
       throws Exception {
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DimensionTableIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DimensionTableIntegrationTest.java
new file mode 100644
index 0000000000..945f2acbd7
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DimensionTableIntegrationTest.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.ImmutableList;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.data.manager.offline.DimensionTableDataManager;
+import org.apache.pinot.spi.config.table.DimensionTableConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class DimensionTableIntegrationTest extends BaseClusterIntegrationTest {
+
+  protected static final Logger LOGGER = 
LoggerFactory.getLogger(DimensionTableIntegrationTest.class);
+  private static final String LONG_COL = "longCol";
+  private static final String INT_COL = "intCol";
+
+  @Test
+  public void testDelete()
+      throws Exception {
+    JsonNode node = postQuery("select count(*) from " + getTableName());
+    assertNoError(node);
+    
Assert.assertEquals(node.get("resultTable").get("rows").get(0).get(0).asInt(), 
getCountStarResult());
+
+    dropOfflineTable(getTableName(), "-1d");
+
+    
waitForEVToDisappear(TableNameBuilder.OFFLINE.tableNameWithType(getTableName()));
+
+    Assert.assertNull(
+        
DimensionTableDataManager.getInstanceByTableName(TableNameBuilder.OFFLINE.tableNameWithType(getTableName())));
+  }
+
+  @Override
+  public String getTableName() {
+    return DEFAULT_TABLE_NAME;
+  }
+
+  @Override
+  public Schema createSchema() {
+    return new Schema.SchemaBuilder()
+        .setSchemaName(getTableName())
+        .addSingleValueDimension(LONG_COL, FieldSpec.DataType.LONG)
+        .addSingleValueDimension(INT_COL, FieldSpec.DataType.INT)
+        .setPrimaryKeyColumns(Collections.singletonList(LONG_COL))
+        .build();
+  }
+
+  List<File> createAvroFiles()
+      throws Exception {
+    org.apache.avro.Schema avroSchema = 
org.apache.avro.Schema.createRecord("myRecord", null, null, false);
+    avroSchema.setFields(ImmutableList.of(
+        new org.apache.avro.Schema.Field(LONG_COL, 
org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG),
+            null, null),
+        new org.apache.avro.Schema.Field(INT_COL, 
org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT),
+            null, null)));
+
+    ArrayList<File> files = new ArrayList<>();
+
+    for (int fi = 0; fi < 2; fi++) {
+      File file = new File(_tempDir, "data" + fi + ".avro");
+      try (DataFileWriter<GenericData.Record> writer = new 
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
+        writer.create(avroSchema, file);
+        for (int i = 0; i < getCountStarResult() / 2; i++) {
+          GenericData.Record record = new GenericData.Record(avroSchema);
+          record.put(LONG_COL, i);
+          record.put(INT_COL, i);
+          writer.append(record);
+        }
+      }
+      files.add(file);
+    }
+    return files;
+  }
+
+  @Override
+  protected long getCountStarResult() {
+    return 1000;
+  }
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    LOGGER.warn("Setting up integration test class: {}", 
getClass().getSimpleName());
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+    startZk();
+    startController();
+    startBroker();
+    startServer();
+
+    if (_controllerRequestURLBuilder == null) {
+      _controllerRequestURLBuilder =
+          ControllerRequestURLBuilder.baseUrl("http://localhost:" + 
getControllerPort());
+    }
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+    Schema schema = createSchema();
+    addSchema(schema);
+
+    List<File> avroFiles = createAvroFiles();
+    TableConfig tableConfig = createOfflineTableConfig();
+    addTableConfig(tableConfig);
+
+    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, 
schema, 0, _segmentDir, _tarDir);
+    uploadSegments(getTableName(), _tarDir);
+
+    waitForAllDocsLoaded(60_000);
+    LOGGER.warn("Finished setting up integration test class: {}", 
getClass().getSimpleName());
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws Exception {
+    LOGGER.warn("Tearing down integration test class: {}", 
getClass().getSimpleName());
+    FileUtils.deleteDirectory(_tempDir);
+
+    stopServer();
+    stopBroker();
+    stopController();
+    stopZk();
+    FileUtils.deleteDirectory(_tempDir);
+    LOGGER.warn("Finished tearing down integration test class: {}", 
getClass().getSimpleName());
+  }
+
+  @Override
+  public TableConfig createOfflineTableConfig() {
+    return new TableConfigBuilder(TableType.OFFLINE)
+        .setTableName(getTableName())
+        .setDimensionTableConfig(new DimensionTableConfig(false, false))
+        .setIsDimTable(true)
+        .build();
+  }
+}
diff --git 
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDimensionTableOverhead.java
 
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDimensionTableOverhead.java
new file mode 100644
index 0000000000..6dd6143009
--- /dev/null
+++ 
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDimensionTableOverhead.java
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.perf;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixManager;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.utils.SchemaUtils;
+import org.apache.pinot.common.utils.config.TableConfigUtils;
+import org.apache.pinot.core.data.manager.offline.DimensionTableDataManager;
+import org.apache.pinot.queries.BaseQueriesTest;
+import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.local.utils.SegmentAllIndexPreprocessThrottler;
+import org.apache.pinot.segment.local.utils.SegmentDownloadThrottler;
+import org.apache.pinot.segment.local.utils.SegmentLocks;
+import org.apache.pinot.segment.local.utils.SegmentOperationsThrottler;
+import org.apache.pinot.segment.local.utils.SegmentStarTreePreprocessThrottler;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
+import org.apache.pinot.spi.config.table.DimensionTableConfig;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.mockito.Mockito;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.ChainedOptionsBuilder;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+
+// Tests initialization of dimension tables
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@Fork(1)
+@Warmup(iterations = 3, time = 1)
+@Measurement(iterations = 5, time = 1)
+@State(Scope.Benchmark)
+public class BenchmarkDimensionTableOverhead extends BaseQueriesTest {
+
+  public static void main(String[] args)
+      throws Exception {
+    ChainedOptionsBuilder opt = new OptionsBuilder()
+        .include(BenchmarkDimensionTableOverhead.class.getSimpleName())
+        .shouldDoGC(true);
+    new Runner(opt.build()).run();
+  }
+
+  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), 
"FilteredAggregationsTest");
+  private static final String TABLE_NAME = "MyTable";
+  private static final String SEGMENT_NAME_TEMPLATE = "testSegment%d";
+  private static final String INT_COL_NAME = "INT_COL";
+  private static final String SORTED_COL_NAME = "SORTED_COL";
+  private static final String RAW_INT_COL_NAME = "RAW_INT_COL";
+  private static final String RAW_STRING_COL_NAME = "RAW_STRING_COL";
+  private static final String NO_INDEX_INT_COL_NAME = "NO_INDEX_INT_COL";
+  private static final String NO_INDEX_STRING_COL = "NO_INDEX_STRING_COL";
+  private static final String LOW_CARDINALITY_STRING_COL = 
"LOW_CARDINALITY_STRING_COL";
+  private static final String TIMESTAMP_COL = "TSTMP_COL";
+  private static final List<FieldConfig> FIELD_CONFIGS = new ArrayList<>();
+
+  private static final Schema SCHEMA = new Schema.SchemaBuilder()
+      .setSchemaName(TABLE_NAME)
+      .addSingleValueDimension(SORTED_COL_NAME, FieldSpec.DataType.INT)
+      .addSingleValueDimension(NO_INDEX_INT_COL_NAME, FieldSpec.DataType.INT)
+      .addSingleValueDimension(RAW_INT_COL_NAME, FieldSpec.DataType.INT)
+      .addSingleValueDimension(INT_COL_NAME, FieldSpec.DataType.INT)
+      .addSingleValueDimension(RAW_STRING_COL_NAME, FieldSpec.DataType.STRING)
+      .addSingleValueDimension(NO_INDEX_STRING_COL, FieldSpec.DataType.STRING)
+      .addSingleValueDimension(LOW_CARDINALITY_STRING_COL, 
FieldSpec.DataType.STRING)
+      .addSingleValueDimension(TIMESTAMP_COL, FieldSpec.DataType.TIMESTAMP)
+      .setPrimaryKeyColumns(Arrays.asList(SORTED_COL_NAME, 
RAW_STRING_COL_NAME, NO_INDEX_STRING_COL, RAW_INT_COL_NAME))
+      .build();
+
+  private static final SegmentOperationsThrottler SEGMENT_OPERATIONS_THROTTLER 
= new SegmentOperationsThrottler(
+      new SegmentAllIndexPreprocessThrottler(1, 2, true),
+      new SegmentStarTreePreprocessThrottler(1, 2, true),
+      new SegmentDownloadThrottler(1, 2, true));
+
+  @Param({"1"})
+  private int _numSegments;
+
+  @Param("3000000")
+  private int _numRows;
+
+  @Param({"true", "false"})
+  boolean _disablePreload;
+
+  private static int _iteration = 0;
+
+  private IndexSegment _indexSegment;
+  private List<IndexSegment> _indexSegments;
+  private Distribution.DataSupplier _supplier;
+  private DimensionTableDataManager _tableDataManager;
+
+  @Setup(Level.Iteration)
+  public void setUp()
+      throws Exception {
+    _supplier = Distribution.createSupplier(42, "EXP(0.001)");
+    FileUtils.deleteQuietly(INDEX_DIR);
+
+    _indexSegments = new ArrayList<>();
+    TableConfig tableConfig = getTableConfig(false);
+    IndexLoadingConfig indexLoadingConfig = new 
IndexLoadingConfig(tableConfig, SCHEMA);
+    for (int i = 0; i < _numSegments; i++) {
+      buildSegment(String.format(SEGMENT_NAME_TEMPLATE, i), tableConfig);
+      _indexSegments.add(ImmutableSegmentLoader.load(new File(INDEX_DIR, 
String.format(SEGMENT_NAME_TEMPLATE, i)),
+          indexLoadingConfig));
+    }
+    _indexSegment = _indexSegments.get(0);
+
+    System.gc();
+  }
+
+  @Benchmark
+  public DimensionTableDataManager benchmark()
+      throws JsonProcessingException {
+    TableConfig tableConfig = getTableConfig(_disablePreload);
+
+    HelixManager helixManager = Mockito.mock(HelixManager.class);
+    ZkHelixPropertyStore<ZNRecord> propertyStore = 
Mockito.mock(ZkHelixPropertyStore.class);
+
+    Mockito.when(propertyStore.get("/SCHEMAS/" + TABLE_NAME, null, 
AccessOption.PERSISTENT))
+        .thenReturn(SchemaUtils.toZNRecord(SCHEMA));
+
+    Mockito.when(propertyStore.get("/CONFIGS/TABLE/MyTable_OFFLINE", null, 
AccessOption.PERSISTENT))
+        .thenReturn(TableConfigUtils.toZNRecord(tableConfig));
+
+    
Mockito.when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore);
+
+    InstanceDataManagerConfig instanceDataManagerConfig = 
Mockito.mock(InstanceDataManagerConfig.class);
+    Mockito.when(instanceDataManagerConfig.getInstanceDataDir())
+        .thenReturn(INDEX_DIR.getParentFile().getAbsolutePath());
+
+    String tableName = TABLE_NAME + "_" + _iteration;
+    _tableDataManager = 
DimensionTableDataManager.createInstanceByTableName(tableName);
+    _tableDataManager.init(instanceDataManagerConfig, helixManager, new 
SegmentLocks(), tableConfig, null, null,
+        SEGMENT_OPERATIONS_THROTTLER);
+    _tableDataManager.start();
+
+    for (int i = 0; i < _indexSegments.size(); i++) {
+      _tableDataManager.addSegment((ImmutableSegment) _indexSegments.get(i));
+    }
+
+    return _tableDataManager;
+  }
+
+  @TearDown(Level.Iteration)
+  public void tearDown() {
+    _tableDataManager.shutDown();
+
+    for (IndexSegment indexSegment : _indexSegments) {
+      indexSegment.destroy();
+    }
+
+    FileUtils.deleteQuietly(INDEX_DIR);
+    EXECUTOR_SERVICE.shutdownNow();
+
+    _iteration++;
+  }
+
+  private TableConfig getTableConfig(boolean disablePreload) {
+    return new TableConfigBuilder(TableType.OFFLINE)
+        .setTableName(TABLE_NAME)
+        .setFieldConfigList(FIELD_CONFIGS)
+        .setNoDictionaryColumns(List.of(RAW_INT_COL_NAME, RAW_STRING_COL_NAME, 
TIMESTAMP_COL))
+        .setSortedColumn(SORTED_COL_NAME)
+        .setDimensionTableConfig(new DimensionTableConfig(disablePreload, 
false))
+        .build();
+  }
+
+  static LazyDataGenerator createTestData(int numRows, 
Distribution.DataSupplier supplier) {
+    //create data lazily to prevent OOM and speed up setup
+
+    return new LazyDataGenerator() {
+      private final Map<Integer, UUID> _strings = new HashMap<>();
+      private final String[] _lowCardinalityValues =
+          IntStream.range(0, 10).mapToObj(i -> "value" + 
i).toArray(String[]::new);
+      private Distribution.DataSupplier _supplier = supplier;
+
+      @Override
+      public int size() {
+        return numRows;
+      }
+
+      @Override
+      public GenericRow next(GenericRow row, int i) {
+        row.putValue(SORTED_COL_NAME, numRows - i);
+        row.putValue(INT_COL_NAME, (int) _supplier.getAsLong());
+        row.putValue(NO_INDEX_INT_COL_NAME, (int) _supplier.getAsLong());
+        row.putValue(RAW_INT_COL_NAME, (int) _supplier.getAsLong());
+        long rawStrKey = (_supplier.getAsLong());
+        row.putValue(RAW_STRING_COL_NAME,
+            _strings.computeIfAbsent((int) rawStrKey, k -> 
UUID.randomUUID()).toString());
+        row.putValue(NO_INDEX_STRING_COL, row.getValue(RAW_STRING_COL_NAME));
+        row.putValue(LOW_CARDINALITY_STRING_COL, _lowCardinalityValues[i % 
_lowCardinalityValues.length]);
+        row.putValue(TIMESTAMP_COL, i * 1200 * 1000L);
+
+        return null;
+      }
+
+      @Override
+      public void rewind() {
+        _strings.clear();
+        _supplier.reset();
+      }
+    };
+  }
+
+  private void buildSegment(String segmentName, TableConfig tableConfig)
+      throws Exception {
+
+    LazyDataGenerator dataGenerator = createTestData(_numRows, _supplier);
+    SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, 
SCHEMA);
+    config.setOutDir(INDEX_DIR.getPath());
+    config.setTableName(TABLE_NAME);
+    config.setSegmentName(segmentName);
+
+    SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
+    try (RecordReader recordReader = new 
GeneratedDataRecordReader(dataGenerator)) {
+      driver.init(config, recordReader);
+      driver.build();
+    }
+
+    //save generator state so that other segments are not identical to this one
+    _supplier.snapshot();
+  }
+
+  @Override
+  protected String getFilter() {
+    return null;
+  }
+
+  @Override
+  protected IndexSegment getIndexSegment() {
+    return _indexSegment;
+  }
+
+  @Override
+  protected List<IndexSegment> getIndexSegments() {
+    return _indexSegments;
+  }
+}
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LookupJoinOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LookupJoinOperator.java
index 1f43543018..6e07ed66a8 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LookupJoinOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LookupJoinOperator.java
@@ -190,8 +190,11 @@ public class LookupJoinOperator extends MultiStageOperator 
{
   private List<Object[]> buildJoinedDataBlockSemi(TransferableBlock leftBlock) 
{
     List<Object[]> container = leftBlock.getContainer();
     List<Object[]> rows = new ArrayList<>(container.size());
+    PrimaryKey key = new PrimaryKey(new Object[_leftKeyIds.length]);
+
     for (Object[] leftRow : container) {
-      if (_rightTable.containsKey(getKey(leftRow))) {
+      fillKey(leftRow, key);
+      if (_rightTable.containsKey(key)) {
         rows.add(leftRow);
       }
     }
@@ -201,8 +204,11 @@ public class LookupJoinOperator extends MultiStageOperator 
{
   private List<Object[]> buildJoinedDataBlockAnti(TransferableBlock leftBlock) 
{
     List<Object[]> container = leftBlock.getContainer();
     List<Object[]> rows = new ArrayList<>(container.size());
+    PrimaryKey key = new PrimaryKey(new Object[_leftKeyIds.length]);
+
     for (Object[] leftRow : container) {
-      if (!_rightTable.containsKey(getKey(leftRow))) {
+      fillKey(leftRow, key);
+      if (!_rightTable.containsKey(key)) {
         rows.add(leftRow);
       }
     }
@@ -217,6 +223,13 @@ public class LookupJoinOperator extends MultiStageOperator 
{
     return new PrimaryKey(values);
   }
 
+  private void fillKey(Object[] row, PrimaryKey key) {
+    Object[] values = key.getValues();
+    for (int i = 0; i < _leftKeyIds.length; i++) {
+      values[i] = row[_leftKeyIds[i]];
+    }
+  }
+
   private Object[] joinRow(Object[] leftRow, @Nullable Object[] rightRow) {
     Object[] resultRow = new Object[_resultColumnSize];
     System.arraycopy(leftRow, 0, resultRow, 0, leftRow.length);
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentRecordReader.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentRecordReader.java
index 11e256f886..f1e5834205 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentRecordReader.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentRecordReader.java
@@ -20,6 +20,7 @@ package org.apache.pinot.segment.local.segment.readers;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -48,6 +49,11 @@ public class PinotSegmentRecordReader implements 
RecordReader {
   private IndexSegment _indexSegment;
   private boolean _destroySegmentOnClose;
   private int _numDocs;
+
+  // These two collections allow for more efficient value reading - via indexes
+  private ArrayList<String> _columnNames;
+  private ArrayList<PinotSegmentColumnReader> _columnReaders;
+
   private Map<String, PinotSegmentColumnReader> _columnReaderMap;
   private int[] _sortedDocIds;
   private boolean _skipDefaultNullValues;
@@ -157,15 +163,23 @@ public class PinotSegmentRecordReader implements 
RecordReader {
 
     if (_numDocs > 0) {
       _columnReaderMap = new HashMap<>();
+      _columnReaders = new ArrayList<>();
+      _columnNames = new ArrayList<>();
       Set<String> columnsInSegment = _indexSegment.getPhysicalColumnNames();
       if (CollectionUtils.isEmpty(fieldsToRead)) {
         for (String column : columnsInSegment) {
-          _columnReaderMap.put(column, new 
PinotSegmentColumnReader(indexSegment, column));
+          PinotSegmentColumnReader reader = new 
PinotSegmentColumnReader(indexSegment, column);
+          _columnReaderMap.put(column, reader);
+          _columnNames.add(column);
+          _columnReaders.add(reader);
         }
       } else {
         for (String column : fieldsToRead) {
           if (columnsInSegment.contains(column)) {
-            _columnReaderMap.put(column, new 
PinotSegmentColumnReader(indexSegment, column));
+            PinotSegmentColumnReader reader = new 
PinotSegmentColumnReader(indexSegment, column);
+            _columnReaderMap.put(column, reader);
+            _columnNames.add(column);
+            _columnReaders.add(reader);
           } else {
             LOGGER.warn("Ignoring column: {} that does not exist in the 
segment", column);
           }
@@ -226,6 +240,33 @@ public class PinotSegmentRecordReader implements 
RecordReader {
     }
   }
 
+  public Object[] getRecordValues(int docId, int[] columnIndexes) {
+    Object[] values = new Object[columnIndexes.length];
+    for (int i = 0, n = columnIndexes.length; i < n; i++) {
+      int columnIndex = columnIndexes[i];
+      if (columnIndex > -1) {
+        PinotSegmentColumnReader columnReader = 
_columnReaders.get(columnIndex);
+        if (!columnReader.isNull(docId)) {
+          values[i] = columnReader.getValue(docId);
+        } else if (!_skipDefaultNullValues) {
+          values[i] = columnReader.getValue(docId);
+        } // else null value is kept
+      } // else keep null value
+    }
+
+    return values;
+  }
+
+  public int[] getIndexesForColumns(List<String> columnNames) {
+    int[] indexes = new int[columnNames.size()];
+
+    for (int i = 0, n = columnNames.size(); i < n; i++) {
+      indexes[i] = _columnNames.indexOf(columnNames.get(i));
+    }
+
+    return indexes;
+  }
+
   // TODO:
   //   - Currently there is no check on column existence
   //   - Null value is not handled (default null value is returned)
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
index 54c7ac9cb4..081647b7d4 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
@@ -282,7 +282,15 @@ public class ControllerRequestURLBuilder {
   }
 
   public String forTableDelete(String tableName) {
-    return StringUtil.join("/", _baseUrl, "tables", tableName);
+    return forTableDelete(tableName, null);
+  }
+
+  public String forTableDelete(String tableName, String retention) {
+    String url = StringUtil.join("/", _baseUrl, "tables", tableName);
+    if (retention != null) {
+      url += "?retention=" + retention;
+    }
+    return url;
   }
 
   public String forTableView(String tableName, String view, @Nullable String 
tableType) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to