This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 40b6dbe760e7ceb9cde8bad1af7180446b51bb85
Author: Neha Pawar <neha.pawa...@gmail.com>
AuthorDate: Thu Jan 7 17:42:23 2021 -0800

    Remove creation of new partition groups during segment commit
---
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 57 +++++++++-------------
 .../realtime/LLRealtimeSegmentDataManager.java     |  3 +-
 2 files changed, 23 insertions(+), 37 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 9fa6850..9a0786b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -469,6 +469,8 @@ public class PinotLLCRealtimeSegmentManager {
   private void commitSegmentMetadataInternal(String realtimeTableName,
       CommittingSegmentDescriptor committingSegmentDescriptor) {
     String committingSegmentName = 
committingSegmentDescriptor.getSegmentName();
+    LLCSegmentName committingLLCSegment = new 
LLCSegmentName(committingSegmentName);
+    int committingSegmentPartitionGroupId = 
committingLLCSegment.getPartitionGroupId();
     LOGGER.info("Committing segment metadata for segment: {}", 
committingSegmentName);
 
     TableConfig tableConfig = getTableConfig(realtimeTableName);
@@ -495,51 +497,40 @@ public class PinotLLCRealtimeSegmentManager {
 
     // Step-2
 
-    // Say we currently were consuming from 2 shards A, B. Of those, A is the 
one committing.
+    // Example: Say we currently were consuming from 2 shards A, B. Of those, 
A is the one committing.
 
-    // get current partition groups - this gives current state of latest 
segments for each partition [A - DONE], [B - IN_PROGRESS]
+    // Get current partition groups - this gives current state of latest 
segments for each partition [A - DONE], [B - IN_PROGRESS]
     List<PartitionGroupMetadata> currentPartitionGroupMetadataList = 
getCurrentPartitionGroupMetadataList(idealState);
     PartitionLevelStreamConfig streamConfig = new 
PartitionLevelStreamConfig(tableConfig.getTableName(),
         IngestionConfigUtils.getStreamConfigMap(tableConfig));
 
-    // find new partition groups [A],[B],[C],[D] (assume A split into C D)
+    // Find new partition groups [A],[B],[C],[D] (assume A split into C D)
     // If segment has consumed all of A, we will receive B,C,D
     // If segment is still not reached last msg of A, we will receive A,B,C,D
+    // If there were no splits/merges we would receive A,B
     List<PartitionGroupInfo> newPartitionGroupInfoList =
         getPartitionGroupInfoList(streamConfig, 
currentPartitionGroupMetadataList);
     int numPartitions = newPartitionGroupInfoList.size();
 
-    // create new segment metadata, only if PartitionGroupInfo was returned 
for it in the newPartitionGroupInfoList
-    Map<Integer, PartitionGroupMetadata> currentGroupIdToMetadata = 
currentPartitionGroupMetadataList.stream().collect(
-        Collectors.toMap(PartitionGroupMetadata::getPartitionGroupId, p -> p));
-
-    List<String> newConsumingSegmentNames = new ArrayList<>();
+    // Only if committingSegment's partitionGroup is present in the 
newPartitionGroupInfoList, we create new segment metadata
+    String newConsumingSegmentName = null;
     String rawTableName = 
TableNameBuilder.extractRawTableName(realtimeTableName);
     long newSegmentCreationTimeMs = getCurrentTimeMs();
     for (PartitionGroupInfo partitionGroupInfo : newPartitionGroupInfoList) {
-      int newPartitionGroupId = partitionGroupInfo.getPartitionGroupId();
-      PartitionGroupMetadata currentPartitionGroupMetadata = 
currentGroupIdToMetadata.get(newPartitionGroupId);
-      if (currentPartitionGroupMetadata == null) { // not present in current 
state. New partition found.
-        // make new segment
-        // fixme: letting validation manager do this would be best, otherwise 
we risk creating multiple CONSUMING segments
-        String newLLCSegmentName =
-            setupNewPartitionGroup(tableConfig, streamConfig, 
partitionGroupInfo, newSegmentCreationTimeMs,
-                instancePartitions, numPartitions, numReplicas);
-        newConsumingSegmentNames.add(newLLCSegmentName);
-      } else {
-        LLCSegmentName committingLLCSegment = new 
LLCSegmentName(committingSegmentName);
-        // Update this only for committing segment. All other partitions 
should get updated by their own commit call
-        if (newPartitionGroupId == committingLLCSegment.getPartitionGroupId()) 
{
-          
Preconditions.checkState(currentPartitionGroupMetadata.getStatus().equals(Status.DONE.toString()));
-          LLCSegmentName newLLCSegmentName = new LLCSegmentName(rawTableName, 
newPartitionGroupId,
-              currentPartitionGroupMetadata.getSequenceNumber() + 1, 
newSegmentCreationTimeMs);
-          createNewSegmentZKMetadata(tableConfig, streamConfig, 
newLLCSegmentName, newSegmentCreationTimeMs,
-              committingSegmentDescriptor, committingSegmentZKMetadata, 
instancePartitions, numPartitions, numReplicas);
-          newConsumingSegmentNames.add(newLLCSegmentName.getSegmentName());
-        }
+      if (partitionGroupInfo.getPartitionGroupId() == 
committingSegmentPartitionGroupId) {
+        LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName, 
committingSegmentPartitionGroupId,
+            committingLLCSegment.getSequenceNumber() + 1, 
newSegmentCreationTimeMs);
+        createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment, 
newSegmentCreationTimeMs,
+            committingSegmentDescriptor, committingSegmentZKMetadata, 
instancePartitions, numPartitions, numReplicas);
+        newConsumingSegmentName = newLLCSegment.getSegmentName();
+        break;
       }
     }
 
+    // TODO: create new partition groups also here
+    //  Cannot do it at the moment, because of the timestamp suffix on the 
segment name.
+    //  Different committing segments could create a CONSUMING segment for 
same new partitionGroup, with different name
+
     // Step-3
     SegmentAssignment segmentAssignment = 
SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig);
     Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
@@ -554,7 +545,7 @@ public class PinotLLCRealtimeSegmentManager {
     Lock lock = _idealStateUpdateLocks[lockIndex];
     try {
       lock.lock();
-      updateIdealStateOnSegmentCompletion(realtimeTableName, 
committingSegmentName, newConsumingSegmentNames,
+      updateIdealStateOnSegmentCompletion(realtimeTableName, 
committingSegmentName, newConsumingSegmentName,
           segmentAssignment, instancePartitionsMap);
     } finally {
       lock.unlock();
@@ -846,7 +837,7 @@ public class PinotLLCRealtimeSegmentManager {
    */
   @VisibleForTesting
   void updateIdealStateOnSegmentCompletion(String realtimeTableName, String 
committingSegmentName,
-      List<String> newSegmentNames, SegmentAssignment segmentAssignment,
+      String newSegmentName, SegmentAssignment segmentAssignment,
       Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
     HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState 
-> {
       assert idealState != null;
@@ -863,11 +854,7 @@ public class PinotLLCRealtimeSegmentManager {
             "Exceeded max segment completion time for segment " + 
committingSegmentName);
       }
       
updateInstanceStatesForNewConsumingSegment(idealState.getRecord().getMapFields(),
 committingSegmentName,
-          null, segmentAssignment, instancePartitionsMap);
-      for (String newSegmentName : newSegmentNames) {
-        
updateInstanceStatesForNewConsumingSegment(idealState.getRecord().getMapFields(),
 null,
-            newSegmentName, segmentAssignment, instancePartitionsMap);
-      }
+          newSegmentName, segmentAssignment, instancePartitionsMap);
       return idealState;
     }, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f));
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index c889193..bc49830 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -308,11 +308,10 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
           _stopReason = SegmentCompletionProtocol.REASON_ROW_LIMIT;
           return true;
         } else if (_endOfPartitionGroup) {
+          // FIXME: handle numDocsIndexed == 0 case
           segmentLogger.info("Stopping consumption due to end of 
partitionGroup reached nRows={} numRowsIndexed={}, numRowsConsumed={}",
               _numRowsIndexed, _numRowsConsumed, _segmentMaxRowCount);
           _stopReason = 
SegmentCompletionProtocol.REASON_END_OF_PARTITION_GROUP;
-          // fixme: what happens if reached endOfPartitionGroup but 
numDocsIndexed == 0
-          //  If we decide to only setupNewPartitions via ValidationManager, 
we don't need commit on endOfShard
           return true;
         }
         return false;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to