This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c405193360a Add custom map metadata in segment metadata and upload Segment command. (#17055)
c405193360a is described below

commit c405193360a56a31266296846938afbc91d5f4b9
Author: RAGHVENDRA KUMAR YADAV <[email protected]>
AuthorDate: Wed Oct 22 15:38:37 2025 -0700

    Add custom map metadata in segment metadata and upload Segment command. (#17055)
    
    * Changes to add custom map metadata in segment metadata and upload Segment command.
    
    * fix unit tests.
    
    * Update pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
    
    Co-authored-by: Xiang Fu <[email protected]>
    
    * Update pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SegmentLocalFSDirectory.java
    
    Co-authored-by: Xiang Fu <[email protected]>
    
    ---------
    
    Co-authored-by: Xiang Fu <[email protected]>
---
 .../core/data/manager/BaseTableDataManager.java    | 24 ++++++-----
 .../manager/realtime/RealtimeTableDataManager.java |  5 ++-
 .../immutable/ImmutableSegmentLoader.java          | 45 ++++++++++++++-------
 .../loader/DefaultSegmentDirectoryLoader.java      |  2 +-
 .../segment/store/SegmentLocalFSDirectory.java     | 23 +++++++++--
 .../segment/store/SingleFileIndexDirectory.java    | 12 +++++-
 .../converter/RealtimeSegmentConverterTest.java    |  3 +-
 .../spi/loader/SegmentDirectoryLoaderContext.java  | 16 +++++++-
 .../tools/admin/command/UploadSegmentCommand.java  | 47 ++++++++++++++++++++++
 9 files changed, 144 insertions(+), 33 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 0536f9d6f30..b70419e4f51 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -449,7 +449,9 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
     String segmentName = zkMetadata.getSegmentName();
     _logger.info("Downloading and loading segment: {}", segmentName);
     File indexDir = downloadSegment(zkMetadata);
-    addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, 
_segmentOperationsThrottler), zkMetadata);
+    ImmutableSegment immutableSegment =
+        ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, 
_segmentOperationsThrottler, zkMetadata);
+    addSegment(immutableSegment, zkMetadata);
     _logger.info("Downloaded and loaded segment: {} with CRC: {} on tier: {}", 
segmentName, zkMetadata.getCrc(),
         TierConfigUtils.normalizeTierName(zkMetadata.getTier()));
   }
@@ -860,7 +862,7 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
         _logger.info("Reloading existing segment: {} on tier: {}", segmentName,
             TierConfigUtils.normalizeTierName(segmentTier));
         SegmentDirectory segmentDirectory =
-            initSegmentDirectory(segmentName, 
String.valueOf(zkMetadata.getCrc()), indexLoadingConfig);
+            initSegmentDirectory(segmentName, 
String.valueOf(zkMetadata.getCrc()), indexLoadingConfig, zkMetadata);
         // We should first try to reuse existing segment directory
         if (canReuseExistingDirectoryForReload(zkMetadata, segmentTier, 
segmentDirectory, indexLoadingConfig)) {
           _logger.info("Reloading segment: {} using existing segment directory 
as no reprocessing needed", segmentName);
@@ -886,7 +888,8 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
       indexLoadingConfig.setSegmentTier(zkMetadata.getTier());
       _logger.info("Loading segment: {} from indexDir: {} to tier: {}", 
segmentName, indexDir,
           TierConfigUtils.normalizeTierName(zkMetadata.getTier()));
-      ImmutableSegment segment = ImmutableSegmentLoader.load(indexDir, 
indexLoadingConfig, _segmentOperationsThrottler);
+      ImmutableSegment segment = ImmutableSegmentLoader.load(indexDir, 
indexLoadingConfig,
+          _segmentOperationsThrottler, zkMetadata);
       addSegment(segment, zkMetadata);
 
       // Remove backup directory to mark the completion of segment reloading.
@@ -1233,7 +1236,7 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
     // Creates the SegmentDirectory object to access the segment metadata.
     // The metadata is null if the segment doesn't exist yet.
     SegmentDirectory segmentDirectory =
-        tryInitSegmentDirectory(segmentName, 
String.valueOf(zkMetadata.getCrc()), indexLoadingConfig);
+        tryInitSegmentDirectory(segmentName, 
String.valueOf(zkMetadata.getCrc()), indexLoadingConfig, zkMetadata);
     SegmentMetadataImpl segmentMetadata = (segmentDirectory == null) ? null : 
segmentDirectory.getSegmentMetadata();
 
     /*
@@ -1275,8 +1278,9 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
         segmentDirectory.copyTo(indexDir);
         // Close the stale SegmentDirectory object and recreate it with 
reprocessed segment.
         closeSegmentDirectoryQuietly(segmentDirectory);
-        ImmutableSegmentLoader.preprocess(indexDir, indexLoadingConfig, 
_segmentOperationsThrottler);
-        segmentDirectory = initSegmentDirectory(segmentName, 
String.valueOf(zkMetadata.getCrc()), indexLoadingConfig);
+        ImmutableSegmentLoader.preprocess(indexDir, indexLoadingConfig, 
_segmentOperationsThrottler, zkMetadata);
+        segmentDirectory = initSegmentDirectory(segmentName, 
String.valueOf(zkMetadata.getCrc()),
+            indexLoadingConfig, zkMetadata);
       }
       ImmutableSegment segment = ImmutableSegmentLoader.load(segmentDirectory, 
indexLoadingConfig);
       addSegment(segment, zkMetadata);
@@ -1293,9 +1297,9 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
 
   @Nullable
   private SegmentDirectory tryInitSegmentDirectory(String segmentName, String 
segmentCrc,
-      IndexLoadingConfig indexLoadingConfig) {
+      IndexLoadingConfig indexLoadingConfig, @Nullable SegmentZKMetadata 
zkMetadata) {
     try {
-      return initSegmentDirectory(segmentName, segmentCrc, indexLoadingConfig);
+      return initSegmentDirectory(segmentName, segmentCrc, indexLoadingConfig, 
zkMetadata);
     } catch (Exception e) {
       _logger.warn("Failed to initialize SegmentDirectory for segment: {} with 
error: {}", segmentName, e.getMessage());
       return null;
@@ -1608,8 +1612,9 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
   }
 
   private SegmentDirectory initSegmentDirectory(String segmentName, String 
segmentCrc,
-      IndexLoadingConfig indexLoadingConfig)
+      IndexLoadingConfig indexLoadingConfig, @Nullable SegmentZKMetadata 
zkMetadata)
       throws Exception {
+    Map<String, String> segmentCustomConfigs = zkMetadata != null ? 
zkMetadata.getCustomMap() : new HashMap<>();
     SegmentDirectoryLoaderContext loaderContext =
         new 
SegmentDirectoryLoaderContext.Builder().setTableConfig(indexLoadingConfig.getTableConfig())
             .setSchema(indexLoadingConfig.getSchema())
@@ -1620,6 +1625,7 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
             .setSegmentTier(indexLoadingConfig.getSegmentTier())
             
.setInstanceTierConfigs(indexLoadingConfig.getInstanceTierConfigs())
             
.setSegmentDirectoryConfigs(indexLoadingConfig.getSegmentDirectoryConfigs())
+            .setSegmentCustomConfigs(segmentCustomConfigs)
             .build();
     SegmentDirectoryLoader segmentDirectoryLoader =
         
SegmentDirectoryLoaderRegistry.getSegmentDirectoryLoader(indexLoadingConfig.getSegmentDirectoryLoader());
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 69184e2bfc0..ff0d428ca6e 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -844,7 +844,8 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
     // Get a new index loading config with latest table config and schema to 
load the segment
     IndexLoadingConfig indexLoadingConfig = fetchIndexLoadingConfig();
     indexLoadingConfig.setSegmentTier(zkMetadata.getTier());
-    addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, 
_segmentOperationsThrottler), zkMetadata);
+    addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, 
_segmentOperationsThrottler, zkMetadata),
+        zkMetadata);
     _ingestionDelayTracker.markPartitionForVerification(segmentName);
     _logger.info("Downloaded and replaced CONSUMING segment: {}", segmentName);
   }
@@ -869,7 +870,7 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
     // Get a new index loading config with latest table config and schema to 
load the segment
     IndexLoadingConfig indexLoadingConfig = fetchIndexLoadingConfig();
     ImmutableSegment immutableSegment =
-        ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, 
_segmentOperationsThrottler);
+        ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, 
_segmentOperationsThrottler, zkMetadata);
 
     addSegment(immutableSegment, zkMetadata);
     _ingestionDelayTracker.markPartitionForVerification(segmentName);
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
index e32610ba61e..8c9e94d522a 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
@@ -21,10 +21,12 @@ package 
org.apache.pinot.segment.local.indexsegment.immutable;
 import com.google.common.base.Preconditions;
 import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap;
 import java.io.File;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import 
org.apache.pinot.segment.local.segment.index.column.PhysicalColumnIndexContainer;
 import 
org.apache.pinot.segment.local.segment.index.converter.SegmentFormatConverterFactory;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
@@ -68,7 +70,7 @@ public class ImmutableSegmentLoader {
       throws Exception {
     IndexLoadingConfig defaultIndexLoadingConfig = new IndexLoadingConfig();
     defaultIndexLoadingConfig.setReadMode(readMode);
-    return load(indexDir, defaultIndexLoadingConfig, false, null);
+    return load(indexDir, defaultIndexLoadingConfig, false, null, null);
   }
 
   /**
@@ -78,9 +80,8 @@ public class ImmutableSegmentLoader {
    */
   public static ImmutableSegment load(File indexDir, IndexLoadingConfig 
indexLoadingConfig)
       throws Exception {
-    return load(indexDir, indexLoadingConfig, true, null);
+    return load(indexDir, indexLoadingConfig, true, null, null);
   }
-
   /**
    * Loads the segment with specified IndexLoadingConfig.
    * This method modifies the segment like to convert segment format, add or 
remove indices.
@@ -89,7 +90,18 @@ public class ImmutableSegmentLoader {
   public static ImmutableSegment load(File indexDir, IndexLoadingConfig 
indexLoadingConfig,
       @Nullable SegmentOperationsThrottler segmentOperationsThrottler)
       throws Exception {
-    return load(indexDir, indexLoadingConfig, true, 
segmentOperationsThrottler);
+    return load(indexDir, indexLoadingConfig, true, 
segmentOperationsThrottler, null);
+  }
+
+  /**
+   * Loads the segment with specified IndexLoadingConfig.
+   * This method modifies the segment like to convert segment format, add or 
remove indices.
+   * Mostly used by UT cases to add some specific index for testing purpose.
+   */
+  public static ImmutableSegment load(File indexDir, IndexLoadingConfig 
indexLoadingConfig,
+      @Nullable SegmentOperationsThrottler segmentOperationsThrottler, 
@Nullable SegmentZKMetadata zkMetadata)
+      throws Exception {
+    return load(indexDir, indexLoadingConfig, true, 
segmentOperationsThrottler, zkMetadata);
   }
 
   /**
@@ -98,7 +110,7 @@ public class ImmutableSegmentLoader {
    */
   public static ImmutableSegment load(File indexDir, IndexLoadingConfig 
indexLoadingConfig, boolean needPreprocess)
       throws Exception {
-    return load(indexDir, indexLoadingConfig, needPreprocess, null);
+    return load(indexDir, indexLoadingConfig, needPreprocess, null, null);
   }
 
   /**
@@ -106,7 +118,7 @@ public class ImmutableSegmentLoader {
    * modify the segment like to convert segment format, add or remove indices.
    */
   public static ImmutableSegment load(File indexDir, IndexLoadingConfig 
indexLoadingConfig, boolean needPreprocess,
-      @Nullable SegmentOperationsThrottler segmentOperationsThrottler)
+      @Nullable SegmentOperationsThrottler segmentOperationsThrottler, 
@Nullable SegmentZKMetadata zkMetadata)
       throws Exception {
     Preconditions.checkArgument(indexDir.isDirectory(), "Index directory: %s 
does not exist or is not a directory",
         indexDir);
@@ -116,9 +128,10 @@ public class ImmutableSegmentLoader {
       return new EmptyIndexSegment(segmentMetadata);
     }
     if (needPreprocess) {
-      preprocess(indexDir, indexLoadingConfig, segmentOperationsThrottler);
+      preprocess(indexDir, indexLoadingConfig, segmentOperationsThrottler, 
zkMetadata);
     }
     String segmentName = segmentMetadata.getName();
+    Map<String, String> segmentCustomConfigs = zkMetadata != null ? 
zkMetadata.getCustomMap() : new HashMap<>();
     SegmentDirectoryLoaderContext segmentLoaderContext =
         new 
SegmentDirectoryLoaderContext.Builder().setTableConfig(indexLoadingConfig.getTableConfig())
             .setSchema(indexLoadingConfig.getSchema())
@@ -129,6 +142,7 @@ public class ImmutableSegmentLoader {
             .setSegmentTier(indexLoadingConfig.getSegmentTier())
             
.setInstanceTierConfigs(indexLoadingConfig.getInstanceTierConfigs())
             
.setSegmentDirectoryConfigs(indexLoadingConfig.getSegmentDirectoryConfigs())
+            .setSegmentCustomConfigs(segmentCustomConfigs)
             .build();
     SegmentDirectoryLoader segmentLoader =
         
SegmentDirectoryLoaderRegistry.getSegmentDirectoryLoader(indexLoadingConfig.getSegmentDirectoryLoader());
@@ -143,7 +157,7 @@ public class ImmutableSegmentLoader {
   }
 
   public static void preprocess(File indexDir, IndexLoadingConfig 
indexLoadingConfig,
-      @Nullable SegmentOperationsThrottler segmentOperationsThrottler)
+      @Nullable SegmentOperationsThrottler segmentOperationsThrottler, 
SegmentZKMetadata zkMetadata)
       throws Exception {
     Preconditions.checkArgument(indexDir.isDirectory(), "Index directory: %s 
does not exist or is not a directory",
         indexDir);
@@ -158,7 +172,7 @@ public class ImmutableSegmentLoader {
         // Preprocess requires table config and schema
         if (indexLoadingConfig.getTableConfig() != null && 
indexLoadingConfig.getSchema() != null) {
           preprocessSegment(indexDir, segmentMetadata.getName(), 
segmentMetadata.getCrc(), indexLoadingConfig,
-              segmentOperationsThrottler);
+              segmentOperationsThrottler, zkMetadata);
         }
       } finally {
         if (segmentOperationsThrottler != null) {
@@ -285,20 +299,22 @@ public class ImmutableSegmentLoader {
       return;
     }
     String segmentName = indexDir.getName();
-    LOGGER.info("Segment: {} needs to be converted from version: {} to {}", 
segmentName, segmentVersionOnDisk,
-        segmentVersionToLoad);
+    LOGGER.info("Segment: {} needs to be converted from version: {} to {}", 
segmentName,
+        segmentVersionOnDisk, segmentVersionToLoad);
     SegmentFormatConverter converter =
         SegmentFormatConverterFactory.getConverter(segmentVersionOnDisk, 
segmentVersionToLoad);
     LOGGER.info("Using converter: {} to up-convert segment: {}", 
converter.getClass().getSimpleName(), segmentName);
     converter.convert(indexDir);
-    LOGGER.info("Successfully up-converted segment: {} from version: {} to 
{}", segmentName, segmentVersionOnDisk,
-        segmentVersionToLoad);
+    LOGGER.info("Successfully up-converted segment: {} from version: {} to 
{}", segmentName,
+        segmentVersionOnDisk, segmentVersionToLoad);
   }
 
   private static void preprocessSegment(File indexDir, String segmentName, 
String segmentCrc,
-      IndexLoadingConfig indexLoadingConfig, @Nullable 
SegmentOperationsThrottler segmentOperationsThrottler)
+      IndexLoadingConfig indexLoadingConfig, @Nullable 
SegmentOperationsThrottler segmentOperationsThrottler,
+      SegmentZKMetadata zkMetadata)
       throws Exception {
     PinotConfiguration segmentDirectoryConfigs = 
indexLoadingConfig.getSegmentDirectoryConfigs();
+    Map<String, String> segmentCustomConfigs = zkMetadata != null ? 
zkMetadata.getCustomMap() : new HashMap<>();
     SegmentDirectoryLoaderContext segmentLoaderContext =
         new 
SegmentDirectoryLoaderContext.Builder().setTableConfig(indexLoadingConfig.getTableConfig())
             .setSchema(indexLoadingConfig.getSchema())
@@ -306,6 +322,7 @@ public class ImmutableSegmentLoader {
             .setSegmentName(segmentName)
             .setSegmentCrc(segmentCrc)
             .setSegmentDirectoryConfigs(segmentDirectoryConfigs)
+            .setSegmentCustomConfigs(segmentCustomConfigs)
             .build();
     SegmentDirectory segmentDirectory =
         
SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader().load(indexDir.toURI(),
 segmentLoaderContext);
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/loader/DefaultSegmentDirectoryLoader.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/loader/DefaultSegmentDirectoryLoader.java
index d196d3b4d55..17f8cb6d92d 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/loader/DefaultSegmentDirectoryLoader.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/loader/DefaultSegmentDirectoryLoader.java
@@ -55,7 +55,7 @@ public class DefaultSegmentDirectoryLoader implements 
SegmentDirectoryLoader {
     if (!directory.exists()) {
       return new SegmentLocalFSDirectory(directory);
     }
-    return new SegmentLocalFSDirectory(directory,
+    return new SegmentLocalFSDirectory(directory, segmentLoaderContext,
         
ReadMode.valueOf(segmentDirectoryConfigs.getProperty(IndexLoadingConfig.READ_MODE_KEY)));
   }
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SegmentLocalFSDirectory.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SegmentLocalFSDirectory.java
index a24aced5e4c..a68271e34c9 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SegmentLocalFSDirectory.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SegmentLocalFSDirectory.java
@@ -34,6 +34,7 @@ import org.apache.pinot.segment.spi.creator.SegmentVersion;
 import org.apache.pinot.segment.spi.index.IndexType;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import 
org.apache.pinot.segment.spi.index.multicolumntext.MultiColumnTextIndexConstants;
+import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.segment.spi.store.ColumnIndexDirectory;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
@@ -62,6 +63,7 @@ public class SegmentLocalFSDirectory extends SegmentDirectory 
{
   private final File _segmentDirectory;
   private final SegmentLock _segmentLock;
   private final ReadMode _readMode;
+  private final SegmentDirectoryLoaderContext _segmentDirectoryLoaderContext;
   private SegmentMetadataImpl _segmentMetadata;
   private ColumnIndexDirectory _columnIndexDirectory;
   private StarTreeIndexReader _starTreeIndexReader;
@@ -75,15 +77,29 @@ public class SegmentLocalFSDirectory extends 
SegmentDirectory {
     _segmentDirectory = null;
     _segmentLock = new SegmentLock();
     _readMode = null;
+    _segmentDirectoryLoaderContext = null;
   }
 
   public SegmentLocalFSDirectory(File directory, ReadMode readMode)
       throws IOException, ConfigurationException {
-    this(directory, new SegmentMetadataImpl(directory), readMode);
+    this(directory, new SegmentMetadataImpl(directory), null, readMode);
+  }
+
+  public SegmentLocalFSDirectory(File directory, @Nullable 
SegmentDirectoryLoaderContext segmentDirectoryLoaderContext,
+      ReadMode readMode)
+      throws IOException, ConfigurationException {
+    this(directory, new SegmentMetadataImpl(directory), 
segmentDirectoryLoaderContext, readMode);
+  }
+
+  public SegmentLocalFSDirectory(File directory, SegmentMetadataImpl metadata, 
ReadMode readMode)
+      throws IOException, ConfigurationException {
+    this(directory, metadata, null, readMode);
   }
 
   @VisibleForTesting
-  public SegmentLocalFSDirectory(File directoryFile, SegmentMetadataImpl 
metadata, ReadMode readMode) {
+  public SegmentLocalFSDirectory(File directoryFile, SegmentMetadataImpl 
metadata,
+      @Nullable SegmentDirectoryLoaderContext segmentDirectoryLoaderContext, 
ReadMode readMode) {
+    _segmentDirectoryLoaderContext = segmentDirectoryLoaderContext;
 
     Preconditions.checkNotNull(directoryFile);
     Preconditions.checkNotNull(metadata);
@@ -260,7 +276,8 @@ public class SegmentLocalFSDirectory extends 
SegmentDirectory {
         break;
       case v3:
         try {
-          _columnIndexDirectory = new 
SingleFileIndexDirectory(_segmentDirectory, _segmentMetadata, _readMode);
+          _columnIndexDirectory = new 
SingleFileIndexDirectory(_segmentDirectory, _segmentMetadata,
+              _segmentDirectoryLoaderContext, _readMode);
         } catch (ConfigurationException e) {
           LOGGER.error("Failed to create columnar index directory", e);
           throw new RuntimeException(e);
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectory.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectory.java
index 768752315ac..49c303ac431 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectory.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectory.java
@@ -35,6 +35,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import javax.annotation.Nullable;
 import org.apache.commons.configuration2.PropertiesConfiguration;
 import org.apache.commons.configuration2.ex.ConfigurationException;
 import org.apache.commons.io.FileUtils;
@@ -42,6 +43,7 @@ import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.index.IndexType;
 import org.apache.pinot.segment.spi.index.StandardIndexes;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.segment.spi.store.ColumnIndexDirectory;
 import org.apache.pinot.segment.spi.store.ColumnIndexUtils;
@@ -79,6 +81,7 @@ class SingleFileIndexDirectory extends ColumnIndexDirectory {
   private static final int MAX_ALLOCATION_SIZE = 2000 * 1024 * 1024;
 
   private final File _segmentDirectory;
+  private final SegmentDirectoryLoaderContext _segmentDirectoryLoaderContext;
   private SegmentMetadataImpl _segmentMetadata;
   private final ReadMode _readMode;
   private final File _indexFile;
@@ -92,12 +95,18 @@ class SingleFileIndexDirectory extends ColumnIndexDirectory 
{
   // re-arranges the content in index file to keep it compact.
   private boolean _shouldCleanupRemovedIndices;
 
+  public SingleFileIndexDirectory(File segmentDirectory, SegmentMetadataImpl 
segmentMetadata, ReadMode readMode)
+      throws IOException, ConfigurationException {
+    this(segmentDirectory, segmentMetadata, null, readMode);
+  }
+
   /**
    * @param segmentDirectory File pointing to segment directory
    * @param segmentMetadata segment metadata. Metadata must be fully 
initialized
    * @param readMode mmap vs heap mode
    */
-  public SingleFileIndexDirectory(File segmentDirectory, SegmentMetadataImpl 
segmentMetadata, ReadMode readMode)
+  public SingleFileIndexDirectory(File segmentDirectory, SegmentMetadataImpl 
segmentMetadata,
+      @Nullable SegmentDirectoryLoaderContext segmentDirectoryLoaderContext, 
ReadMode readMode)
       throws IOException, ConfigurationException {
     Preconditions.checkNotNull(segmentDirectory);
     Preconditions.checkNotNull(readMode);
@@ -108,6 +117,7 @@ class SingleFileIndexDirectory extends ColumnIndexDirectory 
{
     Preconditions.checkArgument(segmentDirectory.isDirectory(),
         "SegmentDirectory: " + segmentDirectory.toString() + " is not a 
directory");
 
+    _segmentDirectoryLoaderContext = segmentDirectoryLoaderContext;
     _segmentDirectory = segmentDirectory;
     _segmentMetadata = segmentMetadata;
     _readMode = readMode;
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java
index fe22b3b9914..ea9042abc58 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java
@@ -32,6 +32,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
+import org.apache.commons.configuration2.ex.ConfigurationException;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ServerMetrics;
@@ -435,7 +436,7 @@ public class RealtimeSegmentConverterTest implements 
PinotBuffersAfterMethodChec
 
   private void testSegment(List<GenericRow> rows, File indexDir,
       TableConfig tableConfig, SegmentMetadataImpl segmentMetadata)
-      throws IOException {
+      throws IOException, ConfigurationException {
     SegmentLocalFSDirectory segmentDir = new SegmentLocalFSDirectory(indexDir, 
segmentMetadata, ReadMode.mmap);
     SegmentDirectory.Reader segmentReader = segmentDir.createReader();
 
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/loader/SegmentDirectoryLoaderContext.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/loader/SegmentDirectoryLoaderContext.java
index 312fa5ba17d..5fc6db87132 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/loader/SegmentDirectoryLoaderContext.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/loader/SegmentDirectoryLoaderContext.java
@@ -38,10 +38,11 @@ public class SegmentDirectoryLoaderContext {
   private final String _segmentTier;
   private final Map<String, Map<String, String>> _instanceTierConfigs;
   private final PinotConfiguration _segmentDirectoryConfigs;
+  private final Map<String, String> _segmentCustomConfigs;
 
   private SegmentDirectoryLoaderContext(TableConfig tableConfig, Schema 
schema, String instanceId, String tableDataDir,
       String segmentName, String segmentCrc, String segmentTier, Map<String, 
Map<String, String>> instanceTierConfigs,
-      PinotConfiguration segmentDirectoryConfigs) {
+      PinotConfiguration segmentDirectoryConfigs, Map<String, String> 
segmentCustomConfigs) {
     _tableConfig = tableConfig;
     _schema = schema;
     _instanceId = instanceId;
@@ -51,6 +52,7 @@ public class SegmentDirectoryLoaderContext {
     _segmentTier = segmentTier;
     _instanceTierConfigs = instanceTierConfigs;
     _segmentDirectoryConfigs = segmentDirectoryConfigs;
+    _segmentCustomConfigs = segmentCustomConfigs;
   }
 
   public TableConfig getTableConfig() {
@@ -89,6 +91,10 @@ public class SegmentDirectoryLoaderContext {
     return _instanceTierConfigs;
   }
 
+  public Map<String, String> getSegmentCustomConfigs() {
+    return _segmentCustomConfigs;
+  }
+
   public static class Builder {
     private TableConfig _tableConfig;
     private Schema _schema;
@@ -99,6 +105,7 @@ public class SegmentDirectoryLoaderContext {
     private String _segmentTier;
     private Map<String, Map<String, String>> _instanceTierConfigs;
     private PinotConfiguration _segmentDirectoryConfigs;
+    private Map<String, String> _segmentCustomConfigs;
 
     public Builder setTableConfig(TableConfig tableConfig) {
       _tableConfig = tableConfig;
@@ -145,9 +152,14 @@ public class SegmentDirectoryLoaderContext {
       return this;
     }
 
+    public Builder setSegmentCustomConfigs(Map<String, String> 
segmentCustomConfigs) {
+      _segmentCustomConfigs = segmentCustomConfigs;
+      return this;
+    }
+
     public SegmentDirectoryLoaderContext build() {
       return new SegmentDirectoryLoaderContext(_tableConfig, _schema, 
_instanceId, _tableDataDir, _segmentName,
-          _segmentCrc, _segmentTier, _instanceTierConfigs, 
_segmentDirectoryConfigs);
+          _segmentCrc, _segmentTier, _instanceTierConfigs, 
_segmentDirectoryConfigs, _segmentCustomConfigs);
     }
   }
 }
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/UploadSegmentCommand.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/UploadSegmentCommand.java
index 3ca3fe87f97..5a35d886221 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/UploadSegmentCommand.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/UploadSegmentCommand.java
@@ -23,9 +23,12 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.net.URI;
 import java.util.List;
+import java.util.Map;
 import org.apache.commons.io.FileUtils;
 import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.message.BasicHeader;
 import org.apache.pinot.common.auth.AuthProviderUtils;
+import 
org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
 import org.apache.pinot.common.utils.FileUploadDownloadClient;
 import org.apache.pinot.common.utils.TarCompressionUtils;
 import org.apache.pinot.spi.auth.AuthProvider;
@@ -79,6 +82,15 @@ public class UploadSegmentCommand extends 
AbstractBaseAdminCommand implements Co
       description = "Table type to upload. Can be OFFLINE or REALTIME")
   private TableType _tableType = TableType.OFFLINE;
 
+  @CommandLine.Option(names = {"-customMetadata"}, required = false, split = 
",",
+      description = "Custom metadata to add to segment ZK metadata in 
key=value format (e.g. key1=value1,key2=value2)")
+  private String[] _customMetadata = null;
+
+  @CommandLine.Option(names = {"-customMetadataMode"}, required = false,
+      description = "Mode for custom metadata modification. Can be REPLACE or 
UPDATE. Default is UPDATE")
+  private SegmentZKMetadataCustomMapModifier.ModifyMode _customMetadataMode =
+      SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE;
+
   private AuthProvider _authProvider;
 
   @Override
@@ -145,6 +157,31 @@ public class UploadSegmentCommand extends 
AbstractBaseAdminCommand implements Co
     _tableType = tableType;
   }
 
+  public UploadSegmentCommand setCustomMetadata(String[] customMetadata) {
+    _customMetadata = customMetadata;
+    return this;
+  }
+
+  public UploadSegmentCommand 
setCustomMetadataMode(SegmentZKMetadataCustomMapModifier.ModifyMode 
customMetadataMode) {
+    _customMetadataMode = customMetadataMode;
+    return this;
+  }
+
+  private Map<String, String> parseCustomMetadata() {
+    if (_customMetadata == null || _customMetadata.length == 0) {
+      return null;
+    }
+    Map<String, String> customMetadataMap = new java.util.HashMap<>();
+    for (String keyValue : _customMetadata) {
+      String[] parts = keyValue.split("=", 2);
+      if (parts.length != 2) {
+        throw new IllegalArgumentException("Invalid custom metadata format. 
Expected key=value, got: " + keyValue);
+      }
+      customMetadataMap.put(parts[0], parts[1]);
+    }
+    return customMetadataMap;
+  }
+
   @Override
   public boolean execute()
       throws Exception {
@@ -182,6 +219,16 @@ public class UploadSegmentCommand extends 
AbstractBaseAdminCommand implements Co
             AuthProviderUtils.makeAuthHeaders(
                 AuthProviderUtils.makeAuthProvider(_authProvider, 
_authTokenUrl, _authToken, _user, _password));
 
+        // Add custom metadata header if provided
+        Map<String, String> customMetadataMap = parseCustomMetadata();
+        if (customMetadataMap != null && !customMetadataMap.isEmpty()) {
+          SegmentZKMetadataCustomMapModifier modifier = new 
SegmentZKMetadataCustomMapModifier(_customMetadataMode,
+              customMetadataMap);
+          headerList.add(new 
BasicHeader(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER,
+              modifier.toJsonString()));
+          LOGGER.info("Added custom metadata modifier: {}", 
modifier.toJsonString());
+        }
+
         FileInputStream fileInputStream = new FileInputStream(segmentTarFile);
         fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, 
segmentTarFile.getName(),
             fileInputStream, headerList, null, _tableName, _tableType);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to