This is an automated email from the ASF dual-hosted git repository.
manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new fd39662b083 Fix auto reset and optimize multi-topic ingestion (#16833)
fd39662b083 is described below
commit fd39662b0832d07b6748add2e38f2026ba6e3077
Author: lnbest0707 <[email protected]>
AuthorDate: Wed Sep 17 08:31:57 2025 -0700
Fix auto reset and optimize multi-topic ingestion (#16833)
---
.../realtime/PinotLLCRealtimeSegmentManager.java | 44 +++++++++++++---------
.../apache/pinot/spi/config/table/PauseState.java | 7 +++-
2 files changed, 31 insertions(+), 20 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index e956d0bf5b3..beadd59f089 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -390,8 +390,10 @@ public class PinotLLCRealtimeSegmentManager {
long currentTimeMs = getCurrentTimeMs();
Map<String, Map<String, String>> instanceStatesMap =
idealState.getRecord().getMapFields();
for (PartitionGroupMetadata partitionGroupMetadata :
newPartitionGroupMetadataList) {
+ int streamConfigIdx =
IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(
+ partitionGroupMetadata.getPartitionGroupId());
String segmentName =
- setupNewPartitionGroup(tableConfig, streamConfigs.get(0),
partitionGroupMetadata, currentTimeMs,
+ setupNewPartitionGroup(tableConfig,
streamConfigs.get(streamConfigIdx), partitionGroupMetadata, currentTimeMs,
instancePartitions, numPartitionGroups, numReplicas);
updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null,
segmentName, segmentAssignment,
instancePartitionsMap);
@@ -785,6 +787,8 @@ public class PinotLLCRealtimeSegmentManager {
if (!isTablePaused(idealState) && !isTopicPaused(idealState,
committingSegmentName)) {
LLCSegmentName committingLLCSegment = new
LLCSegmentName(committingSegmentName);
int committingSegmentPartitionGroupId =
committingLLCSegment.getPartitionGroupId();
+ int streamConfigIdx =
IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(
+ committingSegmentPartitionGroupId);
List<StreamConfig> streamConfigs =
IngestionConfigUtils.getStreamConfigs(tableConfig);
Set<Integer> partitionIds = getPartitionIds(streamConfigs, idealState);
@@ -795,9 +799,9 @@ public class PinotLLCRealtimeSegmentManager {
LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName,
committingSegmentPartitionGroupId,
committingLLCSegment.getSequenceNumber() + 1,
newSegmentCreationTimeMs);
- createNewSegmentZKMetadata(tableConfig, streamConfigs.get(0),
newLLCSegment, newSegmentCreationTimeMs,
- committingSegmentDescriptor, committingSegmentZKMetadata,
instancePartitions, partitionIds.size(),
- numReplicas);
+ createNewSegmentZKMetadata(tableConfig,
streamConfigs.get(streamConfigIdx), newLLCSegment,
+ newSegmentCreationTimeMs, committingSegmentDescriptor,
committingSegmentZKMetadata, instancePartitions,
+ partitionIds.size(), numReplicas);
newConsumingSegmentName = newLLCSegment.getSegmentName();
LOGGER.info("Created new segment metadata for segment: {} with status:
{}.", newConsumingSegmentName,
Status.IN_PROGRESS);
@@ -989,6 +993,7 @@ public class PinotLLCRealtimeSegmentManager {
timeThreshold, offsetThreshold);
return nextOffset;
}
+ int streamPartitionId =
IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(partitionId);
String clientId = getTableTopicUniqueClientId(streamConfig);
StreamConsumerFactory consumerFactory =
StreamConsumerFactoryProvider.create(streamConfig);
StreamPartitionMsgOffsetFactory offsetFactory =
consumerFactory.createStreamMsgOffsetFactory();
@@ -996,7 +1001,7 @@ public class PinotLLCRealtimeSegmentManager {
StreamPartitionMsgOffset offsetAtSLA = null;
StreamPartitionMsgOffset latestOffset;
try (StreamMetadataProvider metadataProvider =
consumerFactory.createPartitionMetadataProvider(clientId,
- partitionId)) {
+ streamPartitionId)) {
// Fetching timestamp from an offset is an expensive operation which
requires reading the data,
// while fetching offset from timestamp is lightweight and only needs to
read metadata.
// Hence, instead of checking if latestOffset's time - nextOffset's time
< SLA, we would check
@@ -1005,13 +1010,13 @@ public class PinotLLCRealtimeSegmentManager {
// get nextOffset's time, we should instead check (nextOffset's time +
SLA)'s offset < latestOffset
latestOffset =
metadataProvider.fetchStreamPartitionOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA,
STREAM_FETCH_TIMEOUT_MS);
- LOGGER.info("Latest offset of topic {} and partition {} is {}",
streamConfig.getTopicName(), partitionId,
+ LOGGER.info("Latest offset of topic {} and partition {} is {}",
streamConfig.getTopicName(), streamPartitionId,
latestOffset);
if (timeThreshold > 0) {
offsetAtSLA =
- metadataProvider.getOffsetAtTimestamp(partitionId,
System.currentTimeMillis() - timeThreshold * 1000,
+ metadataProvider.getOffsetAtTimestamp(streamPartitionId,
System.currentTimeMillis() - timeThreshold * 1000,
STREAM_FETCH_TIMEOUT_MS);
- LOGGER.info("Offset at SLA of topic {} and partition {} is {}",
streamConfig.getTopicName(), partitionId,
+ LOGGER.info("Offset at SLA of topic {} and partition {} is {}",
streamConfig.getTopicName(), streamPartitionId,
offsetAtSLA);
}
} catch (Exception e) {
@@ -1021,13 +1026,13 @@ public class PinotLLCRealtimeSegmentManager {
try {
if (timeThreshold > 0 && offsetAtSLA != null &&
offsetAtSLA.compareTo(nextOffsetWithType) > 0) {
LOGGER.info("Auto reset offset from {} to {} on partition {} because
time threshold reached", nextOffset,
- latestOffset, partitionId);
+ latestOffset, streamPartitionId);
return latestOffset.toString();
}
if (offsetThreshold > 0
&& Long.parseLong(latestOffset.toString()) -
Long.parseLong(nextOffset) > offsetThreshold) {
LOGGER.info("Auto reset offset from {} to {} on partition {} because
number of offsets threshold reached",
- nextOffset, latestOffset, partitionId);
+ nextOffset, latestOffset, streamPartitionId);
return latestOffset.toString();
}
} catch (Exception e) {
@@ -1660,6 +1665,7 @@ public class PinotLLCRealtimeSegmentManager {
SegmentZKMetadata latestSegmentZKMetadata = entry.getValue();
String latestSegmentName = latestSegmentZKMetadata.getSegmentName();
LLCSegmentName latestLLCSegmentName = new
LLCSegmentName(latestSegmentName);
+ int streamConfigIdx =
IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(partitionId);
Map<String, String> instanceStateMap =
instanceStatesMap.get(latestSegmentName);
if (instanceStateMap != null) {
@@ -1683,7 +1689,8 @@ public class PinotLLCRealtimeSegmentManager {
CommittingSegmentDescriptor committingSegmentDescriptor =
new CommittingSegmentDescriptor(latestSegmentName,
(offsetFactory.create(latestSegmentZKMetadata.getEndOffset()).toString()), 0);
- createNewSegmentZKMetadata(tableConfig, streamConfigs.get(0),
newLLCSegmentName, currentTimeMs,
+ createNewSegmentZKMetadata(tableConfig,
streamConfigs.get(streamConfigIdx), newLLCSegmentName,
+ currentTimeMs,
committingSegmentDescriptor, latestSegmentZKMetadata,
instancePartitions, numPartitions, numReplicas);
updateInstanceStatesForNewConsumingSegment(instanceStatesMap,
latestSegmentName, newSegmentName,
segmentAssignment, instancePartitionsMap);
@@ -1733,16 +1740,16 @@ public class PinotLLCRealtimeSegmentManager {
selectStartOffset(offsetCriteria, partitionId,
partitionIdToStartOffset, partitionIdToSmallestOffset,
tableConfig.getTableName(), offsetFactory,
latestSegmentZKMetadata.getStartOffset()); // segments are
OFFLINE; start from beginning
- createNewConsumingSegment(tableConfig, streamConfigs.get(0),
latestSegmentZKMetadata, currentTimeMs,
- partitionGroupMetadataList, instancePartitions,
instanceStatesMap, segmentAssignment,
+ createNewConsumingSegment(tableConfig,
streamConfigs.get(streamConfigIdx), latestSegmentZKMetadata,
+ currentTimeMs, partitionGroupMetadataList, instancePartitions,
instanceStatesMap, segmentAssignment,
instancePartitionsMap, startOffset);
} else {
LOGGER.info("Resuming consumption for partition: {} of table: {}",
partitionId, realtimeTableName);
StreamPartitionMsgOffset startOffset =
selectStartOffset(offsetCriteria, partitionId,
partitionIdToStartOffset, partitionIdToSmallestOffset,
tableConfig.getTableName(), offsetFactory,
latestSegmentZKMetadata.getEndOffset());
- createNewConsumingSegment(tableConfig, streamConfigs.get(0),
latestSegmentZKMetadata, currentTimeMs,
- partitionGroupMetadataList, instancePartitions,
instanceStatesMap, segmentAssignment,
+ createNewConsumingSegment(tableConfig,
streamConfigs.get(streamConfigIdx), latestSegmentZKMetadata,
+ currentTimeMs, partitionGroupMetadataList, instancePartitions,
instanceStatesMap, segmentAssignment,
instancePartitionsMap, startOffset);
}
}
@@ -1786,10 +1793,11 @@ public class PinotLLCRealtimeSegmentManager {
// Set up new partitions if not exist
for (PartitionGroupMetadata partitionGroupMetadata :
partitionGroupMetadataList) {
int partitionId = partitionGroupMetadata.getPartitionGroupId();
+ int streamConfigIdx =
IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(partitionId);
if (!latestSegmentZKMetadataMap.containsKey(partitionId)) {
String newSegmentName =
- setupNewPartitionGroup(tableConfig, streamConfigs.get(0),
partitionGroupMetadata, currentTimeMs,
- instancePartitions, numPartitions, numReplicas);
+ setupNewPartitionGroup(tableConfig,
streamConfigs.get(streamConfigIdx), partitionGroupMetadata,
+ currentTimeMs, instancePartitions, numPartitions, numReplicas);
updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null,
newSegmentName, segmentAssignment,
instancePartitionsMap);
}
@@ -2541,7 +2549,7 @@ public class PinotLLCRealtimeSegmentManager {
Set<String> consumingSegments = new TreeSet<>();
idealState.getRecord().getMapFields().forEach((segmentName,
instanceToStateMap) -> {
LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
- if (llcSegmentName != null && topicIndices.contains(
+ if (llcSegmentName != null && !topicIndices.contains(
IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(llcSegmentName.getPartitionGroupId())))
{
return;
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/PauseState.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/PauseState.java
index 50ae0404854..18af6c329b5 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/PauseState.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/PauseState.java
@@ -29,7 +29,7 @@ public class PauseState extends BaseJsonConfig {
private String _comment;
private String _timestamp;
// List of inactive topic indices. Index is the index of the topic in the
streamConfigMaps.
- private List<Integer> _indexOfInactiveTopics;
+ private List<Integer> _indexOfInactiveTopics = new ArrayList<>();
public PauseState() {
}
@@ -80,7 +80,10 @@ public class PauseState extends BaseJsonConfig {
}
public void setIndexOfInactiveTopics(List<Integer> indexOfInactiveTopics) {
- _indexOfInactiveTopics = indexOfInactiveTopics == null ? new ArrayList<>()
: indexOfInactiveTopics;
+ _indexOfInactiveTopics.clear();
+ if (indexOfInactiveTopics != null) {
+ _indexOfInactiveTopics.addAll(indexOfInactiveTopics);
+ }
}
public enum ReasonCode {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]