npawar commented on code in PR #8663:
URL: https://github.com/apache/pinot/pull/8663#discussion_r876412122
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1385,4 +1386,89 @@ public void uploadToDeepStoreIfMissing(TableConfig
tableConfig, List<SegmentZKMe
}
}
}
+
+ /**
+ * Creates PROPERTYSTORE and IDEALSTATE entries for each partitionGroup to
resume realtime table consumption
+ * @param tableName
+ */
+ public void resumeRealtimeTableConsumption(String tableName) {
+ TableConfig tableConfig = _helixResourceManager.getTableConfig(tableName,
TableType.REALTIME);
+ String realtimeTableName =
TableNameBuilder.REALTIME.tableNameWithType(tableName);
+
+ InstancePartitions instancePartitions =
+
InstancePartitionsUtils.fetchOrComputeInstancePartitions(_helixResourceManager.getHelixZkManager(),
tableConfig,
+ InstancePartitionsType.CONSUMING);
+ Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
+ Collections.singletonMap(InstancePartitionsType.CONSUMING,
instancePartitions);
+ IdealState idealState =
+
HelixHelper.getTableIdealState(_helixResourceManager.getHelixZkManager(),
realtimeTableName);
+ int numReplicas = getNumReplicas(tableConfig, instancePartitions);
+ Map<String, Map<String, String>> instanceStatesMap =
idealState.getRecord().getMapFields();
+
+ PartitionLevelStreamConfig streamConfig = new
PartitionLevelStreamConfig(tableConfig.getTableName(),
+ IngestionConfigUtils.getStreamConfigMap(tableConfig));
+ List<PartitionGroupConsumptionStatus>
currentPartitionGroupConsumptionStatusList =
+ getPartitionGroupConsumptionStatusList(idealState, streamConfig);
+ // Read the smallest offset when a new partition is detected
+ OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria();
+ streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA);
+ List<PartitionGroupMetadata> newPartitionGroupMetadataList =
+ getNewPartitionGroupMetadataList(streamConfig,
currentPartitionGroupConsumptionStatusList);
+ streamConfig.setOffsetCriteria(originalOffsetCriteria);
+
+ int numPartitionGroups = newPartitionGroupMetadataList.size();
+ Map<Integer, SegmentZKMetadata> partitionToLatestSegment =
getLatestSegmentZKMetadataMap(realtimeTableName);
+ for (PartitionGroupMetadata partitionGroupMetadata :
newPartitionGroupMetadataList) {
+ int partitionId = partitionGroupMetadata.getPartitionGroupId();
+ if (partitionToLatestSegment.containsKey(partitionId)
+ &&
idealState.getInstanceStateMap(partitionToLatestSegment.get(partitionId).getSegmentName())
+ .containsValue(SegmentStateModel.CONSUMING)) {
+ // IN_PROGRESS segment already exists, NO-OP
+ LOGGER.info("Skipping partitionGroupId {}, IN_PROGRESS segment already
exists", partitionId);
+ } else {
+ // Step 1: Create PROPERTYSTORE nodes for each partitionId
(IN_PROGRESS)
+ String offset = partitionGroupMetadata.getStartOffset().toString();
+ long newSegmentCreationTimeMs = System.currentTimeMillis();
+
+ SegmentZKMetadata latestSegmentMeta =
partitionToLatestSegment.get(partitionId);
+ int seqNumber = latestSegmentMeta == null ? STARTING_SEQUENCE_NUMBER
+ : new
LLCSegmentName(latestSegmentMeta.getSegmentName()).getSequenceNumber() + 1;
+
+ LLCSegmentName newLLCSegment = new LLCSegmentName(tableName,
partitionId, seqNumber, newSegmentCreationTimeMs);
+ SegmentZKMetadata newSegmentZKMetadata = new
SegmentZKMetadata(newLLCSegment.getSegmentName());
+
newSegmentZKMetadata.setCreationTime(newLLCSegment.getCreationTimeMs());
+ newSegmentZKMetadata.setStartOffset(offset);
+ newSegmentZKMetadata.setNumReplicas(numReplicas);
+
newSegmentZKMetadata.setStatus(CommonConstants.Segment.Realtime.Status.IN_PROGRESS);
+ SegmentPartitionMetadata segmentPartitionMetadata =
+ getPartitionMetadataFromTableConfig(tableConfig, partitionId);
+ if (segmentPartitionMetadata != null) {
+ newSegmentZKMetadata.setPartitionMetadata(segmentPartitionMetadata);
+ }
+
+ FlushThresholdUpdater flushThresholdUpdater =
_flushThresholdUpdateManager
+ .getFlushThresholdUpdater(streamConfig);
+ CommittingSegmentDescriptor committingSegmentDescriptor = new
CommittingSegmentDescriptor(null, offset, 0);
+ flushThresholdUpdater.updateFlushThreshold(streamConfig,
+ newSegmentZKMetadata, committingSegmentDescriptor, null,
+ getMaxNumPartitionsPerInstance(instancePartitions,
numPartitionGroups, numReplicas),
+ newPartitionGroupMetadataList);
+
+ _helixResourceManager.getPropertyStore().set(
Review Comment:
Synchronization is still needed within the same controller, yes. But the other case
is more concerning. For instance, across two controllers (one via the periodic task
thread, the other via the resumeConsumption thread), we could enter the
setupNewPartitionGroup method concurrently and end up with two segment ZK metadata
entries and two CONSUMING segments. Many more such cases could come up.
I'm now wondering whether adding a back-door entry to this method from any
controller is a good idea. This race cannot happen if only the periodic task is allowed
to execute that method (whether scheduled or manually triggered). So: 1) should this flag
just be part of the manual periodic task trigger options, and 2) should this API
internally just invoke the periodic task?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]