Copilot commented on code in PR #16344:
URL: https://github.com/apache/pinot/pull/16344#discussion_r2254990199
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReader.java:
##########
@@ -89,8 +189,6 @@ public boolean hasNext() {
_nextRowReturned = false;
Review Comment:
The logic for handling sorted vs unsorted document IDs is duplicated between
`getNextRowFromSortedValidDocIds()` and `getNextRowFromBitmapIterator()`.
Consider extracting the common record validation logic into a helper method to
reduce duplication.
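A minimal, self-contained sketch of the kind of extraction this asks for. The names here (`CompactedReaderSketch`, `isValidRecord`, the two `nextFrom*` paths) are illustrative stand-ins, not the PR's actual methods:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

class CompactedReaderSketch {
  private final Set<Integer> _validDocIds;

  CompactedReaderSketch(Set<Integer> validDocIds) {
    _validDocIds = validDocIds;
  }

  // The shared validation check, extracted once so both paths stay in sync.
  private boolean isValidRecord(int docId) {
    return _validDocIds.contains(docId);
  }

  // Sorted path: walks a pre-sorted doc-id array.
  List<Integer> nextFromSorted(int[] sortedDocIds) {
    List<Integer> result = new ArrayList<>();
    for (int docId : sortedDocIds) {
      if (isValidRecord(docId)) {
        result.add(docId);
      }
    }
    return result;
  }

  // Unsorted path: walks a bitmap-style iterator; reuses the same helper.
  List<Integer> nextFromIterator(Iterator<Integer> docIds) {
    List<Integer> result = new ArrayList<>();
    while (docIds.hasNext()) {
      int docId = docIds.next();
      if (isValidRecord(docId)) {
        result.add(docId);
      }
    }
    return result;
  }
}
```

Both iteration strategies then differ only in how they enumerate doc IDs, while the validity check lives in one place.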
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/CompactedRawIndexDictColumnStatistics.java:
##########
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.realtime.converter.stats;
+
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.index.mutable.MutableForwardIndex;
+import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.utils.ByteArray;
+
+
+/**
+ * Column statistics for dictionary columns with raw (non-dictionary-encoded) forward indexes.
+ * Reads raw values from forward index and maps them to dictionary IDs.
+ *
+ * This is used when:
+ * - Column has a dictionary (dataSource.getDictionary() != null)
+ * - Forward index is NOT dictionary-encoded (forwardIndex.isDictionaryEncoded() == false)
+ * - Commit-time compaction is enabled
+ *
+ * Common scenarios:
+ * - Multi-value columns where forward index stores raw values but dictionary exists for other operations
+ * - Variable-length string columns optimized for sequential access
+ * - Consuming segments where dictionary is built separately from forward index
+ */
+public class CompactedRawIndexDictColumnStatistics extends MutableColumnStatistics {
+ private final Set<Integer> _usedDictIds;
+ private final int _compactedCardinality;
+ private final Object _compactedUniqueValues;
+
+ public CompactedRawIndexDictColumnStatistics(DataSource dataSource, int[] sortedDocIds,
+ ThreadSafeMutableRoaringBitmap validDocIds) {
+ super(dataSource, sortedDocIds);
+
+ String columnName = dataSource.getDataSourceMetadata().getFieldSpec().getName();
+ Dictionary dictionary = dataSource.getDictionary();
+ MutableForwardIndex forwardIndex = (MutableForwardIndex) dataSource.getForwardIndex();
+
+ // Since forward index is not dictionary-encoded, we need to read raw values and map them to dictionary IDs
+ _usedDictIds = new HashSet<>();
+ int[] validDocIdsArray = validDocIds.getMutableRoaringBitmap().toArray();
+
+ // Read raw values from valid documents and find corresponding dictionary IDs
+ for (int docId : validDocIdsArray) {
+ Object rawValue = getRawValue(forwardIndex, docId);
+ if (rawValue != null) {
+ // Find the dictionary ID for this raw value using type-specific lookup
+ int dictId = getDictIdForValue(dictionary, rawValue, forwardIndex.getStoredType());
+ if (dictId >= 0) {
+ _usedDictIds.add(dictId);
+ }
+ }
+ }
+
+ _compactedCardinality = _usedDictIds.size();
+
+ // Create compacted unique values array
+ List<Object> usedValues = new ArrayList<>();
+ for (Integer dictId : _usedDictIds) {
+ usedValues.add(dictionary.get(dictId));
+ }
+
+ // Sort values for consistency
+ usedValues.sort((a, b) -> {
+ @SuppressWarnings("unchecked")
+ Comparable<Object> comparableA = (Comparable<Object>) a;
+ return comparableA.compareTo(b);
Review Comment:
The unchecked cast suppression could be avoided by using a more type-safe
approach. Consider using a Comparator with proper generic bounds instead of
casting to Comparable<Object>.
```suggestion
      if (a == null && b == null) {
        return 0;
      }
      if (a == null) {
        return -1;
      }
      if (b == null) {
        return 1;
      }
      if (a instanceof Comparable && a.getClass().isInstance(b)) {
        @SuppressWarnings("unchecked")
        Comparable<Object> comparableA = (Comparable<Object>) a;
        return comparableA.compareTo(b);
      }
      throw new IllegalArgumentException("Values are not mutually comparable: " + a + ", " + b);
```
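An alternative that avoids the unchecked cast at the sort site entirely is to move the `Comparable` requirement into a bounded generic helper. A sketch (`ComparatorSketch` and `naturalNullsFirst` are illustrative names, not existing Pinot utilities):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class ComparatorSketch {
  // The bounded type parameter pushes the "values are comparable" requirement
  // to the call site, so no cast is needed inside the comparator.
  static <T extends Comparable<? super T>> Comparator<T> naturalNullsFirst() {
    return Comparator.nullsFirst(Comparator.naturalOrder());
  }

  // Returns a sorted copy, nulls first, using the type-safe comparator.
  static <T extends Comparable<? super T>> List<T> sorted(List<T> values) {
    List<T> copy = new ArrayList<>(values);
    copy.sort(naturalNullsFirst());
    return copy;
  }
}
```

Callers with a heterogeneous `List<Object>` would still need a cast somewhere, but it moves to a single well-documented boundary instead of living inside the comparator.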
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/CompactedDictEncodedColumnStatistics.java:
##########
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.realtime.converter.stats;
+
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.index.mutable.MutableForwardIndex;
+import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+
+
+/**
+ * Column statistics for dictionary columns with dictionary-encoded forward indexes.
+ * Uses direct getDictId() calls for single-value columns and getDictIdMV() calls for multi-value columns
+ * to find which dictionary entries are used by valid documents.
+ *
+ * This is used when:
+ * - Column has a dictionary (dataSource.getDictionary() != null)
+ * - Forward index is dictionary-encoded (forwardIndex.isDictionaryEncoded() == true)
+ * - Commit-time compaction is enabled
+ */
+public class CompactedDictEncodedColumnStatistics extends MutableColumnStatistics {
+ private final Set<Integer> _usedDictIds;
+ private final int _compactedCardinality;
+ private final DataSource _dataSource;
+ private final Object _compactedUniqueValues;
+
+ public CompactedDictEncodedColumnStatistics(DataSource dataSource, int[] sortedDocIds,
+ ThreadSafeMutableRoaringBitmap validDocIds) {
+ super(dataSource, sortedDocIds);
+ _dataSource = dataSource;
+
+ String columnName = dataSource.getDataSourceMetadata().getFieldSpec().getName();
+
+ // Find which dictionary IDs are actually used by valid documents
+ _usedDictIds = new HashSet<>();
+ MutableForwardIndex forwardIndex = (MutableForwardIndex) dataSource.getForwardIndex();
+
+ // Iterate through valid document IDs
+ int[] validDocIdsArray = validDocIds.getMutableRoaringBitmap().toArray();
+ boolean isSingleValue = forwardIndex.isSingleValue();
+
+ for (int docId : validDocIdsArray) {
+ if (isSingleValue) {
+ // Single-value column: use getDictId()
+ int dictId = forwardIndex.getDictId(docId);
+ _usedDictIds.add(dictId);
+ } else {
+ // Multi-value column: use getDictIdMV()
+ int[] dictIds = forwardIndex.getDictIdMV(docId);
+ for (int dictId : dictIds) {
+ _usedDictIds.add(dictId);
+ }
+ }
+ }
+
+ _compactedCardinality = _usedDictIds.size();
+
+ // Create compacted unique values array with only used dictionary values
+ Dictionary dictionary = dataSource.getDictionary();
+ Object originalValues = dictionary.getSortedValues();
+
+ // Extract the used values and sort them by value (not by dictionary ID)
+ List<ValueWithOriginalId> usedValuesWithIds = new ArrayList<>();
+ for (Integer dictId : _usedDictIds) {
+ Object value = dictionary.get(dictId);
+ usedValuesWithIds.add(new ValueWithOriginalId(value, dictId));
+ }
+
+ // Sort by values to ensure the compacted array is value-sorted
+ usedValuesWithIds.sort((a, b) -> {
+ @SuppressWarnings("unchecked")
+ Comparable<Object> comparableA = (Comparable<Object>) a._value;
+ return comparableA.compareTo(b._value);
+ });
+
+ // Create a compacted array containing only the used dictionary values in sorted order by value
+ Class<?> componentType = originalValues.getClass().getComponentType();
+ Object compacted = Array.newInstance(componentType, _compactedCardinality);
+
+ for (int i = 0; i < _compactedCardinality; i++) {
+ ValueWithOriginalId entry = usedValuesWithIds.get(i);
Review Comment:
Similar to the raw index statistics class, the unchecked cast suppression
could be avoided by using a more type-safe approach with proper generic bounds
for the Comparator.
```suggestion
    Class<?> componentType = originalValues.getClass().getComponentType();
    // The cast is safe because dictionary values are always Comparable
    List<ValueWithOriginalId<Comparable<Object>>> usedValuesWithIds = new ArrayList<>();
    for (Integer dictId : _usedDictIds) {
      Comparable<Object> value = (Comparable<Object>) dictionary.get(dictId);
      usedValuesWithIds.add(new ValueWithOriginalId<>(value, dictId));
    }
    // Sort by values to ensure the compacted array is value-sorted
    usedValuesWithIds.sort((a, b) -> a._value.compareTo(b._value));
    // Create a compacted array containing only the used dictionary values in sorted order by value
    Object compacted = Array.newInstance(componentType, _compactedCardinality);
    for (int i = 0; i < _compactedCardinality; i++) {
      ValueWithOriginalId<Comparable<Object>> entry = usedValuesWithIds.get(i);
```
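The generic bound hinted at in the suggestion can also live on the holder class itself, which lets the list sort by natural ordering with no comparator cast at all. A simplified, hypothetical `ValueWithOriginalId` (the PR's real class may differ):

```java
// Hypothetical sketch: the holder carries the Comparable bound, so sorting a
// List<ValueWithOriginalId<T>> needs no unchecked cast anywhere.
class ValueWithOriginalId<T extends Comparable<? super T>>
    implements Comparable<ValueWithOriginalId<T>> {
  final T _value;
  final int _originalDictId;

  ValueWithOriginalId(T value, int originalDictId) {
    _value = value;
    _originalDictId = originalDictId;
  }

  // Natural ordering by value; the original dictionary ID rides along.
  @Override
  public int compareTo(ValueWithOriginalId<T> other) {
    return _value.compareTo(other._value);
  }
}
```

With this shape, `Collections.sort(usedValuesWithIds)` suffices, and the cast is confined to wherever the raw dictionary value first enters the typed world.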
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java:
##########
@@ -125,10 +162,58 @@ public void build(@Nullable SegmentVersion segmentVersion, @Nullable ServerMetri
}
}
+ /**
+ * Publishes detailed commit-time compaction metrics
+ */
+ private void publishCompactionMetrics(ServerMetrics serverMetrics, int preCompactionRowCount,
+ SegmentIndexCreationDriverImpl driver, long compactionStartTime) {
+ try {
+ int postCompactionRowCount = driver.getSegmentStats().getTotalDocCount();
+ long compactionProcessingTime = System.currentTimeMillis() - compactionStartTime;
+ int rowsRemoved = preCompactionRowCount - postCompactionRowCount;
+
+ // Publish basic row count metrics
+ serverMetrics.addMeteredTableValue(_tableName, ServerMeter.COMMIT_TIME_COMPACTION_ROWS_POST_COMPACTION,
+ postCompactionRowCount);
+ serverMetrics.addMeteredTableValue(_tableName, ServerMeter.COMMIT_TIME_COMPACTION_ROWS_REMOVED, rowsRemoved);
+ serverMetrics.addMeteredTableValue(_tableName, ServerMeter.COMMIT_TIME_COMPACTION_BUILD_TIME_MS,
+ compactionProcessingTime);
+
+ // Calculate and publish compaction ratio percentage (only if we had rows to compact)
+ if (preCompactionRowCount > 0) {
+ double compactionRatioPercent = (double) rowsRemoved / preCompactionRowCount * 100.0;
+ serverMetrics.setOrUpdateTableGauge(_tableName, ServerGauge.COMMIT_TIME_COMPACTION_RATIO_PERCENT,
+ (long) compactionRatioPercent);
+ }
+ } catch (Exception e) {
+ //no-op.
+ }
+ }
+
+ /**
+ * Checks if commit-time compaction is enabled for upsert tables
+ */
+ private boolean isCommitTimeCompactionEnabled() {
+ if (_tableConfig.getUpsertConfig() == null) {
+ return false;
+ }
+
+ boolean commitTimeCompactionEnabled = _tableConfig.getUpsertConfig().isEnableCommitTimeCompaction();
+
+ // Validation: Commit-time compaction is currently only supported when column major build is disabled
+ if (commitTimeCompactionEnabled && _enableColumnMajor) {
+ throw new IllegalStateException(
+ "Commit-time compaction is not supported when column major segment builder is enabled. "
+ + "Please disable column major segment builder (set columnMajorSegmentBuilderEnabled=false) "
+ + "to use commit-time compaction for table: " + _tableName);
+ }
+
Review Comment:
This validation logic is duplicated between `RealtimeSegmentConverter.java`
and `TableConfigUtils.java`. Consider centralizing this validation in one
location to avoid duplication and ensure consistency.
```suggestion
// Centralized validation
TableConfigUtils.validateCommitTimeCompactionConfig(_tableConfig, _enableColumnMajor, _tableName);
```
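For reference, a self-contained sketch of what such a centralized helper could look like. `validateCommitTimeCompactionConfig` is a proposed method, not one that exists in `TableConfigUtils` today, and the parameter list here is a simplified assumption (booleans instead of the real `TableConfig`):

```java
class CommitTimeCompactionValidation {
  // Hypothetical centralized helper mirroring the inline check in
  // RealtimeSegmentConverter: commit-time compaction and the column-major
  // segment builder are mutually exclusive.
  static void validateCommitTimeCompactionConfig(boolean commitTimeCompactionEnabled,
      boolean columnMajorBuilderEnabled, String tableName) {
    if (commitTimeCompactionEnabled && columnMajorBuilderEnabled) {
      throw new IllegalStateException(
          "Commit-time compaction is not supported when column major segment builder is enabled. "
              + "Please disable column major segment builder (set columnMajorSegmentBuilderEnabled=false) "
              + "to use commit-time compaction for table: " + tableName);
    }
  }
}
```

Both `RealtimeSegmentConverter` and `TableConfigUtils` could then delegate to the one method, so the error message and the condition cannot drift apart.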
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SizeBasedSegmentFlushThresholdComputer.java:
##########
@@ -97,24 +103,32 @@ synchronized void onSegmentCommit(CommittingSegmentDescriptor committingSegmentD
}
return;
}
+
long timeConsumed = _clock.millis() - committingSegmentZKMetadata.getCreationTime();
int rowsThreshold = committingSegmentZKMetadata.getSizeThresholdToFlushSegment();
+
+ // Store values using the actual rows consumed for threshold calculations
_timeConsumedForLastSegment = timeConsumed;
- _rowsConsumedForLastSegment = rowsConsumed;
+ _rowsConsumedForLastSegment = (int) rowsForCalculation;
_sizeForLastSegment = sizeInBytes;
_rowsThresholdForLastSegment = rowsThreshold;
- double segmentRatio = (double) rowsConsumed / sizeInBytes;
+
+ // Calculate ratio using actual rows (pre-commit if available)
+ double segmentRatio = (double) rowsForCalculation / sizeInBytes;
double currentRatio = _segmentRowsToSizeRatio;
+
+ // Update the segment rows to size ratio using weighted average
if (currentRatio > 0) {
_segmentRowsToSizeRatio =
CURRENT_SEGMENT_RATIO_WEIGHT * segmentRatio + PREVIOUS_SEGMENT_RATIO_WEIGHT * currentRatio;
} else {
_segmentRowsToSizeRatio = segmentRatio;
}
- LOGGER.info("Updated with segment: {}, time: {}, rows: {}, size: {}, ratio: {}, threshold: {}. "
- + "Segment rows to size ratio got updated from: {} to: {}", segmentName,
- TimeUtils.convertMillisToPeriod(timeConsumed), rowsConsumed, sizeInBytes, segmentRatio, rowsThreshold,
- currentRatio, _segmentRowsToSizeRatio);
+ LOGGER.info("Updated segment: {}, time: {}, rows used for calculation: {} (pre-commit: {}, post-commit: {}), "
+ + "size: {}, consuming ratio: {:.6f}, threshold: {}. Segment rows to size ratio updated from: {:.6f} to: "
+ + "{:.6f}",
Review Comment:
The new log message uses `{:.6f}` format specifiers, but SLF4J placeholders do not support format specifiers: the `{:.6f}` tokens will be printed literally instead of formatting the doubles, and the argument positions will no longer line up. Use plain `{}` placeholders and pre-format the ratios (for example with `String.format("%.6f", segmentRatio)`) before passing them as arguments. Note that concatenating string literals for the message is folded into a single constant at compile time, so the multi-line concatenation itself carries no runtime cost.
```suggestion
    LOGGER.info("Updated segment: {}, time: {}, rows used for calculation: {} (pre-commit: {}, post-commit: {}), "
        + "size: {}, consuming ratio: {}, threshold: {}. Segment rows to size ratio updated from: {} to: {}",
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]