Jackie-Jiang commented on code in PR #17191:
URL: https://github.com/apache/pinot/pull/17191#discussion_r2600601787


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/BaseSegmentCreator.java:
##########
@@ -0,0 +1,982 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import java.io.Closeable;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.configuration2.PropertiesConfiguration;
+import org.apache.commons.configuration2.ex.ConfigurationException;
+import org.apache.pinot.common.metrics.MinionMeter;
+import org.apache.pinot.common.metrics.MinionMetrics;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.segment.local.segment.creator.impl.nullvalue.NullValueVectorCreator;
+import org.apache.pinot.segment.local.segment.index.converter.SegmentFormatConverterFactory;
+import org.apache.pinot.segment.local.segment.index.dictionary.DictionaryIndexPlugin;
+import org.apache.pinot.segment.local.segment.index.dictionary.DictionaryIndexType;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.local.segment.index.loader.invertedindex.MultiColumnTextIndexHandler;
+import org.apache.pinot.segment.local.startree.v2.builder.MultipleTreesBuilder;
+import org.apache.pinot.segment.local.utils.CrcUtils;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.converter.SegmentFormatConverter;
+import org.apache.pinot.segment.spi.creator.ColumnIndexCreationInfo;
+import org.apache.pinot.segment.spi.creator.ColumnStatistics;
+import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.creator.SegmentCreator;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.segment.spi.creator.SegmentVersion;
+import org.apache.pinot.segment.spi.index.DictionaryIndexConfig;
+import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
+import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
+import org.apache.pinot.segment.spi.index.IndexCreator;
+import org.apache.pinot.segment.spi.index.IndexHandler;
+import org.apache.pinot.segment.spi.index.IndexService;
+import org.apache.pinot.segment.spi.index.IndexType;
+import org.apache.pinot.segment.spi.index.StandardIndexes;
+import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
+import org.apache.pinot.segment.spi.index.creator.SegmentIndexCreationInfo;
+import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
+import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
+import org.apache.pinot.segment.spi.store.SegmentDirectory;
+import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
+import org.apache.pinot.spi.config.instance.InstanceType;
+import org.apache.pinot.spi.config.table.IndexConfig;
+import org.apache.pinot.spi.config.table.SegmentZKPropsConfig;
+import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.ComplexFieldSpec;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.FieldSpec.FieldType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.CommonsConfigurationUtils;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Interval;
+import org.joda.time.format.DateTimeFormatter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Column.*;
+import static org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Segment.*;
+
+
+/**
+ * Abstract base class for segment creators containing common functionality and metadata handling.
+ */
+public abstract class BaseSegmentCreator implements SegmentCreator {
+  private static final Logger LOGGER = LoggerFactory.getLogger(BaseSegmentCreator.class);
+  // Allow at most 512 characters for the metadata property
+  private static final int METADATA_PROPERTY_LENGTH_LIMIT = 512;
+
+  protected Schema _schema;
+  protected Map<String, ColumnIndexCreators> _colIndexes;
+  protected NavigableMap<String, ColumnIndexCreationInfo> _indexCreationInfoMap;
+
+  private int _totalDocs;
+  private SegmentGeneratorConfig _config;
+  private String _segmentName;
+  private File _indexDir;
+  @Nullable
+  private InstanceType _instanceType;
+
+  /**
+   * Common initialization logic for setting up directory and basic fields.
+   */
+  protected void initializeCommon(SegmentGeneratorConfig segmentCreationSpec, SegmentIndexCreationInfo creationInfo,
+      NavigableMap<String, ColumnIndexCreationInfo> indexCreationInfoMap, Schema schema, File outDir,
+      Map<String, ColumnIndexCreators> colIndexes, @Nullable int[] immutableToMutableIdMap,
+      @Nullable InstanceType instanceType)
+      throws Exception {
+    // Check that the output directory does not exist
+    Preconditions.checkState(!outDir.exists(), "Segment output directory: %s already exists", outDir);
+    Preconditions.checkState(outDir.mkdirs(), "Failed to create output directory: %s", outDir);
+
+    _config = segmentCreationSpec;
+    _colIndexes = colIndexes;
+    _indexCreationInfoMap = indexCreationInfoMap;
+    _indexDir = outDir;
+    _schema = schema;
+    _totalDocs = creationInfo.getTotalDocs();
+    _instanceType = instanceType;
+
+    initColSegmentCreationInfo(immutableToMutableIdMap);
+  }
+
+  private void initColSegmentCreationInfo(@Nullable int[] immutableToMutableIdMap)
+      throws Exception {
+    Map<String, FieldIndexConfigs> indexConfigs = _config.getIndexConfigsByColName();
+    for (String columnName : indexConfigs.keySet()) {
+      if (canColumnBeIndexed(columnName) && _totalDocs > 0 && _indexCreationInfoMap.containsKey(columnName)) {
+        ColumnIndexCreators result = createColIndexCreators(columnName, immutableToMutableIdMap);
+        _colIndexes.put(columnName, result);
+      } else {
+        FieldSpec fieldSpec = _schema.getFieldSpecFor(columnName);
+        _colIndexes.put(columnName,
+            new ColumnIndexCreators(columnName, fieldSpec, null, List.of(), null));
+      }
+    }
+  }
+
+  /**
+   * Initializes a single column's dictionary and index creators.
+   * This encapsulates the common logic shared between different segment creator implementations.
+   */
+  protected ColumnIndexCreators createColIndexCreators(String columnName,
+      @Nullable int[] immutableToMutableIdMap)
+      throws Exception {
+    FieldSpec fieldSpec = _schema.getFieldSpecFor(columnName);
+
+    FieldIndexConfigs originalConfig = _config.getIndexConfigsByColName().get(columnName);
+    ColumnIndexCreationInfo columnIndexCreationInfo = _indexCreationInfoMap.get(columnName);
+    Preconditions.checkNotNull(columnIndexCreationInfo, "Missing index creation info for column: %s", columnName);
+
+    boolean dictEnabledColumn = createDictionaryForColumn(columnIndexCreationInfo, _config, fieldSpec);
+    if (originalConfig.getConfig(StandardIndexes.inverted()).isEnabled()) {
+      Preconditions.checkState(dictEnabledColumn,
+          "Cannot create inverted index for raw index column: %s", columnName);
+    }
+    IndexCreationContext.Common context =
+        getIndexCreationContext(fieldSpec, dictEnabledColumn, immutableToMutableIdMap);
+
+    FieldIndexConfigs config = adaptConfig(columnName, originalConfig, columnIndexCreationInfo, _config);
+
+    SegmentDictionaryCreator dictionaryCreator = null;
+    if (dictEnabledColumn) {
+      dictionaryCreator = getDictionaryCreator(columnName, originalConfig, context);
+    }
+
+    List<IndexCreator> indexCreators = getIndexCreatorsByColumn(fieldSpec, context, config, dictEnabledColumn);
+
+    return new ColumnIndexCreators(columnName, fieldSpec, dictionaryCreator,
+        indexCreators, getNullValueCreator(fieldSpec));
+  }
+
+  private IndexCreationContext.Common getIndexCreationContext(FieldSpec fieldSpec, boolean dictEnabledColumn,
+      @Nullable int[] immutableToMutableIdMap) {
+    ColumnIndexCreationInfo columnIndexCreationInfo = _indexCreationInfoMap.get(fieldSpec.getName());
+    FieldIndexConfigs fieldIndexConfig = _config.getIndexConfigsByColName().get(fieldSpec.getName());
+    boolean forwardIndexDisabled = !fieldIndexConfig.getConfig(StandardIndexes.forward()).isEnabled();
+
+    return IndexCreationContext.builder()
+        .withIndexDir(_indexDir)
+        .withDictionary(dictEnabledColumn)
+        .withFieldSpec(fieldSpec)
+        .withTotalDocs(_totalDocs)
+        .withColumnIndexCreationInfo(columnIndexCreationInfo)
+        .withOptimizedDictionary(_config.isOptimizeDictionary()
+            || _config.isOptimizeDictionaryForMetrics() && fieldSpec.getFieldType() == FieldSpec.FieldType.METRIC)
+        .onHeap(_config.isOnHeap())
+        .withForwardIndexDisabled(forwardIndexDisabled)
+        .withTextCommitOnClose(true)
+        .withImmutableToMutableIdMap(immutableToMutableIdMap)
+        .withRealtimeConversion(_config.isRealtimeConversion())
+        .withConsumerDir(_config.getConsumerDir())
+        .withTableNameWithType(_config.getTableConfig().getTableName())
+        .withContinueOnError(_config.isContinueOnError())
+        .build();
+  }
+
+  private SegmentDictionaryCreator getDictionaryCreator(String columnName, FieldIndexConfigs config,
+      IndexCreationContext.Common context)
+      throws IOException {
+    ColumnIndexCreationInfo columnIndexCreationInfo = _indexCreationInfoMap.get(columnName);
+
+    // Create dictionary-encoded index
+    // Initialize dictionary creator
+    // TODO: Dictionary creator holds all unique values on heap. Consider keeping dictionary instead of creator
+    //       which uses off-heap memory.
+    DictionaryIndexConfig dictConfig = config.getConfig(StandardIndexes.dictionary());
+    if (!dictConfig.isEnabled()) {
+      LOGGER.info("Creating dictionary index in column {}.{} even when it is 
disabled in config",
+          _config.getTableName(), columnName);
+    }
+
+    // override dictionary type if configured to do so
+    if (_config.isOptimizeDictionaryType()) {
+      LOGGER.info("Overriding dictionary type for column: {} using var-length 
dictionary: {}", columnName,
+          columnIndexCreationInfo.isUseVarLengthDictionary());
+      dictConfig = new DictionaryIndexConfig(dictConfig, 
columnIndexCreationInfo.isUseVarLengthDictionary());
+    }
+
+    SegmentDictionaryCreator dictionaryCreator =
+        new DictionaryIndexPlugin().getIndexType().createIndexCreator(context, dictConfig);
+
+    try {
+      dictionaryCreator.build(context.getSortedUniqueElementsArray());
+    } catch (Exception e) {
+      LOGGER.error("Error building dictionary for field: {}, cardinality: {}, 
number of bytes per entry: {}",
+          context.getFieldSpec().getName(), context.getCardinality(), 
dictionaryCreator.getNumBytesPerEntry());
+      throw e;
+    }
+    return dictionaryCreator;
+  }
+
+  private List<IndexCreator> getIndexCreatorsByColumn(FieldSpec fieldSpec, IndexCreationContext.Common context,
+      FieldIndexConfigs config, boolean dictEnabledColumn)
+      throws Exception {
+    Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex =
+        Maps.newHashMapWithExpectedSize(IndexService.getInstance().getAllIndexes().size());
+    for (IndexType<?, ?, ?> index : IndexService.getInstance().getAllIndexes()) {
+      if (index.getIndexBuildLifecycle() != IndexType.BuildLifecycle.DURING_SEGMENT_CREATION) {
+        continue;
+      }
+      tryCreateIndexCreator(creatorsByIndex, index, context, config);
+    }
+
+    // TODO: Remove this when values stored as ForwardIndex stop depending on TextIndex config
+    IndexCreator oldFwdCreator = creatorsByIndex.get(StandardIndexes.forward());
+    if (oldFwdCreator != null) {
+      Object fakeForwardValue = calculateRawValueForTextIndex(dictEnabledColumn, config, fieldSpec);
+      if (fakeForwardValue != null) {
+        ForwardIndexCreator castedOldFwdCreator = (ForwardIndexCreator) oldFwdCreator;
+        SameValueForwardIndexCreator fakeValueFwdCreator =
+            new SameValueForwardIndexCreator(fakeForwardValue, castedOldFwdCreator);
+        creatorsByIndex.put(StandardIndexes.forward(), fakeValueFwdCreator);
+      }
+    }
+    return new ArrayList<>(creatorsByIndex.values());
+  }
+
+  private NullValueVectorCreator getNullValueCreator(FieldSpec fieldSpec) {
+    // Although NullValueVector is implemented as an index, it needs to be treated in a different way than other indexes
+    String columnName = fieldSpec.getName();
+    if (isNullable(fieldSpec)) {
+      // Initialize Null value vector map
+      LOGGER.info("Column: {} is nullable", columnName);
+      return new NullValueVectorCreator(_indexDir, columnName);
+    } else {
+      LOGGER.info("Column: {} is not nullable", columnName);
+      return null;
+    }
+  }
+
+  protected boolean canColumnBeIndexed(String columnName) {
+    FieldSpec fieldSpec = _schema.getFieldSpecFor(columnName);
+    Preconditions.checkState(fieldSpec != null, "Failed to find column: %s in the schema", columnName);
+    if (fieldSpec.isVirtualColumn()) {
+      LOGGER.warn("Ignoring index creation for virtual column {}", columnName);
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Checks if a field is nullable based on schema and config settings.
+   */
+  private boolean isNullable(FieldSpec fieldSpec) {
+    return _schema.isEnableColumnBasedNullHandling() ? fieldSpec.isNullable() : _config.isDefaultNullHandlingEnabled();
+  }
+
+  /**
+   * Adapts field index configs based on column properties.
+   */
+  private FieldIndexConfigs adaptConfig(String columnName, FieldIndexConfigs config,
+      ColumnIndexCreationInfo columnIndexCreationInfo, SegmentGeneratorConfig segmentCreationSpec) {
+    FieldIndexConfigs.Builder builder = new FieldIndexConfigs.Builder(config);
+    // Sorted columns treat the 'forwardIndexDisabled' flag as a no-op
+    ForwardIndexConfig fwdConfig = config.getConfig(StandardIndexes.forward());
+    if (!fwdConfig.isEnabled() && columnIndexCreationInfo.isSorted()) {
+      builder.add(StandardIndexes.forward(),
+          new ForwardIndexConfig.Builder(fwdConfig).withLegacyProperties(segmentCreationSpec.getColumnProperties(),
+              columnName).build());
+    }
+    // Initialize inverted index creator; skip creating inverted index if sorted
+    if (columnIndexCreationInfo.isSorted()) {
+      builder.undeclare(StandardIndexes.inverted());
+    }
+    return builder.build();
+  }
+
+  /**
+   * Creates the {@link IndexCreator} in a type safe way.
+   *
+   * This code needs to be in a specific method instead of inlined in the main loop in order to be able to use the
+   * limited generic capabilities of Java.
+   */
+  private <C extends IndexConfig> void tryCreateIndexCreator(Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex,
+      IndexType<C, ?, ?> index, IndexCreationContext.Common context, FieldIndexConfigs fieldIndexConfigs)
+      throws Exception {
+    C config = fieldIndexConfigs.getConfig(index);
+    if (config.isEnabled()) {
+      creatorsByIndex.put(index, index.createIndexCreator(context, config));
+    }
+  }
+
+  /**
+   * Returns true if dictionary should be created for a column, false otherwise.
+   * Currently there are two sources for this config:
+   * <ul>
+   *   <li> ColumnIndexCreationInfo (this is currently hard-coded to always return dictionary). </li>
+   *   <li> SegmentGeneratorConfig</li>
+   * </ul>
+   *
+   * This method gives preference to the SegmentGeneratorConfig.
+   *
+   * @param info Column index creation info
+   * @param config Segment generation config
+   * @param spec Field spec for the column
+   * @return True if dictionary should be created for the column, false otherwise
+   */
+  private boolean createDictionaryForColumn(ColumnIndexCreationInfo info, SegmentGeneratorConfig config,
+      FieldSpec spec) {
+    if (spec instanceof ComplexFieldSpec) {
+      return false;
+    }
+
+    String column = spec.getName();
+    FieldIndexConfigs fieldIndexConfigs = config.getIndexConfigsByColName().get(column);
+    if (fieldIndexConfigs.getConfig(StandardIndexes.dictionary()).isDisabled()) {
+      return false;
+    }
+
+    return DictionaryIndexType.ignoreDictionaryOverride(config.isOptimizeDictionary(),
+        config.isOptimizeDictionaryForMetrics(), config.getNoDictionarySizeRatioThreshold(),
+        config.getNoDictionaryCardinalityRatioThreshold(), spec, fieldIndexConfigs, info.getDistinctValueCount(),
+        info.getTotalNumberOfEntries());
+  }
+
+  /**
+   * Calculates the raw value to be used for text index when forward index is disabled.
+   */
+  @Nullable
+  private Object calculateRawValueForTextIndex(boolean dictEnabledColumn, FieldIndexConfigs configs,
+      FieldSpec fieldSpec) {
+    if (dictEnabledColumn) {
+      return null;
+    }
+    org.apache.pinot.segment.spi.index.TextIndexConfig textConfig = configs.getConfig(StandardIndexes.text());
+    if (!textConfig.isEnabled()) {
+      return null;
+    }
+
+    Object rawValue = textConfig.getRawValueForTextIndex();
+
+    if (rawValue == null) {
+      return null;
+    } else if (!fieldSpec.isSingleValueField()) {
+      if (fieldSpec.getDataType().getStoredType() == FieldSpec.DataType.STRING) {
+        if (!(rawValue instanceof String[])) {
+          rawValue = new String[]{String.valueOf(rawValue)};
+        }
+      } else if (fieldSpec.getDataType().getStoredType() == FieldSpec.DataType.BYTES) {
+        if (!(rawValue instanceof String[])) {
+          rawValue = new byte[][]{String.valueOf(rawValue).getBytes(StandardCharsets.UTF_8)};
+        }
+      } else {
+        throw new RuntimeException("Text Index is only supported for STRING and BYTES stored types");
+      }
+    }
+    return rawValue;
+  }
+
+  /**
+   * Writes segment metadata to disk.
+   */
+  protected void writeMetadata()
+      throws ConfigurationException {
+    File metadataFile = new File(_indexDir, V1Constants.MetadataKeys.METADATA_FILE_NAME);
+    PropertiesConfiguration properties = CommonsConfigurationUtils.fromFile(metadataFile);
+
+    properties.setProperty(SEGMENT_CREATOR_VERSION, _config.getCreatorVersion());
+    properties.setProperty(SEGMENT_PADDING_CHARACTER, String.valueOf(V1Constants.Str.DEFAULT_STRING_PAD_CHAR));
+    properties.setProperty(SEGMENT_NAME, _segmentName);
+    properties.setProperty(TABLE_NAME, _config.getTableName());
+    properties.setProperty(DIMENSIONS, _config.getDimensions());
+    properties.setProperty(METRICS, _config.getMetrics());
+    properties.setProperty(DATETIME_COLUMNS, _config.getDateTimeColumnNames());
+    properties.setProperty(COMPLEX_COLUMNS, _config.getComplexColumnNames());
+    String timeColumnName = _config.getTimeColumnName();
+    properties.setProperty(TIME_COLUMN_NAME, timeColumnName);
+    properties.setProperty(SEGMENT_TOTAL_DOCS, String.valueOf(_totalDocs));
+
+    // Write time related metadata (start time, end time, time unit)
+    if (timeColumnName != null) {
+      ColumnIndexCreationInfo timeColumnIndexCreationInfo = _indexCreationInfoMap.get(timeColumnName);
+      if (timeColumnIndexCreationInfo != null) {
+        try {
+          long startTime;
+          long endTime;
+          TimeUnit timeUnit;
+
+          // Use start/end time in config if defined
+          if (_config.getStartTime() != null) {
+            startTime = Long.parseLong(_config.getStartTime());
+            endTime = Long.parseLong(_config.getEndTime());
+            timeUnit = Preconditions.checkNotNull(_config.getSegmentTimeUnit());
+          } else {
+            if (_totalDocs > 0) {
+              String startTimeStr = timeColumnIndexCreationInfo.getMin().toString();
+              String endTimeStr = timeColumnIndexCreationInfo.getMax().toString();
+
+              if (_config.getTimeColumnType() == SegmentGeneratorConfig.TimeColumnType.SIMPLE_DATE) {
+                // For TimeColumnType.SIMPLE_DATE_FORMAT, convert time value into millis since epoch
+                // Use DateTimeFormatter from DateTimeFormatSpec to handle default time zone consistently.
+                DateTimeFormatSpec formatSpec = _config.getDateTimeFormatSpec();
+                Preconditions.checkNotNull(formatSpec, "DateTimeFormatSpec must exist for SimpleDate");
+                DateTimeFormatter dateTimeFormatter = formatSpec.getDateTimeFormatter();
+                startTime = dateTimeFormatter.parseMillis(startTimeStr);
+                endTime = dateTimeFormatter.parseMillis(endTimeStr);
+                timeUnit = TimeUnit.MILLISECONDS;
+              } else {
+                // by default, time column type is TimeColumnType.EPOCH
+                startTime = Long.parseLong(startTimeStr);
+                endTime = Long.parseLong(endTimeStr);
+                timeUnit = Preconditions.checkNotNull(_config.getSegmentTimeUnit());
+              }
+            } else {
+              // No records in segment. Use current time as start/end
+              long now = System.currentTimeMillis();
+              if (_config.getTimeColumnType() == SegmentGeneratorConfig.TimeColumnType.SIMPLE_DATE) {
+                startTime = now;
+                endTime = now;
+                timeUnit = TimeUnit.MILLISECONDS;
+              } else {
+                timeUnit = Preconditions.checkNotNull(_config.getSegmentTimeUnit());
+                startTime = timeUnit.convert(now, TimeUnit.MILLISECONDS);
+                endTime = timeUnit.convert(now, TimeUnit.MILLISECONDS);
+              }
+            }
+          }
+
+          if (!_config.isSkipTimeValueCheck()) {
+            Interval timeInterval =
+                new Interval(timeUnit.toMillis(startTime), timeUnit.toMillis(endTime), DateTimeZone.UTC);
+            Preconditions.checkState(TimeUtils.isValidTimeInterval(timeInterval),
+                "Invalid segment start/end time: %s (in millis: %s/%s) for time column: %s, must be between: %s",
+                timeInterval, timeInterval.getStartMillis(), timeInterval.getEndMillis(), timeColumnName,
+                TimeUtils.VALID_TIME_INTERVAL);
+          }
+
+          properties.setProperty(SEGMENT_START_TIME, startTime);
+          properties.setProperty(SEGMENT_END_TIME, endTime);
+          properties.setProperty(TIME_UNIT, timeUnit);
+        } catch (Exception e) {
+          if (!_config.isContinueOnError()) {
+            throw e;
+          }
+          TimeUnit timeUnit;
+          long now = System.currentTimeMillis();
+          long convertedStartTime;
+          long convertedEndTime;
+          if (_config.getTimeColumnType() == SegmentGeneratorConfig.TimeColumnType.SIMPLE_DATE) {
+            convertedEndTime = now;
+            convertedStartTime = TimeUtils.getValidMinTimeMillis();
+            timeUnit = TimeUnit.MILLISECONDS;
+          } else {
+            timeUnit = _config.getSegmentTimeUnit();
+            if (timeUnit != null) {
+              convertedEndTime = timeUnit.convert(now, TimeUnit.MILLISECONDS);
+              convertedStartTime = timeUnit.convert(TimeUtils.getValidMinTimeMillis(), TimeUnit.MILLISECONDS);
+            } else {
+              // Use millis as the time unit if not able to infer from config
+              timeUnit = TimeUnit.MILLISECONDS;
+              convertedEndTime = now;
+              convertedStartTime = TimeUtils.getValidMinTimeMillis();
+            }
+          }
+          LOGGER.warn(
+              "Caught exception while writing time metadata for segment: {}, 
time column: {}, total docs: {}. "
+                  + "Continuing using current time ({}) as the end time, and 
min valid time ({}) as the start time "
+                  + "for the segment (time unit: {}).",
+              _segmentName, timeColumnName, _totalDocs, convertedEndTime, 
convertedStartTime, timeUnit, e);
+          properties.setProperty(SEGMENT_START_TIME, convertedStartTime);
+          properties.setProperty(SEGMENT_END_TIME, convertedEndTime);
+          properties.setProperty(TIME_UNIT, timeUnit);
+        }
+      }
+    }
+
+    for (Map.Entry<String, String> entry : _config.getCustomProperties().entrySet()) {
+      properties.setProperty(entry.getKey(), entry.getValue());
+    }
+
+    for (Map.Entry<String, ColumnIndexCreationInfo> entry : _indexCreationInfoMap.entrySet()) {
+      String column = entry.getKey();
+      ColumnIndexCreationInfo columnIndexCreationInfo = entry.getValue();
+      SegmentDictionaryCreator dictionaryCreator = _colIndexes.get(column).getDictionaryCreator();
+      int dictionaryElementSize = (dictionaryCreator != null) ? dictionaryCreator.getNumBytesPerEntry() : 0;
+      addColumnMetadataInfo(properties, column, columnIndexCreationInfo, _totalDocs, _schema.getFieldSpecFor(column),
+          dictionaryCreator != null, dictionaryElementSize);
+    }
+
+    SegmentZKPropsConfig segmentZKPropsConfig = _config.getSegmentZKPropsConfig();
+    if (segmentZKPropsConfig != null) {
+      properties.setProperty(Realtime.START_OFFSET, segmentZKPropsConfig.getStartOffset());
+      properties.setProperty(Realtime.END_OFFSET, segmentZKPropsConfig.getEndOffset());
+    }
+    CommonsConfigurationUtils.saveToFile(properties, metadataFile);
+  }
+
+  /**
+   * Adds column metadata information to the properties configuration.
+   */
+  public static void addColumnMetadataInfo(PropertiesConfiguration properties, String column,
+      ColumnIndexCreationInfo columnIndexCreationInfo, int totalDocs, FieldSpec fieldSpec, boolean hasDictionary,
+      int dictionaryElementSize) {
+    int cardinality = columnIndexCreationInfo.getDistinctValueCount();
+    properties.setProperty(getKeyFor(column, CARDINALITY), String.valueOf(cardinality));
+    properties.setProperty(getKeyFor(column, TOTAL_DOCS), String.valueOf(totalDocs));
+    DataType dataType = fieldSpec.getDataType();
+    properties.setProperty(getKeyFor(column, DATA_TYPE), String.valueOf(dataType));
+    // TODO: When the column is raw (no dictionary), we should set BITS_PER_ELEMENT to -1 (invalid). Currently we set
+    //       it regardless of whether dictionary is created or not for backward compatibility because
+    //       ForwardIndexHandler doesn't update this value when converting a raw column to dictionary encoded.
+    //       Consider changing it after releasing 1.5.0.
+    //       See https://github.com/apache/pinot/pull/16921 for details
+    properties.setProperty(getKeyFor(column, BITS_PER_ELEMENT),
+        String.valueOf(org.apache.pinot.segment.local.io.util.PinotDataBitSet.getNumBitsPerValue(cardinality - 1)));
+    FieldType fieldType = fieldSpec.getFieldType();
+    properties.setProperty(getKeyFor(column, DICTIONARY_ELEMENT_SIZE), String.valueOf(dictionaryElementSize));
+    properties.setProperty(getKeyFor(column, COLUMN_TYPE), String.valueOf(fieldType));
+    properties.setProperty(getKeyFor(column, IS_SORTED), String.valueOf(columnIndexCreationInfo.isSorted()));
+    properties.setProperty(getKeyFor(column, HAS_DICTIONARY), String.valueOf(hasDictionary));
+    properties.setProperty(getKeyFor(column, IS_SINGLE_VALUED), String.valueOf(fieldSpec.isSingleValueField()));
+    properties.setProperty(getKeyFor(column, MAX_MULTI_VALUE_ELEMENTS),
+        String.valueOf(columnIndexCreationInfo.getMaxNumberOfMultiValueElements()));
+    properties.setProperty(getKeyFor(column, TOTAL_NUMBER_OF_ENTRIES),
+        String.valueOf(columnIndexCreationInfo.getTotalNumberOfEntries()));
+    properties.setProperty(getKeyFor(column, IS_AUTO_GENERATED),
+        String.valueOf(columnIndexCreationInfo.isAutoGenerated()));
+    DataType storedType = dataType.getStoredType();
+    if (storedType == DataType.STRING || storedType == DataType.BYTES) {
+      properties.setProperty(getKeyFor(column, SCHEMA_MAX_LENGTH), fieldSpec.getEffectiveMaxLength());
+      // TODO let's revisit writing effective maxLengthStrategy into metadata, as changing it right now may affect
+      //  segment's CRC value
+      FieldSpec.MaxLengthExceedStrategy maxLengthStrategy = fieldSpec.getMaxLengthExceedStrategy();
+      if (maxLengthStrategy != null) {
+        properties.setProperty(getKeyFor(column, SCHEMA_MAX_LENGTH_EXCEED_STRATEGY),
+            fieldSpec.getMaxLengthExceedStrategy());
+      }
+    }
+
+    org.apache.pinot.segment.spi.partition.PartitionFunction partitionFunction =
+        columnIndexCreationInfo.getPartitionFunction();
+    if (partitionFunction != null) {
+      properties.setProperty(getKeyFor(column, PARTITION_FUNCTION), partitionFunction.getName());
+      properties.setProperty(getKeyFor(column, NUM_PARTITIONS), columnIndexCreationInfo.getNumPartitions());
+      properties.setProperty(getKeyFor(column, PARTITION_VALUES), columnIndexCreationInfo.getPartitions());
+      if (columnIndexCreationInfo.getPartitionFunctionConfig() != null) {
+        for (Map.Entry<String, String> entry : columnIndexCreationInfo.getPartitionFunctionConfig().entrySet()) {
+          properties.setProperty(getKeyFor(column, String.format("%s.%s", PARTITION_FUNCTION_CONFIG, entry.getKey())),
+              entry.getValue());
+        }
+      }
+    }
+
+    // Datetime field
+    if (fieldType == FieldType.DATE_TIME) {
+      DateTimeFieldSpec dateTimeFieldSpec = (DateTimeFieldSpec) fieldSpec;
+      properties.setProperty(getKeyFor(column, DATETIME_FORMAT), dateTimeFieldSpec.getFormat());
+      properties.setProperty(getKeyFor(column, DATETIME_GRANULARITY), dateTimeFieldSpec.getGranularity());
+    }
+
+    if (fieldType != FieldType.COMPLEX) {
+      // Regular (non-complex) field
+      if (totalDocs > 0) {
+        Object min = columnIndexCreationInfo.getMin();
+        Object max = columnIndexCreationInfo.getMax();
+        // NOTE:
+        // Min/max could be null for real-time aggregate metrics. We don't directly call addColumnMinMaxValueInfo() to
+        // avoid setting MIN_MAX_VALUE_INVALID flag, which will prevent ColumnMinMaxValueGenerator from generating them
+        // when loading the segment.
+        if (min != null && max != null) {
+          addColumnMinMaxValueInfo(properties, column, min, max, storedType);
+        }
+      }
+    } else {
+      // Complex field
+      ComplexFieldSpec complexFieldSpec = (ComplexFieldSpec) fieldSpec;
+      properties.setProperty(getKeyFor(column, COMPLEX_CHILD_FIELD_NAMES),
+          new ArrayList<>(complexFieldSpec.getChildFieldSpecs().keySet()));
+      for (Map.Entry<String, FieldSpec> entry : complexFieldSpec.getChildFieldSpecs().entrySet()) {
+        addFieldSpec(properties, ComplexFieldSpec.getFullChildName(column, entry.getKey()), entry.getValue());
+      }
+    }
+
+    // TODO: Revisit whether we should set default null value for complex field
+    String defaultNullValue = columnIndexCreationInfo.getDefaultNullValue().toString();
+    if (storedType == DataType.STRING) {
+      // NOTE: Do not limit length of default null value because we need exact value to determine whether the default
+      //       null value changes
+      defaultNullValue = CommonsConfigurationUtils.replaceSpecialCharacterInPropertyValue(defaultNullValue);
+    }
+    if (defaultNullValue != null) {
+      properties.setProperty(getKeyFor(column, DEFAULT_NULL_VALUE), defaultNullValue);
+    }
+  }
+
+  /**
+   * In order to persist complex field metadata, we need to recursively add child field specs
+   * So, each complex field spec will have a property for its child field names and each child field will have its
+   * own properties of the detailed field spec.
+   * E.g. a COMPLEX type `intMap` of Map<String, Integer> has 2 child fields:
+   *   - key in STRING type and value in INT type.
+   *   Then we will have the following properties to define a COMPLEX field:
+   *     column.intMap.childFieldNames = [key, value]
+   *     column.intMap$$key.columnType = DIMENSION
+   *     column.intMap$$key.dataType = STRING
+   *     column.intMap$$key.isSingleValued = true
+   *     column.intMap$$value.columnType = DIMENSION
+   *     column.intMap$$value.dataType = INT
+   *     column.intMap$$value.isSingleValued = true
+   */
+  public static void addFieldSpec(PropertiesConfiguration properties, String column, FieldSpec fieldSpec) {
+    properties.setProperty(getKeyFor(column, COLUMN_TYPE), String.valueOf(fieldSpec.getFieldType()));
+    if (!column.equals(fieldSpec.getName())) {
+      properties.setProperty(getKeyFor(column, COLUMN_NAME), String.valueOf(fieldSpec.getName()));
+    }
+    DataType dataType = fieldSpec.getDataType();
+    properties.setProperty(getKeyFor(column, DATA_TYPE), String.valueOf(dataType));
+    properties.setProperty(getKeyFor(column, IS_SINGLE_VALUED), String.valueOf(fieldSpec.isSingleValueField()));
+    if (dataType.equals(DataType.STRING) || dataType.equals(DataType.BYTES) || dataType.equals(DataType.JSON)) {
+      properties.setProperty(getKeyFor(column, SCHEMA_MAX_LENGTH), fieldSpec.getEffectiveMaxLength());
+      FieldSpec.MaxLengthExceedStrategy maxLengthExceedStrategy = fieldSpec.getEffectiveMaxLengthExceedStrategy();
+      if (maxLengthExceedStrategy != null) {
+        properties.setProperty(getKeyFor(column, SCHEMA_MAX_LENGTH_EXCEED_STRATEGY), maxLengthExceedStrategy);
+      }
+    }
+
+    // datetime field
+    if (fieldSpec.getFieldType() == FieldType.DATE_TIME) {
+      DateTimeFieldSpec dateTimeFieldSpec = (DateTimeFieldSpec) fieldSpec;
+      properties.setProperty(getKeyFor(column, DATETIME_FORMAT), dateTimeFieldSpec.getFormat());
+      properties.setProperty(getKeyFor(column, DATETIME_GRANULARITY), dateTimeFieldSpec.getGranularity());
+    }
+
+    // complex field
+    if (fieldSpec.getFieldType() == FieldType.COMPLEX) {
+      ComplexFieldSpec complexFieldSpec = (ComplexFieldSpec) fieldSpec;
+      properties.setProperty(getKeyFor(column, COMPLEX_CHILD_FIELD_NAMES),
+          new ArrayList<>(complexFieldSpec.getChildFieldSpecs().keySet()));
+      for (Map.Entry<String, FieldSpec> entry : complexFieldSpec.getChildFieldSpecs().entrySet()) {
+        addFieldSpec(properties, ComplexFieldSpec.getFullChildName(column, entry.getKey()), entry.getValue());
+      }
+    }
+  }
+
+  /**
+   * Adds column min/max value information to the properties configuration.
+   */
+  public static void addColumnMinMaxValueInfo(PropertiesConfiguration properties, String column,
+      @Nullable Object minValue, @Nullable Object maxValue, DataType storedType) {
+    String validMinValue = minValue != null ? getValidPropertyValue(minValue.toString(), storedType) : null;
+    if (validMinValue != null) {
+      properties.setProperty(getKeyFor(column, MIN_VALUE), validMinValue);
+    }
+    String validMaxValue = maxValue != null ? getValidPropertyValue(maxValue.toString(), storedType) : null;
+    if (validMaxValue != null) {
+      properties.setProperty(getKeyFor(column, MAX_VALUE), validMaxValue);
+    }
+    if (validMinValue == null && validMaxValue == null) {
+      properties.setProperty(getKeyFor(column, MIN_MAX_VALUE_INVALID), true);
+    }
+  }
+
+  /**
+   * Helper method to get the valid value for setting min/max. Returns {@code null} if the value is too long (longer
+   * than 512 characters), or is not supported in {@link PropertiesConfiguration}, e.g. contains surrogate characters.
+   */
+  @Nullable
+  private static String getValidPropertyValue(String value, DataType storedType) {
+    if (value.length() > METADATA_PROPERTY_LENGTH_LIMIT) {
+      return null;
+    }
+    return storedType == DataType.STRING ? CommonsConfigurationUtils.replaceSpecialCharacterInPropertyValue(value)
+        : value;
+  }
+
+  /**
+   * Removes column metadata information from the properties configuration.
+   */
+  public static void removeColumnMetadataInfo(PropertiesConfiguration properties, String column) {
+    properties.subset(COLUMN_PROPS_KEY_PREFIX + column).clear();
+  }
+
+  @Override
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
+  /**
+   * Writes the individual column index files to disk.
+   */
+  protected abstract void flushColIndexes() throws Exception;
+
+  @Override
+  public void seal()
+      throws Exception {
+    _segmentName = generateSegmentName();
+    try {
+      // Write the index files to disk
+      flushColIndexes();
+    } finally {
+      close();
+    }
+    LOGGER.info("Finished segment seal for: {}", _segmentName);
+
+    // Format conversion
+    convertFormatIfNecessary(_indexDir);
+
+    // Build indexes if there are documents
+    if (_totalDocs > 0) {
+      buildStarTreeV2IfNecessary(_indexDir);
+      buildMultiColumnTextIndex(_indexDir);
+    }
+
+    // Update post-creation indexes
+    updatePostSegmentCreationIndexes(_indexDir);
+
+    // Persist creation metadata
+    persistCreationMeta(_indexDir);
+
+    LOGGER.info("Successfully created segment: {} in {}", _segmentName, 
_indexDir);
+  }
+
+  /**
+   * Generate segment name based on configuration and statistics.
+   * @return Generated segment name
+   */
+  private String generateSegmentName() {
+    ColumnStatistics timeStats = null;
+    String timeColumn = _config.getTimeColumnName();
+    if (timeColumn != null) {
+      timeStats = _indexCreationInfoMap.get(timeColumn).getColumnStatistics();
+    }
+
+    if (timeStats != null) {
+      if (_totalDocs > 0) {
+        return _config.getSegmentNameGenerator()
+            .generateSegmentName(_config.getSequenceId(), timeStats.getMinValue(), timeStats.getMaxValue());
+      } else {
+        // When totalDoc is 0, check whether 'failOnEmptySegment' option is true
+        Preconditions.checkArgument(!_config.isFailOnEmptySegment(),
+            "Failing the empty segment creation as the option 
'failOnEmptySegment' is set to: "
+                + _config.isFailOnEmptySegment());
+        // Generate a unique name for a segment with no rows
+        long now = System.currentTimeMillis();
+        return _config.getSegmentNameGenerator().generateSegmentName(_config.getSequenceId(), now, now);
+      }
+    } else {
+      return _config.getSegmentNameGenerator().generateSegmentName(_config.getSequenceId(), null, null);
+    }
+  }
+
+  // Explanation of why we are using format converter:
+  // There are 3 options to correctly generate segments to v3 format
+  // 1. Generate v3 directly: This is efficient but v3 index writer needs to know buffer size upfront.
+  // Inverted, star and raw indexes don't have the index size upfront. This is also the least flexible approach
+  // if we add more indexes in the future.
+  // 2. Hold data in-memory: One way to work around predeclaring sizes in (1) is to allocate "large" buffer (2GB?)
+  // and hold the data in memory and write the buffer at the end. The memory requirement in this case increases
+  // linearly with the number of columns. A variation of that is to mmap data to separate files...which is what we are
+  // doing here.
+  // 3. Another option is to generate dictionary and fwd indexes in v3 and generate inverted, star and raw indexes in
+  // separate files. Then add those files to the v3 index file. This leads to a lot of hodgepodge code to
+  // handle multiple segment formats.
+  // Using converter is similar to option (2), plus it's battle-tested code. We will roll out with
+  // this change to keep changes limited. Once we've migrated we can implement approach (1) with an option to
+  // copy for indexes for which we don't know sizes upfront.
+  private void convertFormatIfNecessary(File segmentDirectory)
+      throws Exception {
+    SegmentVersion versionToGenerate = _config.getSegmentVersion();
+    if (versionToGenerate.equals(SegmentVersion.v1)) {
+      // v1 by default
+      return;
+    }
+    SegmentFormatConverter converter =
+        SegmentFormatConverterFactory.getConverter(SegmentVersion.v1, SegmentVersion.v3);
+    converter.convert(segmentDirectory);
+  }
+
+  /**
+   * Build star-tree V2 index if configured.
+   */
+  private void buildStarTreeV2IfNecessary(File indexDir)
+      throws Exception {
+    List<StarTreeIndexConfig> starTreeIndexConfigs = _config.getStarTreeIndexConfigs();
+    boolean enableDefaultStarTree = _config.isEnableDefaultStarTree();
+    if (CollectionUtils.isNotEmpty(starTreeIndexConfigs) || enableDefaultStarTree) {
+      MultipleTreesBuilder.BuildMode buildMode =
+          _config.isOnHeap() ? MultipleTreesBuilder.BuildMode.ON_HEAP : MultipleTreesBuilder.BuildMode.OFF_HEAP;
+      MultipleTreesBuilder builder = new MultipleTreesBuilder(starTreeIndexConfigs, enableDefaultStarTree, indexDir,
+          buildMode);
+      // We don't create the builder using the try-with-resources pattern because builder.close() performs
+      // some clean-up steps to roll back the star-tree index to the previous state if it exists. If this goes wrong
+      // the star-tree index can be in an inconsistent state. To prevent that, when builder.close() throws an
+      // exception we want to propagate that up instead of ignoring it. This can get clunky when using
+      // try-with-resources as in this scenario the close() exception will be added to the suppressed exception list
+      // rather than thrown as the main exception, even though the original exception thrown on build() is ignored.
+      try {
+        builder.build();
+      } catch (Exception e) {
+        String tableNameWithType = _config.getTableConfig().getTableName();
+        LOGGER.error("Failed to build star-tree index for table: {}, 
skipping", tableNameWithType, e);
+        // Track metrics only if instance type is provided
+        if (_instanceType != null) {
+          if (_instanceType == InstanceType.MINION) {
+            MinionMetrics.get().addMeteredTableValue(tableNameWithType, MinionMeter.STAR_TREE_INDEX_BUILD_FAILURES, 1);
+          } else {
+            ServerMetrics.get().addMeteredTableValue(tableNameWithType, ServerMeter.STAR_TREE_INDEX_BUILD_FAILURES, 1);
+          }
+        }
+      } finally {
+        builder.close();
+      }
+    }
+  }
+
+  /**
+   * Build multi-column text index if configured.
+   */
+  private void buildMultiColumnTextIndex(File segmentOutputDir)
+      throws Exception {
+    if (_config.getMultiColumnTextIndexConfig() != null) {
+      PinotConfiguration segmentDirectoryConfigs =
+          new PinotConfiguration(Map.of(IndexLoadingConfig.READ_MODE_KEY, ReadMode.mmap));
+
+      TableConfig tableConfig = _config.getTableConfig();
+      Schema schema = _config.getSchema();
+      SegmentDirectoryLoaderContext segmentLoaderContext =
+          new SegmentDirectoryLoaderContext.Builder()
+              .setTableConfig(tableConfig)
+              .setSchema(schema)
+              .setSegmentName(_segmentName)
+              .setSegmentDirectoryConfigs(segmentDirectoryConfigs)
+              .build();
+
+      IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, tableConfig, schema);
+
+      try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
+          .load(segmentOutputDir.toURI(), segmentLoaderContext);
+          SegmentDirectory.Writer segmentWriter = segmentDirectory.createWriter()) {
+        MultiColumnTextIndexHandler handler = new MultiColumnTextIndexHandler(segmentDirectory, indexLoadingConfig,
+            _config.getMultiColumnTextIndexConfig());
+        handler.updateIndices(segmentWriter);
+        handler.postUpdateIndicesCleanup(segmentWriter);
+      }
+    }
+  }
+
+  /**
+   * Update indexes that are created post-segment creation.
+   */
+  private void updatePostSegmentCreationIndexes(File indexDir)
+      throws Exception {
+    Set<IndexType> postSegCreationIndexes = IndexService.getInstance().getAllIndexes().stream()
+        .filter(indexType -> indexType.getIndexBuildLifecycle() == IndexType.BuildLifecycle.POST_SEGMENT_CREATION)
+        .collect(Collectors.toSet());
+
+    if (!postSegCreationIndexes.isEmpty()) {
+      // Build other indexes
+      Map<String, Object> props = new HashMap<>();
+      props.put(IndexLoadingConfig.READ_MODE_KEY, ReadMode.mmap);
+      PinotConfiguration segmentDirectoryConfigs = new PinotConfiguration(props);
+
+      TableConfig tableConfig = _config.getTableConfig();
+      Schema schema = _config.getSchema();
+      SegmentDirectoryLoaderContext segmentLoaderContext =
+          new SegmentDirectoryLoaderContext.Builder()
+              .setTableConfig(tableConfig)
+              .setSchema(schema)
+              .setSegmentName(_segmentName)
+              .setSegmentDirectoryConfigs(segmentDirectoryConfigs)
+              .build();
+
+      IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, tableConfig, schema);
+
+      try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
+          .load(indexDir.toURI(), segmentLoaderContext);
+          SegmentDirectory.Writer segmentWriter = segmentDirectory.createWriter()) {
+        for (IndexType indexType : postSegCreationIndexes) {
+          IndexHandler handler =
+              indexType.createIndexHandler(segmentDirectory, indexLoadingConfig.getFieldIndexConfigByColName(), schema,
+                  tableConfig);
+          handler.updateIndices(segmentWriter);
+        }
+      }
+    }
+  }
+
+  /**
+   * Compute CRC and creation time, and persist to segment metadata file.
+   *
+   * @param indexDir Segment index directory
+   * @throws IOException If writing metadata fails
+   */
+  private void persistCreationMeta(File indexDir)
+      throws IOException {
+    long crc = CrcUtils.forAllFilesInFolder(indexDir).computeCrc();
+    long creationTime;
+    String creationTimeInConfig = _config.getCreationTime();
+    if (creationTimeInConfig != null) {
+      try {
+        creationTime = Long.parseLong(creationTimeInConfig);
+      } catch (Exception e) {
+        LOGGER.error("Caught exception while parsing creation time in config, 
use current time as creation time");
+        creationTime = System.currentTimeMillis();
+      }
+    } else {
+      creationTime = System.currentTimeMillis();
+    }
+    File segmentDir = SegmentDirectoryPaths.findSegmentDirectory(indexDir);
+    File creationMetaFile = new File(segmentDir, V1Constants.SEGMENT_CREATION_META);
+    try (DataOutputStream output = new DataOutputStream(new FileOutputStream(creationMetaFile))) {
+      output.writeLong(crc);
+      output.writeLong(creationTime);
+    }
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    List<Closeable> creators = new ArrayList<>();
+    for (ColumnIndexCreators colIndexes : _colIndexes.values()) {
+      if (colIndexes.getDictionaryCreator() != null) {
+        creators.add(colIndexes.getDictionaryCreator());
+      }
+      if (colIndexes.getNullValueVectorCreator() != null) {
+        creators.add(colIndexes.getNullValueVectorCreator());
+      }
+      creators.addAll(colIndexes.getIndexCreators());
+    }
+    org.apache.pinot.common.utils.FileUtils.close(creators);

Review Comment:
   (minor) Import the class instead of referencing it by its fully qualified name.
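
   For illustration, a sketch of what that change might look like (assuming there is no conflicting `FileUtils` import in this file):

   ```java
   import org.apache.pinot.common.utils.FileUtils;

   // ...

   // Same call as above, just referenced via a regular import:
   FileUtils.close(creators);
   ```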



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/BaseSegmentCreator.java:
##########
@@ -0,0 +1,982 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import java.io.Closeable;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.configuration2.PropertiesConfiguration;
+import org.apache.commons.configuration2.ex.ConfigurationException;
+import org.apache.pinot.common.metrics.MinionMeter;
+import org.apache.pinot.common.metrics.MinionMetrics;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.segment.local.segment.creator.impl.nullvalue.NullValueVectorCreator;
+import org.apache.pinot.segment.local.segment.index.converter.SegmentFormatConverterFactory;
+import org.apache.pinot.segment.local.segment.index.dictionary.DictionaryIndexPlugin;
+import org.apache.pinot.segment.local.segment.index.dictionary.DictionaryIndexType;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.local.segment.index.loader.invertedindex.MultiColumnTextIndexHandler;
+import org.apache.pinot.segment.local.startree.v2.builder.MultipleTreesBuilder;
+import org.apache.pinot.segment.local.utils.CrcUtils;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.converter.SegmentFormatConverter;
+import org.apache.pinot.segment.spi.creator.ColumnIndexCreationInfo;
+import org.apache.pinot.segment.spi.creator.ColumnStatistics;
+import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.creator.SegmentCreator;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.segment.spi.creator.SegmentVersion;
+import org.apache.pinot.segment.spi.index.DictionaryIndexConfig;
+import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
+import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
+import org.apache.pinot.segment.spi.index.IndexCreator;
+import org.apache.pinot.segment.spi.index.IndexHandler;
+import org.apache.pinot.segment.spi.index.IndexService;
+import org.apache.pinot.segment.spi.index.IndexType;
+import org.apache.pinot.segment.spi.index.StandardIndexes;
+import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
+import org.apache.pinot.segment.spi.index.creator.SegmentIndexCreationInfo;
+import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
+import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
+import org.apache.pinot.segment.spi.store.SegmentDirectory;
+import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
+import org.apache.pinot.spi.config.instance.InstanceType;
+import org.apache.pinot.spi.config.table.IndexConfig;
+import org.apache.pinot.spi.config.table.SegmentZKPropsConfig;
+import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.ComplexFieldSpec;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.FieldSpec.FieldType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.CommonsConfigurationUtils;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Interval;
+import org.joda.time.format.DateTimeFormatter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Column.*;
+import static org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Segment.*;
+
+
+/**
+ * Abstract base class for segment creators containing common functionality and metadata handling.
+ */
+public abstract class BaseSegmentCreator implements SegmentCreator {
+  private static final Logger LOGGER = LoggerFactory.getLogger(BaseSegmentCreator.class);
+  // Allow at most 512 characters for the metadata property
+  private static final int METADATA_PROPERTY_LENGTH_LIMIT = 512;
+
+  protected Schema _schema;
+  protected Map<String, ColumnIndexCreators> _colIndexes;
+  protected NavigableMap<String, ColumnIndexCreationInfo> _indexCreationInfoMap;
+
+  private int _totalDocs;
+  private SegmentGeneratorConfig _config;
+  private String _segmentName;
+  private File _indexDir;
+  @Nullable
+  private InstanceType _instanceType;
+
+  /**
+   * Common initialization logic for setting up directory and basic fields.
+   */
+  protected void initializeCommon(SegmentGeneratorConfig segmentCreationSpec, SegmentIndexCreationInfo creationInfo,
+      NavigableMap<String, ColumnIndexCreationInfo> indexCreationInfoMap, Schema schema, File outDir,
+      Map<String, ColumnIndexCreators> colIndexes, @Nullable int[] immutableToMutableIdMap,
+      @Nullable InstanceType instanceType)
+      throws Exception {
+    // Check that the output directory does not exist
+    Preconditions.checkState(!outDir.exists(), "Segment output directory: %s already exists", outDir);
+    Preconditions.checkState(outDir.mkdirs(), "Failed to create output directory: %s", outDir);
+
+    _config = segmentCreationSpec;
+    _colIndexes = colIndexes;
+    _indexCreationInfoMap = indexCreationInfoMap;
+    _indexDir = outDir;
+    _schema = schema;
+    _totalDocs = creationInfo.getTotalDocs();
+    _instanceType = instanceType;
+
+    initColSegmentCreationInfo(immutableToMutableIdMap);
+  }
+
+  private void initColSegmentCreationInfo(@Nullable int[] immutableToMutableIdMap)
+      throws Exception {
+    Map<String, FieldIndexConfigs> indexConfigs = _config.getIndexConfigsByColName();
+    for (String columnName : indexConfigs.keySet()) {
+      if (canColumnBeIndexed(columnName) && _totalDocs > 0 && _indexCreationInfoMap.containsKey(columnName)) {
+        ColumnIndexCreators result = createColIndexCreators(columnName, immutableToMutableIdMap);
+        _colIndexes.put(columnName, result);
+      } else {
+        FieldSpec fieldSpec = _schema.getFieldSpecFor(columnName);
+        _colIndexes.put(columnName,
+            new ColumnIndexCreators(columnName, fieldSpec, null, List.of(), 
null));
+      }
+    }
+  }
+
+  /**
+   * Initializes a single column's dictionary and index creators.
+   * This encapsulates the common logic shared between different segment creator implementations.
+   */
+  protected ColumnIndexCreators createColIndexCreators(String columnName,
+      @Nullable int[] immutableToMutableIdMap)
+      throws Exception {
+    FieldSpec fieldSpec = _schema.getFieldSpecFor(columnName);
+
+    FieldIndexConfigs originalConfig = _config.getIndexConfigsByColName().get(columnName);
+    ColumnIndexCreationInfo columnIndexCreationInfo = _indexCreationInfoMap.get(columnName);
+    Preconditions.checkNotNull(columnIndexCreationInfo, "Missing index creation info for column: %s", columnName);
+
+    boolean dictEnabledColumn = createDictionaryForColumn(columnIndexCreationInfo, _config, fieldSpec);
+    if (originalConfig.getConfig(StandardIndexes.inverted()).isEnabled()) {
+      Preconditions.checkState(dictEnabledColumn,
+          "Cannot create inverted index for raw index column: %s", columnName);
+    }
+    IndexCreationContext.Common context =
+        getIndexCreationContext(fieldSpec, dictEnabledColumn, immutableToMutableIdMap);
+
+    FieldIndexConfigs config = adaptConfig(columnName, originalConfig, columnIndexCreationInfo, _config);
+
+    SegmentDictionaryCreator dictionaryCreator = null;
+    if (dictEnabledColumn) {
+      dictionaryCreator = getDictionaryCreator(columnName, originalConfig, context);
+    }
+
+    List<IndexCreator> indexCreators = getIndexCreatorsByColumn(fieldSpec, context, config, dictEnabledColumn);
+
+    return new ColumnIndexCreators(columnName, fieldSpec, dictionaryCreator,
+        indexCreators, getNullValueCreator(fieldSpec));
+  }
+
+  private IndexCreationContext.Common getIndexCreationContext(FieldSpec fieldSpec, boolean dictEnabledColumn,
+      @Nullable int[] immutableToMutableIdMap) {
+    ColumnIndexCreationInfo columnIndexCreationInfo = _indexCreationInfoMap.get(fieldSpec.getName());
+    FieldIndexConfigs fieldIndexConfig = _config.getIndexConfigsByColName().get(fieldSpec.getName());
+    boolean forwardIndexDisabled = !fieldIndexConfig.getConfig(StandardIndexes.forward()).isEnabled();
+
+    return IndexCreationContext.builder()
+        .withIndexDir(_indexDir)
+        .withDictionary(dictEnabledColumn)
+        .withFieldSpec(fieldSpec)
+        .withTotalDocs(_totalDocs)
+        .withColumnIndexCreationInfo(columnIndexCreationInfo)
+        .withOptimizedDictionary(_config.isOptimizeDictionary()
+            || _config.isOptimizeDictionaryForMetrics() && fieldSpec.getFieldType() == FieldSpec.FieldType.METRIC)
+        .onHeap(_config.isOnHeap())
+        .withForwardIndexDisabled(forwardIndexDisabled)
+        .withTextCommitOnClose(true)
+        .withImmutableToMutableIdMap(immutableToMutableIdMap)
+        .withRealtimeConversion(_config.isRealtimeConversion())
+        .withConsumerDir(_config.getConsumerDir())
+        .withTableNameWithType(_config.getTableConfig().getTableName())
+        .withContinueOnError(_config.isContinueOnError())
+        .build();
+  }
+
+  private SegmentDictionaryCreator getDictionaryCreator(String columnName, FieldIndexConfigs config,
+      IndexCreationContext.Common context)
+      throws IOException {
+    ColumnIndexCreationInfo columnIndexCreationInfo = _indexCreationInfoMap.get(columnName);
+
+    // Create dictionary-encoded index
+    // Initialize dictionary creator
+    // TODO: Dictionary creator holds all unique values on heap. Consider keeping dictionary instead of creator
+    //       which uses off-heap memory.
+    DictionaryIndexConfig dictConfig = config.getConfig(StandardIndexes.dictionary());
+    if (!dictConfig.isEnabled()) {
+      LOGGER.info("Creating dictionary index in column {}.{} even when it is disabled in config",
+          _config.getTableName(), columnName);
+    }
+
+    // override dictionary type if configured to do so
+    if (_config.isOptimizeDictionaryType()) {
+      LOGGER.info("Overriding dictionary type for column: {} using var-length dictionary: {}", columnName,
+          columnIndexCreationInfo.isUseVarLengthDictionary());
+      dictConfig = new DictionaryIndexConfig(dictConfig, columnIndexCreationInfo.isUseVarLengthDictionary());
+    }
+
+    SegmentDictionaryCreator dictionaryCreator =
+        new DictionaryIndexPlugin().getIndexType().createIndexCreator(context, dictConfig);
+
+    try {
+      dictionaryCreator.build(context.getSortedUniqueElementsArray());
+    } catch (Exception e) {
+      LOGGER.error("Error building dictionary for field: {}, cardinality: {}, number of bytes per entry: {}",
+          context.getFieldSpec().getName(), context.getCardinality(), dictionaryCreator.getNumBytesPerEntry());
+      throw e;
+    }
+    return dictionaryCreator;
+  }
+
+  private List<IndexCreator> getIndexCreatorsByColumn(FieldSpec fieldSpec, IndexCreationContext.Common context,
+      FieldIndexConfigs config, boolean dictEnabledColumn)
+      throws Exception {
+    Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex =
+        Maps.newHashMapWithExpectedSize(IndexService.getInstance().getAllIndexes().size());
+    for (IndexType<?, ?, ?> index : IndexService.getInstance().getAllIndexes()) {
+      if (index.getIndexBuildLifecycle() != IndexType.BuildLifecycle.DURING_SEGMENT_CREATION) {
+        continue;
+      }
+      tryCreateIndexCreator(creatorsByIndex, index, context, config);
+    }
+
+    // TODO: Remove this when values stored as ForwardIndex stop depending on TextIndex config
+    IndexCreator oldFwdCreator = creatorsByIndex.get(StandardIndexes.forward());
+    if (oldFwdCreator != null) {
+      Object fakeForwardValue = calculateRawValueForTextIndex(dictEnabledColumn, config, fieldSpec);
+      if (fakeForwardValue != null) {
+        ForwardIndexCreator castedOldFwdCreator = (ForwardIndexCreator) oldFwdCreator;
+        SameValueForwardIndexCreator fakeValueFwdCreator =
+            new SameValueForwardIndexCreator(fakeForwardValue, castedOldFwdCreator);
+        creatorsByIndex.put(StandardIndexes.forward(), fakeValueFwdCreator);
+      }
+    }
+    return new ArrayList<>(creatorsByIndex.values());
+  }
+
+  private NullValueVectorCreator getNullValueCreator(FieldSpec fieldSpec) {
+    // Although NullValueVector is implemented as an index, it needs to be treated in a different way than other indexes
+    String columnName = fieldSpec.getName();
+    if (isNullable(fieldSpec)) {
+      // Initialize Null value vector map
+      LOGGER.info("Column: {} is nullable", columnName);
+      return new NullValueVectorCreator(_indexDir, columnName);
+    } else {
+      LOGGER.info("Column: {} is not nullable", columnName);
+      return null;
+    }
+  }
+
+  protected boolean canColumnBeIndexed(String columnName) {
+    FieldSpec fieldSpec = _schema.getFieldSpecFor(columnName);
+    Preconditions.checkState(fieldSpec != null, "Failed to find column: %s in the schema", columnName);
+    if (fieldSpec.isVirtualColumn()) {
+      LOGGER.warn("Ignoring index creation for virtual column {}", columnName);
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Checks if a field is nullable based on schema and config settings.
+   */
+  private boolean isNullable(FieldSpec fieldSpec) {
+    return _schema.isEnableColumnBasedNullHandling() ? fieldSpec.isNullable() : _config.isDefaultNullHandlingEnabled();
+  }
+
+  /**
+   * Adapts field index configs based on column properties.
+   */
+  private FieldIndexConfigs adaptConfig(String columnName, FieldIndexConfigs config,
+      ColumnIndexCreationInfo columnIndexCreationInfo, SegmentGeneratorConfig segmentCreationSpec) {
+    FieldIndexConfigs.Builder builder = new FieldIndexConfigs.Builder(config);
+    // Sorted columns treat the 'forwardIndexDisabled' flag as a no-op
+    ForwardIndexConfig fwdConfig = config.getConfig(StandardIndexes.forward());
+    if (!fwdConfig.isEnabled() && columnIndexCreationInfo.isSorted()) {
+      builder.add(StandardIndexes.forward(),
+          new ForwardIndexConfig.Builder(fwdConfig).withLegacyProperties(segmentCreationSpec.getColumnProperties(),
+              columnName).build());
+    }
+    // Initialize inverted index creator; skip creating inverted index if sorted
+    if (columnIndexCreationInfo.isSorted()) {
+      builder.undeclare(StandardIndexes.inverted());
+    }
+    return builder.build();
+  }
+
+  /**
+   * Creates the {@link IndexCreator} in a type-safe way.
+   *
+   * This code needs to be in a separate method instead of inlined in the main loop in order to work with the
+   * limited generic capabilities of Java.
+   */
+  private <C extends IndexConfig> void tryCreateIndexCreator(Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex,
+      IndexType<C, ?, ?> index, IndexCreationContext.Common context, FieldIndexConfigs fieldIndexConfigs)
+      throws Exception {
+    C config = fieldIndexConfigs.getConfig(index);
+    if (config.isEnabled()) {
+      creatorsByIndex.put(index, index.createIndexCreator(context, config));
+    }
+  }
+
+  /**
+   * Returns true if dictionary should be created for a column, false otherwise.
+   * Currently there are two sources for this config:
+   * <ul>
+   *   <li> ColumnIndexCreationInfo (this is currently hard-coded to always return dictionary). </li>
+   *   <li> SegmentGeneratorConfig</li>
+   * </ul>
+   *
+   * This method gives preference to the SegmentGeneratorConfig.
+   *
+   * @param info Column index creation info
+   * @param config Segment generation config
+   * @param spec Field spec for the column
+   * @return True if dictionary should be created for the column, false otherwise
+   */
+  private boolean createDictionaryForColumn(ColumnIndexCreationInfo info, SegmentGeneratorConfig config,
+      FieldSpec spec) {
+    if (spec instanceof ComplexFieldSpec) {
+      return false;
+    }
+
+    String column = spec.getName();
+    FieldIndexConfigs fieldIndexConfigs = config.getIndexConfigsByColName().get(column);
+    if (fieldIndexConfigs.getConfig(StandardIndexes.dictionary()).isDisabled()) {
+      return false;
+    }
+
+    return DictionaryIndexType.ignoreDictionaryOverride(config.isOptimizeDictionary(),
+        config.isOptimizeDictionaryForMetrics(), config.getNoDictionarySizeRatioThreshold(),
+        config.getNoDictionaryCardinalityRatioThreshold(), spec, fieldIndexConfigs, info.getDistinctValueCount(),
+        info.getTotalNumberOfEntries());
+  }
+
+  /**
+   * Calculates the raw value to be used for the text index when the forward index is disabled.
+   */
+  @Nullable
+  private Object calculateRawValueForTextIndex(boolean dictEnabledColumn, FieldIndexConfigs configs,
+      FieldSpec fieldSpec) {
+    if (dictEnabledColumn) {
+      return null;
+    }
+    org.apache.pinot.segment.spi.index.TextIndexConfig textConfig = configs.getConfig(StandardIndexes.text());
+    if (!textConfig.isEnabled()) {
+      return null;
+    }
+
+    Object rawValue = textConfig.getRawValueForTextIndex();
+
+    if (rawValue == null) {
+      return null;
+    } else if (!fieldSpec.isSingleValueField()) {
+      if (fieldSpec.getDataType().getStoredType() == FieldSpec.DataType.STRING) {
+        if (!(rawValue instanceof String[])) {
+          rawValue = new String[]{String.valueOf(rawValue)};
+        }
+      } else if (fieldSpec.getDataType().getStoredType() == FieldSpec.DataType.BYTES) {
+        if (!(rawValue instanceof String[])) {
+          rawValue = new byte[][]{String.valueOf(rawValue).getBytes(StandardCharsets.UTF_8)};
+        }
+      } else {
+        throw new RuntimeException("Text index is only supported for STRING and BYTES stored types");
+      }
+    }
+    return rawValue;
+  }
+
+  /**
+   * Writes segment metadata to disk.
+   */
+  protected void writeMetadata()
+      throws ConfigurationException {
+    File metadataFile = new File(_indexDir, V1Constants.MetadataKeys.METADATA_FILE_NAME);
+    PropertiesConfiguration properties = CommonsConfigurationUtils.fromFile(metadataFile);
+
+    properties.setProperty(SEGMENT_CREATOR_VERSION, _config.getCreatorVersion());
+    properties.setProperty(SEGMENT_PADDING_CHARACTER, String.valueOf(V1Constants.Str.DEFAULT_STRING_PAD_CHAR));
+    properties.setProperty(SEGMENT_NAME, _segmentName);
+    properties.setProperty(TABLE_NAME, _config.getTableName());
+    properties.setProperty(DIMENSIONS, _config.getDimensions());
+    properties.setProperty(METRICS, _config.getMetrics());
+    properties.setProperty(DATETIME_COLUMNS, _config.getDateTimeColumnNames());
+    properties.setProperty(COMPLEX_COLUMNS, _config.getComplexColumnNames());
+    String timeColumnName = _config.getTimeColumnName();
+    properties.setProperty(TIME_COLUMN_NAME, timeColumnName);
+    properties.setProperty(SEGMENT_TOTAL_DOCS, String.valueOf(_totalDocs));
+
+    // Write time related metadata (start time, end time, time unit)
+    if (timeColumnName != null) {
+      ColumnIndexCreationInfo timeColumnIndexCreationInfo = _indexCreationInfoMap.get(timeColumnName);
+      if (timeColumnIndexCreationInfo != null) {
+        try {
+          long startTime;
+          long endTime;
+          TimeUnit timeUnit;
+
+          // Use start/end time in config if defined
+          if (_config.getStartTime() != null) {
+            startTime = Long.parseLong(_config.getStartTime());
+            endTime = Long.parseLong(_config.getEndTime());
+            timeUnit = Preconditions.checkNotNull(_config.getSegmentTimeUnit());
+          } else {
+            if (_totalDocs > 0) {
+              String startTimeStr = timeColumnIndexCreationInfo.getMin().toString();
+              String endTimeStr = timeColumnIndexCreationInfo.getMax().toString();
+
+              if (_config.getTimeColumnType() == SegmentGeneratorConfig.TimeColumnType.SIMPLE_DATE) {
+                // For TimeColumnType.SIMPLE_DATE_FORMAT, convert time value into millis since epoch
+                // Use DateTimeFormatter from DateTimeFormatSpec to handle default time zone consistently.
+                DateTimeFormatSpec formatSpec = _config.getDateTimeFormatSpec();
+                Preconditions.checkNotNull(formatSpec, "DateTimeFormatSpec must exist for SimpleDate");
+                DateTimeFormatter dateTimeFormatter = formatSpec.getDateTimeFormatter();
+                startTime = dateTimeFormatter.parseMillis(startTimeStr);
+                endTime = dateTimeFormatter.parseMillis(endTimeStr);
+                timeUnit = TimeUnit.MILLISECONDS;
+              } else {
+                // by default, time column type is TimeColumnType.EPOCH
+                startTime = Long.parseLong(startTimeStr);
+                endTime = Long.parseLong(endTimeStr);
+                timeUnit = Preconditions.checkNotNull(_config.getSegmentTimeUnit());
+              }
+            } else {
+              // No records in segment. Use current time as start/end
+              long now = System.currentTimeMillis();
+              if (_config.getTimeColumnType() == SegmentGeneratorConfig.TimeColumnType.SIMPLE_DATE) {
+                startTime = now;
+                endTime = now;
+                timeUnit = TimeUnit.MILLISECONDS;
+              } else {
+                timeUnit = Preconditions.checkNotNull(_config.getSegmentTimeUnit());
+                startTime = timeUnit.convert(now, TimeUnit.MILLISECONDS);
+                endTime = timeUnit.convert(now, TimeUnit.MILLISECONDS);
+              }
+            }
+          }
+
+          if (!_config.isSkipTimeValueCheck()) {
+            Interval timeInterval =
+                new Interval(timeUnit.toMillis(startTime), timeUnit.toMillis(endTime), DateTimeZone.UTC);
+            Preconditions.checkState(TimeUtils.isValidTimeInterval(timeInterval),
+                "Invalid segment start/end time: %s (in millis: %s/%s) for time column: %s, must be between: %s",
+                timeInterval, timeInterval.getStartMillis(), timeInterval.getEndMillis(), timeColumnName,
+                TimeUtils.VALID_TIME_INTERVAL);
+          }
+
+          properties.setProperty(SEGMENT_START_TIME, startTime);
+          properties.setProperty(SEGMENT_END_TIME, endTime);
+          properties.setProperty(TIME_UNIT, timeUnit);
+        } catch (Exception e) {
+          if (!_config.isContinueOnError()) {
+            throw e;
+          }
+          TimeUnit timeUnit;
+          long now = System.currentTimeMillis();
+          long convertedStartTime;
+          long convertedEndTime;
+          if (_config.getTimeColumnType() == SegmentGeneratorConfig.TimeColumnType.SIMPLE_DATE) {
+            convertedEndTime = now;
+            convertedStartTime = TimeUtils.getValidMinTimeMillis();
+            timeUnit = TimeUnit.MILLISECONDS;
+          } else {
+            timeUnit = _config.getSegmentTimeUnit();
+            if (timeUnit != null) {
+              convertedEndTime = timeUnit.convert(now, TimeUnit.MILLISECONDS);
+              convertedStartTime = timeUnit.convert(TimeUtils.getValidMinTimeMillis(), TimeUnit.MILLISECONDS);
+            } else {
+              // Use millis as the time unit if not able to infer from config
+              timeUnit = TimeUnit.MILLISECONDS;
+              convertedEndTime = now;
+              convertedStartTime = TimeUtils.getValidMinTimeMillis();
+            }
+          }
+          LOGGER.warn(
+              "Caught exception while writing time metadata for segment: {}, time column: {}, total docs: {}. "
+                  + "Continuing using current time ({}) as the end time, and min valid time ({}) as the start time "
+                  + "for the segment (time unit: {}).",
+              _segmentName, timeColumnName, _totalDocs, convertedEndTime, convertedStartTime, timeUnit, e);
+          properties.setProperty(SEGMENT_START_TIME, convertedStartTime);
+          properties.setProperty(SEGMENT_END_TIME, convertedEndTime);
+          properties.setProperty(TIME_UNIT, timeUnit);
+        }
+      }
+    }
+
+    for (Map.Entry<String, String> entry : _config.getCustomProperties().entrySet()) {
+      properties.setProperty(entry.getKey(), entry.getValue());
+    }
+
+    for (Map.Entry<String, ColumnIndexCreationInfo> entry : _indexCreationInfoMap.entrySet()) {
+      String column = entry.getKey();
+      ColumnIndexCreationInfo columnIndexCreationInfo = entry.getValue();
+      SegmentDictionaryCreator dictionaryCreator = _colIndexes.get(column).getDictionaryCreator();
+      int dictionaryElementSize = (dictionaryCreator != null) ? dictionaryCreator.getNumBytesPerEntry() : 0;
+      addColumnMetadataInfo(properties, column, columnIndexCreationInfo, _totalDocs, _schema.getFieldSpecFor(column),
+          dictionaryCreator != null, dictionaryElementSize);
+    }
+
+    SegmentZKPropsConfig segmentZKPropsConfig = _config.getSegmentZKPropsConfig();
+    if (segmentZKPropsConfig != null) {
+      properties.setProperty(Realtime.START_OFFSET, segmentZKPropsConfig.getStartOffset());
+      properties.setProperty(Realtime.END_OFFSET, segmentZKPropsConfig.getEndOffset());
+    }
+    CommonsConfigurationUtils.saveToFile(properties, metadataFile);
+  }
+
+  /**
+   * Adds column metadata information to the properties configuration.
+   */
+  public static void addColumnMetadataInfo(PropertiesConfiguration properties, String column,
+      ColumnIndexCreationInfo columnIndexCreationInfo, int totalDocs, FieldSpec fieldSpec, boolean hasDictionary,
+      int dictionaryElementSize) {
+    int cardinality = columnIndexCreationInfo.getDistinctValueCount();
+    properties.setProperty(getKeyFor(column, CARDINALITY), String.valueOf(cardinality));
+    properties.setProperty(getKeyFor(column, TOTAL_DOCS), String.valueOf(totalDocs));
+    DataType dataType = fieldSpec.getDataType();
+    properties.setProperty(getKeyFor(column, DATA_TYPE), String.valueOf(dataType));
+    // TODO: When the column is raw (no dictionary), we should set BITS_PER_ELEMENT to -1 (invalid). Currently we set
+    //       it regardless of whether dictionary is created or not for backward compatibility because
+    //       ForwardIndexHandler doesn't update this value when converting a raw column to dictionary encoded.
+    //       Consider changing it after releasing 1.5.0.
+    //       See https://github.com/apache/pinot/pull/16921 for details
+    properties.setProperty(getKeyFor(column, BITS_PER_ELEMENT),
+        String.valueOf(org.apache.pinot.segment.local.io.util.PinotDataBitSet.getNumBitsPerValue(cardinality - 1)));
+    FieldType fieldType = fieldSpec.getFieldType();
+    properties.setProperty(getKeyFor(column, DICTIONARY_ELEMENT_SIZE), String.valueOf(dictionaryElementSize));
+    properties.setProperty(getKeyFor(column, COLUMN_TYPE), String.valueOf(fieldType));
+    properties.setProperty(getKeyFor(column, IS_SORTED), String.valueOf(columnIndexCreationInfo.isSorted()));
+    properties.setProperty(getKeyFor(column, HAS_DICTIONARY), String.valueOf(hasDictionary));
+    properties.setProperty(getKeyFor(column, IS_SINGLE_VALUED), String.valueOf(fieldSpec.isSingleValueField()));
+    properties.setProperty(getKeyFor(column, MAX_MULTI_VALUE_ELEMENTS),
+        String.valueOf(columnIndexCreationInfo.getMaxNumberOfMultiValueElements()));
+    properties.setProperty(getKeyFor(column, TOTAL_NUMBER_OF_ENTRIES),
+        String.valueOf(columnIndexCreationInfo.getTotalNumberOfEntries()));
+    properties.setProperty(getKeyFor(column, IS_AUTO_GENERATED),
+        String.valueOf(columnIndexCreationInfo.isAutoGenerated()));
+    DataType storedType = dataType.getStoredType();
+    if (storedType == DataType.STRING || storedType == DataType.BYTES) {
+      properties.setProperty(getKeyFor(column, SCHEMA_MAX_LENGTH), fieldSpec.getEffectiveMaxLength());
+      // TODO let's revisit writing effective maxLengthStrategy into metadata, as changing it right now may affect
+      //  segment's CRC value
+      FieldSpec.MaxLengthExceedStrategy maxLengthStrategy = fieldSpec.getMaxLengthExceedStrategy();
+      if (maxLengthStrategy != null) {
+        properties.setProperty(getKeyFor(column, SCHEMA_MAX_LENGTH_EXCEED_STRATEGY),
+            fieldSpec.getMaxLengthExceedStrategy());
+      }
+    }
+
+    org.apache.pinot.segment.spi.partition.PartitionFunction partitionFunction =
+        columnIndexCreationInfo.getPartitionFunction();
+    if (partitionFunction != null) {
+      properties.setProperty(getKeyFor(column, PARTITION_FUNCTION), partitionFunction.getName());
+      properties.setProperty(getKeyFor(column, NUM_PARTITIONS), columnIndexCreationInfo.getNumPartitions());
+      properties.setProperty(getKeyFor(column, PARTITION_VALUES), columnIndexCreationInfo.getPartitions());
+      if (columnIndexCreationInfo.getPartitionFunctionConfig() != null) {
+        for (Map.Entry<String, String> entry : columnIndexCreationInfo.getPartitionFunctionConfig().entrySet()) {
+          properties.setProperty(getKeyFor(column, String.format("%s.%s", PARTITION_FUNCTION_CONFIG, entry.getKey())),
+              entry.getValue());
+        }
+      }
+    }
+
+    // Datetime field
+    if (fieldType == FieldType.DATE_TIME) {
+      DateTimeFieldSpec dateTimeFieldSpec = (DateTimeFieldSpec) fieldSpec;
+      properties.setProperty(getKeyFor(column, DATETIME_FORMAT), dateTimeFieldSpec.getFormat());
+      properties.setProperty(getKeyFor(column, DATETIME_GRANULARITY), dateTimeFieldSpec.getGranularity());
+    }
+
+    if (fieldType != FieldType.COMPLEX) {
+      // Regular (non-complex) field
+      if (totalDocs > 0) {
+        Object min = columnIndexCreationInfo.getMin();
+        Object max = columnIndexCreationInfo.getMax();
+        // NOTE:
+        // Min/max could be null for real-time aggregate metrics. We don't directly call addColumnMinMaxValueInfo() to
+        // avoid setting MIN_MAX_VALUE_INVALID flag, which will prevent ColumnMinMaxValueGenerator from generating them
+        // when loading the segment.
+        if (min != null && max != null) {
+          addColumnMinMaxValueInfo(properties, column, min, max, storedType);
+        }
+      }
+    } else {
+      // Complex field
+      ComplexFieldSpec complexFieldSpec = (ComplexFieldSpec) fieldSpec;
+      properties.setProperty(getKeyFor(column, COMPLEX_CHILD_FIELD_NAMES),
+          new ArrayList<>(complexFieldSpec.getChildFieldSpecs().keySet()));
+      for (Map.Entry<String, FieldSpec> entry : complexFieldSpec.getChildFieldSpecs().entrySet()) {
+        addFieldSpec(properties, ComplexFieldSpec.getFullChildName(column, entry.getKey()), entry.getValue());
+      }
+    }
+
+    // TODO: Revisit whether we should set default null value for complex field
+    String defaultNullValue = columnIndexCreationInfo.getDefaultNullValue().toString();
+    if (storedType == DataType.STRING) {
+      // NOTE: Do not limit length of default null value because we need exact value to determine whether the default
+      //       null value changes
+      defaultNullValue = CommonsConfigurationUtils.replaceSpecialCharacterInPropertyValue(defaultNullValue);
+    }
+    if (defaultNullValue != null) {
+      properties.setProperty(getKeyFor(column, DEFAULT_NULL_VALUE), defaultNullValue);
+    }
+  }
+
+  /**
+   * In order to persist complex field metadata, we need to recursively add child field specs.
+   * So, each complex field spec will have a property for its child field names, and each child field will have its
+   * own properties of the detailed field spec.
+   * E.g. a COMPLEX type `intMap` of Map<String, Integer> has 2 child fields:
+   *   - key in STRING type and value in INT type.
+   *   Then we will have the following properties to define a COMPLEX field:
+   *     column.intMap.childFieldNames = [key, value]
+   *     column.intMap$$key.columnType = DIMENSION
+   *     column.intMap$$key.dataType = STRING
+   *     column.intMap$$key.isSingleValued = true
+   *     column.intMap$$value.columnType = DIMENSION
+   *     column.intMap$$value.dataType = INT
+   *     column.intMap$$value.isSingleValued = true
+   */
+  public static void addFieldSpec(PropertiesConfiguration properties, String column, FieldSpec fieldSpec) {
+    properties.setProperty(getKeyFor(column, COLUMN_TYPE), String.valueOf(fieldSpec.getFieldType()));
+    if (!column.equals(fieldSpec.getName())) {
+      properties.setProperty(getKeyFor(column, COLUMN_NAME), String.valueOf(fieldSpec.getName()));
+    }
+    DataType dataType = fieldSpec.getDataType();
+    properties.setProperty(getKeyFor(column, DATA_TYPE), String.valueOf(dataType));
+    properties.setProperty(getKeyFor(column, IS_SINGLE_VALUED), String.valueOf(fieldSpec.isSingleValueField()));
+    if (dataType.equals(DataType.STRING) || dataType.equals(DataType.BYTES) || dataType.equals(DataType.JSON)) {
+      properties.setProperty(getKeyFor(column, SCHEMA_MAX_LENGTH), fieldSpec.getEffectiveMaxLength());
+      FieldSpec.MaxLengthExceedStrategy maxLengthExceedStrategy = fieldSpec.getEffectiveMaxLengthExceedStrategy();
+      if (maxLengthExceedStrategy != null) {
+        properties.setProperty(getKeyFor(column, SCHEMA_MAX_LENGTH_EXCEED_STRATEGY), maxLengthExceedStrategy);
+      }
+    }
+
+    // datetime field
+    if (fieldSpec.getFieldType() == FieldType.DATE_TIME) {
+      DateTimeFieldSpec dateTimeFieldSpec = (DateTimeFieldSpec) fieldSpec;
+      properties.setProperty(getKeyFor(column, DATETIME_FORMAT), dateTimeFieldSpec.getFormat());
+      properties.setProperty(getKeyFor(column, DATETIME_GRANULARITY), dateTimeFieldSpec.getGranularity());
+    }
+
+    // complex field
+    if (fieldSpec.getFieldType() == FieldType.COMPLEX) {
+      ComplexFieldSpec complexFieldSpec = (ComplexFieldSpec) fieldSpec;
+      properties.setProperty(getKeyFor(column, COMPLEX_CHILD_FIELD_NAMES),
+          new ArrayList<>(complexFieldSpec.getChildFieldSpecs().keySet()));
+      for (Map.Entry<String, FieldSpec> entry : complexFieldSpec.getChildFieldSpecs().entrySet()) {
+        addFieldSpec(properties, ComplexFieldSpec.getFullChildName(column, entry.getKey()), entry.getValue());
+      }
+    }
+  }
+
+  /**
+   * Adds column min/max value information to the properties configuration.
+   */
+  public static void addColumnMinMaxValueInfo(PropertiesConfiguration properties, String column,
+      @Nullable Object minValue, @Nullable Object maxValue, DataType storedType) {
+    String validMinValue = minValue != null ? getValidPropertyValue(minValue.toString(), storedType) : null;
+    if (validMinValue != null) {
+      properties.setProperty(getKeyFor(column, MIN_VALUE), validMinValue);
+    }
+    String validMaxValue = maxValue != null ? getValidPropertyValue(maxValue.toString(), storedType) : null;
+    if (validMaxValue != null) {
+      properties.setProperty(getKeyFor(column, MAX_VALUE), validMaxValue);
+    }
+    if (validMinValue == null && validMaxValue == null) {
+      properties.setProperty(getKeyFor(column, MIN_MAX_VALUE_INVALID), true);
+    }
+  }
+
+  /**
+   * Helper method to get the valid value for setting min/max. Returns {@code null} if the value is too long (longer
+   * than 512 characters) or is not supported in {@link PropertiesConfiguration}, e.g. contains surrogate characters.
+   */
+  @Nullable
+  private static String getValidPropertyValue(String value, DataType storedType) {
+    if (value.length() > METADATA_PROPERTY_LENGTH_LIMIT) {
+      return null;
+    }
+    return storedType == DataType.STRING ? CommonsConfigurationUtils.replaceSpecialCharacterInPropertyValue(value)
+        : value;
+  }
+
+  /**
+   * Removes column metadata information from the properties configuration.
+   */
+  public static void removeColumnMetadataInfo(PropertiesConfiguration properties, String column) {
+    properties.subset(COLUMN_PROPS_KEY_PREFIX + column).clear();
+  }
+
+  @Override
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
+  /**
+   * Writes the individual column index files to disk.
+   */
+  protected abstract void flushColIndexes() throws Exception;
+
+  @Override
+  public void seal()
+      throws Exception {
+    _segmentName = generateSegmentName();
+    try {
+      // Write the index files to disk
+      flushColIndexes();
+    } finally {
+      close();

Review Comment:
   This is an anti-pattern. `close()` should be invoked by the caller.
   We can consider introducing `closeColIndexes()` and making it idempotent, then invoking it both here and in `close()`. We should still make the driver call `close()` in case of failure.
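
   A minimal sketch of that shape, for discussion only (`closeColIndexes()`, the `_colIndexesClosed` flag, and `ColumnIndexCreators.close()` are hypothetical names, not part of this PR):

   ```java
   // Inside BaseSegmentCreator; assumes ColumnIndexCreators exposes a close() method
   private boolean _colIndexesClosed;

   // Idempotent: safe to invoke from both seal() and close()
   protected void closeColIndexes()
       throws IOException {
     if (_colIndexesClosed) {
       return;
     }
     _colIndexesClosed = true;
     for (ColumnIndexCreators colIndexes : _colIndexes.values()) {
       colIndexes.close();
     }
   }

   @Override
   public void seal()
       throws Exception {
     _segmentName = generateSegmentName();
     // Write the index files to disk, then release the creators; no close() call here
     flushColIndexes();
     closeColIndexes();
   }

   @Override
   public void close()
       throws IOException {
     // Re-entrant because closeColIndexes() is idempotent
     closeColIndexes();
   }
   ```

   The driver would then own the lifecycle, e.g. via try-with-resources (assuming `SegmentCreator` extends `Closeable`), so `close()` still runs when `seal()` throws:

   ```java
   try (SegmentCreator segmentCreator = newSegmentCreator()) {  // hypothetical factory
     segmentCreator.seal();
   }
   ```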



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

