This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new c405193360a Add custom map metadata in segment metadata and upload
Segment command. (#17055)
c405193360a is described below
commit c405193360a56a31266296846938afbc91d5f4b9
Author: RAGHVENDRA KUMAR YADAV <[email protected]>
AuthorDate: Wed Oct 22 15:38:37 2025 -0700
Add custom map metadata in segment metadata and upload Segment command.
(#17055)
* Changes to add custom map metadata in segment metadata and upload Segment
command.
* fix unit tests.
* Update
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
Co-authored-by: Xiang Fu <[email protected]>
* Update
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SegmentLocalFSDirectory.java
Co-authored-by: Xiang Fu <[email protected]>
---------
Co-authored-by: Xiang Fu <[email protected]>
---
.../core/data/manager/BaseTableDataManager.java | 24 ++++++-----
.../manager/realtime/RealtimeTableDataManager.java | 5 ++-
.../immutable/ImmutableSegmentLoader.java | 45 ++++++++++++++-------
.../loader/DefaultSegmentDirectoryLoader.java | 2 +-
.../segment/store/SegmentLocalFSDirectory.java | 23 +++++++++--
.../segment/store/SingleFileIndexDirectory.java | 12 +++++-
.../converter/RealtimeSegmentConverterTest.java | 3 +-
.../spi/loader/SegmentDirectoryLoaderContext.java | 16 +++++++-
.../tools/admin/command/UploadSegmentCommand.java | 47 ++++++++++++++++++++++
9 files changed, 144 insertions(+), 33 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 0536f9d6f30..b70419e4f51 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -449,7 +449,9 @@ public abstract class BaseTableDataManager implements
TableDataManager {
String segmentName = zkMetadata.getSegmentName();
_logger.info("Downloading and loading segment: {}", segmentName);
File indexDir = downloadSegment(zkMetadata);
- addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig,
_segmentOperationsThrottler), zkMetadata);
+ ImmutableSegment immutableSegment =
+ ImmutableSegmentLoader.load(indexDir, indexLoadingConfig,
_segmentOperationsThrottler, zkMetadata);
+ addSegment(immutableSegment, zkMetadata);
_logger.info("Downloaded and loaded segment: {} with CRC: {} on tier: {}",
segmentName, zkMetadata.getCrc(),
TierConfigUtils.normalizeTierName(zkMetadata.getTier()));
}
@@ -860,7 +862,7 @@ public abstract class BaseTableDataManager implements
TableDataManager {
_logger.info("Reloading existing segment: {} on tier: {}", segmentName,
TierConfigUtils.normalizeTierName(segmentTier));
SegmentDirectory segmentDirectory =
- initSegmentDirectory(segmentName,
String.valueOf(zkMetadata.getCrc()), indexLoadingConfig);
+ initSegmentDirectory(segmentName,
String.valueOf(zkMetadata.getCrc()), indexLoadingConfig, zkMetadata);
// We should first try to reuse existing segment directory
if (canReuseExistingDirectoryForReload(zkMetadata, segmentTier,
segmentDirectory, indexLoadingConfig)) {
_logger.info("Reloading segment: {} using existing segment directory
as no reprocessing needed", segmentName);
@@ -886,7 +888,8 @@ public abstract class BaseTableDataManager implements
TableDataManager {
indexLoadingConfig.setSegmentTier(zkMetadata.getTier());
_logger.info("Loading segment: {} from indexDir: {} to tier: {}",
segmentName, indexDir,
TierConfigUtils.normalizeTierName(zkMetadata.getTier()));
- ImmutableSegment segment = ImmutableSegmentLoader.load(indexDir,
indexLoadingConfig, _segmentOperationsThrottler);
+ ImmutableSegment segment = ImmutableSegmentLoader.load(indexDir,
indexLoadingConfig,
+ _segmentOperationsThrottler, zkMetadata);
addSegment(segment, zkMetadata);
// Remove backup directory to mark the completion of segment reloading.
@@ -1233,7 +1236,7 @@ public abstract class BaseTableDataManager implements
TableDataManager {
// Creates the SegmentDirectory object to access the segment metadata.
// The metadata is null if the segment doesn't exist yet.
SegmentDirectory segmentDirectory =
- tryInitSegmentDirectory(segmentName,
String.valueOf(zkMetadata.getCrc()), indexLoadingConfig);
+ tryInitSegmentDirectory(segmentName,
String.valueOf(zkMetadata.getCrc()), indexLoadingConfig, zkMetadata);
SegmentMetadataImpl segmentMetadata = (segmentDirectory == null) ? null :
segmentDirectory.getSegmentMetadata();
/*
@@ -1275,8 +1278,9 @@ public abstract class BaseTableDataManager implements
TableDataManager {
segmentDirectory.copyTo(indexDir);
// Close the stale SegmentDirectory object and recreate it with
reprocessed segment.
closeSegmentDirectoryQuietly(segmentDirectory);
- ImmutableSegmentLoader.preprocess(indexDir, indexLoadingConfig,
_segmentOperationsThrottler);
- segmentDirectory = initSegmentDirectory(segmentName,
String.valueOf(zkMetadata.getCrc()), indexLoadingConfig);
+ ImmutableSegmentLoader.preprocess(indexDir, indexLoadingConfig,
_segmentOperationsThrottler, zkMetadata);
+ segmentDirectory = initSegmentDirectory(segmentName,
String.valueOf(zkMetadata.getCrc()),
+ indexLoadingConfig, zkMetadata);
}
ImmutableSegment segment = ImmutableSegmentLoader.load(segmentDirectory,
indexLoadingConfig);
addSegment(segment, zkMetadata);
@@ -1293,9 +1297,9 @@ public abstract class BaseTableDataManager implements
TableDataManager {
@Nullable
private SegmentDirectory tryInitSegmentDirectory(String segmentName, String
segmentCrc,
- IndexLoadingConfig indexLoadingConfig) {
+ IndexLoadingConfig indexLoadingConfig, @Nullable SegmentZKMetadata
zkMetadata) {
try {
- return initSegmentDirectory(segmentName, segmentCrc, indexLoadingConfig);
+ return initSegmentDirectory(segmentName, segmentCrc, indexLoadingConfig,
zkMetadata);
} catch (Exception e) {
_logger.warn("Failed to initialize SegmentDirectory for segment: {} with
error: {}", segmentName, e.getMessage());
return null;
@@ -1608,8 +1612,9 @@ public abstract class BaseTableDataManager implements
TableDataManager {
}
private SegmentDirectory initSegmentDirectory(String segmentName, String
segmentCrc,
- IndexLoadingConfig indexLoadingConfig)
+ IndexLoadingConfig indexLoadingConfig, @Nullable SegmentZKMetadata
zkMetadata)
throws Exception {
+ Map<String, String> segmentCustomConfigs = zkMetadata != null ?
zkMetadata.getCustomMap() : new HashMap<>();
SegmentDirectoryLoaderContext loaderContext =
new
SegmentDirectoryLoaderContext.Builder().setTableConfig(indexLoadingConfig.getTableConfig())
.setSchema(indexLoadingConfig.getSchema())
@@ -1620,6 +1625,7 @@ public abstract class BaseTableDataManager implements
TableDataManager {
.setSegmentTier(indexLoadingConfig.getSegmentTier())
.setInstanceTierConfigs(indexLoadingConfig.getInstanceTierConfigs())
.setSegmentDirectoryConfigs(indexLoadingConfig.getSegmentDirectoryConfigs())
+ .setSegmentCustomConfigs(segmentCustomConfigs)
.build();
SegmentDirectoryLoader segmentDirectoryLoader =
SegmentDirectoryLoaderRegistry.getSegmentDirectoryLoader(indexLoadingConfig.getSegmentDirectoryLoader());
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 69184e2bfc0..ff0d428ca6e 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -844,7 +844,8 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
// Get a new index loading config with latest table config and schema to
load the segment
IndexLoadingConfig indexLoadingConfig = fetchIndexLoadingConfig();
indexLoadingConfig.setSegmentTier(zkMetadata.getTier());
- addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig,
_segmentOperationsThrottler), zkMetadata);
+ addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig,
_segmentOperationsThrottler, zkMetadata),
+ zkMetadata);
_ingestionDelayTracker.markPartitionForVerification(segmentName);
_logger.info("Downloaded and replaced CONSUMING segment: {}", segmentName);
}
@@ -869,7 +870,7 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
// Get a new index loading config with latest table config and schema to
load the segment
IndexLoadingConfig indexLoadingConfig = fetchIndexLoadingConfig();
ImmutableSegment immutableSegment =
- ImmutableSegmentLoader.load(indexDir, indexLoadingConfig,
_segmentOperationsThrottler);
+ ImmutableSegmentLoader.load(indexDir, indexLoadingConfig,
_segmentOperationsThrottler, zkMetadata);
addSegment(immutableSegment, zkMetadata);
_ingestionDelayTracker.markPartitionForVerification(segmentName);
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
index e32610ba61e..8c9e94d522a 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
@@ -21,10 +21,12 @@ package
org.apache.pinot.segment.local.indexsegment.immutable;
import com.google.common.base.Preconditions;
import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap;
import java.io.File;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import
org.apache.pinot.segment.local.segment.index.column.PhysicalColumnIndexContainer;
import
org.apache.pinot.segment.local.segment.index.converter.SegmentFormatConverterFactory;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
@@ -68,7 +70,7 @@ public class ImmutableSegmentLoader {
throws Exception {
IndexLoadingConfig defaultIndexLoadingConfig = new IndexLoadingConfig();
defaultIndexLoadingConfig.setReadMode(readMode);
- return load(indexDir, defaultIndexLoadingConfig, false, null);
+ return load(indexDir, defaultIndexLoadingConfig, false, null, null);
}
/**
@@ -78,9 +80,8 @@ public class ImmutableSegmentLoader {
*/
public static ImmutableSegment load(File indexDir, IndexLoadingConfig
indexLoadingConfig)
throws Exception {
- return load(indexDir, indexLoadingConfig, true, null);
+ return load(indexDir, indexLoadingConfig, true, null, null);
}
-
/**
* Loads the segment with specified IndexLoadingConfig.
* This method modifies the segment like to convert segment format, add or
remove indices.
@@ -89,7 +90,18 @@ public class ImmutableSegmentLoader {
public static ImmutableSegment load(File indexDir, IndexLoadingConfig
indexLoadingConfig,
@Nullable SegmentOperationsThrottler segmentOperationsThrottler)
throws Exception {
- return load(indexDir, indexLoadingConfig, true,
segmentOperationsThrottler);
+ return load(indexDir, indexLoadingConfig, true,
segmentOperationsThrottler, null);
+ }
+
+ /**
+ * Loads the segment with specified IndexLoadingConfig.
+ * This method modifies the segment like to convert segment format, add or
remove indices.
+ * Mostly used by UT cases to add some specific index for testing purpose.
+ */
+ public static ImmutableSegment load(File indexDir, IndexLoadingConfig
indexLoadingConfig,
+ @Nullable SegmentOperationsThrottler segmentOperationsThrottler,
@Nullable SegmentZKMetadata zkMetadata)
+ throws Exception {
+ return load(indexDir, indexLoadingConfig, true,
segmentOperationsThrottler, zkMetadata);
}
/**
@@ -98,7 +110,7 @@ public class ImmutableSegmentLoader {
*/
public static ImmutableSegment load(File indexDir, IndexLoadingConfig
indexLoadingConfig, boolean needPreprocess)
throws Exception {
- return load(indexDir, indexLoadingConfig, needPreprocess, null);
+ return load(indexDir, indexLoadingConfig, needPreprocess, null, null);
}
/**
@@ -106,7 +118,7 @@ public class ImmutableSegmentLoader {
* modify the segment like to convert segment format, add or remove indices.
*/
public static ImmutableSegment load(File indexDir, IndexLoadingConfig
indexLoadingConfig, boolean needPreprocess,
- @Nullable SegmentOperationsThrottler segmentOperationsThrottler)
+ @Nullable SegmentOperationsThrottler segmentOperationsThrottler,
@Nullable SegmentZKMetadata zkMetadata)
throws Exception {
Preconditions.checkArgument(indexDir.isDirectory(), "Index directory: %s
does not exist or is not a directory",
indexDir);
@@ -116,9 +128,10 @@ public class ImmutableSegmentLoader {
return new EmptyIndexSegment(segmentMetadata);
}
if (needPreprocess) {
- preprocess(indexDir, indexLoadingConfig, segmentOperationsThrottler);
+ preprocess(indexDir, indexLoadingConfig, segmentOperationsThrottler,
zkMetadata);
}
String segmentName = segmentMetadata.getName();
+ Map<String, String> segmentCustomConfigs = zkMetadata != null ?
zkMetadata.getCustomMap() : new HashMap<>();
SegmentDirectoryLoaderContext segmentLoaderContext =
new
SegmentDirectoryLoaderContext.Builder().setTableConfig(indexLoadingConfig.getTableConfig())
.setSchema(indexLoadingConfig.getSchema())
@@ -129,6 +142,7 @@ public class ImmutableSegmentLoader {
.setSegmentTier(indexLoadingConfig.getSegmentTier())
.setInstanceTierConfigs(indexLoadingConfig.getInstanceTierConfigs())
.setSegmentDirectoryConfigs(indexLoadingConfig.getSegmentDirectoryConfigs())
+ .setSegmentCustomConfigs(segmentCustomConfigs)
.build();
SegmentDirectoryLoader segmentLoader =
SegmentDirectoryLoaderRegistry.getSegmentDirectoryLoader(indexLoadingConfig.getSegmentDirectoryLoader());
@@ -143,7 +157,7 @@ public class ImmutableSegmentLoader {
}
public static void preprocess(File indexDir, IndexLoadingConfig
indexLoadingConfig,
- @Nullable SegmentOperationsThrottler segmentOperationsThrottler)
+ @Nullable SegmentOperationsThrottler segmentOperationsThrottler,
SegmentZKMetadata zkMetadata)
throws Exception {
Preconditions.checkArgument(indexDir.isDirectory(), "Index directory: %s
does not exist or is not a directory",
indexDir);
@@ -158,7 +172,7 @@ public class ImmutableSegmentLoader {
// Preprocess requires table config and schema
if (indexLoadingConfig.getTableConfig() != null &&
indexLoadingConfig.getSchema() != null) {
preprocessSegment(indexDir, segmentMetadata.getName(),
segmentMetadata.getCrc(), indexLoadingConfig,
- segmentOperationsThrottler);
+ segmentOperationsThrottler, zkMetadata);
}
} finally {
if (segmentOperationsThrottler != null) {
@@ -285,20 +299,22 @@ public class ImmutableSegmentLoader {
return;
}
String segmentName = indexDir.getName();
- LOGGER.info("Segment: {} needs to be converted from version: {} to {}",
segmentName, segmentVersionOnDisk,
- segmentVersionToLoad);
+ LOGGER.info("Segment: {} needs to be converted from version: {} to {}",
segmentName,
+ segmentVersionOnDisk, segmentVersionToLoad);
SegmentFormatConverter converter =
SegmentFormatConverterFactory.getConverter(segmentVersionOnDisk,
segmentVersionToLoad);
LOGGER.info("Using converter: {} to up-convert segment: {}",
converter.getClass().getSimpleName(), segmentName);
converter.convert(indexDir);
- LOGGER.info("Successfully up-converted segment: {} from version: {} to
{}", segmentName, segmentVersionOnDisk,
- segmentVersionToLoad);
+ LOGGER.info("Successfully up-converted segment: {} from version: {} to
{}", segmentName,
+ segmentVersionOnDisk, segmentVersionToLoad);
}
private static void preprocessSegment(File indexDir, String segmentName,
String segmentCrc,
- IndexLoadingConfig indexLoadingConfig, @Nullable
SegmentOperationsThrottler segmentOperationsThrottler)
+ IndexLoadingConfig indexLoadingConfig, @Nullable
SegmentOperationsThrottler segmentOperationsThrottler,
+ SegmentZKMetadata zkMetadata)
throws Exception {
PinotConfiguration segmentDirectoryConfigs =
indexLoadingConfig.getSegmentDirectoryConfigs();
+ Map<String, String> segmentCustomConfigs = zkMetadata != null ?
zkMetadata.getCustomMap() : new HashMap<>();
SegmentDirectoryLoaderContext segmentLoaderContext =
new
SegmentDirectoryLoaderContext.Builder().setTableConfig(indexLoadingConfig.getTableConfig())
.setSchema(indexLoadingConfig.getSchema())
@@ -306,6 +322,7 @@ public class ImmutableSegmentLoader {
.setSegmentName(segmentName)
.setSegmentCrc(segmentCrc)
.setSegmentDirectoryConfigs(segmentDirectoryConfigs)
+ .setSegmentCustomConfigs(segmentCustomConfigs)
.build();
SegmentDirectory segmentDirectory =
SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader().load(indexDir.toURI(),
segmentLoaderContext);
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/loader/DefaultSegmentDirectoryLoader.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/loader/DefaultSegmentDirectoryLoader.java
index d196d3b4d55..17f8cb6d92d 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/loader/DefaultSegmentDirectoryLoader.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/loader/DefaultSegmentDirectoryLoader.java
@@ -55,7 +55,7 @@ public class DefaultSegmentDirectoryLoader implements
SegmentDirectoryLoader {
if (!directory.exists()) {
return new SegmentLocalFSDirectory(directory);
}
- return new SegmentLocalFSDirectory(directory,
+ return new SegmentLocalFSDirectory(directory, segmentLoaderContext,
ReadMode.valueOf(segmentDirectoryConfigs.getProperty(IndexLoadingConfig.READ_MODE_KEY)));
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SegmentLocalFSDirectory.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SegmentLocalFSDirectory.java
index a24aced5e4c..a68271e34c9 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SegmentLocalFSDirectory.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SegmentLocalFSDirectory.java
@@ -34,6 +34,7 @@ import org.apache.pinot.segment.spi.creator.SegmentVersion;
import org.apache.pinot.segment.spi.index.IndexType;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import
org.apache.pinot.segment.spi.index.multicolumntext.MultiColumnTextIndexConstants;
+import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.apache.pinot.segment.spi.store.ColumnIndexDirectory;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
@@ -62,6 +63,7 @@ public class SegmentLocalFSDirectory extends SegmentDirectory
{
private final File _segmentDirectory;
private final SegmentLock _segmentLock;
private final ReadMode _readMode;
+ private final SegmentDirectoryLoaderContext _segmentDirectoryLoaderContext;
private SegmentMetadataImpl _segmentMetadata;
private ColumnIndexDirectory _columnIndexDirectory;
private StarTreeIndexReader _starTreeIndexReader;
@@ -75,15 +77,29 @@ public class SegmentLocalFSDirectory extends
SegmentDirectory {
_segmentDirectory = null;
_segmentLock = new SegmentLock();
_readMode = null;
+ _segmentDirectoryLoaderContext = null;
}
public SegmentLocalFSDirectory(File directory, ReadMode readMode)
throws IOException, ConfigurationException {
- this(directory, new SegmentMetadataImpl(directory), readMode);
+ this(directory, new SegmentMetadataImpl(directory), null, readMode);
+ }
+
+ public SegmentLocalFSDirectory(File directory, @Nullable
SegmentDirectoryLoaderContext segmentDirectoryLoaderContext,
+ ReadMode readMode)
+ throws IOException, ConfigurationException {
+ this(directory, new SegmentMetadataImpl(directory),
segmentDirectoryLoaderContext, readMode);
+ }
+
+ public SegmentLocalFSDirectory(File directory, SegmentMetadataImpl metadata,
ReadMode readMode)
+ throws IOException, ConfigurationException {
+ this(directory, metadata, null, readMode);
}
@VisibleForTesting
- public SegmentLocalFSDirectory(File directoryFile, SegmentMetadataImpl
metadata, ReadMode readMode) {
+ public SegmentLocalFSDirectory(File directoryFile, SegmentMetadataImpl
metadata,
+ @Nullable SegmentDirectoryLoaderContext segmentDirectoryLoaderContext,
ReadMode readMode) {
+ _segmentDirectoryLoaderContext = segmentDirectoryLoaderContext;
Preconditions.checkNotNull(directoryFile);
Preconditions.checkNotNull(metadata);
@@ -260,7 +276,8 @@ public class SegmentLocalFSDirectory extends
SegmentDirectory {
break;
case v3:
try {
- _columnIndexDirectory = new
SingleFileIndexDirectory(_segmentDirectory, _segmentMetadata, _readMode);
+ _columnIndexDirectory = new
SingleFileIndexDirectory(_segmentDirectory, _segmentMetadata,
+ _segmentDirectoryLoaderContext, _readMode);
} catch (ConfigurationException e) {
LOGGER.error("Failed to create columnar index directory", e);
throw new RuntimeException(e);
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectory.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectory.java
index 768752315ac..49c303ac431 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectory.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectory.java
@@ -35,6 +35,7 @@ import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
+import javax.annotation.Nullable;
import org.apache.commons.configuration2.PropertiesConfiguration;
import org.apache.commons.configuration2.ex.ConfigurationException;
import org.apache.commons.io.FileUtils;
@@ -42,6 +43,7 @@ import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.index.IndexType;
import org.apache.pinot.segment.spi.index.StandardIndexes;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.apache.pinot.segment.spi.store.ColumnIndexDirectory;
import org.apache.pinot.segment.spi.store.ColumnIndexUtils;
@@ -79,6 +81,7 @@ class SingleFileIndexDirectory extends ColumnIndexDirectory {
private static final int MAX_ALLOCATION_SIZE = 2000 * 1024 * 1024;
private final File _segmentDirectory;
+ private final SegmentDirectoryLoaderContext _segmentDirectoryLoaderContext;
private SegmentMetadataImpl _segmentMetadata;
private final ReadMode _readMode;
private final File _indexFile;
@@ -92,12 +95,18 @@ class SingleFileIndexDirectory extends ColumnIndexDirectory
{
// re-arranges the content in index file to keep it compact.
private boolean _shouldCleanupRemovedIndices;
+ public SingleFileIndexDirectory(File segmentDirectory, SegmentMetadataImpl
segmentMetadata, ReadMode readMode)
+ throws IOException, ConfigurationException {
+ this(segmentDirectory, segmentMetadata, null, readMode);
+ }
+
/**
* @param segmentDirectory File pointing to segment directory
* @param segmentMetadata segment metadata. Metadata must be fully
initialized
* @param readMode mmap vs heap mode
*/
- public SingleFileIndexDirectory(File segmentDirectory, SegmentMetadataImpl
segmentMetadata, ReadMode readMode)
+ public SingleFileIndexDirectory(File segmentDirectory, SegmentMetadataImpl
segmentMetadata,
+ @Nullable SegmentDirectoryLoaderContext segmentDirectoryLoaderContext,
ReadMode readMode)
throws IOException, ConfigurationException {
Preconditions.checkNotNull(segmentDirectory);
Preconditions.checkNotNull(readMode);
@@ -108,6 +117,7 @@ class SingleFileIndexDirectory extends ColumnIndexDirectory
{
Preconditions.checkArgument(segmentDirectory.isDirectory(),
"SegmentDirectory: " + segmentDirectory.toString() + " is not a
directory");
+ _segmentDirectoryLoaderContext = segmentDirectoryLoaderContext;
_segmentDirectory = segmentDirectory;
_segmentMetadata = segmentMetadata;
_readMode = readMode;
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java
index fe22b3b9914..ea9042abc58 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java
@@ -32,6 +32,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import org.apache.commons.configuration2.ex.ConfigurationException;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ServerMetrics;
@@ -435,7 +436,7 @@ public class RealtimeSegmentConverterTest implements
PinotBuffersAfterMethodChec
private void testSegment(List<GenericRow> rows, File indexDir,
TableConfig tableConfig, SegmentMetadataImpl segmentMetadata)
- throws IOException {
+ throws IOException, ConfigurationException {
SegmentLocalFSDirectory segmentDir = new SegmentLocalFSDirectory(indexDir,
segmentMetadata, ReadMode.mmap);
SegmentDirectory.Reader segmentReader = segmentDir.createReader();
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/loader/SegmentDirectoryLoaderContext.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/loader/SegmentDirectoryLoaderContext.java
index 312fa5ba17d..5fc6db87132 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/loader/SegmentDirectoryLoaderContext.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/loader/SegmentDirectoryLoaderContext.java
@@ -38,10 +38,11 @@ public class SegmentDirectoryLoaderContext {
private final String _segmentTier;
private final Map<String, Map<String, String>> _instanceTierConfigs;
private final PinotConfiguration _segmentDirectoryConfigs;
+ private final Map<String, String> _segmentCustomConfigs;
private SegmentDirectoryLoaderContext(TableConfig tableConfig, Schema
schema, String instanceId, String tableDataDir,
String segmentName, String segmentCrc, String segmentTier, Map<String,
Map<String, String>> instanceTierConfigs,
- PinotConfiguration segmentDirectoryConfigs) {
+ PinotConfiguration segmentDirectoryConfigs, Map<String, String>
segmentCustomConfigs) {
_tableConfig = tableConfig;
_schema = schema;
_instanceId = instanceId;
@@ -51,6 +52,7 @@ public class SegmentDirectoryLoaderContext {
_segmentTier = segmentTier;
_instanceTierConfigs = instanceTierConfigs;
_segmentDirectoryConfigs = segmentDirectoryConfigs;
+ _segmentCustomConfigs = segmentCustomConfigs;
}
public TableConfig getTableConfig() {
@@ -89,6 +91,10 @@ public class SegmentDirectoryLoaderContext {
return _instanceTierConfigs;
}
+ public Map<String, String> getSegmentCustomConfigs() {
+ return _segmentCustomConfigs;
+ }
+
public static class Builder {
private TableConfig _tableConfig;
private Schema _schema;
@@ -99,6 +105,7 @@ public class SegmentDirectoryLoaderContext {
private String _segmentTier;
private Map<String, Map<String, String>> _instanceTierConfigs;
private PinotConfiguration _segmentDirectoryConfigs;
+ private Map<String, String> _segmentCustomConfigs;
public Builder setTableConfig(TableConfig tableConfig) {
_tableConfig = tableConfig;
@@ -145,9 +152,14 @@ public class SegmentDirectoryLoaderContext {
return this;
}
+ public Builder setSegmentCustomConfigs(Map<String, String>
segmentCustomConfigs) {
+ _segmentCustomConfigs = segmentCustomConfigs;
+ return this;
+ }
+
public SegmentDirectoryLoaderContext build() {
return new SegmentDirectoryLoaderContext(_tableConfig, _schema,
_instanceId, _tableDataDir, _segmentName,
- _segmentCrc, _segmentTier, _instanceTierConfigs,
_segmentDirectoryConfigs);
+ _segmentCrc, _segmentTier, _instanceTierConfigs,
_segmentDirectoryConfigs, _segmentCustomConfigs);
}
}
}
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/UploadSegmentCommand.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/UploadSegmentCommand.java
index 3ca3fe87f97..5a35d886221 100644
---
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/UploadSegmentCommand.java
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/UploadSegmentCommand.java
@@ -23,9 +23,12 @@ import java.io.File;
import java.io.FileInputStream;
import java.net.URI;
import java.util.List;
+import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.message.BasicHeader;
import org.apache.pinot.common.auth.AuthProviderUtils;
+import
org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.spi.auth.AuthProvider;
@@ -79,6 +82,15 @@ public class UploadSegmentCommand extends
AbstractBaseAdminCommand implements Co
description = "Table type to upload. Can be OFFLINE or REALTIME")
private TableType _tableType = TableType.OFFLINE;
+ @CommandLine.Option(names = {"-customMetadata"}, required = false, split =
",",
+ description = "Custom metadata to add to segment ZK metadata in
key=value format (e.g. key1=value1,key2=value2)")
+ private String[] _customMetadata = null;
+
+ @CommandLine.Option(names = {"-customMetadataMode"}, required = false,
+ description = "Mode for custom metadata modification. Can be REPLACE or
UPDATE. Default is UPDATE")
+ private SegmentZKMetadataCustomMapModifier.ModifyMode _customMetadataMode =
+ SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE;
+
private AuthProvider _authProvider;
@Override
@@ -145,6 +157,31 @@ public class UploadSegmentCommand extends
AbstractBaseAdminCommand implements Co
_tableType = tableType;
}
+ public UploadSegmentCommand setCustomMetadata(String[] customMetadata) {
+ _customMetadata = customMetadata;
+ return this;
+ }
+
+ public UploadSegmentCommand
setCustomMetadataMode(SegmentZKMetadataCustomMapModifier.ModifyMode
customMetadataMode) {
+ _customMetadataMode = customMetadataMode;
+ return this;
+ }
+
+ private Map<String, String> parseCustomMetadata() {
+ if (_customMetadata == null || _customMetadata.length == 0) {
+ return null;
+ }
+ Map<String, String> customMetadataMap = new java.util.HashMap<>();
+ for (String keyValue : _customMetadata) {
+ String[] parts = keyValue.split("=", 2);
+ if (parts.length != 2) {
+ throw new IllegalArgumentException("Invalid custom metadata format.
Expected key=value, got: " + keyValue);
+ }
+ customMetadataMap.put(parts[0], parts[1]);
+ }
+ return customMetadataMap;
+ }
+
@Override
public boolean execute()
throws Exception {
@@ -182,6 +219,16 @@ public class UploadSegmentCommand extends
AbstractBaseAdminCommand implements Co
AuthProviderUtils.makeAuthHeaders(
AuthProviderUtils.makeAuthProvider(_authProvider,
_authTokenUrl, _authToken, _user, _password));
+ // Add custom metadata header if provided
+ Map<String, String> customMetadataMap = parseCustomMetadata();
+ if (customMetadataMap != null && !customMetadataMap.isEmpty()) {
+ SegmentZKMetadataCustomMapModifier modifier = new
SegmentZKMetadataCustomMapModifier(_customMetadataMode,
+ customMetadataMap);
+ headerList.add(new
BasicHeader(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER,
+ modifier.toJsonString()));
+ LOGGER.info("Added custom metadata modifier: {}",
modifier.toJsonString());
+ }
+
FileInputStream fileInputStream = new FileInputStream(segmentTarFile);
fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI,
segmentTarFile.getName(),
fileInputStream, headerList, null, _tableName, _tableType);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]