This is an automated email from the ASF dual-hosted git repository.
xiangfu0 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new cedb6c66c22 [bugfix] Fix per-stream partition count in segment
metadata for consuming segments in multi-stream tables (#18401)
cedb6c66c22 is described below
commit cedb6c66c22f42648db3240121eaaf348bca86d6
Author: Shaurya Chaturvedi <[email protected]>
AuthorDate: Tue May 5 18:19:47 2026 -0700
[bugfix] Fix per-stream partition count in segment metadata for consuming
segments in multi-stream tables (#18401)
* [bugfix] Fix per-stream partition count in segment metadata for consuming
segments in multi-stream tables
* Fixed unnecessary change
---
.../realtime/PinotLLCRealtimeSegmentManager.java | 29 ++++++++---
.../PinotLLCRealtimeSegmentManagerTest.java | 58 ++++++++++++++++++++++
2 files changed, 79 insertions(+), 8 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 9894de6aa82..a81a9e16112 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -1135,7 +1135,8 @@ public class PinotLLCRealtimeSegmentManager implements
PinotClusterConfigChangeL
}
@Nullable
- private SegmentPartitionMetadata
getPartitionMetadataFromTableConfig(TableConfig tableConfig, int partitionId,
+ @VisibleForTesting
+ SegmentPartitionMetadata getPartitionMetadataFromTableConfig(TableConfig
tableConfig, int partitionId,
int numPartitionGroups) {
SegmentPartitionConfig partitionConfig =
tableConfig.getIndexingConfig().getSegmentPartitionConfig();
if (partitionConfig == null) {
@@ -1145,18 +1146,30 @@ public class PinotLLCRealtimeSegmentManager implements
PinotClusterConfigChangeL
if (columnPartitionMap.size() == 1) {
Map.Entry<String, ColumnPartitionConfig> entry =
columnPartitionMap.entrySet().iterator().next();
ColumnPartitionConfig columnPartitionConfig = entry.getValue();
- if (numPartitionGroups != columnPartitionConfig.getNumPartitions()) {
- LOGGER.warn("Number of partition groups fetched from the stream '{}'
is different than "
- + "columnPartitionConfig.numPartitions '{}' in the table
config. The stream partition count is used. "
- + "Please update the table config accordingly.",
numPartitionGroups,
- columnPartitionConfig.getNumPartitions());
- }
// For multi-stream tables, convert Pinot partition ID (which includes
padding offset) to stream partition ID.
// This ensures the partition metadata stored in ZK matches what the
broker's partition function computes
// during query pruning. For example, stream 1 partition 5 has Pinot
partition ID 10005, but should store 5.
int streamPartitionId =
IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(tableConfig,
partitionId);
+ // If there are multiple streams, we assume the partition groups are
evenly distributed across the streams, and
+ // compute the per-stream partition group count accordingly.
+ // This is needed for the partition function to correctly compute the
stream partition id for pruning.
+ int numStreams =
IngestionConfigUtils.getStreamConfigs(tableConfig).size();
+ if (numStreams > 1 && numPartitionGroups % numStreams != 0) {
+ LOGGER.warn("Number of partition groups '{}' is not divisible by
number of streams '{}'. This might lead to "
+ + "incorrect pruning if the partition function is based on
stream partition id. Please update the "
+ + "table config to ensure the partition groups are evenly
distributed across the streams.",
+ numPartitionGroups, numStreams);
+ return null;
+ }
+ int perStreamNumPartitions = numStreams > 1 ? numPartitionGroups /
numStreams : numPartitionGroups;
+ if (perStreamNumPartitions != columnPartitionConfig.getNumPartitions()) {
+ LOGGER.warn("Number of partitions per stream '{}' is different than "
+ + "columnPartitionConfig.numPartitions '{}' in the table
config. "
+ + "The stream partition count is used. Please update the table
config accordingly.",
+ perStreamNumPartitions, columnPartitionConfig.getNumPartitions());
+ }
ColumnPartitionMetadata columnPartitionMetadata =
- new ColumnPartitionMetadata(columnPartitionConfig.getFunctionName(),
numPartitionGroups,
+ new ColumnPartitionMetadata(columnPartitionConfig.getFunctionName(),
perStreamNumPartitions,
Collections.singleton(streamPartitionId),
columnPartitionConfig.getFunctionConfig());
return new
SegmentPartitionMetadata(Collections.singletonMap(entry.getKey(),
columnPartitionMetadata));
} else {
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 58ec7039c67..e84a6d357cb 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -58,6 +58,7 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.exception.HttpErrorStatusException;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.restlet.resources.BatchConfig;
@@ -74,12 +75,17 @@ import
org.apache.pinot.core.data.manager.realtime.SegmentCompletionUtils;
import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
+import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.DisasterRecoveryMode;
import org.apache.pinot.spi.config.table.PauseState;
+import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.stream.LongMsgOffset;
@@ -2319,6 +2325,58 @@ public class PinotLLCRealtimeSegmentManagerTest {
assertEquals(segmentManager.getMaxSegmentCompletionTimeMillis(), 600_000L);
}
+ @Test
+ public void testGetPartitionMetadataFromTableConfig() {
+ FakePinotLLCRealtimeSegmentManager segmentManager = new
FakePinotLLCRealtimeSegmentManager();
+ Map<String, String> singleStreamConfigMap =
+
FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap();
+ SegmentPartitionConfig partitionConfig = new SegmentPartitionConfig(
+ Collections.singletonMap("col", new ColumnPartitionConfig("Modulo", 4,
null)));
+
+ // No SegmentPartitionConfig → null
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
+ .setStreamConfigs(singleStreamConfigMap).build();
+ assertNull(segmentManager.getPartitionMetadataFromTableConfig(tableConfig,
2, 4));
+
+ // Single-stream: perStreamNumPartitions = numPartitionGroups
+ tableConfig.getIndexingConfig().setSegmentPartitionConfig(partitionConfig);
+ SegmentPartitionMetadata metadata =
segmentManager.getPartitionMetadataFromTableConfig(tableConfig, 2, 4);
+ assertNotNull(metadata);
+ ColumnPartitionMetadata colMetadata =
metadata.getColumnPartitionMap().get("col");
+ assertEquals(colMetadata.getNumPartitions(), 4);
+ assertEquals(colMetadata.getPartitions(), Collections.singleton(2));
+
+ // Multi-stream, even distribution: perStreamNumPartitions =
numPartitionGroups / numStreams
+ // 2 streams × 4 partitions each = 8 total partition groups
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ ingestionConfig.setStreamIngestionConfig(
+ new StreamIngestionConfig(Arrays.asList(singleStreamConfigMap,
singleStreamConfigMap)));
+ TableConfig multiStreamTableConfig =
+ new
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setIngestionConfig(ingestionConfig)
+ .build();
+
multiStreamTableConfig.getIndexingConfig().setSegmentPartitionConfig(partitionConfig);
+
+ // Stream 0, partition 2: Pinot partition ID = 0 * 10000 + 2 = 2
+ metadata =
segmentManager.getPartitionMetadataFromTableConfig(multiStreamTableConfig, 2,
8);
+ assertNotNull(metadata);
+ colMetadata = metadata.getColumnPartitionMap().get("col");
+ assertEquals(colMetadata.getNumPartitions(), 4,
+ "Multi-stream partition count must be per-stream (numPartitionGroups /
numStreams), not total");
+ assertEquals(colMetadata.getPartitions(), Collections.singleton(2));
+
+ // Stream 1, partition 3: Pinot partition ID = 1 * 10000 + 3 = 10003
+ int stream1Partition3 = IngestionConfigUtils.PARTITION_PADDING_OFFSET + 3;
+ metadata =
segmentManager.getPartitionMetadataFromTableConfig(multiStreamTableConfig,
stream1Partition3, 8);
+ assertNotNull(metadata);
+ colMetadata = metadata.getColumnPartitionMap().get("col");
+ assertEquals(colMetadata.getNumPartitions(), 4);
+ assertEquals(colMetadata.getPartitions(), Collections.singleton(3));
+
+ // Multi-stream, uneven distribution: numPartitionGroups not divisible by
numStreams → null
+
assertNull(segmentManager.getPartitionMetadataFromTableConfig(multiStreamTableConfig,
0, 7),
+ "Uneven partition distribution across streams must return null");
+ }
+
//////////////////////////////////////////////////////////////////////////////////
// Fake classes
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]