This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 8afc48f8752c9044af791279e54568f4a124759e
Author: Neha Pawar <neha.pawa...@gmail.com>
AuthorDate: Thu Dec 31 15:49:33 2020 -0800

    Checnges in test to make it complie
---
 .../controller/helix/core/PinotHelixResourceManager.java  |  4 ++--
 .../core/realtime/PinotLLCRealtimeSegmentManager.java     |  2 +-
 .../core/realtime/PinotLLCRealtimeSegmentManagerTest.java | 15 +++++++++++----
 3 files changed, 14 insertions(+), 7 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index c86f14c..b2949e7 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1356,7 +1356,7 @@ public class PinotHelixResourceManager {
       idealState = PinotTableIdealStateBuilder
           .buildLowLevelRealtimeIdealStateFor(realtimeTableName, 
realtimeTableConfig, idealState,
               _enableBatchMessageMode);
-      _pinotLLCRealtimeSegmentManager.setupNewTable(realtimeTableConfig, 
idealState);
+      _pinotLLCRealtimeSegmentManager.setUpNewTable(realtimeTableConfig, 
idealState);
       LOGGER.info("Successfully setup table for SHARDED consumers for {} ", 
realtimeTableName);
     } else {
 
@@ -1385,7 +1385,7 @@ public class PinotHelixResourceManager {
           idealState = PinotTableIdealStateBuilder
               .buildLowLevelRealtimeIdealStateFor(realtimeTableName, 
realtimeTableConfig, idealState,
                   _enableBatchMessageMode);
-          _pinotLLCRealtimeSegmentManager.setupNewTable(realtimeTableConfig, 
idealState);
+          _pinotLLCRealtimeSegmentManager.setUpNewTable(realtimeTableConfig, 
idealState);
           LOGGER.info("Successfully added Helix entries for low-level 
consumers for {} ", realtimeTableName);
         } else {
           LOGGER.info("LLC is already set up for table {}, not configuring 
again", realtimeTableName);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 0654a38..a6ef625 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -207,7 +207,7 @@ public class PinotLLCRealtimeSegmentManager {
   /**
    * Sets up the realtime table ideal state for a table of consumer type 
SHARDED
    */
-  public void setupNewTable(TableConfig tableConfig, IdealState idealState) {
+  public void setUpNewTable(TableConfig tableConfig, IdealState idealState) {
     Preconditions.checkState(!_isStopping, "Segment manager is stopping");
 
     String realtimeTableName = tableConfig.getTableName();
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 42bdedc..75c8057 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -907,15 +907,22 @@ public class PinotLLCRealtimeSegmentManagerTest {
 
     @Override
     void updateIdealStateOnSegmentCompletion(String realtimeTableName, String 
committingSegmentName,
-        String newSegmentName, SegmentAssignment segmentAssignment,
+        List<String> newSegmentNames, SegmentAssignment segmentAssignment,
         Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) 
{
       
updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(),
 committingSegmentName,
-          newSegmentName, segmentAssignment, instancePartitionsMap);
+          null, segmentAssignment, instancePartitionsMap);
+      for (String segmentName : newSegmentNames) {
+        
updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(),
 null,
+            segmentName, segmentAssignment, instancePartitionsMap);
+      }
     }
 
     @Override
-    List<PartitionGroupInfo> getPartitionGroupInfoList(StreamConfig 
streamConfig, List<PartitionGroupMetadata> currentPartitionGroupMetadataList) {
-      return IntStream.range(0, 
_numPartitions).mapToObj(FakePartitionGroupMetadata::new).collect(Collectors.toList());
+    List<PartitionGroupInfo> getPartitionGroupInfoList(StreamConfig 
streamConfig,
+        List<PartitionGroupMetadata> currentPartitionGroupMetadataList) {
+      return IntStream.range(0, _numPartitions).mapToObj(i -> new 
PartitionGroupInfo(i,
+          getPartitionOffset(streamConfig, 
OffsetCriteria.SMALLEST_OFFSET_CRITERIA, i).toString()))
+          .collect(Collectors.toList());
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to