This is an automated email from the ASF dual-hosted git repository. nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 8afc48f8752c9044af791279e54568f4a124759e Author: Neha Pawar <neha.pawa...@gmail.com> AuthorDate: Thu Dec 31 15:49:33 2020 -0800 Changes in test to make it compile --- .../controller/helix/core/PinotHelixResourceManager.java | 4 ++-- .../core/realtime/PinotLLCRealtimeSegmentManager.java | 2 +- .../core/realtime/PinotLLCRealtimeSegmentManagerTest.java | 15 +++++++++++---- 3 files changed, 14 insertions(+), 7 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index c86f14c..b2949e7 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -1356,7 +1356,7 @@ public class PinotHelixResourceManager { idealState = PinotTableIdealStateBuilder .buildLowLevelRealtimeIdealStateFor(realtimeTableName, realtimeTableConfig, idealState, _enableBatchMessageMode); - _pinotLLCRealtimeSegmentManager.setupNewTable(realtimeTableConfig, idealState); + _pinotLLCRealtimeSegmentManager.setUpNewTable(realtimeTableConfig, idealState); LOGGER.info("Successfully setup table for SHARDED consumers for {} ", realtimeTableName); } else { @@ -1385,7 +1385,7 @@ public class PinotHelixResourceManager { idealState = PinotTableIdealStateBuilder .buildLowLevelRealtimeIdealStateFor(realtimeTableName, realtimeTableConfig, idealState, _enableBatchMessageMode); - _pinotLLCRealtimeSegmentManager.setupNewTable(realtimeTableConfig, idealState); + _pinotLLCRealtimeSegmentManager.setUpNewTable(realtimeTableConfig, idealState); LOGGER.info("Successfully added Helix entries for low-level consumers for {} ", realtimeTableName); } else { LOGGER.info("LLC is already set up for table {}, not configuring again", realtimeTableName); diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 0654a38..a6ef625 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -207,7 +207,7 @@ public class PinotLLCRealtimeSegmentManager { /** * Sets up the realtime table ideal state for a table of consumer type SHARDED */ - public void setupNewTable(TableConfig tableConfig, IdealState idealState) { + public void setUpNewTable(TableConfig tableConfig, IdealState idealState) { Preconditions.checkState(!_isStopping, "Segment manager is stopping"); String realtimeTableName = tableConfig.getTableName(); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java index 42bdedc..75c8057 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java @@ -907,15 +907,22 @@ public class PinotLLCRealtimeSegmentManagerTest { @Override void updateIdealStateOnSegmentCompletion(String realtimeTableName, String committingSegmentName, - String newSegmentName, SegmentAssignment segmentAssignment, + List<String> newSegmentNames, SegmentAssignment segmentAssignment, Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) { updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(), committingSegmentName, - newSegmentName, segmentAssignment, 
instancePartitionsMap); + null, segmentAssignment, instancePartitionsMap); + for (String segmentName : newSegmentNames) { + updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(), null, + segmentName, segmentAssignment, instancePartitionsMap); + } } @Override - List<PartitionGroupInfo> getPartitionGroupInfoList(StreamConfig streamConfig, List<PartitionGroupMetadata> currentPartitionGroupMetadataList) { - return IntStream.range(0, _numPartitions).mapToObj(FakePartitionGroupMetadata::new).collect(Collectors.toList()); + List<PartitionGroupInfo> getPartitionGroupInfoList(StreamConfig streamConfig, + List<PartitionGroupMetadata> currentPartitionGroupMetadataList) { + return IntStream.range(0, _numPartitions).mapToObj(i -> new PartitionGroupInfo(i, + getPartitionOffset(streamConfig, OffsetCriteria.SMALLEST_OFFSET_CRITERIA, i).toString())) + .collect(Collectors.toList()); } @Override --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org