Copilot commented on code in PR #16344:
URL: https://github.com/apache/pinot/pull/16344#discussion_r2254990199


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReader.java:
##########
@@ -89,8 +189,6 @@ public boolean hasNext() {
       _nextRowReturned = false;

Review Comment:
   The logic for handling sorted vs unsorted document IDs is duplicated between 
`getNextRowFromSortedValidDocIds()` and `getNextRowFromBitmapIterator()`. 
Consider extracting the common record validation logic into a helper method to 
reduce duplication.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/CompactedRawIndexDictColumnStatistics.java:
##########
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.realtime.converter.stats;
+
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.index.mutable.MutableForwardIndex;
+import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.utils.ByteArray;
+
+
+/**
+ * Column statistics for dictionary columns with raw (non-dictionary-encoded) forward indexes.
+ * Reads raw values from forward index and maps them to dictionary IDs.
+ *
+ * This is used when:
+ * - Column has a dictionary (dataSource.getDictionary() != null)
+ * - Forward index is NOT dictionary-encoded (forwardIndex.isDictionaryEncoded() == false)
+ * - Commit-time compaction is enabled
+ *
+ * Common scenarios:
+ * - Multi-value columns where forward index stores raw values but dictionary exists for other operations
+ * - Variable-length string columns optimized for sequential access
+ * - Consuming segments where dictionary is built separately from forward index
+ */
+public class CompactedRawIndexDictColumnStatistics extends MutableColumnStatistics {
+  private final Set<Integer> _usedDictIds;
+  private final int _compactedCardinality;
+  private final Object _compactedUniqueValues;
+
+  public CompactedRawIndexDictColumnStatistics(DataSource dataSource, int[] sortedDocIds,
+      ThreadSafeMutableRoaringBitmap validDocIds) {
+    super(dataSource, sortedDocIds);
+
+    String columnName = dataSource.getDataSourceMetadata().getFieldSpec().getName();
+    Dictionary dictionary = dataSource.getDictionary();
+    MutableForwardIndex forwardIndex = (MutableForwardIndex) dataSource.getForwardIndex();
+
+    // Since forward index is not dictionary-encoded, we need to read raw values and map them to dictionary IDs
+    _usedDictIds = new HashSet<>();
+    int[] validDocIdsArray = validDocIds.getMutableRoaringBitmap().toArray();
+
+    // Read raw values from valid documents and find corresponding dictionary IDs
+    for (int docId : validDocIdsArray) {
+      Object rawValue = getRawValue(forwardIndex, docId);
+      if (rawValue != null) {
+        // Find the dictionary ID for this raw value using type-specific lookup
+        int dictId = getDictIdForValue(dictionary, rawValue, forwardIndex.getStoredType());
+        if (dictId >= 0) {
+          _usedDictIds.add(dictId);
+        }
+      }
+    }
+
+    _compactedCardinality = _usedDictIds.size();
+
+    // Create compacted unique values array
+    List<Object> usedValues = new ArrayList<>();
+    for (Integer dictId : _usedDictIds) {
+      usedValues.add(dictionary.get(dictId));
+    }
+
+    // Sort values for consistency
+    usedValues.sort((a, b) -> {
+      @SuppressWarnings("unchecked")
+      Comparable<Object> comparableA = (Comparable<Object>) a;
+      return comparableA.compareTo(b);

Review Comment:
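   The comparator casts to `Comparable<Object>` without checking for nulls or 
for values of incompatible types. Consider guarding the cast so that a bad 
value fails with a clear message instead of a `ClassCastException` or 
`NullPointerException`. The suggestion below still needs 
`@SuppressWarnings("unchecked")`, but the cast only runs after the checks pass.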
   ```suggestion
         if (a == null && b == null) {
           return 0;
         }
         if (a == null) {
           return -1;
         }
         if (b == null) {
           return 1;
         }
         if (a instanceof Comparable && a.getClass().isInstance(b)) {
           @SuppressWarnings("unchecked")
           Comparable<Object> comparableA = (Comparable<Object>) a;
           return comparableA.compareTo(b);
         }
         throw new IllegalArgumentException("Values are not mutually comparable: " + a + ", " + b);
   ```
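
   One way to keep the guarded cast in a single place is a small helper that 
builds the comparator once. This is only a sketch: `ValueComparators` and 
`checkedNaturalOrder` are hypothetical names that do not appear in the PR, and 
it assumes all values of one column share a runtime class.

```java
import java.util.Comparator;

// Hypothetical helper, not part of the PR: confines the unchecked cast to one method.
final class ValueComparators {
  private ValueComparators() {
  }

  // Orders nulls first, then compares by natural ordering after verifying that
  // the left value is Comparable and the right value is of a compatible type.
  static Comparator<Object> checkedNaturalOrder() {
    Comparator<Object> checked = (a, b) -> {
      if (!(a instanceof Comparable) || !a.getClass().isInstance(b)) {
        throw new IllegalArgumentException("Values are not mutually comparable: " + a + ", " + b);
      }
      @SuppressWarnings("unchecked")
      Comparable<Object> comparableA = (Comparable<Object>) a;
      return comparableA.compareTo(b);
    };
    // nullsFirst handles null arguments before delegating to the checked comparator.
    return Comparator.nullsFirst(checked);
  }
}
```

   With such a helper the sort call would reduce to 
`usedValues.sort(ValueComparators.checkedNaturalOrder())`.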



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/CompactedDictEncodedColumnStatistics.java:
##########
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.realtime.converter.stats;
+
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.index.mutable.MutableForwardIndex;
+import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+
+
+/**
+ * Column statistics for dictionary columns with dictionary-encoded forward indexes.
+ * Uses direct getDictId() calls for single-value columns and getDictIdMV() calls for multi-value columns
+ * to find which dictionary entries are used by valid documents.
+ *
+ * This is used when:
+ * - Column has a dictionary (dataSource.getDictionary() != null)
+ * - Forward index is dictionary-encoded (forwardIndex.isDictionaryEncoded() == true)
+ * - Commit-time compaction is enabled
+ */
+public class CompactedDictEncodedColumnStatistics extends MutableColumnStatistics {
+  private final Set<Integer> _usedDictIds;
+  private final int _compactedCardinality;
+  private final DataSource _dataSource;
+  private final Object _compactedUniqueValues;
+
+  public CompactedDictEncodedColumnStatistics(DataSource dataSource, int[] sortedDocIds,
+      ThreadSafeMutableRoaringBitmap validDocIds) {
+    super(dataSource, sortedDocIds);
+    _dataSource = dataSource;
+
+    String columnName = dataSource.getDataSourceMetadata().getFieldSpec().getName();
+
+    // Find which dictionary IDs are actually used by valid documents
+    _usedDictIds = new HashSet<>();
+    MutableForwardIndex forwardIndex = (MutableForwardIndex) dataSource.getForwardIndex();
+
+    // Iterate through valid document IDs
+    int[] validDocIdsArray = validDocIds.getMutableRoaringBitmap().toArray();
+    boolean isSingleValue = forwardIndex.isSingleValue();
+
+    for (int docId : validDocIdsArray) {
+      if (isSingleValue) {
+        // Single-value column: use getDictId()
+        int dictId = forwardIndex.getDictId(docId);
+        _usedDictIds.add(dictId);
+      } else {
+        // Multi-value column: use getDictIdMV()
+        int[] dictIds = forwardIndex.getDictIdMV(docId);
+        for (int dictId : dictIds) {
+          _usedDictIds.add(dictId);
+        }
+      }
+    }
+
+    _compactedCardinality = _usedDictIds.size();
+
+    // Create compacted unique values array with only used dictionary values
+    Dictionary dictionary = dataSource.getDictionary();
+    Object originalValues = dictionary.getSortedValues();
+
+    // Extract the used values and sort them by value (not by dictionary ID)
+    List<ValueWithOriginalId> usedValuesWithIds = new ArrayList<>();
+    for (Integer dictId : _usedDictIds) {
+      Object value = dictionary.get(dictId);
+      usedValuesWithIds.add(new ValueWithOriginalId(value, dictId));
+    }
+
+    // Sort by values to ensure the compacted array is value-sorted
+    usedValuesWithIds.sort((a, b) -> {
+      @SuppressWarnings("unchecked")
+      Comparable<Object> comparableA = (Comparable<Object>) a._value;
+      return comparableA.compareTo(b._value);
+    });
+
+    // Create a compacted array containing only the used dictionary values in sorted order by value
+    Class<?> componentType = originalValues.getClass().getComponentType();
+    Object compacted = Array.newInstance(componentType, _compactedCardinality);
+
+    for (int i = 0; i < _compactedCardinality; i++) {
+      ValueWithOriginalId entry = usedValuesWithIds.get(i);

Review Comment:
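   Similar to the raw index statistics class, the unchecked cast inside the 
comparator lambda can be moved out of the sort by typing the values as 
`Comparable` when they are collected. Note that the suggestion below assumes 
`ValueWithOriginalId` is made generic, and that the cast of 
`dictionary.get(dictId)` is itself still unchecked, so the warning moves to 
that line rather than disappearing.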
   ```suggestion
       Class<?> componentType = originalValues.getClass().getComponentType();
       // The cast is safe because dictionary values are always Comparable
       List<ValueWithOriginalId<Comparable<Object>>> usedValuesWithIds = new ArrayList<>();
       for (Integer dictId : _usedDictIds) {
         Comparable<Object> value = (Comparable<Object>) dictionary.get(dictId);
         usedValuesWithIds.add(new ValueWithOriginalId<>(value, dictId));
       }
   
       // Sort by values to ensure the compacted array is value-sorted
       usedValuesWithIds.sort((a, b) -> a._value.compareTo(b._value));
   
       // Create a compacted array containing only the used dictionary values in sorted order by value
       Object compacted = Array.newInstance(componentType, _compactedCardinality);
   
       for (int i = 0; i < _compactedCardinality; i++) {
         ValueWithOriginalId<Comparable<Object>> entry = usedValuesWithIds.get(i);
   ```
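
   The collect, sort, and fill steps can be sketched in isolation with a 
generic pair type. Everything here is illustrative: the generic 
`ValueWithOriginalId`, the `CompactedValues` class, and the `lookup` parameter 
(standing in for `Dictionary.get(int)`) are assumptions, not code from the PR.

```java
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.IntFunction;

// Illustrative only: assumes ValueWithOriginalId were made generic.
final class ValueWithOriginalId<T extends Comparable<? super T>> {
  final T _value;
  final int _originalId;

  ValueWithOriginalId(T value, int originalId) {
    _value = value;
    _originalId = originalId;
  }
}

// Hypothetical helper class, not part of the PR.
final class CompactedValues {
  private CompactedValues() {
  }

  // Builds a value-sorted array holding only the values referenced by usedDictIds.
  // `lookup` stands in for Dictionary.get(int); `componentType` is the element
  // type of the resulting array (for example int.class or String.class).
  static <T extends Comparable<? super T>> Object compact(Set<Integer> usedDictIds, IntFunction<T> lookup,
      Class<?> componentType) {
    List<ValueWithOriginalId<T>> used = new ArrayList<>();
    for (int dictId : usedDictIds) {
      used.add(new ValueWithOriginalId<>(lookup.apply(dictId), dictId));
    }
    // Sort by value, not by dictionary ID; no cast is needed thanks to the bound on T.
    used.sort((a, b) -> a._value.compareTo(b._value));

    Object compacted = Array.newInstance(componentType, used.size());
    for (int i = 0; i < used.size(); i++) {
      // Array.set unwraps boxed values automatically for primitive arrays.
      Array.set(compacted, i, used.get(i)._value);
    }
    return compacted;
  }
}
```

   The bound `T extends Comparable<? super T>` is what removes the cast from 
the comparator. A caller that only has `Object` values would still need one 
checked conversion at the boundary.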



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java:
##########
@@ -125,10 +162,58 @@ public void build(@Nullable SegmentVersion segmentVersion, @Nullable ServerMetri
     }
   }
 
+  /**
+   * Publishes detailed commit-time compaction metrics
+   */
+  private void publishCompactionMetrics(ServerMetrics serverMetrics, int preCompactionRowCount,
+      SegmentIndexCreationDriverImpl driver, long compactionStartTime) {
+    try {
+      int postCompactionRowCount = driver.getSegmentStats().getTotalDocCount();
+      long compactionProcessingTime = System.currentTimeMillis() - compactionStartTime;
+      int rowsRemoved = preCompactionRowCount - postCompactionRowCount;
+
+      // Publish basic row count metrics
+      serverMetrics.addMeteredTableValue(_tableName, ServerMeter.COMMIT_TIME_COMPACTION_ROWS_POST_COMPACTION,
+          postCompactionRowCount);
+      serverMetrics.addMeteredTableValue(_tableName, ServerMeter.COMMIT_TIME_COMPACTION_ROWS_REMOVED, rowsRemoved);
+      serverMetrics.addMeteredTableValue(_tableName, ServerMeter.COMMIT_TIME_COMPACTION_BUILD_TIME_MS,
+          compactionProcessingTime);
+
+      // Calculate and publish compaction ratio percentage (only if we had rows to compact)
+      if (preCompactionRowCount > 0) {
+        double compactionRatioPercent = (double) rowsRemoved / preCompactionRowCount * 100.0;
+        serverMetrics.setOrUpdateTableGauge(_tableName, ServerGauge.COMMIT_TIME_COMPACTION_RATIO_PERCENT,
+            (long) compactionRatioPercent);
+      }
+    } catch (Exception e) {
+      //no-op.
+    }
+  }
+
+  /**
+   * Checks if commit-time compaction is enabled for upsert tables
+   */
+  private boolean isCommitTimeCompactionEnabled() {
+    if (_tableConfig.getUpsertConfig() == null) {
+      return false;
+    }
+
+    boolean commitTimeCompactionEnabled = _tableConfig.getUpsertConfig().isEnableCommitTimeCompaction();
+
+    // Validation: Commit-time compaction is currently only supported when column major build is disabled
+    if (commitTimeCompactionEnabled && _enableColumnMajor) {
+      throw new IllegalStateException(
+          "Commit-time compaction is not supported when column major segment builder is enabled. "
+              + "Please disable column major segment builder (set columnMajorSegmentBuilderEnabled=false) "
+              + "to use commit-time compaction for table: " + _tableName);
+    }
+

Review Comment:
   This validation logic is duplicated between `RealtimeSegmentConverter.java` 
and `TableConfigUtils.java`. Consider centralizing this validation in one 
location to avoid duplication and ensure consistency.
   ```suggestion
       // Centralized validation
       TableConfigUtils.validateCommitTimeCompactionConfig(_tableConfig, _enableColumnMajor, _tableName);
   ```
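
   A shared check could look roughly like the sketch below. The class name, 
method name, and boolean parameters are assumptions made for illustration; the 
PR may expose this differently, and the real method would likely read the flag 
from the table config instead.

```java
// Hypothetical sketch: names and signature are assumptions, not Pinot's actual API.
final class CommitTimeCompactionValidator {
  private CommitTimeCompactionValidator() {
  }

  // Rejects the unsupported combination of commit-time compaction and the
  // column major segment builder; every other combination passes silently.
  static void validate(boolean commitTimeCompactionEnabled, boolean columnMajorEnabled, String tableName) {
    if (commitTimeCompactionEnabled && columnMajorEnabled) {
      throw new IllegalStateException(
          "Commit-time compaction is not supported when column major segment builder is enabled. "
              + "Please disable column major segment builder (set columnMajorSegmentBuilderEnabled=false) "
              + "to use commit-time compaction for table: " + tableName);
    }
  }
}
```

   Both call sites would then delegate to the same method, so the error 
message and the condition cannot drift apart.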



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SizeBasedSegmentFlushThresholdComputer.java:
##########
@@ -97,24 +103,32 @@ synchronized void onSegmentCommit(CommittingSegmentDescriptor committingSegmentD
       }
       return;
     }
+
     long timeConsumed = _clock.millis() - committingSegmentZKMetadata.getCreationTime();
     int rowsThreshold = committingSegmentZKMetadata.getSizeThresholdToFlushSegment();
+
+    // Store values using the actual rows consumed for threshold calculations
     _timeConsumedForLastSegment = timeConsumed;
-    _rowsConsumedForLastSegment = rowsConsumed;
+    _rowsConsumedForLastSegment = (int) rowsForCalculation;
     _sizeForLastSegment = sizeInBytes;
     _rowsThresholdForLastSegment = rowsThreshold;
-    double segmentRatio = (double) rowsConsumed / sizeInBytes;
+
+    // Calculate ratio using actual rows (pre-commit if available)
+    double segmentRatio = (double) rowsForCalculation / sizeInBytes;
     double currentRatio = _segmentRowsToSizeRatio;
+
+      // Update the segment rows to size ratio using weighted average
     if (currentRatio > 0) {
       _segmentRowsToSizeRatio =
           CURRENT_SEGMENT_RATIO_WEIGHT * segmentRatio + PREVIOUS_SEGMENT_RATIO_WEIGHT * currentRatio;
     } else {
       _segmentRowsToSizeRatio = segmentRatio;
     }
-    LOGGER.info("Updated with segment: {}, time: {}, rows: {}, size: {}, ratio: {}, threshold: {}. "
-            + "Segment rows to size ratio got updated from: {} to: {}", segmentName,
-        TimeUtils.convertMillisToPeriod(timeConsumed), rowsConsumed, sizeInBytes, segmentRatio, rowsThreshold,
-        currentRatio, _segmentRowsToSizeRatio);
+    LOGGER.info("Updated segment: {}, time: {}, rows used for calculation: {} (pre-commit: {}, post-commit: {}), "
+            + "size: {}, consuming ratio: {:.6f}, threshold: {}. Segment rows to size ratio updated from: {:.6f} to: "
+            + "{:.6f}",

Review Comment:
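   The adjacent string literals are joined at compile time, so this call is 
already parameterized and builds no string at runtime. The actual problem is 
`{:.6f}`: SLF4J substitutes only plain `{}` placeholders, so these three tokens 
would be logged literally and the remaining arguments would shift to the wrong 
placeholders. Use `{}` and format the ratios before passing them if six decimal 
places are required.
   ```suggestion
       LOGGER.info("Updated segment: {}, time: {}, rows used for calculation: {} (pre-commit: {}, post-commit: {}), "
               + "size: {}, consuming ratio: {}, threshold: {}. Segment rows to size ratio updated from: {} to: {}",
   ```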
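
   SLF4J message patterns accept only plain `{}` placeholders, so a fixed 
number of decimals has to be applied to the value before it is passed as an 
argument. A minimal sketch, assuming `LOGGER` is an SLF4J logger; `RatioFormat` 
and `sixDecimals` are hypothetical names that do not appear in the PR.

```java
import java.util.Locale;

// Hypothetical helper, not part of the PR.
final class RatioFormat {
  private RatioFormat() {
  }

  // Renders a ratio with exactly six decimal places, independent of the
  // JVM's default locale (so the decimal separator is always a dot).
  static String sixDecimals(double ratio) {
    return String.format(Locale.ROOT, "%.6f", ratio);
  }
}
```

   The formatted string is then passed for a plain placeholder, for example 
`LOGGER.info("consuming ratio: {}", RatioFormat.sixDecimals(segmentRatio))`. 
The formatting runs even when INFO is disabled, which is usually acceptable 
for a once-per-commit log line.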



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

