This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 40b6dbe760e7ceb9cde8bad1af7180446b51bb85
Author: Neha Pawar <neha.pawa...@gmail.com>
AuthorDate: Thu Jan 7 17:42:23 2021 -0800

    Remove new partition groups creation in commit
---
 .../realtime/PinotLLCRealtimeSegmentManager.java | 57 +++++++++-------------
 .../realtime/LLRealtimeSegmentDataManager.java   |  3 +-
 2 files changed, 23 insertions(+), 37 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 9fa6850..9a0786b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -469,6 +469,8 @@ public class PinotLLCRealtimeSegmentManager {
   private void commitSegmentMetadataInternal(String realtimeTableName,
       CommittingSegmentDescriptor committingSegmentDescriptor) {
     String committingSegmentName = committingSegmentDescriptor.getSegmentName();
+    LLCSegmentName committingLLCSegment = new LLCSegmentName(committingSegmentName);
+    int committingSegmentPartitionGroupId = committingLLCSegment.getPartitionGroupId();
     LOGGER.info("Committing segment metadata for segment: {}", committingSegmentName);
 
     TableConfig tableConfig = getTableConfig(realtimeTableName);
@@ -495,51 +497,40 @@ public class PinotLLCRealtimeSegmentManager {
 
     // Step-2
-    // Say we currently were consuming from 2 shards A, B. Of those, A is the one committing.
+    // Example: Say we currently were consuming from 2 shards A, B. Of those, A is the one committing.
 
-    // get current partition groups - this gives current state of latest segments for each partition [A - DONE], [B - IN_PROGRESS]
+    // Get current partition groups - this gives current state of latest segments for each partition [A - DONE], [B - IN_PROGRESS]
     List<PartitionGroupMetadata> currentPartitionGroupMetadataList = getCurrentPartitionGroupMetadataList(idealState);
 
     PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(),
         IngestionConfigUtils.getStreamConfigMap(tableConfig));
 
-    // find new partition groups [A],[B],[C],[D] (assume A split into C D)
+    // Find new partition groups [A],[B],[C],[D] (assume A split into C D)
     // If segment has consumed all of A, we will receive B,C,D
     // If segment is still not reached last msg of A, we will receive A,B,C,D
+    // If there were no splits/merges we would receive A,B
     List<PartitionGroupInfo> newPartitionGroupInfoList =
        getPartitionGroupInfoList(streamConfig, currentPartitionGroupMetadataList);
     int numPartitions = newPartitionGroupInfoList.size();
 
-    // create new segment metadata, only if PartitionGroupInfo was returned for it in the newPartitionGroupInfoList
-    Map<Integer, PartitionGroupMetadata> currentGroupIdToMetadata = currentPartitionGroupMetadataList.stream().collect(
-        Collectors.toMap(PartitionGroupMetadata::getPartitionGroupId, p -> p));
-
-    List<String> newConsumingSegmentNames = new ArrayList<>();
+    // Only if committingSegment's partitionGroup is present in the newPartitionGroupInfoList, we create new segment metadata
+    String newConsumingSegmentName = null;
     String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
     long newSegmentCreationTimeMs = getCurrentTimeMs();
     for (PartitionGroupInfo partitionGroupInfo : newPartitionGroupInfoList) {
-      int newPartitionGroupId = partitionGroupInfo.getPartitionGroupId();
-      PartitionGroupMetadata currentPartitionGroupMetadata = currentGroupIdToMetadata.get(newPartitionGroupId);
-      if (currentPartitionGroupMetadata == null) { // not present in current state. New partition found.
-        // make new segment
-        // fixme: letting validation manager do this would be best, otherwise we risk creating multiple CONSUMING segments
-        String newLLCSegmentName =
-            setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupInfo, newSegmentCreationTimeMs,
-                instancePartitions, numPartitions, numReplicas);
-        newConsumingSegmentNames.add(newLLCSegmentName);
-      } else {
-        LLCSegmentName committingLLCSegment = new LLCSegmentName(committingSegmentName);
-        // Update this only for committing segment. All other partitions should get updated by their own commit call
-        if (newPartitionGroupId == committingLLCSegment.getPartitionGroupId()) {
-          Preconditions.checkState(currentPartitionGroupMetadata.getStatus().equals(Status.DONE.toString()));
-          LLCSegmentName newLLCSegmentName = new LLCSegmentName(rawTableName, newPartitionGroupId,
-              currentPartitionGroupMetadata.getSequenceNumber() + 1, newSegmentCreationTimeMs);
-          createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, newSegmentCreationTimeMs,
-              committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
-          newConsumingSegmentNames.add(newLLCSegmentName.getSegmentName());
-        }
+      if (partitionGroupInfo.getPartitionGroupId() == committingSegmentPartitionGroupId) {
+        LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName, committingSegmentPartitionGroupId,
+            committingLLCSegment.getSequenceNumber() + 1, newSegmentCreationTimeMs);
+        createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment, newSegmentCreationTimeMs,
+            committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
+        newConsumingSegmentName = newLLCSegment.getSegmentName();
+        break;
       }
     }
+    // TODO: create new partition groups also here
+    // Cannot do it at the moment, because of the timestamp suffix on the segment name.
+    // Different committing segments could create a CONSUMING segment for same new partitionGroup, with different name
+
     // Step-3
     SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig);
     Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
@@ -554,7 +545,7 @@ public class PinotLLCRealtimeSegmentManager {
     Lock lock = _idealStateUpdateLocks[lockIndex];
     try {
       lock.lock();
-      updateIdealStateOnSegmentCompletion(realtimeTableName, committingSegmentName, newConsumingSegmentNames,
+      updateIdealStateOnSegmentCompletion(realtimeTableName, committingSegmentName, newConsumingSegmentName,
           segmentAssignment, instancePartitionsMap);
     } finally {
       lock.unlock();
@@ -846,7 +837,7 @@ public class PinotLLCRealtimeSegmentManager {
    */
   @VisibleForTesting
   void updateIdealStateOnSegmentCompletion(String realtimeTableName, String committingSegmentName,
-      List<String> newSegmentNames, SegmentAssignment segmentAssignment,
+      String newSegmentName, SegmentAssignment segmentAssignment,
       Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
     HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState -> {
       assert idealState != null;
@@ -863,11 +854,7 @@ public class PinotLLCRealtimeSegmentManager {
            "Exceeded max segment completion time for segment " + committingSegmentName);
       }
       updateInstanceStatesForNewConsumingSegment(idealState.getRecord().getMapFields(), committingSegmentName,
-          null, segmentAssignment, instancePartitionsMap);
-      for (String newSegmentName : newSegmentNames) {
-        updateInstanceStatesForNewConsumingSegment(idealState.getRecord().getMapFields(), null,
-            newSegmentName, segmentAssignment, instancePartitionsMap);
-      }
+          newSegmentName, segmentAssignment, instancePartitionsMap);
       return idealState;
     }, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f));
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index c889193..bc49830 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -308,11 +308,10 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       _stopReason = SegmentCompletionProtocol.REASON_ROW_LIMIT;
       return true;
     } else if (_endOfPartitionGroup) {
+      // FIXME: handle numDocsIndexed == 0 case
       segmentLogger.info("Stopping consumption due to end of partitionGroup reached nRows={} numRowsIndexed={}, numRowsConsumed={}",
           _numRowsIndexed, _numRowsConsumed, _segmentMaxRowCount);
       _stopReason = SegmentCompletionProtocol.REASON_END_OF_PARTITION_GROUP;
-      // fixme: what happens if reached endOfPartitionGroup but numDocsIndexed == 0
-      // If we decide to only setupNewPartitions via ValidationManager, we don't need commit on endOfShard
       return true;
     }
     return false;

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org
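For illustration, the commit-time decision this patch introduces can be summarized with the self-contained sketch below. The class, method, and segment-name format here are hypothetical stand-ins, not the actual Pinot APIs: a successor CONSUMING segment is created only for the committing segment's own partition group, and only while the stream still reports that group; newly discovered partition groups from splits/merges are deliberately not set up on this code path, per the TODO in the diff.

import java.util.Arrays;
import java.util.List;
import java.util.Optional;

/**
 * Minimal sketch (not the actual Pinot classes) of the commit-time decision in this patch:
 * after a segment of partition group 0 commits, a successor CONSUMING segment is created
 * only if the stream still reports partition group 0; brand-new partition groups
 * (e.g. 2 and 3 from a shard split) are intentionally skipped here.
 */
public final class CommitTimeSuccessorSketch {

  /** Simplified stand-in for an LLC-style segment name: table__partitionGroupId__sequenceNumber__creationTime. */
  static String segmentName(String rawTableName, int partitionGroupId, int sequenceNumber, long creationTimeMs) {
    return rawTableName + "__" + partitionGroupId + "__" + sequenceNumber + "__" + creationTimeMs;
  }

  /**
   * Returns the name of the new CONSUMING segment for the committing partition group,
   * or empty if that group no longer appears in the latest partition group list
   * (i.e. the committing segment consumed its shard to the end and the shard is gone).
   */
  static Optional<String> successorForCommittingGroup(String rawTableName, int committingPartitionGroupId,
      int committingSequenceNumber, List<Integer> newPartitionGroupIds, long nowMs) {
    for (int partitionGroupId : newPartitionGroupIds) {
      if (partitionGroupId == committingPartitionGroupId) {
        // Same pattern as the patch: keep the partition group id, bump the sequence number.
        return Optional.of(segmentName(rawTableName, committingPartitionGroupId, committingSequenceNumber + 1, nowMs));
      }
    }
    // New partition groups are NOT set up here; as the TODO in the patch notes, different committing
    // segments could otherwise create differently named CONSUMING segments for the same new group.
    return Optional.empty();
  }

  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    // Shard A (group 0) is committing; the stream still reports A plus new groups 2 and 3 from a split.
    System.out.println(successorForCommittingGroup("myTable", 0, 4, Arrays.asList(0, 1, 2, 3), now));
    // Shard A fully consumed and retired: the stream reports only 1, 2, 3, so no successor for group 0.
    System.out.println(successorForCommittingGroup("myTable", 0, 4, Arrays.asList(1, 2, 3), now));
  }
}

Running the sketch prints a successor name for the first call and an empty result for the second, mirroring the two cases described in the diff comments (segment still mid-shard vs. shard fully consumed).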