Jackie-Jiang commented on code in PR #15130:
URL: https://github.com/apache/pinot/pull/15130#discussion_r2001624953


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java:
##########
@@ -187,43 +221,79 @@ private DimensionTable createFastLookupDimensionTable() {
     Preconditions.checkState(CollectionUtils.isNotEmpty(primaryKeyColumns),
         "Primary key columns must be configured for dimension table: %s", 
_tableNameWithType);
 
-    Map<PrimaryKey, GenericRow> lookupTable = new HashMap<>();
     List<SegmentDataManager> segmentDataManagers = acquireAllSegments();
     try {
+      // count all documents to limit map re-sizings
+      int totalDocs = 0;
+      for (SegmentDataManager segmentManager : segmentDataManagers) {
+        IndexSegment indexSegment = segmentManager.getSegment();
+        totalDocs += indexSegment.getSegmentMetadata().getTotalDocs();
+      }
+
+      Object2ObjectOpenCustomHashMap<Object[], Object[]> lookupTable =
+          new Object2ObjectOpenCustomHashMap<>(totalDocs, HASH_STRATEGY);
+
+      List<String> valueColumns = getValueColumns(schema.getColumnNames(), 
primaryKeyColumns);
+
       for (SegmentDataManager segmentManager : segmentDataManagers) {
         IndexSegment indexSegment = segmentManager.getSegment();
         int numTotalDocs = indexSegment.getSegmentMetadata().getTotalDocs();
         if (numTotalDocs > 0) {
           try (PinotSegmentRecordReader recordReader = new 
PinotSegmentRecordReader()) {
             recordReader.init(indexSegment);
+
+            int[] pkIndexes = 
recordReader.getIndexesForColumns(primaryKeyColumns);
+            int[] valIndexes = recordReader.getIndexesForColumns(valueColumns);
+
             for (int i = 0; i < numTotalDocs; i++) {
               if (_loadToken.get() != token) {
                 // Token changed during the loading, abort the loading
                 return null;
               }
-              GenericRow row = new GenericRow();
-              recordReader.getRecord(i, row);
-              GenericRow previousRow = 
lookupTable.put(row.getPrimaryKey(primaryKeyColumns), row);
-              if (_errorOnDuplicatePrimaryKey && previousRow != null) {
+
+              //TODO: consider inlining primary keys and values tables to 
reduce memory overhead
+              Object[] primaryKey = recordReader.getRecordValues(i, pkIndexes);
+              Object[] values = recordReader.getRecordValues(i, valIndexes);
+
+              Object[] previousValue = lookupTable.put(primaryKey, values);
+              if (_errorOnDuplicatePrimaryKey && previousValue != null) {
                 throw new IllegalStateException(
                     "Caught exception while reading records from segment: " + 
indexSegment.getSegmentName()
-                        + "primary key already exist for: " + 
row.getPrimaryKey(primaryKeyColumns));
+                        + "primary key already exist for: " + 
toString(primaryKey));
               }
             }
           } catch (Exception e) {
             throw new RuntimeException(
-                "Caught exception while reading records from segment: " + 
indexSegment.getSegmentName());
+                "Caught exception while reading records from segment: " + 
indexSegment.getSegmentName(), e);
           }
         }
       }
-      return new FastLookupDimensionTable(schema, primaryKeyColumns, 
lookupTable);
+      return new FastLookupDimensionTable(schema, primaryKeyColumns, 
valueColumns, lookupTable);
     } finally {
       for (SegmentDataManager segmentManager : segmentDataManagers) {
         releaseSegment(segmentManager);
       }
     }
   }
 
+  private static String toString(Object[] primaryKey) {
+    try {
+      return JsonUtils.objectToPrettyString(primaryKey);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static List<String> getValueColumns(NavigableSet<String> 
columnNames, List<String> primaryKeyColumns) {

Review Comment:
   Value columns only apply to `FastLookupDimensionTable`. Consider moving this 
into `FastLookupDimensionTable` class



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java:
##########
@@ -187,43 +221,79 @@ private DimensionTable createFastLookupDimensionTable() {
     Preconditions.checkState(CollectionUtils.isNotEmpty(primaryKeyColumns),
         "Primary key columns must be configured for dimension table: %s", 
_tableNameWithType);
 
-    Map<PrimaryKey, GenericRow> lookupTable = new HashMap<>();
     List<SegmentDataManager> segmentDataManagers = acquireAllSegments();
     try {
+      // count all documents to limit map re-sizings
+      int totalDocs = 0;
+      for (SegmentDataManager segmentManager : segmentDataManagers) {
+        IndexSegment indexSegment = segmentManager.getSegment();
+        totalDocs += indexSegment.getSegmentMetadata().getTotalDocs();
+      }
+
+      Object2ObjectOpenCustomHashMap<Object[], Object[]> lookupTable =
+          new Object2ObjectOpenCustomHashMap<>(totalDocs, HASH_STRATEGY);
+
+      List<String> valueColumns = getValueColumns(schema.getColumnNames(), 
primaryKeyColumns);
+
       for (SegmentDataManager segmentManager : segmentDataManagers) {
         IndexSegment indexSegment = segmentManager.getSegment();
         int numTotalDocs = indexSegment.getSegmentMetadata().getTotalDocs();
         if (numTotalDocs > 0) {
           try (PinotSegmentRecordReader recordReader = new 
PinotSegmentRecordReader()) {
             recordReader.init(indexSegment);
+
+            int[] pkIndexes = 
recordReader.getIndexesForColumns(primaryKeyColumns);
+            int[] valIndexes = recordReader.getIndexesForColumns(valueColumns);
+
             for (int i = 0; i < numTotalDocs; i++) {
               if (_loadToken.get() != token) {
                 // Token changed during the loading, abort the loading
                 return null;
               }
-              GenericRow row = new GenericRow();
-              recordReader.getRecord(i, row);
-              GenericRow previousRow = 
lookupTable.put(row.getPrimaryKey(primaryKeyColumns), row);
-              if (_errorOnDuplicatePrimaryKey && previousRow != null) {
+
+              //TODO: consider inlining primary keys and values tables to 
reduce memory overhead
+              Object[] primaryKey = recordReader.getRecordValues(i, pkIndexes);
+              Object[] values = recordReader.getRecordValues(i, valIndexes);
+
+              Object[] previousValue = lookupTable.put(primaryKey, values);
+              if (_errorOnDuplicatePrimaryKey && previousValue != null) {
                 throw new IllegalStateException(
                     "Caught exception while reading records from segment: " + 
indexSegment.getSegmentName()
-                        + "primary key already exist for: " + 
row.getPrimaryKey(primaryKeyColumns));
+                        + "primary key already exist for: " + 
toString(primaryKey));
               }
             }
           } catch (Exception e) {
             throw new RuntimeException(
-                "Caught exception while reading records from segment: " + 
indexSegment.getSegmentName());
+                "Caught exception while reading records from segment: " + 
indexSegment.getSegmentName(), e);
           }
         }
       }
-      return new FastLookupDimensionTable(schema, primaryKeyColumns, 
lookupTable);
+      return new FastLookupDimensionTable(schema, primaryKeyColumns, 
valueColumns, lookupTable);
     } finally {
       for (SegmentDataManager segmentManager : segmentDataManagers) {
         releaseSegment(segmentManager);
       }
     }
   }
 
+  private static String toString(Object[] primaryKey) {

Review Comment:
   Use `Arrays.toString()` instead



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java:
##########
@@ -153,6 +178,15 @@ protected void doShutdown() {
     closeDimensionTable(_dimensionTable.get());
   }
 
+  /* This method is necessary for testing because shutdown doesn't release

Review Comment:
   Use proper javadoc format (starts with `/**`)



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java:
##########
@@ -187,43 +221,79 @@ private DimensionTable createFastLookupDimensionTable() {
     Preconditions.checkState(CollectionUtils.isNotEmpty(primaryKeyColumns),
         "Primary key columns must be configured for dimension table: %s", 
_tableNameWithType);
 
-    Map<PrimaryKey, GenericRow> lookupTable = new HashMap<>();
     List<SegmentDataManager> segmentDataManagers = acquireAllSegments();
     try {
+      // count all documents to limit map re-sizings

Review Comment:
   Not introduced in this PR, but the loading logic is specific to the 
dimension table implementation. Should we consider moving the loading logic 
into the implementation class?



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java:
##########
@@ -153,6 +178,15 @@ protected void doShutdown() {
     closeDimensionTable(_dimensionTable.get());
   }
 
+  /* This method is necessary for testing because shutdown doesn't release

Review Comment:
   Should we use this method in `doShutDown()`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to