tarun11Mavani commented on code in PR #16344:
URL: https://github.com/apache/pinot/pull/16344#discussion_r2284727860
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReader.java:
##########
@@ -57,28 +67,118 @@ public CompactedPinotSegmentRecordReader(RoaringBitmap
validDocIds,
_deleteRecordColumn = deleteRecordColumn;
}
+ public CompactedPinotSegmentRecordReader(ThreadSafeMutableRoaringBitmap
validDocIds) {
+ this(validDocIds, null);
+ }
+
+ public CompactedPinotSegmentRecordReader(ThreadSafeMutableRoaringBitmap
validDocIds,
+ @Nullable String deleteRecordColumn) {
+ Preconditions.checkNotNull(validDocIds, "Valid document IDs cannot be
null");
+ _pinotSegmentRecordReader = new PinotSegmentRecordReader();
+ _validDocIdsBitmap =
validDocIds.getMutableRoaringBitmap().toRoaringBitmap();
+ _validDocIdsIterator = _validDocIdsBitmap.getIntIterator();
+ _deleteRecordColumn = deleteRecordColumn;
+ }
+
@Override
public void init(File dataFile, @Nullable Set<String> fieldsToRead,
@Nullable RecordReaderConfig recordReaderConfig)
throws IOException {
// lazy init the record reader
_pinotSegmentRecordReader.init(dataFile, null, null);
+ prepareSortedValidDocIds();
+ }
+
+ /**
+ * Initializes the record reader from a mutable segment with valid document
ids and optional sorted document ids.
+ *
+ * @param mutableSegment Mutable segment
+ * @param sortedDocIds Array of sorted document ids (can be null)
+ */
+ public void init(MutableSegment mutableSegment, @Nullable int[]
sortedDocIds) {
+ _pinotSegmentRecordReader.init(mutableSegment, sortedDocIds);
+ prepareSortedValidDocIds();
+ }
+
+ /**
+ * Prepares the sorted valid document IDs array based on whether sorted
document IDs are available.
+ * If sorted document IDs are available, creates an array of valid document
IDs in sorted order.
+ * If not available, falls back to bitmap iteration order.
+ */
+ private void prepareSortedValidDocIds() {
+ int[] sortedDocIds = _pinotSegmentRecordReader.getSortedDocIds();
Review Comment:
Done.
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java:
##########
@@ -96,23 +97,61 @@ public void build(@Nullable SegmentVersion segmentVersion,
@Nullable ServerMetri
_realtimeSegmentImpl.commit();
SegmentIndexCreationDriverImpl driver = new
SegmentIndexCreationDriverImpl();
- try (PinotSegmentRecordReader recordReader = new
PinotSegmentRecordReader()) {
- String sortedColumn = null;
- List<String> columnSortOrder = genConfig.getColumnSortOrder();
- if (CollectionUtils.isNotEmpty(columnSortOrder)) {
- sortedColumn = columnSortOrder.get(0);
+
+ // Check if commit-time compaction is enabled for upsert tables
+ boolean useCompactedReader =
TableConfigUtils.isCommitTimeCompactionEnabled(_tableConfig);
+
+ String sortedColumn = null;
+ List<String> columnSortOrder = genConfig.getColumnSortOrder();
+ if (columnSortOrder != null && !columnSortOrder.isEmpty()) {
+ sortedColumn = columnSortOrder.get(0);
+ }
+ int[] sortedDocIds =
+ sortedColumn != null ?
_realtimeSegmentImpl.getSortedDocIdIterationOrderWithSortedColumn(sortedColumn)
: null;
+
+ if (useCompactedReader) {
+ // Collect metrics for commit-time compaction
+ long compactionStartTime = System.currentTimeMillis();
+ int preCompactionRowCount = _realtimeSegmentImpl.getNumDocsIndexed();
+ // Track that commit-time compaction is enabled for this segment
+ if (serverMetrics != null) {
+ serverMetrics.addMeteredTableValue(_tableName,
ServerMeter.COMMIT_TIME_COMPACTION_ENABLED_SEGMENTS, 1L);
+ serverMetrics.addMeteredTableValue(_tableName,
ServerMeter.COMMIT_TIME_COMPACTION_ROWS_PRE_COMPACTION,
+ preCompactionRowCount);
+ }
+
+ // Use CompactedPinotSegmentRecordReader to remove obsolete records
+ try (CompactedPinotSegmentRecordReader recordReader = new
CompactedPinotSegmentRecordReader(
+ _realtimeSegmentImpl.getValidDocIds(),
_realtimeSegmentImpl.getDeleteRecordColumn())) {
+ recordReader.init(_realtimeSegmentImpl, sortedDocIds);
+ RealtimeSegmentSegmentCreationDataSource dataSource =
+ new RealtimeSegmentSegmentCreationDataSource(_realtimeSegmentImpl,
recordReader, sortedDocIds);
+ driver.init(genConfig, dataSource,
TransformPipeline.getPassThroughPipeline(_tableName)); // initializes reader
+
+ if (!_enableColumnMajor) {
+ driver.build();
+ } else {
+ driver.buildByColumn(_realtimeSegmentImpl);
+ }
}
- int[] sortedDocIds =
- sortedColumn != null ?
_realtimeSegmentImpl.getSortedDocIdIterationOrderWithSortedColumn(sortedColumn)
: null;
- recordReader.init(_realtimeSegmentImpl, sortedDocIds);
- RealtimeSegmentSegmentCreationDataSource dataSource =
- new RealtimeSegmentSegmentCreationDataSource(_realtimeSegmentImpl,
recordReader);
- driver.init(genConfig, dataSource,
TransformPipeline.getPassThroughPipeline(_tableName)); // initializes reader
-
- if (!_enableColumnMajor) {
- driver.build();
- } else {
- driver.buildByColumn(_realtimeSegmentImpl);
+
+ // Collect and publish post-compaction metrics
+ if (serverMetrics != null) {
+ publishCompactionMetrics(serverMetrics, preCompactionRowCount, driver,
compactionStartTime);
+ }
+ } else {
+ // Use regular PinotSegmentRecordReader (existing behavior)
+ try (PinotSegmentRecordReader recordReader = new
PinotSegmentRecordReader()) {
Review Comment:
Moved the common logic into buildSegmentWithReader and reused it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]