Copilot commented on code in PR #17191:
URL: https://github.com/apache/pinot/pull/17191#discussion_r2522558173


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/AbstractSegmentCreator.java:
##########
@@ -0,0 +1,724 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.commons.configuration2.PropertiesConfiguration;
+import org.apache.commons.configuration2.ex.ConfigurationException;
+import org.apache.pinot.common.utils.FileUtils;
+import 
org.apache.pinot.segment.local.segment.creator.impl.nullvalue.NullValueVectorCreator;
+import 
org.apache.pinot.segment.local.segment.index.dictionary.DictionaryIndexPlugin;
+import 
org.apache.pinot.segment.local.segment.index.dictionary.DictionaryIndexType;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.creator.ColumnIndexCreationInfo;
+import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.creator.SegmentCreator;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.segment.spi.index.DictionaryIndexConfig;
+import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
+import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
+import org.apache.pinot.segment.spi.index.IndexCreator;
+import org.apache.pinot.segment.spi.index.IndexService;
+import org.apache.pinot.segment.spi.index.IndexType;
+import org.apache.pinot.segment.spi.index.StandardIndexes;
+import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
+import org.apache.pinot.segment.spi.index.creator.SegmentIndexCreationInfo;
+import org.apache.pinot.spi.config.table.IndexConfig;
+import org.apache.pinot.spi.config.table.SegmentZKPropsConfig;
+import org.apache.pinot.spi.data.ComplexFieldSpec;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.FieldSpec.FieldType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.CommonsConfigurationUtils;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Interval;
+import org.joda.time.format.DateTimeFormatter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Column.*;
+import static org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Segment.*;
+
+
+/**
+ * Abstract base class for segment creators containing common functionality 
and metadata handling.
+ */
+public abstract class AbstractSegmentCreator implements SegmentCreator {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(AbstractSegmentCreator.class);
+  // Allow at most 512 characters for the metadata property
+  private static final int METADATA_PROPERTY_LENGTH_LIMIT = 512;
+
+  protected Schema _schema;
+  protected Map<String, ColumnIndexCreators> _colIndexes;
+  protected NavigableMap<String, ColumnIndexCreationInfo> 
_indexCreationInfoMap;
+
+  private int _totalDocs;
+  private SegmentGeneratorConfig _config;
+  private String _segmentName;
+  private File _indexDir;
+
+  /**
+   * Common initialization logic for setting up directory and basic fields.
+   */
+  protected void initializeCommon(SegmentGeneratorConfig segmentCreationSpec, 
SegmentIndexCreationInfo creationInfo,
+      NavigableMap<String, ColumnIndexCreationInfo> indexCreationInfoMap, 
Schema schema, File outDir,
+      Map<String, ColumnIndexCreators> colIndexes, @Nullable int[] 
immutableToMutableIdMap)
+      throws Exception {
+    // Check that the output directory does not exist
+    Preconditions.checkState(!outDir.exists(), "Segment output directory: %s 
already exists", outDir);
+    Preconditions.checkState(outDir.mkdirs(), "Failed to create output 
directory: %s", outDir);
+
+    _config = segmentCreationSpec;
+    _colIndexes = colIndexes;
+    _indexCreationInfoMap = indexCreationInfoMap;
+    _indexDir = outDir;
+    _schema = schema;
+    _totalDocs = creationInfo.getTotalDocs();
+
+    initColSegmentCreationInfo(immutableToMutableIdMap);
+  }
+
+  private void initColSegmentCreationInfo(@Nullable int[] 
immutableToMutableIdMap)
+      throws Exception {
+    Map<String, FieldIndexConfigs> indexConfigs = 
_config.getIndexConfigsByColName();
+    for (String columnName : indexConfigs.keySet()) {
+      if (canColumnBeIndexed(columnName) && _totalDocs > 0) {
+        ColumnIndexCreators result = createColIndexeCreators(columnName, 
immutableToMutableIdMap);
+        _colIndexes.put(columnName, result);
+      } else {
+        FieldSpec fieldSpec = _schema.getFieldSpecFor(columnName);
+        _colIndexes.put(columnName,
+            new ColumnIndexCreators(columnName, fieldSpec, null, List.of(), 
null));
+      }
+    }
+  }
+
+  /**
+   * Initializes a single column's dictionary and index creators.
+   * This encapsulates the common logic shared between different segment 
creator implementations.
+   */
+  protected ColumnIndexCreators createColIndexeCreators(String columnName,
+      @Nullable int[] immutableToMutableIdMap)
+      throws Exception {
+    FieldSpec fieldSpec = _schema.getFieldSpecFor(columnName);
+
+    FieldIndexConfigs originalConfig = 
_config.getIndexConfigsByColName().get(columnName);
+    ColumnIndexCreationInfo columnIndexCreationInfo = 
_indexCreationInfoMap.get(columnName);
+    Preconditions.checkNotNull(columnIndexCreationInfo, "Missing index 
creation info for column: %s", columnName);
+
+    boolean dictEnabledColumn = 
createDictionaryForColumn(columnIndexCreationInfo, _config, fieldSpec);
+    if (originalConfig.getConfig(StandardIndexes.inverted()).isEnabled()) {
+      Preconditions.checkState(dictEnabledColumn,
+          "Cannot create inverted index for raw index column: %s", columnName);
+    }
+    IndexCreationContext.Common context =
+        getIndexCreationContext(fieldSpec, dictEnabledColumn, 
immutableToMutableIdMap);
+
+    FieldIndexConfigs config = adaptConfig(columnName, originalConfig, 
columnIndexCreationInfo, _config);
+
+    SegmentDictionaryCreator dictionaryCreator = null;
+    if (dictEnabledColumn) {
+      dictionaryCreator = getDictionaryCreator(columnName, originalConfig, 
context);
+    }
+
+    List<IndexCreator> indexCreators = getIndexCreatorsByColumn(fieldSpec, 
context, config, dictEnabledColumn);
+
+    return new ColumnIndexCreators(columnName, fieldSpec, dictionaryCreator,
+        indexCreators, getNullValueCreator(fieldSpec));
+  }
+
+  private IndexCreationContext.Common getIndexCreationContext(FieldSpec 
fieldSpec, boolean dictEnabledColumn,
+      @Nullable int[] immutableToMutableIdMap) {
+    ColumnIndexCreationInfo columnIndexCreationInfo = 
_indexCreationInfoMap.get(fieldSpec.getName());
+    FieldIndexConfigs fieldIndexConfig = 
_config.getIndexConfigsByColName().get(fieldSpec.getName());
+    boolean forwardIndexDisabled = 
!fieldIndexConfig.getConfig(StandardIndexes.forward()).isEnabled();
+
+    return IndexCreationContext.builder()
+        .withIndexDir(_indexDir)
+        .withDictionary(dictEnabledColumn)
+        .withFieldSpec(fieldSpec)
+        .withTotalDocs(_totalDocs)
+        .withColumnIndexCreationInfo(columnIndexCreationInfo)
+        .withOptimizedDictionary(_config.isOptimizeDictionary()
+            || _config.isOptimizeDictionaryForMetrics() && 
fieldSpec.getFieldType() == FieldSpec.FieldType.METRIC)
+        .onHeap(_config.isOnHeap())
+        .withForwardIndexDisabled(forwardIndexDisabled)
+        .withTextCommitOnClose(true)
+        .withImmutableToMutableIdMap(immutableToMutableIdMap)
+        .withRealtimeConversion(_config.isRealtimeConversion())
+        .withConsumerDir(_config.getConsumerDir())
+        .withTableNameWithType(_config.getTableConfig().getTableName())
+        .withContinueOnError(_config.isContinueOnError())
+        .build();
+  }
+
+  private SegmentDictionaryCreator getDictionaryCreator(String columnName, 
FieldIndexConfigs config,
+      IndexCreationContext.Common context)
+      throws IOException {
+    ColumnIndexCreationInfo columnIndexCreationInfo = 
_indexCreationInfoMap.get(columnName);
+
+    // Create dictionary-encoded index
+    // Initialize dictionary creator
+    // TODO: Dictionary creator holds all unique values on heap. Consider 
keeping dictionary instead of creator
+    //       which uses off-heap memory.

Review Comment:
   This TODO comment should reference a tracking issue (for example, a GitHub 
issue number), or be rewritten as a plain note describing the possible optimization.
   ```suggestion
       // Note: The dictionary creator currently holds all unique values on heap. An alternative design could keep the dictionary itself off-heap to reduce heap usage.
       //       This may be considered for future optimization.
   ```



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentCreationFinalizer.java:
##########
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl;
+
+import com.google.common.base.Preconditions;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.metrics.MinionMeter;
+import org.apache.pinot.common.metrics.MinionMetrics;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import 
org.apache.pinot.segment.local.segment.index.converter.SegmentFormatConverterFactory;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import 
org.apache.pinot.segment.local.segment.index.loader.invertedindex.MultiColumnTextIndexHandler;
+import org.apache.pinot.segment.local.startree.v2.builder.MultipleTreesBuilder;
+import org.apache.pinot.segment.local.utils.CrcUtils;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.converter.SegmentFormatConverter;
+import org.apache.pinot.segment.spi.creator.ColumnStatistics;
+import org.apache.pinot.segment.spi.creator.SegmentCreator;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.segment.spi.creator.SegmentVersion;
+import org.apache.pinot.segment.spi.index.IndexHandler;
+import org.apache.pinot.segment.spi.index.IndexService;
+import org.apache.pinot.segment.spi.index.IndexType;
+import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
+import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
+import org.apache.pinot.segment.spi.store.SegmentDirectory;
+import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
+import org.apache.pinot.spi.config.instance.InstanceType;
+import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Comprehensive segment finalization handler that manages all post-creation 
segment operations.
+ * This class encapsulates segment name generation, sealing, format 
conversion, index building,
+ * and metadata persistence.
+ */
+public class SegmentCreationFinalizer {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentCreationFinalizer.class);
+
+  private final SegmentGeneratorConfig _config;
+  private final InstanceType _instanceType;
+  private final SegmentCreator _indexCreator;
+  private final ColumnStatistics _timeStats;
+  private final String _segmentName;
+  private final int _totalDocs;
+
+  /**
+   * @param config Segment generator configuration
+   * @param instanceType Instance type for metrics tracking (nullable - if 
null, metrics will not be tracked)
+   * @param indexCreator Segment creator instance
+   * @param timeStats Time column statistics (can be null)
+   */
+  public SegmentCreationFinalizer(SegmentGeneratorConfig config, @Nullable 
InstanceType instanceType,
+      SegmentCreator indexCreator, @Nullable ColumnStatistics timeStats, int 
totalDocs) {
+    _config = config;
+    _instanceType = instanceType;
+    _indexCreator = indexCreator;
+    _timeStats = timeStats;
+    _totalDocs = totalDocs;
+    _segmentName = generateSegmentName();
+  }
+
+  /**
+   * Get the generated segment name.
+   *
+   * @return Segment name
+   */
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
+  /**
+   * Finalize the segment with all post-creation operations.
+   * This method handles:
+   * - Sealing and closing the index creator
+   * - Directory operations (move temp to final location)
+   * - Format conversion (v1 to v3 if configured)
+   * - Star-tree V2 index building
+   * - Multi-column text index building
+   * - Post-segment creation index updates
+   * - CRC computation and metadata persistence
+   *
+   * @param currentIndexDir Current directory where indexes are built
+   * @return Final segment directory
+   * @throws Exception If finalization fails
+   */
+  public File finalizeSegment(File currentIndexDir)
+      throws Exception {
+    try {
+      // Write the index files to disk
+      _indexCreator.setSegmentName(_segmentName);
+      _indexCreator.seal();
+    } finally {
+      _indexCreator.close();
+    }
+    LOGGER.info("Finished segment seal for: {}", _segmentName);
+
+    // Delete the directory named after the segment name, if it exists
+    final File outputDir = new File(_config.getOutDir());
+    final File segmentOutputDir = new File(outputDir, _segmentName);
+    if (segmentOutputDir.exists()) {
+      FileUtils.deleteDirectory(segmentOutputDir);
+    }
+    // Move the temporary directory into its final location
+    FileUtils.moveDirectory(currentIndexDir, segmentOutputDir);
+    FileUtils.deleteQuietly(currentIndexDir);
+
+    // Format conversion
+    convertFormatIfNecessary(segmentOutputDir);
+
+    // Build indexes if there are documents
+    if (_totalDocs > 0) {
+      buildStarTreeV2IfNecessary(segmentOutputDir);
+      buildMultiColumnTextIndex(segmentOutputDir);
+    }
+
+    // Update post-creation indexes
+    updatePostSegmentCreationIndexes(segmentOutputDir);
+
+    // Persist creation metadata
+    persistCreationMeta(segmentOutputDir);
+
+    LOGGER.info("Successfully finalized segment: {}", _segmentName);
+    return segmentOutputDir;
+  }
+
+  /**
+   * Generate segment name based on configuration and statistics.
+   *
+   * @return Generated segment name
+   */
+  private String generateSegmentName() {
+    if (_timeStats != null) {
+      if (_totalDocs > 0) {
+        return _config.getSegmentNameGenerator()
+            .generateSegmentName(_config.getSequenceId(), 
_timeStats.getMinValue(), _timeStats.getMaxValue());
+      } else {
+        // When totalDoc is 0, check whether 'failOnEmptySegment' option is 
true
+        Preconditions.checkArgument(!_config.isFailOnEmptySegment(),
+            "Failing the empty segment creation as the option 
'failOnEmptySegment' is set to: "
+                + _config.isFailOnEmptySegment());
+        // Generate a unique name for a segment with no rows
+        long now = System.currentTimeMillis();
+        return 
_config.getSegmentNameGenerator().generateSegmentName(_config.getSequenceId(), 
now, now);
+      }
+    } else {
+      return 
_config.getSegmentNameGenerator().generateSegmentName(_config.getSequenceId(), 
null, null);
+    }
+  }
+
+  // Explanation of why we are using format converter:
+  // There are 3 options to correctly generate segments to v3 format
+  // 1. Generate v3 directly: This is efficient but v3 index writer needs to 
know buffer size upfront.
+  // Inverted, star and raw indexes don't have the index size upfront. This is 
also least flexible approach
+  // if we add more indexes in the future.

Review Comment:
   Removed the article 'the' before 'future' ("in the future" -> "in future") 
for consistency with the original comment style; this is a wording change, not a spelling fix.
   ```suggestion
     // if we add more indexes in future.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to